diff --git a/docs/changelog/135078.yaml b/docs/changelog/135078.yaml new file mode 100644 index 0000000000000..a68589e06946c --- /dev/null +++ b/docs/changelog/135078.yaml @@ -0,0 +1,6 @@ +pr: 135078 +summary: Fix async get results with inconsistent headers +area: ES|QL +type: bug +issues: + - 135042 diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 5e85c20b026ab..797ffd595a4e0 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; import org.elasticsearch.client.WarningsHandler; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; @@ -41,6 +42,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.ArrayList; @@ -51,6 +53,8 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.IntFunction; import static java.util.Collections.emptySet; @@ -60,11 +64,11 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; -import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString; import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.emptyOrNullString; @@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException { options.addHeader("Content-Type", mediaType); options.addHeader("Accept", "text/csv; header=absent"); request.setOptions(options); - HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings()); + Response response = performRequest(request); + assertWarnings(response, new AssertWarnings.NoWarnings()); + HttpEntity entity = response.getEntity(); String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); assertEquals("keyword0,0\r\n", actual); } @@ -1258,7 +1264,10 @@ public static Map runEsql( var results = mode == ASYNC ? runEsqlAsync(requestObject, randomBoolean(), assertWarnings) : runEsqlSync(requestObject, assertWarnings); - return checkPartialResults ? assertNotPartial(results) : results; + if (checkPartialResults) { + assertNotPartial(results); + } + return results; } public static Map runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) @@ -1269,8 +1278,17 @@ public static Map runEsql(RequestObjectBuilder requestObject, As public static Map runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { Request request = prepareRequestWithOptions(requestObject, SYNC); - HttpEntity entity = performRequest(request, assertWarnings); - return entityToMap(entity, requestObject.contentType()); + Response response = performRequest(request); + HttpEntity entity = response.getEntity(); + Map json = entityToMap(entity, requestObject.contentType()); + + var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix")); + if (supportsAsyncHeadersFix) { + assertNoAsyncHeaders(response); + } + assertWarnings(response, assertWarnings); + + return json; } public static Map runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { @@ -1298,17 +1316,18 @@ public static Map runEsqlAsync( checkKeepOnCompletion(requestObject, json, keepOnCompletion); String id = (String) json.get("id"); - var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false); - var supportsSuggestedCast = clusterHasCapability("POST", "/_query", List.of(), List.of("suggested_cast")).orElse(false); + var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix")); + var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast")); + + // Check headers on initial query call + if (supportsAsyncHeaders) { + assertAsyncHeaders(response, id, (boolean) json.get("is_running")); + } if (id == null) { // no id returned from an async call, must have completed immediately and without keep_on_completion assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false))); assertThat((boolean) json.get("is_running"), is(false)); - if (supportsAsyncHeaders) { - assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue()); - assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0")); - } assertWarnings(response, assertWarnings); json.remove("is_running"); // remove this to not mess up later map assertions return Collections.unmodifiableMap(json); @@ -1329,11 +1348,6 @@ public static Map runEsqlAsync( assertThat(json.get("pages"), nullValue()); } - if (supportsAsyncHeaders) { - assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id)); - assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0")); - } - // issue a second request to "async get" the results Request getRequest = prepareAsyncGetRequest(id); getRequest.setOptions(request.getOptions()); @@ -1343,6 +1357,11 @@ public static Map runEsqlAsync( var result = entityToMap(entity, requestObject.contentType()); + // Check headers on get call + if (supportsAsyncHeaders) { + assertAsyncHeaders(response, id, (boolean) result.get("is_running")); + } + // assert initial contents, if any, are the same as async get contents if (initialColumns != null) { if (supportsSuggestedCast == false) { @@ -1361,6 +1380,26 @@ public static Map runEsqlAsync( return removeAsyncProperties(result); } + record CapabilitesCacheKey(RestClient client, List capabilities) {} + + /** + * Cache of capabilities. + */ + private static final ConcurrentMap capabilities = new ConcurrentHashMap<>(); + + public static boolean hasCapabilities(RestClient client, List requiredCapabilities) { + if (requiredCapabilities.isEmpty()) { + return true; + } + return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> { + try { + return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + private static Object removeOriginalTypesAndSuggestedCast(Object response) { if (response instanceof ArrayList columns) { var newColumns = new ArrayList<>(); @@ -1589,7 +1628,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma } Response response = performRequest(request); - HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings()); + assertWarnings(response, new AssertWarnings.NoWarnings()); + HttpEntity entity = response.getEntity(); // get the content, it could be empty because the request might have not completed String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1642,7 +1682,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma // if `addParam` is false, `options` will already have an `Accept` header getRequest.setOptions(options); response = performRequest(getRequest); - entity = assertWarnings(response, new AssertWarnings.NoWarnings()); + assertWarnings(response, new AssertWarnings.NoWarnings()); + entity = response.getEntity(); } String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1681,10 +1722,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req return mediaType; } - private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException { - return assertWarnings(performRequest(request), assertWarnings); - } - protected static Response performRequest(Request request) throws IOException { Response response = client().performRequest(request); if (shouldLog()) { @@ -1695,14 +1732,19 @@ protected static Response performRequest(Request request) throws IOException { return response; } - private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) { + static void assertNotPartial(Map answer) { + var clusters = answer.get("_clusters"); + var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : ""); + assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false))); + } + + private static void assertWarnings(Response response, AssertWarnings assertWarnings) { List warnings = new ArrayList<>(response.getWarnings()); warnings.removeAll(mutedWarnings()); if (shouldLog()) { LOGGER.info("RESPONSE warnings (after muted)={}", warnings); } assertWarnings.assertWarnings(warnings); - return response.getEntity(); } private static Set mutedWarnings() { @@ -1813,6 +1855,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); } + private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) { + assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId)); + assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0")); + } + + private static void assertNoAsyncHeaders(Response response) { + assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue()); + assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue()); + } + public static RequestObjectBuilder requestObjectBuilder() throws IOException { return new RequestObjectBuilder(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index ecc41b147356f..b441854fe060c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -628,6 +628,11 @@ public enum Cap { */ ASYNC_QUERY_STATUS_HEADERS, + /** + * Fix async headers not being sent on "get" requests + */ + ASYNC_QUERY_STATUS_HEADERS_FIX, + /** * Consider the upper bound when computing the interval in BUCKET auto mode. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 28f49b61942b3..93d6dc9f870cc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -222,7 +222,7 @@ public boolean isRunning() { } public boolean isAsync() { - return isRunning; + return isAsync; } public boolean isPartial() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java index cc917dfa7a30c..44e9d8728d335 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction; @@ -35,6 +36,7 @@ public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction { private final BlockFactory blockFactory; + private final ThreadPool threadPool; @Inject public TransportEsqlAsyncGetResultsAction( @@ -43,9 +45,9 @@ public TransportEsqlAsyncGetResultsAction( ClusterService clusterService, NamedWriteableRegistry registry, Client client, - ThreadPool threadPool, BigArrays bigArrays, - BlockFactoryProvider blockFactoryProvider + BlockFactoryProvider blockFactoryProvider, + ThreadPool threadPool ) { super( EsqlAsyncGetResultAction.NAME, @@ -59,11 +61,12 @@ public TransportEsqlAsyncGetResultsAction( EsqlQueryTask.class ); this.blockFactory = blockFactoryProvider.blockFactory(); + this.threadPool = threadPool; } @Override protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener listener) { - super.doExecute(task, request, unwrapListener(listener)); + super.doExecute(task, request, unwrapListener(request.getId(), listener)); } @Override @@ -75,14 +78,21 @@ public Writeable.Reader responseReader() { static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException("")); /** - * Unwraps the exception in the case of failure. This keeps the exception types - * the same as the sync API, namely ParsingException and VerificationException. + * Adds async headers, and unwraps the exception in the case of failure. + *

