Skip to content

Commit

Permalink
Warn on possible master service starvation (#74820)
Browse files Browse the repository at this point in the history
Today the master service processes pending tasks in priority order. If
high-priority tasks arrive too frequently then low-priority tasks are
starved of access to the master service and are not executed. This can
cause certain tasks to appear to be stuck due to apparently-unrelated
overloads elsewhere.

With this commit we measure the interval between times when the pending
task queue is empty; if this interval exceeds a configurable threshold
then we log a warning.
  • Loading branch information
DaveCTurner committed Jul 5, 2021
1 parent ba38417 commit 32c12e0
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor() throws Inte
}

public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing(
"test",
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext(),
threadPool.scheduler(),
PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER);
try {
checkExecutionError(getExecuteRunner(prioritizedExecutor));
checkExecutionError(getSubmitRunner(prioritizedExecutor));
Expand Down Expand Up @@ -200,8 +204,12 @@ public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws
}

public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing(
"test",
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext(),
threadPool.scheduler(),
PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER);
try {
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
checkExecutionException(getSubmitRunner(prioritizedExecutor), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME,
daemonThreadFactory(nodeName, CLUSTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
threadPool.scheduler(),
PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER);
}

class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -62,9 +63,16 @@
public class MasterService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(MasterService.class);

public static final Setting<TimeValue> MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING =
Setting.positiveTimeSetting("cluster.service.slow_master_task_logging_threshold", TimeValue.timeValueSeconds(10),
Setting.Property.Dynamic, Setting.Property.NodeScope);
/**
 * Time setting {@code cluster.service.slow_master_task_logging_threshold}, declared with the {@code Dynamic} and {@code NodeScope}
 * properties and a default of 10 seconds.
 * NOTE(review): presumably the duration above which a slow master task gets logged; the code that consumes the threshold is not
 * visible here -- confirm.
 */
public static final Setting<TimeValue> MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting(
"cluster.service.slow_master_task_logging_threshold",
TimeValue.timeValueSeconds(10),
Setting.Property.Dynamic,
Setting.Property.NodeScope);

/**
 * Time setting {@code cluster.service.master_service_starvation_logging_threshold}, declared with the {@code NodeScope} property
 * only (it is not marked {@code Dynamic}) and a default of 5 minutes. The master service reads it once at construction and hands
 * the value, in milliseconds, to its starvation watcher as the warn threshold.
 */
public static final Setting<TimeValue> MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting(
"cluster.service.master_service_starvation_logging_threshold",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope);

static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";

Expand All @@ -75,6 +83,7 @@ public class MasterService extends AbstractLifecycleComponent {
private java.util.function.Supplier<ClusterState> clusterStateSupplier;

private volatile TimeValue slowTaskLoggingThreshold;
private final TimeValue starvationLoggingThreshold;

protected final ThreadPool threadPool;

Expand All @@ -87,6 +96,8 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
this.slowTaskLoggingThreshold = MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold);

this.starvationLoggingThreshold = MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING.get(settings);

this.threadPool = threadPool;
}

