Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,14 @@ ThreadPool getThreadPool() {
/**
* Waits for persistent tasks to comply with a given predicate, then calls back the listener accordingly.
*
* @param predicate the predicate to evaluate
* @param projectId the project that the persistent tasks are associated with
* @param predicate the predicate to evaluate, must be able to handle {@code null} input which means either the project
* does not exist or persistent tasks for the project do not exist
* @param timeout a timeout for waiting
* @param listener the callback listener
*/
public void waitForPersistentTasksCondition(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you didn't also update the (overloaded) waitForPersistentTasksCondition method above? I think it makes sense to do both within this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other method is not an overload; it is waitForPersistentTaskCondition (note the singular Task). I didn't touch that one because you handled it in #124000, right?

final ProjectId projectId,
final Predicate<PersistentTasksCustomMetadata> predicate,
final @Nullable TimeValue timeout,
final ActionListener<Boolean> listener
Expand All @@ -264,7 +267,15 @@ public void onClusterServiceClose() {
public void onTimeout(TimeValue timeout) {
listener.onFailure(new IllegalStateException("Timed out when waiting for persistent tasks after " + timeout));
}
}, clusterState -> predicate.test(PersistentTasksCustomMetadata.get(clusterState.metadata().getDefaultProject())), timeout, logger);
}, clusterState -> {
final var project = clusterState.metadata().projects().get(projectId);
if (project == null) {
logger.debug("project [{}] not found while waiting for persistent tasks condition", projectId);
return predicate.test(null);
} else {
return predicate.test(PersistentTasksCustomMetadata.get(project));
}
}, timeout, logger);
}

public interface WaitForPersistentTaskListener<P extends PersistentTaskParams> extends ActionListener<PersistentTask<P>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -642,7 +644,12 @@ void waitForJobClosed(
ActionListener<CloseJobAction.Response> listener,
Set<String> movedJobs
) {
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> {
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> {
if (persistentTasksCustomMetadata == null) {
return true;
}
Comment on lines +650 to +652
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a similar null check at most of the call sites. It looks correct to me since all usages are waiting either for all tasks to be removed or for tasks not to be in a certain state. Additionally, the same null check is already in place in TransportStopTransformAction.

I think it's likely a no-op for a single-project setup, since the method is called only after the tasks have started. Having it feels more future-proof for multi-project, where a project might be concurrently removed. It might be that project deletion is orderly, so that a project won't be removed until all of its tasks have cleared out. In that case, this check will be a no-op. Overall, I feel it is safer to have it in any case.

for (PersistentTasksCustomMetadata.PersistentTask<?> originalPersistentTask : waitForCloseRequest.persistentTasks) {
String originalPersistentTaskId = originalPersistentTask.getId();
PersistentTasksCustomMetadata.PersistentTask<?> currentPersistentTask = persistentTasksCustomMetadata.getTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,19 @@ protected void upgradeModeSuccessfullyChanged(
isolateDatafeeds(tasksCustomMetadata, isolateDatafeedListener);
} else {
logger.info("Disabling upgrade mode, must wait for tasks to not have AWAITING_UPGRADE assignment");
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
persistentTasksService.waitForPersistentTasksCondition(
// Wait for jobs, datafeeds and analytics not to be "Awaiting upgrade"
persistentTasksCustomMetadata -> persistentTasksCustomMetadata.tasks()
.stream()
.noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE)),
projectId,
persistentTasksCustomMetadata -> {
if (persistentTasksCustomMetadata == null) {
return true;
}
return persistentTasksCustomMetadata.tasks()
.stream()
.noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE));
},
request.ackTimeout(),
ActionListener.wrap(r -> {
logger.info("Done waiting for tasks to be out of AWAITING_UPGRADE");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand Down Expand Up @@ -427,15 +429,17 @@ void waitForTaskRemoved(
StopDataFrameAnalyticsAction.Response response,
ActionListener<StopDataFrameAnalyticsAction.Response> listener
) {
persistentTasksService.waitForPersistentTasksCondition(
persistentTasks -> persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskIds.contains(t.getId()))
.isEmpty(),
request.getTimeout(),
ActionListener.wrap(booleanResponse -> {
auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED);
listener.onResponse(response);
}, listener::onFailure)
);
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasks -> {
if (persistentTasks == null) {
return true;
}
return persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskIds.contains(t.getId())).isEmpty();
}, request.getTimeout(), ActionListener.wrap(booleanResponse -> {
auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED);
listener.onResponse(response);
}, listener::onFailure));
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -501,7 +503,12 @@ void waitForDatafeedStopped(
ActionListener<StopDatafeedAction.Response> listener,
Set<String> movedDatafeeds
) {
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> {
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> {
if (persistentTasksCustomMetadata == null) {
return true;
}
for (PersistentTasksCustomMetadata.PersistentTask<?> originalPersistentTask : datafeedPersistentTasks) {
String originalPersistentTaskId = originalPersistentTask.getId();
PersistentTasksCustomMetadata.PersistentTask<?> currentPersistentTask = persistentTasksCustomMetadata.getTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,15 @@ private boolean isTransformTask(PersistentTasksCustomMetadata.PersistentTask<?>

private void waitForTransformsToRestart(SetUpgradeModeActionRequest request, ActionListener<AcknowledgedResponse> listener) {
logger.info("Disabling upgrade mode for Transforms, must wait for tasks to not have AWAITING_UPGRADE assignment");
persistentTasksService.waitForPersistentTasksCondition(
persistentTasksCustomMetadata -> persistentTasksCustomMetadata.tasks()
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> {
if (persistentTasksCustomMetadata == null) {
return true;
}
return persistentTasksCustomMetadata.tasks()
.stream()
.noneMatch(t -> isTransformTask(t) && t.getAssignment().equals(AWAITING_UPGRADE)),
request.ackTimeout(),
listener.delegateFailureAndWrap((d, r) -> d.onResponse(AcknowledgedResponse.TRUE))
);
.noneMatch(t -> isTransformTask(t) && t.getAssignment().equals(AWAITING_UPGRADE));
}, request.ackTimeout(), listener.delegateFailureAndWrap((d, r) -> d.onResponse(AcknowledgedResponse.TRUE)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -388,7 +390,9 @@ private void waitForTransformStopped(
// This map is accessed in the predicate and the listener callbacks
final Map<String, ElasticsearchException> exceptions = new ConcurrentHashMap<>();

persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> {
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> {
if (persistentTasksCustomMetadata == null) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ private ClusterState stateWithTransformTask() {

public void testDisableUpgradeMode() throws InterruptedException {
doAnswer(ans -> {
ActionListener<Boolean> listener = ans.getArgument(2);
ActionListener<Boolean> listener = ans.getArgument(3);
listener.onResponse(true);
return null;
}).when(persistentTasksService).waitForPersistentTasksCondition(any(), any(), any());
}).when(persistentTasksService).waitForPersistentTasksCondition(any(), any(), any(), any());
upgradeModeSuccessfullyChanged(new SetUpgradeModeActionRequest(false), stateWithTransformTask(), assertNoFailureListener(r -> {
assertThat(r, is(AcknowledgedResponse.TRUE));
verify(clusterService, never()).submitUnbatchedStateUpdateTask(eq("unassign persistent task from any node"), any());
Expand Down