diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 2ce349e2d3b61..613bf3d7ea69b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -74,7 +74,7 @@ public final class ShardRouting implements Writeable, ToXContentObject { */ ShardRouting( ShardId shardId, - String currentNodeId, + @Nullable String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 982356153a61d..a4a2f3212e7e1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -123,7 +123,7 @@ public RoutingAllocation( * @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()}) * @param isSimulating {@code true} if "transient" deciders should be ignored because we are simulating the final allocation */ - private RoutingAllocation( + public RoutingAllocation( AllocationDeciders deciders, @Nullable RoutingNodes routingNodes, ClusterState clusterState, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 708656cd52288..6774234c6c6bd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -77,9 +77,17 @@ public class ThrottlingAllocationDecider extends AllocationDecider { Property.NodeScope ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION = Setting.boolSetting( + "cluster.routing.allocation.unthrottle_replica_assignment_in_simulation", + false, + Property.Dynamic, + Property.NodeScope + ); + private volatile int primariesInitialRecoveries; private volatile int concurrentIncomingRecoveries; private volatile int concurrentOutgoingRecoveries; + private volatile boolean unthrottleReplicaAssignmentInSimulation = false; public ThrottlingAllocationDecider(ClusterSettings clusterSettings) { clusterSettings.initializeAndWatch( @@ -94,6 +102,9 @@ public ThrottlingAllocationDecider(ClusterSettings clusterSettings) { CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries ); + clusterSettings.initializeAndWatch(CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION, (settingVal) -> { + unthrottleReplicaAssignmentInSimulation = settingVal; + }); logger.debug( "using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " + "node_initial_primaries_recoveries [{}]", @@ -150,6 +161,13 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing ); } return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries); + } else if (allocation.isSimulating() && unthrottleReplicaAssignmentInSimulation && shardRouting.unassigned()) { + // CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION permits us to treat unassigned replicas as equally + // urgent as unassigned primaries to allocate, for availability reasons. + // During simulation, this supports early publishing DesiredBalance, with all unassigned shards assigned. + // Notably, this bypass is only in simulation decisions. Reconciliation will continue to obey throttling, in particular the + // requirement to assign a primary before allowing its replicas to begin initializing. + return allocation.decision(Decision.YES, NAME, "replica allocation is not throttled when simulating"); } else { // Peer recovery assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER; diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 3c73cd0e7b462..13b75fab557cc 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -291,6 +291,7 @@ public void apply(Settings value, Settings current, Settings previous) { ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING, + ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_MAX_HEADROOM_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationDeciderTests.java new file mode 100644 index 0000000000000..50f8e2b873000 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationDeciderTests.java @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.routing.RoutingChangesObserver; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ThrottlingAllocationDeciderTests extends ESAllocationTestCase { + + private record TestHarness( + ClusterState clusterState, + RoutingNodes mutableRoutingNodes, + RoutingNode mutableRoutingNode1, + RoutingNode mutableRoutingNode2, + ShardRouting unassignedShardRouting1Primary, + ShardRouting unassignedShardRouting1Replica, + ShardRouting unassignedShardRouting2Primary, + ShardRouting unassignedShardRouting2Replica + ) {} + + private TestHarness setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas() { + int numberOfShards = 2; + ClusterState clusterState = ClusterStateCreationUtils.stateWithUnassignedPrimariesAndReplicas( + new String[] { "test-index" }, + numberOfShards, + 1 + ); + // The number of data nodes the util method above creates is numberOfReplicas+1. + assertEquals(2, clusterState.nodes().size()); + assertEquals(1, clusterState.metadata().getTotalNumberOfIndices()); + + var indexIterator = clusterState.metadata().indicesAllProjects().iterator(); + assertTrue(indexIterator.hasNext()); + IndexMetadata testIndexMetadata = indexIterator.next(); + assertFalse(indexIterator.hasNext()); + Index testIndex = testIndexMetadata.getIndex(); + assertEquals(numberOfShards, testIndexMetadata.getNumberOfShards()); + ShardId testShardId1 = new ShardId(testIndex, 0); + ShardId testShardId2 = new ShardId(testIndex, 1); + + var mutableRoutingNodes = clusterState.mutableRoutingNodes(); + + // The RoutingNode references must be to the RoutingAllocation's RoutingNodes, so that changes to one is reflected in the other. + var routingNodesIterator = mutableRoutingNodes.iterator(); + assertTrue(routingNodesIterator.hasNext()); + var mutableRoutingNode1 = routingNodesIterator.next(); + assertTrue(routingNodesIterator.hasNext()); + var mutableRoutingNode2 = routingNodesIterator.next(); + assertFalse(routingNodesIterator.hasNext()); + + RoutingTable routingTable = clusterState.routingTable(ProjectId.DEFAULT); + + assertThat(routingTable.shardRoutingTable(testShardId1).replicaShards().size(), equalTo(1)); + assertThat(routingTable.shardRoutingTable(testShardId2).replicaShards().size(), equalTo(1)); + + ShardRouting unassignedShardRouting1Primary = routingTable.shardRoutingTable(testShardId1).primaryShard(); + ShardRouting unassignedShardRouting1Replica = routingTable.shardRoutingTable(testShardId1).replicaShards().get(0); + ShardRouting unassignedShardRouting2Primary = routingTable.shardRoutingTable(testShardId2).primaryShard(); + ShardRouting unassignedShardRouting2Replica = routingTable.shardRoutingTable(testShardId2).replicaShards().get(0); + + assertFalse(unassignedShardRouting1Primary.assignedToNode()); + assertFalse(unassignedShardRouting1Replica.assignedToNode()); + assertFalse(unassignedShardRouting2Primary.assignedToNode()); + assertFalse(unassignedShardRouting2Replica.assignedToNode()); + + return new TestHarness( + clusterState, + mutableRoutingNodes, + mutableRoutingNode1, + mutableRoutingNode2, + unassignedShardRouting1Primary, + unassignedShardRouting1Replica, + unassignedShardRouting2Primary, + unassignedShardRouting2Replica + ); + } + + public void testPrimaryAndReplicaThrottlingNotSimulation() { + /* Create cluster state for multiple nodes and an index with _unassigned_ shards. */ + TestHarness harness = setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas(); + + /* Decider Testing */ + + // Set up RoutingAllocation in non-simulation mode. + var routingAllocation = new RoutingAllocation( + null, + harness.mutableRoutingNodes, + harness.clusterState, + ClusterInfo.builder().build(), + null, + System.nanoTime(), + false // Turn off isSimulating + ); + + final RoutingChangesObserver NOOP = new RoutingChangesObserver() { + }; + Settings settings = Settings.builder() + .put("cluster.routing.allocation.unthrottle_replica_assignment_in_simulation", randomBoolean() ? true : false) + .put("cluster.routing.allocation.node_concurrent_recoveries", 1) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 1) + .build(); + assertFalse(routingAllocation.isSimulating()); + ThrottlingAllocationDecider decider = new ThrottlingAllocationDecider(ClusterSettings.createBuiltInClusterSettings(settings)); + + // A single primary can be allocated. + assertThat( + decider.canAllocate(harness.unassignedShardRouting1Primary, harness.mutableRoutingNode1, routingAllocation), + equalTo(Decision.YES) + ); + var shardRouting1PrimaryInitializing = harness.mutableRoutingNodes.initializeShard( + harness.unassignedShardRouting1Primary, + harness.mutableRoutingNode1.nodeId(), + null, + 0, + NOOP + ); + + // Leaving the first shard's primary in an INITIALIZING state should THROTTLE further allocation. + // Only 1 concurrent allocation is allowed. + assertThat( + decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode1, routingAllocation), + equalTo(Decision.THROTTLE) + ); + + // The first shard's replica should receive a simple NO because the corresponding primary is not active yet. + assertThat( + decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation), + equalTo(Decision.NO) + ); + + // Start the first shard's primary, and initialize the second shard's primary to again reach the 1 concurrency limit. + harness.mutableRoutingNodes.startShard(shardRouting1PrimaryInitializing, NOOP, 0); + assertThat( + decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode2, routingAllocation), + equalTo(Decision.YES) + ); + harness.mutableRoutingNodes.initializeShard( + harness.unassignedShardRouting2Primary, + harness.mutableRoutingNode2.nodeId(), + null, + 0, + NOOP + ); + + // The first shard's replica should receive THROTTLE now, since the primary is active. + // There is still already 1 allocation in progress, which is the limit. + assertThat( + decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation), + equalTo(Decision.THROTTLE) + ); + } + + public void testPrimaryAndReplicaThrottlingInSimulation() { + /* Create cluster state for multiple nodes and an index with _unassigned_ shards. */ + TestHarness harness = setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas(); + var mutableRoutingNodes = harness.clusterState.mutableRoutingNodes(); + + /* Decider Testing */ + + // Set up RoutingAllocation in simulation mode. + var routingAllocation = new RoutingAllocation( + null, + mutableRoutingNodes, + harness.clusterState, + ClusterInfo.builder().build(), + null, + System.nanoTime(), + true // Turn on isSimulating + ); + + final RoutingChangesObserver NOOP = new RoutingChangesObserver() { + }; + Settings settings = Settings.builder() + .put("cluster.routing.allocation.unthrottle_replica_assignment_in_simulation", true) + .put("cluster.routing.allocation.node_concurrent_recoveries", 1) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 1) + .build(); + assertTrue(routingAllocation.isSimulating()); + ThrottlingAllocationDecider decider = new ThrottlingAllocationDecider(ClusterSettings.createBuiltInClusterSettings(settings)); + + // Primary path is unthrottled during simulation, regardless of the `node_initial_primaries_recoveries` setting + assertThat( + decider.canAllocate(harness.unassignedShardRouting1Primary, harness.mutableRoutingNode1, routingAllocation), + equalTo(Decision.YES) + ); + mutableRoutingNodes.initializeShard(harness.unassignedShardRouting1Primary, harness.mutableRoutingNode1.nodeId(), null, 0, NOOP); + assertThat( + decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode1, routingAllocation), + equalTo(Decision.YES) + ); + mutableRoutingNodes.initializeShard(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode1.nodeId(), null, 0, NOOP); + + // Replica path is unthrottled during simulation AND `unthrottle_replica_assignment_in_simulation` is set to true. + assertThat( + decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation), + equalTo(Decision.YES) + ); + mutableRoutingNodes.initializeShard(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2.nodeId(), null, 0, NOOP); + assertThat( + decider.canAllocate(harness.unassignedShardRouting2Replica, harness.mutableRoutingNode2, routingAllocation), + equalTo(Decision.YES) + ); + mutableRoutingNodes.initializeShard(harness.unassignedShardRouting2Replica, harness.mutableRoutingNode2.nodeId(), null, 0, NOOP); + + // Note: INITIALIZING was chosen above, not STARTED, because the BalancedShardsAllocator only initializes. We want that path to be + // unthrottled in simulation. + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 783208843d9db..1a3faa7c628ce 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -596,6 +596,89 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas( return state.build(); } + /** + * Creates cluster state that contains the specified indices with UNASSIGNED shards. The number of nodes will be + * `numberOfReplicas + 1` so that all index shards can be fully assigned. + * + * @param indices List of indices to create. + * @param numberOfShards Number of shards per index. + * @param numberOfReplicas Number of shard replicas per shard. + */ + public static ClusterState stateWithUnassignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) { + return stateWithUnassignedPrimariesAndReplicas( + Metadata.DEFAULT_PROJECT_ID, + indices, + numberOfShards, + Collections.nCopies(numberOfReplicas, ShardRouting.Role.DEFAULT) + ); + } + + /** + * Creates cluster state that contains the specified indices with all UNASSIGNED shards. + * The number of nodes will be `replicaRoles.size() + 1` so that all index shards can be fully assigned. + */ + public static ClusterState stateWithUnassignedPrimariesAndReplicas( + ProjectId projectId, + String[] indices, + int numberOfShards, + List replicaRoles + ) { + int numberOfDataNodes = replicaRoles.size() + 1; + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < numberOfDataNodes; i++) { + final DiscoveryNode node = newNode(i); + discoBuilder = discoBuilder.add(node); + if (i == 0) { + discoBuilder.localNodeId(node.getId()); + discoBuilder.masterNodeId(node.getId()); + } + } + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + state.nodes(discoBuilder); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + Metadata.Builder metadataBuilder = Metadata.builder(); + final ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(projectId); + for (String index : indices) { + IndexMetadata indexMetadata = IndexMetadata.builder(index) + .settings( + indexSettings(IndexVersion.current(), numberOfShards, replicaRoles.size()).put( + SETTING_CREATION_DATE, + System.currentTimeMillis() + ) + ) + .timestampRange(IndexLongFieldRange.UNKNOWN) + .eventIngestedRange(IndexLongFieldRange.UNKNOWN) + .build(); + projectBuilder.put(indexMetadata, false); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + for (int i = 0; i < numberOfShards; i++) { + final ShardId shardId = new ShardId(index, "_na_", i); + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardId); + indexShardRoutingBuilder.addShard( + shardRoutingBuilder(index, i, null, true, ShardRoutingState.UNASSIGNED).withUnassignedInfo(unassignedInfo).build() + ); + for (int replica = 0; replica < replicaRoles.size(); replica++) { + indexShardRoutingBuilder.addShard( + shardRoutingBuilder(index, i, null, false, ShardRoutingState.UNASSIGNED).withRole(replicaRoles.get(replica)) + .withUnassignedInfo(unassignedInfo) + .build() + ); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder); + } + routingTableBuilder.add(indexRoutingTableBuilder.build()); + } + + metadataBuilder.put(projectBuilder).generateClusterUuidIfNeeded(); + + state.metadata(metadataBuilder); + state.routingTable(GlobalRoutingTable.builder().put(projectId, routingTableBuilder).build()); + return state.build(); + } + public static Tuple projectWithAssignedPrimariesAndReplicas( ProjectId projectId, String[] indices, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 84e1dd532e2b7..5b243517efbe0 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; @@ -207,7 +208,7 @@ public static ShardRouting newShardRouting( public static ShardRouting newShardRouting( ShardId shardId, - String currentNodeId, + @Nullable String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state