From b0e00ab6938050c0bcbcd25fa91d47f5a0f6460f Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 9 Sep 2025 12:07:11 +0100 Subject: [PATCH 1/4] Add `ThreadWatchdog` to `ClusterApplierService` Adds another layer of detection of slow activity on the cluster applier thread. In particular this can detect activity that isn't included in an `UpdateTask`, which particularly may include completing an expensive listener attached to a `ClusterStateObserver`. Moreover it captures a thread dump if slow activity is detected. --- .../service/ClusterApplierService.java | 55 +++++++-- .../common/network/ThreadWatchdog.java | 19 ++- .../common/settings/ClusterSettings.java | 2 + .../ClusterApplierServiceWatchdogTests.java | 109 ++++++++++++++++++ .../common/network/ThreadWatchdogTests.java | 13 ++- 5 files changed, 181 insertions(+), 17 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 4ae799afff655..ad4734d781db7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -75,6 +76,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements Setting.Property.NodeScope ); + public static final Setting CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL = Setting.positiveTimeSetting( + "cluster.service.applier.thread.watchdog.interval", + TimeValue.timeValueMinutes(5), + Setting.Property.NodeScope + ); + + public static final Setting CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME = Setting.positiveTimeSetting( + "cluster.service.applier.thread.watchdog.quiet_time", + TimeValue.timeValueHours(1), + Setting.Property.NodeScope + ); + public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask"; private final ClusterSettings clusterSettings; @@ -82,6 +95,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private volatile TimeValue slowTaskLoggingThreshold; private volatile TimeValue slowTaskThreadDumpTimeout; + private final TimeValue watchdogInterval; + private final TimeValue watchdogQuietTime; private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor; @@ -103,6 +118,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private NodeConnectionsService nodeConnectionsService; + private final ThreadWatchdog threadWatchdog = new ThreadWatchdog(); + public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this.clusterSettings = clusterSettings; this.threadPool = threadPool; @@ -112,6 +129,9 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, t -> slowTaskLoggingThreshold = t); clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, t -> slowTaskThreadDumpTimeout = t); + + this.watchdogInterval = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL); + this.watchdogQuietTime = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME); } public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { @@ -133,6 +153,7 @@ protected synchronized void doStart() { Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting"); Objects.requireNonNull(state.get(), "please set initial state before starting"); threadPoolExecutor = createThreadPoolExecutor(); + threadWatchdog.run(watchdogInterval, watchdogQuietTime, threadPool, lifecycle, logger); } protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { @@ -156,7 +177,13 @@ class UpdateTask extends SourcePrioritizedRunnable { @Override public void run() { - runTask(source(), updateFunction, listener); + final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread(); + try { + activityTracker.startActivity(); + runTask(source(), updateFunction, listener); + } finally { + activityTracker.stopActivity(); + } } } @@ -289,17 +316,23 @@ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutC threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") { @Override public void run() { - final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); - final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout); - assert previous == null : "Added same listener [" + listener + "]"; - if (lifecycle.stoppedOrClosed()) { - listener.onClose(); - return; - } - if (timeout != null) { - notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic()); + final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread(); + try { + activityTracker.startActivity(); + final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); + final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout); + assert previous == null : "Added same listener [" + listener + "]"; + if (lifecycle.stoppedOrClosed()) { + listener.onClose(); + return; + } + if (timeout != null) { + notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic()); + } + listener.postAdded(); + } finally { + activityTracker.stopActivity(); } - listener.postAdded(); } }); } catch (EsRejectedExecutionException e) { diff --git a/server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java b/server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java index 5432e7cfa2676..8c6f7daaa2f7c 100644 --- a/server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java +++ b/server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java @@ -50,8 +50,6 @@ public class ThreadWatchdog { Setting.Property.NodeScope ); - private static final Logger logger = LogManager.getLogger(ThreadWatchdog.class); - /** * Activity tracker for the current thread. Thread-locals are only retained by the owning thread so these will be GCd after thread exit. */ @@ -169,8 +167,17 @@ private static boolean isIdle(long value) { } public void run(Settings settings, ThreadPool threadPool, Lifecycle lifecycle) { - new Checker(threadPool, NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings), NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings), lifecycle) - .run(); + run( + NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings), + NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings), + threadPool, + lifecycle, + LogManager.getLogger(ThreadWatchdog.class) + ); + } + + public void run(TimeValue interval, TimeValue quietTime, ThreadPool threadPool, Lifecycle lifecycle, Logger logger) { + new Checker(threadPool, interval, quietTime, lifecycle, logger).run(); } /** @@ -182,12 +189,14 @@ private final class Checker extends AbstractRunnable { private final TimeValue interval; private final TimeValue quietTime; private final Lifecycle lifecycle; + private final Logger logger; - Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle) { + Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle, Logger logger) { this.threadPool = threadPool; this.interval = interval; this.quietTime = quietTime.compareTo(interval) <= 0 ? interval : quietTime; this.lifecycle = lifecycle; + this.logger = logger; assert this.interval.millis() <= this.quietTime.millis(); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9c2d6fab10368..7006b5adbe886 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -358,6 +358,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndexSettings.NODE_DEFAULT_REFRESH_INTERVAL_SETTING, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME, ClusterService.USER_DEFINED_METADATA, MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, MasterService.MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java new file mode 100644 index 0000000000000..a55ce578e550d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.service; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NodeConnectionsService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLog; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL; +import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME; +import static org.mockito.Mockito.mock; + +public class ClusterApplierServiceWatchdogTests extends ESTestCase { + + private static final Logger logger = LogManager.getLogger(ClusterApplierServiceWatchdogTests.class); + + public void testThreadWatchdogLogging() { + final var deterministicTaskQueue = new DeterministicTaskQueue(); + + final var settingsBuilder = Settings.builder(); + + final long intervalMillis; + if (randomBoolean()) { + intervalMillis = randomLongBetween(1, 1_000_000); + settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.timeValueMillis(intervalMillis)); + } else { + intervalMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.get(Settings.EMPTY).millis(); + } + + final long quietTimeMillis; + if (randomBoolean()) { + quietTimeMillis = randomLongBetween(intervalMillis, 3 * intervalMillis); + settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.getKey(), TimeValue.timeValueMillis(quietTimeMillis)); + } else { + quietTimeMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.get(Settings.EMPTY).millis(); + } + + final var settings = settingsBuilder.build(); + + try ( + var clusterApplierService = new ClusterApplierService( + randomIdentifier(), + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + deterministicTaskQueue.getThreadPool() + ) { + @Override + protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { + return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(); + } + }; + var mockLog = MockLog.capture(ClusterApplierService.class) + ) { + clusterApplierService.setNodeConnectionsService(mock(NodeConnectionsService.class)); + clusterApplierService.setInitialState(ClusterState.EMPTY_STATE); + clusterApplierService.start(); + + final AtomicBoolean completedTask = new AtomicBoolean(); + + clusterApplierService.runOnApplierThread("blocking task", randomFrom(Priority.values()), ignored -> { + + final var startMillis = deterministicTaskQueue.getCurrentTimeMillis(); + + for (int i = 0; i < 3; i++) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "hot threads dump [" + i + "]", + ClusterApplierService.class.getCanonicalName(), + Level.WARN, + "hot threads dump due to active threads not making progress" + ) + ); + + while (deterministicTaskQueue.getCurrentTimeMillis() < startMillis + 2 * intervalMillis + i * quietTimeMillis) { + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + } + + mockLog.assertAllExpectationsMatched(); + } + }, ActionListener.running(() -> completedTask.set(true))); + + deterministicTaskQueue.runAllRunnableTasks(); + + assertTrue(completedTask.get()); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java b/server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java index f8506a007bb19..033e6a510fd89 100644 --- a/server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java +++ b/server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.common.network; import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.settings.Settings; @@ -222,7 +223,17 @@ public void testLoggingAndScheduling() { settings.put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME.getKey(), timeValueMillis(quietTimeMillis)); } - watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle); + if (randomBoolean()) { + watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle); + } else { + watchdog.run( + TimeValue.timeValueMillis(checkIntervalMillis), + TimeValue.timeValueMillis(quietTimeMillis), + deterministicTaskQueue.getThreadPool(), + lifecycle, + LogManager.getLogger(ThreadWatchdog.class) + ); + } for (int i = 0; i < 3; i++) { assertAdvanceTime(deterministicTaskQueue, checkIntervalMillis); From 756e30aaceef51eea13f47b0b76661288f637afa Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 9 Sep 2025 12:58:03 +0100 Subject: [PATCH 2/4] Update docs/changelog/134361.yaml --- docs/changelog/134361.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/134361.yaml diff --git a/docs/changelog/134361.yaml b/docs/changelog/134361.yaml new file mode 100644 index 0000000000000..ac032771ca0e0 --- /dev/null +++ b/docs/changelog/134361.yaml @@ -0,0 +1,5 @@ +pr: 134361 +summary: Add `ThreadWatchdog` to `ClusterApplierService` +area: Cluster Coordination +type: enhancement +issues: [] From 8bc49e27567f6c806766456d75b868cd7e6af7d9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 9 Sep 2025 16:18:12 +0100 Subject: [PATCH 3/4] Test fixes --- ...ransportIndicesShardStoresActionTests.java | 8 ++++- ...rnalClusterInfoServiceSchedulingTests.java | 2 ++ .../ClusterAllocationSimulationTests.java | 5 ++- .../DesiredBalanceShardsAllocatorTests.java | 6 +++- .../gateway/GatewayServiceTests.java | 33 +++++++++++++++---- .../cluster/ESAllocationTestCase.java | 10 +++++- .../ml/integration/AnomalyJobCRUDIT.java | 4 ++- .../AutodetectResultProcessorIT.java | 4 ++- .../ml/integration/EstablishedMemUsageIT.java | 4 ++- .../integration/JobModelSnapshotCRUDIT.java | 4 ++- .../ml/integration/JobResultsProviderIT.java | 4 ++- .../integration/JobStorageDeletionTaskIT.java | 4 ++- ...sportGetTrainedModelsStatsActionTests.java | 4 ++- .../ml/datafeed/DatafeedJobBuilderTests.java | 4 ++- .../InferenceProcessorFactoryTests.java | 4 ++- .../persistence/JobResultsPersisterTests.java | 4 ++- .../OpenJobPersistentTasksExecutorTests.java | 2 ++ .../ResultsPersisterServiceTests.java | 4 ++- .../slm/SnapshotLifecycleServiceTests.java | 4 ++- 19 files changed, 91 insertions(+), 23 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java index 8c6f1482424b5..30e635930166f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java @@ -24,10 +24,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; @@ -192,7 +194,11 @@ private abstract static class TestHarness implements Closeable { final var threadPool = deterministicTaskQueue.getThreadPool(); - final var settings = Settings.EMPTY; + final var settings = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) + .build(); + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); final var transportService = new TransportService( diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index f393cf5033bbc..26ca2419ad3df 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; @@ -63,6 +64,7 @@ public void testScheduling() { final Settings.Builder settingsBuilder = Settings.builder() .put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()) .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true) + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), randomBoolean() diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index 3dcf6b15ae1e1..e164a848a9f23 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -216,7 +216,10 @@ public void testBalanceQuality() throws IOException { final var deterministicTaskQueue = new DeterministicTaskQueue(); final var threadPool = deterministicTaskQueue.getThreadPool(); - final var settings = Settings.EMPTY; + final var settings = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) + .build(); final var clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final var masterService = new MasterService( diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index 7162f443340f8..a048c68a0dd74 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -138,7 +138,11 @@ public void testAllocate(AllocateUnassignedHandler allocateUnassigned, Consumer< .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); - var settings = Settings.EMPTY; + final var settings = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) + .build(); + var clusterSettings = createBuiltInClusterSettings(settings); var clusterService = new ClusterService( settings, diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index aa0bf049c2c0c..926f3dce94c26 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -153,7 +153,12 @@ public void testRecoverStateUpdateTask() throws Exception { public void testRecoveryWillAbortIfExpectedTermDoesNotMatch() throws Exception { final long expectedTerm = randomLongBetween(1, 42); final ClusterState stateWithBlock = buildClusterState(1, randomLongBetween(43, 99)); - final GatewayService service = createGatewayService(Settings.builder(), stateWithBlock); + final GatewayService service = createGatewayService( + Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO), + stateWithBlock + ); final ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(expectedTerm); final ClusterState recoveredState = clusterStateUpdateTask.execute(stateWithBlock); @@ -178,7 +183,12 @@ public void testNoActionWhenNodeIsNotMaster() { final ClusterChangedEvent clusterChangedEvent = mock(ClusterChangedEvent.class); when(clusterChangedEvent.state()).thenReturn(initialState); - final GatewayService gatewayService = createGatewayService(Settings.builder(), initialState); + final GatewayService gatewayService = createGatewayService( + Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO), + initialState + ); gatewayService.clusterChanged(clusterChangedEvent); assertThat(deterministicTaskQueue.hasAnyTasks(), is(false)); assertThat(gatewayService.currentPendingStateRecovery, nullValue()); @@ -189,7 +199,9 @@ public void testNoActionWhenStateIsAlreadyRecovered() { Settings.builder() .put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), 2) .put(GatewayService.EXPECTED_DATA_NODES_SETTING.getKey(), 4) - .put(GatewayService.RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)), + .put(GatewayService.RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)) + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO), ClusterState.builder(buildClusterState(2, randomIntBetween(1, 42))).blocks(ClusterBlocks.builder()).build() ); final GatewayService gatewayService = createGatewayService(clusterService); @@ -208,7 +220,10 @@ public void testNoActionWhenStateIsAlreadyRecovered() { } public void testImmediateRecovery() { - final Settings.Builder settingsBuilder = Settings.builder(); + final Settings.Builder settingsBuilder = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO); + final int expectedNumberOfDataNodes = randomIntBetween(1, 3); // The cluster recover immediately because it either has the required expectedDataNodes // or both expectedDataNodes and recoverAfterTime are not configured @@ -326,8 +341,10 @@ private Tuple createServicesTupleForScheduledRec int expectedNumberOfDataNodes, boolean hasRecoverAfterTime ) { - final Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(EXPECTED_DATA_NODES_SETTING.getKey(), expectedNumberOfDataNodes); + final Settings.Builder settingsBuilder = Settings.builder() + .put(EXPECTED_DATA_NODES_SETTING.getKey(), expectedNumberOfDataNodes) + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO); if (hasRecoverAfterTime) { settingsBuilder.put(RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)); } @@ -350,7 +367,9 @@ private Tuple createServicesTupleForScheduledRec } public void testScheduledRecoveryWithRecoverAfterNodes() { - final Settings.Builder settingsBuilder = Settings.builder(); + final Settings.Builder settingsBuilder = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO); final int expectedNumberOfDataNodes = randomIntBetween(4, 6); final boolean hasRecoverAfterTime = randomBoolean(); if (hasRecoverAfterTime) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 44236024a40b0..cc468040f8285 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -42,12 +42,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Strings; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; @@ -172,7 +174,13 @@ private static String pickShardsAllocator(Settings settings) { private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator(Settings settings) { var queue = new DeterministicTaskQueue(); - var clusterSettings = createBuiltInClusterSettings(settings); + var clusterSettings = createBuiltInClusterSettings( + Settings.builder() + // disable thread watchdog (submits infinitely repeating task to threadpool) by default + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) + .put(settings) + .build() + ); var clusterService = ClusterServiceUtils.createClusterService(queue.getThreadPool(), clusterSettings); return new DesiredBalanceShardsAllocator( clusterSettings, diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java index 8fe87b043c78b..dcc16c0bea23b 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java @@ -69,7 +69,9 @@ public void createComponents() throws Exception { ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 5d06cfe0cd951..f5e0f6068ae24 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -170,7 +170,9 @@ public void createComponents() throws Exception { ClusterService.USER_DEFINED_METADATA, ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index 75b4abcf46963..5885319e4f472 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -59,7 +59,9 @@ public void createComponents() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java index dbc8ec3f99a97..0522959b05f9e 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java @@ -69,7 +69,9 @@ public void createComponents() throws Exception { ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index a650556b0501e..9a513f2690917 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -128,7 +128,9 @@ public void createComponents() throws Exception { ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java index e85e680e494c0..7e46f260ea475 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java @@ -75,7 +75,9 @@ public void createComponents() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index 06ba7ba113d4e..b819b0a91f2ff 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -138,7 +138,9 @@ public void setUpVariables() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index 98bcb5d7f0d8e..6061f99402828 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -83,7 +83,9 @@ public void init() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java index 4b0c4a5f7e814..729dd64794b44 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java @@ -103,7 +103,9 @@ public void setUpVariables() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 9171f73160186..81434c3288bfd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -442,7 +442,9 @@ private ResultsPersisterService buildResultsPersisterService(OriginSettingClient ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index 4b1ed557ef287..d1c6c22fdcb67 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -105,6 +105,8 @@ public void setUpMocks() { ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME, MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, MachineLearningField.MAX_LAZY_ML_NODES, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java index 8115c0f8e42a2..16498ddc05b8e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java @@ -418,7 +418,9 @@ public static ResultsPersisterService buildResultsPersisterService(OriginSetting ClusterService.USER_DEFINED_METADATA, ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java index e170fa4dd6961..30abc28b88564 100644 --- a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java @@ -492,7 +492,9 @@ public void testStoppedPriority() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); From ea0fca3b443db9ddb2b5491e1d77659b12c399a6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 11 Sep 2025 13:36:44 +0100 Subject: [PATCH 4/4] Also test the other path --- .../ClusterApplierServiceWatchdogTests.java | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java index a55ce578e550d..21b99a34bd519 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java @@ -11,16 +11,16 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NodeConnectionsService; +import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; @@ -32,8 +32,6 @@ public class ClusterApplierServiceWatchdogTests extends ESTestCase { - private static final Logger logger = LogManager.getLogger(ClusterApplierServiceWatchdogTests.class); - public void testThreadWatchdogLogging() { final var deterministicTaskQueue = new DeterministicTaskQueue(); @@ -77,8 +75,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { final AtomicBoolean completedTask = new AtomicBoolean(); - clusterApplierService.runOnApplierThread("blocking task", randomFrom(Priority.values()), ignored -> { - + final Runnable hotThreadsDumpsAsserter = () -> { final var startMillis = deterministicTaskQueue.getCurrentTimeMillis(); for (int i = 0; i < 3; i++) { @@ -98,12 +95,49 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { mockLog.assertAllExpectationsMatched(); } - }, ActionListener.running(() -> completedTask.set(true))); + }; + + if (randomBoolean()) { + clusterApplierService.runOnApplierThread( + "slow task", + randomFrom(Priority.values()), + ignored -> hotThreadsDumpsAsserter.run(), + ActionListener.running(() -> assertTrue(completedTask.compareAndSet(false, true))) + ); + } else { + class TestListener implements TimeoutClusterStateListener { + @Override + public void postAdded() { + hotThreadsDumpsAsserter.run(); + } + + @Override + public void onClose() { + fail("should time out before closing"); + } + + @Override + public void onTimeout(TimeValue timeout) { + assertTrue(completedTask.compareAndSet(false, true)); + clusterApplierService.removeTimeoutListener(TestListener.this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + fail("no cluster state updates expected"); + } + } + + clusterApplierService.addTimeoutListener( + // timeout sufficiently short that it elapses while postAdded() is still running + TimeValue.timeValueMillis(randomLongBetween(0, 2 * intervalMillis + 2 * quietTimeMillis)), + new TestListener() + ); + } deterministicTaskQueue.runAllRunnableTasks(); assertTrue(completedTask.get()); } } - }