Skip to content

Commit

Permalink
Accept only single tasks at master service (#83829)
Browse files Browse the repository at this point in the history
Today `MasterService` (and `TaskBatcher`) allow callers to submit a
collection of tasks that will be executed all at once. Support for
batches of tasks makes things more complicated than they need to be,
especially since (as of #83803) production code only ever submits single
tasks. This commit specializes things to accept only single tasks.
  • Loading branch information
DaveCTurner committed Feb 11, 2022
1 parent 586378b commit dd4d442
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;

public class ClusterService extends AbstractLifecycleComponent {
private final MasterService masterService;

Expand Down Expand Up @@ -259,7 +257,7 @@ public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
ClusterStateTaskConfig config,
ClusterStateTaskExecutor<T> executor
) {
masterService.submitStateUpdateTasks(source, List.of(task), config, executor);
masterService.submitStateUpdateTask(source, task, config, executor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -150,13 +149,9 @@ class Batcher extends TaskBatcher {
}

@Override
protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
protected void onTimeout(BatchedTask task, TimeValue timeout) {
threadPool.generic()
.execute(
() -> tasks.forEach(
task -> ((UpdateTask) task).onFailure(new ProcessClusterEventTimeoutException(timeout, task.source))
)
);
.execute(() -> ((UpdateTask) task).onFailure(new ProcessClusterEventTimeoutException(timeout, task.source)));
}

@Override
Expand Down Expand Up @@ -506,7 +501,21 @@ public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
ClusterStateTaskConfig config,
ClusterStateTaskExecutor<T> executor
) {
submitStateUpdateTasks(source, List.of(task), config, executor);
if (lifecycle.started() == false) {
return;
}
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
taskBatcher.submitTask(taskBatcher.new UpdateTask(config.priority(), source, task, supplier, executor), config.timeout());
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (lifecycle.stoppedOrClosed() == false) {
throw e;
}
}
}

/**
Expand Down Expand Up @@ -903,47 +912,6 @@ void onNoLongerMaster() {
}
}

/**
* Submits a batch of cluster state update tasks; submitted updates are guaranteed to be processed together,
* potentially with more tasks of the same executor.
*
* @param source the source of the cluster state update task
* @param tasks a collection of update tasks, which implement {@link ClusterStateTaskListener} so that they are notified when they
* are executed; tasks that also implement {@link ClusterStateAckListener} are notified on acks too.
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* in batches on this executor
* @param <T> the type of the cluster state update task state
*
*/
public <T extends ClusterStateTaskListener> void submitStateUpdateTasks(
final String source,
final Collection<T> tasks,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor
) {
if (lifecycle.started() == false) {
return;
}
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();

List<Batcher.UpdateTask> safeTasks = tasks.stream()
.map(task -> taskBatcher.new UpdateTask(config.priority(), source, task, supplier, executor))
.toList();
taskBatcher.submitTasks(safeTasks, config.timeout());
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (lifecycle.stoppedOrClosed() == false) {
throw e;
}
}
}

private static class MasterServiceStarvationWatcher implements PrioritizedEsThreadPoolExecutor.StarvationWatcher {

private final long warnThreshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Batching support for {@link PrioritizedEsThreadPoolExecutor}
Expand All @@ -45,86 +42,50 @@ public TaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor
this.threadExecutor = threadExecutor;
}

public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
if (tasks.isEmpty()) {
return;
}
final BatchedTask firstTask = tasks.get(0);
assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey)
: "tasks submitted in a batch should share the same batching key: " + tasks;
// convert to an identity map to check for dups based on task identity

tasksPerBatchingKey.compute(firstTask.batchingKey, (k, existingTasks) -> {
assert assertNoDuplicateTasks(tasks, existingTasks);
public void submitTask(BatchedTask task, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
tasksPerBatchingKey.compute(task.batchingKey, (k, existingTasks) -> {
if (existingTasks == null) {
return Collections.synchronizedSet(new LinkedHashSet<>(tasks));
existingTasks = Collections.synchronizedSet(new LinkedHashSet<>());
} else {
assert assertNoDuplicateTasks(task, existingTasks);
}
existingTasks.addAll(tasks);
existingTasks.add(task);
return existingTasks;
});

if (timeout != null) {
threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
threadExecutor.execute(task, timeout, () -> onTimeoutInternal(task, timeout));
} else {
threadExecutor.execute(firstTask);
threadExecutor.execute(task);
}
}

private static boolean assertNoDuplicateTasks(List<? extends BatchedTask> tasks, Set<BatchedTask> existingTasks) {
final Map<Object, BatchedTask> tasksIdentity = tasks.stream()
.collect(
Collectors.toMap(
BatchedTask::getTask,
Function.identity(),
(a, b) -> { throw new AssertionError("cannot add duplicate task: " + a); },
IdentityHashMap::new
)
);
if (existingTasks == null) {
return true;
}
for (BatchedTask existing : existingTasks) {
// check that there won't be two tasks with the same identity for the same batching key
BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
assert duplicateTask == null
: "task ["
+ duplicateTask.describeTasks(Collections.singletonList(existing))
+ "] with source ["
+ duplicateTask.source
+ "] is already queued";
private static boolean assertNoDuplicateTasks(BatchedTask task, Set<BatchedTask> existingTasks) {
for (final var existingTask : existingTasks) {
assert existingTask.getTask() != task.getTask()
: "task [" + task.describeTasks(List.of(task)) + "] with source [" + task.source + "] is already queued";
}
return true;
}

private void onTimeoutInternal(List<? extends BatchedTask> tasks, TimeValue timeout) {
final ArrayList<BatchedTask> toRemove = new ArrayList<>();
for (BatchedTask task : tasks) {
if (task.processed.getAndSet(true) == false) {
logger.debug("task [{}] timed out after [{}]", task.source, timeout);
toRemove.add(task);
}
}
if (toRemove.isEmpty() == false) {
BatchedTask firstTask = toRemove.get(0);
Object batchingKey = firstTask.batchingKey;
assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey)
: "tasks submitted in a batch should share the same batching key: " + tasks;
tasksPerBatchingKey.computeIfPresent(batchingKey, (key, existingTasks) -> {
toRemove.forEach(existingTasks::remove);
if (existingTasks.isEmpty()) {
return null;
}
return existingTasks;
});
onTimeout(toRemove, timeout);
private void onTimeoutInternal(BatchedTask task, TimeValue timeout) {
if (task.processed.getAndSet(true)) {
return;
}

logger.debug("task [{}] timed out after [{}]", task.source, timeout);
tasksPerBatchingKey.computeIfPresent(task.batchingKey, (key, existingTasks) -> {
existingTasks.remove(task);
return existingTasks.isEmpty() ? null : existingTasks;
});
onTimeout(task, timeout);
}

/**
* Action to be implemented by the specific batching implementation.
* All tasks have the same batching key.
*/
protected abstract void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout);
protected abstract void onTimeout(BatchedTask task, TimeValue timeout);

void runIfNotProcessed(BatchedTask updateTask) {
// if this task is already processed, it shouldn't execute other tasks with same batching key that arrived later,
Expand All @@ -135,6 +96,7 @@ void runIfNotProcessed(BatchedTask updateTask) {
final Set<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
if (pending != null) {
// pending is a java.util.Collections.SynchronizedSet so we can safely iterate holding its mutex
// noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (pending) {
for (BatchedTask task : pending) {
if (task.processed.getAndSet(true) == false) {
Expand Down

0 comments on commit dd4d442

Please sign in to comment.