diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 2efa3433cbb74..310d9906dc4b7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.search.SearchShardsResponse; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.FailureCollector; @@ -209,8 +210,9 @@ private static Exception unwrapFailure(Exception e) { private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception originalEx) { final Exception e = unwrapFailure(originalEx); final boolean isTaskCanceledException = ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null; + final boolean isCircuitBreakerException = ExceptionsHelper.unwrap(e, CircuitBreakingException.class) != null; shardFailures.compute(shardId, (k, current) -> { - boolean mergedFatal = fatal || isTaskCanceledException; + boolean mergedFatal = fatal || isTaskCanceledException || isCircuitBreakerException; return current == null ? new ShardFailure(mergedFatal, e) : new ShardFailure( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index 5f30a5257494e..dd02d6fc3b189 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -15,6 +15,8 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.common.breaker.CircuitBreaker.Durability; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -271,6 +273,17 @@ public void testNonFatalFailedOnAllNodes() { assertThat(sent.size(), equalTo(2)); } + public void testDoNotRetryCircuitBreakerException() { + var targetShards = List.of(targetShard(shard1, node1, node2)); + var sent = ConcurrentCollections.newQueue(); + var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { + sent.add(new NodeRequest(node, shardIds, aliasFilters)); + runWithDelay(() -> listener.onFailure(new CircuitBreakingException("cbe", randomFrom(Durability.values())), false)); + }); + expectThrows(CircuitBreakingException.class, equalTo("cbe"), future::actionGet); + assertThat(sent.size(), equalTo(1)); + } + static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) { return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null); }