Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
null,
false
)
: resolved
: resolved,
p.limit()
);
}
return p;
Expand Down Expand Up @@ -701,6 +702,7 @@ public LogicalPlan apply(LogicalPlan logicalPlan, AnalyzerContext context) {
} else {
limit = context.configuration().resultTruncationMaxSize(); // user provided a limit: cap result entries to the max
}

return new Limit(Source.EMPTY, new Literal(Source.EMPTY, limit, DataTypes.INTEGER), logicalPlan);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ static void writeLimit(PlanStreamOutput out, Limit limit) throws IOException {
}

static MvExpand readMvExpand(PlanStreamInput in) throws IOException {
return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression(), in.readAttribute());
return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression(), in.readAttribute(), null);
}

static void writeMvExpand(PlanStreamOutput out, MvExpand mvExpand) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SetAsOptimized;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SimplifyComparisonsArithmetics;
import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
Expand All @@ -62,7 +61,6 @@
import org.elasticsearch.xpack.ql.rule.ParameterizedRule;
import org.elasticsearch.xpack.ql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.ql.rule.Rule;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataType;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.CollectionUtils;
Expand Down Expand Up @@ -137,13 +135,16 @@ protected static List<Batch<LogicalPlan>> rules() {
new PruneColumns(),
new PruneLiteralsInOrderBy(),
new PushDownAndCombineLimits(),
new DuplicateLimitAfterMvExpand(),
new PushDownAndCombineFilters(),
new PushDownEval(),
new PushDownRegexExtract(),
new PushDownEnrich(),
new PushDownAndCombineOrderBy(),
new PruneOrderByBeforeStats(),
// see https://github.com/elastic/elasticsearch/issues/103543
// new PushDownMvExpandPastProject(),
new DuplicateLimitAfterMvExpand(),
new PushDownMvExpandPastOrderBy(),
new PruneRedundantSortClauses()
);

Expand All @@ -153,12 +154,12 @@ protected static List<Batch<LogicalPlan>> rules() {
new ReplaceDuplicateAggWithEval(),
// pushing down limits again, because ReplaceDuplicateAggWithEval could create new Project nodes that can still be optimized
new PushDownAndCombineLimits(),
new ReplaceLimitAndSortAsTopN()
new ReplaceLimitAndSortAsTopN(),
new CleanUpMvExpand()
);
var defaultTopN = new Batch<>("Add default TopN", new AddDefaultTopN());
var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized());

return asList(substitutions, operators, skip, cleanup, defaultTopN, label);
return asList(substitutions, operators, skip, cleanup, label);
}

// TODO: currently this rule only works for aggregate functions (AVG)
Expand Down Expand Up @@ -436,7 +437,7 @@ protected LogicalPlan rule(Limit limit) {
var l2 = (int) childLimit.limit().fold();
return new Limit(limit.source(), Literal.of(limitSource, Math.min(l1, l2)), childLimit.child());
} else if (limit.child() instanceof UnaryPlan unary) {
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
if (isTransparentForLimit(unary)) {
return unary.replaceChild(limit.replaceChild(unary.child()));
}
// check if there's a 'visible' descendant limit lower than the current one
Expand Down Expand Up @@ -481,59 +482,33 @@ private static Limit descendantLimit(UnaryPlan unary) {
}
}

