diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java index 205457b264bb3..caf2b8e73a2fe 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.query.QueryBuilders; @@ -34,6 +35,12 @@ public class BulkProcessor2RetryIT extends ESIntegTestCase { private static final String INDEX_NAME = "test"; Map requestToExecutionCountMap = new ConcurrentHashMap<>(); + /* + * We can't call ESIntegTestCase.client() from a transport thread because it winds up calling a blocking operation that trips an + * assertion error if you're doing it from the transport thread. So we stash a random client in this variable for use when we nned a + * client in a transport thread. + */ + private Client clientsForTransportThread; @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { @@ -58,7 +65,6 @@ public void testBulkRejectionLoadWithoutBackoff() throws Throwable { // value = "org.elasticsearch.action.bulk.Retry2:trace", // reason = "Logging information about locks useful for tracking down deadlock" // ) - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/94941") public void testBulkRejectionLoadWithBackoff() throws Throwable { boolean rejectedExecutionExpected = false; executeBulkRejectionLoad(8, rejectedExecutionExpected); @@ -66,6 +72,7 @@ public void testBulkRejectionLoadWithBackoff() throws Throwable { @SuppressWarnings("unchecked") private void executeBulkRejectionLoad(int maxRetries, boolean rejectedExecutionExpected) throws Throwable { + clientsForTransportThread = client(); int numberOfAsyncOps = randomIntBetween(600, 700); final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps); final Set successfulResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -110,6 +117,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) } rejectedAfterAllRetries = true; } + } else if (failure.getStatus() == RestStatus.SERVICE_UNAVAILABLE) { + // The test framework throws this at us sometimes } else { throw new AssertionError("Unexpected failure status: " + failure.getStatus()); } @@ -128,6 +137,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) rejectedAfterAllRetries = true; } // ignored, we exceeded the write queue size when dispatching the initial bulk request + } else if (ExceptionsHelper.status(failureTuple.v2()) == RestStatus.SERVICE_UNAVAILABLE) { + // The test framework throws this at us sometimes } else { Throwable t = failureTuple.v2(); // we're not expecting any other errors @@ -164,7 +175,7 @@ void countAndBulk(BulkRequest request, ActionListener listener) { for (DocWriteRequest docWriteRequest : request.requests) { requestToExecutionCountMap.compute(docWriteRequest.id(), (key, value) -> value == null ? 1 : value + 1); } - client().bulk(request, listener); + clientsForTransportThread.bulk(request, listener); } }