From 2283ca1aa6c6f639d5d995adc36454be79b8bb2d Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Fri, 7 Mar 2025 10:53:08 +0100 Subject: [PATCH] Do not retry CBE --- .../esql/plugin/DataNodeRequestSender.java | 23 ++++++++++--------- .../plugin/DataNodeRequestSenderTests.java | 16 +++++++++++-- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index a5b8c13f9a730..57f4ba53be646 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.search.SearchShardsResponse; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.FailureCollector; @@ -211,18 +212,18 @@ private static Exception unwrapFailure(Exception e) { private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception originalEx) { final Exception e = unwrapFailure(originalEx); - // Retain only one meaningful exception and avoid suppressing previous failures to minimize memory usage, especially when handling - // many shards. + final boolean isTaskCanceledException = ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null; + final boolean isCircuitBreakerException = ExceptionsHelper.unwrap(e, CircuitBreakingException.class) != null; shardFailures.compute(shardId, (k, current) -> { - boolean mergedFatal = fatal || ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null; - if (current == null) { - return new ShardFailure(mergedFatal, e); - } - mergedFatal |= current.fatal; - if (e instanceof NoShardAvailableActionException || ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) { - return new ShardFailure(mergedFatal, current.failure); - } - return new ShardFailure(mergedFatal, e); + boolean mergedFatal = fatal || isTaskCanceledException || isCircuitBreakerException; + return current == null + ? new ShardFailure(mergedFatal, e) + : new ShardFailure( + mergedFatal || current.fatal, + // Retain only one meaningful exception and avoid suppressing previous failures to minimize memory usage, + // especially when handling many shards. + isTaskCanceledException || e instanceof NoShardAvailableActionException ? current.failure : e + ); }); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index e34deb4d55f20..bf350998e701a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -15,6 +15,8 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.common.breaker.CircuitBreaker.Durability; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -123,8 +125,7 @@ public void testMissingShards() { var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { fail("expect no data-node request is sent when target shards are missing"); }); - var error = expectThrows(NoShardAvailableActionException.class, future::actionGet); - assertThat(error.getMessage(), containsString("no shard copies found")); + expectThrows(NoShardAvailableActionException.class, containsString("no shard copies found"), future::actionGet); } { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); @@ -244,6 +245,17 @@ public void testAllowPartialResults() { assertThat(resp.successfulShards, equalTo(1)); } + public void testDoNotRetryCircuitBreakerException() { + var targetShards = List.of(targetShard(shard1, node1, node2)); + var sent = ConcurrentCollections.newQueue(); + var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { + sent.add(new NodeRequest(node, shardIds, aliasFilters)); + runWithDelay(() -> listener.onFailure(new CircuitBreakingException("cbe", randomFrom(Durability.values())), false)); + }); + expectThrows(CircuitBreakingException.class, equalTo("cbe"), future::actionGet); + assertThat(sent.size(), equalTo(1)); + } + static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) { return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null); }