Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.xpack.esql.execution;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand Down Expand Up @@ -76,7 +75,6 @@ public void esql(
FoldContext foldContext,
EnrichPolicyResolver enrichPolicyResolver,
EsqlExecutionInfo executionInfo,
IndicesExpressionGrouper indicesExpressionGrouper,
EsqlSession.PlanRunner planRunner,
TransportActionServices services,
ActionListener<Result> listener
Expand All @@ -94,7 +92,6 @@ public void esql(
mapper,
verifier,
planTelemetry,
indicesExpressionGrouper,
services
);
QueryMetric clientId = QueryMetric.fromString("rest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public static IndexResolution invalid(String invalid) {
return new IndexResolution(null, invalid, Set.of(), Map.of());
}

public static IndexResolution empty(String indexPattern) {
return valid(new EsIndex(indexPattern, Map.of(), Map.of()));
}

public static IndexResolution notFound(String name) {
Objects.requireNonNull(name, "name must not be null");
return invalid("Unknown index [" + name + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
foldCtx,
enrichPolicyResolver,
executionInfo,
remoteClusterService,
planRunner,
services,
ActionListener.wrap(result -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,23 @@
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import java.util.ArrayList;
Expand All @@ -39,8 +34,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toSet;

public class EsqlCCSUtils {
Expand Down Expand Up @@ -302,52 +300,34 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
}

/**
* Checks the index expression for the presence of remote clusters.
* Checks resolved indices for the presence of remote clusters.
* If found, it will ensure that the caller has a valid Enterprise (or Trial) license on the querying cluster
* as well as initialize corresponding cluster state in execution info.
* @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search.
*/
public static void initCrossClusterState(
IndicesExpressionGrouper indicesGrouper,
XPackLicenseState licenseState,
IndexPattern indexPattern,
EsqlExecutionInfo executionInfo
) throws ElasticsearchStatusException {
if (indexPattern == null) {
return;
}
IndexResolution resolvedIndices,
EsqlExecutionInfo executionInfo,
XPackLicenseState licenseState
) {
executionInfo.clusterInfoInitializing(true);
try {
var groupedIndices = indicesGrouper.groupIndices(
IndicesOptions.DEFAULT,
Strings.splitStringByCommaToArray(indexPattern.indexPattern()),
false
);

executionInfo.clusterInfoInitializing(true);
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
// so that the CCS telemetry handler can recognize that this error is CCS-related
try {
for (var entry : groupedIndices.entrySet()) {
final String clusterAlias = entry.getKey();
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
executionInfo.swapCluster(clusterAlias, (k, v) -> {
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
});
}
} finally {
executionInfo.clusterInfoInitializing(false);
}

if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
}
} catch (NoSuchRemoteClusterException e) {
if (EsqlLicenseChecker.isCcsAllowed(licenseState)) {
throw e;
} else {
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
}
Stream.concat(
resolvedIndices.failures().keySet().stream().map(remote -> Map.entry(remote, "")),
resolvedIndices.resolvedIndices()
.stream()
.map(index -> Map.entry(RemoteClusterAware.parseClusterAlias(index), RemoteClusterAware.splitIndexName(index)[1]))
).collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, joining(",")))).forEach((clusterAlias, indexExpr) -> {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
});
});
} finally {
executionInfo.clusterInfoInitializing(false);
}
if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.data.Block;
Expand All @@ -29,11 +30,11 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.esql.VerificationException;
Expand Down Expand Up @@ -81,12 +82,14 @@
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -126,7 +129,6 @@ public interface PlanRunner {
private final Mapper mapper;
private final PhysicalPlanOptimizer physicalPlanOptimizer;
private final PlanTelemetry planTelemetry;
private final IndicesExpressionGrouper indicesExpressionGrouper;
private final InferenceService inferenceService;
private final RemoteClusterService remoteClusterService;
private final BlockFactory blockFactory;
Expand All @@ -148,7 +150,6 @@ public EsqlSession(
Mapper mapper,
Verifier verifier,
PlanTelemetry planTelemetry,
IndicesExpressionGrouper indicesExpressionGrouper,
TransportActionServices services
) {
this.sessionId = sessionId;
Expand All @@ -163,7 +164,6 @@ public EsqlSession(
this.logicalPlanOptimizer = logicalPlanOptimizer;
this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
this.planTelemetry = planTelemetry;
this.indicesExpressionGrouper = indicesExpressionGrouper;
this.inferenceService = services.inferenceService();
this.preMapper = new PreMapper(services);
this.remoteClusterService = services.transportService().getRemoteClusterService();
Expand Down Expand Up @@ -453,8 +453,6 @@ private void resolveIndices(
PreAnalysisResult result,
ActionListener<LogicalPlan> logicalPlanListener
) {
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);

SubscribableListener.<PreAnalysisResult>newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
.andThenApply(r -> {
if (r.indices.isValid()
Expand Down Expand Up @@ -708,34 +706,44 @@ private void preAnalyzeMainIndices(
ThreadPool.Names.SYSTEM_READ
);
if (preAnalysis.indexPattern() != null) {
if (executionInfo.clusterAliases().isEmpty()) {
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
listener.onResponse(
result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())))
);
} else {
indexResolver.resolveAsMergedMapping(
preAnalysis.indexPattern().indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
switch (preAnalysis.indexMode()) {
case IndexMode.TIME_SERIES -> {
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
yield requestFilter != null
? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter)
: indexModeFilter;
}
default -> requestFilter;
},
preAnalysis.indexMode() == IndexMode.TIME_SERIES,
preAnalysis.supportsAggregateMetricDouble(),
preAnalysis.supportsDenseVector(),
listener.delegateFailureAndWrap((l, indexResolution) -> {
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
l.onResponse(result.withIndices(indexResolution));
})
);
}
indexResolver.resolveAsMergedMapping(
preAnalysis.indexPattern().indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
switch (preAnalysis.indexMode()) {
case IndexMode.TIME_SERIES -> {
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
yield requestFilter != null
? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter)
: indexModeFilter;
}
default -> requestFilter;
},
preAnalysis.indexMode() == IndexMode.TIME_SERIES,
preAnalysis.supportsAggregateMetricDouble(),
preAnalysis.supportsDenseVector(),
ActionListener.wrap(indexResolution -> {

// TODO this could be removed when allow_empty=true is properly handled
if (Objects.equals(indexResolution, IndexResolution.notFound(preAnalysis.indexPattern().indexPattern()))
&& Arrays.stream(Strings.commaDelimitedListToStringArray(preAnalysis.indexPattern().indexPattern()))
.map(RemoteClusterAware::parseClusterAlias)
.allMatch(remote -> remote.contains("*"))) {
indexResolution = IndexResolution.empty(preAnalysis.indexPattern().indexPattern());
}

EsqlCCSUtils.initCrossClusterState(indexResolution, executionInfo, verifier.licenseState());
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
listener.onResponse(result.withIndices(indexResolution));
}, failure -> {
if (failure instanceof NoSuchRemoteClusterException
&& EsqlLicenseChecker.isCcsAllowed(verifier.licenseState()) == false) {
listener.onFailure(EsqlLicenseChecker.invalidLicenseForCcsException(verifier.licenseState()));
} else {
listener.onFailure(failure);
}
})
);
} else {
// occurs when dealing with local relations (row a = 1)
listener.onResponse(result.withIndices(IndexResolution.invalid("[none specified]")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,6 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
mapper,
TEST_VERIFIER,
new PlanTelemetry(functionRegistry),
null,
EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES
);
TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets);
Expand Down
Loading