Skip to content

Commit

Permalink
Warn if allocation diverged from the desired allocation (#95458) (#96367
Browse files Browse the repository at this point in the history
)

This commit adds a warning if the fraction of shards on undesired nodes has
grown. This should allow us to proactively investigate those cases before they
turn into nodes with hot spots.

(cherry picked from commit 3808b53)
  • Loading branch information
idegtiarenko committed May 26, 2023
1 parent 2490de7 commit 8a16067
Show file tree
Hide file tree
Showing 10 changed files with 577 additions and 377 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,12 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
private final ThreadPool threadPool;
private final DesiredBalanceReconcilerAction reconciler;
private final DesiredBalanceComputer desiredBalanceComputer;
private final DesiredBalanceReconciler desiredBalanceReconciler;
private final ContinuousComputation<DesiredBalanceInput> desiredBalanceComputation;
private final PendingListenersQueue queue;
private final AtomicLong indexGenerator = new AtomicLong(-1);
private final ConcurrentLinkedQueue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves = new ConcurrentLinkedQueue<>();
private final MasterServiceTaskQueue<ReconcileDesiredBalanceTask> masterServiceTaskQueue;
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
private volatile DesiredBalance currentDesiredBalance = DesiredBalance.INITIAL;
private volatile boolean resetCurrentDesiredBalance = false;

Expand Down Expand Up @@ -101,6 +100,7 @@ public DesiredBalanceShardsAllocator(
this.threadPool = threadPool;
this.reconciler = reconciler;
this.desiredBalanceComputer = desiredBalanceComputer;
this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool);
this.desiredBalanceComputation = new ContinuousComputation<>(threadPool) {

@Override
Expand Down Expand Up @@ -224,13 +224,7 @@ protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation alloca
} else {
logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
}
var allNodeIds = allocation.routingNodes().getAllNodeIds();
allocationOrdering.retainNodes(allNodeIds);
moveOrdering.retainNodes(allNodeIds);
recordTime(
cumulativeReconciliationTime,
new DesiredBalanceReconciler(desiredBalance, allocation, allocationOrdering, moveOrdering)::run
);
recordTime(cumulativeReconciliationTime, () -> desiredBalanceReconciler.reconcile(desiredBalance, allocation));
if (logger.isTraceEnabled()) {
logger.trace("Reconciled desired balance: {}", desiredBalance);
} else {
Expand Down Expand Up @@ -265,7 +259,7 @@ private void onNoLongerMaster() {
currentDesiredBalance = DesiredBalance.INITIAL;
queue.completeAllAsNotMaster();
pendingDesiredBalanceMoves.clear();
allocationOrdering.clear();
desiredBalanceReconciler.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.LongSupplier;

/**
 * Executes an action at most once per time interval.
 * <p>
 * The current time is read from the supplied millisecond clock on every call to
 * {@link #maybeExecute(Runnable)}. The interval starts out as {@link TimeValue#MAX_VALUE} and can be
 * changed at any time with {@link #setMinInterval(TimeValue)}; a new interval only takes effect when
 * the next deadline is computed, i.e. after the next successful execution.
 * <p>
 * This class performs no synchronization of its own.
 */
public class FrequencyCappedAction {

    private final LongSupplier currentTimeMillisSupplier;
    private TimeValue minInterval;

    // Earliest clock reading at which the action may run again. Starts at -1 so that the very first
    // call executes for any clock reading of -1 or later.
    private long next = -1;

    /**
     * Creates an action capped against the thread pool's relative millisecond clock.
     *
     * @param threadPool source of {@code relativeTimeInMillis()} readings
     */
    public FrequencyCappedAction(ThreadPool threadPool) {
        this(threadPool::relativeTimeInMillis);
    }

    /**
     * Creates an action capped against an arbitrary millisecond clock.
     *
     * @param currentTimeMillisSupplier supplies the current time in milliseconds
     */
    public FrequencyCappedAction(LongSupplier currentTimeMillisSupplier) {
        this.currentTimeMillisSupplier = currentTimeMillisSupplier;
        this.minInterval = TimeValue.MAX_VALUE;
    }

    /**
     * Sets the minimum time that must pass between two executions.
     *
     * @param minInterval the new minimum interval
     */
    public void setMinInterval(TimeValue minInterval) {
        this.minInterval = minInterval;
    }

    /**
     * Runs {@code runnable} if the current clock reading has reached the next allowed execution time,
     * otherwise does nothing.
     *
     * @param runnable the action to execute
     */
    public void maybeExecute(Runnable runnable) {
        var current = currentTimeMillisSupplier.getAsLong();
        if (current >= next) {
            // The deadline is advanced before running, so the interval is consumed even if the action throws.
            // The addition saturates: a plain "current + interval" could wrap around to a negative deadline
            // for very large values, which would let every subsequent call execute.
            next = saturatedAdd(current, minInterval.millis());
            runnable.run();
        }
    }

    /**
     * Adds two longs, clamping to {@link Long#MIN_VALUE} / {@link Long#MAX_VALUE} instead of overflowing.
     */
    private static long saturatedAdd(long a, long b) {
        long sum = a + b;
        // Overflow happened iff both operands have the same sign and the sum's sign differs from it.
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
Expand Down Expand Up @@ -213,6 +214,8 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
Expand Down Expand Up @@ -41,7 +39,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -72,6 +69,7 @@
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.elasticsearch.test.MockLogAppender.assertThatLogger;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -956,27 +954,15 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}
});

MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(expectation);

Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);
Loggers.addAppender(logger, mockAppender);

