Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
public class BalancedShardsAllocator implements ShardsAllocator {

private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class);
private static final Logger notPreferredLogger = LogManager.getLogger(BalancedShardsAllocator.class.getName() + ".not-preferred");

public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.shard",
Expand Down Expand Up @@ -866,6 +867,14 @@ public boolean moveShards() {
// can use the cached decision.
final var moveDecision = shardMoved ? decideMove(index, shardRouting) : storedShardMovement.moveDecision();
if (moveDecision.isDecisionTaken() && moveDecision.cannotRemainAndCanMove()) {
if (notPreferredLogger.isDebugEnabled()) {
notPreferredLogger.debug(
"Moving shard [{}] to [{}] from a NOT_PREFERRED allocation, explanation is [{}]",
shardRouting,
moveDecision.getTargetNode().getName(),
moveDecision.getCanRemainDecision().getExplanation()
);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have debug logging turned on for the WriteLoadConstraintDecider the explanation will include the shard write load and the node utilisation.

executeMove(shardRouting, index, moveDecision, "move-non-preferred");
// Return after a single move so that the change can be simulated before further moves are made.
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.common.FrequencyCappedAction;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -154,14 +155,16 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
var nodeWriteThreadPoolQueueLatencyThreshold = writeLoadConstraintSettings.getQueueLatencyThreshold();
if (nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= nodeWriteThreadPoolQueueLatencyThreshold.millis()) {
if (logger.isDebugEnabled() || allocation.debugDecision()) {
final Double shardWriteLoad = getShardWriteLoad(allocation, shardRouting);
final String explain = Strings.format(
"""
Node [%s] has a queue latency of [%d] millis that exceeds the queue latency threshold of [%s]. This node is \
hot-spotting. Current thread pool utilization [%f]. Moving shard(s) away.""",
hot-spotting. Current thread pool utilization [%f]. Shard write load [%s]. Moving shard(s) away.""",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add shard write load into explanation, so we can see it when we log the movement

node.nodeId(),
nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis(),
nodeWriteThreadPoolQueueLatencyThreshold.toHumanReadableString(2),
nodeWriteThreadPoolStats.averageThreadPoolUtilization()
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
shardWriteLoad == null ? "unknown" : shardWriteLoad
);
if (logger.isDebugEnabled()) {
logCanRemainMessage.maybeExecute(() -> logger.debug(explain));
Expand All @@ -182,6 +185,11 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
);
}

@Nullable
private Double getShardWriteLoad(RoutingAllocation allocation, ShardRouting shardRouting) {
return allocation.clusterInfo().getShardWriteLoads().get(shardRouting.shardId());
}

/**
* Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node.
* Returns the percent thread pool utilization change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterInfo;
Expand Down Expand Up @@ -55,7 +56,9 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.hamcrest.Matchers;

import java.util.ArrayList;
Expand Down Expand Up @@ -1004,6 +1007,43 @@ public void testReturnEarlyOnShardAssignmentChanges() {
applyStartedShardsUntilNoChange(clusterState, allocationService);
}

@TestLogging(
value = "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.not-preferred:DEBUG",
reason = "debug logging for test"
)
public void testNotPreferredMovementIsLoggedAtDebugLevel() {
final var clusterState = ClusterStateCreationUtils.state(randomIdentifier(), 3, 3);
final var balancedShardsAllocator = new BalancedShardsAllocator(
BalancerSettings.DEFAULT,
TEST_WRITE_LOAD_FORECASTER,
new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT)
);

final var allocation = new RoutingAllocation(new AllocationDeciders(List.<AllocationDecider>of(new AllocationDecider() {
@Override
public Decision canRemain(
IndexMetadata indexMetadata,
ShardRouting shardRouting,
RoutingNode node,
RoutingAllocation allocation
) {
return new Decision.Single(Decision.Type.NOT_PREFERRED, "test_decider", "Always NOT_PREFERRED");
}
})), clusterState.getRoutingNodes().mutableCopy(), clusterState, ClusterInfo.EMPTY, SnapshotShardSizeInfo.EMPTY, 0L);

final var notPreferredLoggerName = BalancedShardsAllocator.class.getName() + ".not-preferred";
MockLog.assertThatLogger(
() -> balancedShardsAllocator.allocate(allocation),
notPreferredLoggerName,
new MockLog.SeenEventExpectation(
"moved a NOT_PREFERRED allocation",
notPreferredLoggerName,
Level.DEBUG,
"Moving shard [*] to [*] from a NOT_PREFERRED allocation, explanation is [Always NOT_PREFERRED]"
)
);
}

/**
* Test for {@link PrioritiseByShardWriteLoadComparator}. See Comparator Javadoc for expected
* ordering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,13 @@ private static void addToMockLogs(MockLog mockLog, List<String> loggers) {
* Executes an action and verifies expectations against the provided logger
*/
public static void assertThatLogger(Runnable action, Class<?> loggerOwner, MockLog.LoggingExpectation... expectations) {
assertThatLogger(action, loggerOwner.getCanonicalName(), expectations);
}

/**
* Executes an action and verifies expectations against the provided logger
*/
public static void assertThatLogger(Runnable action, String loggerOwner, MockLog.LoggingExpectation... expectations) {
try (var mockLog = MockLog.capture(loggerOwner)) {
for (var expectation : expectations) {
mockLog.addExpectation(expectation);
Expand Down