From 04f9563eb3120ff84fde4902c55955273a0fda99 Mon Sep 17 00:00:00 2001 From: Niyati Aggarwal <121826855+niyatiagg@users.noreply.github.com> Date: Sat, 11 Nov 2023 16:45:34 -0800 Subject: [PATCH] Adding support for dynamically updating Leader/follower checker timeouts (#10528) * making leader check timeout dynamic Signed-off-by: Niyati Aggarwal * making follower check timeout dynamic Signed-off-by: Niyati Aggarwal * fixing existing unit tests Signed-off-by: Niyati Aggarwal * fixing checkstyle violations Signed-off-by: Niyati Aggarwal * adding tests for leader/follower check timeout Signed-off-by: Niyati Aggarwal * setting maximum and minimum timeout value for leader/follower checker Signed-off-by: Niyati Aggarwal * adding tests for checking boundary cases Signed-off-by: Niyati Aggarwal * Fixing checkstyle violations Signed-off-by: Niyati Aggarwal * changed the log file and added other suggested changes Signed-off-by: Niyati Aggarwal * fixing checkstyle violations Signed-off-by: Niyati Aggarwal * Addressing review comments Signed-off-by: Niyati Aggarwal * addressing proposed changes Signed-off-by: Niyati Aggarwal * Applying checkstyle fixes Signed-off-by: Niyati Aggarwal * Fixing flakiness for existing tests Signed-off-by: Niyati Aggarwal * Applying checkstyle fixes Signed-off-by: Niyati Aggarwal * Fixing the timeout value limits for randomSettings Signed-off-by: Niyati Aggarwal --------- Signed-off-by: Niyati Aggarwal Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../cluster/coordination/Coordinator.java | 3 +- .../coordination/FollowersChecker.java | 14 ++- .../cluster/coordination/LeaderChecker.java | 13 +- .../transport/TransportChannel.java | 2 +- .../CoordinationCheckerSettingsTests.java | 113 ++++++++++++++++++ .../coordination/FollowersCheckerTests.java | 29 +++-- .../coordination/LeaderCheckerTests.java | 12 +- 8 files changed, 164 insertions(+), 23 deletions(-) create mode 100644 server/src/test/java/org/opensearch/cluster/coordination/CoordinationCheckerSettingsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e8f27b9323306..e45eb3870b522 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -129,6 +129,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [BUG] Disable sort optimization for HALF_FLOAT ([#10999](https://github.com/opensearch-project/OpenSearch/pull/10999)) - Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057)) - Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087)) +- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528)) - Use iterative approach to evaluate Regex.simpleMatch ([#11060](https://github.com/opensearch-project/OpenSearch/pull/11060)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index a4ffab7fb70c9..5a07f964f94a4 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -261,9 +261,10 @@ public Coordinator( this::handlePublishRequest, this::handleApplyCommit ); - this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure, nodeHealthService); + this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService); this.followersChecker = new FollowersChecker( settings, + clusterSettings, transportService, this::onFollowerCheckRequest, this::removeNode, diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index f69a4f771cf21..70bb0515bb022 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -98,7 +99,9 @@ public class FollowersChecker { "cluster.fault_detection.follower_check.timeout", TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), - Setting.Property.NodeScope + TimeValue.timeValueMillis(60000), + Setting.Property.NodeScope, + Setting.Property.Dynamic ); // the number of failed checks that must happen before the follower is considered to have failed. @@ -112,7 +115,7 @@ public class FollowersChecker { private final Settings settings; private final TimeValue followerCheckInterval; - private final TimeValue followerCheckTimeout; + private TimeValue followerCheckTimeout; private final int followerCheckRetryCount; private final BiConsumer onNodeFailure; private final Consumer handleRequestAndUpdateState; @@ -127,6 +130,7 @@ public class FollowersChecker { public FollowersChecker( Settings settings, + ClusterSettings clusterSettings, TransportService transportService, Consumer handleRequestAndUpdateState, BiConsumer onNodeFailure, @@ -141,7 +145,7 @@ public FollowersChecker( followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings); followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings); followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings); - + clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_TIMEOUT_SETTING, this::setFollowerCheckTimeout); updateFastResponseState(0, Mode.CANDIDATE); transportService.registerRequestHandler( FOLLOWER_CHECK_ACTION_NAME, @@ -159,6 +163,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti }); } + private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) { + this.followerCheckTimeout = followerCheckTimeout; + } + /** * Update the set of known nodes, starting to check any new ones and stopping checking any previously-known-but-now-unknown ones. */ diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 69ba1f977f326..8d4373b865f62 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -97,7 +98,9 @@ public class LeaderChecker { "cluster.fault_detection.leader_check.timeout", TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), - Setting.Property.NodeScope + TimeValue.timeValueMillis(60000), + Setting.Property.NodeScope, + Setting.Property.Dynamic ); // the number of failed checks that must happen before the leader is considered to have failed. @@ -111,7 +114,7 @@ public class LeaderChecker { private final Settings settings; private final TimeValue leaderCheckInterval; - private final TimeValue leaderCheckTimeout; + private TimeValue leaderCheckTimeout; private final int leaderCheckRetryCount; private final TransportService transportService; private final Consumer onLeaderFailure; @@ -123,6 +126,7 @@ public class LeaderChecker { LeaderChecker( final Settings settings, + final ClusterSettings clusterSettings, final TransportService transportService, final Consumer onLeaderFailure, NodeHealthService nodeHealthService @@ -134,6 +138,7 @@ public class LeaderChecker { this.transportService = transportService; this.onLeaderFailure = onLeaderFailure; this.nodeHealthService = nodeHealthService; + clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout); transportService.registerRequestHandler( LEADER_CHECK_ACTION_NAME, @@ -155,6 +160,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti }); } + private void setLeaderCheckTimeout(TimeValue leaderCheckTimeout) { + this.leaderCheckTimeout = leaderCheckTimeout; + } + public DiscoveryNode leader() { CheckScheduler checkScheduler = currentChecker.get(); return checkScheduler == null ? null : checkScheduler.leader; diff --git a/server/src/main/java/org/opensearch/transport/TransportChannel.java b/server/src/main/java/org/opensearch/transport/TransportChannel.java index 7423d59103302..f84ee5dc745c3 100644 --- a/server/src/main/java/org/opensearch/transport/TransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TransportChannel.java @@ -82,7 +82,7 @@ static void sendErrorResponse(TransportChannel channel, String actionName, Trans /** * Returns the contextual property associated with this specific transport channel (the - * implementation of how such properties are managed depends on the the particular + * implementation of how such properties are managed depends on the particular * transport engine). * * @param name the name of the property diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationCheckerSettingsTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationCheckerSettingsTests.java new file mode 100644 index 0000000000000..56bd2d94dce84 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationCheckerSettingsTests.java @@ -0,0 +1,113 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING; +import static org.opensearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; +import static org.opensearch.common.unit.TimeValue.timeValueSeconds; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class CoordinationCheckerSettingsTests extends OpenSearchSingleNodeTestCase { + public void testFollowerCheckTimeoutValueUpdate() { + Setting setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build(); + try { + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(timeSettings1) + .execute() + .actionGet(); + + assertAcked(response); + assertEquals(timeValueSeconds(60), setting1.get(response.getPersistentSettings())); + } finally { + // cleanup + timeSettings1 = Settings.builder().putNull(setting1.getKey()).build(); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + } + + public void testFollowerCheckTimeoutMaxValue() { + Setting setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "61s").build(); + + assertThrows( + "failed to parse value [61s] for setting [" + setting1.getKey() + "], must be <= [60000ms]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + ); + } + + public void testFollowerCheckTimeoutMinValue() { + Setting setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "0s").build(); + + assertThrows( + "failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + ); + } + + public void testLeaderCheckTimeoutValueUpdate() { + Setting setting1 = LEADER_CHECK_TIMEOUT_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build(); + try { + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(timeSettings1) + .execute() + .actionGet(); + assertAcked(response); + assertEquals(timeValueSeconds(60), setting1.get(response.getPersistentSettings())); + } finally { + // cleanup + timeSettings1 = Settings.builder().putNull(setting1.getKey()).build(); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + } + + public void testLeaderCheckTimeoutMaxValue() { + Setting setting1 = LEADER_CHECK_TIMEOUT_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "61s").build(); + + assertThrows( + "failed to parse value [61s] for setting [" + setting1.getKey() + "], must be <= [60000ms]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + ); + } + + public void testLeaderCheckTimeoutMinValue() { + Setting setting1 = LEADER_CHECK_TIMEOUT_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "0s").build(); + + assertThrows( + "failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + ); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index c152a1606681e..a106706c00732 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings.Builder; import org.opensearch.core.common.io.stream.StreamInput; @@ -96,7 +97,7 @@ public class FollowersCheckerTests extends OpenSearchTestCase { public void testChecksExpectedNodes() { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build(); - + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DiscoveryNodes[] discoveryNodesHolder = new DiscoveryNodes[] { DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build() }; @@ -132,6 +133,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req final FollowersChecker followersChecker = new FollowersChecker( settings, + clusterSettings, transportService, fcr -> { assert false : fcr; }, (node, reason) -> { @@ -257,6 +259,7 @@ public void testFailsNodeThatDisconnects() { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final MockTransport mockTransport = new MockTransport() { @@ -297,6 +300,7 @@ public String toString() { final FollowersChecker followersChecker = new FollowersChecker( settings, + clusterSettings, transportService, fcr -> { assert false : fcr; }, (node, reason) -> { @@ -336,6 +340,7 @@ private void testBehaviourOfFailingNode( final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final MockTransport mockTransport = new MockTransport() { @@ -384,6 +389,7 @@ public String toString() { final FollowersChecker followersChecker = new FollowersChecker( settings, + clusterSettings, transportService, fcr -> { assert false : fcr; }, (node, reason) -> { @@ -464,6 +470,7 @@ public void testUnhealthyNodeRejectsImmediately() { final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final MockTransport mockTransport = new MockTransport() { @@ -488,7 +495,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req final AtomicBoolean calledCoordinator = new AtomicBoolean(); final AtomicReference coordinatorException = new AtomicReference<>(); - final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> { + final FollowersChecker followersChecker = new FollowersChecker(settings, clusterSettings, transportService, fcr -> { assertTrue(calledCoordinator.compareAndSet(false, true)); final RuntimeException exception = coordinatorException.get(); if (exception != null) { @@ -536,6 +543,7 @@ public void testResponder() { final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final MockTransport mockTransport = new MockTransport() { @@ -560,7 +568,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req final AtomicBoolean calledCoordinator = new AtomicBoolean(); final AtomicReference coordinatorException = new AtomicReference<>(); - final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> { + final FollowersChecker followersChecker = new FollowersChecker(settings, clusterSettings, transportService, fcr -> { assertTrue(calledCoordinator.compareAndSet(false, true)); final RuntimeException exception = coordinatorException.get(); if (exception != null) { @@ -700,6 +708,7 @@ public void testPreferClusterManagerNodes() { DiscoveryNodes discoveryNodes = discoNodesBuilder.localNodeId(nodes.get(0).getId()).build(); CapturingTransport capturingTransport = new CapturingTransport(); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), nodes.get(0).getName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); TransportService transportService = capturingTransport.createTransportService( Settings.EMPTY, @@ -710,15 +719,9 @@ public void testPreferClusterManagerNodes() { emptySet(), NoopTracer.INSTANCE ); - final FollowersChecker followersChecker = new FollowersChecker( - Settings.EMPTY, - transportService, - fcr -> { assert false : fcr; }, - (node, reason) -> { - assert false : node; - }, - () -> new StatusInfo(HEALTHY, "healthy-info") - ); + final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> { + assert false : fcr; + }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info")); followersChecker.setCurrentNodes(discoveryNodes); List followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear()) .map(cr -> cr.node) @@ -754,7 +757,7 @@ private static Settings randomSettings() { settingsBuilder.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), randomIntBetween(100, 100000) + "ms"); } if (randomBoolean()) { - settingsBuilder.put(FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 100000) + "ms"); + settingsBuilder.put(FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 60000) + "ms"); } return settingsBuilder.build(); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index 8915f4c5c1274..fe65058333116 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.coordination.LeaderChecker.LeaderCheckRequest; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.transport.TransportResponse; @@ -117,6 +118,7 @@ public void testFollowerBehaviour() { final AtomicBoolean allResponsesFail = new AtomicBoolean(); final Settings settings = settingsBuilder.build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); logger.info("--> using {}", settings); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); @@ -174,7 +176,7 @@ public String toString() { final AtomicBoolean leaderFailed = new AtomicBoolean(); - final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> { + final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); assertTrue(leaderFailed.compareAndSet(false, true)); }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); @@ -242,6 +244,7 @@ public void testFollowerFailsImmediatelyOnDisconnection() { final Response[] responseHolder = new Response[] { Response.SUCCESS }; final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final MockTransport mockTransport = new MockTransport() { @Override @@ -290,7 +293,7 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); - final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> { + final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); assertTrue(leaderFailed.compareAndSet(false, true)); }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); @@ -357,6 +360,7 @@ public void testFollowerFailsImmediatelyOnHealthCheckFailure() { final Response[] responseHolder = new Response[] { Response.SUCCESS }; final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final MockTransport mockTransport = new MockTransport() { @Override @@ -403,7 +407,7 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); - final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> { + final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), endsWith("failed health checks")); assertTrue(leaderFailed.compareAndSet(false, true)); }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); @@ -432,6 +436,7 @@ public void testLeaderBehaviour() { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final CapturingTransport capturingTransport = new CapturingTransport(); AtomicReference nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(UNHEALTHY, "unhealthy-info")); @@ -450,6 +455,7 @@ public void testLeaderBehaviour() { final LeaderChecker leaderChecker = new LeaderChecker( settings, + clusterSettings, transportService, e -> fail("shouldn't be checking anything"), () -> nodeHealthServiceStatus.get()