Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,19 @@ public Map<String, Cluster> getClusters() {
return clusterInfo;
}

/**
* This creates an initial Cluster object with indexExpression and skipUnavailable.
*/
public void initCluster(String clusterAlias, String indexExpression) {
swapCluster(clusterAlias, (ca, previous) -> {
var expr = indexExpression;
if (previous != null) {
expr = previous.getIndexExpression() + "," + indexExpression;
}
return new Cluster(clusterAlias, expr, shouldSkipOnFailure(clusterAlias));
});
}

/**
* Utility to swap a Cluster object. Guidelines for the remapping function:
* <ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,34 +447,22 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
}
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
String indexName = EnrichPolicy.getBaseName(policyName);
indexResolver.resolveIndices(
indexName,
IndexResolver.ALL_FIELDS,
null,
false,
// Disable aggregate_metric_double and dense_vector until we get version checks in planning
false,
false,
refs.acquire(indexResult -> {
if (indexResult.isValid() && indexResult.get().concreteQualifiedIndices().size() == 1) {
EsIndex esIndex = indexResult.get();
var concreteIndices = Map.of(
request.clusterAlias,
Iterables.get(esIndex.concreteQualifiedIndices(), 0)
);
var resolved = new ResolvedEnrichPolicy(
p.getMatchField(),
p.getType(),
p.getEnrichFields(),
concreteIndices,
esIndex.mapping()
);
resolvedPolices.put(policyName, resolved);
} else {
failures.put(policyName, indexResult.toString());
}
})
);
indexResolver.resolveIndices(indexName, IndexResolver.ALL_FIELDS, refs.acquire(indexResult -> {
if (indexResult.isValid() && indexResult.get().concreteQualifiedIndices().size() == 1) {
EsIndex esIndex = indexResult.get();
var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteQualifiedIndices(), 0));
var resolved = new ResolvedEnrichPolicy(
p.getMatchField(),
p.getType(),
p.getEnrichFields(),
concreteIndices,
esIndex.mapping()
);
resolvedPolices.put(policyName, resolved);
} else {
failures.put(policyName, indexResult.toString());
}
}));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private static QueryRewriteContext queryRewriteContext(TransportActionServices s
ClusterState clusterState = services.clusterService().state();
ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
indexNames.toArray(String[]::new),
IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
IndexResolver.DEFAULT_OPTIONS,
services.projectResolver().getProjectMetadata(clusterState),
services.indexNameExpressionResolver(),
services.transportService().getRemoteClusterService(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
Expand Down Expand Up @@ -337,13 +338,7 @@ public static void initCrossClusterState(
// so that the CCS telemetry handler can recognize that this error is CCS-related
try {
groupedIndices.forEach((clusterAlias, indices) -> {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
var indexExpr = Strings.arrayToCommaDelimitedString(indices.indices());
if (v != null) {
indexExpr = v.getIndexExpression() + "," + indexExpr;
}
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
});
executionInfo.initCluster(clusterAlias, Strings.arrayToCommaDelimitedString(indices.indices()));
});
} finally {
executionInfo.clusterInfoInitializing(false);
Expand All @@ -362,6 +357,12 @@ public static void initCrossClusterState(
}
}

public static void initCrossClusterState(EsIndex esIndex, EsqlExecutionInfo executionInfo) {
esIndex.originalIndices().forEach((clusterAlias, indices) -> {
executionInfo.initCluster(clusterAlias, Strings.collectionToCommaDelimitedString(indices));
});
}

/**
* Mark cluster with a final status (success or failure).
* Most metrics are set to 0 if not set yet, except for "took" which is set to the total time taken so far.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,11 +605,6 @@ private void preAnalyzeLookupIndex(
indexResolver.resolveIndices(
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, localPattern),
result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames,
null,
false,
// Disable aggregate_metric_double and dense_vector until we get version checks in planning
false,
false,
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
);
}
Expand Down Expand Up @@ -810,20 +805,34 @@ private void preAnalyzeMainIndices(
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
EsqlCCSUtils.initCrossClusterState(
indicesExpressionGrouper,
verifier.licenseState(),
preAnalysis.indexes().keySet(),
executionInfo
);
// The main index pattern dictates on which nodes the query can be executed,
// so we use the minimum transport version from this field caps request.
forAll(
preAnalysis.indexes().entrySet().iterator(),
result,
(entry, r, l) -> preAnalyzeMainIndices(entry.getKey(), entry.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
listener
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
if (crossProjectModeDecider.crossProjectEnabled() == false) {
EsqlCCSUtils.initCrossClusterState(
indicesExpressionGrouper,
verifier.licenseState(),
preAnalysis.indexes().keySet(),
executionInfo
);
// The main index pattern dictates on which nodes the query can be executed,
// so we use the minimum transport version from this field caps request.
forAll(
preAnalysis.indexes().entrySet().iterator(),
result,
(e, r, l) -> preAnalyzeMainIndices(e.getKey(), e.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
listener
);
} else {
forAll(
preAnalysis.indexes().entrySet().iterator(),
result,
(e, r, l) -> preAnalyzeFlatMainIndices(e.getKey(), e.getValue(), preAnalysis, executionInfo, r, requestFilter, l),
listener
);
}
}

private void preAnalyzeMainIndices(
Expand All @@ -835,28 +844,14 @@ private void preAnalyzeMainIndices(
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
if (executionInfo.clusterAliases().isEmpty()) {
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
listener.onResponse(result.withIndices(indexPattern, IndexResolution.empty(indexPattern.indexPattern())));
} else {
indexResolver.resolveIndicesVersioned(
indexPattern.indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
switch (indexMode) {
case IndexMode.TIME_SERIES -> {
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
yield requestFilter != null
? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter)
: indexModeFilter;
}
default -> requestFilter;
},
createQueryFilter(indexMode, requestFilter),
indexMode == IndexMode.TIME_SERIES,
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
preAnalysis.useDenseVectorWhenNotSupported(),
Expand All @@ -872,6 +867,43 @@ private void preAnalyzeMainIndices(
}
}

private void preAnalyzeFlatMainIndices(
IndexPattern indexPattern,
IndexMode indexMode,
PreAnalyzer.PreAnalysis preAnalysis,
EsqlExecutionInfo executionInfo,
PreAnalysisResult result,
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
indexResolver.resolveFlatWorldIndicesVersioned(
indexPattern.indexPattern(),
result.fieldNames,
createQueryFilter(indexMode, requestFilter),
indexMode == IndexMode.TIME_SERIES,
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
preAnalysis.useDenseVectorWhenNotSupported(),
listener.delegateFailureAndWrap((l, indexResolution) -> {
EsqlCCSUtils.initCrossClusterState(indexResolution.inner().get(), executionInfo);
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());
l.onResponse(
result.withIndices(indexPattern, indexResolution.inner()).withMinimumTransportVersion(indexResolution.minimumVersion())
);
})
);
}

private static QueryBuilder createQueryFilter(IndexMode indexMode, QueryBuilder requestFilter) {
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
return switch (indexMode) {
case IndexMode.TIME_SERIES -> {
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
yield requestFilter != null ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) : indexModeFilter;
}
default -> requestFilter;
};
}

private void analyzeWithRetry(
LogicalPlan parsed,
Configuration configuration,
Expand Down
Loading