Skip to content
7 changes: 7 additions & 0 deletions docs/changelog/102545.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 102545
summary: Fix LIMIT pushdown with MV_EXPAND
area: ES|QL
type: bug
issues:
- 102084
- 102061
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,61 @@ a:long | b:long | c:long | gender:keyword | str:keyword | x:key
;


// see https://github.com/elastic/elasticsearch/issues/102061
sortMvExpand#[skip:-8.11.99]
row a = 1 | sort a | mv_expand a;

a:integer
1
;


// see https://github.com/elastic/elasticsearch/issues/102061
limitSortMvExpand#[skip:-8.11.99]
row a = 1 | limit 1 | sort a | mv_expand a;

a:integer
1
;


// see https://github.com/elastic/elasticsearch/issues/102061
limitSortMultipleMvExpand#[skip:-8.11.99]
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | mv_expand b | mv_expand c | limit 3;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
3 | 2 | 3
;


multipleLimitSortMultipleMvExpand#[skip:-8.11.99]
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 2 | mv_expand b | mv_expand c | limit 3;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
;


multipleLimitSortMultipleMvExpand2#[skip:-8.11.99]
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 3 | mv_expand b | mv_expand c | limit 2;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
;


//see https://github.com/elastic/elasticsearch/issues/102084
whereMvExpand#[skip:-8.11.99]
row a = 1, b = -15 | where b > 3 | mv_expand b;

a:integer | b:integer
;


//see https://github.com/elastic/elasticsearch/issues/102912
statsDissectThatOverwritesAndMvExpand#[skip:-8.11.99]
row a = "a", b = 1 | stats e = min(b) by a | dissect a "%{e}" | mv_expand e;
Expand All @@ -324,3 +379,4 @@ from employees | where emp_no == 10001 | keep * | mv_expand first_name;
avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean
268728049 | 1953-09-02T00:00:00.000Z | 10001 | Georgi | M | 2.03 | 2.0299999713897705 | 2.029296875 | 2.0300000000000002 | 1986-06-26T00:00:00.000Z | [false, true] | [Accountant, Senior Python Developer] | 2 | 2 | 2 | 2 | Facello | 57305 | 1.19 | 1 | 1.19 | 1 | true
;

Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
null,
false
)
: resolved
: resolved,
p.limit()
);
}
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,8 @@ static void writeLimit(PlanStreamOutput out, Limit limit) throws IOException {
}

static MvExpand readMvExpand(PlanStreamInput in) throws IOException {
return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression(), in.readAttribute());
// last parameter (limit) is a local value that is needed only during the logical planning, no need to serialize/deserialize it
return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression(), in.readAttribute(), -1);
}

static void writeMvExpand(PlanStreamOutput out, MvExpand mvExpand) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ protected static List<Batch<LogicalPlan>> rules() {
);

var skip = new Batch<>("Skip Compute", new SkipQueryOnLimitZero());
var defaultTopN = new Batch<>("Add default TopN", new AddDefaultTopN());
var defaultLimits = new Batch<>("Add default limits", new AddLimitToMvExpand(), new AddDefaultTopN());
var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized());

return asList(substitutions, operators(), skip, cleanup(), defaultTopN, label);
return asList(substitutions, operators(), skip, cleanup(), defaultLimits, label);
}

// TODO: currently this rule only works for aggregate functions (AVG)
Expand Down Expand Up @@ -441,6 +441,13 @@ protected LogicalPlan rule(Limit limit) {
} else if (limit.child() instanceof UnaryPlan unary) {
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
return unary.replaceChild(limit.replaceChild(unary.child()));
} else if (unary instanceof MvExpand mvx) {
var limitSource = limit.limit();
var limitVal = (int) limitSource.fold();
if (mvx.limit() < 0 || mvx.limit() > limitVal) {
mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal);
}
return mvx.replaceChild(limit.replaceChild(mvx.child()));
}
// check if there's a 'visible' descendant limit lower than the current one
// and if so, align the current limit since it adds no value
Expand Down Expand Up @@ -1049,6 +1056,41 @@ protected LogicalPlan rule(LogicalPlan plan, LogicalOptimizerContext context) {
}
}

/**
 * Not an optimization: this is the last step of pushing LIMIT past MV_EXPAND, i.e.
 *
 * 1. PushDownAndCombineLimits: "mv_expand x (unbounded) | limit y" -> "limit y | mv_expand x (with inner limit y)"
 * 2. further pushdown of LIMIT (e.g. needed for TopN)
 *    ... further rules
 * 3. GO TO 1 (normal batch loop)
 * 4. only once, AddLimitToMvExpand: "mv_expand x (with inner limit y)" -> "mv_expand x (unbounded) | limit y"
 *
 * The final limit has to be added again because the implementation of the MV_EXPAND operator is unbounded.
 *
 * The reason why this cannot be a single rule
 * (e.g. "mv_expand x | limit y" -> "limit y | mv_expand x | limit y")
 * is that LIMIT push-down (PushDownAndCombineLimits) is executed multiple times, and it would result in an infinite loop.
 */
