Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/135078.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 135078
summary: Fix async get results with inconsistent headers
area: ES|QL
type: bug
issues:
- 135042
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
Expand All @@ -41,6 +42,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.ArrayList;
Expand All @@ -51,6 +53,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.IntFunction;

import static java.util.Collections.emptySet;
Expand All @@ -60,11 +64,11 @@
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyOrNullString;
Expand Down Expand Up @@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException {
options.addHeader("Content-Type", mediaType);
options.addHeader("Accept", "text/csv; header=absent");
request.setOptions(options);
HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
Response response = performRequest(request);
assertWarnings(response, new AssertWarnings.NoWarnings());
HttpEntity entity = response.getEntity();
String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
assertEquals("keyword0,0\r\n", actual);
}
Expand Down Expand Up @@ -1258,7 +1264,10 @@ public static Map<String, Object> runEsql(
var results = mode == ASYNC
? runEsqlAsync(requestObject, randomBoolean(), assertWarnings)
: runEsqlSync(requestObject, assertWarnings);
return checkPartialResults ? assertNotPartial(results) : results;
if (checkPartialResults) {
assertNotPartial(results);
}
return results;
}

public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode)
Expand All @@ -1269,8 +1278,17 @@ public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, As
public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
Request request = prepareRequestWithOptions(requestObject, SYNC);

HttpEntity entity = performRequest(request, assertWarnings);
return entityToMap(entity, requestObject.contentType());
Response response = performRequest(request);
HttpEntity entity = response.getEntity();
Map<String, Object> json = entityToMap(entity, requestObject.contentType());

var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
if (supportsAsyncHeadersFix) {
assertNoAsyncHeaders(response);
}
assertWarnings(response, assertWarnings);

return json;
}

public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
Expand Down Expand Up @@ -1298,17 +1316,18 @@ public static Map<String, Object> runEsqlAsync(
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
String id = (String) json.get("id");

var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
var supportsSuggestedCast = clusterHasCapability("POST", "/_query", List.of(), List.of("suggested_cast")).orElse(false);
var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast"));

// Check headers on initial query call
if (supportsAsyncHeaders) {
assertAsyncHeaders(response, id, (boolean) json.get("is_running"));
}

if (id == null) {
// no id returned from an async call, must have completed immediately and without keep_on_completion
assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false)));
assertThat((boolean) json.get("is_running"), is(false));
if (supportsAsyncHeaders) {
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0"));
}
assertWarnings(response, assertWarnings);
json.remove("is_running"); // remove this to not mess up later map assertions
return Collections.unmodifiableMap(json);
Expand All @@ -1329,11 +1348,6 @@ public static Map<String, Object> runEsqlAsync(
assertThat(json.get("pages"), nullValue());
}

if (supportsAsyncHeaders) {
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id));
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0"));
}

// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
getRequest.setOptions(request.getOptions());
Expand All @@ -1343,6 +1357,11 @@ public static Map<String, Object> runEsqlAsync(

var result = entityToMap(entity, requestObject.contentType());

// Check headers on get call
if (supportsAsyncHeaders) {
assertAsyncHeaders(response, id, (boolean) result.get("is_running"));
}

// assert initial contents, if any, are the same as async get contents
if (initialColumns != null) {
if (supportsSuggestedCast == false) {
Expand All @@ -1361,6 +1380,26 @@ public static Map<String, Object> runEsqlAsync(
return removeAsyncProperties(result);
}

record CapabilitesCacheKey(RestClient client, List<String> capabilities) {}

/**
* Cache of capabilities.
*/
private static final ConcurrentMap<CapabilitesCacheKey, Boolean> capabilities = new ConcurrentHashMap<>();

public static boolean hasCapabilities(RestClient client, List<String> requiredCapabilities) {
if (requiredCapabilities.isEmpty()) {
return true;
}
return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> {
try {
return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}

private static Object removeOriginalTypesAndSuggestedCast(Object response) {
if (response instanceof ArrayList<?> columns) {
var newColumns = new ArrayList<>();
Expand Down Expand Up @@ -1589,7 +1628,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
}

Response response = performRequest(request);
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
assertWarnings(response, new AssertWarnings.NoWarnings());
HttpEntity entity = response.getEntity();

// get the content, it could be empty because the request might have not completed
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
Expand Down Expand Up @@ -1642,7 +1682,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
// if `addParam` is false, `options` will already have an `Accept` header
getRequest.setOptions(options);
response = performRequest(getRequest);
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
assertWarnings(response, new AssertWarnings.NoWarnings());
entity = response.getEntity();
}
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));

Expand Down Expand Up @@ -1681,10 +1722,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
return mediaType;
}

private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException {
return assertWarnings(performRequest(request), assertWarnings);
}

protected static Response performRequest(Request request) throws IOException {
Response response = client().performRequest(request);
if (shouldLog()) {
Expand All @@ -1695,14 +1732,19 @@ protected static Response performRequest(Request request) throws IOException {
return response;
}

private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) {
static void assertNotPartial(Map<String, Object> answer) {
var clusters = answer.get("_clusters");
var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "");
assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false)));
}