static class DuplicateLimitAfterMvExpand extends OptimizerRules.OptimizerRule<Limit> {
private static boolean isTransparentForLimit(LogicalPlan plan) {
return plan instanceof Eval || plan instanceof Project || plan instanceof RegexExtract || plan instanceof Enrich;
}

protected static class PushDownMvExpandPastOrderBy extends OptimizerRules.OptimizerRule<OrderBy> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: a small example as javadoc would immensely help understand how this rule works.

Question: how is pushing MvExpand past an OrderBy beneficial?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: a small example as javadoc would immensely help understand how this rule works.

My advice: take a look at the tests in LogicalPlanOptimizerTests and their javadoc, where the optimized plan is visible. Alternatively, trace logging on org.elasticsearch.xpack.esql.optimizer to see what each rule is doing when.

Question: how is pushing MvExpand past an OrderBy beneficial?

In the description of the PR I have identified two cases for this type of pushdown. If the semantics of the query remains the same (and they are in those two situations I mentioned), it is beneficial to form a TopN by having sort (OrderBy) closer to a limit. Since every query now has a limit, the aim is to have the sort closer to it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

I still think it should be obvious what a rule does from either its name or a javadoc; the optimizer tests are not sufficient as they contain complete optimization runs, so there's no fast way to see what a rule does (in isolation). This rule is also sufficiently complex that skimming the code does not give an immediate idea of what to expect in terms of its behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, sorry - this is actually one of the simpler rules - I thought I left this comment further down.

Still - pushing down mv_expand past a sort is not an obvious optimization (obvious would be e.g. pushing down filters to reduce number of rows). A short comment explaining the reasoning for why it exists in the first place would be important IMHO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to reconstruct the behavior of this rule and the reasoning behind it, and I have to say that it took me some time and I'm still unsure I got all the nuances.

This is a problem by itself: the rule tries to analyze a potentially large portion of the plan and then makes changes in positions that are very far from the entry point of the rule; it's hard to understand now, so it will be even harder to maintain in the long term, when the original intentions will be harder to recollect and when the language (and the planner) will be richer and more complex.

This said, I tried to understand if the rule is correct in all cases, and my conclusion is that it's not.
My understanding is that the intention is the following: if you have a plan with sort | mv_expand | ... | sort, you replace it with mv_expand | sort | ... | sort, in the assumption that eventually the first sort will be removed (because redundant?).
This is correct as long as there are no commands in between that drop possible candidate results, or for which the order actually matters.

The case that we are ignoring now is LIMIT: a query like sort | mv_expand | ... | limit | ... | sort is different from mv_expand | sort | ... | limit | ... | sort, especially when the first sort happens on the expanded key. Both descendantMvExpand() and replaceChildUntilMvExpandAndOrderBy() are not taking this case into consideration.

Cases we could miss tomorrow are all the commands that rely on the intermediate order, that include inline stats, commands that access previous records and other cases we can hardly completely define now.

In short, this rule would be much better defined as a local rule that converts sort | mv_expand -> mv_expand | sort, but unfortunately it's not generally valid, and defining all the cases when it's valid is risky and error prone. Also, the general usefulness of this change is not completely clear (to me at least).
We can try to fix the above, but we are paying a high cost to maintainability for arguably small advantages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++, agree that it's hard to verify this rule for all cases, esp. as we might add new operators that might invalidate it in the future, silently.

If we want to keep the sort | mv_expand | ... | sort -> mv_expand | sort | ... | sort, we probably should restrict the operators that can go in the ... part explicitly, ideally adding at least a unit test per such operator to verify that output doesn't change after optimization. That is a lot of work, though :/

@Override
protected LogicalPlan rule(Limit limit) {
var child = limit.child();
var shouldSkip = child instanceof Eval
|| child instanceof Project
|| child instanceof RegexExtract
|| child instanceof Enrich
|| child instanceof Limit;

if (shouldSkip == false && child instanceof UnaryPlan unary) {
protected LogicalPlan rule(OrderBy upperOrderBy) {
if (upperOrderBy.child() instanceof UnaryPlan unary) {
MvExpand mvExpand = descendantMvExpand(unary);
if (mvExpand != null) {
Limit limitBeforeMvExpand = limitBeforeMvExpand(mvExpand);
// if there is no "appropriate" limit before mv_expand, then push down a copy of the one after it so that:
// - a possible TopN is properly built as low as possible in the tree (closed to Lucene)
// - the input of mv_expand is as small as possible before it is expanded (less rows to inflate and occupy memory)
if (limitBeforeMvExpand == null) {
var duplicateLimit = new Limit(limit.source(), limit.limit(), mvExpand.child());
return limit.replaceChild(propagateDuplicateLimitUntilMvExpand(duplicateLimit, mvExpand, unary));
}
if (mvExpand != null && mvExpand.child() instanceof OrderBy) {
return upperOrderBy.replaceChild(replaceChildUntilMvExpandAndOrderBy(unary, mvExpand));
}
}
return limit;

return upperOrderBy;
}

private static MvExpand descendantMvExpand(UnaryPlan unary) {
UnaryPlan plan = unary;
AttributeSet filterReferences = new AttributeSet();
while (plan instanceof Aggregate == false) {
if (plan instanceof MvExpand mve) {
// don't return the mv_expand that has a filter after it which uses the expanded values
// since this will trigger the use of a potentially incorrect (too restrictive) limit further down in the tree
if (filterReferences.isEmpty() == false) {
if (filterReferences.contains(mve.target()) // the same field or reference attribute is used in mv_expand AND filter
|| mve.target() instanceof ReferenceAttribute // or the mv_expand attr hasn't yet been resolved to a field attr
// or not all filter references have been resolved to field attributes
|| filterReferences.stream().anyMatch(ref -> ref instanceof ReferenceAttribute)) {
return null;
}
}
while (plan instanceof UnaryPlan) {
// find the first MvExpand that also has an orderBy in front of it, otherwise keep looking,
// otherwise we will skip valid mv_expands further down the tree
if (plan instanceof MvExpand mve && mve.child() instanceof OrderBy) {
return mve;
} else if (plan instanceof Filter filter) {
// gather all the filters' references to be checked later when a mv_expand is found
filterReferences.addAll(filter.references());
} else if (plan instanceof OrderBy) {
// ordering after mv_expand COULD break the order of the results, so the limit shouldn't be copied past mv_expand
// something like from test | sort emp_no | mv_expand job_positions | sort first_name | limit 5
// (the sort first_name likely changes the order of the docs after sort emp_no, so "limit 5" shouldn't be copied down
return null;
}

if (plan.child() instanceof UnaryPlan unaryPlan) {
plan = unaryPlan;
} else {
Expand All @@ -542,31 +517,97 @@ private static MvExpand descendantMvExpand(UnaryPlan unary) {
}
return null;
}
}

private static Limit limitBeforeMvExpand(MvExpand mvExpand) {
UnaryPlan plan = mvExpand;
while (plan instanceof Aggregate == false) {
if (plan instanceof Limit limit) {
return limit;
protected static class PushDownMvExpandPastProject extends OptimizerRules.OptimizerRule<MvExpand> {
@Override
protected LogicalPlan rule(MvExpand mvExpand) {
LogicalPlan child = mvExpand.child();
if (child instanceof Project) {
return pushDownPastProject(mvExpand);
}

return mvExpand;
}
}

static class DuplicateLimitAfterMvExpand extends OptimizerRules.OptimizerRule<Limit> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should contain an example + reasoning for when this is possible as javadoc IMO.

@Override
protected LogicalPlan rule(Limit limit) {
var child = limit.child();
// let the PushDownAndCombineLimits rule to push Limit further down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing a verb?

var shouldSkip = isTransparentForLimit(child) || child instanceof Limit;
if (shouldSkip || child instanceof UnaryPlan == false) {
return limit;
}

UnaryPlan unary = (UnaryPlan) child;
MvExpand mvExpand = null;
AttributeSet filterReferences = new AttributeSet();
boolean hasOrderBy = false;
boolean isFiltered = false;
// look for any orderBy between this limit and mv_expand
// and, separately, for any filter that is filtering the expanded values of mv_expand
while (mvExpand == null && unary instanceof UnaryPlan) {
if (unary instanceof MvExpand mve) {
if (filterReferences.isEmpty() == false) {
if (filterReferences.contains(mve.expanded())) {// if the filter is on the mv_expand field
isFiltered = true;
}
}
mvExpand = mve;
break;
} else if (unary instanceof Filter filter) {
// gather all the filters' references to be checked later when a mv_expand is found
filterReferences.addAll(filter.references());
} else if (unary instanceof OrderBy) {
hasOrderBy = true;
break;
} else if (unary instanceof Limit) {
break;
}
if (plan.child() instanceof UnaryPlan unaryPlan) {
plan = unaryPlan;

if (unary.child() instanceof UnaryPlan unaryPlan) {
unary = unaryPlan;
} else {
break;
}
}
return null;

if (mvExpand != null) {
// if the filter is on the expanded values and the order of the results doesn't change, push down mv_expand past sort
if (isFiltered && hasOrderBy == false) {
Comment on lines +577 to +579
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think mvExpand == null and hasOrderBy are mutually exclusive. If so, maybe this can be written a bit easier.

if (mvExpand.child() instanceof OrderBy) {
return limit.replaceChild(replaceChildUntilMvExpandAndOrderBy((UnaryPlan) child, mvExpand));
Comment on lines +580 to +581
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're pushing down MvExpand rather than introducing an additional limit, shouldn't this case be handled in a separate rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I am not sure. The line you referenced here is, basically, an exception to the general rule of duplicating the limit. And this exception here doesn't fulfill the same conditions as PushDownMvExpandPastOrderBy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can leave it but I think it should be documented; this is definitely unexpected.

}
} else if (mvExpand.limit() == null) { // otherwise push down a copy of the limit before mv_expand
var duplicateLimit = new Limit(limit.source(), limit.limit(), mvExpand.child());
return limit.replaceChild(propagateDuplicateLimitUntilMvExpand(duplicateLimit, mvExpand, (UnaryPlan) child));
}
}

return limit;
}

private LogicalPlan propagateDuplicateLimitUntilMvExpand(Limit duplicateLimit, MvExpand mvExpand, UnaryPlan child) {
if (child == mvExpand) {
return mvExpand.replaceChild(duplicateLimit);
return new MvExpand(mvExpand.source(), duplicateLimit, mvExpand.target(), mvExpand.expanded(), duplicateLimit);
} else {
return child.replaceChild(propagateDuplicateLimitUntilMvExpand(duplicateLimit, mvExpand, (UnaryPlan) child.child()));
}
}
}

private static LogicalPlan replaceChildUntilMvExpandAndOrderBy(UnaryPlan child, MvExpand mvExpand) {
if (child == mvExpand) {
assert mvExpand.child() instanceof OrderBy;
OrderBy orderBy = (OrderBy) mvExpand.child();
return orderBy.replaceChild(mvExpand.replaceChild(orderBy.child()));
} else {
return child.replaceChild(replaceChildUntilMvExpandAndOrderBy((UnaryPlan) child.child(), mvExpand));
}
}

// 3 in (field, 4, 5) --> 3 in (field) or 3 in (4, 5)
public static class SplitInWithFoldableValue extends OptimizerRules.OptimizerExpressionRule<In> {

Expand Down Expand Up @@ -1016,37 +1057,10 @@ protected LogicalPlan rule(Limit plan) {
}
}

/**
* This adds an explicit TopN node to a plan that only has an OrderBy right before Lucene.
* To date, the only known use case that "needs" this is a query of the form
* from test
* | sort emp_no
* | mv_expand first_name
* | rename first_name AS x
* | where x LIKE "*a*"
* | limit 15
*
* or
*
* from test
* | sort emp_no
* | mv_expand first_name
* | sort first_name
* | limit 15
*
* PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
* OR if there is no sort between "limit" and "mv_expand".
* But, since this type of query has such a filter, the "sort emp_no" will have no limit when it reaches the current rule.
*/
static class AddDefaultTopN extends ParameterizedOptimizerRule<LogicalPlan, LogicalOptimizerContext> {

static class CleanUpMvExpand extends OptimizerRules.OptimizerRule<MvExpand> {
@Override
protected LogicalPlan rule(LogicalPlan plan, LogicalOptimizerContext context) {
if (plan instanceof UnaryPlan unary && unary.child() instanceof OrderBy order && order.child() instanceof EsRelation relation) {
var limit = new Literal(Source.EMPTY, context.configuration().resultTruncationMaxSize(), DataTypes.INTEGER);
return unary.replaceChild(new TopN(plan.source(), relation, order.order(), limit));
}
return plan;
protected LogicalPlan rule(MvExpand plan) {
return plan.limit() == null ? plan : new MvExpand(plan.source(), plan.child(), plan.target(), plan.expanded(), null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public PlanFactory visitDissectCommand(EsqlBaseParser.DissectCommandContext ctx)
public PlanFactory visitMvExpandCommand(EsqlBaseParser.MvExpandCommandContext ctx) {
UnresolvedAttribute field = visitQualifiedName(ctx.qualifiedName());
Source src = source(ctx);
return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name()));
return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name()), null);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.NamedExpression;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
Expand All @@ -23,11 +24,13 @@ public class MvExpand extends UnaryPlan {
private final Attribute expanded;

private List<Attribute> output;
private Limit limit; // a temporary "marker" indicating if a limit has been "associated" with this mv_expand

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) {
public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Limit limit) {
super(source, child);
this.target = target;
this.expanded = expanded;
this.limit = limit;
}

public static List<Attribute> calculateOutput(List<Attribute> input, NamedExpression target, Attribute expanded) {
Expand All @@ -50,14 +53,18 @@ public Attribute expanded() {
return expanded;
}

public Limit limit() {
return limit;
}

@Override
public boolean expressionsResolved() {
return target.resolved();
}

@Override
public UnaryPlan replaceChild(LogicalPlan newChild) {
return new MvExpand(source(), newChild, target, expanded);
return new MvExpand(source(), newChild, target, expanded, limit);
}

@Override
Expand All @@ -70,19 +77,20 @@ public List<Attribute> output() {

@Override
protected NodeInfo<? extends LogicalPlan> info() {
return NodeInfo.create(this, MvExpand::new, child(), target, expanded);
return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), target, expanded);
return Objects.hash(super.hashCode(), target, expanded, limit);
}

@Override
public boolean equals(Object obj) {
if (false == super.equals(obj)) {
return false;
if (super.equals(obj)) {
var other = (MvExpand) obj;
return Objects.equals(target, other.target) && Objects.equals(expanded, other.expanded) && Objects.equals(limit, other.limit);
}
return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded);
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ public void testEsqlProject() throws IOException {

public void testMvExpand() throws IOException {
var esRelation = new EsRelation(Source.EMPTY, randomEsIndex(), List.of(randomFieldAttribute()), randomBoolean());
var orig = new MvExpand(Source.EMPTY, esRelation, randomFieldAttribute(), randomFieldAttribute());
var orig = new MvExpand(Source.EMPTY, esRelation, randomFieldAttribute(), randomFieldAttribute(), null);
BytesStreamOutput bso = new BytesStreamOutput();
PlanStreamOutput out = new PlanStreamOutput(bso, planNameRegistry);
PlanNamedTypes.writeMvExpand(out, orig);
Expand Down
Loading