Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.search;

import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.elasticsearch.client.Request;
Expand All @@ -16,7 +18,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
Expand All @@ -31,7 +32,7 @@
reason = "testing debug log output to identify race condition",
value = "org.elasticsearch.xpack.search.MutableSearchResponse:DEBUG,org.elasticsearch.xpack.search.AsyncSearchTask:DEBUG"
)
public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
public class AsyncSearchErrorTraceIT extends AsyncSearchIntegTestCase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed for the fix or is it an unrelated improvement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn’t necessary for this test; I added it to keep it consistent with the other tests in the same package.


@Override
protected boolean addMockHttpTransport() {
Expand Down Expand Up @@ -77,10 +78,15 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception {
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceCleared(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me, I see a similar thing is done in CCSDuelIT for async searches.

Copy link
Contributor Author

@drempapis drempapis Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, Ben, for the review. Yes, this is actually the most important part of this PR: it ensures that the async search entry (identified by its id) in the .async-search index is deleted before the test reaches the after-test cleanup, which is where the exception is thrown.

}
}

Expand All @@ -103,11 +109,16 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceObserved(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}

Expand All @@ -130,11 +141,16 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceCleared(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}

Expand Down Expand Up @@ -169,19 +185,24 @@ public void testDataNodeLogsStackTrace() throws Exception {
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
getAsyncRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
getAsyncRequest.addParameter("error_trace", "false");
} // else empty
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

mockLog.assertAllExpectationsMatched();
try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
getAsyncRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
getAsyncRequest.addParameter("error_trace", "false");
} // else empty
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

mockLog.assertAllExpectationsMatched();
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}
}

Expand All @@ -204,11 +225,16 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceCleared(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}

Expand All @@ -231,11 +257,15 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceObserved(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}

Expand All @@ -245,6 +275,26 @@ private Map<String, Object> performRequestAndGetResponseEntity(Request r) throws
return XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false);
}

private void deleteAsyncSearchIfPresent(Map<String, Object> map) throws IOException {
String id = (String) map.get("id");
if (id == null) {
return;
}

// Make sure the .async-search system index is green before deleting the async search result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ensure green?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the time we reach the cleanup phase, the .async-search shard may still be relocating or recovering, which is when shard-lock timeouts are most likely to occur during test teardown. To prevent this, we ensure that the .async-search system index is fully ready and stable before deleting the async search result.

try {
ensureGreen(".async-search");
} catch (Exception ignore) {
// the index may not exist
}

Response response = getRestClient().performRequest(new Request("DELETE", "/_async_search/" + id));
HttpEntity entity = response.getEntity();
if (entity != null) {
EntityUtils.consumeQuietly(entity);
}
}

private void awaitAsyncRequestDoneRunning(Request getAsyncRequest) throws Exception {
assertBusy(() -> {
Map<String, Object> getAsyncResponseEntity = performRequestAndGetResponseEntity(getAsyncRequest);
Expand Down