Return result from cluster state task execution (#83562)
The `MasterService` executes batches of tasks which compute changes to
the `ClusterState`. After executing each batch the `MasterService`
publishes the updated cluster state and notifies every task in the batch
when the publication completes. Many tasks compute some kind of result
during their execution which needs to be made available to the
publication completion handler for subsequent activities.

Today there's no good general way to pass anything to the completion
handler other than the fact that the publication succeeded. Some tasks
work around this by storing their result in the `ClusterState` itself.
Others use the executor to capture the result and pass it through.
Neither solution works well with batching: later tasks in a batch may
overwrite the part of the `ClusterState` containing the results of
earlier tasks, and batching executors are re-used across batches.

This commit adjusts the `ClusterStateTaskExecutor` interface so that
implementations must now supply a listener for each task they successfully
execute. The `MasterService` collects the listeners for the batch and
notifies them all when publication completes. This gives the executor
control over the completion handler of each task, which lets it pass in
any extra data needed.
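
As an illustration of the new contract (not part of this commit), here is a minimal sketch of an executor that uses the per-task listener to hand a result computed during execution back to the caller once publication completes. The `CreateWidgetExecutor`, `CreateWidgetTask`, its `resultListener` field, and the `addWidget` helper are hypothetical; only the `ClusterStateTaskExecutor`, `ClusterTasksResult.Builder#success(task, listener)`, and `ActionListener` shapes are taken from this change.

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;

import java.util.List;

class CreateWidgetExecutor implements ClusterStateTaskExecutor<CreateWidgetExecutor.CreateWidgetTask> {

    // Hypothetical task type: carries a listener that wants the name of the created widget.
    record CreateWidgetTask(String widgetName, ActionListener<String> resultListener) implements ClusterStateTaskListener {
        @Override
        public void onFailure(Exception e) {
            resultListener.onFailure(e);
        }
    }

    @Override
    public ClusterTasksResult<CreateWidgetTask> execute(ClusterState currentState, List<CreateWidgetTask> tasks) {
        final var builder = ClusterTasksResult.<CreateWidgetTask>builder();
        ClusterState state = currentState;
        for (CreateWidgetTask task : tasks) {
            try {
                state = addWidget(state, task.widgetName()); // hypothetical cluster state change
                final String result = task.widgetName();     // per-task result captured during execution...
                builder.success(task, new ActionListener<ClusterState>() {
                    @Override
                    public void onResponse(ClusterState publishedState) {
                        // ...and delivered to the caller only once the publication completes
                        task.resultListener().onResponse(result);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        task.resultListener().onFailure(e);
                    }
                });
            } catch (Exception e) {
                builder.failure(task, e);
            }
        }
        return builder.build(state);
    }

    private static ClusterState addWidget(ClusterState state, String widgetName) {
        return state; // placeholder for the real metadata update
    }
}
```

The `LegacyClusterTaskResultActionListener` introduced in the diff below preserves the old behaviour for existing callers by forwarding to the task's `clusterStateProcessed` and `onFailure` methods.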
DaveCTurner committed Feb 10, 2022
1 parent d4655e8 commit 3991961
Showing 16 changed files with 437 additions and 91 deletions.

@@ -24,7 +24,7 @@ public ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentSt
for (ClusterStateUpdateTask task : tasks) {
try {
clusterState = task.execute(clusterState);
builder.success(task);
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
} catch (Exception e) {
builder.failure(task, e);
}

@@ -120,7 +120,7 @@ public TransportAction(
// each duplicate task
task.indexNameRef.set(successfulBefore.indexNameRef.get());
}
builder.success(task);
builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState));
} catch (Exception e) {
builder.failure(task, e);
}

@@ -380,7 +380,7 @@ public ClusterTasksResult<RolloverTask> execute(ClusterState currentState, List<
for (RolloverTask task : tasks) {
try {
state = task.performRollover(state);
builder.success(task);
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
} catch (Exception e) {
builder.failure(task, e);
}

@@ -7,12 +7,14 @@
*/
package org.elasticsearch.cluster;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public interface ClusterStateTaskExecutor<T extends ClusterStateTaskListener> {
/**
@@ -75,17 +77,28 @@ public static <T extends ClusterStateTaskListener> Builder<T> builder() {
public static class Builder<T extends ClusterStateTaskListener> {
private final Map<T, TaskResult> executionResults = new IdentityHashMap<>();

public Builder<T> success(T task) {
return result(task, TaskResult.success());
}

public Builder<T> successes(Iterable<T> tasks) {
for (T task : tasks) {
success(task);
}
return this;
/**
* Record that the cluster state update task succeeded.
*
* @param taskListener A listener for the completion of the resulting cluster state publication. This listener is completed with
* the cluster state that was published (or the publication exception that occurred) in the thread context
* in which the task was submitted. The task's {@link ClusterStateTaskListener#clusterStateProcessed} method
* is not called directly by the master service, nor is {@link ClusterStateTaskListener#onFailure} once the
* task execution has succeeded, but legacy implementations may use this listener to call those methods.
* <p>
* The listener should prefer not to use the published state for things like determining the result of a
* task. The task may have been executed as part of a batch, and later tasks in the batch may overwrite
* the results from earlier tasks. Instead the listener should independently capture the information it
* needs to properly process the completion of a cluster state update.
*/
// TODO remove all remaining usages of the published state and then make this an ActionListener<Void>
public Builder<T> success(T task, ActionListener<ClusterState> taskListener) {
return result(task, TaskResult.success(taskListener));
}

/**
* Record that the cluster state update task failed.
*/
public Builder<T> failure(T task, Exception e) {
return result(task, TaskResult.failure(e));
}
@@ -109,19 +122,22 @@ public ClusterTasksResult<T> build(ClusterState resultingState) {
}
}

record TaskResult(Exception failure) {
private static final TaskResult SUCCESS = new TaskResult(null);
record TaskResult(@Nullable ActionListener<ClusterState> taskListener, @Nullable Exception failure) {

public TaskResult {
assert failure == null ^ taskListener == null;
}

public static TaskResult success() {
return SUCCESS;
public static TaskResult success(ActionListener<ClusterState> taskListener) {
return new TaskResult(Objects.requireNonNull(taskListener), null);
}

public static TaskResult failure(Exception failure) {
return new TaskResult(failure);
return new TaskResult(null, Objects.requireNonNull(failure));
}

public boolean isSuccess() {
return this == SUCCESS;
return failure == null;
}

public Exception getFailure() {
@@ -139,8 +155,11 @@ static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> unbatched(
@Override
public ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception {
assert tasks.size() == 1 : "this only supports a single task but received " + tasks;
ClusterState result = tasks.get(0).execute(currentState);
return ClusterTasksResult.<T>builder().successes(tasks).build(result);
final T task = tasks.get(0);
final ClusterState newState = task.execute(currentState);
return ClusterTasksResult.<T>builder()
.success(task, new LegacyClusterTaskResultActionListener(task, currentState))
.build(newState);
}

@Override
@@ -150,4 +169,26 @@ public String describeTasks(List<T> tasks) {
};
}

/**
* An {@link ActionListener} for passing to {@link ClusterStateTaskExecutor.ClusterTasksResult.Builder#success} which preserves the
* legacy behaviour of calling {@link ClusterStateTaskListener#clusterStateProcessed} or {@link ClusterStateTaskListener#onFailure}.
* <p>
* New implementations should use a dedicated listener rather than relying on this legacy behaviour.
*/
// TODO remove all remaining usages of this listener
record LegacyClusterTaskResultActionListener(ClusterStateTaskListener task, ClusterState originalState)
implements
ActionListener<ClusterState> {

@Override
public void onResponse(ClusterState publishedState) {
task.clusterStateProcessed(originalState, publishedState);
}

@Override
public void onFailure(Exception e) {
task.onFailure(e);
}
}

}

@@ -32,12 +32,18 @@ default void onNoLongerMaster() {
}

/**
* Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
* properly by all listeners.
* Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} method has been processed properly by all
* listeners.
*
* The {@param newState} parameter is the state that was ultimately published. This can lead to surprising behaviour if tasks are
* batched together: a later task in the batch may undo or overwrite the changes made by an earlier task. In general you should prefer
* to ignore the published state and instead handle the success of a publication via the listener that the executor passes to
* {@link ClusterStateTaskExecutor.ClusterTasksResult.Builder#success}.
*
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
* level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
// TODO: replace all remaining usages of this method with dedicated listeners and then remove it.
default void clusterStateProcessed(ClusterState oldState, ClusterState newState) {}
}

@@ -47,10 +47,13 @@ public String describeTasks(List<LocalMasterServiceTask> tasks) {
@Override
public ClusterTasksResult<LocalMasterServiceTask> execute(ClusterState currentState, List<LocalMasterServiceTask> tasks)
throws Exception {
assert tasks.size() == 1 && tasks.get(0) == LocalMasterServiceTask.this
final LocalMasterServiceTask thisTask = LocalMasterServiceTask.this;
assert tasks.size() == 1 && tasks.get(0) == thisTask
: "expected one-element task list containing current object but was " + tasks;
LocalMasterServiceTask.this.execute(currentState);
return ClusterTasksResult.<LocalMasterServiceTask>builder().successes(tasks).build(currentState);
thisTask.execute(currentState);
return ClusterTasksResult.<LocalMasterServiceTask>builder()
.success(thisTask, new LegacyClusterTaskResultActionListener(thisTask, currentState))
.build(currentState);
}
}
);

@@ -333,7 +333,7 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
entry,
entry.getShardId().getIndex()
);
batchResultBuilder.success(task);
batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
} else {
// The primary term is 0 if the shard failed itself. It is > 0 if a write was done on a primary but was failed to be
// replicated to the shard copy with the provided allocation id. In case where the shard failed itself, it's ok to just
@@ -393,7 +393,7 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
} else {
// tasks that correspond to non-existent shards are marked as successful
logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", entry.getShardId(), entry);
batchResultBuilder.success(task);
batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
}
} else {
// failing a shard also possibly marks it as stale (see IndexMetadataUpdater)
@@ -408,7 +408,9 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
ClusterState maybeUpdatedState = currentState;
try {
maybeUpdatedState = applyFailedShards(currentState, failedShardsToBeApplied, staleShardsToBeApplied);
batchResultBuilder.successes(tasksToBeApplied);
for (var task : tasksToBeApplied) {
batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
}
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to apply failed shards {}", failedShardsToBeApplied), e);
// failures are communicated back to the requester
@@ -637,7 +639,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
// requests might still be in flight even after the shard has already been started or failed on the master. We just
// ignore these requests for now.
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", entry.shardId, entry);
builder.success(task);
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
} else {
if (matched.primary() && entry.primaryTerm > 0) {
final IndexMetadata indexMetadata = currentState.metadata().index(entry.shardId.getIndex());
@@ -658,7 +660,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
entry.primaryTerm,
currentPrimaryTerm
);
builder.success(task);
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
continue;
}
}
@@ -671,7 +673,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
entry,
matched
);
builder.success(task);
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
} else {
// remove duplicate actions as allocation service expects a clean list without duplicates
if (seenShardRoutings.contains(matched)) {
@@ -727,7 +729,9 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt

assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);

builder.successes(tasksToBeApplied);
for (var task : tasksToBeApplied) {
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
}
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e);
builder.failures(tasksToBeApplied, e);

@@ -95,7 +95,8 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
ClusterState.Builder newState;

if (joiningNodes.size() == 1 && joiningNodes.get(0).isFinishElectionTask()) {
return results.successes(joiningNodes).build(currentState);
final Task task = joiningNodes.get(0);
return results.success(task, new LegacyClusterTaskResultActionListener(task, currentState)).build(currentState);
} else if (currentNodes.getMasterNode() == null && joiningNodes.stream().anyMatch(Task::isBecomeMasterTask)) {
assert joiningNodes.stream().anyMatch(Task::isFinishElectionTask)
: "becoming a master but election is not finished " + joiningNodes;
@@ -148,7 +149,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
continue;
}
}
results.success(joinTask);
results.success(joinTask, new LegacyClusterTaskResultActionListener(joinTask, currentState));
}

if (nodesChanged) {

@@ -59,25 +59,29 @@ public NodeRemovalClusterStateTaskExecutor(AllocationService allocationService)
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
boolean removed = false;
final var resultBuilder = ClusterTasksResult.<Task>builder();
for (final Task task : tasks) {
if (currentState.nodes().nodeExists(task.node())) {
remainingNodesBuilder.remove(task.node());
removed = true;
} else {
logger.debug("node [{}] does not exist in cluster state, ignoring", task);
}
resultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
}

if (removed == false) {
final ClusterState finalState;

if (removed) {
final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
final ClusterState ptasksDisassociatedState = PersistentTasksCustomMetadata.disassociateDeadNodes(remainingNodesClusterState);
finalState = allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks));
} else {
// no nodes to remove, keep the current cluster state
return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
finalState = currentState;
}

final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
final ClusterState ptasksDisassociatedState = PersistentTasksCustomMetadata.disassociateDeadNodes(remainingNodesClusterState);
final ClusterState finalState = allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks));

return ClusterTasksResult.<Task>builder().successes(tasks).build(finalState);
return resultBuilder.build(finalState);
}

// visible for testing

@@ -99,6 +99,7 @@ public ClusterTasksResult<PutMappingClusterStateUpdateTask> execute(
ClusterState currentState,
List<PutMappingClusterStateUpdateTask> tasks
) throws Exception {
final ClusterState originalState = currentState;
Map<Index, MapperService> indexMapperServices = new HashMap<>();
ClusterTasksResult.Builder<PutMappingClusterStateUpdateTask> builder = ClusterTasksResult.builder();
try {
@@ -115,7 +116,7 @@ public ClusterTasksResult<PutMappingClusterStateUpdateTask> execute(
}
}
currentState = applyRequest(currentState, request, indexMapperServices);
builder.success(task);
builder.success(task, new LegacyClusterTaskResultActionListener(task, originalState));
} catch (Exception e) {
builder.failure(task, e);
}

@@ -78,7 +78,8 @@ public MetadataUpdateSettingsService(
for (AckedClusterStateUpdateTask task : tasks) {
try {
state = task.execute(state);
builder.success(task);
builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState));
} catch (Exception e) {
builder.failure(task, e);
}
