diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
index 8bcc2c2ff3502..e75c68f4a379d 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
@@ -182,6 +182,22 @@ language_code:integer | language_name:keyword | country:keyword
 2 | [German, German, German] | [Austria, Germany, Switzerland]
 ;
 
+repeatedIndexOnFrom
+required_capability: join_lookup_v7
+required_capability: join_lookup_repeated_index_from
+
+FROM languages_lookup
+| LOOKUP JOIN languages_lookup ON language_code
+| SORT language_code
+;
+
+language_code:integer | language_name:keyword
+1 | English
+2 | French
+3 | Spanish
+4 | German
+;
+
 ###############################################
 # Filtering tests with languages_lookup index
 ###############################################
@@ -1061,4 +1077,3 @@ ignoreOrder:true
 2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | QA | null
 2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | QA | null
 ;
-
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 4fcabb02b2d4f..e60fc3b053363 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -549,6 +549,11 @@ public enum Cap {
          */
         JOIN_LOOKUP_V7(Build.current().isSnapshot()),
 
+        /**
+         * Support for LOOKUP JOIN when the lookup index is also the index named in FROM; gated on {@link #JOIN_LOOKUP_V7}.
+         */
+        JOIN_LOOKUP_REPEATED_INDEX_FROM(JOIN_LOOKUP_V7.isEnabled()),
+
         /**
          * Fix for https://github.com/elastic/elasticsearch/issues/117054
          */
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
index 37f89891860d8..a312d048db0ad 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
+import org.elasticsearch.xpack.esql.core.tree.Node;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.util.Holder;
@@ -40,6 +41,7 @@
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
@@ -48,9 +50,12 @@
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -105,10 +110,34 @@ public static Set planConcreteIndices(PhysicalPlan plan) {
             return Set.of();
         }
         var indices = new LinkedHashSet();
-        plan.forEachUp(FragmentExec.class, f -> f.fragment().forEachUp(EsRelation.class, r -> indices.addAll(r.index().concreteIndices())));
+        // Only the left child of a LookupJoinExec is traversed, so fragments reachable solely through its other
+        // children do not contribute concrete indices.
+        // TODO: This only works for LEFT join, we still need to support RIGHT join
+        forEachUpWithChildren(plan, node -> {
+            if (node instanceof FragmentExec f) {
+                f.fragment().forEachUp(EsRelation.class, r -> indices.addAll(r.index().concreteIndices()));
+            }
+        }, node -> node instanceof LookupJoinExec join ? List.of(join.left()) : node.children());
         return indices;
     }
 
+    /**
+     * Similar to {@link Node#forEachUp(Consumer)}, but with a custom callback to get the node children.
+     * Children returned by {@code childrenGetter} are visited (recursively) before {@code action} is applied to {@code node}.
+     *
+     * @param node           root of the subtree to traverse
+     * @param action         callback applied to every visited node, after that node's selected children
+     * @param childrenGetter selects which children of a node are descended into
+     */
+    private static <T extends Node<T>> void forEachUpWithChildren(
+        T node,
+        Consumer<? super T> action,
+        Function<? super T, Collection<T>> childrenGetter
+    ) {
+        childrenGetter.apply(node).forEach(c -> forEachUpWithChildren(c, action, childrenGetter));
+        action.accept(node);
+    }
+
     /**
      * Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters.
      */
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
index 9b59b98a7cdc2..e77a2443df2dd 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
@@ -63,12 +63,8 @@
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
-import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
-import org.elasticsearch.xpack.esql.plan.logical.join.Join;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
-import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
-import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
@@ -81,7 +77,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -167,11 +162,11 @@ public void execute(
         Map clusterToConcreteIndices = transportService.getRemoteClusterService()
             .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));
         QueryPragmas queryPragmas = configuration.pragmas();
-        Set lookupIndexNames = findLookupIndexNames(physicalPlan);
-        Set concreteIndexNames = selectConcreteIndices(clusterToConcreteIndices, lookupIndexNames);
+        // Computed once so both sanity checks below test the same condition.
+        boolean hasConcreteIndices = clusterToConcreteIndices.values().stream().anyMatch(v -> v.indices().length > 0);
         if (dataNodePlan == null) {
-            if (concreteIndexNames.isEmpty() == false) {
-                String error = "expected no concrete indices without data node plan; got " + concreteIndexNames;
+            if (hasConcreteIndices) {
+                String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices;
                 assert false : error;
                 listener.onFailure(new IllegalStateException(error));
                 return;
@@ -194,7 +189,7 @@ public void execute(
                 return;
             }
         } else {
-            if (concreteIndexNames.isEmpty()) {
+            if (hasConcreteIndices == false) {
                 var error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan;
                 assert false : error;
                 listener.onFailure(new IllegalStateException(error));
@@ -268,42 +263,6 @@ public void execute(
         }
     }
 
-    private Set selectConcreteIndices(Map clusterToConcreteIndices, Set indexesToIgnore) {
-        Set concreteIndexNames = new HashSet<>();
-        clusterToConcreteIndices.forEach((clusterAlias, concreteIndices) -> {
-            for (String index : concreteIndices.indices()) {
-                if (indexesToIgnore.contains(index) == false) {
-                    concreteIndexNames.add(index);
-                }
-            }
-        });
-        return concreteIndexNames;
-    }
-
-    private Set findLookupIndexNames(PhysicalPlan physicalPlan) {
-        Set lookupIndexNames = new HashSet<>();
-        // When planning JOIN on the coordinator node: "LookupJoinExec.lookup()->FragmentExec.fragment()->EsRelation.index()"
-        physicalPlan.forEachDown(
-            LookupJoinExec.class,
-            lookupJoinExec -> lookupJoinExec.lookup()
-                .forEachDown(
-                    FragmentExec.class,
-                    frag -> frag.fragment().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name()))
-                )
-        );
-        // When planning JOIN on the data node: "FragmentExec.fragment()->Join.right()->EsRelation.index()"
-        // TODO this only works for LEFT join, so we still need to support RIGHT join
-        physicalPlan.forEachDown(
-            FragmentExec.class,
-            fragmentExec -> fragmentExec.fragment()
-                .forEachDown(
-                    Join.class,
-                    join -> join.right().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name()))
-                )
-        );
-        return lookupIndexNames;
-    }
-
     // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
     private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
         if (execInfo.isCrossClusterSearch()) {