Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public final class ShardRouting implements Writeable, ToXContentObject {
*/
ShardRouting(
ShardId shardId,
String currentNodeId,
@Nullable String currentNodeId,
String relocatingNodeId,
boolean primary,
ShardRoutingState state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public RoutingAllocation(
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
* @param isSimulating {@code true} if "transient" deciders should be ignored because we are simulating the final allocation
*/
private RoutingAllocation(
public RoutingAllocation(
AllocationDeciders deciders,
@Nullable RoutingNodes routingNodes,
ClusterState clusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,17 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
Property.NodeScope
);

/**
 * Dynamic, node-scoped boolean setting, {@code false} by default.
 * <p>
 * When {@code true}, {@code canAllocate} answers YES for an unassigned replica while the {@code RoutingAllocation} is simulating,
 * instead of applying the replica (peer recovery) throttling. Decisions made outside of simulation are not affected by this setting.
 */
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION = Setting.boolSetting(
"cluster.routing.allocation.unthrottle_replica_assignment_in_simulation",
false,
Property.Dynamic,
Property.NodeScope
);

private volatile int primariesInitialRecoveries;
private volatile int concurrentIncomingRecoveries;
private volatile int concurrentOutgoingRecoveries;
// Current value of CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION. It is written by the settings watcher
// registered in the constructor and read by canAllocate when deciding whether to bypass throttling for an unassigned shard.
private volatile boolean unthrottleReplicaAssignmentInSimulation = false;

public ThrottlingAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(
Expand All @@ -94,6 +102,9 @@ public ThrottlingAllocationDecider(ClusterSettings clusterSettings) {
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
this::setConcurrentOutgoingRecoverries
);
clusterSettings.initializeAndWatch(CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION, (settingVal) -> {
unthrottleReplicaAssignmentInSimulation = settingVal;
});
logger.debug(
"using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], "
+ "node_initial_primaries_recoveries [{}]",
Expand Down Expand Up @@ -150,6 +161,13 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
);
}
return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
} else if (allocation.isSimulating() && unthrottleReplicaAssignmentInSimulation && shardRouting.unassigned()) {
// CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION permits us to treat unassigned replicas as equally
// urgent as unassigned primaries to allocate, for availability reasons.
// During simulation, this supports early publishing DesiredBalance, with all unassigned shards assigned.
// Notably, this bypass is only in simulation decisions. Reconciliation will continue to obey throttling, in particular the
// requirement to assign a primary before allowing its replicas to begin initializing.
return allocation.decision(Decision.YES, NAME, "replica allocation is not throttled when simulating");
Copy link
Contributor

@nicktindall nicktindall Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change possibly redundant now that we have implemented #134786? If I understand correctly, the ThrottlingAllocationDecider won't ever kick in now that we do a single move per balancing round.

I thought that ES-12942 was more similar to this change #115511

Never mind, I see now that we'll need this with the subsequent changes.

} else {
// Peer recovery
assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_MAX_HEADROOM_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Tests the decisions returned by {@link ThrottlingAllocationDecider#canAllocate} for unassigned primaries and replicas, both for
 * regular allocation and for a simulating {@link RoutingAllocation}.
 */
public class ThrottlingAllocationDeciderTests extends ESAllocationTestCase {

    /** Observer that ignores every routing change; these tests only inspect the resulting routing state. */
    private static final RoutingChangesObserver NOOP_OBSERVER = new RoutingChangesObserver() {
    };

    /**
     * Everything a test needs: the cluster state, one mutable {@link RoutingNodes} view of it, the two {@link RoutingNode}s taken
     * from that view, and the four unassigned shard copies (primary and replica of each of the two shards).
     */
    private record TestHarness(
        ClusterState clusterState,
        RoutingNodes mutableRoutingNodes,
        RoutingNode mutableRoutingNode1,
        RoutingNode mutableRoutingNode2,
        ShardRouting unassignedShardRouting1Primary,
        ShardRouting unassignedShardRouting1Replica,
        ShardRouting unassignedShardRouting2Primary,
        ShardRouting unassignedShardRouting2Replica
    ) {}

    /**
     * Builds a cluster with two nodes and a single index "test-index" that has two shards with one replica each, all unassigned.
     *
     * @return the harness wrapping the cluster state, its mutable routing nodes and the unassigned shard copies
     */
    private TestHarness setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas() {
        int numberOfShards = 2;
        ClusterState clusterState = ClusterStateCreationUtils.stateWithUnassignedPrimariesAndReplicas(
            new String[] { "test-index" },
            numberOfShards,
            1
        );
        // The number of data nodes the util method above creates is numberOfReplicas+1.
        assertEquals(2, clusterState.nodes().size());
        assertEquals(1, clusterState.metadata().getTotalNumberOfIndices());

        // Exactly one index must exist; pull it out and derive the two shard ids from it.
        var indexIterator = clusterState.metadata().indicesAllProjects().iterator();
        assertTrue(indexIterator.hasNext());
        IndexMetadata testIndexMetadata = indexIterator.next();
        assertFalse(indexIterator.hasNext());
        Index testIndex = testIndexMetadata.getIndex();
        assertEquals(numberOfShards, testIndexMetadata.getNumberOfShards());
        ShardId testShardId1 = new ShardId(testIndex, 0);
        ShardId testShardId2 = new ShardId(testIndex, 1);

        var mutableRoutingNodes = clusterState.mutableRoutingNodes();

        // The RoutingNode references must be to the RoutingAllocation's RoutingNodes, so that changes to one are reflected in the
        // other.
        var routingNodesIterator = mutableRoutingNodes.iterator();
        assertTrue(routingNodesIterator.hasNext());
        var mutableRoutingNode1 = routingNodesIterator.next();
        assertTrue(routingNodesIterator.hasNext());
        var mutableRoutingNode2 = routingNodesIterator.next();
        assertFalse(routingNodesIterator.hasNext());

        RoutingTable routingTable = clusterState.routingTable(ProjectId.DEFAULT);

        assertThat(routingTable.shardRoutingTable(testShardId1).replicaShards().size(), equalTo(1));
        assertThat(routingTable.shardRoutingTable(testShardId2).replicaShards().size(), equalTo(1));

        ShardRouting unassignedShardRouting1Primary = routingTable.shardRoutingTable(testShardId1).primaryShard();
        ShardRouting unassignedShardRouting1Replica = routingTable.shardRoutingTable(testShardId1).replicaShards().get(0);
        ShardRouting unassignedShardRouting2Primary = routingTable.shardRoutingTable(testShardId2).primaryShard();
        ShardRouting unassignedShardRouting2Replica = routingTable.shardRoutingTable(testShardId2).replicaShards().get(0);

        assertFalse(unassignedShardRouting1Primary.assignedToNode());
        assertFalse(unassignedShardRouting1Replica.assignedToNode());
        assertFalse(unassignedShardRouting2Primary.assignedToNode());
        assertFalse(unassignedShardRouting2Replica.assignedToNode());

        return new TestHarness(
            clusterState,
            mutableRoutingNodes,
            mutableRoutingNode1,
            mutableRoutingNode2,
            unassignedShardRouting1Primary,
            unassignedShardRouting1Replica,
            unassignedShardRouting2Primary,
            unassignedShardRouting2Replica
        );
    }

    /**
     * Creates a {@link RoutingAllocation} over the harness's own mutable routing nodes, so that the {@link RoutingNode}s held by the
     * harness observe every shard initialized or started through {@code harness.mutableRoutingNodes}.
     *
     * @param harness      the cluster fixture to allocate against
     * @param isSimulating whether the allocation reports itself as a simulation
     */
    private static RoutingAllocation newRoutingAllocation(TestHarness harness, boolean isSimulating) {
        return new RoutingAllocation(
            null,
            harness.mutableRoutingNodes,
            harness.clusterState,
            ClusterInfo.builder().build(),
            null,
            System.nanoTime(),
            isSimulating
        );
    }

    /**
     * Creates a decider whose concurrent-recovery and initial-primary-recovery limits are both 1.
     *
     * @param unthrottleReplicaAssignmentInSimulation value for the
     *                                                {@code cluster.routing.allocation.unthrottle_replica_assignment_in_simulation} setting
     */
    private static ThrottlingAllocationDecider newDeciderWithLimitOfOne(boolean unthrottleReplicaAssignmentInSimulation) {
        Settings settings = Settings.builder()
            .put("cluster.routing.allocation.unthrottle_replica_assignment_in_simulation", unthrottleReplicaAssignmentInSimulation)
            .put("cluster.routing.allocation.node_concurrent_recoveries", 1)
            .put("cluster.routing.allocation.node_initial_primaries_recoveries", 1)
            .build();
        return new ThrottlingAllocationDecider(ClusterSettings.createBuiltInClusterSettings(settings));
    }

    /**
     * Outside of simulation the decider must throttle as usual, whatever the value of the unthrottle setting.
     */
    public void testPrimaryAndReplicaThrottlingNotSimulation() {
        /* Create cluster state for multiple nodes and an index with _unassigned_ shards. */
        TestHarness harness = setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas();

        /* Decider Testing */

        // Set up RoutingAllocation in non-simulation mode.
        var routingAllocation = newRoutingAllocation(harness, false);
        assertFalse(routingAllocation.isSimulating());

        // The setting only applies to simulation, so its value is randomized here.
        ThrottlingAllocationDecider decider = newDeciderWithLimitOfOne(randomBoolean());

        // A single primary can be allocated.
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting1Primary, harness.mutableRoutingNode1, routingAllocation),
            equalTo(Decision.YES)
        );
        var shardRouting1PrimaryInitializing = harness.mutableRoutingNodes.initializeShard(
            harness.unassignedShardRouting1Primary,
            harness.mutableRoutingNode1.nodeId(),
            null,
            0,
            NOOP_OBSERVER
        );

        // Leaving the first shard's primary in an INITIALIZING state should THROTTLE further allocation.
        // Only 1 concurrent allocation is allowed.
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode1, routingAllocation),
            equalTo(Decision.THROTTLE)
        );

        // The first shard's replica should receive a simple NO because the corresponding primary is not active yet.
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation),
            equalTo(Decision.NO)
        );

        // Start the first shard's primary, and initialize the second shard's primary to again reach the 1 concurrency limit.
        harness.mutableRoutingNodes.startShard(shardRouting1PrimaryInitializing, NOOP_OBSERVER, 0);
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode2, routingAllocation),
            equalTo(Decision.YES)
        );
        harness.mutableRoutingNodes.initializeShard(
            harness.unassignedShardRouting2Primary,
            harness.mutableRoutingNode2.nodeId(),
            null,
            0,
            NOOP_OBSERVER
        );

        // The first shard's replica should receive THROTTLE now, since the primary is active.
        // There is still already 1 allocation in progress, which is the limit.
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation),
            equalTo(Decision.THROTTLE)
        );
    }

    /**
     * In simulation, with the unthrottle setting enabled, every unassigned primary and replica must receive YES even though the
     * target nodes already hold initializing shards that would exceed the configured limits of 1.
     */
    public void testPrimaryAndReplicaThrottlingInSimulation() {
        /* Create cluster state for multiple nodes and an index with _unassigned_ shards. */
        TestHarness harness = setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas();
        // Mutate the harness's own RoutingNodes (not a fresh copy) so that harness.mutableRoutingNode1/2, which are passed to the
        // decider below, actually see the shards that get initialized on them.
        RoutingNodes mutableRoutingNodes = harness.mutableRoutingNodes;

        /* Decider Testing */

        // Set up RoutingAllocation in simulation mode.
        var routingAllocation = newRoutingAllocation(harness, true);
        assertTrue(routingAllocation.isSimulating());

        ThrottlingAllocationDecider decider = newDeciderWithLimitOfOne(true);

        // Primary path is unthrottled during simulation, regardless of the `node_initial_primaries_recoveries` setting
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting1Primary, harness.mutableRoutingNode1, routingAllocation),
            equalTo(Decision.YES)
        );
        mutableRoutingNodes.initializeShard(
            harness.unassignedShardRouting1Primary,
            harness.mutableRoutingNode1.nodeId(),
            null,
            0,
            NOOP_OBSERVER
        );
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode1, routingAllocation),
            equalTo(Decision.YES)
        );
        mutableRoutingNodes.initializeShard(
            harness.unassignedShardRouting2Primary,
            harness.mutableRoutingNode1.nodeId(),
            null,
            0,
            NOOP_OBSERVER
        );

        // Replica path is unthrottled during simulation AND `unthrottle_replica_assignment_in_simulation` is set to true.
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation),
            equalTo(Decision.YES)
        );
        mutableRoutingNodes.initializeShard(
            harness.unassignedShardRouting1Replica,
            harness.mutableRoutingNode2.nodeId(),
            null,
            0,
            NOOP_OBSERVER
        );
        assertThat(
            decider.canAllocate(harness.unassignedShardRouting2Replica, harness.mutableRoutingNode2, routingAllocation),
            equalTo(Decision.YES)
        );
        mutableRoutingNodes.initializeShard(
            harness.unassignedShardRouting2Replica,
            harness.mutableRoutingNode2.nodeId(),
            null,
            0,
            NOOP_OBSERVER
        );

        // Note: INITIALIZING was chosen above, not STARTED, because the BalancedShardsAllocator only initializes. We want that path to be
        // unthrottled in simulation.
    }
}
Loading