Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Fix synthetic attribute pruning #111413

Merged
merged 51 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
205dfad
Use AggregatorMode instead of AggregateExec.Mode
alex-spies Jul 29, 2024
1cabef2
Make AggregateExec track intermediateAttributes
alex-spies Jul 29, 2024
54e0436
Implement QueryPlan.requiredInputSet
alex-spies Jul 29, 2024
890701e
Simplify and correct ProjectAwayColumns
alex-spies Jul 29, 2024
bb0c926
Small refactor
alex-spies Jul 29, 2024
f22f3a1
Squash some minor mistakes
alex-spies Jul 29, 2024
dbc9199
Update tests
alex-spies Jul 29, 2024
741dbd7
Simplify DependencyConsistency
alex-spies Jul 30, 2024
9e4dab5
Small refactor
alex-spies Jul 30, 2024
d78f073
Tiny refactor
alex-spies Jul 30, 2024
9695190
Start using synthetic=true again
alex-spies Jul 30, 2024
c343812
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Jul 30, 2024
c5e74be
Update docs/changelog/111413.yaml
alex-spies Jul 30, 2024
ac2d6e4
Do not filter out <no-fields> in the Analyzer
alex-spies Jul 30, 2024
54e1847
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Jul 30, 2024
7db76d8
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 14, 2024
01d4c5f
Fix AggregateExec ser/de tests, hash, equals
alex-spies Aug 14, 2024
5443fef
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 19, 2024
ddfdd2c
Add and use new constructor for FieldAttribute
alex-spies Aug 19, 2024
e7c212e
Mark another one as synthetic
alex-spies Aug 19, 2024
3c648b3
Rename requiredInputSet->childrenReferences
alex-spies Aug 19, 2024
4f01e7c
Synthetic as :s in debug output, e.g. field{f:s}#1
alex-spies Aug 19, 2024
ff8b72d
Spotless
alex-spies Aug 19, 2024
e4b258a
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 20, 2024
9ad9e1c
Revert changes to AggregateExec
alex-spies Aug 20, 2024
7ff1991
Revert "Revert changes to AggregateExec"
alex-spies Aug 20, 2024
2eabfc2
Avoid using QueryPlan.references
alex-spies Aug 20, 2024
f1b9b86
Replace references with childrenReferences
alex-spies Aug 20, 2024
63c39a7
Fix a whoopsie
alex-spies Aug 20, 2024
6028664
Improve comments
alex-spies Aug 20, 2024
e193fb1
Add new transport version for AggregateExec
alex-spies Aug 20, 2024
ac44b1b
More tests for ProjectAwayColumns
alex-spies Aug 21, 2024
41e9159
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 21, 2024
febb40f
Improve comment
alex-spies Aug 21, 2024
93cf2cf
Don't make computeReferences public
alex-spies Aug 21, 2024
9dbd5ea
Checkstyle
alex-spies Aug 21, 2024
d4dc892
Reduce noise via static imports
alex-spies Aug 22, 2024
0505dc4
Synthetic label: {f$} instead of {f:s}
alex-spies Aug 22, 2024
6f36f70
Make synth attr prefix inaccessible
alex-spies Aug 22, 2024
ec67cf2
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 22, 2024
259d93f
Make FieldAttribute ctor with type package private
alex-spies Aug 22, 2024
7849978
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 22, 2024
3582136
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 28, 2024
d1f55e3
Refactor fieldNames for METRICS + add test
alex-spies Aug 28, 2024
776ed78
Update comment
alex-spies Aug 28, 2024
77431f6
Undo refactor to fieldNames; rely on index mode
alex-spies Aug 29, 2024
7c3aaf5
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 29, 2024
e557eb4
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Aug 30, 2024
cb69899
Merge remote-tracking branch 'upstream/main' into fix-synthetic-attri…
alex-spies Sep 3, 2024
f65747d
Use default implementation for computeReferences
alex-spies Sep 3, 2024
a7b8715
Rename static helpers in Aggregate and Eval
alex-spies Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public QueryPlan(Source source, List<PlanType> children) {

public abstract List<Attribute> output();

/**
* The attributes required to be in the {@link QueryPlan#inputSet()} for this plan to be valid.
*/
public abstract AttributeSet requiredInputSet();
alex-spies marked this conversation as resolved.
Show resolved Hide resolved

public AttributeSet outputSet() {
if (lazyOutputSet == null) {
lazyOutputSet = new AttributeSet(output());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,24 @@

public enum AggregatorMode {

/**
* Maps raw inputs to intermediate outputs.
*/
INITIAL(false, true),

/**
* Maps intermediate inputs to intermediate outputs.
*/
INTERMEDIATE(true, true),

/**
* Maps intermediate inputs to final outputs.
*/
FINAL(true, false),

// most useful for testing
/**
* Maps raw inputs to final outputs. Most useful for testing.
*/
SINGLE(false, false);

private final boolean inputPartial;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.dissect.DissectParser;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -168,7 +169,8 @@ static AggregateExec readAggregateExec(PlanStreamInput in) throws IOException {
in.readPhysicalPlanNode(),
in.readNamedWriteableCollectionAsList(Expression.class),
in.readNamedWriteableCollectionAsList(NamedExpression.class),
in.readEnum(AggregateExec.Mode.class),
in.readEnum(AggregatorMode.class),
in.readNamedWriteableCollectionAsList(Attribute.class),
in.readOptionalVInt()
);
}
Expand All @@ -179,6 +181,7 @@ static void writeAggregateExec(PlanStreamOutput out, AggregateExec aggregateExec
out.writeNamedWriteableCollection(aggregateExec.groupings());
out.writeNamedWriteableCollection(aggregateExec.aggregates());
out.writeEnum(aggregateExec.getMode());
out.writeNamedWriteableCollection(aggregateExec.intermediateAttributes());
out.writeOptionalVInt(aggregateExec.estimatedRowSize());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,11 @@ private Tuple<List<Attribute>, List<Stat>> pushableStats(AggregateExec aggregate
singletonList(agg),
emptyList()
);
// TODO: the attributes have been recreated here; they will have wrong name ids, and the dependency check will fail.
// We need to refactor AbstractPhysicalOperationProviders.intermediateAttributes so it doesn't return just a list
// of attributes, but a mapping from the logical to the physical attributes. And this mapping needs to be kept
// track of inside AggregateExec
// Likely required for https://github.com/elastic/elasticsearch/issues/105436
tuple.v1().addAll(intermediateAttributes);
tuple.v2().add(stat);
}
Expand Down Expand Up @@ -520,6 +525,7 @@ && allowedForDocValues(fieldAttribute, agg, foundAttributes)) {
agg.groupings(),
orderedAggregates,
agg.getMode(),
agg.intermediateAttributes(),
agg.estimatedRowSize()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static String temporaryName(Expression inner, Expression outer, int suffi
}

public static String locallyUniqueTemporaryName(String inner, String outer) {
return FieldAttribute.SYNTHETIC_ATTRIBUTE_NAME_PREFIX + inner + "$" + outer + "$" + new NameId();
return rawTemporaryName(inner, outer, (new NameId()).toString());
}

public static String rawTemporaryName(String inner, String outer, String suffix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
Expand Down Expand Up @@ -151,7 +152,7 @@ protected AttributeSet generates(PhysicalPlan physicalPlan) {
@Override
protected AttributeSet references(PhysicalPlan plan) {
if (plan instanceof AggregateExec aggregate) {
if (aggregate.getMode() == AggregateExec.Mode.FINAL) {
if (aggregate.getMode() == AggregatorMode.FINAL) {
// lousy hack - need to generate the intermediate aggs yet the intermediateAggs method keep creating new IDs on each
// call
// in practice, the final aggregate should clearly declare the expected properties not hold on the original ones
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.core.rule.Rule;
import org.elasticsearch.xpack.esql.core.rule.RuleExecutor;
Expand All @@ -25,15 +22,9 @@
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.RegexExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;

import java.util.ArrayList;
Expand Down Expand Up @@ -89,62 +80,26 @@ static class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {

@Override
public PhysicalPlan apply(PhysicalPlan plan) {
var projectAll = new Holder<>(TRUE);
var keepCollecting = new Holder<>(TRUE);
var attributes = new AttributeSet();
var aliases = new AttributeMap<Expression>();
Holder<Boolean> keepTraversing = new Holder<>(TRUE);
// Invariant: if we add a projection with these attributes after the current plan node, the plan remains valid
// and the overall output will not change.
Holder<AttributeSet> requiredAttributes = new Holder<>(plan.outputSet());
alex-spies marked this conversation as resolved.
Show resolved Hide resolved

return plan.transformDown(UnaryExec.class, p -> {
// no need for project all
if (p instanceof ProjectExec || p instanceof AggregateExec) {
projectAll.set(FALSE);
// This will require updating should we choose to have non-unary execution plans in the future.
return plan.transformDown(UnaryExec.class, currentPlanNode -> {
if (keepTraversing.get() == false) {
return currentPlanNode;
}
if (keepCollecting.get()) {
p.forEachExpression(NamedExpression.class, ne -> {
var attr = ne.toAttribute();
// filter out attributes declared as aliases before
if (ne instanceof Alias as) {
aliases.put(attr, as.child());
attributes.remove(attr);
} else {
// skip synthetically added attributes (the ones from AVG), see LogicalPlanOptimizer.SubstituteSurrogates
if (attr.synthetic() == false && aliases.containsKey(attr) == false) {
Comment on lines -110 to -111
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synthetic attributes were skipped because we had to solve an NPE; IMHO this was not a correct long term fix, as it made heavy assumptions on where synthetic attributes are used.

attributes.add(attr);
}
}
});
if (p instanceof RegexExtractExec ree) {
attributes.removeAll(ree.extractedFields());
}
if (p instanceof MvExpandExec mvee) {
attributes.remove(mvee.expanded());
}
if (p instanceof HashJoinExec join) {
attributes.removeAll(join.addedFields());
for (Attribute rhs : join.rightFields()) {
if (join.leftFields().stream().anyMatch(x -> x.semanticEquals(rhs)) == false) {
attributes.remove(rhs);
}
}
}
if (p instanceof EnrichExec ee) {
for (NamedExpression enrichField : ee.enrichFields()) {
// TODO: why is this different then the remove above?
attributes.remove(enrichField instanceof Alias a ? a.child() : enrichField);
}
}
}
if (p instanceof ExchangeExec exec) {
keepCollecting.set(FALSE);
if (currentPlanNode instanceof ExchangeExec exec) {
keepTraversing.set(FALSE);
var child = exec.child();
// otherwise expect a Fragment
if (child instanceof FragmentExec fragmentExec) {
var logicalFragment = fragmentExec.fragment();

// no need for projection when dealing with aggs
if (logicalFragment instanceof Aggregate == false) {
var selectAll = projectAll.get();
var output = selectAll ? exec.child().output() : new ArrayList<>(attributes);
List<Attribute> output = new ArrayList<>(requiredAttributes.get());
// if all the fields are filtered out, it's only the count that matters
// however until a proper fix (see https://github.com/elastic/elasticsearch/issues/98703)
// add a synthetic field (so it doesn't clash with the user defined one) to return a constant
Expand All @@ -156,19 +111,22 @@ public PhysicalPlan apply(PhysicalPlan plan) {
output = Expressions.asAttributes(fields);
}
// add a logical projection (let the local replanning remove it if needed)
p = exec.replaceChild(
new FragmentExec(
Source.EMPTY,
new Project(logicalFragment.source(), logicalFragment, output),
fragmentExec.esFilter(),
fragmentExec.estimatedRowSize(),
fragmentExec.reducer()
)
FragmentExec newChild = new FragmentExec(
Source.EMPTY,
new Project(logicalFragment.source(), logicalFragment, output),
fragmentExec.esFilter(),
fragmentExec.estimatedRowSize(),
fragmentExec.reducer()
);
return new ExchangeExec(exec.source(), output, exec.isInBetweenAggs(), newChild);
}
}
} else {
AttributeSet childOutput = currentPlanNode.inputSet();
AttributeSet addedAttributes = currentPlanNode.outputSet().subtract(childOutput);
requiredAttributes.set(requiredAttributes.get().subtract(addedAttributes).combine(currentPlanNode.requiredInputSet()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main change; the way we collected all attributes that occurred and then removed generated attributes is fully abstracted away here.

}
return p;
return currentPlanNode;
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
Expand Down Expand Up @@ -120,11 +121,24 @@ public boolean expressionsResolved() {
@Override
public List<Attribute> output() {
if (lazyOutput == null) {
lazyOutput = mergeOutputAttributes(Expressions.asAttributes(aggregates()), emptyList());
lazyOutput = outputAttributes(aggregates);
}
return lazyOutput;
}

public static List<Attribute> outputAttributes(List<? extends NamedExpression> aggregates) {
return mergeOutputAttributes(Expressions.asAttributes(aggregates), emptyList());
}

@Override
public AttributeSet requiredInputSet() {
return requiredInputAttributes(aggregates, groupings);
}

public static AttributeSet requiredInputAttributes(List<? extends NamedExpression> aggregates, List<? extends Expression> groupings) {
return Expressions.references(groupings).combine(Expressions.references(aggregates));
}

@Override
public int hashCode() {
return Objects.hash(aggregateType, groupings, aggregates, child());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
Expand All @@ -27,6 +28,11 @@ public List<NamedExpression> removals() {
return removals;
}

@Override
public AttributeSet requiredInputSet() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't drop different here? It's usage is to capture what the query wants to remove from projections and, in essence, a drop doesn't "live" too long, being transformed in a projection in the Analyzer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. Drop essentially is not a real logical plan node, it's an AST node. The same is true for Lookup and Keep.

We can have this throw UnsupportedOperationException instead; although I think returning references() is correct, too; this contains the UnresolvedAttributes that this is trying to drop, excluding any wildcards.

return references();
}

@Override
public boolean expressionsResolved() {
return Resolvables.resolved(removals);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.EmptyAttribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.NameId;
Expand Down Expand Up @@ -104,6 +105,11 @@ public Mode mode() {
return mode;
}

@Override
public AttributeSet requiredInputSet() {
return matchField.references();
}

@Override
public boolean expressionsResolved() {
return policyName.resolved()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
Expand Down Expand Up @@ -47,6 +49,16 @@ public List<Attribute> output() {
return lazyOutput;
}

@Override
public AttributeSet requiredInputSet() {
return requiredAttributesFromChild(fields);
}

public static AttributeSet requiredAttributesFromChild(List<Alias> fields) {
AttributeSet generated = new AttributeSet(asAttributes(fields));
return Expressions.references(fields).subtract(generated);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same remark - is this used elsewhere externally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, in EvalExec!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why the methods across Eval and Aggregate don't have the same name? Something like doComputeReferences() or determineReferences(), etc... required is a misnomer in this context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll just overload computeReferences (and output for Aggregate), that should be nice and consistent.


@Override
public List<Attribute> generatedAttributes() {
return asAttributes(fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
Expand All @@ -26,6 +27,11 @@ public Filter(Source source, LogicalPlan child, Expression condition) {
this.condition = condition;
}

@Override
public AttributeSet requiredInputSet() {
return references();
}

@Override
protected NodeInfo<Filter> info() {
return NodeInfo.create(this, Filter::new, child(), condition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public String getWriteableName() {
return ENTRY.name;
}

@Override
public AttributeSet requiredInputSet() {
return references();
}

@Override
protected NodeInfo<InlineStats> info() {
return NodeInfo.create(this, InlineStats::new, child(), groupings, aggregates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.util.Collections;
Expand All @@ -17,6 +18,11 @@ protected LeafPlan(Source source) {
super(source, Collections.emptyList());
}

@Override
public AttributeSet requiredInputSet() {
return AttributeSet.EMPTY;
}

@Override
public final LogicalPlan replaceChildren(List<LogicalPlan> newChildren) {
throw new UnsupportedOperationException("this type of node doesn't have any children to replace");
Expand Down
Loading