From 24f81b5be25beb9a96ade09dac79d13c3e39c179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Mon, 16 Dec 2024 14:27:59 +0100 Subject: [PATCH 1/4] Allow using the same index in FROM and LOOKUP --- .../xpack/esql/core/tree/Node.java | 7 +++ .../xpack/esql/planner/PlannerUtils.java | 8 ++- .../xpack/esql/plugin/ComputeService.java | 49 ++----------------- 3 files changed, 17 insertions(+), 47 deletions(-) diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java index feeba39756373..e0a84ae921165 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.BitSet; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -85,6 +86,12 @@ public void forEachUp(Consumer action) { action.accept((T) this); } + @SuppressWarnings("unchecked") + public void forEachUp(Consumer action, Function> childrenGetter) { + childrenGetter.apply((T) this).forEach(c -> c.forEachUp(action, childrenGetter)); + action.accept((T) this); + } + @SuppressWarnings("unchecked") public void forEachUp(Class typeToken, Consumer action) { forEachUp(t -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 37f89891860d8..b11d5d7576a50 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -40,6 +40,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; +import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; @@ -105,7 +106,12 @@ public static Set planConcreteIndices(PhysicalPlan plan) { return Set.of(); } var indices = new LinkedHashSet(); - plan.forEachUp(FragmentExec.class, f -> f.fragment().forEachUp(EsRelation.class, r -> indices.addAll(r.index().concreteIndices()))); + // TODO: This only works for LEFT join, we still need to support RIGHT join + plan.forEachUp(node -> { + if (node instanceof FragmentExec f) { + f.fragment().forEachUp(EsRelation.class, r -> indices.addAll(r.index().concreteIndices())); + } + }, node -> node instanceof LookupJoinExec join ? List.of(join.left()) : node.children()); return indices; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 9b59b98a7cdc2..e77a2443df2dd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -63,12 +63,8 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService; -import org.elasticsearch.xpack.esql.plan.logical.EsRelation; -import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec; -import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; -import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.OutputExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; @@ -81,7 +77,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -167,11 +162,9 @@ public void execute( Map clusterToConcreteIndices = transportService.getRemoteClusterService() .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new)); QueryPragmas queryPragmas = configuration.pragmas(); - Set lookupIndexNames = findLookupIndexNames(physicalPlan); - Set concreteIndexNames = selectConcreteIndices(clusterToConcreteIndices, lookupIndexNames); if (dataNodePlan == null) { - if (concreteIndexNames.isEmpty() == false) { - String error = "expected no concrete indices without data node plan; got " + concreteIndexNames; + if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0) == false) { + String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices; assert false : error; listener.onFailure(new IllegalStateException(error)); return; @@ -194,7 +187,7 @@ public void execute( return; } } else { - if (concreteIndexNames.isEmpty()) { + if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0)) { var error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan; assert false : error; listener.onFailure(new IllegalStateException(error)); @@ -268,42 +261,6 @@ public void execute( } } - private Set selectConcreteIndices(Map clusterToConcreteIndices, Set indexesToIgnore) { - Set concreteIndexNames = new HashSet<>(); - clusterToConcreteIndices.forEach((clusterAlias, concreteIndices) -> { - for (String index : concreteIndices.indices()) { - if (indexesToIgnore.contains(index) == false) { - concreteIndexNames.add(index); - } - } - }); - return concreteIndexNames; - } - - private Set findLookupIndexNames(PhysicalPlan physicalPlan) { - Set lookupIndexNames = new HashSet<>(); - // When planning JOIN on the coordinator node: "LookupJoinExec.lookup()->FragmentExec.fragment()->EsRelation.index()" - physicalPlan.forEachDown( - LookupJoinExec.class, - lookupJoinExec -> lookupJoinExec.lookup() - .forEachDown( - FragmentExec.class, - frag -> frag.fragment().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name())) - ) - ); - // When planning JOIN on the data node: "FragmentExec.fragment()->Join.right()->EsRelation.index()" - // TODO this only works for LEFT join, so we still need to support RIGHT join - physicalPlan.forEachDown( - FragmentExec.class, - fragmentExec -> fragmentExec.fragment() - .forEachDown( - Join.class, - join -> join.right().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name())) - ) - ); - return lookupIndexNames; - } - // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries) private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) { if (execInfo.isCrossClusterSearch()) { From 14683f27d0fcc954b72c5c57cc78c32bc9c9344a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Mon, 16 Dec 2024 14:32:53 +0100 Subject: [PATCH 2/4] Added capability and test --- .../src/main/resources/lookup-join.csv-spec | 17 +++++++++++++++++ .../xpack/esql/action/EsqlCapabilities.java | 5 +++++ 2 files changed, 22 insertions(+) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index c39f4ae7b4e0c..699fcfe856c73 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -483,3 +483,20 @@ emp_no:integer | language_code:integer | language_name:keyword 10001 | 2 | French 10003 | 4 | German ; + +repeatedIndexOnFrom +required_capability: join_lookup_v5 +required_capability: join_lookup_repeated_index_from + +FROM languages_lookup +| LOOKUP JOIN languages_lookup ON language_code +| SORT language_code +; + +language_code:integer | language_name:keyword +1 | English +2 | French +3 | Spanish +4 | German + +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 649ec1eba9785..d4b0d2b870343 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -549,6 +549,11 @@ public enum Cap { */ JOIN_LOOKUP_V5(Build.current().isSnapshot()), + /** + * LOOKUP JOIN with the same index as the FROM + */ + JOIN_LOOKUP_REPEATED_INDEX_FROM(JOIN_LOOKUP_V5.isEnabled()), + /** * Fix for https://github.com/elastic/elasticsearch/issues/117054 */ From 721f70279a8eef84e15804cb0a260f7c59dc586f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 17 Dec 2024 13:27:50 +0100 Subject: [PATCH 3/4] Moved Node method to PlannerUtils --- .../xpack/esql/core/tree/Node.java | 7 ------- .../xpack/esql/planner/PlannerUtils.java | 18 +++++++++++++++++- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java index e0a84ae921165..feeba39756373 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java @@ -11,7 +11,6 @@ import java.util.ArrayList; import java.util.BitSet; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -86,12 +85,6 @@ public void forEachUp(Consumer action) { action.accept((T) this); } - @SuppressWarnings("unchecked") - public void forEachUp(Consumer action, Function> childrenGetter) { - childrenGetter.apply((T) this).forEach(c -> c.forEachUp(action, childrenGetter)); - action.accept((T) this); - } - @SuppressWarnings("unchecked") public void forEachUp(Class typeToken, Consumer action) { forEachUp(t -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index b11d5d7576a50..a312d048db0ad 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates; +import org.elasticsearch.xpack.esql.core.tree.Node; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.util.Holder; @@ -49,9 +50,12 @@ import org.elasticsearch.xpack.esql.stats.SearchStats; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; import static java.util.Arrays.asList; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES; @@ -107,7 +111,7 @@ public static Set planConcreteIndices(PhysicalPlan plan) { } var indices = new LinkedHashSet(); // TODO: This only works for LEFT join, we still need to support RIGHT join - plan.forEachUp(node -> { + forEachUpWithChildren(plan, node -> { if (node instanceof FragmentExec f) { f.fragment().forEachUp(EsRelation.class, r -> indices.addAll(r.index().concreteIndices())); } @@ -115,6 +119,18 @@ public static Set planConcreteIndices(PhysicalPlan plan) { return indices; } + /** + * Similar to {@link Node#forEachUp(Consumer)}, but with a custom callback to get the node children. + */ + private static > void forEachUpWithChildren( + T node, + Consumer action, + Function> childrenGetter + ) { + childrenGetter.apply(node).forEach(c -> forEachUpWithChildren(c, action, childrenGetter)); + action.accept(node); + } + /** * Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters. */ From d330a3f573636b8669c4961ebf694446cb869da7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 17 Dec 2024 16:17:35 +0100 Subject: [PATCH 4/4] Post merge capability fix --- .../qa/testFixtures/src/main/resources/lookup-join.csv-spec | 2 +- .../org/elasticsearch/xpack/esql/action/EsqlCapabilities.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 2c6ceea0eb289..e75c68f4a379d 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -183,7 +183,7 @@ language_code:integer | language_name:keyword | country:keyword ; repeatedIndexOnFrom -required_capability: join_lookup_v6 +required_capability: join_lookup_v7 required_capability: join_lookup_repeated_index_from FROM languages_lookup diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index dc7b2ea1c1e08..e60fc3b053363 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -552,7 +552,7 @@ public enum Cap { /** * LOOKUP JOIN with the same index as the FROM */ - JOIN_LOOKUP_REPEATED_INDEX_FROM(JOIN_LOOKUP_V6.isEnabled()), + JOIN_LOOKUP_REPEATED_INDEX_FROM(JOIN_LOOKUP_V7.isEnabled()), /** * Fix for https://github.com/elastic/elasticsearch/issues/117054