Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,15 @@ private LogicalPlan resolveIndex(UnresolvedRelation plan, IndexResolution indexR

var attributes = mappingAsAttributes(plan.source(), esIndex.mapping());
attributes.addAll(plan.metadataFields());
return new EsRelation(
var esRelation = new EsRelation(
plan.source(),
esIndex.name(),
plan.indexMode(),
esIndex.indexNameWithModes(),
attributes.isEmpty() ? NO_FIELDS : attributes
);
esRelation.addResolution(esIndex.original(), esIndex.concrete());
return esRelation;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
import org.elasticsearch.xpack.esql.core.type.EsField;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

public record EsIndex(
String name,
/** Map of field names to {@link EsField} instances representing that field */
Map<String, EsField> mapping,
/* transient */ Map<String, List<String>> original,
/* transient */ Map<String, List<String>> concrete,
Map<String, IndexMode> indexNameWithModes,
/** Fields mapped only in some (but *not* all) indices. Since this is only used by the analyzer, it is not serialized. */
Set<String> partiallyUnmappedFields
/* transient */ Set<String> partiallyUnmappedFields
) implements Writeable {

public EsIndex {
Expand All @@ -32,22 +35,31 @@ public record EsIndex(
}

public EsIndex(String name, Map<String, EsField> mapping, Map<String, IndexMode> indexNameWithModes) {
this(name, mapping, indexNameWithModes, Set.of());
this(name, mapping, Map.of(), Map.of(), indexNameWithModes, Set.of());
}

/**
* Intended for tests. Returns an index with an empty index mode map.
*/
public EsIndex(String name, Map<String, EsField> mapping) {
this(name, mapping, Map.of(), Set.of());
this(name, mapping, Map.of(), Map.of(), Map.of(), Set.of());
}

public EsIndex(
String name,
Map<String, EsField> mapping,
Map<String, IndexMode> indexNameWithModes,
Set<String> partiallyUnmappedFields
) {
this(name, mapping, Map.of(), Map.of(), indexNameWithModes, partiallyUnmappedFields);
}

public static EsIndex readFrom(StreamInput in) throws IOException {
String name = in.readString();
Map<String, EsField> mapping = in.readImmutableMap(StreamInput::readString, EsField::readFrom);
Map<String, IndexMode> indexNameWithModes = in.readMap(IndexMode::readFrom);
// partially unmapped fields shouldn't pass the coordinator node anyway, since they are only used by the Analyzer.
return new EsIndex(name, mapping, indexNameWithModes, Set.of());
return new EsIndex(name, mapping, Map.of(), Map.of(), indexNameWithModes, Set.of());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -40,6 +41,23 @@ public class EsRelation extends LeafPlan {
private final Map<String, IndexMode> indexNameWithModes;
private final List<Attribute> attrs;

// TODO serialize
private final transient Map<String, List<String>> original = new HashMap<>();
private final transient Map<String, List<String>> concrete = new HashMap<>();

public void addResolution(Map<String, List<String>> original, Map<String, List<String>> concrete) {
this.original.putAll(original);
this.concrete.putAll(concrete);
}

public Map<String, List<String>> getOriginal() {
return original;
}

public Map<String, List<String>> getConcrete() {
return concrete;
}

public EsRelation(Source source, EsIndex index, IndexMode indexMode) {
this(source, index.name(), indexMode, index.indexNameWithModes(), flatten(source, index.mapping()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.aggregation.AggregatorMode;
Expand Down Expand Up @@ -61,14 +60,12 @@
import org.elasticsearch.xpack.esql.stats.SearchStats;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.EXTRACT_SPATIAL_BOUNDS;
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.NONE;
Expand Down Expand Up @@ -155,30 +152,6 @@ private static ReducedPlan getPhysicalPlanReduction(int estimatedRowSize, Physic
return new ReducedPlan(EstimatesRowSize.estimateRowSize(estimatedRowSize, plan));
}

/**
* Returns a set of concrete indices after resolving the original indices specified in the FROM command.
*/
public static Set<String> planConcreteIndices(PhysicalPlan plan) {
if (plan == null) {
return Set.of();
}
var indices = new LinkedHashSet<String>();
forEachRelation(plan, relation -> indices.addAll(relation.concreteIndices()));
return indices;
}

/**
* Returns the original indices specified in the FROM command of the query. We need the original query to resolve alias filters.
*/
public static String[] planOriginalIndices(PhysicalPlan plan) {
if (plan == null) {
return Strings.EMPTY_ARRAY;
}
var indices = new LinkedHashSet<String>();
forEachRelation(plan, relation -> indices.addAll(asList(Strings.commaDelimitedListToStringArray(relation.indexPattern()))));
return indices.toArray(String[]::new);
}

public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) {
return plan.anyMatch(e -> {
if (e instanceof FragmentExec f) {
Expand All @@ -188,7 +161,7 @@ public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) {
});
}

private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
public static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> {
if (r.indexMode() != IndexMode.LOOKUP) {
action.accept(r);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public IndicesOptions indicesOptions() {
return plan.originalIndices().indicesOptions();
}

@Override
public boolean allowsCrossProject() {
return true;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
if (parentTaskId.isSet() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
Expand Down Expand Up @@ -67,6 +69,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -325,8 +328,20 @@ public void executePlan(
listener.onFailure(new IllegalStateException("expected data node plan starts with an ExchangeSink; got " + dataNodePlan));
return;
}
Map<String, OriginalIndices> clusterToConcreteIndices = transportService.getRemoteClusterService()
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));

var options = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS)
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();

Map<String, OriginalIndices> clusterToConcreteIndices = new HashMap<>();
PlannerUtils.forEachRelation(physicalPlan, relation -> {
// TODO do we need to merge entries here?
clusterToConcreteIndices.putAll(
Maps.transformValues(relation.getConcrete(), v -> new OriginalIndices(v.toArray(String[]::new), options))
);
});
LOGGER.info("--> Concrete indices {}", clusterToConcreteIndices);

QueryPragmas queryPragmas = configuration.pragmas();
Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask);
if (dataNodePlan == null) {
Expand Down Expand Up @@ -369,8 +384,16 @@ public void executePlan(
return;
}
}
Map<String, OriginalIndices> clusterToOriginalIndices = transportService.getRemoteClusterService()
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planOriginalIndices(physicalPlan));

Map<String, OriginalIndices> clusterToOriginalIndices = new HashMap<>();
PlannerUtils.forEachRelation(physicalPlan, relation -> {
// TODO do we need to merge entries here?
clusterToOriginalIndices.putAll(
Maps.transformValues(relation.getOriginal(), v -> new OriginalIndices(v.toArray(String[]::new), options))
);
});
LOGGER.info("--> Original indices {}", clusterToOriginalIndices);

var localOriginalIndices = clusterToOriginalIndices.remove(LOCAL_CLUSTER);
var localConcreteIndices = clusterToConcreteIndices.remove(LOCAL_CLUSTER);
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -803,20 +804,42 @@ private void preAnalyzeMainIndices(
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
EsqlCCSUtils.initCrossClusterState(
indicesExpressionGrouper,
verifier.licenseState(),
preAnalysis.indexes().keySet(),
executionInfo
);
// The main index pattern dictates on which nodes the query can be executed,
// so we use the minimum transport version from this field caps request.
forAll(
preAnalysis.indexes().entrySet().iterator(),
result,
(entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
listener
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
if (crossProjectModeDecider.crossProjectEnabled() == false) {
EsqlCCSUtils.initCrossClusterState(
indicesExpressionGrouper,
verifier.licenseState(),
preAnalysis.indexes().keySet(),
executionInfo
);
// The main index pattern dictates on which nodes the query can be executed,
// so we use the minimum transport version from this field caps request.
forAll(
preAnalysis.indexes().entrySet().iterator(),
result,
(entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
listener
);
} else {
forAll(
preAnalysis.indexes().entrySet().iterator(),
result,
(entry, r, l) -> preAnalyzeFlatMainIndices(
entry.getKey(),
entry.getValue(),
preAnalysis,
executionInfo,
r,
requestFilter,
l
),
listener
);
}
}

private void preAnalyzeMainIndices(
Expand All @@ -828,11 +851,6 @@ private void preAnalyzeMainIndices(
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
if (executionInfo.clusterAliases().isEmpty()) {
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
listener.onResponse(result.withIndices(indexPattern, IndexResolution.empty(indexPattern.indexPattern())));
Expand Down Expand Up @@ -864,6 +882,50 @@ private void preAnalyzeMainIndices(
}
}

private void preAnalyzeFlatMainIndices(
IndexPattern indexPattern,
IndexMode indexMode,
PreAnalyzer.PreAnalysis preAnalysis,
EsqlExecutionInfo executionInfo,
PreAnalysisResult result,
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
indexResolver.resolveFlatIndicesVersioned(
indexPattern.indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
switch (indexMode) {
case IndexMode.TIME_SERIES -> {
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
yield requestFilter != null ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) : indexModeFilter;
}
default -> requestFilter;
},
indexMode == IndexMode.TIME_SERIES,
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
preAnalysis.useDenseVectorWhenNotSupported(),
listener.delegateFailureAndWrap((l, indexResolution) -> {
LOGGER.info("--> Resolved indices {}", indexResolution);

indexResolution.inner().get().original().forEach((clusterAlias, indices) -> {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
var indexExpr = Strings.collectionToCommaDelimitedString(indices);
if (v != null) {
indexExpr = v.getIndexExpression() + "," + indexExpr;
}
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
});
});

EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());
l.onResponse(
result.withIndices(indexPattern, indexResolution.inner()).withMinimumTransportVersion(indexResolution.minimumVersion())
);
})
);
}

private void analyzeWithRetry(
LogicalPlan parsed,
Configuration configuration,
Expand Down
Loading