Partially revert #294 (#353)
#294 tried to improve field extraction, but it does not seem to be
working. This PR logically reverts that change and temporarily brings
back the old behavior.

See #352
costin committed Nov 4, 2022
1 parent 9587855 commit 792d614
Showing 13 changed files with 297 additions and 66 deletions.
@@ -11,7 +11,7 @@
/**
* Block implementation that stores a constant integer value.
*/
-public final class ConstantIntBlock extends Block {
+public class ConstantIntBlock extends Block {

private final int value;

@@ -41,6 +41,37 @@ public Object getObject(int position) {
return getInt(position);
}

@Override
public Block getRow(int position) {
Block curr = this;
return new ConstantIntBlock(value, 1) {
@Override
public int getInt(int ignored) {
return curr.getInt(position);
}

@Override
public long getLong(int ignored) {
return curr.getLong(position);
}

@Override
public double getDouble(int ignored) {
return curr.getDouble(position);
}

@Override
public Object getObject(int ignored) {
return curr.getObject(position);
}

@Override
public String toString() {
return "only-position " + position + ": " + curr;
}
};
}

@Override
public String toString() {
return "ConstantIntBlock{positions=" + getPositionCount() + ", value=" + value + '}';
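For context, a minimal sketch of how the single-position view above might be used; the values and the ConstantIntBlock(int value, int positionCount) constructor shape are inferred from the diff rather than taken from the commit:

// Hypothetical usage of getRow(): a one-position view over a larger block.
Block block = new ConstantIntBlock(42, 10); // constant 42 across 10 positions (constructor shape inferred)
Block row = block.getRow(3);                // delegates back to `block` at position 3
int v = row.getInt(0);                      // 42; the view ignores its position argument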
19 changes: 19 additions & 0 deletions server/src/main/java/org/elasticsearch/compute/data/Page.java
@@ -100,6 +100,25 @@ public Page appendBlock(Block block) {
return new Page(false, positionCount, newBlocks);
}

/**
* Creates a new page, replacing a block at the given index with a new block.
*
* @param blockIndex the index of the block to replace
* @param block the replacement block
* @return a new Page with the block replaced
* @throws IllegalArgumentException if the given block does not have the same number of
* positions as the blocks in this Page
*/
public Page replaceBlock(int blockIndex, Block block) {
if (positionCount != block.getPositionCount()) {
throw new IllegalArgumentException("Block does not have the same position count as this Page");
}

Block[] newBlocks = Arrays.copyOf(blocks, blocks.length);
newBlocks[blockIndex] = block;
return new Page(false, positionCount, newBlocks);
}

@Override
public String toString() {
return "Page{" + "blocks=" + Arrays.toString(blocks) + '}';
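A short usage sketch of replaceBlock, mirroring how the EvalOperator change further down swaps a computed block into an existing page; the block types and values here are illustrative:

// Build a one-block page, then swap in a block with the same position count.
Page page = new Page(new IntArrayBlock(new int[] { 1, 2, 3 }, 3));
Page swapped = page.replaceBlock(0, new LongArrayBlock(new long[] { 2L, 4L, 6L }, 3));
// A replacement with a different position count would hit the IllegalArgumentException above:
// page.replaceBlock(0, new LongArrayBlock(new long[] { 2L }, 1));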
@@ -14,8 +14,8 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.elasticsearch.compute.Experimental;
+import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ConstantIntBlock;
-import org.elasticsearch.compute.data.IntArrayBlock;
import org.elasticsearch.compute.data.LongArrayBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Operator;
@@ -90,7 +90,7 @@ public boolean needsInput() {

@Override
public void addInput(Page page) {
-IntArrayBlock docs = (IntArrayBlock) page.getBlock(docChannel);
+Block docs = page.getBlock(docChannel);
ConstantIntBlock leafOrd = (ConstantIntBlock) page.getBlock(leafOrdChannel);
ConstantIntBlock shardOrd = (ConstantIntBlock) page.getBlock(shardChannel);

@@ -43,7 +43,7 @@ public Page getOutput() {
for (int i = 0; i < block.getPositionCount(); i++) {
newBlock[i] = doubleTransformer.apply(block.getLong(i));
}
-Page lastPage = lastInput.appendBlock(new DoubleArrayBlock(newBlock, block.getPositionCount()));
+Page lastPage = lastInput.replaceBlock(channel, new DoubleArrayBlock(newBlock, block.getPositionCount()));
lastInput = null;
return lastPage;
}
@@ -53,4 +53,13 @@ public void testAppend() {
IntStream.range(0, 10).forEach(i -> assertThat((long) i, is(block2.getLong(i))));
}

public void testReplace() {
Page page1 = new Page(new IntArrayBlock(IntStream.range(0, 10).toArray(), 10));
Page page2 = page1.replaceBlock(0, new LongArrayBlock(LongStream.range(0, 10).toArray(), 10));
assertThat(page1.getBlockCount(), is(1));
assertThat(page2.getBlockCount(), is(1));
Block block = page2.getBlock(0);
IntStream.range(0, 10).forEach(i -> assertThat(block.getLong(i), is((long) i)));
}

}
@@ -42,7 +42,9 @@
import java.util.stream.LongStream;

import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;

@Experimental
@ESIntegTestCase.ClusterScope(scope = SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
@@ -58,7 +60,18 @@ public void setupIndex() {
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", ESTestCase.randomIntBetween(1, 5)))
.setMapping("time", "type=date")
.setMapping(
"data",
"type=long",
"data_d",
"type=double",
"count",
"type=long",
"count_d",
"type=double",
"time",
"type=date"
)
.get()
);
long timestamp = epoch;
@@ -222,6 +235,12 @@ public void testFrom() {
EsqlQueryResponse results = run("from test");
logger.info(results);
Assert.assertEquals(40, results.values().size());
assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("count", "long"))));
assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("count_d", "double"))));
assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("data", "long"))));
assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("data_d", "double"))));
assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("time", "date"))));
// TODO: we have some extra internal columns as well (_doc_id, ...) that we should drop
}

public void testFromSortLimit() {
@@ -254,7 +273,7 @@ public void testFromEvalStats() {
assertEquals("avg(ratio)", results.columns().get(0).name());
assertEquals("double", results.columns().get(0).type());
assertEquals(1, results.values().get(0).size());
-assertEquals(0.96d, (double) results.values().get(0).get(0), 0.01d);
+assertEquals(0.03d, (double) results.values().get(0).get(0), 0.01d);
}

public void testFromStatsEvalWithPragma() {
@@ -11,8 +11,10 @@
import org.elasticsearch.compute.Experimental;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -21,7 +23,9 @@
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.expression.NamedExpression;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules;
import org.elasticsearch.xpack.ql.rule.Rule;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;
@@ -31,6 +35,7 @@
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Stream;

@Experimental
public class PhysicalPlanOptimizer extends RuleExecutor<PhysicalPlan> {
@@ -58,9 +63,22 @@ protected Iterable<RuleExecutor<PhysicalPlan>.Batch> batches() {
batches.add(new Batch("Create topN", Limiter.ONCE, new CreateTopN()));
batches.add(new Batch("Split nodes", Limiter.ONCE, new SplitAggregate(), new SplitTopN()));
batches.add(new Batch("Add exchange", Limiter.ONCE, new AddExchangeOnSingleNodeSplit()));

batches.add(
new Batch(
"Move FieldExtract upwards",
new FieldExtractPastEval(),
new FieldExtractPastFilter(),
new FieldExtractPastLimit(),
new FieldExtractPastTopN(),
new FieldExtractPastAggregate(),
new FieldExtractPastExchange(),
new EmptyFieldExtractRemoval()
)
);
// TODO: Needs another project at the end - depends on https://github.com/elastic/elasticsearch-internal/issues/293
-Batch fieldExtract = new Batch("Lazy field loading", Limiter.ONCE, new AddFieldExtraction());
-batches.add(fieldExtract);
+// Batch fieldExtract = new Batch("Lazy field loading", Limiter.ONCE, new AddFieldExtraction());
+// batches.add(fieldExtract);

// TODO: add rule to prune _doc_id, _segment_id, _shard_id at the top
// Batch addProject = new Batch("Add project", new AddProjectWhenInternalFieldNoLongerNeeded());
Expand All @@ -71,6 +89,89 @@ protected Iterable<RuleExecutor<PhysicalPlan>.Batch> batches() {
return batches;
}

private static class FieldExtractPastEval extends OptimizerRule<EvalExec> {
@Override
protected PhysicalPlan rule(EvalExec eval) {
if (eval.child() instanceof FieldExtractExec fieldExtractExec) {
// If you have an ExtractFieldNode below an EvalNode,
// only extract the fields that the eval needs, and extract the rest above the eval
return possiblySplitExtractFieldNode(eval, eval.fields(), fieldExtractExec, true);
}
return eval;
}
}
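// To illustrate the intent (a hypothetical plan shape, not taken from this commit):
// if the eval computes x = a + 1 while the extract below it loads a, b and c,
// the rule keeps only a below the eval and hoists b and c above it:
//
//   EvalExec[x = a + 1]                  FieldExtractExec[b, c]
//     FieldExtractExec[a, b, c]    =>      EvalExec[x = a + 1]
//       source                               FieldExtractExec[a]
//                                              source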

private static class FieldExtractPastFilter extends OptimizerRule<FilterExec> {
@Override
protected PhysicalPlan rule(FilterExec filterExec) {
if (filterExec.child() instanceof FieldExtractExec fieldExtractExec) {
// If you have an ExtractFieldNode below a FilterNode,
// only extract the fields that the filter needs, and extract the rest above the filter
return possiblySplitExtractFieldNode(
filterExec,
List.of(Expressions.wrapAsNamed(filterExec.condition())),
fieldExtractExec,
true
);
}
return filterExec;
}
}

private static class FieldExtractPastExchange extends OptimizerRule<ExchangeExec> {
@Override
protected PhysicalPlan rule(ExchangeExec exchangeExec) {
if (exchangeExec.child() instanceof FieldExtractExec fieldExtractExec) {
// TODO: Once we go distributed, we can't do this
return possiblySplitExtractFieldNode(exchangeExec, List.of(), fieldExtractExec, true);
}
return exchangeExec;
}
}

private static class FieldExtractPastAggregate extends OptimizerRule<AggregateExec> {
@Override
protected PhysicalPlan rule(AggregateExec aggregateExec) {
if (aggregateExec.child() instanceof FieldExtractExec fieldExtractExec) {
// If you have an ExtractFieldNode below an Aggregate,
// only extract the fields that the aggregate needs; the rest are dropped,
// since the aggregate's output does not carry them forward
List<? extends NamedExpression> namedExpressions = Stream.concat(
aggregateExec.aggregates().stream(),
aggregateExec.groupings().stream().map(Expressions::wrapAsNamed)
).toList();
return possiblySplitExtractFieldNode(aggregateExec, namedExpressions, fieldExtractExec, false);
}
return aggregateExec;
}
}

private static class FieldExtractPastLimit extends OptimizerRule<LimitExec> {
@Override
protected PhysicalPlan rule(LimitExec limitExec) {
if (limitExec.child() instanceof FieldExtractExec fieldExtractExec) {
return possiblySplitExtractFieldNode(
limitExec,
List.of(Expressions.wrapAsNamed(limitExec.limit())),
fieldExtractExec,
true
);
}
return limitExec;
}
}

private static class FieldExtractPastTopN extends OptimizerRule<TopNExec> {
@Override
protected PhysicalPlan rule(TopNExec topNExec) {
if (topNExec.child() instanceof FieldExtractExec fieldExtractExec) {
List<? extends NamedExpression> namedExpressions = Stream.concat(
topNExec.order().stream().map(Expressions::wrapAsNamed),
Stream.of(topNExec.getLimit()).map(Expressions::wrapAsNamed)
).toList();
return possiblySplitExtractFieldNode(topNExec, namedExpressions, fieldExtractExec, true);
}
return topNExec;
}
}

static class AddFieldExtraction extends OptimizerRule<UnaryExec> {

// start from the source upwards
@@ -100,13 +201,58 @@ protected PhysicalPlan rule(UnaryExec plan) {

// ignore exchanges
if (missing.isEmpty() == false) {
-plan = plan.replaceChild(new FieldExtractExec(plan.source(), plan.child(), missing));
+// plan = plan.replaceChild(new FieldExtractExec(plan.source(), plan.child(), missing));
}

return plan;
}
}

private static UnaryExec possiblySplitExtractFieldNode(
UnaryExec parent,
List<? extends NamedExpression> namedExpressions,
FieldExtractExec fieldExtractExec,
boolean preserveUnused
) {
List<Attribute> attributesToKeep = new ArrayList<>();
List<Attribute> attributesToMoveUp = new ArrayList<>();
for (Attribute fieldExtractAttribute : fieldExtractExec.attributesToExtract()) {
if (namedExpressions.stream().anyMatch(ne -> ne.anyMatch(e -> e.semanticEquals(fieldExtractAttribute)))) {
attributesToKeep.add(fieldExtractAttribute);
} else if (preserveUnused) {
attributesToMoveUp.add(fieldExtractAttribute);
}
}
if (attributesToKeep.size() == fieldExtractExec.attributesToExtract().size()) {
return parent;
}
return new FieldExtractExec(
fieldExtractExec.source(),
parent.replaceChild(
new FieldExtractExec(
fieldExtractExec.source(),
fieldExtractExec.child(),
attributesToKeep,
fieldExtractExec.sourceAttributes()
)
),
attributesToMoveUp,
fieldExtractExec.sourceAttributes()
);
}

private static class EmptyFieldExtractRemoval extends OptimizerRule<FieldExtractExec> {
@Override
protected PhysicalPlan rule(FieldExtractExec fieldExtractExec) {
if (fieldExtractExec.attributesToExtract().isEmpty()) {
return fieldExtractExec.child();
}
return fieldExtractExec;
}
}

private static class SplitAggregate extends OptimizerRule<AggregateExec> {

@Override