From d41aaea802633462baab842eb28396fa6f4a3b8f Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 14 Nov 2025 11:13:55 +1100 Subject: [PATCH 1/7] Log NOT_PREFERRED shard movements --- .../elasticsearch/cluster/ClusterModule.java | 14 +++++++++-- .../allocator/BalancedShardsAllocator.java | 25 +++++++++++++++++-- .../decider/WriteLoadConstraintDecider.java | 12 +++++++-- .../common/settings/ClusterSettings.java | 1 + .../BalancedShardsAllocatorTests.java | 13 +++++++--- 5 files changed, 56 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index e1e9444d983fb..0cd70f83c5c6b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -531,13 +531,23 @@ private static ShardsAllocator createShardsAllocator( Map> allocators = new HashMap<>(); allocators.put( BALANCED_ALLOCATOR, - () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory) + () -> new BalancedShardsAllocator( + balancerSettings, + writeLoadForecaster, + balancingWeightsFactory, + threadPool.relativeTimeInMillisSupplier() + ) ); allocators.put( DESIRED_BALANCE_ALLOCATOR, () -> new DesiredBalanceShardsAllocator( clusterSettings, - new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory), + new BalancedShardsAllocator( + balancerSettings, + writeLoadForecaster, + balancingWeightsFactory, + threadPool.relativeTimeInMillisSupplier() + ), threadPool, clusterService, reconciler, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index b65edda76ff15..46c70d579e956 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -35,12 +35,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; +import org.elasticsearch.common.FrequencyCappedAction; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.index.shard.ShardId; @@ -56,6 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.function.BiFunction; +import java.util.function.LongSupplier; import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE; @@ -77,6 +80,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class); + private static final Logger notPreferredLogger = LogManager.getLogger(BalancedShardsAllocator.class.getName() + ".not-preferred"); public static final Setting SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.shard", @@ -113,10 +117,18 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Dynamic, Property.NodeScope ); + public static final Setting MOVE_NOT_PREFERRED_MINIMUM_LOGGING_INTERVAL = Setting.timeSetting( + "cluster.routing.allocation.balance.move_not_preferred_logging_interval", + TimeValue.ONE_MINUTE, + TimeValue.THIRTY_SECONDS, + Property.Dynamic, + Property.NodeScope + ); private final BalancerSettings balancerSettings; private final WriteLoadForecaster writeLoadForecaster; private final BalancingWeightsFactory balancingWeightsFactory; + private final FrequencyCappedAction logMoveNotPreferred; public BalancedShardsAllocator() { this(Settings.EMPTY); @@ -127,18 +139,20 @@ public BalancedShardsAllocator(Settings settings) { } public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster) { - this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings)); + this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings), System::currentTimeMillis); } @Inject public BalancedShardsAllocator( BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster, - BalancingWeightsFactory balancingWeightsFactory + BalancingWeightsFactory balancingWeightsFactory, + LongSupplier relativeTimeInMillisProvider ) { this.balancerSettings = balancerSettings; this.writeLoadForecaster = writeLoadForecaster; this.balancingWeightsFactory = balancingWeightsFactory; + this.logMoveNotPreferred = new FrequencyCappedAction(relativeTimeInMillisProvider, TimeValue.ZERO); } @Override @@ -866,6 +880,13 @@ public boolean moveShards() { // can use the cached decision. final var moveDecision = shardMoved ? decideMove(index, shardRouting) : storedShardMovement.moveDecision(); if (moveDecision.isDecisionTaken() && moveDecision.cannotRemainAndCanMove()) { + if (notPreferredLogger.isDebugEnabled()) { + notPreferredLogger.debug( + "Moving shard [{}] from a NOT_PREFERRED allocation, explanation is {}", + shardRouting, + moveDecision.getCanRemainDecision().getExplanation() + ); + } executeMove(shardRouting, index, moveDecision, "move-non-preferred"); // Return after a single move so that the change can be simulated before further moves are made. return true; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 211feb3617524..4db3e814fc845 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.common.FrequencyCappedAction; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; @@ -152,14 +153,16 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting var nodeWriteThreadPoolQueueLatencyThreshold = writeLoadConstraintSettings.getQueueLatencyThreshold(); if (nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= nodeWriteThreadPoolQueueLatencyThreshold.millis()) { if (logger.isDebugEnabled() || allocation.debugDecision()) { + final Double shardWriteLoad = getShardWriteLoad(allocation, shardRouting); final String explain = Strings.format( """ Node [%s] has a queue latency of [%d] millis that exceeds the queue latency threshold of [%s]. This node is \ - hot-spotting. Current thread pool utilization [%f]. Moving shard(s) away.""", + hot-spotting. Current thread pool utilization [%f]. Shard write load [%s]. Moving shard(s) away.""", node.nodeId(), nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis(), nodeWriteThreadPoolQueueLatencyThreshold.toHumanReadableString(2), - nodeWriteThreadPoolStats.averageThreadPoolUtilization() + nodeWriteThreadPoolStats.averageThreadPoolUtilization(), + shardWriteLoad == null ? "unknown" : shardWriteLoad ); if (logger.isDebugEnabled()) { logInterventionMessage.maybeExecute(() -> logger.debug(explain)); @@ -180,6 +183,11 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting ); } + @Nullable + private Double getShardWriteLoad(RoutingAllocation allocation, ShardRouting shardRouting) { + return allocation.clusterInfo().getShardWriteLoads().get(shardRouting.shardId()); + } + /** * Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node. * Returns the percent thread pool utilization change. diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 5ad7c2af94c81..ce4af1aaceac4 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -226,6 +226,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, + BalancedShardsAllocator.MOVE_NOT_PREFERRED_MINIMUM_LOGGING_INTERVAL, DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN, DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN, DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index e4ed6ea9b50d1..8c192cbffffe4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -638,7 +638,8 @@ public void testPartitionedClusterWithSeparateWeights() { TEST_WRITE_LOAD_FORECASTER, new PrefixBalancingWeightsFactory( Map.of("shardsOnly", new WeightFunction(1, 0, 0, 0), "weightsOnly", new WeightFunction(0, 0, 1, 0)) - ) + ), + System::currentTimeMillis ), EmptyClusterInfoService.INSTANCE, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES @@ -742,7 +743,8 @@ public Iterator iterator() { public boolean diskUsageIgnored() { return true; // This makes the computation ignore disk usage } - } + }, + System::currentTimeMillis ); final String indexName = randomIdentifier(); @@ -1128,7 +1130,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var allocationService = new MockAllocationService( new AllocationDeciders(List.of(notPreferredDecider)), new TestGatewayAllocator(), - new BalancedShardsAllocator(BalancerSettings.DEFAULT, TEST_WRITE_LOAD_FORECASTER, new NodeNameDrivenBalancingWeightsFactory()), + new BalancedShardsAllocator( + BalancerSettings.DEFAULT, + TEST_WRITE_LOAD_FORECASTER, + new NodeNameDrivenBalancingWeightsFactory(), + System::currentTimeMillis + ), () -> ClusterInfo.EMPTY, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES ); From 7a0455f93d06e687ebf11679ad907bcdd67a3de2 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 14 Nov 2025 11:16:07 +1100 Subject: [PATCH 2/7] Log NOT_PREFERRED shard movements --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 46c70d579e956..0ee1b6a0dc7b2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -882,7 +882,7 @@ public boolean moveShards() { if (moveDecision.isDecisionTaken() && moveDecision.cannotRemainAndCanMove()) { if (notPreferredLogger.isDebugEnabled()) { notPreferredLogger.debug( - "Moving shard [{}] from a NOT_PREFERRED allocation, explanation is {}", + "Moving shard [{}] from a NOT_PREFERRED allocation, explanation is [{}]", shardRouting, moveDecision.getCanRemainDecision().getExplanation() ); From 6bf362b7ff01ae4f62d93236b09f41bb9d9c4254 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 14 Nov 2025 15:32:03 +1100 Subject: [PATCH 3/7] Add tests/finish implementation --- .../allocator/BalancedShardsAllocator.java | 23 ++++-- .../allocator/BalancerSettings.java | 6 ++ .../BalancedShardsAllocatorTests.java | 74 +++++++++++++++++++ .../java/org/elasticsearch/test/MockLog.java | 7 ++ 4 files changed, 103 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 0ee1b6a0dc7b2..96523fc796cc0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -153,6 +153,8 @@ public BalancedShardsAllocator( this.writeLoadForecaster = writeLoadForecaster; this.balancingWeightsFactory = balancingWeightsFactory; this.logMoveNotPreferred = new FrequencyCappedAction(relativeTimeInMillisProvider, TimeValue.ZERO); + balancerSettings.getClusterSettings() + .initializeAndWatch(MOVE_NOT_PREFERRED_MINIMUM_LOGGING_INTERVAL, logMoveNotPreferred::setMinInterval); } @Override @@ -184,7 +186,8 @@ public void allocate(RoutingAllocation allocation) { allocation, balancerSettings.getThreshold(), balancingWeights, - balancerSettings.completeEarlyOnShardAssignmentChange() + balancerSettings.completeEarlyOnShardAssignmentChange(), + logMoveNotPreferred ); boolean shardAssigned = false, shardMoved = false, shardBalanced = false; @@ -263,7 +266,8 @@ public ShardAllocationDecision explainShardAllocation(final ShardRouting shard, allocation, balancerSettings.getThreshold(), balancingWeightsFactory.create(), - balancerSettings.completeEarlyOnShardAssignmentChange() + balancerSettings.completeEarlyOnShardAssignmentChange(), + logMoveNotPreferred ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -324,13 +328,15 @@ public static class Balancer { private final BalancingWeights balancingWeights; private final NodeSorters nodeSorters; private final boolean completeEarlyOnShardAssignmentChange; + private final FrequencyCappedAction logMoveNotPreferred; private Balancer( WriteLoadForecaster writeLoadForecaster, RoutingAllocation allocation, float threshold, BalancingWeights balancingWeights, - boolean completeEarlyOnShardAssignmentChange + boolean completeEarlyOnShardAssignmentChange, + FrequencyCappedAction logMoveNotPreferred ) { this.writeLoadForecaster = writeLoadForecaster; this.allocation = allocation; @@ -346,6 +352,7 @@ private Balancer( this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this); this.balancingWeights = balancingWeights; this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange; + this.logMoveNotPreferred = logMoveNotPreferred; } private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) { @@ -881,10 +888,12 @@ public boolean moveShards() { final var moveDecision = shardMoved ? decideMove(index, shardRouting) : storedShardMovement.moveDecision(); if (moveDecision.isDecisionTaken() && moveDecision.cannotRemainAndCanMove()) { if (notPreferredLogger.isDebugEnabled()) { - notPreferredLogger.debug( - "Moving shard [{}] from a NOT_PREFERRED allocation, explanation is [{}]", - shardRouting, - moveDecision.getCanRemainDecision().getExplanation() + logMoveNotPreferred.maybeExecute( + () -> notPreferredLogger.debug( + "Moving shard [{}] from a NOT_PREFERRED allocation, explanation is [{}]", + shardRouting, + moveDecision.getCanRemainDecision().getExplanation() + ) ); } executeMove(shardRouting, index, moveDecision, "move-non-preferred"); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java index ea45e46a278e9..6b335b863313e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java @@ -28,6 +28,7 @@ public class BalancerSettings { private volatile float diskUsageBalanceFactor; private volatile float threshold; private final boolean completeEarlyOnShardAssignmentChange; + private final ClusterSettings clusterSettings; public BalancerSettings(Settings settings) { this(ClusterSettings.createBuiltInClusterSettings(settings)); @@ -42,6 +43,11 @@ public BalancerSettings(ClusterSettings clusterSettings) { this.completeEarlyOnShardAssignmentChange = ClusterModule.DESIRED_BALANCE_ALLOCATOR.equals( clusterSettings.get(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING) ); + this.clusterSettings = clusterSettings; + } + + public ClusterSettings getClusterSettings() { + return clusterSettings; } /** diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 8c192cbffffe4..b8e4269607125 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.apache.logging.log4j.Level; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterInfo; @@ -51,11 +52,14 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Strings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; +import org.elasticsearch.test.MockLog; import org.elasticsearch.test.gateway.TestGatewayAllocator; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.hamcrest.Matchers; import java.util.ArrayList; @@ -66,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.DoubleSupplier; import java.util.function.Function; @@ -1006,6 +1011,75 @@ public void testReturnEarlyOnShardAssignmentChanges() { applyStartedShardsUntilNoChange(clusterState, allocationService); } + @TestLogging( + value = "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.not-preferred:DEBUG", + reason = "debug logging for test" + ) + public void testNotPreferredMovementIsLoggedAtDebugLevel() { + final var clusterState = ClusterStateCreationUtils.state(randomIdentifier(), 3, 3); + final var minimumLogInterval = TimeValue.timeValueMinutes(randomIntBetween(1, 10)); + final var balancerSettings = new BalancerSettings( + Settings.builder().put(BalancedShardsAllocator.MOVE_NOT_PREFERRED_MINIMUM_LOGGING_INTERVAL.getKey(), minimumLogInterval).build() + ); + final var relativeTimeMillis = new AtomicLong(); + final var balancedShardsAllocator = new BalancedShardsAllocator( + balancerSettings, + TEST_WRITE_LOAD_FORECASTER, + new GlobalBalancingWeightsFactory(balancerSettings), + relativeTimeMillis::get + ); + + final var allocation = new RoutingAllocation(new AllocationDeciders(List.of(new AllocationDecider() { + @Override + public Decision canRemain( + IndexMetadata indexMetadata, + ShardRouting shardRouting, + RoutingNode node, + RoutingAllocation allocation + ) { + return new Decision.Single(Decision.Type.NOT_PREFERRED, "test_decider", "Always NOT_PREFERRED"); + } + })), clusterState.getRoutingNodes().mutableCopy(), clusterState, ClusterInfo.EMPTY, SnapshotShardSizeInfo.EMPTY, 0L); + + final var notPreferredLoggerName = BalancedShardsAllocator.class.getName() + ".not-preferred"; + MockLog.assertThatLogger( + () -> balancedShardsAllocator.allocate(allocation), + notPreferredLoggerName, + new MockLog.SeenEventExpectation( + "moved a NOT_PREFERRED allocation", + notPreferredLoggerName, + Level.DEBUG, + "Moving shard [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]" + ) + ); + + // We shouldn't log again before the log interval + relativeTimeMillis.addAndGet(randomLongBetween(1, minimumLogInterval.millis() - 1)); + MockLog.assertThatLogger( + () -> balancedShardsAllocator.allocate(allocation), + notPreferredLoggerName, + new MockLog.UnseenEventExpectation( + "moved a NOT_PREFERRED allocation", + notPreferredLoggerName, + Level.DEBUG, + "Moving shard [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]" + ) + ); + + // We should log again once the interval has passed + relativeTimeMillis.addAndGet(minimumLogInterval.millis()); + MockLog.assertThatLogger( + () -> balancedShardsAllocator.allocate(allocation), + notPreferredLoggerName, + new MockLog.SeenEventExpectation( + "moved a NOT_PREFERRED allocation", + notPreferredLoggerName, + Level.DEBUG, + "Moving shard [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]" + ) + ); + } + /** * Test for {@link PrioritiseByShardWriteLoadComparator}. See Comparator Javadoc for expected * ordering. diff --git a/test/framework/src/main/java/org/elasticsearch/test/MockLog.java b/test/framework/src/main/java/org/elasticsearch/test/MockLog.java index f14fc86c03250..6739ca425c4ae 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/MockLog.java +++ b/test/framework/src/main/java/org/elasticsearch/test/MockLog.java @@ -447,6 +447,13 @@ private static void addToMockLogs(MockLog mockLog, List loggers) { * Executes an action and verifies expectations against the provided logger */ public static void assertThatLogger(Runnable action, Class loggerOwner, MockLog.LoggingExpectation... expectations) { + assertThatLogger(action, loggerOwner.getCanonicalName(), expectations); + } + + /** + * Executes an action and verifies expectations against the provided logger + */ + public static void assertThatLogger(Runnable action, String loggerOwner, MockLog.LoggingExpectation... expectations) { try (var mockLog = MockLog.capture(loggerOwner)) { for (var expectation : expectations) { mockLog.addExpectation(expectation); From 7dfedbe8357330254f35c9722251759a2d73e874 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Sat, 15 Nov 2025 08:44:21 +1100 Subject: [PATCH 4/7] Log target node when making NOT_PREFERRED move --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 96523fc796cc0..735ab0f6bee0a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -890,8 +890,9 @@ public boolean moveShards() { if (notPreferredLogger.isDebugEnabled()) { logMoveNotPreferred.maybeExecute( () -> notPreferredLogger.debug( - "Moving shard [{}] from a NOT_PREFERRED allocation, explanation is [{}]", + "Moving shard [{}] to [{}] from a NOT_PREFERRED allocation, explanation is [{}]", shardRouting, + moveDecision.getTargetNode().getName(), moveDecision.getCanRemainDecision().getExplanation() ) ); From 3ba31a0a7457d1821642b23e5b25dff7aa2ebb46 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Sat, 15 Nov 2025 09:20:45 +1100 Subject: [PATCH 5/7] Remove rate-limiting --- .../elasticsearch/cluster/ClusterModule.java | 14 ++------ .../allocator/BalancedShardsAllocator.java | 34 ++++++------------- .../BalancedShardsAllocatorTests.java | 16 +++------ 3 files changed, 16 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 0cd70f83c5c6b..e1e9444d983fb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -531,23 +531,13 @@ private static ShardsAllocator createShardsAllocator( Map> allocators = new HashMap<>(); allocators.put( BALANCED_ALLOCATOR, - () -> new BalancedShardsAllocator( - balancerSettings, - writeLoadForecaster, - balancingWeightsFactory, - threadPool.relativeTimeInMillisSupplier() - ) + () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory) ); allocators.put( DESIRED_BALANCE_ALLOCATOR, () -> new DesiredBalanceShardsAllocator( clusterSettings, - new BalancedShardsAllocator( - balancerSettings, - writeLoadForecaster, - balancingWeightsFactory, - threadPool.relativeTimeInMillisSupplier() - ), + new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory), threadPool, clusterService, reconciler, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 735ab0f6bee0a..27b015329fc5f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -35,7 +35,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; -import org.elasticsearch.common.FrequencyCappedAction; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -58,7 +57,6 @@ import java.util.Map; import java.util.Set; import java.util.function.BiFunction; -import java.util.function.LongSupplier; import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE; @@ -128,7 +126,6 @@ public class BalancedShardsAllocator implements ShardsAllocator { private final BalancerSettings balancerSettings; private final WriteLoadForecaster writeLoadForecaster; private final BalancingWeightsFactory balancingWeightsFactory; - private final FrequencyCappedAction logMoveNotPreferred; public BalancedShardsAllocator() { this(Settings.EMPTY); @@ -139,22 +136,18 @@ public BalancedShardsAllocator(Settings settings) { } public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster) { - this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings), System::currentTimeMillis); + this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings)); } @Inject public BalancedShardsAllocator( BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster, - BalancingWeightsFactory balancingWeightsFactory, - LongSupplier relativeTimeInMillisProvider + BalancingWeightsFactory balancingWeightsFactory ) { this.balancerSettings = balancerSettings; this.writeLoadForecaster = writeLoadForecaster; this.balancingWeightsFactory = balancingWeightsFactory; - this.logMoveNotPreferred = new FrequencyCappedAction(relativeTimeInMillisProvider, TimeValue.ZERO); - balancerSettings.getClusterSettings() - .initializeAndWatch(MOVE_NOT_PREFERRED_MINIMUM_LOGGING_INTERVAL, logMoveNotPreferred::setMinInterval); } @Override @@ -186,8 +179,7 @@ public void allocate(RoutingAllocation allocation) { allocation, balancerSettings.getThreshold(), balancingWeights, - balancerSettings.completeEarlyOnShardAssignmentChange(), - logMoveNotPreferred + balancerSettings.completeEarlyOnShardAssignmentChange() ); boolean shardAssigned = false, shardMoved = false, shardBalanced = false; @@ -266,8 +258,7 @@ public ShardAllocationDecision explainShardAllocation(final ShardRouting shard, allocation, balancerSettings.getThreshold(), balancingWeightsFactory.create(), - balancerSettings.completeEarlyOnShardAssignmentChange(), - logMoveNotPreferred + balancerSettings.completeEarlyOnShardAssignmentChange() ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -328,15 +319,13 @@ public static class Balancer { private final BalancingWeights balancingWeights; private final NodeSorters nodeSorters; private final boolean completeEarlyOnShardAssignmentChange; - private final FrequencyCappedAction logMoveNotPreferred; private Balancer( WriteLoadForecaster writeLoadForecaster, RoutingAllocation allocation, float threshold, BalancingWeights balancingWeights, - boolean completeEarlyOnShardAssignmentChange, - FrequencyCappedAction logMoveNotPreferred + boolean completeEarlyOnShardAssignmentChange ) { this.writeLoadForecaster = writeLoadForecaster; this.allocation = allocation; @@ -352,7 +341,6 @@ private Balancer( this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this); this.balancingWeights = balancingWeights; this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange; - this.logMoveNotPreferred = logMoveNotPreferred; } private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) { @@ -888,13 +876,11 @@ public boolean moveShards() { final var moveDecision = shardMoved ? decideMove(index, shardRouting) : storedShardMovement.moveDecision(); if (moveDecision.isDecisionTaken() && moveDecision.cannotRemainAndCanMove()) { if (notPreferredLogger.isDebugEnabled()) { - logMoveNotPreferred.maybeExecute( - () -> notPreferredLogger.debug( - "Moving shard [{}] to [{}] from a NOT_PREFERRED allocation, explanation is [{}]", - shardRouting, - moveDecision.getTargetNode().getName(), - moveDecision.getCanRemainDecision().getExplanation() - ) + notPreferredLogger.debug( + "Moving shard [{}] to [{}] from a NOT_PREFERRED allocation, explanation is [{}]", + shardRouting, + moveDecision.getTargetNode().getName(), + moveDecision.getCanRemainDecision().getExplanation() ); } executeMove(shardRouting, index, moveDecision, "move-non-preferred"); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index b8e4269607125..64a359d2980b7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -643,8 +643,7 @@ public void testPartitionedClusterWithSeparateWeights() { TEST_WRITE_LOAD_FORECASTER, new PrefixBalancingWeightsFactory( Map.of("shardsOnly", new WeightFunction(1, 0, 0, 0), "weightsOnly", new WeightFunction(0, 0, 1, 0)) - ), - System::currentTimeMillis + ) ), EmptyClusterInfoService.INSTANCE, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES @@ -748,8 +747,7 @@ public Iterator iterator() { public boolean diskUsageIgnored() { return true; // This makes the computation ignore disk usage } - }, - System::currentTimeMillis + } ); final String indexName = randomIdentifier(); @@ -1025,8 +1023,7 @@ public void testNotPreferredMovementIsLoggedAtDebugLevel() { final var balancedShardsAllocator = new BalancedShardsAllocator( balancerSettings, TEST_WRITE_LOAD_FORECASTER, - new GlobalBalancingWeightsFactory(balancerSettings), - relativeTimeMillis::get + new GlobalBalancingWeightsFactory(balancerSettings) ); final var allocation = new RoutingAllocation(new AllocationDeciders(List.of(new AllocationDecider() { @@ -1204,12 +1201,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var allocationService = new MockAllocationService( new AllocationDeciders(List.of(notPreferredDecider)), new TestGatewayAllocator(), - new BalancedShardsAllocator( - BalancerSettings.DEFAULT, - TEST_WRITE_LOAD_FORECASTER, - new NodeNameDrivenBalancingWeightsFactory(), - System::currentTimeMillis - ), + new BalancedShardsAllocator(BalancerSettings.DEFAULT, TEST_WRITE_LOAD_FORECASTER, new NodeNameDrivenBalancingWeightsFactory()), () -> ClusterInfo.EMPTY, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES ); From c593950c39d7a80b7a565b547374c1bec2a5d3f6 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Sat, 15 Nov 2025 09:24:06 +1100 Subject: [PATCH 6/7] Reduce change --- .../allocator/BalancedShardsAllocator.java | 8 ---- .../allocator/BalancerSettings.java | 6 --- .../common/settings/ClusterSettings.java | 1 - .../BalancedShardsAllocatorTests.java | 37 +------------------ 4 files changed, 2 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 27b015329fc5f..325c89400010a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.index.shard.ShardId; @@ -115,13 +114,6 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Dynamic, Property.NodeScope ); - public static final Setting MOVE_NOT_PREFERRED_MINIMUM_LOGGING_INTERVAL = Setting.timeSetting( - "cluster.routing.allocation.balance.move_not_preferred_logging_interval", - TimeValue.ONE_MINUTE, - TimeValue.THIRTY_SECONDS, - Property.Dynamic, - Property.NodeScope - ); private final BalancerSettings balancerSettings; private final WriteLoadForecaster writeLoadForecaster; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java index 6b335b863313e..ea45e46a278e9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java @@ -28,7 +28,6 @@ public class BalancerSettings { private volatile float diskUsageBalanceFactor; private volatile float threshold; private final boolean completeEarlyOnShardAssignmentChange; - private final ClusterSettings clusterSettings; public BalancerSettings(Settings settings) { this(ClusterSettings.createBuiltInClusterSettings(settings)); @@ -43,11 +42,6 @@ public BalancerSettings(ClusterSettings clusterSettings) { this.completeEarlyOnShardAssignmentChange = ClusterModule.DESIRED_BALANCE_ALLOCATOR.equals( clusterSettings.get(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING) ); - this.clusterSettings = clusterSettings; - } - - public ClusterSettings getClusterSettings() { - return clusterSettings; } /** diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index ce4af1aaceac4..5ad7c2af94c81 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -226,7 +226,6 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, - BalancedShardsAllocator.MOVE_NOT_PREFERRED_MINIMUM_LOGGING_INTERVAL, DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN, DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN, DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 64a359d2980b7..ca7901a9de411 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Strings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; @@ -70,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.DoubleSupplier; import java.util.function.Function; @@ -1015,15 +1013,10 @@ public void testReturnEarlyOnShardAssignmentChanges() { ) public void testNotPreferredMovementIsLoggedAtDebugLevel() { final var clusterState = ClusterStateCreationUtils.state(randomIdentifier(), 3, 3); - final var minimumLogInterval = TimeValue.timeValueMinutes(randomIntBetween(1, 10)); - final var balancerSettings = new BalancerSettings( - Settings.builder().put(BalancedShardsAllocator.MOVE_NOT_PREFERRED_MINIMUM_LOGGING_INTERVAL.getKey(), minimumLogInterval).build() - ); - final var relativeTimeMillis = new AtomicLong(); final var balancedShardsAllocator = new BalancedShardsAllocator( - balancerSettings, + BalancerSettings.DEFAULT, TEST_WRITE_LOAD_FORECASTER, - new GlobalBalancingWeightsFactory(balancerSettings) + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) ); final var allocation = new RoutingAllocation(new AllocationDeciders(List.of(new AllocationDecider() { @@ -1049,32 +1042,6 @@ public Decision canRemain( "Moving shard [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]" ) ); - - // We shouldn't log again before the log interval - relativeTimeMillis.addAndGet(randomLongBetween(1, minimumLogInterval.millis() - 1)); - MockLog.assertThatLogger( - () -> balancedShardsAllocator.allocate(allocation), - notPreferredLoggerName, - new MockLog.UnseenEventExpectation( - "moved a NOT_PREFERRED allocation", - notPreferredLoggerName, - Level.DEBUG, - "Moving shard [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]" - ) - ); - - // We should log again once the interval has passed - relativeTimeMillis.addAndGet(minimumLogInterval.millis()); - MockLog.assertThatLogger( - () -> balancedShardsAllocator.allocate(allocation), - notPreferredLoggerName, - new MockLog.SeenEventExpectation( - "moved a NOT_PREFERRED allocation", - notPreferredLoggerName, - Level.DEBUG, - "Moving shard [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]" - ) - ); } /** From 848be0a80008f876cfb67af6677ce43fb5e89a37 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Sat, 15 Nov 2025 12:17:46 +1100 Subject: [PATCH 7/7] Fix test to fit new pattern --- .../allocation/allocator/BalancedShardsAllocatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index ca7901a9de411..246e6631a01c9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -1039,7 +1039,7 @@ public Decision canRemain( "moved a NOT_PREFERRED allocation", notPreferredLoggerName, Level.DEBUG, - "Moving shard [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]" + "Moving shard [*] to [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]" ) ); }