diff --git a/docs/changelog/135078.yaml b/docs/changelog/135078.yaml new file mode 100644 index 0000000000000..a68589e06946c --- /dev/null +++ b/docs/changelog/135078.yaml @@ -0,0 +1,6 @@ +pr: 135078 +summary: Fix async get results with inconsistent headers +area: ES|QL +type: bug +issues: + - 135042 diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index e13c20e2e7cc5..c5b9cf15137d3 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; import org.elasticsearch.client.WarningsHandler; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; @@ -43,6 +44,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.ArrayList; @@ -53,6 +55,8 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.IntFunction; import static java.util.Collections.emptySet; @@ -65,6 +69,7 @@ import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString; import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.emptyOrNullString; @@ -371,7 +376,9 @@ public void testCSVNoHeaderMode() throws IOException { options.addHeader("Content-Type", mediaType); options.addHeader("Accept", "text/csv; header=absent"); request.setOptions(options); - HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings()); + Response response = performRequest(request); + assertWarnings(response, new AssertWarnings.NoWarnings()); + HttpEntity entity = response.getEntity(); String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); assertEquals("keyword0,0\r\n", actual); } @@ -1053,8 +1060,17 @@ static Map runEsql(RequestObjectBuilder requestObject, AssertWar public static Map runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { Request request = prepareRequestWithOptions(requestObject, SYNC); - HttpEntity entity = performRequest(request, assertWarnings); - return entityToMap(entity, requestObject.contentType()); + Response response = performRequest(request); + HttpEntity entity = response.getEntity(); + Map json = entityToMap(entity, requestObject.contentType()); + + var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix")); + if (supportsAsyncHeadersFix) { + assertNoAsyncHeaders(response); + } + assertWarnings(response, assertWarnings); + + return json; } public static Map runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { @@ -1082,16 +1098,18 @@ public static Map runEsqlAsync( checkKeepOnCompletion(requestObject, json, keepOnCompletion); String id = (String) json.get("id"); - var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false); + var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix")); + var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast")); + + // Check headers on initial query call + if (supportsAsyncHeaders) { + assertAsyncHeaders(response, id, (boolean) json.get("is_running")); + } if (id == null) { // no id returned from an async call, must have completed immediately and without keep_on_completion assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false))); assertThat((boolean) json.get("is_running"), is(false)); - if (supportsAsyncHeaders) { - assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue()); - assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0")); - } assertWarnings(response, assertWarnings); json.remove("is_running"); // remove this to not mess up later map assertions return Collections.unmodifiableMap(json); @@ -1112,11 +1130,6 @@ public static Map runEsqlAsync( assertThat(json.get("pages"), nullValue()); } - if (supportsAsyncHeaders) { - assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id)); - assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0")); - } - // issue a second request to "async get" the results Request getRequest = prepareAsyncGetRequest(id); getRequest.setOptions(request.getOptions()); @@ -1126,9 +1139,21 @@ public static Map runEsqlAsync( var result = entityToMap(entity, requestObject.contentType()); + // Check headers on get call + if (supportsAsyncHeaders) { + assertAsyncHeaders(response, id, (boolean) result.get("is_running")); + } + // assert initial contents, if any, are the same as async get contents if (initialColumns != null) { - assertEquals(initialColumns, result.get("columns")); + if (supportsSuggestedCast == false) { + assertEquals( + removeOriginalTypesAndSuggestedCast(initialColumns), + removeOriginalTypesAndSuggestedCast(result.get("columns")) + ); + } else { + assertEquals(initialColumns, result.get("columns")); + } assertEquals(initialValues, result.get("values")); } @@ -1137,6 +1162,45 @@ public static Map runEsqlAsync( return removeAsyncProperties(result); } + record CapabilitesCacheKey(RestClient client, List capabilities) {} + + /** + * Cache of capabilities. + */ + private static final ConcurrentMap capabilities = new ConcurrentHashMap<>(); + + public static boolean hasCapabilities(RestClient client, List requiredCapabilities) { + if (requiredCapabilities.isEmpty()) { + return true; + } + return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> { + try { + return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + private static Object removeOriginalTypesAndSuggestedCast(Object response) { + if (response instanceof ArrayList columns) { + var newColumns = new ArrayList<>(); + for (var column : columns) { + if (column instanceof Map columnMap) { + var newMap = new HashMap<>(columnMap); + newMap.remove("original_types"); + newMap.remove("suggested_cast"); + newColumns.add(newMap); + } else { + newColumns.add(column); + } + } + return newColumns; + } else { + return response; + } + } + public void testAsyncGetWithoutContentType() throws IOException { int count = randomIntBetween(0, 100); bulkLoadTestData(count); @@ -1278,7 +1342,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma } Response response = performRequest(request); - HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings()); + assertWarnings(response, new AssertWarnings.NoWarnings()); + HttpEntity entity = response.getEntity(); // get the content, it could be empty because the request might have not completed String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1331,7 +1396,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma // if `addParam` is false, `options` will already have an `Accept` header getRequest.setOptions(options); response = performRequest(getRequest); - entity = assertWarnings(response, new AssertWarnings.NoWarnings()); + assertWarnings(response, new AssertWarnings.NoWarnings()); + entity = response.getEntity(); } String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1345,21 +1411,18 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma } private static Request prepareRequest(Mode mode) { - Request request = new Request("POST", "/_query" + (mode == ASYNC ? "/async" : "")); - request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. - request.addParameter("pretty", "true"); // Improves error reporting readability - return request; + return finishRequest(new Request("POST", "/_query" + (mode == ASYNC ? "/async" : ""))); } private static Request prepareAsyncGetRequest(String id) { - Request request = new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s"); - request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. - request.addParameter("pretty", "true"); // Improves error reporting readability - return request; + return finishRequest(new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=6000s")); } private static Request prepareAsyncDeleteRequest(String id) { - Request request = new Request("DELETE", "/_query/async/" + id); + return finishRequest(new Request("DELETE", "/_query/async/" + id)); + } + + private static Request finishRequest(Request request) { request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. request.addParameter("pretty", "true"); // Improves error reporting readability return request; @@ -1373,11 +1436,7 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req return mediaType; } - private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException { - return assertWarnings(performRequest(request), assertWarnings); - } - - private static Response performRequest(Request request) throws IOException { + protected static Response performRequest(Request request) throws IOException { Response response = client().performRequest(request); if (shouldLog()) { LOGGER.info("RESPONSE={}", response); @@ -1387,14 +1446,19 @@ private static Response performRequest(Request request) throws IOException { return response; } - private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) { + static void assertNotPartial(Map answer) { + var clusters = answer.get("_clusters"); + var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : ""); + assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false))); + } + + private static void assertWarnings(Response response, AssertWarnings assertWarnings) { List warnings = new ArrayList<>(response.getWarnings()); warnings.removeAll(mutedWarnings()); if (shouldLog()) { LOGGER.info("RESPONSE warnings (after muted)={}", warnings); } assertWarnings.assertWarnings(warnings); - return response.getEntity(); } private static Set mutedWarnings() { @@ -1505,6 +1569,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); } + private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) { + assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId)); + assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0")); + } + + private static void assertNoAsyncHeaders(Response response) { + assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue()); + assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue()); + } + public static RequestObjectBuilder requestObjectBuilder() throws IOException { return new RequestObjectBuilder(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 9fb13a8096cb5..5856f715623d3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -579,6 +579,11 @@ public enum Cap { */ ASYNC_QUERY_STATUS_HEADERS, + /** + * Fix async headers not being sent on "get" requests + */ + ASYNC_QUERY_STATUS_HEADERS_FIX, + /** * Consider the upper bound when computing the interval in BUCKET auto mode. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 1a82bb9b2829d..9dae37060b5b3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -178,7 +178,7 @@ public boolean isRunning() { } public boolean isAsync() { - return isRunning; + return isAsync; } public EsqlExecutionInfo getExecutionInfo() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java index 5658db0599186..b7dcb254f5d76 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction; @@ -34,6 +35,7 @@ public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction { private final BlockFactory blockFactory; + private final ThreadPool threadPool; @Inject public TransportEsqlAsyncGetResultsAction( @@ -42,9 +44,9 @@ public TransportEsqlAsyncGetResultsAction( ClusterService clusterService, NamedWriteableRegistry registry, Client client, - ThreadPool threadPool, BigArrays bigArrays, - BlockFactory blockFactory + BlockFactory blockFactory, + ThreadPool threadPool ) { super( EsqlAsyncGetResultAction.NAME, @@ -58,11 +60,12 @@ public TransportEsqlAsyncGetResultsAction( EsqlQueryTask.class ); this.blockFactory = blockFactory; + this.threadPool = threadPool; } @Override protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener listener) { - super.doExecute(task, request, unwrapListener(listener)); + super.doExecute(task, request, unwrapListener(request.getId(), listener)); } @Override @@ -74,14 +77,21 @@ public Writeable.Reader responseReader() { static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException("")); /** - * Unwraps the exception in the case of failure. This keeps the exception types - * the same as the sync API, namely ParsingException and VerificationException. + * Adds async headers, and unwraps the exception in the case of failure. + *

