diff --git a/docs/changelog/128917.yaml b/docs/changelog/128917.yaml new file mode 100644 index 0000000000000..2d80fb5b43354 --- /dev/null +++ b/docs/changelog/128917.yaml @@ -0,0 +1,6 @@ +pr: 128917 +summary: Adopt a "LogicalPlan" approach to running multiple sub-queries (with INLINESTATS + so far) +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 03fbcbc5b18fe..fd3852d8ecca3 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -333,6 +333,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_114_0_00); public static final TransportVersion ML_INFERENCE_IBM_WATSONX_COMPLETION_ADDED = def(9_115_0_00); public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES = def(9_116_0_00); + public static final TransportVersion ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS = def(9_117_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index be0feade7f8af..f244069aab57a 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -49,7 +49,7 @@ import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V8; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST; @@ -128,7 +128,7 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())); - assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V7.capabilityName())); + assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V8.capabilityName())); assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())); // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented. assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 7ec9ee6344551..c304e9d9b742d 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -83,6 +83,7 @@ import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; @@ -446,7 +447,7 @@ public static Literal L(Object value) { } public static LogicalPlan emptySource() { - return new LocalRelation(Source.EMPTY, emptyList(), LocalSupplier.EMPTY); + return new LocalRelation(Source.EMPTY, emptyList(), EmptyLocalSupplier.EMPTY); } public static LogicalPlan localSource(BlockFactory blockFactory, List fields, List row) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec index 30ed34d9ec611..ae37995415e95 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec @@ -3,7 +3,7 @@ // allFieldsReturned -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM hosts METADATA _index | INLINESTATS c = COUNT(*) BY host_group @@ -16,7 +16,7 @@ eth0 |epsilon gw instance|epsilon |[fe80::cae2:65ff:fece:feb9, ; maxOfInt -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 // tag::max-languages[] FROM employees | KEEP emp_no, languages @@ -38,7 +38,7 @@ emp_no:integer | languages:integer | max_lang:integer ; maxOfIntByKeyword -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender @@ -56,7 +56,7 @@ emp_no:integer | languages:integer | max_lang:integer | gender:keyword ; maxOfLongByKeyword -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, avg_worked_seconds, gender @@ -71,7 +71,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | gender: ; maxOfLong -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, avg_worked_seconds, gender @@ -84,7 +84,7 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon ; maxOfLongByCalculatedKeyword -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 // tag::longest-tenured-by-first[] FROM employees @@ -107,7 +107,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se ; maxOfLongByCalculatedNamedKeyword -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, avg_worked_seconds, last_name @@ -126,7 +126,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se ; maxOfLongByCalculatedDroppedKeyword -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1) @@ -145,7 +145,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se ; maxOfLongByEvaledKeyword -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | EVAL l = SUBSTRING(last_name, 0, 1) @@ -165,7 +165,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | l:keywo ; maxOfLongByInt -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, avg_worked_seconds, languages @@ -183,7 +183,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag ; maxOfLongByIntDouble -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, avg_worked_seconds, languages, height @@ -200,9 +200,33 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag 10086 | 328580163 | 328580163 | 1 | 1.7 ; +two +required_capability: inlinestats_v8 -two-Ignore -required_capability: join_planning_v1 +FROM employees +| KEEP emp_no, languages, avg_worked_seconds, gender +| INLINESTATS avg_avg_worked_seconds = AVG(avg_worked_seconds) BY languages +| WHERE avg_worked_seconds > avg_avg_worked_seconds +| INLINESTATS max_languages = MAX(languages) BY gender +| SORT emp_no ASC +| LIMIT 10; + +emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:integer|max_languages:integer|gender:keyword +10002 |328922887 |3.133013149047619E8 |5 |5 |F +10006 |372957040 |2.978159518235294E8 |3 |5 |F +10007 |393084805 |2.863684210555556E8 |4 |5 |F +10010 |315236372 |2.863684210555556E8 |4 |5 |null +10012 |365510850 |3.133013149047619E8 |5 |5 |null +10015 |390266432 |3.133013149047619E8 |5 |5 |null +10018 |309604079 |3.0318626831578946E8 |2 |5 |null +10019 |342855721 |2.94833632E8 |1 |5 |null +10020 |373309605 |3.181719481E8 |null |5 |M +10023 |330870342 |3.181719481E8 |null |5 |F +; + +three +required_capability: inlinestats_v8 +// used to fail with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70) FROM employees | KEEP emp_no, languages, avg_worked_seconds, gender @@ -210,16 +234,44 @@ FROM employees | WHERE avg_worked_seconds > avg_avg_worked_seconds | INLINESTATS max_languages = MAX(languages) BY gender | SORT emp_no ASC -| LIMIT 3; +| LIMIT 10 +| INLINESTATS min(languages), max(languages) by gender +; + +emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:integer|max_languages:integer|min(languages):integer|max(languages):integer|gender:keyword +10002 |328922887 |3.133013149047619E8 |5 |5 |3 |5 |F +10006 |372957040 |2.978159518235294E8 |3 |5 |3 |5 |F +10007 |393084805 |2.863684210555556E8 |4 |5 |3 |5 |F +10010 |315236372 |2.863684210555556E8 |4 |5 |1 |5 |null +10012 |365510850 |3.133013149047619E8 |5 |5 |1 |5 |null +10015 |390266432 |3.133013149047619E8 |5 |5 |1 |5 |null +10018 |309604079 |3.0318626831578946E8 |2 |5 |1 |5 |null +10019 |342855721 |2.94833632E8 |1 |5 |1 |5 |null +10020 |373309605 |3.181719481E8 |null |5 |null |null |M +10023 |330870342 |3.181719481E8 |null |5 |3 |5 |F +; + +// TODO: INLINESTATS unit test needed for this one +pushDownSort_To_LeftSideOnly +required_capability: inlinestats_v8 + +from employees +| sort emp_no +| inlinestats avg = avg(salary) by languages +| limit 5 +| keep emp_no, avg, languages, gender +; -emp_no:integer | languages:integer | avg_worked_seconds:long | gender:keyword | avg_avg_worked_seconds:double | max_languages:integer - 10002 | 5 | 328922887 | F | 3.133013149047619E8 | 5 - 10006 | 3 | 372957040 | F | 2.978159518235294E8 | 5 - 10007 | 4 | 393084805 | F | 2.863684210555556E8 | 5 + emp_no:integer| avg:double |languages:integer|gender:keyword +10001 |57305.0 |2 |M +10002 |46272.5 |5 |F +10003 |61805.0 |4 |M +10004 |46272.5 |5 |M +10005 |63528.0 |1 |M ; byMultivaluedSimple -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 // tag::mv-group[] FROM airports @@ -237,7 +289,7 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer ; byMultivaluedMvExpand -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 // tag::mv-expand[] FROM airports @@ -257,7 +309,7 @@ GWL |9 |4 |military ; byMvExpand -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 // tag::extreme-airports[] FROM airports @@ -285,20 +337,31 @@ FROM airports // end::extreme-airports-result[] ; -brokenwhy-Ignore -required_capability: join_planning_v1 +mvMinMvExpand +required_capability: inlinestats_v8 FROM airports +| EVAL original_type = type | INLINESTATS min_scalerank=MIN(scalerank) BY type | MV_EXPAND type -| WHERE scalerank == MV_MIN(scalerank); +| EVAL mvMin_scalerank = MV_MIN(scalerank) +| WHERE scalerank == MV_MIN(scalerank) +| SORT abbrev DESC NULLS LAST, type +| LIMIT 7 +; -abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer - GWL | [mid, military] | 9 | [2, 4] +abbrev:keyword |city:keyword |city_location:geo_point |country:keyword| location:geo_point | name:text |scalerank:integer|original_type:keyword |min_scalerank:integer|type:keyword|mvMin_scalerank:integer +ZRH |Zürich |POINT (8.5411 47.3744) |Switzerland |POINT (8.56221279534765 47.4523895064915) |Zurich Int'l |3 |major |2 |major |3 +ZNZ |Zanzibar |POINT (39.199 -6.165) |Tanzania |POINT (39.2223319841558 -6.21857034620282)|Zanzibar |4 |mid |2 |mid |4 +ZLO |Cihuatlán |POINT (-104.5667 19.25) |Mexico |POINT (-104.560095200097 19.1480860285854)|Playa de Oro Int'l |7 |mid |2 |mid |7 +ZHHH |Wuhan |POINT (114.2881 30.5872)|China |POINT (114.24694737615 30.6017141196702) |Wang-Chia Tun Airbase|6 |[mid, military] |[2, 4] |mid |6 +ZHHH |Wuhan |POINT (114.2881 30.5872)|China |POINT (114.24694737615 30.6017141196702) |Wang-Chia Tun Airbase|6 |[mid, military] |[2, 4] |military |6 +ZGC |Lanzhou |POINT (103.8318 36.0617)|China |POINT (103.615415363043 36.5078842461237) |Lanzhou Zhongchuan |6 |mid |2 |mid |6 +ZAR |Zaria |POINT (7.7 11.0667) |Nigeria |POINT (7.68726764310577 11.1352958601071) |Zaria |8 |mid |2 |mid |8 ; -afterStats-Ignore -required_capability: join_planning_v1 +afterStats +required_capability: inlinestats_v8 FROM airports | STATS count=COUNT(*) BY country @@ -321,7 +384,7 @@ count:long | country:keyword | avg:double ; afterWhere -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM airports | WHERE country != "United States" @@ -338,30 +401,32 @@ abbrev:keyword | country:keyword | count:long BDQ | India | 50 ; -afterLookup-Ignore -required_capability: join_planning_v1 +afterLookup +required_capability: inlinestats_v8 +required_capability: join_lookup_v12 FROM airports -| RENAME scalerank AS int -| LOOKUP int_number_names ON int -| RENAME name as scalerank -| DROP int +| EVAL backup_scalerank = scalerank +| RENAME scalerank AS language_code +| LOOKUP JOIN languages_lookup ON language_code +| RENAME language_name as scalerank +| DROP language_code | INLINESTATS count=COUNT(*) BY scalerank -| SORT abbrev ASC -| KEEP abbrev, scalerank -| LIMIT 4 +| SORT abbrev DESC +| KEEP abbrev, *scalerank +| LIMIT 5 ; -abbrev:keyword | scalerank:keyword - ABJ | four - ABQ | six - ABV | five - ACA | four +abbrev:keyword |backup_scalerank:integer| scalerank:keyword +null |8 |null +null |8 |null +null |8 |null +ZRH |3 |Spanish +ZNZ |4 |German ; afterEnrich -required_capability: join_planning_v1 -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 required_capability: enrich_load FROM airports @@ -382,7 +447,7 @@ abbrev:keyword | city:keyword | "COUNT(*)":long | region:text ; beforeStats -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM airports | EVAL lat = ST_Y(location) @@ -395,7 +460,7 @@ northern:long | southern:long ; beforeKeepSort -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | INLINESTATS max_salary = MAX(salary) by languages @@ -410,7 +475,7 @@ emp_no:integer | languages:integer | max_salary:integer ; beforeKeepWhere -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | INLINESTATS max_salary = MAX(salary) by languages @@ -423,8 +488,7 @@ emp_no:integer | languages:integer | max_salary:integer ; beforeEnrich -required_capability: join_planning_v1 -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 required_capability: enrich_load FROM airports @@ -442,9 +506,9 @@ ABV |Abuja |385 |major |Municipal Are ACA |Acapulco de Juárez|385 |major |Acapulco de Juárez ; -beforeAndAfterEnrich-Ignore -required_capability: join_planning_v1 -// does not fail, but it returns null for all values of count_region +beforeAndAfterEnrich +required_capability: inlinestats_v8 +required_capability: enrich_load FROM airports | KEEP abbrev, type, city @@ -453,72 +517,162 @@ FROM airports | WHERE MV_COUNT(region) == 1 | INLINESTATS count_region=COUNT(*) BY region | SORT abbrev ASC -| LIMIT 3 +| WHERE STARTS_WITH(abbrev, "AL") +| LIMIT 5 ; -abbrev:keyword | city:keyword |"COUNT(*)":long | type:keyword | count_region:long | region:text -ABJ |Abidjan |499 |mid | 1 |Abidjan -ABV |Abuja |385 |major | 1 |Municipal Area Council -ACA |Acapulco de Juárez|385 |major | 1 |Acapulco de Juárez +abbrev:keyword | city:keyword |"COUNT(*)":long| type:keyword | count_region:long | region:text +ALA |Almaty |385 |major |2 |Жетісу ауданы +ALB |Colonie |499 |mid |1 |Town of Colonie +ALC |Alicante |385 |major |1 |Alacant / Alicante +ALG |Algiers |385 |major |1 |Alger +ALL |Albenga |499 |mid |1 |Albenga ; -shadowing-Ignore -required_capability: join_planning_v1 +shadowing +required_capability: inlinestats_v8 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" -| INLINESTATS env=VALUES(right) BY client_ip +| INLINESTATS env = VALUES(right) BY client_ip ; -left:keyword | client_ip:keyword | right:keyword | env:keyword -left | 172.21.0.5 | right | right +left:keyword | right:keyword | env:keyword | client_ip:keyword +left | right | right | 172.21.0.5 ; -shadowingMulti-Ignore -required_capability: join_planning_v1 +shadowingMulti +required_capability: inlinestats_v8 ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right" | INLINESTATS airport=VALUES(left), region=VALUES(left), city_boundary=VALUES(left) BY city ; -left:keyword | city:keyword | middle:keyword | right:keyword | airport:keyword | region:keyword | city_boundary:keyword -left | Zürich | middle | right | left | left | left +left:keyword | middle:keyword | right:keyword | airport:keyword | region:keyword | city_boundary:keyword | city:keyword +left | middle | right | left | left | left | Zürich ; -shadowingSelf-Ignore -required_capability: join_planning_v1 +shadowingSelf +required_capability: inlinestats_v8 -ROW city="Raleigh" -| INLINESTATS city=COUNT(city) +ROW city = "Raleigh" +| INLINESTATS city = COUNT(city) ; city:long 1 ; -shadowingSelfBySelf-Ignore -required_capability: join_planning_v1 +shadowingSelfBySelf +required_capability: inlinestats_v8 -ROW city="Raleigh" -| INLINESTATS city=COUNT(city) BY city +ROW city = "Raleigh" +| INLINESTATS city = COUNT(city) BY city ; +warning:Line 2:15: Field 'city' shadowed by field at line 2:37 -city:long -1 +city:keyword +Raleigh ; -shadowingInternal-Ignore -required_capability: join_planning_v1 +shadowingInternal +required_capability: inlinestats_v8 ROW city = "Zürich" -| INLINESTATS x=VALUES(city), x=VALUES(city) +| INLINESTATS x = VALUES(city), x = VALUES(city) ; +warning:Line 2:15: Field 'x' shadowed by field at line 2:33 city:keyword | x:keyword Zürich | Zürich ; +multiInlinestatsWithRow +required_capability: inlinestats_v8 + +row x = 1 +| inlinestats x = max(x) + min(x) +| eval y = x + 1 +| inlinestats sum(y) +| inlinestats count(y), count(x) +; + + x:integer | y:integer | sum(y):long | count(y):long | count(x):long +2 |3 |3 |1 |1 +; + +ignoreUnusedEvaledValue_AndInlineStats +required_capability: inlinestats_v8 + +ROW x = 1 +| INLINESTATS max(x) +| EVAL y = x + 1 +| KEEP x +; + +x:integer +1 +; + +ignoreUnusedEvaledValue_AndInlineStats2 +required_capability: inlinestats_v8 + +ROW x = 1, z = 2 +| INLINESTATS max(x) +| EVAL a = x + 1, b = z + 2 +| KEEP x, z +; + +x:integer | z:integer +1 | 2 +; + +ignoreUnusedEvaledValue_AndInlineStats3 +required_capability: inlinestats_v8 + +from employees +| inlinestats max(salary) +| eval y = salary + 1 +| keep salary +| sort salary desc +| limit 1 +; + + salary:integer +74999 +; + +ignoreUnusedEvaledValue_AndInlineStats4 +required_capability: inlinestats_v8 + +from employees +| inlinestats max(salary), m = min(salary) by gender +| eval y = concat(gender, "") +| keep emp_no +| sort emp_no desc +| limit 1 +; + +emp_no:integer +10100 +; + +ignoreUnusedEvaledValue_AndInlineStats5 +required_capability: inlinestats_v8 + +from employees +| inlinestats max(salary), m = min(salary) by gender +| eval y = m / 2 +| keep emp_no +| sort emp_no desc +| limit 1 +; + +emp_no:integer +10100 +; + byConstant -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages @@ -537,7 +691,7 @@ emp_no:integer | languages:integer | max_lang:integer | y:integer ; aggConstant -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no @@ -555,7 +709,7 @@ one:integer | emp_no:integer ; percentile -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, salary @@ -574,7 +728,7 @@ emp_no:integer | salary:integer | ninety_fifth_salary:double ; byTwoCalculated -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM airports | WHERE abbrev IS NOT NULL @@ -592,9 +746,9 @@ abbrev:keyword | scalerank:integer | location:geo_point ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 2 | 20 | -100 ; -byTwoCalculatedSecondOverwrites-Ignore -required_capability: join_planning_v1 +byTwoCalculatedSecondOverwrites required_capability: stats_alias_collision_warnings +required_capability: inlinestats_v8 FROM airports | WHERE abbrev IS NOT NULL @@ -607,15 +761,15 @@ FROM airports ; warning:Line 5:4: Field 'x' shadowed by field at line 6:3 -abbrev:keyword | scalerank:integer | location:geo_point | x:double | min_sl:integer - ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 10 | 2 - ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 40 | 2 - ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2 +abbrev:keyword | scalerank:integer | location:geo_point | min_sl:integer| x:double + ZRH | 3 | POINT (8.56221279534765 47.4523895064915) | 2 | 10 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 2 | 40 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 2 | -100 ; -byTwoCalculatedSecondOverwritesReferencingFirst-Ignore -required_capability: join_planning_v1 +byTwoCalculatedSecondOverwritesReferencingFirst required_capability: stats_alias_collision_warnings +required_capability: inlinestats_v8 FROM airports | WHERE abbrev IS NOT NULL @@ -629,37 +783,36 @@ FROM airports ; warning:Line 6:4: Field 'x' shadowed by field at line 7:3 -abbrev:keyword | scalerank:integer | location:geo_point | x:double | min_sl:integer - ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 10 | 2 - ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 40 | 2 - ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2 +abbrev:keyword | scalerank:integer | location:geo_point | min_sl:integer| x:double + ZRH | 3 | POINT (8.56221279534765 47.4523895064915) | 2 | 10 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 2 | 40 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 2 | -100 ; -groupShadowsAgg-Ignore -required_capability: join_planning_v1 +groupShadowsAgg required_capability: stats_alias_collision_warnings +required_capability: inlinestats_v8 FROM airports | WHERE abbrev IS NOT NULL | KEEP abbrev, scalerank, location | INLINESTATS min_sl=MIN(scalerank) - , lat_10 = ROUND(ST_Y(location), -1) - BY lat_10 = ROUND(ST_Y(location), -1) + BY min_sl = ROUND(ST_Y(location), -1) , lon_10 = ROUND(ST_X(location), -1) | SORT abbrev DESC | LIMIT 3 ; -warning:Line 5:3: Field 'lat_10' shadowed by field at line 6:4 +warning:Line 4:15: Field 'min_sl' shadowed by field at line 5:4 -abbrev:keyword | scalerank:integer | location:geo_point | lat_10:double | lon_10:double | min_sl:integer - ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 50 | 10 | 2 - ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | -10 | 40 | 4 - ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100 | 2 +abbrev:keyword | scalerank:integer | location:geo_point | min_sl:double | lon_10:double + ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 50 | 10 + ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | -10 | 40 + ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100 ; groupShadowsField -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, salary, hire_date @@ -678,7 +831,7 @@ emp_no:integer | salary:integer | avg_salary:double | hire_date:datetime ; groupByExpression_And_ExistentField -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender | EVAL x = "ABC" @@ -696,7 +849,7 @@ emp_no:integer | languages:integer | x:keyword | max_lang:integer | y:keyword | ; groupByRenamedColumn -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender | INLINESTATS max_lang = MAX(languages) BY y = gender @@ -714,11 +867,13 @@ emp_no:integer | languages:integer | gender:keyword | max_lang:integer | y:keywo ; // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70) -groupByMultipleRenamedColumns_AndOneExpression_Last-Ignore +groupByMultipleRenamedColumns_AndOneExpression_Last +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender, first_name | INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, f = left(first_name,1) +| SORT emp_no | LIMIT 10 ; @@ -736,11 +891,13 @@ emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang: ; // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70) -groupByMultipleRenamedColumns_AndTwoExpressions-Ignore +groupByMultipleRenamedColumns_AndTwoExpressions +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender, first_name | INLINESTATS max_lang = MAX(languages) BY f1 = left(first_name, 1), y = gender, f2 = left(first_name, 1), l = languages +| SORT emp_no | LIMIT 10 ; @@ -758,12 +915,14 @@ emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang: ; // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70) -groupByMultipleRenamedColumns_AndMultipleRenames-Ignore +groupByMultipleRenamedColumns_AndMultipleRenames +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender, first_name | RENAME first_name as f | INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, first_name = left(f, 1) +| SORT emp_no | LIMIT 10 ; @@ -781,12 +940,14 @@ emp_no:integer | languages:integer | gender:keyword| f:keyword |max_lang: ; // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70) -groupByMultipleRenamedColumns_AndSameNameExpressionGroupingOverride-Ignore +groupByMultipleRenamedColumns_AndSameNameExpressionGroupingOverride +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender, first_name | RENAME first_name as f | INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, f = left(f, 1) +| SORT emp_no | LIMIT 10 ; @@ -804,7 +965,7 @@ emp_no:integer | languages:integer | gender:keyword|max_lang:integer| y:keyword ; twoAggregatesGroupedBy_AField_And_AnExpression -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender, last_name @@ -826,7 +987,7 @@ emp_no:integer |languages:integer|last_name:keyword|max_lang:integer|min_lang:in ; groupByMultipleRenamedColumns_InversedOrder -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, still_hired, gender @@ -844,7 +1005,7 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|max_lang:i ; groupByMultipleRenamedColumns_InversedOrder_ComplexEval -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, still_hired, gender @@ -863,7 +1024,7 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|multilingu ; groupByMultipleRenamedColumns_AndComplexEval -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, still_hired, gender @@ -882,12 +1043,14 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|multilingu ; // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70) -groupByMultipleRenamedColumns_AndConstantValue-Ignore +groupByMultipleRenamedColumns_AndConstantValue +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender, first_name | EVAL x = "ABC" | INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, f = to_lower(x) +| SORT emp_no | LIMIT 10 ; @@ -905,7 +1068,7 @@ emp_no:integer |languages:integer|gender:keyword |first_name:keyword | x:keyw ; groupByRenamedExpression -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | KEEP emp_no, languages, gender, last_name @@ -927,7 +1090,7 @@ emp_no:integer |languages:integer|last_name:keyword|max_lang:integer|min_lang:in ; doubleFilterOnLeftAndRight_InlineStats_Sides -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages @@ -948,7 +1111,7 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary: ; filterOnInlineStatsAggs -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages @@ -967,7 +1130,7 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary: ; filterOnInlineStatsAggsValues_And_Groupings -required_capability: inlinestats_v7 +required_capability: inlinestats_v8 FROM employees | INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages @@ -985,19 +1148,226 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary: 10066 |5 |31897 |66817 |25324 ; -// Fails with (in json format only; for format=txt it returns empty result) -// line 1:101: Plan [FieldExtractExec[description{f}#274, host{f}#272, ip0{f}#273, _index..]<[],[]>] optimized incorrectly due to duplicate output attribute ip1{r}#267 -// line 1:101: Plan [TopNExec[[Order[ip1{r}#267,ASC,LAST]],1[INTEGER],null]] optimized incorrectly due to duplicate output attribute ip1{r}#267 -// line 1:44: Plan [HashJoinExec[[host_group{f}#271, card{f}#276],[host_group{f}#271, card{f}#276],[host_group{r}#271, card{r}#276],[ip1{r}#267]]] optimized incorrectly due to duplicate output attribute ip1{r}#267 -inlineStatsOverrideEVALed_FieldWithSameName-Ignore +inlineStatsOverrideEVALed_FieldWithSameName +required_capability: inlinestats_v8 FROM hosts METADATA _index | EVAL x = ip1 | INLINESTATS ip1 = COUNT(*) BY host_group, card -| SORT ip1 +| SORT ip1 DESC, x | LIMIT 1 ; - description:text | host:keyword | ip0:ip | _index:keyword | x:ip | ip1:long | host_group:text | card:keyword -alpha db server |alpha |127.0.0.1 |hosts |127.0.0.1|1 |DB servers |eth0 +description:text| host:keyword | ip0:ip | _index:keyword | x:ip | ip1:long | host_group:text | card:keyword +beta k8s server |beta |127.0.0.1 |hosts |127.0.0.2|2 |Kubernetes cluster|eth1 +; + +doubleShadowing +required_capability: inlinestats_v8 + +FROM employees +| INLINESTATS salary = min(salary) BY gender +| INLINESTATS salary = max(salary) BY gender +| KEEP salary, gender +| SORT salary DESC, gender +| LIMIT 5 +; + +salary:integer |gender:keyword +25976 |F +25976 |F +25976 |F +25976 |F +25976 |F +; + +doubleShadowing_WithIntertwinedFilters +required_capability: inlinestats_v8 + +FROM employees +| WHERE salary > 30000 +| INLINESTATS salary = min(salary) BY gender +| WHERE salary > 31000 +| INLINESTATS salary = max(salary) BY gender +| WHERE salary < 31500 +| KEEP salary, gender +| SORT salary DESC, gender +; + +salary:integer |gender:keyword +31120 |null +31120 |null +31120 |null +31120 |null +31120 |null +31120 |null +31120 |null +31120 |null +31120 |null +; + +shadowingAggregateByNextGrouping +required_capability: inlinestats_v8 + +FROM employees +| KEEP gender, languages, emp_no, salary +| INLINESTATS gender = count_distinct(gender) BY languages +| INLINESTATS avg(salary) BY gender +| SORT emp_no +| LIMIT 3 +; + +emp_no:integer |salary:integer |languages:integer|avg(salary):double|gender:long +10001 |57305 |2 |48248.55 |2 +10002 |56371 |5 |48248.55 |2 +10003 |61805 |4 |48248.55 |2 +; + +doubleShadowingWithEval +required_capability: inlinestats_v8 + +from employees +| eval salary = salary/100 +| inlinestats salary=min(salary) by gender +| inlinestats salary=max(salary) by gender +| keep salary, gender +| sort salary desc, gender +| limit 5 +; + +salary:integer|gender:keyword +259 |F +259 |F +259 |F +259 |F +259 |F +; + +doubleShadowingWithDoubleStats +required_capability: inlinestats_v8 + +from employees +| stats salary=min(salary) by gender +| stats salary=max(salary) by gender +| inlinestats salary=min(salary) +| inlinestats salary=max(salary) +; +ignoreOrder:true + +gender:keyword |salary:integer +null |25324 +F |25324 +M |25324 +; + +renamingGroupingWithItself +required_capability: inlinestats_v8 + +FROM employees +| EVAL x = gender +| INLINESTATS min_sl = MIN(salary) BY x = x +| SORT salary DESC +| KEEP salary, x, gender, min_sl, emp_no +| LIMIT 5 +; + +salary:integer |x:keyword|gender:keyword |min_sl:integer |emp_no:integer +74999 |M |M |25945 |10029 +74970 |M |M |25945 |10045 +74572 |F |F |25976 |10007 +73851 |F |F |25976 |10027 +73717 |null |null |25324 |10019 ; + +overridingGroupings +required_capability: inlinestats_v8 + +FROM employees +| INLINESTATS min_sl = MIN(salary) BY x = gender, x = languages +| KEEP salary, x, gender, min_sl, emp_no +| SORT salary +| LIMIT 5 +; +warning:Line 2:39: Field 'x' shadowed by field at line 2:51 + +salary:integer |x:integer |gender:keyword |min_sl:integer |emp_no:integer +25324 |5 |null |25324 |10015 +25945 |5 |M |25324 |10035 +25976 |1 |F |25976 |10092 +26436 |3 |M |26436 |10048 +27215 |4 |F |27215 |10057 +; + +overridingExpressionGroupings +required_capability: inlinestats_v8 + +FROM employees +| INLINESTATS min_sl = MIN(salary) BY x = TO_LOWER(gender), x = CONCAT(gender, gender) +| SORT salary DESC +| KEEP salary, x, gender, min_sl, emp_no +| LIMIT 5 +; +warning:Line 2:39: Field 'x' shadowed by field at line 2:61 + +salary:integer |x:keyword |gender:keyword |min_sl:integer |emp_no:integer +74999 |MM |M |25945 |10029 +74970 |MM |M |25945 |10045 +74572 |FF |F |25976 |10007 +73851 |FF |F |25976 |10027 +73717 |null |null |25324 |10019 +; + +reusingEvalExpressions_UsedInGroupings +required_capability: inlinestats_v8 + +FROM employees +| KEEP salary, gender, emp_no +| EVAL x = TO_LOWER(gender), x = CONCAT(x, " ", x) +| INLINESTATS min_sl = MIN(salary) BY x +| SORT salary DESC +| LIMIT 5 +; + +salary:integer |gender:keyword |emp_no:integer |min_sl:integer | x:keyword +74999 |M |10029 |25945 |m m +74970 |M |10045 |25945 |m m +74572 |F |10007 |25976 |f f +73851 |F |10027 |25976 |f f +73717 |null |10019 |25324 |null +; + +statsBeforeInlinestatsWithTopAndBucket1 +required_capability: inlinestats_v8 + +FROM books +| STATS avg_rating = AVG(ratings) BY decade = BUCKET(year, 10) +| INLINESTATS decades = TOP(decade, 3, "DESC") +| SORT avg_rating DESC +| LIMIT 10 +; + +avg_rating:double | decade:double | decades:double +4.954999923706055 |1960.0 |[2020.0, 2010.0, 2000.0] +4.387647109873154 |1990.0 |[2020.0, 2010.0, 2000.0] +4.339166651169459 |2000.0 |[2020.0, 2010.0, 2000.0] +4.274615342800434 |2010.0 |[2020.0, 2010.0, 2000.0] +4.063333352406819 |1970.0 |[2020.0, 2010.0, 2000.0] +3.880000114440918 |2020.0 |[2020.0, 2010.0, 2000.0] +3.6633334159851074|1980.0 |[2020.0, 2010.0, 2000.0] +; + +statsBeforeInlinestatsWithTopAndBucket2 +required_capability: inlinestats_v8 + +FROM sample_data +| STATS total_duration = SUM(event_duration) BY day = BUCKET(@timestamp, 1 HOUR) +| INLINESTATS days = TOP(day, 2, "ASC") +| SORT total_duration ASC +| LIMIT 5 +; + +total_duration:long | day:date | days:date +6215122 |2023-10-23T12:00:00.000Z|[2023-10-23T12:00:00.000Z, 2023-10-23T13:00:00.000Z] +17016205 |2023-10-23T13:00:00.000Z|[2023-10-23T12:00:00.000Z, 2023-10-23T13:00:00.000Z] +; + diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 7f8c5efb45cc1..ed3953c86a8bb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -906,7 +906,7 @@ public enum Cap { * Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats * were refactored. */ - INLINESTATS_V7(EsqlPlugin.INLINESTATS_FEATURE_FLAG), + INLINESTATS_V8(EsqlPlugin.INLINESTATS_FEATURE_FLAG), /** * Support partial_results diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java index b6f185c856693..200a0a9269575 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEmptyRelation.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.planner.PlannerUtils; @@ -36,7 +37,7 @@ public PropagateEmptyRelation() { @Override protected LogicalPlan rule(UnaryPlan plan, LogicalOptimizerContext ctx) { LogicalPlan p = plan; - if (plan.child() instanceof LocalRelation local && local.supplier() == LocalSupplier.EMPTY) { + if (plan.child() instanceof LocalRelation local && local.supplier() == EmptyLocalSupplier.EMPTY) { // only care about non-grouped aggs might return something (count) if (plan instanceof Aggregate agg && agg.groupings().isEmpty()) { List emptyBlocks = aggsFromEmpty(ctx.foldCtx(), agg.aggregates()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvals.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvals.java index 1d921a5037b6f..c83e4b319763a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvals.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvals.java @@ -23,7 +23,7 @@ /** * Replace any evaluation from the inlined aggregation side (right side) to the left side (source) to perform the matching. - * In INLINE m = MIN(x) BY a + b the right side contains STATS m = MIN(X) BY a + b. + * In INLINESTATS m = MIN(x) BY a + b the right side contains STATS m = MIN(X) BY a + b. * As the grouping key is used to perform the join, the evaluation required for creating it has to be copied to the left side * as well. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java index 58e20030adb40..2fa6335914b6d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.planner.PlannerUtils; @@ -39,15 +40,18 @@ public final class PruneColumns extends Rule { public LogicalPlan apply(LogicalPlan plan) { // track used references var used = plan.outputSet().asBuilder(); + // track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the + // inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes + var inlineJoinRightOutput = new ArrayList(); Holder forkPresent = new Holder<>(false); // while going top-to-bottom (upstream) var pl = plan.transformDown(p -> { - // Note: It is NOT required to do anything special for binary plans like JOINs. It is perfectly fine that transformDown descends - // first into the left side, adding all kinds of attributes to the `used` set, and then descends into the right side - even - // though the `used` set will contain stuff only used in the left hand side. That's because any attribute that is used in the - // left hand side must have been created in the left side as well. Even field attributes belonging to the same index fields will - // have different name ids in the left and right hand sides - as in the extreme example + // Note: It is NOT required to do anything special for binary plans like JOINs, except INLINESTATS. It is perfectly fine that + // transformDown descends first into the left side, adding all kinds of attributes to the `used` set, and then descends into + // the right side - even though the `used` set will contain stuff only used in the left hand side. That's because any attribute + // that is used in the left hand side must have been created in the left side as well. Even field attributes belonging to the + // same index fields will have different name ids in the left and right hand sides - as in the extreme example // `FROM lookup_idx | LOOKUP JOIN lookup_idx ON key_field`. // skip nodes that simply pass the input through @@ -63,6 +67,11 @@ public LogicalPlan apply(LogicalPlan plan) { return p; } + // TODO: INLINESTATS unit testing for tracking this set + if (p instanceof InlineJoin ij) { + inlineJoinRightOutput.addAll(ij.right().outputSet()); + } + // remember used boolean recheck; // analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate @@ -70,7 +79,8 @@ public LogicalPlan apply(LogicalPlan plan) { do { recheck = false; if (p instanceof Aggregate aggregate) { - var remaining = removeUnused(aggregate.aggregates(), used); + // TODO: INLINESTATS https://github.com/elastic/elasticsearch/pull/128917#discussion_r2175162099 + var remaining = removeUnused(aggregate.aggregates(), used, inlineJoinRightOutput); if (remaining != null) { if (remaining.isEmpty()) { @@ -96,8 +106,19 @@ public LogicalPlan apply(LogicalPlan plan) { p = aggregate.with(aggregate.groupings(), remaining); } } + } else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal + var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput); + if (remaining != null) { + if (remaining.isEmpty()) { + // remove the InlineJoin altogether + p = ij.left(); + recheck = true; + } + // TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs + // if they will not be used anyway + } } else if (p instanceof Eval eval) { - var remaining = removeUnused(eval.fields(), used); + var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput); // no fields, no eval if (remaining != null) { if (remaining.isEmpty()) { @@ -111,7 +132,7 @@ public LogicalPlan apply(LogicalPlan plan) { // Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway. // However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index, // it works differently as we extract all fields (other than the join key) that the EsRelation has. - var remaining = removeUnused(esr.output(), used); + var remaining = removeUnused(esr.output(), used, inlineJoinRightOutput); if (remaining != null) { p = new EsRelation(esr.source(), esr.indexPattern(), esr.indexMode(), esr.indexNameWithModes(), remaining); } @@ -131,14 +152,15 @@ public LogicalPlan apply(LogicalPlan plan) { * Prunes attributes from the list not found in the given set. * Returns null if no changed occurred. */ - private static List removeUnused(List named, AttributeSet.Builder used) { + private static List removeUnused(List named, AttributeSet.Builder used, List exceptions) { var clone = new ArrayList<>(named); var it = clone.listIterator(clone.size()); // due to Eval, go in reverse while (it.hasPrevious()) { N prev = it.previous(); - if (used.contains(prev.toAttribute()) == false) { + var attr = prev.toAttribute(); + if (used.contains(attr) == false && exceptions.contains(attr) == false) { it.remove(); } else { used.addAll(prev.references()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneEmptyPlans.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneEmptyPlans.java index afd2b4e05493b..6f887a1ceb34b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneEmptyPlans.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneEmptyPlans.java @@ -9,13 +9,13 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; public final class PruneEmptyPlans extends OptimizerRules.OptimizerRule { public static LogicalPlan skipPlan(UnaryPlan plan) { - return new LocalRelation(plan.source(), plan.output(), LocalSupplier.EMPTY); + return new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java index 9e7b6ce80422d..a5ea01c53aa4e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java @@ -11,8 +11,8 @@ import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Row; +import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import java.util.ArrayList; @@ -29,6 +29,6 @@ protected LogicalPlan rule(Row row, LogicalOptimizerContext context) { List values = new ArrayList<>(fields.size()); fields.forEach(f -> values.add(f.child().fold(context.foldCtx()))); var blocks = BlockUtils.fromListRow(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values); - return new LocalRelation(row.source(), row.output(), LocalSupplier.of(blocks)); + return new LocalRelation(row.source(), row.output(), new CopyingLocalSupplier(blocks)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SkipQueryOnEmptyMappings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SkipQueryOnEmptyMappings.java index d57a3de21b4a6..0255e8aaadffb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SkipQueryOnEmptyMappings.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SkipQueryOnEmptyMappings.java @@ -9,13 +9,13 @@ import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; public final class SkipQueryOnEmptyMappings extends OptimizerRules.OptimizerRule { @Override protected LogicalPlan rule(EsRelation plan) { - return plan.concreteIndices().isEmpty() ? new LocalRelation(plan.source(), plan.output(), LocalSupplier.EMPTY) : plan; + return plan.concreteIndices().isEmpty() ? new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY) : plan; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java index 2fe9f5182ae00..9312f2abdf509 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java @@ -28,7 +28,10 @@ import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject; +import org.elasticsearch.xpack.esql.plan.logical.local.ImmediateLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.DissectExec; @@ -65,6 +68,7 @@ public static List getNamedWriteables() { List entries = new ArrayList<>(); entries.addAll(logical()); entries.addAll(physical()); + entries.addAll(others()); return entries; } @@ -124,4 +128,8 @@ public static List physical() { TopNExec.ENTRY ); } + + public static List others() { + return List.of(CopyingLocalSupplier.ENTRY, ImmediateLocalSupplier.ENTRY, EmptyLocalSupplier.ENTRY); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index 724aa2da25983..be34631ec8149 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Objects; +import static java.util.Collections.emptyList; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; /** @@ -101,6 +102,9 @@ private JoinConfig joinConfig() { for (Expression g : groupings) { namedGroupings.add(Expressions.attribute(g)); } + // last named grouping wins, just like it happens for regular STATS + // ie BY x = field_1, x = field_2, the grouping is actually performed on second x (field_2) + namedGroupings = mergeOutputAttributes(namedGroupings, emptyList()); List leftFields = new ArrayList<>(groupings.size()); List rightFields = new ArrayList<>(groupings.size()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java index 8ce476120abcf..653722ec131f8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java @@ -13,10 +13,10 @@ import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; -import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -56,16 +56,9 @@ public static LogicalPlan stubSource(UnaryPlan sourcePlan, LogicalPlan target) { return sourcePlan.replaceChild(new StubRelation(sourcePlan.source(), target.output())); } - /** - * Replaces the stubbed source with the actual source. - */ - public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) { - return stubbed.transformUp(StubRelation.class, stubRelation -> source); - } - /** * TODO: perform better planning - * Keep the join in place or replace it with a projection in case no grouping is necessary. + * Keep the join in place or replace it with an Eval in case no grouping is necessary. */ public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) { if (target.config().matchFields().isEmpty()) { @@ -82,6 +75,70 @@ public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) { } } + /** + * Replaces the stubbed source with the actual source. + * NOTE: this will replace all {@link StubRelation}s found with the source and the method is meant to be used to replace one node only + * when being called on a plan that has only ONE StubRelation in it. + */ + public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) { + // here we could have used stubbed.transformUp(StubRelation.class, stubRelation -> source) + // but transformUp skips changing a node if its transformed variant is equal to its original variant. + // A StubRelation can contain in its output ReferenceAttributes which do not use NameIds for equality, but only names and + // two ReferenceAttributes with the same name are equal and the transformation will not be applied. + return stubbed.transformUp(UnaryPlan.class, up -> { + if (up.child() instanceof StubRelation) { + return up.replaceChild(source); + } + return up; + }); + } + + /** + * @param stubReplacedSubPlan - the completed / "destubbed" right-hand side of the bottommost InlineJoin in the plan. For example: + * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]] + * \_Limit[1000[INTEGER],false] + * \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]] + * @param originalSubPlan - the original (unchanged) right-hand side of the bottommost InlineJoin in the plan. For example: + * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]] + * \_StubRelation[[x{r}#99]]] + */ + public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan originalSubPlan) {} + + /** + * Finds the "first" (closest to the source command or bottom up in the tree) {@link InlineJoin}, replaces the {@link StubRelation} + * of the right-hand side with left-hand side's source and returns a tuple. + * + * Original optimized plan: + * Limit[1000[INTEGER],true] + * \_InlineJoin[LEFT,[],[],[]] + * |_Limit[1000[INTEGER],false] + * | \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]] + * \_Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]] + * \_StubRelation[[x{r}#99]] + * + * Takes the right hand side: + * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]] + * \_StubRelation[[x{r}#99]]] + * + * And uses the left-hand side's source as its source: + * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]] + * \_Limit[1000[INTEGER],false] + * \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]] + */ + public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) { + Holder subPlan = new Holder<>(); + // Collect the first inlinejoin (bottom up in the tree) + optimizedPlan.forEachUp(InlineJoin.class, ij -> { + // extract the right side of the plan and replace its source + if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) { + var p = replaceStub(ij.left(), ij.right()); + p.setOptimized(); + subPlan.set(new LogicalPlanTuple(p, ij.right())); + } + }); + return subPlan.get(); + } + public InlineJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) { super(source, left, right, config); } @@ -139,20 +196,12 @@ public List computeOutput(List left, List right JoinType joinType = config().type(); List output; if (LEFT.equals(joinType)) { - AttributeSet rightFields = AttributeSet.of(config().rightFields()); - List leftOutputWithoutMatchFields = new ArrayList<>(); - // at this point "left" part of the join contains all the attributes that represent the input of the join - // including any aliasing (evals) of expressions used as grouping attributes (or join "match fields") in the join itself - for (Attribute attr : left().output()) { - if (rightFields.contains(attr) == false) { - // the aforementioned groupings expressions or aliasing are removed from the left set of attributes - leftOutputWithoutMatchFields.add(attr); - } - } - // the actual output of the join will place the left hand side attributes (excluding any aliasing of the groupings) - // as first columns in the output followed by whatever the right hand side of join adds in this order: aggregates first, - // followed by groupings (this order should be preserved inside the rightFields() output) - output = mergeOutputAttributes(right, leftOutputWithoutMatchFields); + List leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList(); + List rightWithAppendedKeys = new ArrayList<>(right); + rightWithAppendedKeys.removeAll(config().rightFields()); + rightWithAppendedKeys.addAll(config().leftFields()); + + output = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys); } else { throw new IllegalArgumentException(joinType.joinName() + " unsupported"); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java new file mode 100644 index 0000000000000..38b7bb1a5fcd4 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical.local; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockUtils; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceRowAsLocalRelation; +import org.elasticsearch.xpack.esql.plan.logical.InlineStats; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import org.elasticsearch.xpack.esql.session.EsqlSession; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A {@link LocalSupplier} that allways creates a new copy of the {@link Block}s initially provided at creation time. + * This is created specifically for {@link InlineStats} usage in {@link EsqlSession} for queries that use ROW command. + * + * The ROW which gets replaced by {@link ReplaceRowAsLocalRelation} with a {@link LocalRelation} will have its blocks + * used (and released) at least twice: + * - the {@link LocalRelation} from the left-hand side is used as a source for the right-hand side + * - the same {@link LocalRelation} is then used to continue the execution of the query on the left-hand side + * + * It delegates all its operations to {@link ImmediateLocalSupplier} and, to prevent the double release, it will always + * create a deep copy of the blocks received in the constructor initially. + * + * Example with the flow and the blocks reuse for a query like "row x = 1 | inlinestats y = max(x)" + * Step 1: + * Limit[1000[INTEGER],true] + * \_InlineJoin[LEFT,[],[],[]] + * |_Limit[1000[INTEGER],false] + * | \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]] + * \_Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]] + * \_StubRelation[[x{r}#99]] + * + * Step 2: + * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]] + * \_Limit[1000[INTEGER],false] + * \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]] + * + * Step 3: + * Limit[1000[INTEGER],true] + * \_Eval[[1[INTEGER] AS y#102]] + * \_Limit[1000[INTEGER],false] + * \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]] + */ +public class CopyingLocalSupplier implements LocalSupplier { + + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + LocalSupplier.class, + "CopyingSupplier", + CopyingLocalSupplier::new + ); + + private final ImmediateLocalSupplier delegate; + + public CopyingLocalSupplier(Block[] blocks) { + delegate = new ImmediateLocalSupplier(blocks); + } + + public CopyingLocalSupplier(StreamInput in) throws IOException { + delegate = new ImmediateLocalSupplier(in); + } + + @Override + public Block[] get() { + Block[] blockCopies = new Block[delegate.blocks.length]; + for (int i = 0; i < blockCopies.length; i++) { + blockCopies[i] = BlockUtils.deepCopyOf(delegate.blocks[i], PlannerUtils.NON_BREAKING_BLOCK_FACTORY); + } + return blockCopies; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + delegate.writeTo(out); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + CopyingLocalSupplier other = (CopyingLocalSupplier) obj; + return Arrays.equals(delegate.blocks, other.delegate.blocks); + } + + @Override + public int hashCode() { + return delegate.hashCode(); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/EmptyLocalSupplier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/EmptyLocalSupplier.java new file mode 100644 index 0000000000000..265b1d6c434f1 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/EmptyLocalSupplier.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical.local; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockUtils; + +import java.io.IOException; + +public class EmptyLocalSupplier implements LocalSupplier { + + public static final LocalSupplier EMPTY = new EmptyLocalSupplier(); + public static final String NAME = "EmptySupplier"; + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LocalSupplier.class, NAME, in -> EMPTY); + + private EmptyLocalSupplier() {} + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public Block[] get() { + return BlockUtils.NO_BLOCKS; + } + + @Override + public String toString() { + return "EMPTY"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public boolean equals(Object obj) { + return obj == EMPTY; + } + + @Override + public int hashCode() { + return 0; + } + +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java index c076a23891bd8..8c2a51459e7dd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java @@ -7,8 +7,11 @@ package org.elasticsearch.xpack.esql.plan.logical.local; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; import java.io.IOException; @@ -18,12 +21,23 @@ * A {@link LocalSupplier} that contains already filled {@link Block}s. */ public class ImmediateLocalSupplier implements LocalSupplier { - private final Block[] blocks; - public ImmediateLocalSupplier(Block[] blocks) { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + LocalSupplier.class, + "ImmediateSupplier", + ImmediateLocalSupplier::new + ); + + final Block[] blocks; + + ImmediateLocalSupplier(Block[] blocks) { this.blocks = blocks; } + ImmediateLocalSupplier(StreamInput in) throws IOException { + this(((PlanStreamInput) in).readCachedBlockArray()); + } + @Override public Block[] get() { return blocks; @@ -52,4 +66,9 @@ public boolean equals(Object obj) { public int hashCode() { return Arrays.hashCode(blocks); } + + @Override + public String getWriteableName() { + return ENTRY.name; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalRelation.java index d6106bae6b6b8..7a83fd800ab8e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalRelation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalRelation.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.esql.plan.logical.local; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -15,6 +16,7 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import java.io.IOException; import java.util.List; @@ -39,14 +41,26 @@ public LocalRelation(Source source, List output, LocalSupplier suppli public LocalRelation(StreamInput in) throws IOException { super(Source.readFrom((PlanStreamInput) in)); this.output = in.readNamedWriteableCollectionAsList(Attribute.class); - this.supplier = LocalSupplier.readFrom((PlanStreamInput) in); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { + this.supplier = in.readNamedWriteable(LocalSupplier.class); + } else { + this.supplier = LocalSourceExec.readLegacyLocalSupplierFrom((PlanStreamInput) in); + } } @Override public void writeTo(StreamOutput out) throws IOException { source().writeTo(out); out.writeNamedWriteableCollection(output); - supplier.writeTo(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { + out.writeNamedWriteable(supplier); + } else { + if (supplier == EmptyLocalSupplier.EMPTY) { + out.writeVInt(0); + } else {// here we can only have an ImmediateLocalSupplier as this was the only implementation apart from EMPTY + ((ImmediateLocalSupplier) supplier).writeTo(out); + } + } } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalSupplier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalSupplier.java index 3b81da06d7077..fe626d84b2902 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalSupplier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalSupplier.java @@ -7,13 +7,10 @@ package org.elasticsearch.xpack.esql.plan.logical.local; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.BlockUtils; -import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import java.io.IOException; import java.util.function.Supplier; /** @@ -25,41 +22,10 @@ * {@link UnsupportedOperationException}. *

