Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/134361.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 134361
summary: Add `ThreadWatchdog` to `ClusterApplierService`
area: Cluster Coordination
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.network.ThreadWatchdog;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -75,13 +76,27 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
Setting.Property.NodeScope
);

public static final Setting<TimeValue> CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL = Setting.positiveTimeSetting(
"cluster.service.applier.thread.watchdog.interval",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope
);

public static final Setting<TimeValue> CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME = Setting.positiveTimeSetting(
"cluster.service.applier.thread.watchdog.quiet_time",
TimeValue.timeValueHours(1),
Setting.Property.NodeScope
);

public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";

private final ClusterSettings clusterSettings;
private final ThreadPool threadPool;

private volatile TimeValue slowTaskLoggingThreshold;
private volatile TimeValue slowTaskThreadDumpTimeout;
private final TimeValue watchdogInterval;
private final TimeValue watchdogQuietTime;

private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;

Expand All @@ -103,6 +118,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private NodeConnectionsService nodeConnectionsService;

private final ThreadWatchdog threadWatchdog = new ThreadWatchdog();

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
Expand All @@ -112,6 +129,9 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings

clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, t -> slowTaskLoggingThreshold = t);
clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, t -> slowTaskThreadDumpTimeout = t);

this.watchdogInterval = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL);
this.watchdogQuietTime = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME);
}

public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
Expand All @@ -133,6 +153,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
Objects.requireNonNull(state.get(), "please set initial state before starting");
threadPoolExecutor = createThreadPoolExecutor();
threadWatchdog.run(watchdogInterval, watchdogQuietTime, threadPool, lifecycle, logger);
}

protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
Expand All @@ -156,7 +177,13 @@ class UpdateTask extends SourcePrioritizedRunnable {

@Override
public void run() {
runTask(source(), updateFunction, listener);
final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread();
try {
activityTracker.startActivity();
runTask(source(), updateFunction, listener);
} finally {
activityTracker.stopActivity();
}
}
}

Expand Down Expand Up @@ -289,17 +316,23 @@ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutC
threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
@Override
public void run() {
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
assert previous == null : "Added same listener [" + listener + "]";
if (lifecycle.stoppedOrClosed()) {
listener.onClose();
return;
}
if (timeout != null) {
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread();
try {
activityTracker.startActivity();
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
assert previous == null : "Added same listener [" + listener + "]";
if (lifecycle.stoppedOrClosed()) {
listener.onClose();
return;
}
if (timeout != null) {
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
}
listener.postAdded();
} finally {
activityTracker.stopActivity();
}
listener.postAdded();
}
});
} catch (EsRejectedExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public class ThreadWatchdog {
Setting.Property.NodeScope
);

private static final Logger logger = LogManager.getLogger(ThreadWatchdog.class);

/**
* Activity tracker for the current thread. Thread-locals are only retained by the owning thread so these will be GCd after thread exit.
*/
Expand Down Expand Up @@ -169,8 +167,17 @@ private static boolean isIdle(long value) {
}

public void run(Settings settings, ThreadPool threadPool, Lifecycle lifecycle) {
new Checker(threadPool, NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings), NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings), lifecycle)
.run();
run(
NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings),
NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings),
threadPool,
lifecycle,
LogManager.getLogger(ThreadWatchdog.class)
);
}

public void run(TimeValue interval, TimeValue quietTime, ThreadPool threadPool, Lifecycle lifecycle, Logger logger) {
new Checker(threadPool, interval, quietTime, lifecycle, logger).run();
}

