diff --git a/docs/changelog/102545.yaml b/docs/changelog/102545.yaml new file mode 100644 index 0000000000000..bd2743397d1e4 --- /dev/null +++ b/docs/changelog/102545.yaml @@ -0,0 +1,7 @@ +pr: 102545 +summary: Fix LIMIT pushdown with MV_EXPAND +area: ES|QL +type: bug +issues: + - 102084 + - 102061 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec index 3a1ae3985e129..f0bf08b0e3174 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec @@ -308,6 +308,61 @@ a:long | b:long | c:long | gender:keyword | str:keyword | x:key ; +// see https://github.com/elastic/elasticsearch/issues/102061 +sortMvExpand#[skip:-8.11.99] +row a = 1 | sort a | mv_expand a; + +a:integer +1 +; + + +// see https://github.com/elastic/elasticsearch/issues/102061 +limitSortMvExpand#[skip:-8.11.99] +row a = 1 | limit 1 | sort a | mv_expand a; + +a:integer +1 +; + + +// see https://github.com/elastic/elasticsearch/issues/102061 +limitSortMultipleMvExpand#[skip:-8.11.99] +row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | mv_expand b | mv_expand c | limit 3; + +a:integer | b:integer | c:integer +1 | 2 | 3 +2 | 2 | 3 +3 | 2 | 3 +; + + +multipleLimitSortMultipleMvExpand#[skip:-8.11.99] +row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 2 | mv_expand b | mv_expand c | limit 3; + +a:integer | b:integer | c:integer +1 | 2 | 3 +2 | 2 | 3 +; + + +multipleLimitSortMultipleMvExpand2#[skip:-8.11.99] +row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 3 | mv_expand b | mv_expand c | limit 2; + +a:integer | b:integer | c:integer +1 | 2 | 3 +2 | 2 | 3 +; + + +//see https://github.com/elastic/elasticsearch/issues/102084 +whereMvExpand#[skip:-8.11.99] +row a = 1, b = -15 | where b > 3 | mv_expand b; + +a:integer | b:integer +; + + //see 
https://github.com/elastic/elasticsearch/issues/102912 statsDissectThatOverwritesAndMvExpand#[skip:-8.11.99] row a = "a", b = 1 | stats e = min(b) by a | dissect a "%{e}" | mv_expand e; @@ -324,3 +379,4 @@ from employees | where emp_no == 10001 | keep * | mv_expand first_name; avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean 268728049 | 1953-09-02T00:00:00.000Z | 10001 | Georgi | M | 2.03 | 2.0299999713897705 | 2.029296875 | 2.0300000000000002 | 1986-06-26T00:00:00.000Z | [false, true] | [Accountant, Senior Python Developer] | 2 | 2 | 2 | 2 | Facello | 57305 | 1.19 | 1 | 1.19 | 1 | true ; + diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 674a32db1f0fb..f32bdbeb5359e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -362,7 +362,8 @@ private LogicalPlan resolveMvExpand(MvExpand p, List childrenOutput) null, false ) - : resolved + : resolved, + p.limit() ); } return p; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java index a0e9c620d0fce..787da390b768e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java @@ -785,7 +785,8 @@ static void writeLimit(PlanStreamOutput out, Limit limit) throws IOException { } static MvExpand readMvExpand(PlanStreamInput in) throws IOException { - return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression(), in.readAttribute()); + // last parameter (limit) is a local value that is needed only during the logical planning, no need to serialize/deserialize it + return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression(), in.readAttribute(), -1); } static void writeMvExpand(PlanStreamOutput out, MvExpand mvExpand) throws IOException { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index e4f67838731a0..6afbea6613969 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -158,10 +158,10 @@ protected static List> rules() { ); var skip = new Batch<>("Skip Compute", new SkipQueryOnLimitZero()); - var defaultTopN = new Batch<>("Add default TopN", new AddDefaultTopN()); + var defaultLimits = new Batch<>("Add default limits", new AddLimitToMvExpand(), new AddDefaultTopN()); var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized()); - return asList(substitutions, operators(), skip, cleanup(), defaultTopN, label); + return asList(substitutions, operators(), skip, cleanup(), defaultLimits, label); } // TODO: currently this rule only works for aggregate functions (AVG) @@ -441,6 +441,13 @@ protected LogicalPlan rule(Limit limit) { } else if (limit.child() instanceof UnaryPlan unary) { if (unary instanceof Eval || unary instanceof Project || unary instanceof 
RegexExtract || unary instanceof Enrich) { return unary.replaceChild(limit.replaceChild(unary.child())); + } else if (unary instanceof MvExpand mvx) { + var limitSource = limit.limit(); + var limitVal = (int) limitSource.fold(); + if (mvx.limit() < 0 || mvx.limit() > limitVal) { + mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal); + } + return mvx.replaceChild(limit.replaceChild(mvx.child())); } // check if there's a 'visible' descendant limit lower than the current one // and if so, align the current limit since it adds no value @@ -1049,6 +1056,41 @@ protected LogicalPlan rule(LogicalPlan plan, LogicalOptimizerContext context) { } } + /** + * This is not an optimization: it is the last step of pushing LIMIT past MV_EXPAND, i.e. + * + * 1. PushDownAndCombineLimits: "mv_expand x (unbounded) | limit y" -> "limit y | mv_expand x (with inner limit y)" + * 2. further pushdown of LIMIT (e.g. needed for TopN) + * ... further rules + * 3. GO TO 1 (normal batch loop) + * 4. only once AddLimitToMvExpand: "mv_expand x (with inner limit y)" -> "mv_expand x (unbounded) | limit y" + * + * The final limit has to be added again because the implementation of the MV_EXPAND operator is unbounded + * + * The reason why this cannot be a single rule + * (e.g. 
"mv_expand x | limit y" -> "limit y | mv_expand x | limit y" ) + * is that LIMIT push-down (PushDownAndCombineLimits) is executed multiple times, and it would result in an infinite loop + * + */ + static class AddLimitToMvExpand extends OptimizerRules.OptimizerRule { + AddLimitToMvExpand() { + super(TransformDirection.UP); + } + + @Override + protected LogicalPlan rule(MvExpand plan) { + if (plan.limit() >= 0) { + return new Limit( + plan.source(), + new Literal(Source.EMPTY, plan.limit(), DataTypes.INTEGER), + new MvExpand(plan.source(), plan.child(), plan.target(), plan.expanded(), -1) + ); + } + return plan; + } + + } + public static class ReplaceRegexMatch extends OptimizerRules.ReplaceRegexMatch { protected Expression regexToEquals(RegexMatch regexMatch, Literal literal) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index f9d1a252afe42..d11d560529748 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -151,8 +151,7 @@ public PlanFactory visitDissectCommand(EsqlBaseParser.DissectCommandContext ctx) public PlanFactory visitMvExpandCommand(EsqlBaseParser.MvExpandCommandContext ctx) { UnresolvedAttribute field = visitQualifiedName(ctx.qualifiedName()); Source src = source(ctx); - return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name())); - + return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name()), -1); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java index 0cdcd4af00026..9a38ece8d0c4d 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java @@ -24,10 +24,13 @@ public class MvExpand extends UnaryPlan { private List output; - public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) { + private final int limit; + + public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, int limit) { super(source, child); this.target = target; this.expanded = expanded; + this.limit = limit; } public static List calculateOutput(List input, NamedExpression target, Attribute expanded) { @@ -57,7 +60,7 @@ public boolean expressionsResolved() { @Override public UnaryPlan replaceChild(LogicalPlan newChild) { - return new MvExpand(source(), newChild, target, expanded); + return new MvExpand(source(), newChild, target, expanded, limit); } @Override @@ -68,14 +71,18 @@ public List output() { return output; } + public int limit() { + return limit; + } + @Override protected NodeInfo info() { - return NodeInfo.create(this, MvExpand::new, child(), target, expanded); + return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), target, expanded); + return Objects.hash(super.hashCode(), target, expanded, limit); } @Override @@ -83,6 +90,8 @@ public boolean equals(Object obj) { if (false == super.equals(obj)) { return false; } - return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded); + return Objects.equals(target, ((MvExpand) obj).target) + && Objects.equals(expanded, ((MvExpand) obj).expanded) + && limit == ((MvExpand) obj).limit; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java index 
3eea84b0bd1f9..1b7745d4418d9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java @@ -151,6 +151,7 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) { } if (p instanceof MvExpand mvExpand) { + assert mvExpand.limit() < 0; return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded()); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java index 85612427a1867..7ce8321a60d86 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java @@ -488,7 +488,7 @@ public void testEsqlProject() throws IOException { public void testMvExpand() throws IOException { var esRelation = new EsRelation(Source.EMPTY, randomEsIndex(), List.of(randomFieldAttribute()), randomBoolean()); - var orig = new MvExpand(Source.EMPTY, esRelation, randomFieldAttribute(), randomFieldAttribute()); + var orig = new MvExpand(Source.EMPTY, esRelation, randomFieldAttribute(), randomFieldAttribute(), -1); BytesStreamOutput bso = new BytesStreamOutput(); PlanStreamOutput out = new PlanStreamOutput(bso, planNameRegistry); PlanNamedTypes.writeMvExpand(out, orig); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 6320294d7ee54..fa56ee1dd4c33 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -1024,17 +1024,18 @@ public void testDontPushDownLimitPastMvExpand() { /** * Expected - * EsqlProject[[emp_no{f}#141, first_name{f}#142, languages{f}#143, lll{r}#132, salary{f}#147]] - * \_TopN[[Order[salary{f}#147,DESC,FIRST], Order[first_name{f}#142,ASC,LAST]],5[INTEGER]] - * \_Limit[5[INTEGER]] - * \_MvExpand[salary{f}#147] - * \_Eval[[languages{f}#143 + 5[INTEGER] AS lll]] - * \_Filter[languages{f}#143 > 1[INTEGER]] - * \_Limit[10[INTEGER]] - * \_MvExpand[first_name{f}#142] - * \_TopN[[Order[emp_no{f}#141,DESC,FIRST]],10[INTEGER]] - * \_Filter[emp_no{f}#141 < 10006[INTEGER]] - * \_EsRelation[test][emp_no{f}#141, first_name{f}#142, languages{f}#1..] + * EsqlProject[[emp_no{f}#19, first_name{r}#29, languages{f}#22, lll{r}#8, salary{r}#30]] + * \_TopN[[Order[salary{r}#30,DESC,FIRST]],5[INTEGER]] + * \_Limit[5[INTEGER]] + * \_MvExpand[salary{f}#24,salary{r}#30,-1] + * \_Eval[[languages{f}#22 + 5[INTEGER] AS lll]] + * \_Limit[5[INTEGER]] + * \_Filter[languages{f}#22 > 1[INTEGER]] + * \_Limit[10[INTEGER]] + * \_MvExpand[first_name{f}#20,first_name{r}#29,-1] + * \_TopN[[Order[emp_no{f}#19,DESC,FIRST]],10[INTEGER]] + * \_Filter[emp_no{f}#19 ≤ 10006[INTEGER]] + * \_EsRelation[test][_meta_field{f}#25, emp_no{f}#19, first_name{f}#20, ..] 
*/ public void testMultipleMvExpandWithSortAndLimit() { LogicalPlan plan = optimizedPlan(""" @@ -1059,7 +1060,8 @@ public void testMultipleMvExpandWithSortAndLimit() { assertThat(limit.limit().fold(), equalTo(5)); var mvExp = as(limit.child(), MvExpand.class); var eval = as(mvExp.child(), Eval.class); - var filter = as(eval.child(), Filter.class); + var limit5 = as(eval.child(), Limit.class); + var filter = as(limit5.child(), Filter.class); limit = as(filter.child(), Limit.class); assertThat(limit.limit().fold(), equalTo(10)); mvExp = as(limit.child(), MvExpand.class); @@ -2510,6 +2512,70 @@ public void testMvExpandFoldable() { var row = as(expand.child(), Row.class); } + /** + * Expected: + * Limit[500[INTEGER]] + * \_MvExpand[a{r}#2,a{r}#6,-1] + * \_TopN[[Order[a{r}#2,ASC,LAST]],500[INTEGER]] + * \_Row[[1[INTEGER] AS a]] + */ + public void testSortMvExpand() { + LogicalPlan plan = optimizedPlan(""" + row a = 1 + | sort a + | mv_expand a"""); + + var limit = as(plan, Limit.class); + var expand = as(limit.child(), MvExpand.class); + var topN = as(expand.child(), TopN.class); + var row = as(topN.child(), Row.class); + } + + /** + * Expected: + * Limit[20[INTEGER]] + * \_MvExpand[emp_no{f}#4,emp_no{r}#14,-1] + * \_TopN[[Order[emp_no{f}#4,ASC,LAST]],20[INTEGER]] + * \_EsRelation[test][_meta_field{f}#10, emp_no{f}#4, first_name{f}#5, ge..] 
+ */ + public void testSortMvExpandLimit() { + LogicalPlan plan = optimizedPlan(""" + from test + | sort emp_no + | mv_expand emp_no + | limit 20"""); + + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(), is(20)); + var expand = as(limit.child(), MvExpand.class); + var topN = as(expand.child(), TopN.class); + assertThat(topN.limit().fold(), is(20)); + var row = as(topN.child(), EsRelation.class); + } + + /** + * Expected: + * Limit[500[INTEGER]] + * \_MvExpand[b{r}#4,b{r}#8,-1] + * \_Limit[500[INTEGER]] + * \_Row[[1[INTEGER] AS a, -15[INTEGER] AS b]] + * + * see https://github.com/elastic/elasticsearch/issues/102084 + */ + public void testWhereMvExpand() { + LogicalPlan plan = optimizedPlan(""" + row a = 1, b = -15 + | where b < 3 + | mv_expand b"""); + + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(), is(500)); + var expand = as(limit.child(), MvExpand.class); + var limit2 = as(expand.child(), Limit.class); + assertThat(limit2.limit().fold(), is(500)); + var row = as(limit2.child(), Row.class); + } + /** * Expected * Limit[500[INTEGER]]