+ * This keeps the exception types the same as the sync API, namely ParsingException and VerificationException. + *

*/ - static ActionListener unwrapListener(ActionListener listener) { + ActionListener unwrapListener(String asyncExecutionId, ActionListener listener) { return new ActionListener<>() { @Override - public void onResponse(R o) { - listener.onResponse(o); + public void onResponse(EsqlQueryResponse response) { + boolean isRunning = response.isRunning(); + threadPool.getThreadContext() + .addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0"); + threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); + + listener.onResponse(response); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index d83239545c383..eb09bf2e7139b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -227,7 +227,20 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { recordCCSTelemetry(task, executionInfo, request, null); - listener.onResponse(toResponse(task, request, configuration, result)); + var response = toResponse(task, request, configuration, result); + assert response.isAsync() == request.async() : "The response must be async if the request was async"; + + if (response.isAsync()) { + if (response.asyncExecutionId().isPresent()) { + String asyncExecutionId = response.asyncExecutionId().get(); + threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); + } + boolean isRunning = response.isRunning(); + threadPool.getThreadContext() + .addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0"); + } + + listener.onResponse(response); }, ex -> { recordCCSTelemetry(task, executionInfo, request, ex); listener.onFailure(ex); @@ -301,10 +314,8 @@ private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) { private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) { List columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList(); EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null; - threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0"); if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { String asyncExecutionId = asyncTask.getExecutionId().getEncoded(); - threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); return new EsqlQueryResponse( columns, result.pages(),