diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 62d5904b58b86..cdc242daa682c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; @@ -41,6 +42,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -62,12 +64,15 @@ private static class ClusterSetup { void populateIndices() throws Exception { local.okIds = populateIndex(LOCAL_CLUSTER, "ok-local", local.okShards, between(1, 100)); populateIndexWithFailingFields(LOCAL_CLUSTER, "fail-local", local.failingShards); + createUnavailableIndex(LOCAL_CLUSTER, "unavailable-local"); remote1.okIds = populateIndex(REMOTE_CLUSTER_1, "ok-cluster1", remote1.okShards, between(1, 100)); populateIndexWithFailingFields(REMOTE_CLUSTER_1, "fail-cluster1", remote1.failingShards); + createUnavailableIndex(REMOTE_CLUSTER_1, "unavailable-cluster1"); remote2.okIds = populateIndex(REMOTE_CLUSTER_2, "ok-cluster2", remote2.okShards, between(1, 100)); populateIndexWithFailingFields(REMOTE_CLUSTER_2, "fail-cluster2", remote2.failingShards); + createUnavailableIndex(REMOTE_CLUSTER_2, "unavailable-cluster2"); } private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, ClusterSetup cluster) { @@ -356,6 +361,42 @@ private static Exception randomFailure() { ); } + public void testResolutionFailures() throws Exception { + populateIndices(); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.allowPartialResults(true); + request.query("FROM ok*,unavailable* | LIMIT 1000"); + try (var resp = runQuery(request)) { + assertThat(EsqlTestUtils.getValuesList(resp), hasSize(local.okIds.size())); + assertTrue(resp.isPartial()); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getFailures(), not(empty())); + assertThat(localCluster.getFailures().get(0).reason(), containsString("index [unavailable-local] has no active shard copy")); + } + request.query("FROM *:ok*,unavailable* | LIMIT 1000"); + try (var resp = runQuery(request)) { + assertThat(EsqlTestUtils.getValuesList(resp), hasSize(remote1.okIds.size() + remote2.okIds.size())); + assertTrue(resp.isPartial()); + var executionInfo = resp.getExecutionInfo(); + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(localCluster.getFailures(), not(empty())); + assertThat(localCluster.getFailures().get(0).reason(), containsString("index [unavailable-local] has no active shard copy")); + assertThat(executionInfo.getCluster(REMOTE_CLUSTER_1).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(executionInfo.getCluster(REMOTE_CLUSTER_2).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } + request.query("FROM ok*,cluster-a:unavailable* | LIMIT 1000"); + try (var resp = runQuery(request)) { + assertThat(EsqlTestUtils.getValuesList(resp), hasSize(local.okIds.size())); + assertTrue(resp.isPartial()); + var remote1 = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remote1.getFailures(), not(empty())); + assertThat(remote1.getFailures().get(0).reason(), containsString("index [unavailable-cluster1] has no active shard copy")); + assertThat(resp.getExecutionInfo().getCluster(LOCAL_CLUSTER).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } + } + private Set populateIndexWithFailingFields(String clusterAlias, String indexName, int numShards) throws IOException { Client client = client(clusterAlias); XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); @@ -398,4 +439,15 @@ private Set populateIndexWithFailingFields(String clusterAlias, String i } return ids; } + + private void createUnavailableIndex(String clusterAlias, String indexName) throws IOException { + Client client = client(clusterAlias); + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.routing.allocation.include._name", "no_such_node")) + .setWaitForActiveShards(ActiveShardCount.NONE) + ); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java index e1ef6730c1f05..541e6a1421946 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java @@ -19,8 +19,10 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import java.util.Collection; @@ -363,6 +365,10 @@ public void testFailOnUnavailableShards() throws Exception { syncEsqlQueryRequest().query("from events,logs | KEEP timestamp,message").allowPartialResults(true) ) ) { + assertTrue(resp.isPartial()); + EsqlExecutionInfo.Cluster local = resp.getExecutionInfo().getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(local.getFailures(), hasSize(1)); + assertThat(local.getFailures().get(0).reason(), containsString("index [logs] has no active shard copy")); assertThat(getValuesList(resp), hasSize(3)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 55c36aa1cf353..61d0d3b0e1026 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -28,6 +28,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.EnumMap; import java.util.Iterator; @@ -562,8 +563,14 @@ public Cluster.Builder setFailedShards(int failedShards) { return this; } - public Cluster.Builder setFailures(List failures) { - this.failures = failures; + public Cluster.Builder addFailures(List failures) { + if (failures.isEmpty()) { + return this; + } + if (this.failures == null) { + this.failures = new ArrayList<>(original.failures); + } + this.failures.addAll(failures); return this; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java index b9040d2ef40d6..4d31f48da77de 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java @@ -6,10 +6,10 @@ */ package org.elasticsearch.xpack.esql.index; -import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.core.Nullable; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -19,33 +19,26 @@ public final class IndexResolution { /** * @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps. * @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex). - * @param unavailableShards Set of shards that were unavailable during index resolution - * @param unavailableClusters Remote clusters that could not be contacted during planning + * @param failures failures occurred during field-caps. * @return valid IndexResolution */ - public static IndexResolution valid( - EsIndex index, - Set resolvedIndices, - Set unavailableShards, - Map unavailableClusters - ) { + public static IndexResolution valid(EsIndex index, Set resolvedIndices, Map> failures) { Objects.requireNonNull(index, "index must not be null if it was found"); Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null"); - Objects.requireNonNull(unavailableShards, "unavailableShards must not be null"); - Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null"); - return new IndexResolution(index, null, resolvedIndices, unavailableShards, unavailableClusters); + Objects.requireNonNull(failures, "failures must not be null"); + return new IndexResolution(index, null, resolvedIndices, failures); } /** * Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices(). */ public static IndexResolution valid(EsIndex index) { - return valid(index, index.concreteIndices(), Set.of(), Map.of()); + return valid(index, index.concreteIndices(), Map.of()); } public static IndexResolution invalid(String invalid) { Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid"); - return new IndexResolution(null, invalid, Set.of(), Set.of(), Map.of()); + return new IndexResolution(null, invalid, Set.of(), Map.of()); } public static IndexResolution notFound(String name) { @@ -59,22 +52,19 @@ public static IndexResolution notFound(String name) { // all indices found by field-caps private final Set resolvedIndices; - private final Set unavailableShards; - // remote clusters included in the user's index expression that could not be connected to - private final Map unavailableClusters; + // map from cluster alias to failures that occurred during field-caps. + private final Map> failures; private IndexResolution( EsIndex index, @Nullable String invalid, Set resolvedIndices, - Set unavailableShards, - Map unavailableClusters + Map> failures ) { this.index = index; this.invalid = invalid; this.resolvedIndices = resolvedIndices; - this.unavailableShards = unavailableShards; - this.unavailableClusters = unavailableClusters; + this.failures = failures; } public boolean matches(String indexName) { @@ -101,11 +91,10 @@ public boolean isValid() { } /** - * @return Map of unavailable clusters (could not be connected to during field-caps query). Key of map is cluster alias, - * value is the {@link FieldCapabilitiesFailure} describing the issue. + * @return Map from cluster alias to failures that occurred during field-caps. */ - public Map unavailableClusters() { - return unavailableClusters; + public Map> failures() { + return failures; } /** @@ -115,13 +104,6 @@ public Set resolvedIndices() { return resolvedIndices; } - /** - * @return set of unavailable shards during index resolution - */ - public Set getUnavailableShards() { - return unavailableShards; - } - @Override public boolean equals(Object obj) { if (obj == null || obj.getClass() != getClass()) { @@ -131,12 +113,12 @@ public boolean equals(Object obj) { return Objects.equals(index, other.index) && Objects.equals(invalid, other.invalid) && Objects.equals(resolvedIndices, other.resolvedIndices) - && Objects.equals(unavailableClusters, other.unavailableClusters); + && Objects.equals(failures, other.failures); } @Override public int hashCode() { - return Objects.hash(index, invalid, resolvedIndices, unavailableClusters); + return Objects.hash(index, invalid, resolvedIndices, failures); } @Override @@ -152,7 +134,7 @@ public String toString() { + ", resolvedIndices=" + resolvedIndices + ", unavailableClusters=" - + unavailableClusters + + failures + '}'; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 5064b2bbd101a..4e8a89d024b71 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -166,7 +166,7 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster builder.setTook(executionInfo.tookSoFar()); } if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) { - builder.setFailures(resp.failures); + builder.addFailures(resp.failures); if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) { builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL); } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 39e3503b5fdd9..4c13eb384924e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -395,9 +395,13 @@ public void executePlan( var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime); if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) { final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards(); - var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0) - ? EsqlExecutionInfo.Cluster.Status.PARTIAL - : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; + // Set the local cluster status (including the final driver) to partial if the query was stopped + // or encountered resolution or execution failures. + var status = localClusterWasInterrupted.get() + || (failedShards != null && failedShards > 0) + || v.getFailures().isEmpty() == false + ? EsqlExecutionInfo.Cluster.Status.PARTIAL + : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; builder.setStatus(status); } return builder.build(); @@ -445,7 +449,7 @@ public void executePlan( .setSuccessfulShards(r.getSuccessfulShards()) .setSkippedShards(r.getSkippedShards()) .setFailedShards(r.getFailedShards()) - .setFailures(r.failures) + .addFailures(r.failures) .build() ); dataNodesListener.onResponse(r.getCompletionInfo()); @@ -455,7 +459,7 @@ public void executePlan( LOCAL_CLUSTER, (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus( EsqlExecutionInfo.Cluster.Status.PARTIAL - ).setFailures(List.of(new ShardSearchFailure(e))).build() + ).addFailures(List.of(new ShardSearchFailure(e))).build() ); dataNodesListener.onResponse(DriverCompletionInfo.EMPTY); } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index d507b8275178d..4b25ddfdfc2db 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -15,7 +15,6 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; @@ -35,6 +34,7 @@ import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -47,15 +47,23 @@ public class EsqlCCSUtils { private EsqlCCSUtils() {} - static Map determineUnavailableRemoteClusters(List failures) { - Map unavailableRemotes = new HashMap<>(); + static Map> groupFailuresPerCluster(List failures) { + Map> perCluster = new HashMap<>(); for (FieldCapabilitiesFailure failure : failures) { - if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) { - for (String indexExpression : failure.getIndices()) { - if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) { - unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure); - } - } + String cluster = RemoteClusterAware.parseClusterAlias(failure.getIndices()[0]); + perCluster.computeIfAbsent(cluster, k -> new ArrayList<>()).add(failure); + } + return perCluster; + } + + static Map determineUnavailableRemoteClusters(Map> failures) { + Map unavailableRemotes = new HashMap<>(failures.size()); + for (var e : failures.entrySet()) { + if (Strings.isEmpty(e.getKey())) { + continue; + } + if (e.getValue().stream().allMatch(f -> ExceptionsHelper.isRemoteUnavailableException(f.getException()))) { + unavailableRemotes.put(e.getKey(), e.getValue().get(0)); } } return unavailableRemotes; @@ -136,8 +144,8 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn } else { builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); // add this exception to the failures list only if there is no failure already recorded there - if (v.getFailures() == null || v.getFailures().size() == 0) { - builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); + if (v.getFailures().isEmpty()) { + builder.addFailures(List.of(new ShardSearchFailure(exceptionForResponse))); } } return builder.build(); @@ -188,18 +196,15 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf static void updateExecutionInfoWithClustersWithNoMatchingIndices( EsqlExecutionInfo executionInfo, IndexResolution indexResolution, + Set unavailableClusters, QueryBuilder filter ) { - Set clustersWithResolvedIndices = new HashSet<>(); - // determine missing clusters + final Set clustersWithNoMatchingIndices = new HashSet<>(executionInfo.clusterAliases()); for (String indexName : indexResolution.resolvedIndices()) { - clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName)); + clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName)); } - Set clustersRequested = executionInfo.clusterAliases(); - Set clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices); - clustersWithNoMatchingIndices.removeAll(indexResolution.unavailableClusters().keySet()); - - /** + clustersWithNoMatchingIndices.removeAll(unavailableClusters); + /* * Rules enforced at planning time around non-matching indices * 1. fail query if no matching indices on any cluster (VerificationException) - that is handled elsewhere * 2. fail query if a cluster has no matching indices *and* a concrete index was specified - handled here @@ -238,10 +243,22 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( ); } } else { + // We check for the valid resolution because if we have empty resolution it's still an error. if (indexResolution.isValid()) { - // no matching indices and no concrete index requested - just mark it as done, no error - // We check for the valid resolution because if we have empty resolution it's still an error. - markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null); + List failures = indexResolution.failures().getOrDefault(c, List.of()); + // No matching indices, no concrete index requested, and no error in field-caps; just mark as done. + if (failures.isEmpty()) { + markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null); + } else { + // skip reporting index_not_found exceptions to avoid spamming users with such errors + // when queries use a remote cluster wildcard, e.g., `*:my-logs*`. + Exception nonIndexNotFound = failures.stream() + .map(FieldCapabilitiesFailure::getException) + .filter(ex -> ExceptionsHelper.unwrap(ex, IndexNotFoundException.class) == null) + .findAny() + .orElse(null); + markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SKIPPED, nonIndexNotFound); + } } } } @@ -252,7 +269,8 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( // Filter-less version, mainly for testing where we don't need filter support static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { - updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, null); + var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()).keySet(); + updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, unavailableClusters, null); } // visible for testing @@ -360,7 +378,7 @@ public static void markClusterWithFinalStateAndNoShards( .setSkippedShards(Objects.requireNonNullElse(v.getSkippedShards(), 0)) .setFailedShards(Objects.requireNonNullElse(v.getFailedShards(), 0)); if (ex != null) { - builder.setFailures(List.of(new ShardSearchFailure(ex))); + builder.addFailures(List.of(new ShardSearchFailure(ex))); } return builder.build(); }); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index d148f7f4db73c..dde54bccf2da4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -7,8 +7,12 @@ package org.elasticsearch.xpack.esql.session; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; @@ -19,15 +23,19 @@ import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverCompletionInfo; +import org.elasticsearch.compute.operator.FailureCollector; +import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.IndexModeFieldMapper; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; @@ -108,7 +116,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; @@ -331,6 +338,53 @@ private LogicalPlan parse(String query, QueryParams params) { return parsed; } + /** + * Associates errors that occurred during field-caps with the cluster info in the execution info. + * - Skips clusters that are no longer running, as they have already been marked as successful, skipped, or failed. + * - If allow_partial_results or skip_unavailable is enabled, stores the failures in the cluster info but allows execution to continue. + * - Otherwise, aborts execution with the failures. + */ + static void handleFieldCapsFailures( + boolean allowPartialResults, + EsqlExecutionInfo executionInfo, + Map> failures + ) throws Exception { + FailureCollector failureCollector = new FailureCollector(); + for (var e : failures.entrySet()) { + String clusterAlias = e.getKey(); + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); + if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.RUNNING) { + assert cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SUCCESSFUL : "can't mark a cluster success with failures"; + continue; + } + if (allowPartialResults == false && executionInfo.isSkipUnavailable(clusterAlias) == false) { + for (FieldCapabilitiesFailure failure : e.getValue()) { + failureCollector.unwrapAndCollect(failure.getException()); + } + } else if (cluster.getFailures().isEmpty()) { + var shardFailures = e.getValue().stream().map(f -> { + ShardId shardId = null; + if (ExceptionsHelper.unwrapCause(f.getException()) instanceof ElasticsearchException es) { + shardId = es.getShardId(); + } + if (shardId != null) { + return new ShardSearchFailure(f.getException(), new SearchShardTarget(null, shardId, clusterAlias)); + } else { + return new ShardSearchFailure(f.getException()); + } + }).toList(); + executionInfo.swapCluster( + clusterAlias, + (k, curr) -> new EsqlExecutionInfo.Cluster.Builder(cluster).addFailures(shardFailures).build() + ); + } + } + Exception failure = failureCollector.getFailure(); + if (failure != null) { + throw failure; + } + } + public void analyzedPlan( LogicalPlan parsed, EsqlExecutionInfo executionInfo, @@ -342,7 +396,8 @@ public void analyzedPlan( return; } - Function analyzeAction = (l) -> { + CheckedFunction analyzeAction = (l) -> { + handleFieldCapsFailures(configuration.allowPartialResults(), executionInfo, l.indices.failures()); Analyzer analyzer = new Analyzer( new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution, l.inferenceResolution), verifier @@ -402,8 +457,14 @@ public void analyzedPlan( try { // the order here is tricky - if the cluster has been filtered and later became unavailable, // do we want to declare it successful or skipped? For now, unavailability takes precedence. - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.unavailableClusters()); - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, null); + var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unavailableClusters); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( + executionInfo, + result.indices, + unavailableClusters.keySet(), + null + ); plan = analyzeAction.apply(result); } catch (Exception e) { l.onFailure(e); @@ -485,11 +546,7 @@ private void preAnalyzeMainIndices( result.fieldNames, requestFilter, listener.delegateFailure((l, indexResolution) -> { - if (configuration.allowPartialResults() == false && indexResolution.getUnavailableShards().isEmpty() == false) { - l.onFailure(indexResolution.getUnavailableShards().iterator().next()); - } else { - l.onResponse(result.withIndexResolution(indexResolution)); - } + l.onResponse(result.withIndexResolution(indexResolution)); }) ); } @@ -514,7 +571,10 @@ private boolean allCCSClustersSkipped( ActionListener logicalPlanListener ) { IndexResolution indexResolution = result.indices; - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters( + executionInfo, + EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()) + ); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception @@ -528,7 +588,7 @@ private boolean allCCSClustersSkipped( } private static void analyzeAndMaybeRetry( - Function analyzeAction, + CheckedFunction analyzeAction, QueryBuilder requestFilter, PreAnalysisResult result, EsqlExecutionInfo executionInfo, @@ -544,7 +604,13 @@ private static void analyzeAndMaybeRetry( if (result.indices.isValid() || requestFilter != null) { // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report // when the resolution result is not valid for a different reason. - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter); + var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()).keySet(); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( + executionInfo, + result.indices, + unavailableClusters, + requestFilter + ); } plan = analyzeAction.apply(result); } catch (Exception e) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index d2f79ceb1316f..16401574b0f58 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.esql.session; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; @@ -149,17 +147,6 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit } } - Map unavailableRemotes = EsqlCCSUtils.determineUnavailableRemoteClusters( - fieldCapsResponse.getFailures() - ); - - Set unavailableShards = new HashSet<>(); - for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) { - if (failure.getException() instanceof NoShardAvailableActionException e) { - unavailableShards.add(e); - } - } - Map concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size()); for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) { concreteIndices.put(ir.getIndexName(), ir.getIndexMode()); @@ -171,7 +158,8 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit } // If all the mappings are empty we return an empty set of resolved indices to line up with QL var index = new EsIndex(indexPattern, rootFields, allEmpty ? Map.of() : concreteIndices, partiallyUnmappedFields); - return IndexResolution.valid(index, concreteIndices.keySet(), unavailableShards, unavailableRemotes); + var failures = EsqlCCSUtils.groupFailuresPerCluster(fieldCapsResponse.getFailures()); + return IndexResolution.valid(index, concreteIndices.keySet(), failures); } private static Map> collectFieldCaps(FieldCapabilitiesResponse fieldCapsResponse) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfoTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfoTests.java index 111d86669af22..19899e62ca057 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfoTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfoTests.java @@ -57,7 +57,7 @@ public void testHasMetadataPartial() { assertFalse(info.hasMetadataToReport()); info.swapCluster(key, (k, v) -> { EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v); - builder.setFailures(List.of(new ShardSearchFailure(new IllegalStateException("shard failure")))); + builder.addFailures(List.of(new ShardSearchFailure(new IllegalStateException("shard failure")))); return builder.build(); }); assertTrue(info.hasMetadataToReport()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index 2d488d7e41ee8..a3d719cf91276 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -253,7 +253,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ) ); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), Map.of()); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of()); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -296,8 +296,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { IndexMode.STANDARD ) ); - Map unavailableClusters = Map.of(); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of()); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -338,8 +337,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters); + var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -381,8 +380,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters); + var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); @@ -430,8 +429,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters); + var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -463,7 +462,9 @@ public void testDetermineUnavailableRemoteClusters() { ) ); - Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters( + EsqlCCSUtils.groupFailuresPerCluster(failures) + ); assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); } @@ -473,7 +474,8 @@ public void testDetermineUnavailableRemoteClusters() { failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); - Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); + var groupedFailures = EsqlCCSUtils.groupFailuresPerCluster(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(groupedFailures); assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); } @@ -487,7 +489,8 @@ public void testDetermineUnavailableRemoteClusters() { new IllegalStateException("Unable to open any connections") ) ); - Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); + var groupedFailures = EsqlCCSUtils.groupFailuresPerCluster(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(groupedFailures); assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); } @@ -495,14 +498,16 @@ public void testDetermineUnavailableRemoteClusters() { { List failures = new ArrayList<>(); failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); - Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); + var groupedFailures = EsqlCCSUtils.groupFailuresPerCluster(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(groupedFailures); assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } // empty failures list { List failures = new ArrayList<>(); - Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); + var groupedFailures = EsqlCCSUtils.groupFailuresPerCluster(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(groupedFailures); assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } }