From 9683bb0a9d19c37027c02b85a61d20eb4b20adbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 23 Sep 2025 12:16:00 +0200 Subject: [PATCH 1/5] ESQL: Fix async query inconsistent headers (#135078) Fixes https://github.com/elastic/elasticsearch/issues/135042 This PR: - Fixes the get-result not always returning the expected headers - Fixes the non-async query incorrectly returning the "is running" async header --- docs/changelog/135078.yaml | 6 + .../xpack/esql/qa/rest/RestEsqlTestCase.java | 132 ++++++++++++------ .../xpack/esql/action/EsqlCapabilities.java | 5 + .../xpack/esql/action/EsqlQueryResponse.java | 2 +- .../TransportEsqlAsyncGetResultsAction.java | 26 ++-- .../esql/plugin/TransportEsqlQueryAction.java | 17 ++- 6 files changed, 134 insertions(+), 54 deletions(-) create mode 100644 docs/changelog/135078.yaml diff --git a/docs/changelog/135078.yaml b/docs/changelog/135078.yaml new file mode 100644 index 0000000000000..a68589e06946c --- /dev/null +++ b/docs/changelog/135078.yaml @@ -0,0 +1,6 @@ +pr: 135078 +summary: Fix async get results with inconsistent headers +area: ES|QL +type: bug +issues: + - 135042 diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 5e85c20b026ab..45284a89c0625 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; import org.elasticsearch.client.WarningsHandler; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; @@ -41,6 +42,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.ArrayList; @@ -51,6 +53,8 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.IntFunction; import static java.util.Collections.emptySet; @@ -60,11 +64,11 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; -import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString; import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.emptyOrNullString; @@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException { options.addHeader("Content-Type", mediaType); options.addHeader("Accept", "text/csv; header=absent"); request.setOptions(options); - HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings()); + Response response = performRequest(request); + assertWarnings(response, new AssertWarnings.NoWarnings()); + HttpEntity entity = response.getEntity(); String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); assertEquals("keyword0,0\r\n", actual); } @@ -692,12 +698,12 @@ public void testNamedParamsForIdentifierAndIdentifierPatterns() throws IOExcepti bulkLoadTestData(10); // positive var query = requestObjectBuilder().query( - format( - null, - "from {} | eval x1 = ?n1 | where ?n2 == x1 | stats xx2 = ?fn1(?n3) by ?n4 | keep ?n4, ?n5 | sort ?n4", - testIndexName() + format( + null, + "from {} | eval x1 = ?n1 | where ?n2 == x1 | stats xx2 = ?fn1(?n3) by ?n4 | keep ?n4, ?n5 | sort ?n4", + testIndexName() + ) ) - ) .params( "[{\"n1\" : {\"identifier\" : \"integer\"}}, {\"n2\" : {\"identifier\" : \"short\"}}, " + "{\"n3\" : {\"identifier\" : \"double\"}}, {\"n4\" : {\"identifier\" : \"boolean\"}}, " @@ -832,12 +838,12 @@ public void testDoubleParamsForIdentifiers() throws IOException { // positive // named double parameters var query = requestObjectBuilder().query( - format( - null, - "from {} | eval x1 = ??n1 | where ??n2 == x1 | stats xx2 = ??fn1(??n3) by ??n4 | keep ??n4, ??n5 | sort ??n4", - testIndexName() + format( + null, + "from {} | eval x1 = ??n1 | where ??n2 == x1 | stats xx2 = ??fn1(??n3) by ??n4 | keep ??n4, ??n5 | sort ??n4", + testIndexName() + ) ) - ) .params( "[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, " + "{\"n5\" : \"xx2\"}, {\"fn1\" : \"max\"}]" @@ -846,12 +852,12 @@ public void testDoubleParamsForIdentifiers() throws IOException { // positional double parameters query = requestObjectBuilder().query( - format( - null, - "from {} | eval x1 = ??1 | where ??2 == x1 | stats xx2 = ??6(??3) by ??4 | keep ??4, ??5 | sort ??4", - testIndexName() + format( + null, + "from {} | eval x1 = ??1 | where ??2 == x1 | stats xx2 = ??6(??3) by ??4 | keep ??4, ??5 | sort ??4", + testIndexName() + ) ) - ) .params( "[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, " + "{\"n5\" : \"xx2\"}, {\"fn1\" : \"max\"}]" @@ -869,8 +875,8 @@ public void testDoubleParamsForIdentifiers() throws IOException { // anonymous double parameters query = requestObjectBuilder().query( - format(null, "from {} | eval x1 = ?? | where ?? == x1 | stats xx2 = ??(??) by ?? | keep ??, ?? | sort ??", testIndexName()) - ) + format(null, "from {} | eval x1 = ?? | where ?? == x1 | stats xx2 = ??(??) by ?? | keep ??, ?? | sort ??", testIndexName()) + ) .params( "[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"fn1\" : \"max\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, " + "{\"n4\" : \"boolean\"}, {\"n5\" : \"xx2\"}, {\"n4\" : \"boolean\"}]" @@ -1091,7 +1097,7 @@ public void testComplexFieldNames() throws IOException { } /** - * INLINESTATS can group on {@code NOW()}. It's a little silly, but + * INLINE STATS can group on {@code NOW()}. It's a little silly, but * doing something like {@code DATE_TRUNC(1 YEAR, NOW() - 1970-01-01T00:00:00Z)} is * much more sensible. But just grouping on {@code NOW()} is enough to test this. *

@@ -1101,11 +1107,11 @@ public void testComplexFieldNames() throws IOException { */ @AwaitsFix(bugUrl = "Disabled temporarily until JOIN implementation is completed") public void testInlineStatsNow() throws IOException { - assumeTrue("INLINESTATS only available on snapshots", Build.current().isSnapshot()); + assumeTrue("INLINE STATS only available on snapshots", Build.current().isSnapshot()); indexTimestampData(1); RequestObjectBuilder builder = requestObjectBuilder().query( - fromIndex() + " | EVAL now=NOW() | INLINESTATS AVG(value) BY now | SORT value ASC" + fromIndex() + " | EVAL now=NOW() | INLINE STATS AVG(value) BY now | SORT value ASC" ); Map result = runEsql(builder); ListMatcher values = matchesList(); @@ -1115,8 +1121,8 @@ public void testInlineStatsNow() throws IOException { .item("value" + i) .item("value" + i) .item(i) - .item(any(String.class)) .item(499.5) + .item(any(String.class)) ); } assertResultMap( @@ -1125,8 +1131,8 @@ public void testInlineStatsNow() throws IOException { .item(matchesMap().entry("name", "test").entry("type", "text")) .item(matchesMap().entry("name", "test.keyword").entry("type", "keyword")) .item(matchesMap().entry("name", "value").entry("type", "long")) - .item(matchesMap().entry("name", "now").entry("type", "date")) - .item(matchesMap().entry("name", "AVG(value)").entry("type", "double")), + .item(matchesMap().entry("name", "AVG(value)").entry("type", "double")) + .item(matchesMap().entry("name", "now").entry("type", "date")), values ); } @@ -1258,22 +1264,40 @@ public static Map runEsql( var results = mode == ASYNC ? runEsqlAsync(requestObject, randomBoolean(), assertWarnings) : runEsqlSync(requestObject, assertWarnings); - return checkPartialResults ? assertNotPartial(results) : results; + if (checkPartialResults) { + assertNotPartial(results); + } + return results; } - public static Map runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) - throws IOException { + public static Map runEsql( + RequestObjectBuilder requestObject, + AssertWarnings assertWarnings, + Mode mode + ) throws IOException { return runEsql(requestObject, assertWarnings, mode, true); } - public static Map runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { + public static Map runEsqlSync( + RequestObjectBuilder requestObject, + AssertWarnings assertWarnings + ) throws IOException { + Boolean profileEnabled = requestObject.profile; Request request = prepareRequestWithOptions(requestObject, SYNC); - HttpEntity entity = performRequest(request, assertWarnings); - return entityToMap(entity, requestObject.contentType()); + Response response = performRequest(request); + HttpEntity entity = response.getEntity(); + Map json = entityToMap(entity, requestObject.contentType()); + + assertWarnings(response, assertWarnings); + + return json; } - public static Map runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { + public static Map runEsqlAsync( + RequestObjectBuilder requestObject, + AssertWarnings assertWarnings + ) throws IOException { return runEsqlAsync(requestObject, randomBoolean(), assertWarnings); } @@ -1282,6 +1306,7 @@ public static Map runEsqlAsync( boolean keepOnCompletion, AssertWarnings assertWarnings ) throws IOException { + Boolean profileEnabled = requestObject.profile; addAsyncParameters(requestObject, keepOnCompletion); Request request = prepareRequestWithOptions(requestObject, ASYNC); @@ -1298,8 +1323,8 @@ public static Map runEsqlAsync( checkKeepOnCompletion(requestObject, json, keepOnCompletion); String id = (String) json.get("id"); - var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false); - var supportsSuggestedCast = clusterHasCapability("POST", "/_query", List.of(), List.of("suggested_cast")).orElse(false); + var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers")); + var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast")); if (id == null) { // no id returned from an async call, must have completed immediately and without keep_on_completion @@ -1361,6 +1386,26 @@ public static Map runEsqlAsync( return removeAsyncProperties(result); } + record CapabilitesCacheKey(RestClient client, List capabilities) {} + + /** + * Cache of capabilities. + */ + private static final ConcurrentMap capabilities = new ConcurrentHashMap<>(); + + public static boolean hasCapabilities(RestClient client, List requiredCapabilities) { + if (requiredCapabilities.isEmpty()) { + return true; + } + return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> { + try { + return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + private static Object removeOriginalTypesAndSuggestedCast(Object response) { if (response instanceof ArrayList columns) { var newColumns = new ArrayList<>(); @@ -1589,7 +1634,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma } Response response = performRequest(request); - HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings()); + assertWarnings(response, new AssertWarnings.NoWarnings()); + HttpEntity entity = response.getEntity(); // get the content, it could be empty because the request might have not completed String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1642,7 +1688,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma // if `addParam` is false, `options` will already have an `Accept` header getRequest.setOptions(options); response = performRequest(getRequest); - entity = assertWarnings(response, new AssertWarnings.NoWarnings()); + assertWarnings(response, new AssertWarnings.NoWarnings()); + entity = response.getEntity(); } String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1681,10 +1728,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req return mediaType; } - private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException { - return assertWarnings(performRequest(request), assertWarnings); - } - protected static Response performRequest(Request request) throws IOException { Response response = client().performRequest(request); if (shouldLog()) { @@ -1695,14 +1738,19 @@ protected static Response performRequest(Request request) throws IOException { return response; } - private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) { + static void assertNotPartial(Map answer) { + var clusters = answer.get("_clusters"); + var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : ""); + assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false))); + } + + private static void assertWarnings(Response response, AssertWarnings assertWarnings) { List warnings = new ArrayList<>(response.getWarnings()); warnings.removeAll(mutedWarnings()); if (shouldLog()) { LOGGER.info("RESPONSE warnings (after muted)={}", warnings); } assertWarnings.assertWarnings(warnings); - return response.getEntity(); } private static Set mutedWarnings() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index ecc41b147356f..b441854fe060c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -628,6 +628,11 @@ public enum Cap { */ ASYNC_QUERY_STATUS_HEADERS, + /** + * Fix async headers not being sent on "get" requests + */ + ASYNC_QUERY_STATUS_HEADERS_FIX, + /** * Consider the upper bound when computing the interval in BUCKET auto mode. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 4afb1418b2585..1275bbe91cf70 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -221,7 +221,7 @@ public boolean isRunning() { } public boolean isAsync() { - return isRunning; + return isAsync; } public boolean isPartial() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java index cc917dfa7a30c..44e9d8728d335 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction; @@ -35,6 +36,7 @@ public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction { private final BlockFactory blockFactory; + private final ThreadPool threadPool; @Inject public TransportEsqlAsyncGetResultsAction( @@ -43,9 +45,9 @@ public TransportEsqlAsyncGetResultsAction( ClusterService clusterService, NamedWriteableRegistry registry, Client client, - ThreadPool threadPool, BigArrays bigArrays, - BlockFactoryProvider blockFactoryProvider + BlockFactoryProvider blockFactoryProvider, + ThreadPool threadPool ) { super( EsqlAsyncGetResultAction.NAME, @@ -59,11 +61,12 @@ public TransportEsqlAsyncGetResultsAction( EsqlQueryTask.class ); this.blockFactory = blockFactoryProvider.blockFactory(); + this.threadPool = threadPool; } @Override protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener listener) { - super.doExecute(task, request, unwrapListener(listener)); + super.doExecute(task, request, unwrapListener(request.getId(), listener)); } @Override @@ -75,14 +78,21 @@ public Writeable.Reader responseReader() { static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException("")); /** - * Unwraps the exception in the case of failure. This keeps the exception types - * the same as the sync API, namely ParsingException and VerificationException. + * Adds async headers, and unwraps the exception in the case of failure. + *

+ * This keeps the exception types the same as the sync API, namely ParsingException and VerificationException. + *

*/ - static ActionListener unwrapListener(ActionListener listener) { + ActionListener unwrapListener(String asyncExecutionId, ActionListener listener) { return new ActionListener<>() { @Override - public void onResponse(R o) { - listener.onResponse(o); + public void onResponse(EsqlQueryResponse response) { + boolean isRunning = response.isRunning(); + threadPool.getThreadContext() + .addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0"); + threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); + + listener.onResponse(response); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 560a577f8f640..ead368a51663f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -252,7 +252,20 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { recordCCSTelemetry(task, executionInfo, request, null); planExecutor.metrics().recordTook(executionInfo.overallTook().millis()); - listener.onResponse(toResponse(task, request, configuration, result)); + var response = toResponse(task, request, configuration, result); + assert response.isAsync() == request.async() : "The response must be async if the request was async"; + + if (response.isAsync()) { + if (response.asyncExecutionId().isPresent()) { + String asyncExecutionId = response.asyncExecutionId().get(); + threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); + } + boolean isRunning = response.isRunning(); + threadPool.getThreadContext() + .addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0"); + } + + listener.onResponse(response); }, ex -> { recordCCSTelemetry(task, executionInfo, request, ex); listener.onFailure(ex); @@ -338,10 +351,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles()) : null; - threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0"); if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { String asyncExecutionId = asyncTask.getExecutionId().getEncoded(); - threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId); return new EsqlQueryResponse( columns, result.pages(), From 2db4c9da0920db8d6f7f3922e782958c6550d26c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 23 Sep 2025 14:34:54 +0200 Subject: [PATCH 2/5] Revert rest tests file --- .../xpack/esql/qa/rest/RestEsqlTestCase.java | 132 ++++++------------ 1 file changed, 42 insertions(+), 90 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 45284a89c0625..5e85c20b026ab 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -16,7 +16,6 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; -import org.elasticsearch.client.RestClient; import org.elasticsearch.client.WarningsHandler; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; @@ -42,7 +41,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.ArrayList; @@ -53,8 +51,6 @@ import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.function.IntFunction; import static java.util.Collections.emptySet; @@ -64,11 +60,11 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString; import static org.hamcrest.Matchers.any; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.emptyOrNullString; @@ -400,9 +396,7 @@ public void testCSVNoHeaderMode() throws IOException { options.addHeader("Content-Type", mediaType); options.addHeader("Accept", "text/csv; header=absent"); request.setOptions(options); - Response response = performRequest(request); - assertWarnings(response, new AssertWarnings.NoWarnings()); - HttpEntity entity = response.getEntity(); + HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings()); String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); assertEquals("keyword0,0\r\n", actual); } @@ -698,12 +692,12 @@ public void testNamedParamsForIdentifierAndIdentifierPatterns() throws IOExcepti bulkLoadTestData(10); // positive var query = requestObjectBuilder().query( - format( - null, - "from {} | eval x1 = ?n1 | where ?n2 == x1 | stats xx2 = ?fn1(?n3) by ?n4 | keep ?n4, ?n5 | sort ?n4", - testIndexName() - ) + format( + null, + "from {} | eval x1 = ?n1 | where ?n2 == x1 | stats xx2 = ?fn1(?n3) by ?n4 | keep ?n4, ?n5 | sort ?n4", + testIndexName() ) + ) .params( "[{\"n1\" : {\"identifier\" : \"integer\"}}, {\"n2\" : {\"identifier\" : \"short\"}}, " + "{\"n3\" : {\"identifier\" : \"double\"}}, {\"n4\" : {\"identifier\" : \"boolean\"}}, " @@ -838,12 +832,12 @@ public void testDoubleParamsForIdentifiers() throws IOException { // positive // named double parameters var query = requestObjectBuilder().query( - format( - null, - "from {} | eval x1 = ??n1 | where ??n2 == x1 | stats xx2 = ??fn1(??n3) by ??n4 | keep ??n4, ??n5 | sort ??n4", - testIndexName() - ) + format( + null, + "from {} | eval x1 = ??n1 | where ??n2 == x1 | stats xx2 = ??fn1(??n3) by ??n4 | keep ??n4, ??n5 | sort ??n4", + testIndexName() ) + ) .params( "[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, " + "{\"n5\" : \"xx2\"}, {\"fn1\" : \"max\"}]" @@ -852,12 +846,12 @@ public void testDoubleParamsForIdentifiers() throws IOException { // positional double parameters query = requestObjectBuilder().query( - format( - null, - "from {} | eval x1 = ??1 | where ??2 == x1 | stats xx2 = ??6(??3) by ??4 | keep ??4, ??5 | sort ??4", - testIndexName() - ) + format( + null, + "from {} | eval x1 = ??1 | where ??2 == x1 | stats xx2 = ??6(??3) by ??4 | keep ??4, ??5 | sort ??4", + testIndexName() ) + ) .params( "[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, " + "{\"n5\" : \"xx2\"}, {\"fn1\" : \"max\"}]" @@ -875,8 +869,8 @@ public void testDoubleParamsForIdentifiers() throws IOException { // anonymous double parameters query = requestObjectBuilder().query( - format(null, "from {} | eval x1 = ?? | where ?? == x1 | stats xx2 = ??(??) by ?? | keep ??, ?? | sort ??", testIndexName()) - ) + format(null, "from {} | eval x1 = ?? | where ?? == x1 | stats xx2 = ??(??) by ?? | keep ??, ?? | sort ??", testIndexName()) + ) .params( "[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"fn1\" : \"max\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, " + "{\"n4\" : \"boolean\"}, {\"n5\" : \"xx2\"}, {\"n4\" : \"boolean\"}]" @@ -1097,7 +1091,7 @@ public void testComplexFieldNames() throws IOException { } /** - * INLINE STATS can group on {@code NOW()}. It's a little silly, but + * INLINESTATS can group on {@code NOW()}. It's a little silly, but * doing something like {@code DATE_TRUNC(1 YEAR, NOW() - 1970-01-01T00:00:00Z)} is * much more sensible. But just grouping on {@code NOW()} is enough to test this. *

@@ -1107,11 +1101,11 @@ public void testComplexFieldNames() throws IOException { */ @AwaitsFix(bugUrl = "Disabled temporarily until JOIN implementation is completed") public void testInlineStatsNow() throws IOException { - assumeTrue("INLINE STATS only available on snapshots", Build.current().isSnapshot()); + assumeTrue("INLINESTATS only available on snapshots", Build.current().isSnapshot()); indexTimestampData(1); RequestObjectBuilder builder = requestObjectBuilder().query( - fromIndex() + " | EVAL now=NOW() | INLINE STATS AVG(value) BY now | SORT value ASC" + fromIndex() + " | EVAL now=NOW() | INLINESTATS AVG(value) BY now | SORT value ASC" ); Map result = runEsql(builder); ListMatcher values = matchesList(); @@ -1121,8 +1115,8 @@ public void testInlineStatsNow() throws IOException { .item("value" + i) .item("value" + i) .item(i) - .item(499.5) .item(any(String.class)) + .item(499.5) ); } assertResultMap( @@ -1131,8 +1125,8 @@ public void testInlineStatsNow() throws IOException { .item(matchesMap().entry("name", "test").entry("type", "text")) .item(matchesMap().entry("name", "test.keyword").entry("type", "keyword")) .item(matchesMap().entry("name", "value").entry("type", "long")) - .item(matchesMap().entry("name", "AVG(value)").entry("type", "double")) - .item(matchesMap().entry("name", "now").entry("type", "date")), + .item(matchesMap().entry("name", "now").entry("type", "date")) + .item(matchesMap().entry("name", "AVG(value)").entry("type", "double")), values ); } @@ -1264,40 +1258,22 @@ public static Map runEsql( var results = mode == ASYNC ? runEsqlAsync(requestObject, randomBoolean(), assertWarnings) : runEsqlSync(requestObject, assertWarnings); - if (checkPartialResults) { - assertNotPartial(results); - } - return results; + return checkPartialResults ? assertNotPartial(results) : results; } - public static Map runEsql( - RequestObjectBuilder requestObject, - AssertWarnings assertWarnings, - Mode mode - ) throws IOException { + public static Map runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) + throws IOException { return runEsql(requestObject, assertWarnings, mode, true); } - public static Map runEsqlSync( - RequestObjectBuilder requestObject, - AssertWarnings assertWarnings - ) throws IOException { - Boolean profileEnabled = requestObject.profile; + public static Map runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { Request request = prepareRequestWithOptions(requestObject, SYNC); - Response response = performRequest(request); - HttpEntity entity = response.getEntity(); - Map json = entityToMap(entity, requestObject.contentType()); - - assertWarnings(response, assertWarnings); - - return json; + HttpEntity entity = performRequest(request, assertWarnings); + return entityToMap(entity, requestObject.contentType()); } - public static Map runEsqlAsync( - RequestObjectBuilder requestObject, - AssertWarnings assertWarnings - ) throws IOException { + public static Map runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { return runEsqlAsync(requestObject, randomBoolean(), assertWarnings); } @@ -1306,7 +1282,6 @@ public static Map runEsqlAsync( boolean keepOnCompletion, AssertWarnings assertWarnings ) throws IOException { - Boolean profileEnabled = requestObject.profile; addAsyncParameters(requestObject, keepOnCompletion); Request request = prepareRequestWithOptions(requestObject, ASYNC); @@ -1323,8 +1298,8 @@ public static Map runEsqlAsync( checkKeepOnCompletion(requestObject, json, keepOnCompletion); String id = (String) json.get("id"); - var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers")); - var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast")); + var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false); + var supportsSuggestedCast = clusterHasCapability("POST", "/_query", List.of(), List.of("suggested_cast")).orElse(false); if (id == null) { // no id returned from an async call, must have completed immediately and without keep_on_completion @@ -1386,26 +1361,6 @@ public static Map runEsqlAsync( return removeAsyncProperties(result); } - record CapabilitesCacheKey(RestClient client, List capabilities) {} - - /** - * Cache of capabilities. - */ - private static final ConcurrentMap capabilities = new ConcurrentHashMap<>(); - - public static boolean hasCapabilities(RestClient client, List requiredCapabilities) { - if (requiredCapabilities.isEmpty()) { - return true; - } - return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> { - try { - return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - } - private static Object removeOriginalTypesAndSuggestedCast(Object response) { if (response instanceof ArrayList columns) { var newColumns = new ArrayList<>(); @@ -1634,8 +1589,7 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma } Response response = performRequest(request); - assertWarnings(response, new AssertWarnings.NoWarnings()); - HttpEntity entity = response.getEntity(); + HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings()); // get the content, it could be empty because the request might have not completed String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1688,8 +1642,7 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma // if `addParam` is false, `options` will already have an `Accept` header getRequest.setOptions(options); response = performRequest(getRequest); - assertWarnings(response, new AssertWarnings.NoWarnings()); - entity = response.getEntity(); + entity = assertWarnings(response, new AssertWarnings.NoWarnings()); } String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1728,6 +1681,10 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req return mediaType; } + private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException { + return assertWarnings(performRequest(request), assertWarnings); + } + protected static Response performRequest(Request request) throws IOException { Response response = client().performRequest(request); if (shouldLog()) { @@ -1738,19 +1695,14 @@ protected static Response performRequest(Request request) throws IOException { return response; } - static void assertNotPartial(Map answer) { - var clusters = answer.get("_clusters"); - var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : ""); - assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false))); - } - - private static void assertWarnings(Response response, AssertWarnings assertWarnings) { + private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) { List warnings = new ArrayList<>(response.getWarnings()); warnings.removeAll(mutedWarnings()); if (shouldLog()) { LOGGER.info("RESPONSE warnings (after muted)={}", warnings); } assertWarnings.assertWarnings(warnings); + return response.getEntity(); } private static Set mutedWarnings() { From 09ad3514de73d4f81164b24ada3b2e21b726ecdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 23 Sep 2025 14:48:11 +0200 Subject: [PATCH 3/5] Added main changes on test class --- .../xpack/esql/qa/rest/RestEsqlTestCase.java | 73 ++++++++++++++----- 1 file changed, 55 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 5e85c20b026ab..488ab33677c0f 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; import org.elasticsearch.client.WarningsHandler; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; @@ -41,6 +42,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.ArrayList; @@ -51,6 +53,8 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.IntFunction; import static java.util.Collections.emptySet; @@ -60,11 +64,11 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; -import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString; import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.emptyOrNullString; @@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException { options.addHeader("Content-Type", mediaType); options.addHeader("Accept", "text/csv; header=absent"); request.setOptions(options); - HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings()); + Response response = performRequest(request); + assertWarnings(response, new AssertWarnings.NoWarnings()); + HttpEntity entity = response.getEntity(); String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); assertEquals("keyword0,0\r\n", actual); } @@ -1091,7 +1097,7 @@ public void testComplexFieldNames() throws IOException { } /** - * INLINESTATS can group on {@code NOW()}. It's a little silly, but + * INLINE STATS can group on {@code NOW()}. It's a little silly, but * doing something like {@code DATE_TRUNC(1 YEAR, NOW() - 1970-01-01T00:00:00Z)} is * much more sensible. But just grouping on {@code NOW()} is enough to test this. *

@@ -1101,11 +1107,11 @@ public void testComplexFieldNames() throws IOException { */ @AwaitsFix(bugUrl = "Disabled temporarily until JOIN implementation is completed") public void testInlineStatsNow() throws IOException { - assumeTrue("INLINESTATS only available on snapshots", Build.current().isSnapshot()); + assumeTrue("INLINE STATS only available on snapshots", Build.current().isSnapshot()); indexTimestampData(1); RequestObjectBuilder builder = requestObjectBuilder().query( - fromIndex() + " | EVAL now=NOW() | INLINESTATS AVG(value) BY now | SORT value ASC" + fromIndex() + " | EVAL now=NOW() | INLINE STATS AVG(value) BY now | SORT value ASC" ); Map result = runEsql(builder); ListMatcher values = matchesList(); @@ -1258,7 +1264,10 @@ public static Map runEsql( var results = mode == ASYNC ? runEsqlAsync(requestObject, randomBoolean(), assertWarnings) : runEsqlSync(requestObject, assertWarnings); - return checkPartialResults ? assertNotPartial(results) : results; + if (checkPartialResults) { + assertNotPartial(results); + } + return results; } public static Map runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) @@ -1269,8 +1278,13 @@ public static Map runEsql(RequestObjectBuilder requestObject, As public static Map runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { Request request = prepareRequestWithOptions(requestObject, SYNC); - HttpEntity entity = performRequest(request, assertWarnings); - return entityToMap(entity, requestObject.contentType()); + Response response = performRequest(request); + HttpEntity entity = response.getEntity(); + Map json = entityToMap(entity, requestObject.contentType()); + + assertWarnings(response, assertWarnings); + + return json; } public static Map runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { @@ -1298,8 +1312,8 @@ public static Map runEsqlAsync( checkKeepOnCompletion(requestObject, json, keepOnCompletion); String id = (String) json.get("id"); - var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false); - var supportsSuggestedCast = clusterHasCapability("POST", "/_query", List.of(), List.of("suggested_cast")).orElse(false); + var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers")); + var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast")); if (id == null) { // no id returned from an async call, must have completed immediately and without keep_on_completion @@ -1361,6 +1375,26 @@ public static Map runEsqlAsync( return removeAsyncProperties(result); } + record CapabilitesCacheKey(RestClient client, List capabilities) {} + + /** + * Cache of capabilities. + */ + private static final ConcurrentMap capabilities = new ConcurrentHashMap<>(); + + public static boolean hasCapabilities(RestClient client, List requiredCapabilities) { + if (requiredCapabilities.isEmpty()) { + return true; + } + return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> { + try { + return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + private static Object removeOriginalTypesAndSuggestedCast(Object response) { if (response instanceof ArrayList columns) { var newColumns = new ArrayList<>(); @@ -1589,7 +1623,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma } Response response = performRequest(request); - HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings()); + assertWarnings(response, new AssertWarnings.NoWarnings()); + HttpEntity entity = response.getEntity(); // get the content, it could be empty because the request might have not completed String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1642,7 +1677,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma // if `addParam` is false, `options` will already have an `Accept` header getRequest.setOptions(options); response = performRequest(getRequest); - entity = assertWarnings(response, new AssertWarnings.NoWarnings()); + assertWarnings(response, new AssertWarnings.NoWarnings()); + entity = response.getEntity(); } String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)); @@ -1681,10 +1717,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req return mediaType; } - private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException { - return assertWarnings(performRequest(request), assertWarnings); - } - protected static Response performRequest(Request request) throws IOException { Response response = client().performRequest(request); if (shouldLog()) { @@ -1695,14 +1727,19 @@ protected static Response performRequest(Request request) throws IOException { return response; } - private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) { + static void assertNotPartial(Map answer) { + var clusters = answer.get("_clusters"); + var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : ""); + assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false))); + } + + private static void assertWarnings(Response response, AssertWarnings assertWarnings) { List warnings = new ArrayList<>(response.getWarnings()); warnings.removeAll(mutedWarnings()); if (shouldLog()) { LOGGER.info("RESPONSE warnings (after muted)={}", warnings); } assertWarnings.assertWarnings(warnings); - return response.getEntity(); } private static Set mutedWarnings() { From 77c108472cddbc878980ac200af33270e059ddad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 23 Sep 2025 15:03:11 +0200 Subject: [PATCH 4/5] Backported test for the headers fix --- .../xpack/esql/qa/rest/RestEsqlTestCase.java | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 488ab33677c0f..ed0948643f23c 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -1282,6 +1282,10 @@ public static Map runEsqlSync(RequestObjectBuilder requestObject HttpEntity entity = response.getEntity(); Map json = entityToMap(entity, requestObject.contentType()); + var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix")); + if (supportsAsyncHeadersFix) { + assertNoAsyncHeaders(response); + } assertWarnings(response, assertWarnings); return json; @@ -1312,17 +1316,18 @@ public static Map runEsqlAsync( checkKeepOnCompletion(requestObject, json, keepOnCompletion); String id = (String) json.get("id"); - var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers")); + var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix")); var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast")); + // Check headers on initial query call + if (supportsAsyncHeaders) { + assertAsyncHeaders(response, id, (boolean) json.get("is_running")); + } + if (id == null) { // no id returned from an async call, must have completed immediately and without keep_on_completion assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false))); assertThat((boolean) json.get("is_running"), is(false)); - if (supportsAsyncHeaders) { - assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue()); - assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0")); - } assertWarnings(response, assertWarnings); json.remove("is_running"); // remove this to not mess up later map assertions return Collections.unmodifiableMap(json); @@ -1343,11 +1348,6 @@ public static Map runEsqlAsync( assertThat(json.get("pages"), nullValue()); } - if (supportsAsyncHeaders) { - assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id)); - assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0")); - } - // issue a second request to "async get" the results Request getRequest = prepareAsyncGetRequest(id); getRequest.setOptions(request.getOptions()); @@ -1357,6 +1357,11 @@ public static Map runEsqlAsync( var result = entityToMap(entity, requestObject.contentType()); + // Check headers on get call + if (supportsAsyncHeaders) { + assertAsyncHeaders(response, id, (boolean) result.get("is_running")); + } + // assert initial contents, if any, are the same as async get contents if (initialColumns != null) { if (supportsSuggestedCast == false) { @@ -1850,6 +1855,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); } + private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) { + assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId)); + assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0")); + } + + private static void assertNoAsyncHeaders(Response response) { + assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue()); + assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue()); + } + public static RequestObjectBuilder requestObjectBuilder() throws IOException { return new RequestObjectBuilder(); } From 03aa85f933a476a286e536f7bbbb6e392bf7eb01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 23 Sep 2025 15:04:19 +0200 Subject: [PATCH 5/5] Remove unrelated changes --- .../elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index ed0948643f23c..797ffd595a4e0 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -1097,7 +1097,7 @@ public void testComplexFieldNames() throws IOException { } /** - * INLINE STATS can group on {@code NOW()}. It's a little silly, but + * INLINESTATS can group on {@code NOW()}. It's a little silly, but * doing something like {@code DATE_TRUNC(1 YEAR, NOW() - 1970-01-01T00:00:00Z)} is * much more sensible. But just grouping on {@code NOW()} is enough to test this. *

@@ -1107,11 +1107,11 @@ public void testComplexFieldNames() throws IOException { */ @AwaitsFix(bugUrl = "Disabled temporarily until JOIN implementation is completed") public void testInlineStatsNow() throws IOException { - assumeTrue("INLINE STATS only available on snapshots", Build.current().isSnapshot()); + assumeTrue("INLINESTATS only available on snapshots", Build.current().isSnapshot()); indexTimestampData(1); RequestObjectBuilder builder = requestObjectBuilder().query( - fromIndex() + " | EVAL now=NOW() | INLINE STATS AVG(value) BY now | SORT value ASC" + fromIndex() + " | EVAL now=NOW() | INLINESTATS AVG(value) BY now | SORT value ASC" ); Map result = runEsql(builder); ListMatcher values = matchesList();