/**
Expand All @@ -182,12 +189,14 @@ private final class Checker extends AbstractRunnable {
private final TimeValue interval;
private final TimeValue quietTime;
private final Lifecycle lifecycle;
private final Logger logger;

Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle) {
Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle, Logger logger) {
this.threadPool = threadPool;
this.interval = interval;
this.quietTime = quietTime.compareTo(interval) <= 0 ? interval : quietTime;
this.lifecycle = lifecycle;
this.logger = logger;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hot-threads dumper still links the doc for ReferenceDocs.NETWORK_THREADING_MODEL which is not applicable in the new case. Do we care?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes you're right. I can't think of a great place to document this. I mean it's kind of the same principle, this thread should be frequently idle just like the transport_worker threads. I think I'm going to say we don't care.

assert this.interval.millis() <= this.quietTime.millis();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexSettings.NODE_DEFAULT_REFRESH_INTERVAL_SETTING,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING,
ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL,
ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME,
ClusterService.USER_DEFINED_METADATA,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
MasterService.MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -192,7 +194,11 @@ private abstract static class TestHarness implements Closeable {

final var threadPool = deterministicTaskQueue.getThreadPool();

final var settings = Settings.EMPTY;
final var settings = Settings.builder()
// disable thread watchdog to avoid infinitely repeating task
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
.build();

final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);

final var transportService = new TransportService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -63,6 +64,7 @@ public void testScheduling() {
final Settings.Builder settingsBuilder = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName())
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
randomBoolean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ public void testBalanceQuality() throws IOException {
final var deterministicTaskQueue = new DeterministicTaskQueue();
final var threadPool = deterministicTaskQueue.getThreadPool();

final var settings = Settings.EMPTY;
final var settings = Settings.builder()
// disable thread watchdog to avoid infinitely repeating task
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
.build();
final var clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

final var masterService = new MasterService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ public void testAllocate(AllocateUnassignedHandler allocateUnassigned, Consumer<
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.build();

var settings = Settings.EMPTY;
final var settings = Settings.builder()
// disable thread watchdog to avoid infinitely repeating task
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
.build();

var clusterSettings = createBuiltInClusterSettings(settings);
var clusterService = new ClusterService(
settings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.service;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL;
import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME;
import static org.mockito.Mockito.mock;

public class ClusterApplierServiceWatchdogTests extends ESTestCase {

public void testThreadWatchdogLogging() {
final var deterministicTaskQueue = new DeterministicTaskQueue();

final var settingsBuilder = Settings.builder();

final long intervalMillis;
if (randomBoolean()) {
intervalMillis = randomLongBetween(1, 1_000_000);
settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.timeValueMillis(intervalMillis));
} else {
intervalMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.get(Settings.EMPTY).millis();
}

final long quietTimeMillis;
if (randomBoolean()) {
quietTimeMillis = randomLongBetween(intervalMillis, 3 * intervalMillis);
settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.getKey(), TimeValue.timeValueMillis(quietTimeMillis));
} else {
quietTimeMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.get(Settings.EMPTY).millis();
}

final var settings = settingsBuilder.build();

try (
var clusterApplierService = new ClusterApplierService(
randomIdentifier(),
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
deterministicTaskQueue.getThreadPool()
) {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
}
};
var mockLog = MockLog.capture(ClusterApplierService.class)
) {
clusterApplierService.setNodeConnectionsService(mock(NodeConnectionsService.class));
clusterApplierService.setInitialState(ClusterState.EMPTY_STATE);
clusterApplierService.start();

final AtomicBoolean completedTask = new AtomicBoolean();

final Runnable hotThreadsDumpsAsserter = () -> {
final var startMillis = deterministicTaskQueue.getCurrentTimeMillis();

for (int i = 0; i < 3; i++) {
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"hot threads dump [" + i + "]",
ClusterApplierService.class.getCanonicalName(),
Level.WARN,
"hot threads dump due to active threads not making progress"
)
);

while (deterministicTaskQueue.getCurrentTimeMillis() < startMillis + 2 * intervalMillis + i * quietTimeMillis) {
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
}

mockLog.assertAllExpectationsMatched();
}
};

if (randomBoolean()) {
clusterApplierService.runOnApplierThread(
"slow task",
randomFrom(Priority.values()),
ignored -> hotThreadsDumpsAsserter.run(),
ActionListener.running(() -> assertTrue(completedTask.compareAndSet(false, true)))
);
} else {
class TestListener implements TimeoutClusterStateListener {
@Override
public void postAdded() {
hotThreadsDumpsAsserter.run();
}

@Override
public void onClose() {
fail("should time out before closing");
}

@Override
public void onTimeout(TimeValue timeout) {
assertTrue(completedTask.compareAndSet(false, true));
clusterApplierService.removeTimeoutListener(TestListener.this);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
fail("no cluster state updates expected");
}
}

clusterApplierService.addTimeoutListener(
// timeout sufficiently short that it elapses while postAdded() is still running
TimeValue.timeValueMillis(randomLongBetween(0, 2 * intervalMillis + 2 * quietTimeMillis)),
new TestListener()
);
}

deterministicTaskQueue.runAllRunnableTasks();

assertTrue(completedTask.get());
}
}
}
Loading