diff --git a/docs/changelog/134361.yaml b/docs/changelog/134361.yaml new file mode 100644 index 0000000000000..ac032771ca0e0 --- /dev/null +++ b/docs/changelog/134361.yaml @@ -0,0 +1,5 @@ +pr: 134361 +summary: Add `ThreadWatchdog` to `ClusterApplierService` +area: Cluster Coordination +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 4ae799afff655..ad4734d781db7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -75,6 +76,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements Setting.Property.NodeScope ); + public static final Setting CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL = Setting.positiveTimeSetting( + "cluster.service.applier.thread.watchdog.interval", + TimeValue.timeValueMinutes(5), + Setting.Property.NodeScope + ); + + public static final Setting CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME = Setting.positiveTimeSetting( + "cluster.service.applier.thread.watchdog.quiet_time", + TimeValue.timeValueHours(1), + Setting.Property.NodeScope + ); + public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask"; private final ClusterSettings clusterSettings; @@ -82,6 +95,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private volatile TimeValue slowTaskLoggingThreshold; private volatile TimeValue slowTaskThreadDumpTimeout; + private final TimeValue watchdogInterval; + private final TimeValue watchdogQuietTime; private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor; @@ -103,6 +118,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private NodeConnectionsService nodeConnectionsService; + private final ThreadWatchdog threadWatchdog = new ThreadWatchdog(); + public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this.clusterSettings = clusterSettings; this.threadPool = threadPool; @@ -112,6 +129,9 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, t -> slowTaskLoggingThreshold = t); clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, t -> slowTaskThreadDumpTimeout = t); + + this.watchdogInterval = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL); + this.watchdogQuietTime = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME); } public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { @@ -133,6 +153,7 @@ protected synchronized void doStart() { Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting"); Objects.requireNonNull(state.get(), "please set initial state before starting"); threadPoolExecutor = createThreadPoolExecutor(); + threadWatchdog.run(watchdogInterval, watchdogQuietTime, threadPool, lifecycle, logger); } protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { @@ -156,7 +177,13 @@ class UpdateTask extends SourcePrioritizedRunnable { @Override public void run() { - runTask(source(), updateFunction, listener); + final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread(); + try { + activityTracker.startActivity(); + runTask(source(), updateFunction, listener); + } finally { + activityTracker.stopActivity(); + } } } @@ -289,17 +316,23 @@ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutC threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") { @Override public void run() { - final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); - final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout); - assert previous == null : "Added same listener [" + listener + "]"; - if (lifecycle.stoppedOrClosed()) { - listener.onClose(); - return; - } - if (timeout != null) { - notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic()); + final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread(); + try { + activityTracker.startActivity(); + final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); + final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout); + assert previous == null : "Added same listener [" + listener + "]"; + if (lifecycle.stoppedOrClosed()) { + listener.onClose(); + return; + } + if (timeout != null) { + notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic()); + } + listener.postAdded(); + } finally { + activityTracker.stopActivity(); } - listener.postAdded(); } }); } catch (EsRejectedExecutionException e) { diff --git a/server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java b/server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java index 5432e7cfa2676..8c6f7daaa2f7c 100644 --- a/server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java +++ b/server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java @@ -50,8 +50,6 @@ public class ThreadWatchdog { Setting.Property.NodeScope ); - private static final Logger logger = LogManager.getLogger(ThreadWatchdog.class); - /** * Activity tracker for the current thread. Thread-locals are only retained by the owning thread so these will be GCd after thread exit. */ @@ -169,8 +167,17 @@ private static boolean isIdle(long value) { } public void run(Settings settings, ThreadPool threadPool, Lifecycle lifecycle) { - new Checker(threadPool, NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings), NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings), lifecycle) - .run(); + run( + NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings), + NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings), + threadPool, + lifecycle, + LogManager.getLogger(ThreadWatchdog.class) + ); + } + + public void run(TimeValue interval, TimeValue quietTime, ThreadPool threadPool, Lifecycle lifecycle, Logger logger) { + new Checker(threadPool, interval, quietTime, lifecycle, logger).run(); } /** @@ -182,12 +189,14 @@ private final class Checker extends AbstractRunnable { private final TimeValue interval; private final TimeValue quietTime; private final Lifecycle lifecycle; + private final Logger logger; - Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle) { + Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle, Logger logger) { this.threadPool = threadPool; this.interval = interval; this.quietTime = quietTime.compareTo(interval) <= 0 ? interval : quietTime; this.lifecycle = lifecycle; + this.logger = logger; assert this.interval.millis() <= this.quietTime.millis(); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9c2d6fab10368..7006b5adbe886 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -358,6 +358,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndexSettings.NODE_DEFAULT_REFRESH_INTERVAL_SETTING, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME, ClusterService.USER_DEFINED_METADATA, MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, MasterService.MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING, diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java index 8c6f1482424b5..30e635930166f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java @@ -24,10 +24,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; @@ -192,7 +194,11 @@ private abstract static class TestHarness implements Closeable { final var threadPool = deterministicTaskQueue.getThreadPool(); - final var settings = Settings.EMPTY; + final var settings = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) + .build(); + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); final var transportService = new TransportService( diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index f393cf5033bbc..26ca2419ad3df 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; @@ -63,6 +64,7 @@ public void testScheduling() { final Settings.Builder settingsBuilder = Settings.builder() .put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()) .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true) + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), randomBoolean() diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index 3dcf6b15ae1e1..e164a848a9f23 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -216,7 +216,10 @@ public void testBalanceQuality() throws IOException { final var deterministicTaskQueue = new DeterministicTaskQueue(); final var threadPool = deterministicTaskQueue.getThreadPool(); - final var settings = Settings.EMPTY; + final var settings = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) + .build(); final var clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final var masterService = new MasterService( diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index 1f067fcdba898..1b8524cb1368e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -145,7 +145,11 @@ public void testAllocate(AllocateUnassignedHandler allocateUnassigned, Consumer< .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); - var settings = Settings.EMPTY; + final var settings = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) + .build(); + var clusterSettings = createBuiltInClusterSettings(settings); var clusterService = new ClusterService( settings, diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java new file mode 100644 index 0000000000000..21b99a34bd519 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceWatchdogTests.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.service; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NodeConnectionsService; +import org.elasticsearch.cluster.TimeoutClusterStateListener; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLog; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL; +import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME; +import static org.mockito.Mockito.mock; + +public class ClusterApplierServiceWatchdogTests extends ESTestCase { + + public void testThreadWatchdogLogging() { + final var deterministicTaskQueue = new DeterministicTaskQueue(); + + final var settingsBuilder = Settings.builder(); + + final long intervalMillis; + if (randomBoolean()) { + intervalMillis = randomLongBetween(1, 1_000_000); + settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.timeValueMillis(intervalMillis)); + } else { + intervalMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.get(Settings.EMPTY).millis(); + } + + final long quietTimeMillis; + if (randomBoolean()) { + quietTimeMillis = randomLongBetween(intervalMillis, 3 * intervalMillis); + settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.getKey(), TimeValue.timeValueMillis(quietTimeMillis)); + } else { + quietTimeMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.get(Settings.EMPTY).millis(); + } + + final var settings = settingsBuilder.build(); + + try ( + var clusterApplierService = new ClusterApplierService( + randomIdentifier(), + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + deterministicTaskQueue.getThreadPool() + ) { + @Override + protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { + return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(); + } + }; + var mockLog = MockLog.capture(ClusterApplierService.class) + ) { + clusterApplierService.setNodeConnectionsService(mock(NodeConnectionsService.class)); + clusterApplierService.setInitialState(ClusterState.EMPTY_STATE); + clusterApplierService.start(); + + final AtomicBoolean completedTask = new AtomicBoolean(); + + final Runnable hotThreadsDumpsAsserter = () -> { + final var startMillis = deterministicTaskQueue.getCurrentTimeMillis(); + + for (int i = 0; i < 3; i++) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "hot threads dump [" + i + "]", + ClusterApplierService.class.getCanonicalName(), + Level.WARN, + "hot threads dump due to active threads not making progress" + ) + ); + + while (deterministicTaskQueue.getCurrentTimeMillis() < startMillis + 2 * intervalMillis + i * quietTimeMillis) { + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + } + + mockLog.assertAllExpectationsMatched(); + } + }; + + if (randomBoolean()) { + clusterApplierService.runOnApplierThread( + "slow task", + randomFrom(Priority.values()), + ignored -> hotThreadsDumpsAsserter.run(), + ActionListener.running(() -> assertTrue(completedTask.compareAndSet(false, true))) + ); + } else { + class TestListener implements TimeoutClusterStateListener { + @Override + public void postAdded() { + hotThreadsDumpsAsserter.run(); + } + + @Override + public void onClose() { + fail("should time out before closing"); + } + + @Override + public void onTimeout(TimeValue timeout) { + assertTrue(completedTask.compareAndSet(false, true)); + clusterApplierService.removeTimeoutListener(TestListener.this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + fail("no cluster state updates expected"); + } + } + + clusterApplierService.addTimeoutListener( + // timeout sufficiently short that it elapses while postAdded() is still running + TimeValue.timeValueMillis(randomLongBetween(0, 2 * intervalMillis + 2 * quietTimeMillis)), + new TestListener() + ); + } + + deterministicTaskQueue.runAllRunnableTasks(); + + assertTrue(completedTask.get()); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java b/server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java index f8506a007bb19..033e6a510fd89 100644 --- a/server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java +++ b/server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.common.network; import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.settings.Settings; @@ -222,7 +223,17 @@ public void testLoggingAndScheduling() { settings.put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME.getKey(), timeValueMillis(quietTimeMillis)); } - watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle); + if (randomBoolean()) { + watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle); + } else { + watchdog.run( + TimeValue.timeValueMillis(checkIntervalMillis), + TimeValue.timeValueMillis(quietTimeMillis), + deterministicTaskQueue.getThreadPool(), + lifecycle, + LogManager.getLogger(ThreadWatchdog.class) + ); + } for (int i = 0; i < 3; i++) { assertAdvanceTime(deterministicTaskQueue, checkIntervalMillis); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index aa0bf049c2c0c..926f3dce94c26 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -153,7 +153,12 @@ public void testRecoverStateUpdateTask() throws Exception { public void testRecoveryWillAbortIfExpectedTermDoesNotMatch() throws Exception { final long expectedTerm = randomLongBetween(1, 42); final ClusterState stateWithBlock = buildClusterState(1, randomLongBetween(43, 99)); - final GatewayService service = createGatewayService(Settings.builder(), stateWithBlock); + final GatewayService service = createGatewayService( + Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO), + stateWithBlock + ); final ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(expectedTerm); final ClusterState recoveredState = clusterStateUpdateTask.execute(stateWithBlock); @@ -178,7 +183,12 @@ public void testNoActionWhenNodeIsNotMaster() { final ClusterChangedEvent clusterChangedEvent = mock(ClusterChangedEvent.class); when(clusterChangedEvent.state()).thenReturn(initialState); - final GatewayService gatewayService = createGatewayService(Settings.builder(), initialState); + final GatewayService gatewayService = createGatewayService( + Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO), + initialState + ); gatewayService.clusterChanged(clusterChangedEvent); assertThat(deterministicTaskQueue.hasAnyTasks(), is(false)); assertThat(gatewayService.currentPendingStateRecovery, nullValue()); @@ -189,7 +199,9 @@ public void testNoActionWhenStateIsAlreadyRecovered() { Settings.builder() .put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), 2) .put(GatewayService.EXPECTED_DATA_NODES_SETTING.getKey(), 4) - .put(GatewayService.RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)), + .put(GatewayService.RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)) + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO), ClusterState.builder(buildClusterState(2, randomIntBetween(1, 42))).blocks(ClusterBlocks.builder()).build() ); final GatewayService gatewayService = createGatewayService(clusterService); @@ -208,7 +220,10 @@ public void testNoActionWhenStateIsAlreadyRecovered() { } public void testImmediateRecovery() { - final Settings.Builder settingsBuilder = Settings.builder(); + final Settings.Builder settingsBuilder = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO); + final int expectedNumberOfDataNodes = randomIntBetween(1, 3); // The cluster recover immediately because it either has the required expectedDataNodes // or both expectedDataNodes and recoverAfterTime are not configured @@ -326,8 +341,10 @@ private Tuple createServicesTupleForScheduledRec int expectedNumberOfDataNodes, boolean hasRecoverAfterTime ) { - final Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(EXPECTED_DATA_NODES_SETTING.getKey(), expectedNumberOfDataNodes); + final Settings.Builder settingsBuilder = Settings.builder() + .put(EXPECTED_DATA_NODES_SETTING.getKey(), expectedNumberOfDataNodes) + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO); if (hasRecoverAfterTime) { settingsBuilder.put(RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)); } @@ -350,7 +367,9 @@ private Tuple createServicesTupleForScheduledRec } public void testScheduledRecoveryWithRecoverAfterNodes() { - final Settings.Builder settingsBuilder = Settings.builder(); + final Settings.Builder settingsBuilder = Settings.builder() + // disable thread watchdog to avoid infinitely repeating task + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO); final int expectedNumberOfDataNodes = randomIntBetween(4, 6); final boolean hasRecoverAfterTime = randomBoolean(); if (hasRecoverAfterTime) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 7d12d0e77cb99..6b6ef65789c37 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -42,12 +42,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Strings; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; @@ -174,7 +176,13 @@ private static String pickShardsAllocator(Settings settings) { private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator(Settings settings) { var queue = new DeterministicTaskQueue(); - var clusterSettings = createBuiltInClusterSettings(settings); + var clusterSettings = createBuiltInClusterSettings( + Settings.builder() + // disable thread watchdog (submits infinitely repeating task to threadpool) by default + .put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) + .put(settings) + .build() + ); var clusterService = ClusterServiceUtils.createClusterService(queue.getThreadPool(), clusterSettings); return new DesiredBalanceShardsAllocator( clusterSettings, diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java index 8fe87b043c78b..dcc16c0bea23b 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java @@ -69,7 +69,9 @@ public void createComponents() throws Exception { ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 5d06cfe0cd951..f5e0f6068ae24 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -170,7 +170,9 @@ public void createComponents() throws Exception { ClusterService.USER_DEFINED_METADATA, ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index 75b4abcf46963..5885319e4f472 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -59,7 +59,9 @@ public void createComponents() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java index dbc8ec3f99a97..0522959b05f9e 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java @@ -69,7 +69,9 @@ public void createComponents() throws Exception { ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index a650556b0501e..9a513f2690917 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -128,7 +128,9 @@ public void createComponents() throws Exception { ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java index e85e680e494c0..7e46f260ea475 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java @@ -75,7 +75,9 @@ public void createComponents() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index 85f0aa5c40a35..88ed17e79c894 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -139,7 +139,9 @@ public void setUpVariables() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index 98bcb5d7f0d8e..6061f99402828 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -83,7 +83,9 @@ public void init() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java index 4b0c4a5f7e814..729dd64794b44 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java @@ -103,7 +103,9 @@ public void setUpVariables() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 9171f73160186..81434c3288bfd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -442,7 +442,9 @@ private ResultsPersisterService buildResultsPersisterService(OriginSettingClient ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index 4b1ed557ef287..d1c6c22fdcb67 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -105,6 +105,8 @@ public void setUpMocks() { ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME, MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, MachineLearningField.MAX_LAZY_ML_NODES, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java index 8115c0f8e42a2..16498ddc05b8e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java @@ -418,7 +418,9 @@ public static ResultsPersisterService buildResultsPersisterService(OriginSetting ClusterService.USER_DEFINED_METADATA, ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) ); diff --git a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java index d328b4621d237..c36ceb7cc8324 100644 --- a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java @@ -494,7 +494,9 @@ public void testStoppedPriority() { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, ClusterService.USER_DEFINED_METADATA, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL, + ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME ) ) );