diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index b6134609825a0..9eefa8421b593 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -297,13 +297,15 @@ private LogicalPlan resolveIndex(UnresolvedRelation plan, IndexResolution indexR var attributes = mappingAsAttributes(plan.source(), esIndex.mapping()); attributes.addAll(plan.metadataFields()); - return new EsRelation( + var esRelation = new EsRelation( plan.source(), esIndex.name(), plan.indexMode(), esIndex.indexNameWithModes(), attributes.isEmpty() ? NO_FIELDS : attributes ); + esRelation.addResolution(esIndex.original(), esIndex.concrete()); /* hand the per-cluster original and concrete index lists of the resolved EsIndex to the relation */ + return esRelation; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/EsIndex.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/EsIndex.java index cbdd1c05efeba..a4532eda34206 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/EsIndex.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/EsIndex.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.core.type.EsField; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; @@ -20,9 +21,11 @@ public record EsIndex( String name, /** Map of field names to {@link EsField} instances representing that field */ Map mapping, + /* transient: readFrom passes an empty map; cluster key to the original index expressions, filled in IndexResolver.mergedMappings */ Map> original, + /* transient: readFrom passes an empty map; cluster key to the indices those expressions resolved to, filled in IndexResolver.mergedMappings */ Map> concrete, Map indexNameWithModes, /** Fields mapped only in some (but *not* all) indices. Since this is only used by the analyzer, it is not serialized. 
*/ - Set partiallyUnmappedFields + /* transient */ Set partiallyUnmappedFields ) implements Writeable { public EsIndex { @@ -32,14 +35,23 @@ public record EsIndex( } public EsIndex(String name, Map mapping, Map indexNameWithModes) { - this(name, mapping, indexNameWithModes, Set.of()); + this(name, mapping, Map.of(), Map.of(), indexNameWithModes, Set.of()); /* original and concrete default to empty maps */ } /** * Intended for tests. Returns an index with an empty index mode map. */ public EsIndex(String name, Map mapping) { - this(name, mapping, Map.of(), Set.of()); + this(name, mapping, Map.of(), Map.of(), Map.of(), Set.of()); + } + + public EsIndex( /* four-argument form matching the record components before this change; original and concrete default to empty maps */ + String name, + Map mapping, + Map indexNameWithModes, + Set partiallyUnmappedFields + ) { + this(name, mapping, Map.of(), Map.of(), indexNameWithModes, partiallyUnmappedFields); } public static EsIndex readFrom(StreamInput in) throws IOException { @@ -47,7 +59,7 @@ public static EsIndex readFrom(StreamInput in) throws IOException { Map mapping = in.readImmutableMap(StreamInput::readString, EsField::readFrom); Map indexNameWithModes = in.readMap(IndexMode::readFrom); // partially unmapped fields shouldn't pass the coordinator node anyway, since they are only used by the Analyzer. 
- return new EsIndex(name, mapping, indexNameWithModes, Set.of()); + return new EsIndex(name, mapping, Map.of(), Map.of(), indexNameWithModes, Set.of()); /* original and concrete are not read from the stream, so a deserialized instance carries empty maps */ } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java index fd085d611cd94..84ff4ae1c6b37 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -40,6 +41,23 @@ public class EsRelation extends LeafPlan { private final Map indexNameWithModes; private final List attrs; + // TODO serialize + private final transient Map> original = new HashMap<>(); /* cluster key to original index expressions; starts empty and is filled through addResolution. NOTE(review): not a constructor argument, so confirm that copies of this relation made elsewhere keep it */ + private final transient Map> concrete = new HashMap<>(); /* cluster key to resolved indices; same lifecycle as original */ + + public void addResolution(Map> original, Map> concrete) { /* merges the given maps into this relation; putAll overwrites the entry of any key already present */ + this.original.putAll(original); + this.concrete.putAll(concrete); + } + + public Map> getOriginal() { + return original; /* the live internal map, not a copy */ + } + + public Map> getConcrete() { + return concrete; /* the live internal map, not a copy */ + } + public EsRelation(Source source, EsIndex index, IndexMode indexMode) { this(source, index.name(), indexMode, index.indexNameWithModes(), flatten(source, index.mapping())); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 2ea6c3c8f5ed3..539d969e0484f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.esql.planner; import org.elasticsearch.TransportVersion; -import 
org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.aggregation.AggregatorMode; @@ -61,14 +60,12 @@ import org.elasticsearch.xpack.esql.stats.SearchStats; import java.util.ArrayList; -import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; -import static java.util.Arrays.asList; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.EXTRACT_SPATIAL_BOUNDS; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.NONE; @@ -155,30 +152,6 @@ private static ReducedPlan getPhysicalPlanReduction(int estimatedRowSize, Physic return new ReducedPlan(EstimatesRowSize.estimateRowSize(estimatedRowSize, plan)); } - /** - * Returns a set of concrete indices after resolving the original indices specified in the FROM command. - */ - public static Set planConcreteIndices(PhysicalPlan plan) { - if (plan == null) { - return Set.of(); - } - var indices = new LinkedHashSet(); - forEachRelation(plan, relation -> indices.addAll(relation.concreteIndices())); - return indices; - } - - /** - * Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters. 
- */ - public static String[] planOriginalIndices(PhysicalPlan plan) { - if (plan == null) { - return Strings.EMPTY_ARRAY; - } - var indices = new LinkedHashSet(); - forEachRelation(plan, relation -> indices.addAll(asList(Strings.commaDelimitedListToStringArray(relation.indexPattern())))); - return indices.toArray(String[]::new); - } - public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) { return plan.anyMatch(e -> { if (e instanceof FragmentExec f) { @@ -188,7 +161,7 @@ public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) { }); } - private static void forEachRelation(PhysicalPlan plan, Consumer action) { + public static void forEachRelation(PhysicalPlan plan, Consumer action) { /* widened to public: ComputeService.executePlan now calls it directly; relations whose index mode is LOOKUP are skipped */ plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> { if (r.indexMode() != IndexMode.LOOKUP) { action.accept(r); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeRequest.java index 28cef1bcab8ce..243e3faca5b5a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeRequest.java @@ -103,6 +103,11 @@ public IndicesOptions indicesOptions() { return plan.originalIndices().indicesOptions(); } + @Override + public boolean allowsCrossProject() { + return true; /* always true in this change. NOTE(review): confirm it need not depend on whether cross-project mode is enabled */ + } + @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { if (parentTaskId.isSet() == false) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 1fc52270a60d2..bd6672c3bc03a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -12,10 +12,12 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; @@ -67,6 +69,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -325,8 +328,20 @@ public void executePlan( listener.onFailure(new IllegalStateException("expected data node plan starts with an ExchangeSink; got " + dataNodePlan)); return; } - Map clusterToConcreteIndices = transportService.getRemoteClusterService() - .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new)); + + var options = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); /* search defaults with CrossProjectModeOptions(true) applied; reused for both per-cluster maps built in this method */ + + Map clusterToConcreteIndices = new HashMap<>(); /* formerly computed by RemoteClusterService.groupIndices; now copied from getConcrete() of every relation in the plan */ + PlannerUtils.forEachRelation(physicalPlan, relation -> { + // TODO do we need to merge entries here? As written, putAll replaces the list for a cluster key that an earlier relation already added. 
+ clusterToConcreteIndices.putAll( + Maps.transformValues(relation.getConcrete(), v -> new OriginalIndices(v.toArray(String[]::new), options)) + ); + }); + LOGGER.info("--> Concrete indices {}", clusterToConcreteIndices); /* NOTE(review): looks like temporary tracing logged at INFO; confirm the level before merging */ + QueryPragmas queryPragmas = configuration.pragmas(); Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask); if (dataNodePlan == null) { @@ -369,8 +384,16 @@ public void executePlan( return; } } - Map clusterToOriginalIndices = transportService.getRemoteClusterService() - .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planOriginalIndices(physicalPlan)); + + Map clusterToOriginalIndices = new HashMap<>(); /* formerly computed by RemoteClusterService.groupIndices; now copied from getOriginal() of every relation in the plan */ + PlannerUtils.forEachRelation(physicalPlan, relation -> { + // TODO do we need to merge entries here? + clusterToOriginalIndices.putAll( + Maps.transformValues(relation.getOriginal(), v -> new OriginalIndices(v.toArray(String[]::new), options)) + ); + }); + LOGGER.info("--> Original indices {}", clusterToOriginalIndices); /* NOTE(review): looks like temporary tracing logged at INFO; confirm the level before merging */ + var localOriginalIndices = clusterToOriginalIndices.remove(LOCAL_CLUSTER); var localConcreteIndices = clusterToConcreteIndices.remove(LOCAL_CLUSTER); /* diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index c6855a9a3a43f..414ba7452dfa5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.unit.ByteSizeValue; @@ -803,20 +804,42 @@ private void 
preAnalyzeMainIndices( QueryBuilder requestFilter, ActionListener listener ) { - EsqlCCSUtils.initCrossClusterState( - indicesExpressionGrouper, - verifier.licenseState(), - preAnalysis.indexes().keySet(), - executionInfo - ); - // The main index pattern dictates on which nodes the query can be executed, - // so we use the minimum transport version from this field caps request. - forAll( - preAnalysis.indexes().entrySet().iterator(), - result, - (entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l), - listener + assert ThreadPool.assertCurrentThreadPool( /* moved up from the per-pattern overload so it is checked once, before either branch */ + ThreadPool.Names.SEARCH, + ThreadPool.Names.SEARCH_COORDINATION, + ThreadPool.Names.SYSTEM_READ ); + if (crossProjectModeDecider.crossProjectEnabled() == false) { /* cross-project disabled: previous flow, initCrossClusterState followed by the per-pattern preAnalyzeMainIndices */ + EsqlCCSUtils.initCrossClusterState( + indicesExpressionGrouper, + verifier.licenseState(), + preAnalysis.indexes().keySet(), + executionInfo + ); + // The main index pattern dictates on which nodes the query can be executed, + // so we use the minimum transport version from this field caps request. 
+ forAll( + preAnalysis.indexes().entrySet().iterator(), + result, + (entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l), + listener + ); + } else { /* cross-project enabled: every pattern goes through preAnalyzeFlatMainIndices; initCrossClusterState is not called on this branch */ + forAll( + preAnalysis.indexes().entrySet().iterator(), + result, + (entry, r, l) -> preAnalyzeFlatMainIndices( + entry.getKey(), + entry.getValue(), + preAnalysis, + executionInfo, + r, + requestFilter, + l + ), + listener + ); + } } private void preAnalyzeMainIndices( @@ -828,11 +851,6 @@ private void preAnalyzeMainIndices( QueryBuilder requestFilter, ActionListener listener ) { - assert ThreadPool.assertCurrentThreadPool( - ThreadPool.Names.SEARCH, - ThreadPool.Names.SEARCH_COORDINATION, - ThreadPool.Names.SYSTEM_READ - ); if (executionInfo.clusterAliases().isEmpty()) { // return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index) listener.onResponse(result.withIndices(indexPattern, IndexResolution.empty(indexPattern.indexPattern()))); @@ -864,6 +882,50 @@ private void preAnalyzeMainIndices( } } + private void preAnalyzeFlatMainIndices( /* cross-project counterpart of preAnalyzeMainIndices: resolves one index pattern through IndexResolver.resolveFlatIndicesVersioned, records the resolved clusters on executionInfo and passes the updated result to the listener */ + IndexPattern indexPattern, + IndexMode indexMode, + PreAnalyzer.PreAnalysis preAnalysis, + EsqlExecutionInfo executionInfo, + PreAnalysisResult result, + QueryBuilder requestFilter, + ActionListener listener + ) { + indexResolver.resolveFlatIndicesVersioned( + indexPattern.indexPattern(), + result.fieldNames, + // Maybe if no indices are returned, retry without index mode and provide a clearer error message. + switch (indexMode) { + case IndexMode.TIME_SERIES -> { + var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); + yield requestFilter != null ? 
new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) : indexModeFilter; + } + default -> requestFilter; + }, + indexMode == IndexMode.TIME_SERIES, + preAnalysis.useAggregateMetricDoubleWhenNotSupported(), + preAnalysis.useDenseVectorWhenNotSupported(), + listener.delegateFailureAndWrap((l, indexResolution) -> { + LOGGER.info("--> Resolved indices {}", indexResolution); /* NOTE(review): looks like temporary tracing logged at INFO; confirm the level before merging */ + + indexResolution.inner().get().original().forEach((clusterAlias, indices) -> { /* NOTE(review): get() is called with no validity check visible here; confirm it cannot throw when the resolution is not valid */ + executionInfo.swapCluster(clusterAlias, (k, v) -> { + var indexExpr = Strings.collectionToCommaDelimitedString(indices); + if (v != null) { /* cluster already tracked: prepend its existing index expression, comma separated */ + indexExpr = v.getIndexExpression() + "," + indexExpr; + } + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); + }); + }); + + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures()); + l.onResponse( + result.withIndices(indexPattern, indexResolution.inner()).withMinimumTransportVersion(indexResolution.minimumVersion()) + ); + }) + ); + } + private void analyzeWithRetry( LogicalPlan parsed, Configuration configuration, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index 26d9f67925a33..60da3fac16124 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.fieldcaps.IndexFieldCapabilities; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.IndicesOptions.CrossProjectModeOptions; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.Maps; @@ 
-24,6 +25,7 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; @@ -77,6 +79,22 @@ public class IndexResolver { ) .build(); + public static final IndicesOptions FIELD_CAPS_FLAT_INDICES_OPTIONS = IndicesOptions.builder() /* indices options applied to the field-caps request built in resolveFlatIndicesVersioned */ + .concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS) + .wildcardOptions( + IndicesOptions.WildcardOptions.builder() + .matchOpen(true) + .matchClosed(false) + .includeHidden(false) + .allowEmptyExpressions(true) + .resolveAliases(true) + ) + .gatekeeperOptions( + IndicesOptions.GatekeeperOptions.builder().ignoreThrottled(true).allowClosedIndices(true).allowAliasToMultipleIndices(true) + ) + .crossProjectModeOptions(new CrossProjectModeOptions(true)) /* presumably switches on cross-project resolution; confirm against CrossProjectModeOptions */ + .build(); + private final Client client; public IndexResolver(Client client) { @@ -136,6 +154,31 @@ public void resolveIndicesVersioned( ); } + public void resolveFlatIndicesVersioned( /* sends a field-caps request with includeResolvedTo(true) and FIELD_CAPS_FLAT_INDICES_OPTIONS, then gives the listener the merged IndexResolution together with the effective minimum transport version */ + String indexWildcard, + Set fieldNames, + QueryBuilder requestFilter, + boolean includeAllDimensions, + boolean useAggregateMetricDoubleWhenNotSupported, + boolean useDenseVectorWhenNotSupported, + ActionListener> listener + ) { + var request = createFieldCapsRequest(indexWildcard, fieldNames, requestFilter, includeAllDimensions); + request.includeResolvedTo(true); + request.indicesOptions(FIELD_CAPS_FLAT_INDICES_OPTIONS); + client.execute(EsqlResolveFieldsAction.TYPE, request, listener.delegateFailureAndWrap((l, response) -> { + FieldsInfo info = new FieldsInfo( + response.caps(), + response.caps().minTransportVersion(), + Build.current().isSnapshot(), + useAggregateMetricDoubleWhenNotSupported, + useDenseVectorWhenNotSupported + ); + 
LOGGER.debug("minimum transport version {} {}", response.caps().minTransportVersion(), info.effectiveMinTransportVersion()); + l.onResponse(new Versioned<>(mergedMappings(indexWildcard, info), info.effectiveMinTransportVersion())); /* mergedMappings also collects the per-cluster original and concrete lists from the resolved-locally and resolved-remotely info of the response. NOTE(review): it logs that info at INFO in this change; confirm the level before merging */ + })); + } + /** * Information for resolving a field. * @param caps {@link FieldCapabilitiesResponse} from all indices involved in the query @@ -266,13 +309,48 @@ public static IndexResolution mergedMappings(String indexPattern, FieldsInfo fie for (FieldCapabilitiesIndexResponse ir : fieldsInfo.caps.getIndexResponses()) { allEmpty &= ir.get().isEmpty(); } + + var original = new HashMap>(); + var concrete = new HashMap>(); + + for (var expression : fieldsInfo.caps.getResolvedLocally().expressions()) { + if (expression.localExpressions().indices().isEmpty() == false) { + original.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(expression.original()); + concrete.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()) + .addAll(expression.localExpressions().indices()); + } + } + for (var entry : fieldsInfo.caps.getResolvedRemotely().entrySet()) { + var remote = entry.getKey(); + for (var expression : entry.getValue().expressions()) { + if (expression.localExpressions().indices().isEmpty() == false) { + original.computeIfAbsent(remote, k -> new ArrayList<>()).add(expression.original()); + concrete.computeIfAbsent(remote, k -> new ArrayList<>()).addAll(expression.localExpressions().indices()); + } + } + } + + LOGGER.info( + "--->Resolution info\nlocal {}\nremote {}", + fieldsInfo.caps.getResolvedLocally(), + fieldsInfo.caps.getResolvedRemotely() + ); + LOGGER.info("--->Indices: \noriginal: {}\nconcrete: {}", original, concrete); + // If all the mappings are empty we return an empty set of resolved indices to line up with QL // Introduced with #46775 // We need to be able to differentiate between an empty mapping index and an empty index due to fields not being found. 
An empty // mapping index will generate no columns (important) for a query like FROM empty-mapping-index, whereas an empty result here but // for fields that do not exist in the index (but the index has a mapping) will result in "VerificationException Unknown column" // errors. - var index = new EsIndex(indexPattern, rootFields, allEmpty ? Map.of() : concreteIndices, partiallyUnmappedFields); + var index = new EsIndex( + indexPattern, + rootFields, + original, // cluster key (LOCAL_CLUSTER_GROUP_KEY or the remote name) to the original expressions that resolved to at least one index + concrete, // same keys, mapped to the indices those expressions resolved to + allEmpty ? Map.of() : concreteIndices, + partiallyUnmappedFields + ); var failures = EsqlCCSUtils.groupFailuresPerCluster(fieldsInfo.caps.getFailures()); return IndexResolution.valid(index, concreteIndices.keySet(), failures); }