Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TasksIT#testGetTaskWaitForCompletionWithoutStoringResult #108094

Merged
merged 17 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -599,24 +600,32 @@ private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId,
TEST_TASK_ACTION.name() + "[n]",
() -> client().execute(TEST_TASK_ACTION, request)
);
ActionFuture<T> waitResponseFuture;
var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
assertThat(tasks, hasSize(1));
TaskId taskId = tasks.get(0).taskId();
clusterAdmin().prepareGetTask(taskId).get();

var taskManager = (MockTaskManager) internalCluster().getInstance(
TransportService.class,
clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName()
).getTaskManager();
var listener = new MockTaskManagerListener() {
@Override
public void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {
// Unblock the request only after it started waiting for task completion
client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest());
}
};
taskManager.addListener(listener);
try {
var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
assertThat(tasks, hasSize(1));
var taskId = tasks.get(0).taskId();
clusterAdmin().prepareGetTask(taskId).get();

// Spin up a request to wait for the test task to finish
waitResponseFuture = wait.apply(taskId);
// The task will be unblocked as soon as the request started waiting for task completion
T waitResponse = wait.apply(taskId).get();
validator.accept(waitResponse);
} finally {
// Unblock the request so the wait for completion request can finish
client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()).get();
taskManager.removeListener(listener);
}

// Now that the task is unblocked the list response will come back
T waitResponse = waitResponseFuture.get();
validator.accept(waitResponse);

TestTaskPlugin.NodesResponse response = future.get();
assertEquals(emptyList(), response.failures());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,17 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
} else {
if (request.getWaitForCompletion()) {
final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
RemovedTaskListener removedTaskListener = task -> {
if (task.equals(runningTask)) {
future.onResponse(null);
RemovedTaskListener removedTaskListener = new RemovedTaskListener() {
@Override
public void onRemoved(Task task) {
if (task.equals(runningTask)) {
future.onResponse(null);
}
}

@Override
public String toString() {
return "Waiting for task completion " + runningTask;
}
};
taskManager.registerRemovedTaskListener(removedTaskListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
Expand Down Expand Up @@ -84,4 +85,16 @@ public void addListener(MockTaskManagerListener listener) {
public void removeListener(MockTaskManagerListener listener) {
listeners.remove(listener);
}

@Override
public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) {
super.registerRemovedTaskListener(removedTaskListener);
for (MockTaskManagerListener listener : listeners) {
try {
listener.onRemovedTaskListenerRegistered(removedTaskListener);
} catch (Exception e) {
logger.warn("failed to notify task manager listener about a registered removed task listener", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.test.tasks;

import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;

/**
Expand All @@ -17,4 +18,6 @@ public interface MockTaskManagerListener {
default void onTaskRegistered(Task task) {};

default void onTaskUnregistered(Task task) {};

default void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {};
}