private static void assertWarnings(Response response, AssertWarnings assertWarnings) {
List<String> warnings = new ArrayList<>(response.getWarnings());
warnings.removeAll(mutedWarnings());
if (shouldLog()) {
LOGGER.info("RESPONSE warnings (after muted)={}", warnings);
}
assertWarnings.assertWarnings(warnings);
return response.getEntity();
}

private static Set<String> mutedWarnings() {
Expand Down Expand Up @@ -1813,6 +1855,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
}

private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) {
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId));
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0"));
}

private static void assertNoAsyncHeaders(Response response) {
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue());
}

public static RequestObjectBuilder requestObjectBuilder() throws IOException {
return new RequestObjectBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ public enum Cap {
*/
ASYNC_QUERY_STATUS_HEADERS,

/**
* Fix async headers not being sent on "get" requests
*/
ASYNC_QUERY_STATUS_HEADERS_FIX,

/**
* Consider the upper bound when computing the interval in BUCKET auto mode.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public boolean isRunning() {
}

public boolean isAsync() {
return isRunning;
return isAsync;
}

public boolean isPartial() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
Expand All @@ -35,6 +36,7 @@
public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EsqlQueryResponse, EsqlQueryTask> {

private final BlockFactory blockFactory;
private final ThreadPool threadPool;

@Inject
public TransportEsqlAsyncGetResultsAction(
Expand All @@ -43,9 +45,9 @@ public TransportEsqlAsyncGetResultsAction(
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool,
BigArrays bigArrays,
BlockFactoryProvider blockFactoryProvider
BlockFactoryProvider blockFactoryProvider,
ThreadPool threadPool
) {
super(
EsqlAsyncGetResultAction.NAME,
Expand All @@ -59,11 +61,12 @@ public TransportEsqlAsyncGetResultsAction(
EsqlQueryTask.class
);
this.blockFactory = blockFactoryProvider.blockFactory();
this.threadPool = threadPool;
}

@Override
protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<EsqlQueryResponse> listener) {
super.doExecute(task, request, unwrapListener(listener));
super.doExecute(task, request, unwrapListener(request.getId(), listener));
}

@Override
Expand All @@ -75,14 +78,21 @@ public Writeable.Reader<EsqlQueryResponse> responseReader() {
static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException(""));

/**
* Unwraps the exception in the case of failure. This keeps the exception types
* the same as the sync API, namely ParsingException and VerificationException.
* Adds async headers, and unwraps the exception in the case of failure.
* <p>
* This keeps the exception types the same as the sync API, namely ParsingException and VerificationException.
* </p>
*/
static <R> ActionListener<R> unwrapListener(ActionListener<R> listener) {
ActionListener<EsqlQueryResponse> unwrapListener(String asyncExecutionId, ActionListener<EsqlQueryResponse> listener) {
return new ActionListener<>() {
@Override
public void onResponse(R o) {
listener.onResponse(o);
public void onResponse(EsqlQueryResponse response) {
boolean isRunning = response.isRunning();
threadPool.getThreadContext()
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);

listener.onResponse(response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,20 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
ActionListener.wrap(result -> {
recordCCSTelemetry(task, executionInfo, request, null);
planExecutor.metrics().recordTook(executionInfo.overallTook().millis());
listener.onResponse(toResponse(task, request, configuration, result));
var response = toResponse(task, request, configuration, result);
assert response.isAsync() == request.async() : "The response must be async if the request was async";

if (response.isAsync()) {
if (response.asyncExecutionId().isPresent()) {
String asyncExecutionId = response.asyncExecutionId().get();
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
}
boolean isRunning = response.isRunning();
threadPool.getThreadContext()
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
}

listener.onResponse(response);
}, ex -> {
recordCCSTelemetry(task, executionInfo, request, ex);
listener.onFailure(ex);
Expand Down Expand Up @@ -338,10 +351,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config
EsqlQueryResponse.Profile profile = configuration.profile()
? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles())
: null;
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
String asyncExecutionId = asyncTask.getExecutionId().getEncoded();
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
return new EsqlQueryResponse(
columns,
result.pages(),
Expand Down
Loading