static class AddLimitToMvExpand extends OptimizerRules.OptimizerRule<MvExpand> {
    AddLimitToMvExpand() {
        super(TransformDirection.UP);
    }

    /**
     * Rewrites an MvExpand that carries a non-negative inner limit into a Limit node
     * wrapping a copy of that MvExpand whose limit is reset to -1.
     * An MvExpand with a negative limit is returned unchanged.
     */
    @Override
    protected LogicalPlan rule(MvExpand plan) {
        int innerLimit = plan.limit();
        if (innerLimit < 0) {
            // nothing was pushed into this MvExpand, leave it alone
            return plan;
        }
        var limitLiteral = new Literal(Source.EMPTY, innerLimit, DataTypes.INTEGER);
        var unboundedExpand = new MvExpand(plan.source(), plan.child(), plan.target(), plan.expanded(), -1);
        return new Limit(plan.source(), limitLiteral, unboundedExpand);
    }

}

public static class ReplaceRegexMatch extends OptimizerRules.ReplaceRegexMatch {

protected Expression regexToEquals(RegexMatch<?> regexMatch, Literal literal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ public PlanFactory visitDissectCommand(EsqlBaseParser.DissectCommandContext ctx)
public PlanFactory visitMvExpandCommand(EsqlBaseParser.MvExpandCommandContext ctx) {
UnresolvedAttribute field = visitQualifiedName(ctx.qualifiedName());
Source src = source(ctx);
return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name()));

return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name()), -1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ public class MvExpand extends UnaryPlan {

private List<Attribute> output;

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) {
private final int limit;

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, int limit) {
super(source, child);
this.target = target;
this.expanded = expanded;
this.limit = limit;
}

public static List<Attribute> calculateOutput(List<Attribute> input, NamedExpression target, Attribute expanded) {
Expand Down Expand Up @@ -57,7 +60,7 @@ public boolean expressionsResolved() {

@Override
public UnaryPlan replaceChild(LogicalPlan newChild) {
return new MvExpand(source(), newChild, target, expanded);
return new MvExpand(source(), newChild, target, expanded, limit);
}

@Override
Expand All @@ -68,21 +71,27 @@ public List<Attribute> output() {
return output;
}

/**
 * Returns the limit value this node was constructed with.
 * NOTE(review): a negative value (-1) appears to mean "no limit attached" - confirm against the optimizer rules.
 */
public int limit() {
    return this.limit;
}

@Override
protected NodeInfo<? extends LogicalPlan> info() {
return NodeInfo.create(this, MvExpand::new, child(), target, expanded);
return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), target, expanded);
return Objects.hash(super.hashCode(), target, expanded, limit);
}

@Override
public boolean equals(Object obj) {
if (false == super.equals(obj)) {
return false;
}
return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded);
return Objects.equals(target, ((MvExpand) obj).target)
&& Objects.equals(expanded, ((MvExpand) obj).expanded)
&& limit == ((MvExpand) obj).limit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) {
}

