From 5c6b3f47500ecc7beb38ccf8a859d7e0603b28ae Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Tue, 15 Jul 2025 12:41:22 +0200 Subject: [PATCH 1/5] Allow remote enrich after LOOKUP JOIN --- .../src/main/resources/lookup-join.csv-spec | 70 +++++++++++++++++++ .../action/CrossClusterAsyncEnrichStopIT.java | 2 +- .../xpack/esql/action/EsqlCapabilities.java | 6 ++ .../xpack/esql/planner/mapper/Mapper.java | 7 ++ 4 files changed, 84 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index bdf0413a03d02..77ad020929fc4 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1875,6 +1875,76 @@ type:keyword | language_code:integer | language_name:keyword Production | 3 | Spanish ; +enrichAfterLookupJoin +required_capability: join_lookup_v12 + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH languages_policy ON language_code +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +; + +remoteEnrichAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +# TODO: a bunch more tests, also switch orders, use double _remote enrich, double lookup join etc. Also add tests with +# _coordinator enrich. What about ROW? + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH _remote:languages_policy ON language_code +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +; + +remoteEnrichSortAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH _remote:languages_policy ON language_code +| SORT message ASC +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +Connected to 10.1.0.2 | 1 | Success | English +; + +sortRemoteEnrichAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| SORT message ASC +| ENRICH _remote:languages_policy ON language_code +| LIMIT 2 +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +Connected to 10.1.0.2 | 1 | Success | English +; ############################################### # LOOKUP JOIN on mixed numerical fields diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 815b08409723c..cea8bbefc0a5c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception { SimplePauseFieldPlugin.allowEmitting.countDown(); try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) { - // Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent + // Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent // because we stopped it before processing the data assertThat( getValuesList(resp), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 4759579b94d24..618369704d4d5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1205,6 +1205,12 @@ public enum Cap { */ ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()), + /** + * Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN}, + * see java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH + */ + REMOTE_ENRICH_AFTER_LOOKUP_JOIN, + /** * MATCH PHRASE function */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 4d1d65d63932d..09e9ed45c1382 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; +import org.elasticsearch.xpack.esql.plan.physical.BinaryExec; import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; @@ -102,7 +103,13 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway). Holder hasFragment = new Holder<>(false); + // Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice; + // include the plan up until this ENRICH in the fragment. var childTransformed = mappedChild.transformUp(f -> { + if (f instanceof BinaryExec be) { + // Remove any LOOKUP JOIN or inline Join from INLINE STATS do avoid double execution + return be.left(); + } // Once we reached FragmentExec, we stuff our Enrich under it if (f instanceof FragmentExec) { hasFragment.set(true); From 3aaf1b4b5ac9e4838cf18a725e357a58fcd95969 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Tue, 15 Jul 2025 12:46:17 +0200 Subject: [PATCH 2/5] Update docs/changelog/131286.yaml --- docs/changelog/131286.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/131286.yaml diff --git a/docs/changelog/131286.yaml b/docs/changelog/131286.yaml new file mode 100644 index 0000000000000..29ba5f5f484aa --- /dev/null +++ b/docs/changelog/131286.yaml @@ -0,0 +1,6 @@ +pr: 131286 +summary: Allow remote enrich after LOOKUP JOIN +area: ES|QL +type: bug +issues: + - 129372 From 4efe247ad7dfd28a5bea67e8b196a0640ce985da Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Thu, 17 Jul 2025 08:44:52 +0200 Subject: [PATCH 3/5] Update docs/changelog/131286.yaml --- docs/changelog/131286.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/changelog/131286.yaml b/docs/changelog/131286.yaml index 29ba5f5f484aa..455982aeeea7c 100644 --- a/docs/changelog/131286.yaml +++ b/docs/changelog/131286.yaml @@ -2,5 +2,4 @@ pr: 131286 summary: Allow remote enrich after LOOKUP JOIN area: ES|QL type: bug -issues: - - 129372 +issues: [] From 0f65dd57636c86a753bfa7f11e5e8a15597bc883 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Thu, 17 Jul 2025 09:38:10 +0200 Subject: [PATCH 4/5] Make LJs remote before remote enrich Alternative approach: rather than reinventing the wheel, let's just mark any LOOKUP JOINs upstream from a remote ENRICH as remote, too, so the validation automatically kicks in. --- .../xpack/esql/parser/LogicalPlanBuilder.java | 10 ++- .../xpack/esql/planner/mapper/Mapper.java | 7 +- .../esql/analysis/AnalyzerTestUtils.java | 49 ++++++------ .../xpack/esql/analysis/VerifierTests.java | 78 ++++++++++++++++++- 4 files changed, 110 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 9e232bd8f02f9..7c40b4e342b53 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -463,7 +463,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) { @Override public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) { - return p -> { + return child -> { var source = source(ctx); Tuple tuple = parsePolicyName(ctx.policyName); Mode mode = tuple.v1(); @@ -482,9 +482,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) { } List keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class); + + // If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too. + LogicalPlan updatedChild = (mode == Mode.REMOTE) == false + ? child + : child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true)); + return new Enrich( source, - p, + updatedChild, mode, Literal.keyword(source(ctx.policyName), policyNameString), matchField, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 09e9ed45c1382..f9aa26b0987de 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -28,7 +28,6 @@ import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; -import org.elasticsearch.xpack.esql.plan.physical.BinaryExec; import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; @@ -88,7 +87,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { PhysicalPlan mappedChild = map(unary.child()); // - // TODO - this is hard to follow and needs reworking + // TODO - this is hard to follow, causes bugs and needs reworking // https://github.com/elastic/elasticsearch/issues/115897 // if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) { @@ -106,10 +105,6 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { // Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice; // include the plan up until this ENRICH in the fragment. var childTransformed = mappedChild.transformUp(f -> { - if (f instanceof BinaryExec be) { - // Remove any LOOKUP JOIN or inline Join from INLINE STATS do avoid double execution - return be.left(); - } // Once we reached FragmentExec, we stuff our Enrich under it if (f instanceof FragmentExec) { hasFragment.set(true); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java index cbb825ca9581b..fbfa18dccc477 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java @@ -36,9 +36,9 @@ import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE; import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE; import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; -import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; public final class AnalyzerTestUtils { @@ -61,27 +61,36 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map lookupResolution, Verifier verifier) { + return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier); + } + + public static Analyzer analyzer( + IndexResolution indexResolution, + Map lookupResolution, + EnrichResolution enrichResolution, + Verifier verifier + ) { + return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG); + } + + public static Analyzer analyzer( + IndexResolution indexResolution, + Map lookupResolution, + EnrichResolution enrichResolution, + Verifier verifier, + Configuration config + ) { return new Analyzer( new AnalyzerContext( - EsqlTestUtils.TEST_CFG, + config, new EsqlFunctionRegistry(), indexResolution, lookupResolution, - defaultEnrichResolution(), + enrichResolution, defaultInferenceResolution() ), verifier @@ -89,17 +98,7 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code") ); assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN")); - } private void checkFullTextFunctionsInStats(String functionInvocation) { From 6d5215348e8bd6f70df6ffeb74054bacdea35717 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Fri, 18 Jul 2025 09:29:36 +0200 Subject: [PATCH 5/5] Update 131286.yaml --- docs/changelog/131286.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/131286.yaml b/docs/changelog/131286.yaml index 455982aeeea7c..ce1fc56860fc5 100644 --- a/docs/changelog/131286.yaml +++ b/docs/changelog/131286.yaml @@ -1,5 +1,5 @@ pr: 131286 summary: Allow remote enrich after LOOKUP JOIN area: ES|QL -type: bug +type: enhancement issues: []