try {
assertThatLogger(() -> {
var iteration = new AtomicInteger(0);
desiredBalanceComputer.compute(
DesiredBalance.INITIAL,
createInput(createInitialClusterState(3)),
queue(),
input -> iteration.incrementAndGet() < iterations
);

mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(logger, mockAppender);
mockAppender.stop();
}
}, DesiredBalanceComputer.class, expectation);
}

private static Map.Entry<String, Long> indexSize(ClusterState clusterState, String name, long size, boolean primary) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.Level;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterInfo;
Expand Down Expand Up @@ -62,6 +63,8 @@
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.BeforeClass;

import java.util.Comparator;
Expand Down Expand Up @@ -90,11 +93,14 @@
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.elasticsearch.test.MockLogAppender.assertThatLogger;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {

Expand Down Expand Up @@ -1093,8 +1099,7 @@ public void testRebalanceDoesNotCauseHotSpots() {
new ConcurrentRebalanceAllocationDecider(clusterSettings),
new ThrottlingAllocationDecider(clusterSettings) };

var allocationOrdering = new NodeAllocationOrdering();
var moveOrdering = new NodeAllocationOrdering();
var reconciler = new DesiredBalanceReconciler(clusterSettings, mock(ThreadPool.class));

var totalOutgoingMoves = new HashMap<String, AtomicInteger>();
for (int i = 0; i < numberOfNodes; i++) {
Expand All @@ -1107,7 +1112,7 @@ public void testRebalanceDoesNotCauseHotSpots() {
while (true) {

var allocation = createRoutingAllocationFrom(clusterState, deciders);
new DesiredBalanceReconciler(balance, allocation, allocationOrdering, moveOrdering).run();
reconciler.reconcile(balance, allocation);

var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING);
if (initializing.isEmpty()) {
Expand All @@ -1134,8 +1139,52 @@ public void testRebalanceDoesNotCauseHotSpots() {
}
}

/**
 * Reconciles twice against a cluster whose only shard is started on a node other than the desired one
 * and checks that the "not on their desired nodes" warning is emitted by the first reconciliation
 * but not repeated by the second one.
 */
public void testShouldLogOnTooManyUndesiredAllocations() {

    final var loggerName = DesiredBalanceReconciler.class.getCanonicalName();
    final var expectedWarning =
        "[100.0%] of assigned shards (1/1) are not on their desired nodes, which exceeds the warn threshold of [10.0%]";

    // One index with a single primary and no replicas.
    final var indexMetadata = IndexMetadata.builder("index-1").settings(indexSettings(Version.CURRENT, 1, 0)).build();
    final var index = indexMetadata.getIndex();
    final var shardId = new ShardId(index, 0);

    // The shard is STARTED on data-node-2 ...
    final var nodes = DiscoveryNodes.builder().add(newNode("data-node-1")).add(newNode("data-node-2"));
    final var indexRoutingTable = IndexRoutingTable.builder(index).addShard(newShardRouting(shardId, "data-node-2", true, STARTED));
    final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
        .nodes(nodes)
        .metadata(Metadata.builder().put(indexMetadata, true))
        .routingTable(RoutingTable.builder().add(indexRoutingTable))
        .build();

    // ... while the desired balance assigns it to data-node-1.
    final var desiredAssignment = new ShardAssignment(Set.of("data-node-1"), 1, 0, 0);
    final var balance = new DesiredBalance(1, Map.of(shardId, desiredAssignment));

    // The mocked clock reads 1 first and 2 afterwards.
    // NOTE(review): presumably the default log interval is longer than that 1ms gap - confirm against the setting's default.
    final var threadPool = mock(ThreadPool.class);
    when(threadPool.relativeTimeInMillis()).thenReturn(1L).thenReturn(2L);

    final var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool);

    // First reconciliation: the warning must be seen.
    assertThatLogger(
        () -> reconciler.reconcile(balance, createRoutingAllocationFrom(clusterState)),
        DesiredBalanceReconciler.class,
        new MockLogAppender.SeenEventExpectation(
            "Should log first too many shards on undesired locations",
            loggerName,
            Level.WARN,
            expectedWarning
        )
    );

    // Second reconciliation right after: the same warning must not be seen again.
    assertThatLogger(
        () -> reconciler.reconcile(balance, createRoutingAllocationFrom(clusterState)),
        DesiredBalanceReconciler.class,
        new MockLogAppender.UnseenEventExpectation(
            "Should not log immediate second too many shards on undesired locations",
            loggerName,
            Level.WARN,
            expectedWarning
        )
    );
}

private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) {
new DesiredBalanceReconciler(desiredBalance, routingAllocation, new NodeAllocationOrdering(), new NodeAllocationOrdering()).run();
new DesiredBalanceReconciler(createBuiltInClusterSettings(), mock(ThreadPool.class)).reconcile(desiredBalance, routingAllocation);
}

private static boolean isReconciled(RoutingNode node, DesiredBalance balance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,12 @@ public ClusterState apply(ClusterState clusterState, Consumer<RoutingAllocation>

var gatewayAllocator = createGatewayAllocator();
var shardsAllocator = createShardsAllocator();
var clusterSettings = createBuiltInClusterSettings();
var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, shardsAllocator) {
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
Expand Down Expand Up @@ -321,11 +322,12 @@ public ClusterState apply(ClusterState clusterState, Consumer<RoutingAllocation>

var gatewayAllocator = createGatewayAllocator();
var shardsAllocator = createShardsAllocator();
var clusterSettings = createBuiltInClusterSettings();
var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, shardsAllocator) {
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
Expand Down Expand Up @@ -408,10 +410,10 @@ public void testResetDesiredBalance() {

var threadPool = new TestThreadPool(getTestName());
var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);

var delegateAllocator = createShardsAllocator();
var clusterSettings = createBuiltInClusterSettings();

var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator) {
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator) {

final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.hamcrest.Matchers.equalTo;

/**
 * Tests for {@link FrequencyCappedAction} driven by a manually advanced millisecond clock.
 */
public class FrequencyCappedActionTests extends ESTestCase {

    /**
     * Verifies that the action runs on the first call, is suppressed while the clock is still inside the
     * minimum interval, and runs again once the clock has reached the end of the interval.
     */
    public void testFrequencyCapExecution() {

        var executions = new AtomicLong(0);
        var currentTime = new AtomicLong();
        var action = new FrequencyCappedAction(currentTime::get);

        // The interval is at least 1ms: with a 0ms interval the "too soon" range [0, minInterval - 1] below
        // would be empty (max < min) and there would be no point in time at which execution is suppressed.
        var minInterval = timeValueMillis(randomLongBetween(1, Integer.MAX_VALUE));
        action.setMinInterval(minInterval);

        // initial execution should happen
        action.maybeExecute(executions::incrementAndGet);
        assertThat(executions.get(), equalTo(1L));

        // should not execute again too soon
        currentTime.set(randomLongBetween(0, minInterval.millis() - 1));
        action.maybeExecute(executions::incrementAndGet);
        assertThat(executions.get(), equalTo(1L));

        // should execute again once the min interval has elapsed
        currentTime.set(randomLongBetween(minInterval.millis(), Long.MAX_VALUE));
        action.maybeExecute(executions::incrementAndGet);
        assertThat(executions.get(), equalTo(2L));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
Expand All @@ -41,6 +40,7 @@
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.TestGatewayAllocator;

Expand All @@ -58,7 +58,6 @@
import static org.elasticsearch.cluster.ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.mockito.Mockito.mock;

public abstract class ESAllocationTestCase extends ESTestCase {

Expand Down Expand Up @@ -153,11 +152,13 @@ private static String pickShardsAllocator(Settings settings) {

private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator(Settings settings) {
var queue = new DeterministicTaskQueue();
var clusterSettings = createBuiltInClusterSettings(settings);
var clusterService = ClusterServiceUtils.createClusterService(queue.getThreadPool(), clusterSettings);
return new DesiredBalanceShardsAllocator(
createBuiltInClusterSettings(settings),
clusterSettings,
new BalancedShardsAllocator(settings),
queue.getThreadPool(),
mock(ClusterService.class),
clusterService,
null
) {
private RoutingAllocation lastAllocation;
Expand Down

0 comments on commit 8a16067

Please sign in to comment.