Expand All @@ -112,10 +123,14 @@ protected synchronized void doStart() {

protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return EsExecutors.newSinglePrioritizing(
nodeName + "/" + MASTER_UPDATE_THREAD_NAME,
daemonThreadFactory(nodeName, MASTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
nodeName + "/" + MASTER_UPDATE_THREAD_NAME,
daemonThreadFactory(nodeName, MASTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler(),
new MasterServiceStarvationWatcher(
starvationLoggingThreshold.getMillis(),
threadPool::relativeTimeInMillis,
() -> threadPoolExecutor));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -794,4 +809,62 @@ public <T> void submitStateUpdateTasks(final String source,
}
}

/**
 * A {@link PrioritizedEsThreadPoolExecutor.StarvationWatcher} that logs a warning when it keeps being notified of a nonempty queue
 * for at least {@code warnThreshold} milliseconds without an intervening empty-queue notification. Once a warning has been logged,
 * the next one is held back until a further {@code warnThreshold} milliseconds have elapsed.
 * <p>
 * The check only runs inside {@link #onNonemptyQueue()}; nothing is logged unless that notification arrives.
 */
private static class MasterServiceStarvationWatcher implements PrioritizedEsThreadPoolExecutor.StarvationWatcher {

    /** Minimum time, in milliseconds, between the start of a nonempty period (or the previous warning) and the next warning. */
    private final long warnThreshold;
    /** Source of the current time in milliseconds. */
    private final LongSupplier nowMillisSupplier;
    /** Supplies the executor whose pending tasks are described in the warning; only consulted when a warning is logged. */
    private final Supplier<PrioritizedEsThreadPoolExecutor> threadPoolExecutorSupplier;

    // The three mutable fields below are only read and written while holding the monitor of this object.
    private long lastLogMillis;
    private long nonemptySinceMillis;
    private boolean isEmpty = true;

    MasterServiceStarvationWatcher(
        long warnThreshold,
        LongSupplier nowMillisSupplier,
        Supplier<PrioritizedEsThreadPoolExecutor> threadPoolExecutorSupplier) {
        this.warnThreshold = warnThreshold;
        this.nowMillisSupplier = nowMillisSupplier;
        this.threadPoolExecutorSupplier = threadPoolExecutorSupplier;
    }

    /**
     * Marks the queue as empty, so that the next nonempty notification starts a fresh measurement.
     */
    @Override
    public void onEmptyQueue() {
        synchronized (this) {
            isEmpty = true;
        }
    }

    /**
     * Starts a new measurement if the queue was last seen empty; otherwise logs a warning if at least {@code warnThreshold}
     * milliseconds have passed since the measurement started or since the previous warning.
     */
    @Override
    public void onNonemptyQueue() {
        // The clock is sampled before taking the lock, exactly once per notification.
        final long currentMillis = nowMillisSupplier.getAsLong();
        final long nonemptyDurationMillis;
        synchronized (this) {
            if (isEmpty) {
                // First nonempty observation after an empty one: restart both timers and stay quiet.
                nonemptySinceMillis = currentMillis;
                lastLogMillis = currentMillis;
                isEmpty = false;
                return;
            }

            final long sinceLastLogMillis = currentMillis - lastLogMillis;
            if (sinceLastLogMillis < warnThreshold) {
                return;
            }

            nonemptyDurationMillis = currentMillis - nonemptySinceMillis;
            lastLogMillis = currentMillis;
        }

        // Logging happens outside the lock.
        warnAboutStarvation(nonemptyDurationMillis);
    }

    /**
     * Emits the starvation warning, including the number of pending tasks and the longest wait time reported by the executor.
     */
    private void warnAboutStarvation(long nonemptyDurationMillis) {
        final PrioritizedEsThreadPoolExecutor executor = threadPoolExecutorSupplier.get();
        final TimeValue oldestTaskAge = executor.getMaxTaskWaitTime();
        logger.warn("pending task queue has been nonempty for [{}/{}ms] which is longer than the warn threshold of [{}ms];" +
                " there are currently [{}] pending tasks, the oldest of which has age [{}/{}ms]",
            TimeValue.timeValueMillis(nonemptyDurationMillis),
            nonemptyDurationMillis,
            warnThreshold,
            executor.getNumberOfPendingTasks(),
            oldestTaskAge,
            oldestTaskAge.millis());
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterService.USER_DEFINED_METADATA,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
MasterService.MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,22 @@ public static int allocatedProcessors(final Settings settings) {
return NODE_PROCESSORS_SETTING.get(settings);
}

public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory,
ThreadContext contextHolder, ScheduledExecutorService timer) {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
/**
 * Creates a {@link PrioritizedEsThreadPoolExecutor} whose core and maximum pool sizes are both 1 and whose keep-alive time is
 * zero milliseconds.
 *
 * @param name              passed through to the executor as its name
 * @param threadFactory     passed through to the executor
 * @param contextHolder     passed through to the executor
 * @param timer             passed through to the executor
 * @param starvationWatcher passed through to the executor
 * @return the newly constructed executor
 */
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
    String name,
    ThreadFactory threadFactory,
    ThreadContext contextHolder,
    ScheduledExecutorService timer,
    PrioritizedEsThreadPoolExecutor.StarvationWatcher starvationWatcher) {
    final int poolSize = 1; // used for both the core and the maximum size
    final long keepAliveMillis = 0L;
    final PrioritizedEsThreadPoolExecutor executor = new PrioritizedEsThreadPoolExecutor(
        name,
        poolSize,
        poolSize,
        keepAliveMillis,
        TimeUnit.MILLISECONDS,
        threadFactory,
        contextHolder,
        timer,
        starvationWatcher);
    return executor;
}

public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,21 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private final AtomicLong insertionOrder = new AtomicLong();
private final Queue<Runnable> current = ConcurrentCollections.newQueue();
private final ScheduledExecutorService timer;

public PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
private final StarvationWatcher starvationWatcher;

/**
 * Creates the executor on top of a fresh {@link PriorityBlockingQueue} work queue.
 *
 * @param name              forwarded to the superclass constructor
 * @param corePoolSize      forwarded to the superclass constructor
 * @param maximumPoolSize   forwarded to the superclass constructor
 * @param keepAliveTime     forwarded to the superclass constructor
 * @param unit              forwarded to the superclass constructor
 * @param threadFactory     forwarded to the superclass constructor
 * @param contextHolder     forwarded to the superclass constructor
 * @param timer             kept in a field of this executor
 * @param starvationWatcher kept in a field of this executor; notified from {@code beforeExecute} and {@code afterExecute}
 */
public PrioritizedEsThreadPoolExecutor(
    String name,
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    ThreadFactory threadFactory,
    ThreadContext contextHolder,
    ScheduledExecutorService timer,
    StarvationWatcher starvationWatcher) {
    super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
    this.starvationWatcher = starvationWatcher;
    this.timer = timer;
}

public Pending[] getPending() {
Expand Down Expand Up @@ -101,12 +111,20 @@ private void addPending(List<Runnable> runnables, List<Pending> pending, boolean
/**
 * Adds the task that is about to run to {@code current} and, if the work queue is empty at that point, notifies the starvation
 * watcher of the empty queue.
 */
@Override
protected void beforeExecute(Thread t, Runnable r) {
    current.add(r);
    final boolean nothingQueued = getQueue().isEmpty();
    if (nothingQueued) {
        starvationWatcher.onEmptyQueue();
    }
}

/**
 * Runs the superclass hook, removes the finished task from {@code current}, and then tells the starvation watcher whether the
 * work queue is empty or nonempty.
 */
@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    current.remove(r);
    final boolean workRemains = getQueue().isEmpty() == false;
    if (workRemains) {
        starvationWatcher.onNonemptyQueue();
        return;
    }
    starvationWatcher.onEmptyQueue();
}

public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
Expand Down Expand Up @@ -282,4 +300,33 @@ public int compareTo(PrioritizedFutureTask pft) {
}
}

/**
 * Observer of the executor's work queue. The queue is expected to drain fairly often; a queue that stays nonempty for a long time
 * suggests that lower-priority tasks may be starved of access to the executor. Implementations are told whether the work queue is
 * empty or not before and after each task runs, which lets them warn the user about possible starvation.
 */
public interface StarvationWatcher {

    /**
     * An implementation that ignores every notification.
     */
    StarvationWatcher NOOP_STARVATION_WATCHER = new StarvationWatcher() {
        @Override
        public void onEmptyQueue() {
            // deliberately does nothing
        }

        @Override
        public void onNonemptyQueue() {
            // deliberately does nothing
        }
    };

    /**
     * Invoked before and after each task executes when the queue, not counting the task being executed, is empty.
     */
    void onEmptyQueue();

    /**
     * Invoked after each task executes when the queue, not counting the task being executed, is nonempty.
     */
    void onNonemptyQueue();

}

}

0 comments on commit 32c12e0

Please sign in to comment.