From 495de5fe475519113c7913da85c61b55b96aa009 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 5 Dec 2023 09:53:25 +0200 Subject: [PATCH 1/2] Add Analyzer level default limit for certain queries using mv_expand Fix small issues with the optimizer rule --- .../xpack/esql/analysis/Analyzer.java | 72 +++++++- .../esql/optimizer/LogicalPlanOptimizer.java | 52 ++---- .../optimizer/LogicalPlanOptimizerTests.java | 170 ++++++++++++++---- 3 files changed, 215 insertions(+), 79 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index a8462703a2b37..aaba8a87d3c5b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -48,7 +48,9 @@ import org.elasticsearch.xpack.ql.plan.logical.EsRelation; import org.elasticsearch.xpack.ql.plan.logical.Limit; import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.plan.logical.OrderBy; import org.elasticsearch.xpack.ql.plan.logical.Project; +import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.ql.rule.ParameterizedRule; import org.elasticsearch.xpack.ql.rule.ParameterizedRuleExecutor; import org.elasticsearch.xpack.ql.rule.Rule; @@ -647,7 +649,75 @@ public LogicalPlan apply(LogicalPlan logicalPlan, AnalyzerContext context) { } else { limit = context.configuration().resultTruncationMaxSize(); // user provided a limit: cap result entries to the max } - return new Limit(Source.EMPTY, new Literal(Source.EMPTY, limit, DataTypes.INTEGER), logicalPlan); + + Limit l = new Limit(Source.EMPTY, new Literal(Source.EMPTY, limit, DataTypes.INTEGER), logicalPlan); + return maybeAddDefaultLimitForMvExpand(l); + } + + /** + * This adds the implicit limit to a plan that has a sort and no limit between EsRelation and the first MvExpand. + * To date, the only known use case that "needs" this is a query of the form + * from test + * | sort emp_no + * | mv_expand first_name + * | rename first_name AS x + * | where x LIKE "*a*" + * | limit 15 + * + * or + * + * from test + * | sort emp_no + * | mv_expand first_name + * | sort first_name + * | limit 15 + * + * LogicalPlanAnalyzer.PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter + * on the expanded values OR if there is no sort between "limit" and "mv_expand". In these two situations, to be able for + * "sort emp_no" to form a TopN, we need a limit. Since the "limit 15" in the query cannot be pushed down (otherwise, it will change + * the results that reach mv_expand command) we need some kind of value there. For now, this is the implicit limit. + * The second query above becomes: + * + * from test + * | sort emp_no + * | limit 10000 + * | mv_expand first_name + * | sort first_name + * | limit 15 + */ + private Limit maybeAddDefaultLimitForMvExpand(Limit limit) { + LogicalPlan plan = limit.child(); + MvExpand mvExpand = null; + UnaryPlan esRelationParent = null; + UnaryPlan orderByParent = null; + + // basically, locate the closest to Lucene mv_expand and any potential sort + while (plan instanceof UnaryPlan unaryPlan) { + if (plan instanceof MvExpand mve) { + mvExpand = mve; + orderByParent = null; + } else if (plan instanceof OrderBy && mvExpand != null) { + orderByParent = esRelationParent; + } + plan = unaryPlan.child(); + esRelationParent = unaryPlan; + } + + // when these two are found, place the default limit before sort + if (mvExpand != null && orderByParent != null && plan instanceof EsRelation) { + var duplicateLimit = new Limit(limit.source(), limit.limit(), orderByParent.child()); + return limit.replaceChild(propagateLimitUntilEsRelation(duplicateLimit, orderByParent, (UnaryPlan) limit.child())); + } + + return limit; + } + + private LogicalPlan propagateLimitUntilEsRelation(Limit duplicateLimit, UnaryPlan esRelationParent, UnaryPlan child) { + if (child == esRelationParent) { + return esRelationParent.replaceChild(duplicateLimit); + } else { + return child.replaceChild(propagateLimitUntilEsRelation(duplicateLimit, esRelationParent, (UnaryPlan) child.child())); + } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index 66654b78c3af4..ed5b7f250fdcf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -52,7 +52,6 @@ import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SetAsOptimized; import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SimplifyComparisonsArithmetics; import org.elasticsearch.xpack.ql.plan.logical.Aggregate; -import org.elasticsearch.xpack.ql.plan.logical.EsRelation; import org.elasticsearch.xpack.ql.plan.logical.Filter; import org.elasticsearch.xpack.ql.plan.logical.Limit; import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; @@ -62,7 +61,6 @@ import org.elasticsearch.xpack.ql.rule.ParameterizedRule; import org.elasticsearch.xpack.ql.rule.ParameterizedRuleExecutor; import org.elasticsearch.xpack.ql.rule.Rule; -import org.elasticsearch.xpack.ql.tree.Source; import org.elasticsearch.xpack.ql.type.DataType; import org.elasticsearch.xpack.ql.type.DataTypes; import org.elasticsearch.xpack.ql.util.CollectionUtils; @@ -155,10 +153,9 @@ protected static List> rules() { new PushDownAndCombineLimits(), new ReplaceLimitAndSortAsTopN() ); - var defaultTopN = new Batch<>("Add default TopN", new AddDefaultTopN()); var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized()); - return asList(substitutions, operators, skip, cleanup, defaultTopN, label); + return asList(substitutions, operators, skip, cleanup, label); } // TODO: currently this rule only works for aggregate functions (AVG) @@ -493,13 +490,16 @@ protected LogicalPlan rule(Limit limit) { || child instanceof Limit; if (shouldSkip == false && child instanceof UnaryPlan unary) { - MvExpand mvExpand = descendantMvExpand(unary); + // in case unary is THE MvExpand, return it right away + MvExpand mvExpand = unary instanceof MvExpand mve ? mve : descendantMvExpand(unary); if (mvExpand != null) { Limit limitBeforeMvExpand = limitBeforeMvExpand(mvExpand); // if there is no "appropriate" limit before mv_expand, then push down a copy of the one after it so that: // - a possible TopN is properly built as low as possible in the tree (closed to Lucene) // - the input of mv_expand is as small as possible before it is expanded (less rows to inflate and occupy memory) - if (limitBeforeMvExpand == null) { + // limitBeforeMvExpand > limit is a cheap way of not indefinetely copying the same limit past mv_expand + // ">" will enforce the limit of the mv_expand, limitting the results that come to mv_expand to bare minimum + if (limitBeforeMvExpand == null || (int) limitBeforeMvExpand.limit().fold() > (int) limit.limit().fold()) { var duplicateLimit = new Limit(limit.source(), limit.limit(), mvExpand.child()); return limit.replaceChild(propagateDuplicateLimitUntilMvExpand(duplicateLimit, mvExpand, unary)); } @@ -511,7 +511,7 @@ protected LogicalPlan rule(Limit limit) { private static MvExpand descendantMvExpand(UnaryPlan unary) { UnaryPlan plan = unary; AttributeSet filterReferences = new AttributeSet(); - while (plan instanceof Aggregate == false) { + while (plan instanceof UnaryPlan) { if (plan instanceof MvExpand mve) { // don't return the mv_expand that has a filter after it which uses the expanded values // since this will trigger the use of a potentially incorrect (too restrictive) limit further down in the tree @@ -532,6 +532,8 @@ private static MvExpand descendantMvExpand(UnaryPlan unary) { // something like from test | sort emp_no | mv_expand job_positions | sort first_name | limit 5 // (the sort first_name likely changes the order of the docs after sort emp_no, so "limit 5" shouldn't be copied down return null; + } else if (plan instanceof Limit) { + return null; } if (plan.child() instanceof UnaryPlan unaryPlan) { @@ -545,7 +547,7 @@ private static MvExpand descendantMvExpand(UnaryPlan unary) { private static Limit limitBeforeMvExpand(MvExpand mvExpand) { UnaryPlan plan = mvExpand; - while (plan instanceof Aggregate == false) { + while (plan instanceof UnaryPlan) { if (plan instanceof Limit limit) { return limit; } @@ -1016,40 +1018,6 @@ protected LogicalPlan rule(Limit plan) { } } - /** - * This adds an explicit TopN node to a plan that only has an OrderBy right before Lucene. - * To date, the only known use case that "needs" this is a query of the form - * from test - * | sort emp_no - * | mv_expand first_name - * | rename first_name AS x - * | where x LIKE "*a*" - * | limit 15 - * - * or - * - * from test - * | sort emp_no - * | mv_expand first_name - * | sort first_name - * | limit 15 - * - * PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values - * OR if there is no sort between "limit" and "mv_expand". - * But, since this type of query has such a filter, the "sort emp_no" will have no limit when it reaches the current rule. - */ - static class AddDefaultTopN extends ParameterizedOptimizerRule { - - @Override - protected LogicalPlan rule(LogicalPlan plan, LogicalOptimizerContext context) { - if (plan instanceof UnaryPlan unary && unary.child() instanceof OrderBy order && order.child() instanceof EsRelation relation) { - var limit = new Literal(Source.EMPTY, context.configuration().resultTruncationMaxSize(), DataTypes.INTEGER); - return unary.replaceChild(new TopN(plan.source(), relation, order.order(), limit)); - } - return plan; - } - } - public static class ReplaceRegexMatch extends OptimizerRules.ReplaceRegexMatch { protected Expression regexToEquals(RegexMatch regexMatch, Literal literal) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 352dccc046588..30553d1f99c51 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -968,17 +968,18 @@ public void testDontPushDownLimitPastMvExpand() { /** * Expected - * EsqlProject[[emp_no{f}#141, first_name{f}#142, languages{f}#143, lll{r}#132, salary{f}#147]] - * \_TopN[[Order[salary{f}#147,DESC,FIRST], Order[first_name{f}#142,ASC,LAST]],5[INTEGER]] - * \_Limit[5[INTEGER]] - * \_MvExpand[salary{f}#147] - * \_Eval[[languages{f}#143 + 5[INTEGER] AS lll]] - * \_Filter[languages{f}#143 > 1[INTEGER]] - * \_Limit[10[INTEGER]] - * \_MvExpand[first_name{f}#142] - * \_TopN[[Order[emp_no{f}#141,DESC,FIRST]],10[INTEGER]] - * \_Filter[emp_no{f}#141 < 10006[INTEGER]] - * \_EsRelation[test][emp_no{f}#141, first_name{f}#142, languages{f}#1..] + * EsqlProject[[emp_no{f}#19, first_name{r}#29, languages{f}#22, lll{r}#8, salary{r}#30]] + * \_TopN[[Order[salary{r}#30,DESC,FIRST]],5[INTEGER]] + * \_Limit[5[INTEGER]] + * \_MvExpand[salary{f}#24,salary{r}#30,2147483647] + * \_Eval[[languages{f}#22 + 5[INTEGER] AS lll]] + * \_Limit[5[INTEGER]] + * \_Filter[languages{f}#22 > 1[INTEGER]] + * \_Limit[10[INTEGER]] + * \_MvExpand[first_name{f}#20,first_name{r}#29,2147483647] + * \_TopN[[Order[emp_no{f}#19,DESC,FIRST]],10[INTEGER]] + * \_Filter[emp_no{f}#19 ≤ 10006[INTEGER]] + * \_EsRelation[test][_meta_field{f}#25, emp_no{f}#19, first_name{f}#20, ..] */ public void testMultipleMvExpandWithSortAndLimit() { LogicalPlan plan = optimizedPlan(""" @@ -1003,7 +1004,8 @@ public void testMultipleMvExpandWithSortAndLimit() { assertThat(limit.limit().fold(), equalTo(5)); var mvExp = as(limit.child(), MvExpand.class); var eval = as(mvExp.child(), Eval.class); - var filter = as(eval.child(), Filter.class); + var limit5 = as(eval.child(), Limit.class); + var filter = as(limit5.child(), Filter.class); limit = as(filter.child(), Limit.class); assertThat(limit.limit().fold(), equalTo(10)); mvExp = as(limit.child(), MvExpand.class); @@ -1203,6 +1205,38 @@ public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedField() { as(topN.child(), EsRelation.class); } + /** + * Expected + * EsqlProject[[emp_no{f}#104, first_name{f}#105, salary{f}#106]] + * \_TopN[[Order[salary{f}#106,ASC,LAST], Order[first_name{f}#105,ASC,LAST]],15[INTEGER]] + * \_Filter[gender{f}#215 == [46][KEYWORD] AND WILDCARDLIKE(first_name{f}#105)] + * \_MvExpand[first_name{f}#105] + * \_TopN[[Order[emp_no{f}#104,ASC,LAST]],500[INTEGER]] + * \_EsRelation[employees][emp_no{f}#104, first_name{f}#105, salary{f}#106] + */ + public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedField_ResultTruncationDefaultSize() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort emp_no + | mv_expand first_name + | where gender == "F" + | where first_name LIKE "R*" + | keep emp_no, first_name, salary + | sort salary, first_name"""); + + var keep = as(plan, EsqlProject.class); + var topN = as(keep.child(), TopN.class); + assertThat(topN.limit().fold(), equalTo(500)); + assertThat(orderNames(topN), contains("salary", "first_name")); + var filter = as(topN.child(), Filter.class); + assertThat(filter.condition(), instanceOf(And.class)); + var mvExp = as(filter.child(), MvExpand.class); + topN = as(mvExp.child(), TopN.class); + assertThat(topN.limit().fold(), equalTo(500)); + assertThat(orderNames(topN), contains("emp_no")); + as(topN.child(), EsRelation.class); + } + /** * Expected * EsqlProject[[emp_no{f}#104, first_name{f}#105, salary{f}#106]] @@ -1273,6 +1307,94 @@ public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedFieldAlias() as(topN.child(), EsRelation.class); } + /** + * Expected + * Limit[2[INTEGER]] + * \_Filter[a{r}#6 > 2[INTEGER]] + * \_MvExpand[a{r}#2,a{r}#6] + * \_Row[[[1, 2, 3][INTEGER] AS a]] + */ + public void testMvExpandFoldable() { + LogicalPlan plan = optimizedPlan(""" + row a = [1, 2, 3] + | mv_expand a + | where a > 2 + | limit 2"""); + + var limit = as(plan, Limit.class); + var filter = as(limit.child(), Filter.class); + var expand = as(filter.child(), MvExpand.class); + assertThat(filter.condition(), instanceOf(GreaterThan.class)); + var filterProp = ((GreaterThan) filter.condition()).left(); + assertTrue(expand.expanded().semanticEquals(filterProp)); + assertFalse(expand.target().semanticEquals(filterProp)); + var row = as(expand.child(), Row.class); + } + + /** + * Expected: + * Limit[500[INTEGER]] + * \_MvExpand[a{r}#2,a{r}#6,2147483647] + * \_TopN[[Order[a{r}#2,ASC,LAST]],500[INTEGER]] + * \_Row[[1[INTEGER] AS a]] + */ + public void testSortMvExpand() { + LogicalPlan plan = optimizedPlan(""" + row a = 1 + | sort a + | mv_expand a"""); + + var limit = as(plan, Limit.class); + var expand = as(limit.child(), MvExpand.class); + var topN = as(expand.child(), TopN.class); + var row = as(topN.child(), Row.class); + } + + /** + * Expected: + * Limit[20[INTEGER]] + * \_MvExpand[emp_no{f}#4,emp_no{r}#14,-1] + * \_TopN[[Order[emp_no{f}#4,ASC,LAST]],20[INTEGER]] + * \_EsRelation[test][_meta_field{f}#10, emp_no{f}#4, first_name{f}#5, ge..] + */ + public void testSortMvExpandLimit() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort emp_no + | mv_expand emp_no + | limit 20"""); + + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(), is(20)); + var expand = as(limit.child(), MvExpand.class); + var topN = as(expand.child(), TopN.class); + assertThat(topN.limit().fold(), is(20)); + var row = as(topN.child(), EsRelation.class); + } + + /** + * Expected: + * Limit[500[INTEGER]] + * \_MvExpand[b{r}#4,b{r}#8,-1] + * \_Limit[500[INTEGER]] + * \_Row[[1[INTEGER] AS a, -15[INTEGER] AS b]] + * + * see https://github.com/elastic/elasticsearch/issues/102084 + */ + public void testWhereMvExpand() { + LogicalPlan plan = optimizedPlan(""" + row a = 1, b = -15 + | where b < 3 + | mv_expand b"""); + + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(), is(500)); + var expand = as(limit.child(), MvExpand.class); + var limit2 = as(expand.child(), Limit.class); + assertThat(limit2.limit().fold(), is(500)); + var row = as(limit2.child(), Row.class); + } + private static List orderNames(TopN topN) { return topN.order().stream().map(o -> as(o.child(), NamedExpression.class).name()).toList(); } @@ -2430,30 +2552,6 @@ public void testEliminateDuplicateAggsNonCount() { var source = as(agg.child(), EsRelation.class); } - /** - * Expected - * Limit[2[INTEGER]] - * \_Filter[a{r}#6 > 2[INTEGER]] - * \_MvExpand[a{r}#2,a{r}#6] - * \_Row[[[1, 2, 3][INTEGER] AS a]] - */ - public void testMvExpandFoldable() { - LogicalPlan plan = optimizedPlan(""" - row a = [1, 2, 3] - | mv_expand a - | where a > 2 - | limit 2"""); - - var limit = as(plan, Limit.class); - var filter = as(limit.child(), Filter.class); - var expand = as(filter.child(), MvExpand.class); - assertThat(filter.condition(), instanceOf(GreaterThan.class)); - var filterProp = ((GreaterThan) filter.condition()).left(); - assertTrue(expand.expanded().semanticEquals(filterProp)); - assertFalse(expand.target().semanticEquals(filterProp)); - var row = as(expand.child(), Row.class); - } - /** * Expected * Limit[500[INTEGER]] From 777486ee29da80314dff115e9dfdcdd0acc080f6 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Mon, 18 Dec 2023 19:38:41 +0200 Subject: [PATCH 2/2] Change the way mv_expand is handled --- .../xpack/esql/analysis/Analyzer.java | 74 +-- .../xpack/esql/io/stream/PlanNamedTypes.java | 2 +- .../esql/optimizer/LogicalPlanOptimizer.java | 160 ++++--- .../xpack/esql/parser/LogicalPlanBuilder.java | 2 +- .../xpack/esql/plan/logical/MvExpand.java | 22 +- .../esql/io/stream/PlanNamedTypesTests.java | 2 +- .../optimizer/LogicalPlanOptimizerTests.java | 446 ++++++++++++++---- 7 files changed, 484 insertions(+), 224 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index ee62f5ccf75ce..04ca376e77699 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -49,9 +49,7 @@ import org.elasticsearch.xpack.ql.plan.logical.EsRelation; import org.elasticsearch.xpack.ql.plan.logical.Limit; import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.ql.plan.logical.OrderBy; import org.elasticsearch.xpack.ql.plan.logical.Project; -import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.ql.rule.ParameterizedRule; import org.elasticsearch.xpack.ql.rule.ParameterizedRuleExecutor; import org.elasticsearch.xpack.ql.rule.Rule; @@ -364,7 +362,8 @@ private LogicalPlan resolveMvExpand(MvExpand p, List childrenOutput) null, false ) - : resolved + : resolved, + p.limit() ); } return p; @@ -704,74 +703,7 @@ public LogicalPlan apply(LogicalPlan logicalPlan, AnalyzerContext context) { limit = context.configuration().resultTruncationMaxSize(); // user provided a limit: cap result entries to the max } - Limit l = new Limit(Source.EMPTY, new Literal(Source.EMPTY, limit, DataTypes.INTEGER), logicalPlan); - return maybeAddDefaultLimitForMvExpand(l); - } - - /** - * This adds the implicit limit to a plan that has a sort and no limit between EsRelation and the first MvExpand. - * To date, the only known use case that "needs" this is a query of the form - * from test - * | sort emp_no - * | mv_expand first_name - * | rename first_name AS x - * | where x LIKE "*a*" - * | limit 15 - * - * or - * - * from test - * | sort emp_no - * | mv_expand first_name - * | sort first_name - * | limit 15 - * - * LogicalPlanAnalyzer.PushDownAndCombineLimits rule will copy the "limit 15" after "sort emp_no" if there is no filter - * on the expanded values OR if there is no sort between "limit" and "mv_expand". In these two situations, to be able for - * "sort emp_no" to form a TopN, we need a limit. Since the "limit 15" in the query cannot be pushed down (otherwise, it will change - * the results that reach mv_expand command) we need some kind of value there. For now, this is the implicit limit. - * The second query above becomes: - * - * from test - * | sort emp_no - * | limit 10000 - * | mv_expand first_name - * | sort first_name - * | limit 15 - */ - private Limit maybeAddDefaultLimitForMvExpand(Limit limit) { - LogicalPlan plan = limit.child(); - MvExpand mvExpand = null; - UnaryPlan esRelationParent = null; - UnaryPlan orderByParent = null; - - // basically, locate the closest to Lucene mv_expand and any potential sort - while (plan instanceof UnaryPlan unaryPlan) { - if (plan instanceof MvExpand mve) { - mvExpand = mve; - orderByParent = null; - } else if (plan instanceof OrderBy && mvExpand != null) { - orderByParent = esRelationParent; - } - plan = unaryPlan.child(); - esRelationParent = unaryPlan; - } - - // when these two are found, place the default limit before sort - if (mvExpand != null && orderByParent != null && plan instanceof EsRelation) { - var duplicateLimit = new Limit(limit.source(), limit.limit(), orderByParent.child()); - return limit.replaceChild(propagateLimitUntilEsRelation(duplicateLimit, orderByParent, (UnaryPlan) limit.child())); - } - - return limit; - } - - private LogicalPlan propagateLimitUntilEsRelation(Limit duplicateLimit, UnaryPlan esRelationParent, UnaryPlan child) { - if (child == esRelationParent) { - return esRelationParent.replaceChild(duplicateLimit); - } else { - return child.replaceChild(propagateLimitUntilEsRelation(duplicateLimit, esRelationParent, (UnaryPlan) child.child())); - } + return new Limit(Source.EMPTY, new Literal(Source.EMPTY, limit, DataTypes.INTEGER), logicalPlan); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java index a0e9c620d0fce..83bf856ff4ac1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java @@ -785,7 +785,7 @@ static void writeLimit(PlanStreamOutput out, Limit limit) throws IOException { } static MvExpand readMvExpand(PlanStreamInput in) throws IOException { - return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression(), in.readAttribute()); + return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression(), in.readAttribute(), null); } static void writeMvExpand(PlanStreamOutput out, MvExpand mvExpand) throws IOException { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index ed5b7f250fdcf..7d0bd59f9e7bc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -135,13 +135,16 @@ protected static List> rules() { new PruneColumns(), new PruneLiteralsInOrderBy(), new PushDownAndCombineLimits(), - new DuplicateLimitAfterMvExpand(), new PushDownAndCombineFilters(), new PushDownEval(), new PushDownRegexExtract(), new PushDownEnrich(), new PushDownAndCombineOrderBy(), new PruneOrderByBeforeStats(), + // see https://github.com/elastic/elasticsearch/issues/103543 + // new PushDownMvExpandPastProject(), + new DuplicateLimitAfterMvExpand(), + new PushDownMvExpandPastOrderBy(), new PruneRedundantSortClauses() ); @@ -151,7 +154,8 @@ protected static List> rules() { new ReplaceDuplicateAggWithEval(), // pushing down limits again, because ReplaceDuplicateAggWithEval could create new Project nodes that can still be optimized new PushDownAndCombineLimits(), - new ReplaceLimitAndSortAsTopN() + new ReplaceLimitAndSortAsTopN(), + new CleanUpMvExpand() ); var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized()); @@ -433,7 +437,7 @@ protected LogicalPlan rule(Limit limit) { var l2 = (int) childLimit.limit().fold(); return new Limit(limit.source(), Literal.of(limitSource, Math.min(l1, l2)), childLimit.child()); } else if (limit.child() instanceof UnaryPlan unary) { - if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) { + if (isTransparentForLimit(unary)) { return unary.replaceChild(limit.replaceChild(unary.child())); } // check if there's a 'visible' descendant limit lower than the current one @@ -478,64 +482,33 @@ private static Limit descendantLimit(UnaryPlan unary) { } } - static class DuplicateLimitAfterMvExpand extends OptimizerRules.OptimizerRule { + private static boolean isTransparentForLimit(LogicalPlan plan) { + return plan instanceof Eval || plan instanceof Project || plan instanceof RegexExtract || plan instanceof Enrich; + } + protected static class PushDownMvExpandPastOrderBy extends OptimizerRules.OptimizerRule { @Override - protected LogicalPlan rule(Limit limit) { - var child = limit.child(); - var shouldSkip = child instanceof Eval - || child instanceof Project - || child instanceof RegexExtract - || child instanceof Enrich - || child instanceof Limit; - - if (shouldSkip == false && child instanceof UnaryPlan unary) { - // in case unary is THE MvExpand, return it right away - MvExpand mvExpand = unary instanceof MvExpand mve ? mve : descendantMvExpand(unary); - if (mvExpand != null) { - Limit limitBeforeMvExpand = limitBeforeMvExpand(mvExpand); - // if there is no "appropriate" limit before mv_expand, then push down a copy of the one after it so that: - // - a possible TopN is properly built as low as possible in the tree (closed to Lucene) - // - the input of mv_expand is as small as possible before it is expanded (less rows to inflate and occupy memory) - // limitBeforeMvExpand > limit is a cheap way of not indefinetely copying the same limit past mv_expand - // ">" will enforce the limit of the mv_expand, limitting the results that come to mv_expand to bare minimum - if (limitBeforeMvExpand == null || (int) limitBeforeMvExpand.limit().fold() > (int) limit.limit().fold()) { - var duplicateLimit = new Limit(limit.source(), limit.limit(), mvExpand.child()); - return limit.replaceChild(propagateDuplicateLimitUntilMvExpand(duplicateLimit, mvExpand, unary)); - } + protected LogicalPlan rule(OrderBy upperOrderBy) { + if (upperOrderBy.child() instanceof UnaryPlan unary) { + MvExpand mvExpand = descendantMvExpand(unary); + if (mvExpand != null && mvExpand.child() instanceof OrderBy) { + return upperOrderBy.replaceChild(replaceChildUntilMvExpandAndOrderBy(unary, mvExpand)); } } - return limit; + + return upperOrderBy; } private static MvExpand descendantMvExpand(UnaryPlan unary) { UnaryPlan plan = unary; - AttributeSet filterReferences = new AttributeSet(); while (plan instanceof UnaryPlan) { - if (plan instanceof MvExpand mve) { - // don't return the mv_expand that has a filter after it which uses the expanded values - // since this will trigger the use of a potentially incorrect (too restrictive) limit further down in the tree - if (filterReferences.isEmpty() == false) { - if (filterReferences.contains(mve.target()) // the same field or reference attribute is used in mv_expand AND filter - || mve.target() instanceof ReferenceAttribute // or the mv_expand attr hasn't yet been resolved to a field attr - // or not all filter references have been resolved to field attributes - || filterReferences.stream().anyMatch(ref -> ref instanceof ReferenceAttribute)) { - return null; - } - } + // find the first MvExpand that also has an orderBy in front of it, otherwise keep looking, + // otherwise we will skip valid mv_expands further down the tree + if (plan instanceof MvExpand mve && mve.child() instanceof OrderBy) { return mve; - } else if (plan instanceof Filter filter) { - // gather all the filters' references to be checked later when a mv_expand is found - filterReferences.addAll(filter.references()); } else if (plan instanceof OrderBy) { - // ordering after mv_expand COULD break the order of the results, so the limit shouldn't be copied past mv_expand - // something like from test | sort emp_no | mv_expand job_positions | sort first_name | limit 5 - // (the sort first_name likely changes the order of the docs after sort emp_no, so "limit 5" shouldn't be copied down - return null; - } else if (plan instanceof Limit) { return null; } - if (plan.child() instanceof UnaryPlan unaryPlan) { plan = unaryPlan; } else { @@ -544,31 +517,97 @@ private static MvExpand descendantMvExpand(UnaryPlan unary) { } return null; } + } - private static Limit limitBeforeMvExpand(MvExpand mvExpand) { - UnaryPlan plan = mvExpand; - while (plan instanceof UnaryPlan) { - if (plan instanceof Limit limit) { - return limit; + protected static class PushDownMvExpandPastProject extends OptimizerRules.OptimizerRule { + @Override + protected LogicalPlan rule(MvExpand mvExpand) { + LogicalPlan child = mvExpand.child(); + if (child instanceof Project) { + return pushDownPastProject(mvExpand); + } + + return mvExpand; + } + } + + static class DuplicateLimitAfterMvExpand extends OptimizerRules.OptimizerRule { + @Override + protected LogicalPlan rule(Limit limit) { + var child = limit.child(); + // let the PushDownAndCombineLimits rule to push Limit further down + var shouldSkip = isTransparentForLimit(child) || child instanceof Limit; + if (shouldSkip || child instanceof UnaryPlan == false) { + return limit; + } + + UnaryPlan unary = (UnaryPlan) child; + MvExpand mvExpand = null; + AttributeSet filterReferences = new AttributeSet(); + boolean hasOrderBy = false; + boolean isFiltered = false; + // look for any orderBy between this limit and mv_expand + // and, separately, for any filter that is filtering the expanded values of mv_expand + while (mvExpand == null && unary instanceof UnaryPlan) { + if (unary instanceof MvExpand mve) { + if (filterReferences.isEmpty() == false) { + if (filterReferences.contains(mve.expanded())) {// if the filter is on the mv_expand field + isFiltered = true; + } + } + mvExpand = mve; + break; + } else if (unary instanceof Filter filter) { + // gather all the filters' references to be checked later when a mv_expand is found + filterReferences.addAll(filter.references()); + } else if (unary instanceof OrderBy) { + hasOrderBy = true; + break; + } else if (unary instanceof Limit) { + break; } - if (plan.child() instanceof UnaryPlan unaryPlan) { - plan = unaryPlan; + + if (unary.child() instanceof UnaryPlan unaryPlan) { + unary = unaryPlan; } else { break; } } - return null; + + if (mvExpand != null) { + // if the filter is on the expanded values and the order of the results doesn't change, push down mv_expand past sort + if (isFiltered && hasOrderBy == false) { + if (mvExpand.child() instanceof OrderBy) { + return limit.replaceChild(replaceChildUntilMvExpandAndOrderBy((UnaryPlan) child, mvExpand)); + } + } else if (mvExpand.limit() == null) { // otherwise push down a copy of the limit before mv_expand + var duplicateLimit = new Limit(limit.source(), limit.limit(), mvExpand.child()); + return limit.replaceChild(propagateDuplicateLimitUntilMvExpand(duplicateLimit, mvExpand, (UnaryPlan) child)); + } + } + + return limit; } private LogicalPlan propagateDuplicateLimitUntilMvExpand(Limit duplicateLimit, MvExpand mvExpand, UnaryPlan child) { if (child == mvExpand) { - return mvExpand.replaceChild(duplicateLimit); + return new MvExpand(mvExpand.source(), duplicateLimit, mvExpand.target(), mvExpand.expanded(), duplicateLimit); } else { return child.replaceChild(propagateDuplicateLimitUntilMvExpand(duplicateLimit, mvExpand, (UnaryPlan) child.child())); } } } + private static LogicalPlan replaceChildUntilMvExpandAndOrderBy(UnaryPlan child, MvExpand mvExpand) { + if (child == mvExpand) { + assert mvExpand.child() instanceof OrderBy; + OrderBy orderBy = (OrderBy) mvExpand.child(); + return orderBy.replaceChild(mvExpand.replaceChild(orderBy.child())); + } else { + return child.replaceChild(replaceChildUntilMvExpandAndOrderBy((UnaryPlan) child.child(), mvExpand)); + } + } + // 3 in (field, 4, 5) --> 3 in (field) or 3 in (4, 5) public static class SplitInWithFoldableValue extends OptimizerRules.OptimizerExpressionRule { @@ -1018,6 +1057,13 @@ protected LogicalPlan rule(Limit plan) { } } + static class CleanUpMvExpand extends OptimizerRules.OptimizerRule { + @Override + protected LogicalPlan rule(MvExpand plan) { + return plan.limit() == null ? plan : new MvExpand(plan.source(), plan.child(), plan.target(), plan.expanded(), null); + } + } + public static class ReplaceRegexMatch extends OptimizerRules.ReplaceRegexMatch { protected Expression regexToEquals(RegexMatch regexMatch, Literal literal) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index f9d1a252afe42..1d21ceb12335b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -151,7 +151,7 @@ public PlanFactory visitDissectCommand(EsqlBaseParser.DissectCommandContext ctx) public PlanFactory visitMvExpandCommand(EsqlBaseParser.MvExpandCommandContext ctx) { UnresolvedAttribute field = visitQualifiedName(ctx.qualifiedName()); Source src = source(ctx); - return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name())); + return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name()), null); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java index 0cdcd4af00026..126dbc858a7d2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java @@ -9,6 +9,7 @@ import org.elasticsearch.xpack.ql.expression.Attribute; import org.elasticsearch.xpack.ql.expression.NamedExpression; +import org.elasticsearch.xpack.ql.plan.logical.Limit; import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.ql.tree.NodeInfo; @@ -23,11 +24,13 @@ public class MvExpand extends UnaryPlan { private final Attribute expanded; private List output; + private Limit limit; // a temporary "marker" indicating if a limit has been "associated" with this mv_expand - public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) { + public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Limit limit) { super(source, child); this.target = target; this.expanded = expanded; + this.limit = limit; } public static List calculateOutput(List input, NamedExpression target, Attribute expanded) { @@ -50,6 +53,10 @@ public Attribute expanded() { return expanded; } + public Limit limit() { + return limit; + } + @Override public boolean expressionsResolved() { return target.resolved(); @@ -57,7 +64,7 @@ public boolean expressionsResolved() { @Override public UnaryPlan replaceChild(LogicalPlan newChild) { - return new MvExpand(source(), newChild, target, expanded); + return new MvExpand(source(), newChild, target, expanded, limit); } @Override @@ -70,19 +77,20 @@ public List output() { @Override protected NodeInfo info() { - return NodeInfo.create(this, MvExpand::new, child(), target, expanded); + return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), target, expanded); + return Objects.hash(super.hashCode(), target, expanded, limit); } @Override public boolean equals(Object obj) { - if (false == super.equals(obj)) { - return false; + if (super.equals(obj)) { + var other = (MvExpand) obj; + return Objects.equals(target, other.target) && Objects.equals(expanded, other.expanded) && Objects.equals(limit, other.limit); } - return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded); + return false; } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java index 85612427a1867..f3900a5273cc3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java @@ -488,7 +488,7 @@ public void testEsqlProject() throws IOException { public void testMvExpand() throws IOException { var esRelation = new EsRelation(Source.EMPTY, randomEsIndex(), List.of(randomFieldAttribute()), randomBoolean()); - var orig = new MvExpand(Source.EMPTY, esRelation, randomFieldAttribute(), randomFieldAttribute()); + var orig = new MvExpand(Source.EMPTY, esRelation, randomFieldAttribute(), randomFieldAttribute(), null); BytesStreamOutput bso = new BytesStreamOutput(); PlanStreamOutput out = new PlanStreamOutput(bso, planNameRegistry); PlanNamedTypes.writeMvExpand(out, orig); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 30553d1f99c51..9e78fca5ea165 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -107,7 +107,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; -//@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug") +//@TestLogging(value = "org.elasticsearch.xpack.esql.optimizer:TRACE", reason = "debug") public class LogicalPlanOptimizerTests extends ESTestCase { private static final Literal ONE = L(1); @@ -897,12 +897,11 @@ public void testCombineOrderByThroughFilter() { /** * Expected - * TopN[[Order[first_name{f}#170,ASC,LAST]],500[INTEGER]] - * \_MvExpand[first_name{f}#170] - * \_TopN[[Order[emp_no{f}#169,ASC,LAST]],500[INTEGER]] - * \_EsRelation[test][avg_worked_seconds{f}#167, birth_date{f}#168, emp_n..] + * TopN[[Order[first_name{r}#16,ASC,LAST]],500[INTEGER]] + * \_MvExpand[first_name{f}#7,first_name{r}#16] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] */ - public void testDontCombineOrderByThroughMvExpand() { + public void testRemoveUnusedSortBeforeMvExpand_DefaultLimit500() { LogicalPlan plan = optimizedPlan(""" from test | sort emp_no @@ -911,10 +910,30 @@ public void testDontCombineOrderByThroughMvExpand() { var topN = as(plan, TopN.class); assertThat(orderNames(topN), contains("first_name")); + assertThat(topN.limit().fold(), equalTo(500)); var mvExpand = as(topN.child(), MvExpand.class); - topN = as(mvExpand.child(), TopN.class); - assertThat(orderNames(topN), contains("emp_no")); - as(topN.child(), EsRelation.class); + as(mvExpand.child(), EsRelation.class); + } + + /** + * Expected + * TopN[[Order[first_name{r}#16,ASC,LAST]],10000[INTEGER]] + * \_MvExpand[first_name{f}#7,first_name{r}#16] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] + */ + public void testRemoveUnusedSortBeforeMvExpand_DefaultLimit10000() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort emp_no + | mv_expand first_name + | sort first_name + | limit 15000"""); + + var topN = as(plan, TopN.class); + assertThat(orderNames(topN), contains("first_name")); + assertThat(topN.limit().fold(), equalTo(10000)); + var mvExpand = as(topN.child(), MvExpand.class); + as(mvExpand.child(), EsRelation.class); } /** @@ -966,6 +985,85 @@ public void testDontPushDownLimitPastMvExpand() { as(limit.child(), EsRelation.class); } + /** + * Expected + * Limit[10[INTEGER]] + * \_MvExpand[first_name{f}#7,first_name{r}#16] + * \_TopN[[Order[emp_no{r}#6,DESC,FIRST]],10[INTEGER]] + * \_Filter[emp_no{f}#6 ≤ 10006[INTEGER]] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] + */ + public void testFilterWithSortBeforeMvExpand() { + LogicalPlan plan = optimizedPlan(""" + from test + | where emp_no <= 10006 + | sort emp_no desc + | mv_expand first_name + | limit 10"""); + + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(), equalTo(10)); + var mvExp = as(limit.child(), MvExpand.class); + var topN = as(mvExp.child(), TopN.class); + assertThat(topN.limit().fold(), equalTo(10)); + assertThat(orderNames(topN), contains("emp_no")); + var filter = as(topN.child(), Filter.class); + as(filter.child(), EsRelation.class); + } + + /** + * Expected + * + * TopN[[Order[first_name{f}#10,ASC,LAST]],500[INTEGER]] + * \_MvExpand[last_name{f}#13,last_name{r}#20,null] + * \_Filter[emp_no{r}#19 > 10050[INTEGER]] + * \_MvExpand[emp_no{f}#9,emp_no{r}#19,null] + * \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..] + */ + public void testMultiMvExpand_SortDownBelow() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort last_name ASC + | mv_expand emp_no + | where emp_no > 10050 + | mv_expand last_name + | sort first_name"""); + + var topN = as(plan, TopN.class); + assertThat(topN.limit().fold(), equalTo(500)); + assertThat(orderNames(topN), contains("first_name")); + var mvExpand = as(topN.child(), MvExpand.class); + var filter = as(mvExpand.child(), Filter.class); + mvExpand = as(filter.child(), MvExpand.class); + as(mvExpand.child(), EsRelation.class); + } + + /** + * Not sure what to expect from this one. + * The default LIMIT cannot be duplicated past mv_expand because there is a filter on the expanded values and we want the filter to + * act on all values. + * + * If we try to rewrite it to + * from test + * | mv_expand first_name + * | where first_name is not null + * | sort last_name + * | limit 500 + * | keep first_name + * + * sorting on last_name after expansion is not exactly the same thing as sorting on last_name before expansion. The way the expansion + * works should be the same as sorting on last_name after the expansion. + */ + @AwaitsFix(bugUrl = "") + public void testImpossibleTopNWithMvExpand() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort last_name + | keep first_name + | mv_expand first_name + | where first_name is not null"""); + } + /** * Expected * EsqlProject[[emp_no{f}#19, first_name{r}#29, languages{f}#22, lll{r}#8, salary{r}#30]] @@ -1017,13 +1115,12 @@ public void testMultipleMvExpandWithSortAndLimit() { /** * Expected - * EsqlProject[[emp_no{f}#350, first_name{f}#351, salary{f}#352]] - * \_TopN[[Order[salary{f}#352,ASC,LAST], Order[first_name{f}#351,ASC,LAST]],5[INTEGER]] - * \_MvExpand[first_name{f}#351] - * \_TopN[[Order[emp_no{f}#350,ASC,LAST]],10000[INTEGER]] - * \_EsRelation[employees][emp_no{f}#350, first_name{f}#351, salary{f}#352] + * EsqlProject[[emp_no{f}#10, first_name{r}#20, salary{f}#15]] + * \_TopN[[Order[salary{f}#15,ASC,LAST], Order[first_name{r}#20,ASC,LAST]],5[INTEGER]] + * \_MvExpand[first_name{f}#11,first_name{r}#20] + * \_EsRelation[test][_meta_field{f}#16, emp_no{f}#10, first_name{f}#11, ..] */ - public void testPushDownLimitThroughMultipleSort_AfterMvExpand() { + public void testRemoveSortBeforeMvExpand() { LogicalPlan plan = optimizedPlan(""" from test | sort emp_no @@ -1037,24 +1134,73 @@ public void testPushDownLimitThroughMultipleSort_AfterMvExpand() { assertThat(topN.limit().fold(), equalTo(5)); assertThat(orderNames(topN), contains("salary", "first_name")); var mvExp = as(topN.child(), MvExpand.class); - topN = as(mvExp.child(), TopN.class); - assertThat(topN.limit().fold(), equalTo(10000)); - assertThat(orderNames(topN), contains("emp_no")); - as(topN.child(), EsRelation.class); + as(mvExp.child(), EsRelation.class); } /** + * This test checks that the removal of the duplicated "limit" by the PropagateEmptyRelation is not added back by the mv_expand rules. + * * Expected - * EsqlProject[[emp_no{f}#361, first_name{f}#362, salary{f}#363]] - * \_TopN[[Order[first_name{f}#362,ASC,LAST]],5[INTEGER]] - * \_TopN[[Order[salary{f}#363,ASC,LAST]],5[INTEGER]] - * \_MvExpand[first_name{f}#362] - * \_TopN[[Order[emp_no{f}#361,ASC,LAST]],10000[INTEGER]] - * \_EsRelation[employees][emp_no{f}#361, first_name{f}#362, salary{f}#363] + * LocalRelation[[_meta_field{f}#10, emp_no{f}#4, first_name{r}#14, gender{f}#6, job{f}#11, job.raw{f}#12, languages{f}#7, last + * _name{f}#8, long_noidx{f}#13, salary{f}#9],EMPTY] */ - public void testPushDownLimitThroughMultipleSort_AfterMvExpand2() { + public void testInfinteLoopInLimitPushDown_WithLocalRelation() { LogicalPlan plan = optimizedPlan(""" from test + | where false + | mv_expand first_name"""); + + assertThat(plan, instanceOf(LocalRelation.class)); + } + + /** + * Expected + * + * Limit[10000[INTEGER]] + * \_MvExpand[c{r}#7,c{r}#16] + * \_EsqlProject[[c{r}#7, a{r}#3]] + * \_TopN[[Order[a{r}#3,ASC,FIRST]],7300[INTEGER]] + * \_Limit[7300[INTEGER]] + * \_MvExpand[b{r}#5,b{r}#15] + * \_Limit[7300[INTEGER]] + * \_Row[[null[NULL] AS a, 123[INTEGER] AS b, 234[INTEGER] AS c]] + */ + public void testLimitThenSortBeforeMvExpand() { + LogicalPlan plan = optimizedPlan(""" + row a = null, b = 123, c = 234 + | mv_expand b + | limit 7300 + | keep c, a + | sort a NULLS FIRST + | mv_expand c"""); + + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(), equalTo(10000)); + var mvExpand = as(limit.child(), MvExpand.class); + var project = as(mvExpand.child(), EsqlProject.class); + var topN = as(project.child(), TopN.class); + assertThat(topN.limit().fold(), equalTo(7300)); + assertThat(orderNames(topN), contains("a")); + limit = as(topN.child(), Limit.class); + mvExpand = as(limit.child(), MvExpand.class); + limit = as(mvExpand.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(7300)); + as(limit.child(), Row.class); + } + + /** + * Expected + * EsqlProject[[emp_no{f}#10, first_name{r}#20, salary{f}#15]] + * \_TopN[[Order[first_name{r}#20,ASC,LAST]],5[INTEGER]] + * \_TopN[[Order[salary{f}#15,ASC,LAST]],5[INTEGER]] + * \_MvExpand[first_name{f}#11,first_name{r}#20] + * \_Limit[123[INTEGER]] + * \_EsRelation[test][_meta_field{f}#16, emp_no{f}#10, first_name{f}#11, ..] + */ + public void testRemoveSortBeforeMvExpand_WithLimit() { + LogicalPlan plan = optimizedPlan(""" + from test + | limit 123 | sort emp_no | mv_expand first_name | keep emp_no, first_name, salary @@ -1070,10 +1216,71 @@ public void testPushDownLimitThroughMultipleSort_AfterMvExpand2() { assertThat(topN.limit().fold(), equalTo(5)); assertThat(orderNames(topN), contains("salary")); var mvExp = as(topN.child(), MvExpand.class); - topN = as(mvExp.child(), TopN.class); + var limit = as(mvExp.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(123)); + as(limit.child(), EsRelation.class); + } + + /** + * Expected + * + * EsqlProject[[_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, gender{f}#11, job{f}#16, job.raw{f}#17, languages{f}#12, la + * st_name{f}#13 AS ln, long_noidx{f}#18, salary{f}#14]] + * \_TopN[[Order[emp_no{f}#9,ASC,LAST]],10000[INTEGER]] + * \_MvExpand[gender{f}#11,gender{r}#19] + * \_Limit[20[INTEGER]] + * \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..] + */ + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103543") + public void testMvExpandWithSortsBeforeAndAfter() { + LogicalPlan plan = optimizedPlan(""" + from test + | limit 20 + | rename last_name AS ln + | sort first_name DESC NULLS FIRST + | mv_expand gender + | sort emp_no ASC"""); + + var project = as(plan, EsqlProject.class); + var topN = as(project.child(), TopN.class); assertThat(topN.limit().fold(), equalTo(10000)); assertThat(orderNames(topN), contains("emp_no")); - as(topN.child(), EsRelation.class); + var mvExp = as(topN.child(), MvExpand.class); + var limit = as(mvExp.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(20)); + as(limit.child(), EsRelation.class); + } + + /** + * Expected + * EsqlProject[[emp_no{f}#10, first_name{r}#20, salary{f}#15]] + * \_TopN[[Order[first_name{r}#20,ASC,LAST]],5[INTEGER]] + * \_TopN[[Order[salary{f}#15,ASC,LAST]],5[INTEGER]] + * \_MvExpand[first_name{f}#11,first_name{r}#20] + * \_Limit[123[INTEGER]] + * \_EsRelation[test][_meta_field{f}#16, emp_no{f}#10, first_name{f}#11, ..] + */ + public void testRemoveSortBeforeMvExpand_WithEval() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort emp_no + | eval foo = emp_no + 100 + | mv_expand first_name + | keep emp_no, first_name, salary, foo + | sort salary + | limit 5 + | sort first_name"""); + + var keep = as(plan, EsqlProject.class); + var topN = as(keep.child(), TopN.class); + assertThat(topN.limit().fold(), equalTo(5)); + assertThat(orderNames(topN), contains("first_name")); + topN = as(topN.child(), TopN.class); + assertThat(topN.limit().fold(), equalTo(5)); + assertThat(orderNames(topN), contains("salary")); + var mvExp = as(topN.child(), MvExpand.class); + var eval = as(mvExp.child(), Eval.class); + as(eval.child(), EsRelation.class); } /** @@ -1107,6 +1314,52 @@ public void testDontPushDownLimitPastAggregate_AndMvExpand() { as(topN.child(), EsRelation.class); } + /** + * Expected + * TopN[[Order[emp_no{f}#6,ASC,LAST]],5[INTEGER]] + * \_Filter[ISNOTNULL(first_name{r}#16)] + * \_MvExpand[first_name{f}#7,first_name{r}#16] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] + */ + public void testDontPushDownLimit_ButPushMvExpandPastSort() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort emp_no + | mv_expand first_name + | where first_name is not null + | limit 5"""); + + var topN = as(plan, TopN.class); + assertThat(topN.limit().fold(), equalTo(5)); + assertThat(orderNames(topN), contains("emp_no")); + var filter = as(topN.child(), Filter.class); + var mvExp = as(filter.child(), MvExpand.class); + as(mvExp.child(), EsRelation.class); + } + + /** + * Expected + * TopN[[Order[emp_no{f}#6,ASC,LAST]],5[INTEGER]] + * \_Filter[ISNOTNULL(first_name{r}#16)] + * \_MvExpand[first_name{f}#7,first_name{r}#16] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] + */ + public void testDontPushDownLimit_ButPushMvExpandPastSort2() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort emp_no + | mv_expand first_name + | where first_name is not null + | limit 5"""); + + var topN = as(plan, TopN.class); + assertThat(topN.limit().fold(), equalTo(5)); + assertThat(orderNames(topN), contains("emp_no")); + var filter = as(topN.child(), Filter.class); + var mvExp = as(filter.child(), MvExpand.class); + as(mvExp.child(), EsRelation.class); + } + /** * Expected * Limit[5[INTEGER]] @@ -1141,14 +1394,13 @@ public void testPushDown_TheRightLimit_PastMvExpand() { /** * Expected - * EsqlProject[[first_name{f}#11, emp_no{f}#10, salary{f}#12, b{r}#4]] - * \_TopN[[Order[salary{f}#12,ASC,LAST]],5[INTEGER]] - * \_Eval[[100[INTEGER] AS b]] - * \_MvExpand[first_name{f}#11] - * \_TopN[[Order[first_name{f}#11,ASC,LAST]],10000[INTEGER]] - * \_EsRelation[employees][emp_no{f}#10, first_name{f}#11, salary{f}#12] + * EsqlProject[[first_name{r}#22, emp_no{f}#12, salary{f}#17, b{r}#6]] + * \_TopN[[Order[salary{f}#17,ASC,LAST]],5[INTEGER]] + * \_Eval[[100[INTEGER] AS b]] + * \_MvExpand[first_name{f}#13,first_name{r}#22] + * \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..] */ - public void testPushDownLimit_PastEvalAndMvExpand() { + public void testPushDownMvExpand_PastSameFieldSorting() { LogicalPlan plan = optimizedPlan(""" from test | sort first_name @@ -1164,22 +1416,64 @@ public void testPushDownLimit_PastEvalAndMvExpand() { assertThat(orderNames(topN), contains("salary")); var eval = as(topN.child(), Eval.class); var mvExp = as(eval.child(), MvExpand.class); - topN = as(mvExp.child(), TopN.class); - assertThat(topN.limit().fold(), equalTo(10000)); + as(mvExp.child(), EsRelation.class); + } + + /** + * Expected + * TopN[[Order[salary{f}#11,ASC,LAST]],5[INTEGER]] + * \_MvExpand[first_name{f}#7,first_name{r}#16] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] + */ + public void testPushDownMvExpand_PastSameFieldSorting2() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort first_name + | mv_expand first_name + | sort salary + | limit 5"""); + + var topN = as(plan, TopN.class); + assertThat(topN.limit().fold(), equalTo(5)); + assertThat(orderNames(topN), contains("salary")); + var mvExp = as(topN.child(), MvExpand.class); + as(mvExp.child(), EsRelation.class); + } + + /** + * Expected + * EsqlProject[[first_name{r}#16]] + * \_Limit[500[INTEGER]] + * \_MvExpand[first_name{f}#7,first_name{r}#16] + * \_TopN[[Order[first_name{f}#7,ASC,LAST]],500[INTEGER]] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] + */ + public void testDontPushDownMvExpand_DefaultLimit() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort first_name + | mv_expand first_name + | keep first_name"""); + + var keep = as(plan, EsqlProject.class); + var limit = as(keep.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(500)); + var mvExp = as(limit.child(), MvExpand.class); + var topN = as(mvExp.child(), TopN.class); + assertThat(topN.limit().fold(), equalTo(500)); assertThat(orderNames(topN), contains("first_name")); as(topN.child(), EsRelation.class); } /** * Expected - * EsqlProject[[emp_no{f}#104, first_name{f}#105, salary{f}#106]] - * \_TopN[[Order[salary{f}#106,ASC,LAST], Order[first_name{f}#105,ASC,LAST]],15[INTEGER]] - * \_Filter[gender{f}#215 == [46][KEYWORD] AND WILDCARDLIKE(first_name{f}#105)] - * \_MvExpand[first_name{f}#105] - * \_TopN[[Order[emp_no{f}#104,ASC,LAST]],10000[INTEGER]] - * \_EsRelation[employees][emp_no{f}#104, first_name{f}#105, salary{f}#106] + * EsqlProject[[emp_no{f}#12, first_name{r}#22, salary{f}#17]] + * \_TopN[[Order[salary{f}#17,ASC,LAST], Order[first_name{r}#22,ASC,LAST]],15[INTEGER]] + * \_Filter[gender{f}#14 == [46][KEYWORD] AND WILDCARDLIKE(first_name{r}#22)] + * \_MvExpand[first_name{f}#13,first_name{r}#22] + * \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..] */ - public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedField() { + public void testPushDownMvExpand() { LogicalPlan plan = optimizedPlan(""" from test | sort emp_no @@ -1197,24 +1491,18 @@ public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedField() { var filter = as(topN.child(), Filter.class); assertThat(filter.condition(), instanceOf(And.class)); var mvExp = as(filter.child(), MvExpand.class); - topN = as(mvExp.child(), TopN.class); - // the filter acts on first_name (the one used in mv_expand), so the limit 15 is not pushed down past mv_expand - // instead the default limit is added - assertThat(topN.limit().fold(), equalTo(10000)); - assertThat(orderNames(topN), contains("emp_no")); - as(topN.child(), EsRelation.class); + as(mvExp.child(), EsRelation.class); } /** * Expected - * EsqlProject[[emp_no{f}#104, first_name{f}#105, salary{f}#106]] - * \_TopN[[Order[salary{f}#106,ASC,LAST], Order[first_name{f}#105,ASC,LAST]],15[INTEGER]] - * \_Filter[gender{f}#215 == [46][KEYWORD] AND WILDCARDLIKE(first_name{f}#105)] - * \_MvExpand[first_name{f}#105] - * \_TopN[[Order[emp_no{f}#104,ASC,LAST]],500[INTEGER]] - * \_EsRelation[employees][emp_no{f}#104, first_name{f}#105, salary{f}#106] + * EsqlProject[[emp_no{f}#12, first_name{r}#22, salary{f}#17]] + * \_TopN[[Order[salary{f}#17,ASC,LAST], Order[first_name{r}#22,ASC,LAST]],500[INTEGER]] + * \_Filter[gender{f}#14 == [46][KEYWORD] AND WILDCARDLIKE(first_name{r}#22)] + * \_MvExpand[first_name{f}#13,first_name{r}#22] + * \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..] */ - public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedField_ResultTruncationDefaultSize() { + public void testPushDownMvExpand2() { LogicalPlan plan = optimizedPlan(""" from test | sort emp_no @@ -1231,22 +1519,18 @@ public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedField_ResultT var filter = as(topN.child(), Filter.class); assertThat(filter.condition(), instanceOf(And.class)); var mvExp = as(filter.child(), MvExpand.class); - topN = as(mvExp.child(), TopN.class); - assertThat(topN.limit().fold(), equalTo(500)); - assertThat(orderNames(topN), contains("emp_no")); - as(topN.child(), EsRelation.class); + as(mvExp.child(), EsRelation.class); } /** * Expected - * EsqlProject[[emp_no{f}#104, first_name{f}#105, salary{f}#106]] - * \_TopN[[Order[salary{f}#106,ASC,LAST], Order[first_name{f}#105,ASC,LAST]],15[INTEGER]] - * \_Filter[gender{f}#215 == [46][KEYWORD] AND salary{f}#106 > 60000[INTEGER]] - * \_MvExpand[first_name{f}#105] - * \_TopN[[Order[emp_no{f}#104,ASC,LAST]],10000[INTEGER]] - * \_EsRelation[employees][emp_no{f}#104, first_name{f}#105, salary{f}#106] + * EsqlProject[[emp_no{f}#12, first_name{r}#22, salary{f}#17]] + * \_TopN[[Order[salary{f}#17,ASC,LAST], Order[first_name{r}#22,ASC,LAST]],15[INTEGER]] + * \_Filter[gender{f}#14 == [46][KEYWORD] AND salary{f}#17 > 60000[INTEGER]] + * \_MvExpand[first_name{f}#13,first_name{r}#22] + * \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..] */ - public void testAddDefaultLimit_BeforeMvExpand_WithFilter_NOT_OnExpandedField() { + public void testPushDownMvExpand3() { LogicalPlan plan = optimizedPlan(""" from test | sort emp_no @@ -1264,24 +1548,18 @@ public void testAddDefaultLimit_BeforeMvExpand_WithFilter_NOT_OnExpandedField() var filter = as(topN.child(), Filter.class); assertThat(filter.condition(), instanceOf(And.class)); var mvExp = as(filter.child(), MvExpand.class); - topN = as(mvExp.child(), TopN.class); - // the filters after mv_expand do not act on the expanded field values, as such the limit 15 is the one being pushed down - // otherwise that limit wouldn't have pushed down and the default limit was instead being added by default before mv_expanded - assertThat(topN.limit().fold(), equalTo(10000)); - assertThat(orderNames(topN), contains("emp_no")); - as(topN.child(), EsRelation.class); + as(mvExp.child(), EsRelation.class); } /** * Expected - * EsqlProject[[emp_no{f}#116, first_name{f}#117 AS x, salary{f}#119]] - * \_TopN[[Order[salary{f}#119,ASC,LAST], Order[first_name{f}#117,ASC,LAST]],15[INTEGER]] - * \_Filter[gender{f}#118 == [46][KEYWORD] AND WILDCARDLIKE(first_name{f}#117)] - * \_MvExpand[first_name{f}#117] - * \_TopN[[Order[gender{f}#118,ASC,LAST]],10000[INTEGER]] - * \_EsRelation[employees][emp_no{f}#116, first_name{f}#117, gender{f}#118, sa..] + * EsqlProject[[emp_no{f}#14, first_name{r}#24 AS x, salary{f}#19]] + * \_TopN[[Order[salary{f}#19,ASC,LAST], Order[first_name{r}#24,ASC,LAST]],15[INTEGER]] + * \_Filter[gender{f}#16 == [46][KEYWORD] AND WILDCARDLIKE(first_name{r}#24)] + * \_MvExpand[first_name{f}#15,first_name{r}#24] + * \_EsRelation[test][_meta_field{f}#20, emp_no{f}#14, first_name{f}#15, ..] */ - public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedFieldAlias() { + public void testPushDownMvExpand4() { LogicalPlan plan = optimizedPlan(""" from test | sort gender @@ -1300,11 +1578,7 @@ public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedFieldAlias() var filter = as(topN.child(), Filter.class); assertThat(filter.condition(), instanceOf(And.class)); var mvExp = as(filter.child(), MvExpand.class); - topN = as(mvExp.child(), TopN.class); - // the filter uses an alias ("x") to the expanded field ("first_name"), so the default limit is used and not the one provided - assertThat(topN.limit().fold(), equalTo(10000)); - assertThat(orderNames(topN), contains("gender")); - as(topN.child(), EsRelation.class); + as(mvExp.child(), EsRelation.class); } /**