*/ -public interface LocalSupplier extends Supplier, Writeable { - - LocalSupplier EMPTY = new LocalSupplier() { - @Override - public Block[] get() { - return BlockUtils.NO_BLOCKS; - } - - @Override - public String toString() { - return "EMPTY"; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(0); - } - - @Override - public boolean equals(Object obj) { - return obj == EMPTY; - } - - @Override - public int hashCode() { - return 0; - } - }; +public interface LocalSupplier extends Supplier, NamedWriteable { static LocalSupplier of(Block[] blocks) { return new ImmediateLocalSupplier(blocks); } - static LocalSupplier readFrom(PlanStreamInput in) throws IOException { - Block[] blocks = in.readCachedBlockArray(); - return blocks.length == 0 ? EMPTY : of(blocks); - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java index 3818b4e5a4c32..55a34af436f8a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java @@ -23,6 +23,8 @@ import java.util.Objects; import java.util.Set; +import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; + public class HashJoinExec extends BinaryExec implements EstimatesRowSize { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, @@ -107,11 +109,12 @@ public PhysicalPlan estimateRowSize(State state) { @Override public List output() { if (lazyOutput == null) { - lazyOutput = new ArrayList<>(left().output()); - var rightFieldNames = rightFields.stream().map(Attribute::name).toList(); - lazyOutput.removeIf(a -> rightFieldNames.contains(a.name())); - lazyOutput.addAll(addedFields); - lazyOutput.addAll(rightFields); + List leftOutputWithoutKeys = left().output().stream().filter(attr -> leftFields.contains(attr) == false).toList(); + List rightWithAppendedKeys = new ArrayList<>(right().output()); + rightWithAppendedKeys.removeAll(rightFields); + rightWithAppendedKeys.addAll(leftFields); + + lazyOutput = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys); } return lazyOutput; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExec.java index 84e09bdd643a4..5994ce813c851 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExec.java @@ -7,13 +7,17 @@ package org.elasticsearch.xpack.esql.plan.physical; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; +import org.elasticsearch.xpack.esql.plan.logical.local.ImmediateLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import java.io.IOException; @@ -39,14 +43,41 @@ public LocalSourceExec(Source source, List output, LocalSupplier supp public LocalSourceExec(StreamInput in) throws IOException { super(Source.readFrom((PlanStreamInput) in)); this.output = in.readNamedWriteableCollectionAsList(Attribute.class); - this.supplier = LocalSupplier.readFrom((PlanStreamInput) in); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { + this.supplier = in.readNamedWriteable(LocalSupplier.class); + } else { + this.supplier = readLegacyLocalSupplierFrom((PlanStreamInput) in); + } + } + + /** + * Legacy {@link LocalSupplier} deserialization for code that didn't use {@link org.elasticsearch.common.io.stream.NamedWriteable}s + * and the {@link LocalSupplier} had only one implementation (the {@link ImmediateLocalSupplier}). + * + * @param in + * @return + * @throws IOException + */ + public static LocalSupplier readLegacyLocalSupplierFrom(PlanStreamInput in) throws IOException { + Block[] blocks = in.readCachedBlockArray(); + return blocks.length == 0 ? EmptyLocalSupplier.EMPTY : LocalSupplier.of(blocks); } @Override public void writeTo(StreamOutput out) throws IOException { source().writeTo(out); out.writeNamedWriteableCollection(output); - supplier.writeTo(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { + out.writeNamedWriteable(supplier); + } else { + if (supplier == EmptyLocalSupplier.EMPTY) { + out.writeVInt(0); + } else { + // here we can only have an ImmediateLocalSupplier as this was the only implementation apart from EMPTY + // for earlier versions + ((ImmediateLocalSupplier) supplier).writeTo(out); + } + } } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 137a2118b0d54..4bdb90af10316 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -25,7 +25,6 @@ import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; -import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; @@ -207,14 +206,10 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]"); } - if (join instanceof InlineJoin) { - return new FragmentExec(bp); - } - PhysicalPlan left = map(bp.left()); // only broadcast joins supported for now - hence push down as a streaming operator - if (left instanceof FragmentExec fragment) { + if (left instanceof FragmentExec) { return new FragmentExec(bp); } @@ -228,7 +223,7 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { config.matchFields(), config.leftFields(), config.rightFields(), - join.output() + join.rightOutputFields() ); } if (right instanceof FragmentExec fragment diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b6dd0c40f3481..090f85936a3ae 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -104,7 +104,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -114,6 +113,7 @@ import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD; +import static org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin.firstSubPlan; public class EsqlSession { @@ -217,8 +217,8 @@ public void executeOptimizedPlan( LogicalPlan optimizedPlan, ActionListener listener ) { - PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); - if (explainMode) { + if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); String physicalPlanString = physicalPlan.toString(); List fields = List.of( new ReferenceAttribute(EMPTY, "role", DataType.KEYWORD), @@ -231,44 +231,30 @@ public void executeOptimizedPlan( values.add(List.of("coordinator", "optimizedPhysicalPlan", physicalPlanString)); var blocks = BlockUtils.fromList(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values); physicalPlan = new LocalSourceExec(Source.EMPTY, fields, LocalSupplier.of(blocks)); + planRunner.run(physicalPlan, listener); + } else { + // TODO: this could be snuck into the underlying listener + EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + // execute any potential subplans + executeSubPlans(optimizedPlan, planRunner, executionInfo, request, listener); } - // TODO: this could be snuck into the underlying listener - EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); - // execute any potential subplans - executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); } - private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {} - private void executeSubPlans( - PhysicalPlan physicalPlan, + LogicalPlan optimizedPlan, PlanRunner runner, EsqlExecutionInfo executionInfo, EsqlQueryRequest request, ActionListener listener ) { - List subplans = new ArrayList<>(); - - // Currently the inlinestats are limited and supported as streaming operators, thus present inside the fragment as logical plans - // Below they get collected, translated into a separate, coordinator based plan and the results 'broadcasted' as a local relation - physicalPlan.forEachUp(FragmentExec.class, f -> { - f.fragment().forEachUp(InlineJoin.class, ij -> { - // extract the right side of the plan and replace its source - LogicalPlan subplan = InlineJoin.replaceStub(ij.left(), ij.right()); - // mark the new root node as optimized - subplan.setOptimized(); - PhysicalPlan subqueryPlan = logicalPlanToPhysicalPlan(subplan, request); - subplans.add(new PlanTuple(subqueryPlan, ij.right())); - }); - }); - - Iterator iterator = subplans.iterator(); + var subPlan = firstSubPlan(optimizedPlan); // TODO: merge into one method - if (subplans.size() > 0) { + if (subPlan != null) { // code-path to execute subplans - executeSubPlan(new DriverCompletionInfo.Accumulator(), physicalPlan, iterator, executionInfo, runner, listener); + executeSubPlan(new DriverCompletionInfo.Accumulator(), optimizedPlan, subPlan, executionInfo, runner, request, listener); } else { + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); // execute main plan runner.run(physicalPlan, listener); } @@ -276,40 +262,47 @@ private void executeSubPlans( private void executeSubPlan( DriverCompletionInfo.Accumulator completionInfoAccumulator, - PhysicalPlan plan, - Iterator subPlanIterator, + LogicalPlan optimizedPlan, + InlineJoin.LogicalPlanTuple subPlans, EsqlExecutionInfo executionInfo, PlanRunner runner, + EsqlQueryRequest request, ActionListener listener ) { - PlanTuple tuple = subPlanIterator.next(); + LOGGER.debug("Executing subplan:\n{}", subPlans.stubReplacedSubPlan()); + // Create a physical plan out of the logical sub-plan + var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request); - runner.run(tuple.physical, listener.delegateFailureAndWrap((next, result) -> { + runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> { try { + // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation completionInfoAccumulator.accumulate(result.completionInfo()); - LocalRelation resultWrapper = resultToPlan(tuple.logical, result); + LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan(), result); // replace the original logical plan with the backing result - PhysicalPlan newPlan = plan.transformUp(FragmentExec.class, f -> { - LogicalPlan frag = f.fragment(); - return f.withFragment( - frag.transformUp( - InlineJoin.class, - ij -> ij.right() == tuple.logical ? InlineJoin.inlineData(ij, resultWrapper) : ij - ) - ); - }); - - if (subPlanIterator.hasNext() == false) { - runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> { + LogicalPlan newLogicalPlan = optimizedPlan.transformUp( + InlineJoin.class, + // use object equality since the right-hand side shouldn't have changed in the optimizedPlan at this point + // and equals would have ignored name IDs anyway + ij -> ij.right() == subPlans.originalSubPlan() ? InlineJoin.inlineData(ij, resultWrapper) : ij + ); + // TODO: INLINESTATS can we do better here and further optimize the plan AFTER one of the subplans executed? + newLogicalPlan.setOptimized(); + LOGGER.debug("Plan after previous subplan execution:\n{}", newLogicalPlan); + // look for the next inlinejoin plan + var newSubPlan = firstSubPlan(newLogicalPlan); + + if (newSubPlan == null) {// run the final "main" plan + LOGGER.debug("Executing final plan:\n{}", newLogicalPlan); + var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request); + runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> { completionInfoAccumulator.accumulate(finalResult.completionInfo()); finalListener.onResponse( new Result(finalResult.schema(), finalResult.pages(), completionInfoAccumulator.finish(), executionInfo) ); })); - } else { - // continue executing the subplans - executeSubPlan(completionInfoAccumulator, newPlan, subPlanIterator, executionInfo, runner, next); + } else {// continue executing the subplans + executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, listener); } } finally { Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks))); @@ -317,7 +310,7 @@ private void executeSubPlan( })); } - private LocalRelation resultToPlan(LogicalPlan plan, Result result) { + private static LocalRelation resultToPlan(LogicalPlan plan, Result result) { List pages = result.pages(); List schema = result.schema(); // if (pages.size() > 1) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java index 85abc635967a6..294d8110fd7b2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java @@ -26,7 +26,7 @@ public static Block[] fromPages(List schema, List pages) { // Limit ourselves to 1mb of results similar to LOOKUP for now. long bytesUsed = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum(); if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) { - throw new IllegalArgumentException("first phase result too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb"); + throw new IllegalArgumentException("sub-plan execution results too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb"); } int positionCount = pages.stream().mapToInt(Page::getPositionCount).sum(); Block.Builder[] builders = new Block.Builder[schema.size()]; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index 6ae6c6eea3b6f..0b69fc4cdd6f3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -4120,6 +4120,21 @@ public void testImplicitCastingForDateAndDateNanosFields() { assertEquals("test*", esRelation.indexPattern()); } + public void testGroupingOverridesInStats() { + verifyUnsupported(""" + from test + | stats MIN(salary) BY x = languages, x = x + 1 + """, "Found 1 problem\n" + "line 2:43: Unknown column [x]", "mapping-default.json"); + } + + public void testGroupingOverridesInInlinestats() { + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); + verifyUnsupported(""" + from test + | inlinestats MIN(salary) BY x = languages, x = x + 1 + """, "Found 1 problem\n" + "line 2:49: Unknown column [x]", "mapping-default.json"); + } + private void verifyNameAndType(String actualName, DataType actualType, String expectedName, DataType expectedType) { assertEquals(expectedName, actualName); assertEquals(expectedType, actualType); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java index f2b70c99253b8..02805933979a8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java @@ -52,9 +52,9 @@ import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.Row; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.stats.SearchStats; import org.hamcrest.Matchers; import org.junit.BeforeClass; @@ -681,7 +681,7 @@ public void testReplaceStringCasingAndRLikeWithLocalRelation() { var plan = localPlan("FROM test | WHERE TO_LOWER(TO_UPPER(first_name)) RLIKE \"VALÜ*\""); var local = as(plan, LocalRelation.class); - assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY)); + assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY)); } // same plan as in testReplaceUpperStringCasingWithInsensitiveRLike, but with LIKE instead of RLIKE @@ -720,7 +720,7 @@ public void testReplaceStringCasingAndLikeWithLocalRelation() { var plan = localPlan("FROM test | WHERE TO_LOWER(TO_UPPER(first_name)) LIKE \"VALÜ*\""); var local = as(plan, LocalRelation.class); - assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY)); + assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY)); } /** @@ -780,7 +780,7 @@ private IsNotNull isNotNull(Expression field) { private LocalRelation asEmptyRelation(Object o) { var empty = as(o, LocalRelation.class); - assertThat(empty.supplier(), is(LocalSupplier.EMPTY)); + assertThat(empty.supplier(), is(EmptyLocalSupplier.EMPTY)); return empty; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 7795bf5c2d9ff..d159f9949f2b2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -128,9 +128,9 @@ import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import java.time.Duration; import java.util.ArrayList; @@ -1238,7 +1238,7 @@ public void testPushdownLimitsPastLeftJoin() { var rule = new PushDownAndCombineLimits(); var leftChild = emptySource(); - var rightChild = new LocalRelation(Source.EMPTY, List.of(fieldAttribute()), LocalSupplier.EMPTY); + var rightChild = new LocalRelation(Source.EMPTY, List.of(fieldAttribute()), EmptyLocalSupplier.EMPTY); assertNotEquals(leftChild, rightChild); var joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), List.of()); @@ -3089,7 +3089,7 @@ public void testFoldInEval() { """); var local = as(plan, LocalRelation.class); - assertThat(local.supplier(), is(LocalSupplier.EMPTY)); + assertThat(local.supplier(), is(EmptyLocalSupplier.EMPTY)); } public void testFoldFromRow() { @@ -5340,7 +5340,7 @@ public void testNoWrongIsNotNullPruning() { """); var local = as(plan, LocalRelation.class); - assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY)); + assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY)); assertWarnings( "Line 2:16: evaluation of [a + b] failed, treating result as null. Only first 20 failures recorded.", "Line 2:16: java.lang.IllegalArgumentException: single-value function encountered multi-value" @@ -6107,7 +6107,7 @@ public void testSimplifyComparisonArithmeticSkippedOnFloats() { public void testReplaceStringCasingWithInsensitiveEqualsUpperFalse() { var plan = optimizedPlan("FROM test | WHERE TO_UPPER(first_name) == \"VALÜe\""); var local = as(plan, LocalRelation.class); - assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY)); + assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY)); } public void testReplaceStringCasingWithInsensitiveEqualsUpperTrue() { @@ -6122,7 +6122,7 @@ public void testReplaceStringCasingWithInsensitiveEqualsUpperTrue() { public void testReplaceStringCasingWithInsensitiveEqualsLowerFalse() { var plan = optimizedPlan("FROM test | WHERE TO_LOWER(first_name) == \"VALÜe\""); var local = as(plan, LocalRelation.class); - assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY)); + assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY)); } public void testReplaceStringCasingWithInsensitiveEqualsLowerTrue() { @@ -7318,7 +7318,7 @@ public void testWhereNull() { | LIMIT 12 """); var local = as(plan, LocalRelation.class); - assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY)); + assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY)); } public void testFunctionNamedParamsAsFunctionArgument() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 99eded20b1687..ef9887e98cdc2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -109,8 +109,8 @@ import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; +import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; -import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.DissectExec; import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; @@ -2547,7 +2547,7 @@ public void testLocallyMissingField() { var limit = as(optimized, LimitExec.class); var exchange = as(limit.child(), ExchangeExec.class); var source = as(exchange.child(), LocalSourceExec.class); - assertEquals(LocalSupplier.EMPTY, source.supplier()); + assertEquals(EmptyLocalSupplier.EMPTY, source.supplier()); } /** diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java index 088b5c1c9205e..148cfd7ca4fa6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java @@ -83,7 +83,7 @@ public static void init() { * \_StubRelation[[emp_no{f}#11, languages{f}#14, gender{f}#13, y{r}#10]] */ public void testGroupingAliasingMoved_To_LeftSideOfJoin() { - assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); var plan = plan(""" from test | keep emp_no, languages, gender @@ -126,7 +126,7 @@ public void testGroupingAliasingMoved_To_LeftSideOfJoin() { * {r}#21]] */ public void testGroupingAliasingMoved_To_LeftSideOfJoin_WithExpression() { - assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); var plan = plan(""" from test | keep emp_no, languages, gender, last_name, first_name diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AbstractLogicalPlanSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AbstractLogicalPlanSerializationTests.java index 415edea4cd40c..4960bae3d62f4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AbstractLogicalPlanSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AbstractLogicalPlanSerializationTests.java @@ -30,6 +30,7 @@ public static LogicalPlan randomChild(int depth) { protected final NamedWriteableRegistry getNamedWriteableRegistry() { List entries = new ArrayList<>(); entries.addAll(PlanWritables.logical()); + entries.addAll(PlanWritables.others()); entries.addAll(ExpressionWritables.aggregates()); entries.addAll(ExpressionWritables.allExpressions()); return new NamedWriteableRegistry(entries); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplierTests.java new file mode 100644 index 0000000000000..b5503c5c6cc27 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplierTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical.local; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.compute.data.Block; + +import static org.hamcrest.Matchers.equalTo; + +public class CopyingLocalSupplierTests extends LocalSupplierTests { + + @Override + protected LocalSupplier createTestInstance() { + Block[] blocks = randomList(1, 10, LocalSupplierTests::randomBlock).toArray(Block[]::new); + return new CopyingLocalSupplier(blocks); + } + + protected void assertOnBWCObject(LocalSupplier testInstance, LocalSupplier bwcDeserializedObject, TransportVersion version) { + assertNotSame(version.toString(), bwcDeserializedObject, testInstance); + if (version.onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { + assertThat(testInstance, equalTo(bwcDeserializedObject)); + } else { + assertTrue(version.toString(), bwcDeserializedObject instanceof ImmediateLocalSupplier); + } + assertEquals(version.toString(), testInstance.hashCode(), bwcDeserializedObject.hashCode()); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/EmptyLocalSupplierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/EmptyLocalSupplierTests.java new file mode 100644 index 0000000000000..c1a12e50417df --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/EmptyLocalSupplierTests.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical.local; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; + +public class EmptyLocalSupplierTests extends LocalSupplierTests { + + @Override + protected LocalSupplier createTestInstance() { + return EmptyLocalSupplier.EMPTY; + } + + protected void assertOnBWCObject(LocalSupplier testInstance, LocalSupplier bwcDeserializedObject, TransportVersion version) { + assertSame(version.toString(), bwcDeserializedObject, testInstance); + assertThat(version.toString(), bwcDeserializedObject, equalTo(EmptyLocalSupplier.EMPTY)); + assertEquals(version.toString(), testInstance.hashCode(), bwcDeserializedObject.hashCode()); + } + + @Override + protected void writeTo(BytesStreamOutput output, LocalSupplier instance, TransportVersion version) throws IOException { + if (version.onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { + new PlanStreamOutput(output, null).writeNamedWriteable(instance); + } else { + output.writeVInt(0); + } + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplierTests.java new file mode 100644 index 0000000000000..1de9581f4dbc0 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplierTests.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical.local; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.compute.data.Block; + +import static org.hamcrest.Matchers.equalTo; + +public class ImmediateLocalSupplierTests extends LocalSupplierTests { + + @Override + protected LocalSupplier createTestInstance() { + Block[] blocks = randomList(1, 10, LocalSupplierTests::randomBlock).toArray(Block[]::new); + return new ImmediateLocalSupplier(blocks); + } + + protected void assertOnBWCObject(LocalSupplier testInstance, LocalSupplier bwcDeserializedObject, TransportVersion version) { + assertNotSame(version.toString(), bwcDeserializedObject, testInstance); + assertThat(version.toString(), testInstance, equalTo(bwcDeserializedObject)); + assertEquals(version.toString(), testInstance.hashCode(), bwcDeserializedObject.hashCode()); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalSupplierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalSupplierTests.java index b7e934e273f00..1d144b995711f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalSupplierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/local/LocalSupplierTests.java @@ -8,45 +8,95 @@ package org.elasticsearch.xpack.esql.plan.logical.local; import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.test.AbstractWireTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; +import org.elasticsearch.xpack.esql.plan.PlanWritables; +import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import java.io.IOException; import java.util.Arrays; +import java.util.NavigableSet; + +public abstract class LocalSupplierTests extends AbstractWireTestCase { + + private static final NavigableSet DEFAULT_BWC_VERSIONS = getAllBWCVersions(); -public class LocalSupplierTests extends AbstractWireTestCase { private static final BlockFactory BLOCK_FACTORY = BlockFactory.getInstance( new NoopCircuitBreaker("noop-esql-breaker"), BigArrays.NON_RECYCLING_INSTANCE ); + private static NavigableSet getAllBWCVersions() { + return TransportVersionUtils.allReleasedVersions().tailSet(TransportVersions.MINIMUM_COMPATIBLE, true); + } + + public final void testBwcSerialization() throws IOException { + for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) { + LocalSupplier testInstance = createTestInstance(); + for (TransportVersion bwcVersion : DEFAULT_BWC_VERSIONS) { + assertBwcSerialization(testInstance, bwcVersion); + } + } + } + + protected final void assertBwcSerialization(LocalSupplier testInstance, TransportVersion version) throws IOException { + LocalSupplier deserializedInstance = copyInstance(testInstance, version); + assertOnBWCObject(testInstance, deserializedInstance, version); + } + + protected abstract void assertOnBWCObject(LocalSupplier testInstance, LocalSupplier bwcDeserializedObject, TransportVersion version); + @Override protected LocalSupplier copyInstance(LocalSupplier instance, TransportVersion version) throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { output.setTransportVersion(version); - instance.writeTo(new PlanStreamOutput(output, null)); + writeTo(output, instance, version); try (StreamInput in = output.bytes().streamInput()) { in.setTransportVersion(version); - return LocalSupplier.readFrom(new PlanStreamInput(in, getNamedWriteableRegistry(), null)); + return readFrom(in, version); } } } + protected void writeTo(BytesStreamOutput output, LocalSupplier instance, TransportVersion version) throws IOException { + if (version.onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { + new PlanStreamOutput(output, null).writeNamedWriteable(instance); + } else { + instance.writeTo(new PlanStreamOutput(output, null)); + } + } + + protected LocalSupplier readFrom(StreamInput input, TransportVersion version) throws IOException { + if (version.onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { + return new PlanStreamInput(input, getNamedWriteableRegistry(), null).readNamedWriteable(LocalSupplier.class); + } else { + return LocalSourceExec.readLegacyLocalSupplierFrom(new PlanStreamInput(input, getNamedWriteableRegistry(), null)); + } + } + @Override protected LocalSupplier createTestInstance() { - return randomBoolean() ? LocalSupplier.EMPTY : randomNonEmpty(); + return randomLocalSupplier(); + } + + public static LocalSupplier randomLocalSupplier() { + return randomBoolean() ? EmptyLocalSupplier.EMPTY : randomNonEmpty(); } public static LocalSupplier randomNonEmpty() { - return LocalSupplier.of(randomList(1, 10, LocalSupplierTests::randomBlock).toArray(Block[]::new)); + Block[] blocks = randomList(1, 10, LocalSupplierTests::randomBlock).toArray(Block[]::new); + return randomBoolean() ? LocalSupplier.of(blocks) : new CopyingLocalSupplier(blocks); } @Override @@ -54,7 +104,7 @@ protected LocalSupplier mutateInstance(LocalSupplier instance) throws IOExceptio Block[] blocks = instance.get(); if (blocks.length > 0 && randomBoolean()) { if (randomBoolean()) { - return LocalSupplier.EMPTY; + return EmptyLocalSupplier.EMPTY; } return LocalSupplier.of(Arrays.copyOf(blocks, blocks.length - 1, Block[].class)); } @@ -63,7 +113,12 @@ protected LocalSupplier mutateInstance(LocalSupplier instance) throws IOExceptio return LocalSupplier.of(blocks); } - private static Block randomBlock() { + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(PlanWritables.others()); + } + + static Block randomBlock() { int len = between(1, 1000); try (IntBlock.Builder ints = BLOCK_FACTORY.newIntBlockBuilder(len)) { for (int i = 0; i < len; i++) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExecSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExecSerializationTests.java index 2a5f844d3fee0..e77acc06e4706 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExecSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExecSerializationTests.java @@ -19,7 +19,7 @@ public class LocalSourceExecSerializationTests extends AbstractPhysicalPlanSeria public static LocalSourceExec randomLocalSourceExec() { Source source = randomSource(); List output = randomFieldAttributes(1, 9, false); - LocalSupplier supplier = randomBoolean() ? LocalSupplier.EMPTY : LocalSupplierTests.randomNonEmpty(); + LocalSupplier supplier = LocalSupplierTests.randomLocalSupplier(); return new LocalSourceExec(source, output, supplier); } @@ -35,7 +35,7 @@ protected LocalSourceExec mutateInstance(LocalSourceExec instance) throws IOExce if (randomBoolean()) { output = randomValueOtherThan(output, () -> randomFieldAttributes(1, 9, false)); } else { - supplier = randomValueOtherThan(supplier, () -> randomBoolean() ? LocalSupplier.EMPTY : LocalSupplierTests.randomNonEmpty()); + supplier = randomValueOtherThan(supplier, () -> LocalSupplierTests.randomLocalSupplier()); } return new LocalSourceExec(instance.source(), output, supplier); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java index ae66488f419e1..7dee2803231e0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java @@ -30,7 +30,7 @@ public void testBasicFromCommand() { } public void testBasicFromCommandWithInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames("from test | inlinestats max(salary) by gender", ALL_FIELDS); } @@ -39,7 +39,7 @@ public void testBasicFromCommandWithMetadata() { } public void testBasicFromCommandWithMetadata_AndInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames("from test metadata _index, _id, _version | inlinestats max(salary)", ALL_FIELDS); } @@ -305,7 +305,7 @@ public void testLimitZero() { } public void testLimitZero_WithInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" FROM employees | INLINESTATS COUNT(*), MAX(salary) BY gender @@ -320,7 +320,7 @@ public void testDocsDropHeight() { } public void testDocsDropHeight_WithInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" FROM employees | DROP height @@ -336,7 +336,7 @@ public void testDocsDropHeightWithWildcard() { } public void testDocsDropHeightWithWildcard_AndInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" FROM employees | INLINESTATS MAX(salary) BY gender @@ -503,7 +503,7 @@ public void testSortWithLimitOne_DropHeight() { } public void testSortWithLimitOne_DropHeight_WithInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames("from employees | inlinestats avg(salary) by languages | sort languages | limit 1 | drop height*", ALL_FIELDS); } @@ -803,7 +803,7 @@ public void testFilterById() { } public void testFilterById_WithInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames("FROM apps metadata _id | INLINESTATS max(rate) | WHERE _id == \"4\"", ALL_FIELDS); } @@ -1274,7 +1274,7 @@ public void testProjectDropPattern() { } public void testProjectDropPattern_WithInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" from test | inlinestats max(foo) by bar @@ -1357,7 +1357,7 @@ public void testCountAllAndOtherStatGrouped() { } public void testCountAllAndOtherStatGrouped_WithInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" from test | inlinestats c = count(*), min = min(emp_no) by languages @@ -1396,7 +1396,7 @@ public void testCountAllWithEval() { } public void testCountAllWithEval_AndInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" from test | rename languages as l @@ -1409,7 +1409,7 @@ public void testCountAllWithEval_AndInlinestats() { } public void testKeepAfterEval_AndInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" from test | rename languages as l @@ -1422,7 +1422,7 @@ public void testKeepAfterEval_AndInlinestats() { } public void testKeepBeforeEval_AndInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" from test | rename languages as l @@ -1435,7 +1435,7 @@ public void testKeepBeforeEval_AndInlinestats() { } public void testStatsBeforeEval_AndInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" from test | rename languages as l @@ -1447,7 +1447,7 @@ public void testStatsBeforeEval_AndInlinestats() { } public void testStatsBeforeInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" from test | stats min = min(salary) by languages @@ -1456,7 +1456,7 @@ public void testStatsBeforeInlinestats() { } public void testKeepBeforeInlinestats() { - assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled()); + assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled()); assertFieldNames(""" from test | keep languages, salary