Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -34,6 +35,12 @@
public class BulkProcessor2RetryIT extends ESIntegTestCase {
private static final String INDEX_NAME = "test";
Map<String, Integer> requestToExecutionCountMap = new ConcurrentHashMap<>();
/*
* We can't call ESIntegTestCase.client() from a transport thread because it winds up calling a blocking operation that trips an
* assertion error if you're doing it from the transport thread. So we stash a random client in this variable for use when we nned a
* client in a transport thread.
*/
private Client clientsForTransportThread;

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Expand All @@ -58,14 +65,14 @@ public void testBulkRejectionLoadWithoutBackoff() throws Throwable {
// value = "org.elasticsearch.action.bulk.Retry2:trace",
// reason = "Logging information about locks useful for tracking down deadlock"
// )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TestLogging still useful? or should we remove it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably ought to remove it. I come here from time to time to grab this for use in other tests because I can never remember the syntax. :)

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/94941")
public void testBulkRejectionLoadWithBackoff() throws Throwable {
boolean rejectedExecutionExpected = false;
executeBulkRejectionLoad(8, rejectedExecutionExpected);
}

@SuppressWarnings("unchecked")
private void executeBulkRejectionLoad(int maxRetries, boolean rejectedExecutionExpected) throws Throwable {
clientsForTransportThread = client();
int numberOfAsyncOps = randomIntBetween(600, 700);
final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
final Set<BulkResponse> successfulResponses = Collections.newSetFromMap(new ConcurrentHashMap<>());
Expand Down Expand Up @@ -110,6 +117,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
}
rejectedAfterAllRetries = true;
}
} else if (failure.getStatus() == RestStatus.SERVICE_UNAVAILABLE) {
// The test framework throws this at us sometimes
} else {
throw new AssertionError("Unexpected failure status: " + failure.getStatus());
}
Expand All @@ -128,6 +137,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
rejectedAfterAllRetries = true;
}
// ignored, we exceeded the write queue size when dispatching the initial bulk request
} else if (ExceptionsHelper.status(failureTuple.v2()) == RestStatus.SERVICE_UNAVAILABLE) {
// The test framework throws this at us sometimes
} else {
Throwable t = failureTuple.v2();
// we're not expecting any other errors
Expand Down Expand Up @@ -164,7 +175,7 @@ void countAndBulk(BulkRequest request, ActionListener<BulkResponse> listener) {
for (DocWriteRequest<?> docWriteRequest : request.requests) {
requestToExecutionCountMap.compute(docWriteRequest.id(), (key, value) -> value == null ? 1 : value + 1);
}
client().bulk(request, listener);
clientsForTransportThread.bulk(request, listener);
}

}