+ * This keeps the exception types the same as the sync API, namely ParsingException and VerificationException. + *

*/ - static ActionListener unwrapListener(ActionListener listener) { + ActionListener unwrapListener(String asyncExecutionId, ActionListener listener) { return new ActionListener<>() { @Override - public void onResponse(R o) { - listener.onResponse(o); + public void onResponse(EsqlQueryResponse response) { + boolean isRunning = response.isRunning(); + threadPool.getThreadContext() + .addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0"); + threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); + + listener.onResponse(response); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 560a577f8f640..ead368a51663f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -252,7 +252,20 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { recordCCSTelemetry(task, executionInfo, request, null); planExecutor.metrics().recordTook(executionInfo.overallTook().millis()); - listener.onResponse(toResponse(task, request, configuration, result)); + var response = toResponse(task, request, configuration, result); + assert response.isAsync() == request.async() : "The response must be async if the request was async"; + + if (response.isAsync()) { + if (response.asyncExecutionId().isPresent()) { + String asyncExecutionId = response.asyncExecutionId().get(); + threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); + } + boolean isRunning = response.isRunning(); + threadPool.getThreadContext() + .addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0"); + } + + listener.onResponse(response); }, ex -> { recordCCSTelemetry(task, executionInfo, request, ex); listener.onFailure(ex); @@ -338,10 +351,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles()) : null; - threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0"); if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { String asyncExecutionId = asyncTask.getExecutionId().getEncoded(); - threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); return new EsqlQueryResponse( columns, result.pages(),