if (p instanceof MvExpand mvExpand) {
assert mvExpand.limit() < 0;
return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ public void testEsqlProject() throws IOException {

public void testMvExpand() throws IOException {
var esRelation = new EsRelation(Source.EMPTY, randomEsIndex(), List.of(randomFieldAttribute()), randomBoolean());
var orig = new MvExpand(Source.EMPTY, esRelation, randomFieldAttribute(), randomFieldAttribute());
var orig = new MvExpand(Source.EMPTY, esRelation, randomFieldAttribute(), randomFieldAttribute(), -1);
BytesStreamOutput bso = new BytesStreamOutput();
PlanStreamOutput out = new PlanStreamOutput(bso, planNameRegistry);
PlanNamedTypes.writeMvExpand(out, orig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,17 +1024,18 @@ public void testDontPushDownLimitPastMvExpand() {

/**
* Expected
* EsqlProject[[emp_no{f}#141, first_name{f}#142, languages{f}#143, lll{r}#132, salary{f}#147]]
* \_TopN[[Order[salary{f}#147,DESC,FIRST], Order[first_name{f}#142,ASC,LAST]],5[INTEGER]]
* \_Limit[5[INTEGER]]
* \_MvExpand[salary{f}#147]
* \_Eval[[languages{f}#143 + 5[INTEGER] AS lll]]
* \_Filter[languages{f}#143 > 1[INTEGER]]
* \_Limit[10[INTEGER]]
* \_MvExpand[first_name{f}#142]
* \_TopN[[Order[emp_no{f}#141,DESC,FIRST]],10[INTEGER]]
* \_Filter[emp_no{f}#141 &lt; 10006[INTEGER]]
* \_EsRelation[test][emp_no{f}#141, first_name{f}#142, languages{f}#1..]
* EsqlProject[[emp_no{f}#19, first_name{r}#29, languages{f}#22, lll{r}#8, salary{r}#30]]
* \_TopN[[Order[salary{r}#30,DESC,FIRST]],5[INTEGER]]
* \_Limit[5[INTEGER]]
* \_MvExpand[salary{f}#24,salary{r}#30,2147483647]
* \_Eval[[languages{f}#22 + 5[INTEGER] AS lll]]
* \_Limit[5[INTEGER]]
* \_Filter[languages{f}#22 > 1[INTEGER]]
* \_Limit[10[INTEGER]]
* \_MvExpand[first_name{f}#20,first_name{r}#29,2147483647]
* \_TopN[[Order[emp_no{f}#19,DESC,FIRST]],10[INTEGER]]
* \_Filter[emp_no{f}#19 &le; 10006[INTEGER]]
* \_EsRelation[test][_meta_field{f}#25, emp_no{f}#19, first_name{f}#20, ..]
*/
public void testMultipleMvExpandWithSortAndLimit() {
LogicalPlan plan = optimizedPlan("""
Expand All @@ -1059,7 +1060,8 @@ public void testMultipleMvExpandWithSortAndLimit() {
assertThat(limit.limit().fold(), equalTo(5));
var mvExp = as(limit.child(), MvExpand.class);
var eval = as(mvExp.child(), Eval.class);
var filter = as(eval.child(), Filter.class);
var limit5 = as(eval.child(), Limit.class);
var filter = as(limit5.child(), Filter.class);
limit = as(filter.child(), Limit.class);
assertThat(limit.limit().fold(), equalTo(10));
mvExp = as(limit.child(), MvExpand.class);
Expand Down Expand Up @@ -2510,6 +2512,70 @@ public void testMvExpandFoldable() {
var row = as(expand.child(), Row.class);
}

/**
 * Checks the optimized plan shape for a SORT followed by MV_EXPAND on a ROW source.
 *
 * Expected:
 * Limit[500[INTEGER]]
 * \_MvExpand[a{r}#2,a{r}#6,2147483647]
 *   \_TopN[[Order[a{r}#2,ASC,LAST]],500[INTEGER]]
 *     \_Row[[1[INTEGER] AS a]]
 */
public void testSortMvExpand() {
    LogicalPlan plan = optimizedPlan("""
        row a = 1
        | sort a
        | mv_expand a""");

    // walk the tree top-down; each as(...) call asserts the node type at that level
    Limit outerLimit = as(plan, Limit.class);
    MvExpand mvExpand = as(outerLimit.child(), MvExpand.class);
    TopN sortWithLimit = as(mvExpand.child(), TopN.class);
    as(sortWithLimit.child(), Row.class);
}

/**
 * Checks the optimized plan shape for SORT, MV_EXPAND and an explicit LIMIT 20 on an index source:
 * the value 20 is expected both on the top Limit and on the TopN below the MvExpand.
 *
 * Expected:
 * Limit[20[INTEGER]]
 * \_MvExpand[emp_no{f}#4,emp_no{r}#14,-1]
 *   \_TopN[[Order[emp_no{f}#4,ASC,LAST]],20[INTEGER]]
 *     \_EsRelation[test][_meta_field{f}#10, emp_no{f}#4, first_name{f}#5, ge..]
 */
public void testSortMvExpandLimit() {
    LogicalPlan plan = optimizedPlan("""
        from test
        | sort emp_no
        | mv_expand emp_no
        | limit 20""");

    Limit outerLimit = as(plan, Limit.class);
    assertThat(outerLimit.limit().fold(), is(20));

    MvExpand mvExpand = as(outerLimit.child(), MvExpand.class);

    TopN sortWithLimit = as(mvExpand.child(), TopN.class);
    assertThat(sortWithLimit.limit().fold(), is(20));

    // the leaf must be the index relation
    as(sortWithLimit.child(), EsRelation.class);
}

/**
 * Checks the optimized plan shape for a WHERE followed by MV_EXPAND on a ROW source:
 * a Limit of 500 is expected both above and below the MvExpand.
 *
 * Expected:
 * Limit[500[INTEGER]]
 * \_MvExpand[b{r}#4,b{r}#8,-1]
 *   \_Limit[500[INTEGER]]
 *     \_Row[[1[INTEGER] AS a, -15[INTEGER] AS b]]
 *
 * see https://github.com/elastic/elasticsearch/issues/102084
 */
public void testWhereMvExpand() {
    LogicalPlan plan = optimizedPlan("""
        row a = 1, b = -15
        | where b < 3
        | mv_expand b""");

    Limit outerLimit = as(plan, Limit.class);
    assertThat(outerLimit.limit().fold(), is(500));

    MvExpand mvExpand = as(outerLimit.child(), MvExpand.class);

    Limit innerLimit = as(mvExpand.child(), Limit.class);
    assertThat(innerLimit.limit().fold(), is(500));

    // the leaf must be the ROW source
    as(innerLimit.child(), Row.class);
}

/**
* Expected
* Limit[500[INTEGER]]
Expand Down