Skip to content

Commit

Permalink
TransportGetTaskAction: Wait for the task asynchronously (#93375)
Browse files Browse the repository at this point in the history
Wait for the requested task asynchronously in a similar fashion to TransportListTaskAction from #90977

See #90977

---------

Co-authored-by: David Turner <david.turner@elastic.co>
  • Loading branch information
arteam and DaveCTurner committed Feb 6, 2023
1 parent f55d70a commit 7bd5613
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 51 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/93375.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93375
summary: '`TransportGetTaskAction:` Wait for the task asynchronously'
area: Task Management
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ public void testWaitForCompletion() throws Exception {
assertNull(threadContext.getResponseHeaders().get(TestTransportAction.HEADER_NAME));
}));

// briefly fill up the generic pool so that (a) we know the wait has started and (b) we know it's not blocking
// flushThreadPool(threadPool, ThreadPool.Names.GENERIC); // TODO it _is_ blocking right now!!, unmute this in #93375

assertFalse(listWaitFuture.isDone());
assertFalse(testActionFuture.isDone());
barrier.await(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,27 @@
package org.elasticsearch.action.admin.cluster.node.tasks.get;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
Expand All @@ -39,8 +43,9 @@

import java.io.IOException;

import static java.util.Objects.requireNonNullElse;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
import static org.elasticsearch.core.TimeValue.timeValueSeconds;

/**
* ActionType to get a single task. If the task isn't running then it'll try to request the status from request index.
Expand All @@ -53,6 +58,9 @@
* </ul>
*/
public class TransportGetTaskAction extends HandledTransportAction<GetTaskRequest, GetTaskResponse> {

private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);

private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TransportService transportService;
Expand Down Expand Up @@ -130,19 +138,47 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
getFinishedTaskFromIndex(thisTask, request, listener);
} else {
if (request.getWaitForCompletion()) {
// Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() {
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
RemovedTaskListener removedTaskListener = task -> {
if (task.equals(runningTask)) {
future.onResponse(null);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
};
taskManager.registerRemovedTaskListener(removedTaskListener);
// Check if the task had finished before we registered the listener, so we wouldn't wait
// for an event that would never come
if (taskManager.getTask(request.getTaskId().getId()) == null) {
future.onResponse(null);
}
final ActionListener<Void> waitedForCompletionListener = ActionListener.runBefore(
ActionListener.wrap(
v -> waitedForCompletion(
thisTask,
request,
runningTask.taskInfo(clusterService.localNode().getId(), true),
listener
),
listener::onFailure
),
() -> taskManager.unregisterRemovedTaskListener(removedTaskListener)
);
if (future.isDone()) {
// The task has already finished, we can run the completion listener in the same thread
waitedForCompletionListener.onResponse(null);
} else {
future.addListener(
new ContextPreservingActionListener<>(
threadPool.getThreadContext().newRestorableContext(false),
waitedForCompletionListener
)
);
var failByTimeout = threadPool.schedule(
() -> future.onFailure(new ElasticsearchTimeoutException("Timed out waiting for completion of task")),
requireNonNullElse(request.getTimeout(), DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT),
ThreadPool.Names.SAME
);
future.addListener(ActionListener.wrap(failByTimeout::cancel));
}
} else {
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
listener.onResponse(new GetTaskResponse(new TaskResult(false, info)));
Expand Down
19 changes: 0 additions & 19 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
Expand Down Expand Up @@ -547,23 +545,6 @@ public void applyClusterState(ClusterChangedEvent event) {
lastDiscoveryNodes = event.state().getNodes();
}

/**
* Blocks the calling thread, waiting for the task to vanish from the TaskManager.
*/
public void waitForTaskCompletion(Task task, long untilInNanos) {
while (System.nanoTime() - untilInNanos < 0) {
if (getTask(task.getId()) == null) {
return;
}
try {
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
} catch (InterruptedException e) {
throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task);
}
}
throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task);
}

private static class CancellableTaskHolder {
private final CancellableTask task;
private boolean finished = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,6 @@ public Task unregister(Task task) {
return removedTask;
}

@Override
public void waitForTaskCompletion(Task task, long untilInNanos) {
for (MockTaskManagerListener listener : listeners) {
try {
listener.waitForTaskCompletion(task);
} catch (Exception e) {
logger.warn(
() -> format("failed to notify task manager listener about waitForTaskCompletion the task with id %s", task.getId()),
e
);
}
}
super.waitForTaskCompletion(task, untilInNanos);
}

@Override
public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) {
for (MockTaskManagerListener listener : listeners) {
Expand Down

0 comments on commit 7bd5613

Please sign in to comment.