Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/121843.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 121843
summary: Fix async stop sometimes not properly collecting result
area: ES|QL
type: bug
issues:
- 121249
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,6 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
method: testWithDatastreams
issue: https://github.com/elastic/elasticsearch/issues/121236
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityEsqlIT
method: testCrossClusterAsyncQueryStop
issue: https://github.com/elastic/elasticsearch/issues/121249
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/*}
issue: https://github.com/elastic/elasticsearch/issues/120816
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,22 @@ public <T extends AsyncTask> T getTaskAndCheckAuthentication(
TaskManager taskManager,
AsyncExecutionId asyncExecutionId,
Class<T> tClass
) throws IOException {
return getTaskAndCheckAuthentication(taskManager, security, asyncExecutionId, tClass);
}

/**
* Returns the {@link AsyncTask} if the provided <code>asyncTaskId</code>
* is registered in the task manager, <code>null</code> otherwise.
*
* This method throws a {@link ResourceNotFoundException} if the authenticated user
* is not the creator of the original task.
*/
public static <T extends AsyncTask> T getTaskAndCheckAuthentication(
TaskManager taskManager,
AsyncSearchSecurity security,
AsyncExecutionId asyncExecutionId,
Class<T> tClass
) throws IOException {
T asyncTask = getTask(taskManager, asyncExecutionId, tClass);
if (asyncTask == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ResourceNotFoundException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -16,10 +17,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand All @@ -32,12 +31,11 @@
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

Expand All @@ -55,6 +53,8 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncSt
private final TransportService transportService;
private final AsyncSearchSecurity security;

private static final Logger logger = LogManager.getLogger(TransportEsqlAsyncStopAction.class);

@Inject
public TransportEsqlAsyncStopAction(
TransportService transportService,
Expand Down Expand Up @@ -106,34 +106,33 @@ private String sessionID(AsyncExecutionId asyncId) {

private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, ActionListener<EsqlQueryResponse> listener) {
String asyncIdStr = asyncId.getEncoded();
TransportEsqlQueryAction.EsqlQueryListener asyncListener = queryAction.getAsyncListener(asyncIdStr);
if (asyncListener == null) {
EsqlQueryTask asyncTask = getEsqlQueryTask(asyncId);
GetAsyncResultRequest getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
if (asyncTask == null) {
// This should mean one of the two things: either bad request ID, or the query has already finished
// In both cases, let regular async get deal with it.
var getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
// TODO: this should not be happening, but if the listener is not registered and the query is not finished,
// we give it some time to finish
getAsyncResultRequest.setWaitForCompletionTimeout(new TimeValue(1, TimeUnit.SECONDS));
logger.debug("Async stop for task {}, no task present - passing to GetAsyncResultRequest", asyncIdStr);
getResultsAction.execute(task, getAsyncResultRequest, listener);
return;
}
try {
EsqlQueryTask asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncId, EsqlQueryTask.class);
if (false == security.currentUserHasAccessToTask(asyncTask)) {
throw new ResourceNotFoundException(asyncId + " not found");
logger.debug("Async stop for task {} - stopping", asyncIdStr);
final EsqlExecutionInfo esqlExecutionInfo = asyncTask.executionInfo();
if (esqlExecutionInfo != null) {
esqlExecutionInfo.markAsPartial();
}
Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener);
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.running(() -> {
if (asyncTask.addCompletionListener(() -> ActionListener.running(getResults)) == false) {
getResults.run();
}
}));
}

private EsqlQueryTask getEsqlQueryTask(AsyncExecutionId asyncId) {
try {
return AsyncTaskIndexService.getTaskAndCheckAuthentication(taskManager, security, asyncId, EsqlQueryTask.class);
} catch (IOException e) {
throw new ResourceNotFoundException(asyncId + " not found", e);
}
// Here we will wait for both the response to become available and for the finish operation to complete
var responseHolder = new AtomicReference<EsqlQueryResponse>();
try (var refs = new EsqlRefCountingListener(listener.map(unused -> responseHolder.get()))) {
asyncListener.addListener(refs.acquire().map(r -> {
responseHolder.set(r);
return null;
}));
asyncListener.markAsPartial();
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
Expand Down Expand Up @@ -83,8 +81,6 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
private final RemoteClusterService remoteClusterService;
private final QueryBuilderResolver queryBuilderResolver;
private final UsageService usageService;
// Listeners for active async queries, key being the async task execution ID
private final Map<String, EsqlQueryListener> asyncListeners = ConcurrentCollections.newConcurrentMap();

@Inject
@SuppressWarnings("this-escape")
Expand Down Expand Up @@ -183,41 +179,11 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
}
}

// Subscribable listener that can keep track of the EsqlExecutionInfo
// Used to mark an async query as partial if it is stopped
public static class EsqlQueryListener extends SubscribableListener<EsqlQueryResponse> {
private EsqlExecutionInfo executionInfo;

public EsqlQueryListener(EsqlExecutionInfo executionInfo) {
this.executionInfo = executionInfo;
}

public EsqlExecutionInfo getExecutionInfo() {
return executionInfo;
}

public void markAsPartial() {
if (executionInfo != null) {
executionInfo.markAsPartial();
}
}
}

@Override
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
task.setExecutionInfo(createEsqlExecutionInfo(request));
// Since the request is async here, we need to wrap the listener in a SubscribableListener so that we can collect the results from
// other endpoints, such as _query/async/stop
EsqlQueryListener subListener = new EsqlQueryListener(task.executionInfo());
String asyncExecutionId = task.getExecutionId().getEncoded();
subListener.addListener(ActionListener.runAfter(listener, () -> asyncListeners.remove(asyncExecutionId)));
asyncListeners.put(asyncExecutionId, subListener);
ActionListener.run(subListener, l -> innerExecute(task, request, l));
}

public EsqlQueryListener getAsyncListener(String executionId) {
return asyncListeners.get(executionId);
ActionListener.run(listener, l -> innerExecute(task, request, l));
}

private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
Expand Down