diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 1c1b5eed6fe95..b7220e0b4fc7e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -38,10 +38,12 @@ import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory; +import org.elasticsearch.cluster.routing.allocation.allocator.DefaultNonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction; @@ -503,16 +505,30 @@ private static ShardsAllocator createShardsAllocator( ShardAllocationExplainer shardAllocationExplainer, DesiredBalanceMetrics desiredBalanceMetrics ) { + WriteLoadConstraintSettings writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings); + DefaultNonPreferredShardIteratorFactory nonPreferredShardIteratorFactory = new DefaultNonPreferredShardIteratorFactory( + writeLoadConstraintSettings + ); Map> allocators = new HashMap<>(); allocators.put( BALANCED_ALLOCATOR, - () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory) + () -> new BalancedShardsAllocator( + balancerSettings, + writeLoadForecaster, + balancingWeightsFactory, + nonPreferredShardIteratorFactory + ) ); allocators.put( DESIRED_BALANCE_ALLOCATOR, () -> new DesiredBalanceShardsAllocator( clusterSettings, - new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory), + new BalancedShardsAllocator( + balancerSettings, + writeLoadForecaster, + balancingWeightsFactory, + nonPreferredShardIteratorFactory + ), threadPool, clusterService, reconciler, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index c737b89b80f73..336b3ace5c899 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -114,6 +114,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private final BalancerSettings balancerSettings; private final WriteLoadForecaster writeLoadForecaster; private final BalancingWeightsFactory balancingWeightsFactory; + private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory; public BalancedShardsAllocator() { this(Settings.EMPTY); @@ -124,18 +125,28 @@ public BalancedShardsAllocator(Settings settings) { } public BalancedShardsAllocator(BalancerSettings 
balancerSettings, WriteLoadForecaster writeLoadForecaster) { - this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings)); + this( + balancerSettings, + writeLoadForecaster, + new GlobalBalancingWeightsFactory(balancerSettings), + // We need to default to no-op here because there are lots of tests + // that depend on not returning after a single move + // TODO: default to NODE_INTERLEAVED or similar + NonPreferredShardIteratorFactory.NOOP + ); } @Inject public BalancedShardsAllocator( BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster, - BalancingWeightsFactory balancingWeightsFactory + BalancingWeightsFactory balancingWeightsFactory, + NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory ) { this.balancerSettings = balancerSettings; this.writeLoadForecaster = writeLoadForecaster; this.balancingWeightsFactory = balancingWeightsFactory; + this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory; } @Override @@ -152,13 +163,25 @@ public void allocate(RoutingAllocation allocation) { return; } final BalancingWeights balancingWeights = balancingWeightsFactory.create(); - final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights); - balancer.allocateUnassigned(); - balancer.moveShards(); - balancer.balance(); + final Balancer balancer = new Balancer( + writeLoadForecaster, + allocation, + balancerSettings.getThreshold(), + balancingWeights, + nonPreferredShardIteratorFactory + ); - // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy. - collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation); + try { + balancer.allocateUnassigned(); + if (balancer.moveNonPreferred()) { + return; + } + balancer.moveShards(); + balancer.balance(); + } finally { + // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy. 
+ collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation); + } } private void collectAndRecordNodeWeightStats(Balancer balancer, BalancingWeights balancingWeights, RoutingAllocation allocation) { @@ -188,7 +211,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f writeLoadForecaster, allocation, balancerSettings.getThreshold(), - balancingWeightsFactory.create() + balancingWeightsFactory.create(), + nonPreferredShardIteratorFactory ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -248,12 +272,14 @@ public static class Balancer { private final Map nodes; private final BalancingWeights balancingWeights; private final NodeSorters nodeSorters; + private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory; private Balancer( WriteLoadForecaster writeLoadForecaster, RoutingAllocation allocation, float threshold, - BalancingWeights balancingWeights + BalancingWeights balancingWeights, + NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory ) { this.writeLoadForecaster = writeLoadForecaster; this.allocation = allocation; @@ -266,6 +292,7 @@ private Balancer( nodes = Collections.unmodifiableMap(buildModelFromAssigned()); this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this); this.balancingWeights = balancingWeights; + this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory; } private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) { @@ -711,6 +738,89 @@ protected int comparePivot(int j) { return indices; } + /** + * Move a started shard in a non-preferred allocation + * + * @return true if a shard was moved, false otherwise + */ + private boolean moveNonPreferred() { + for (ShardRouting shardRouting : nonPreferredShardIteratorFactory.createNonPreferredShardIterator(allocation)) { + ProjectIndex index = projectIndex(shardRouting); + final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting); + if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); + sourceNode.removeShard(index, shardRouting); + Tuple relocatingShards = routingNodes.relocateShard( + shardRouting, + targetNode.getNodeId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + "non-preferred", + allocation.changes() + ); + final ShardRouting shard = relocatingShards.v2(); + targetNode.addShard(projectIndex(shard), shard); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); + } + return true; + } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { + logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + } + } + return false; + } + + /** + * Makes a decision on whether to move a started shard to another node. The following rules apply + * to the {@link MoveDecision} return object: + * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false. + * 2. 
If the shard's current allocation is preferred ({@link Decision.Type#YES}), no attempt will be made to move the shard and + * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null. + * 3. If the shard is not allowed ({@link Decision.Type#NO}), or not preferred ({@link Decision.Type#NOT_PREFERRED}) to remain + * on its current node, then {@link MoveDecision#getAllocationDecision()} will be populated with the decision of moving to + * another node. If {@link MoveDecision#forceMove()} returns {@code true}, then {@link MoveDecision#getTargetNode} will return + * a non-null value representing a node that returned {@link Decision.Type#YES} from canAllocate, otherwise the assignedNodeId + * will be null. + * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then + * {@link MoveDecision#getNodeDecisions} will have a non-null value. + */ + public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final ShardRouting shardRouting) { + NodeSorter sorter = nodeSorters.sorterForShard(shardRouting); + index.assertMatch(shardRouting); + + if (shardRouting.started() == false) { + // we can only move started shards + return MoveDecision.NOT_TAKEN; + } + + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + assert sourceNode != null && sourceNode.containsShard(index, shardRouting); + RoutingNode routingNode = sourceNode.getRoutingNode(); + Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + if (canRemain.type() != Type.NOT_PREFERRED && canRemain.type() != Type.NO) { + return MoveDecision.remain(canRemain); + } + + sorter.reset(index); + /* + * the sorter holds the minimum weight node first for the shards index. + * We now walk through the nodes until we find a node to allocate the shard. + * This is not guaranteed to be balanced after this operation we still try best effort to + * allocate on the minimal eligible node. + */ + return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocatePreferredOnly); + } + + private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) { + Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation); + // not-preferred means no here + if (decision.type() == Type.NOT_PREFERRED) { + return Decision.NO; + } + return decision; + } + /** * Move started shards that can not be allocated to a node anymore * diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java new file mode 100644 index 0000000000000..b4df7220a1975 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.function.Function; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Non-preferred shard iterator factory that returns the most desirable shards from most-hot-spotted + * nodes first. + *
+ * <ul>
+ *     <li>Any nodes missing queue-latency information are considered to have a queue-latency of 0.</li>
+ *     <li>Any shards missing write-load information are considered to have a write-load of 0.</li>
+ * </ul>
+ */ +public record DefaultNonPreferredShardIteratorFactory(WriteLoadConstraintSettings writeLoadConstraintSettings) + implements + NonPreferredShardIteratorFactory { + + @Override + public Iterable createNonPreferredShardIterator(RoutingAllocation allocation) { + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { + return Collections.emptyList(); + } + final Set allClusterNodes = new TreeSet<>(Comparator.reverseOrder()); + final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); + for (RoutingNode node : allocation.routingNodes()) { + var nodeUsageStats = nodeUsageStatsForThreadPools.get(node.nodeId()); + if (nodeUsageStats != null) { + final var writeThreadPoolStats = nodeUsageStats.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); + assert writeThreadPoolStats != null; + allClusterNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis())); + } else { + allClusterNodes.add(new NodeShardIterable(allocation, node, 0L)); + } + } + return () -> new LazilyExpandingIterator<>(allClusterNodes); + } + + /** + * Returns all shards from a node in the order + * + *
+ * <ol>
+ *     <li>shards with medium write-load</li>
+ *     <li>shards with high write-load</li>
+ *     <li>shards with low write-load</li>
+ * </ol>
+ * + * Where low and high thresholds are {@link #LOW_THRESHOLD} * max-write-load + * and {@link #HIGH_THRESHOLD} * max-write-load respectively. + */ + private record NodeShardIterable(RoutingAllocation allocation, RoutingNode routingNode, long maxQueueLatencyMillis) + implements + Iterable, + Comparable { + + private static final double LOW_THRESHOLD = 0.5; + private static final double HIGH_THRESHOLD = 0.8; + + @Override + public Iterator iterator() { + return createShardIterator(); + } + + @Override + public int compareTo(NodeShardIterable o) { + return Long.compare(maxQueueLatencyMillis, o.maxQueueLatencyMillis); + } + + private Iterator createShardIterator() { + final var shardWriteLoads = allocation.clusterInfo().getShardWriteLoads(); + final WriteLoadFilter filter = WriteLoadFilter.create(shardWriteLoads); + return Stream.of( + StreamSupport.stream(routingNode.spliterator(), false).filter(filter::hasMediumLoad), + StreamSupport.stream(routingNode.spliterator(), false).filter(filter::hasHighLoad), + StreamSupport.stream(routingNode.spliterator(), false).filter(filter::hasLowLoad) + ).flatMap(Function.identity()).iterator(); + } + + private record WriteLoadFilter(Map shardWriteLoads, double lowThreshold, double highThreshold) { + + public static WriteLoadFilter create(Map shardWriteLoads) { + final double maxWriteLoad = shardWriteLoads.values().stream().reduce(0.0, Double::max); + final double lowThreshold = maxWriteLoad * NodeShardIterable.LOW_THRESHOLD; + final double highThreshold = maxWriteLoad * NodeShardIterable.HIGH_THRESHOLD; + return new WriteLoadFilter(shardWriteLoads, lowThreshold, highThreshold); + } + + public boolean hasMediumLoad(ShardRouting shardRouting) { + double shardWriteLoad = shardWriteLoad(shardRouting); + return shardWriteLoad >= lowThreshold && shardWriteLoad < highThreshold; + } + + public boolean hasHighLoad(ShardRouting shardRouting) { + return shardWriteLoad(shardRouting) >= highThreshold; + } + + public boolean hasLowLoad(ShardRouting shardRouting) { + return shardWriteLoad(shardRouting) < lowThreshold; + } + + private double shardWriteLoad(ShardRouting shardRouting) { + return shardWriteLoads.getOrDefault(shardRouting.shardId(), 0.0); + } + } + } + + static class LazilyExpandingIterator implements Iterator { + + private final Iterator> allIterables; + private Iterator currentIterator; + + LazilyExpandingIterator(Iterable> allIterables) { + this.allIterables = allIterables.iterator(); + } + + @Override + public boolean hasNext() { + while (currentIterator == null || currentIterator.hasNext() == false) { + if (allIterables.hasNext() == false) { + return false; + } else { + currentIterator = allIterables.next().iterator(); + } + } + return true; + } + + @Override + public T next() { + while (currentIterator == null || currentIterator.hasNext() == false) { + currentIterator = allIterables.next().iterator(); + } + return currentIterator.next(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java new file mode 100644 index 0000000000000..b92d3d9ff49f7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; + +import java.util.Collections; + +/** + * A factory that produces {@link Iterable}s of {@link ShardRouting}s used to look for non-preferred allocation and + * try to relocate them. The first shard encountered that the allocation deciders indicate is in a NOT_PREFERRED + * allocation, and can be moved to a preferred allocation, will be moved and the iteration will stop. + */ +public interface NonPreferredShardIteratorFactory { + + /** + * Doesn't iterate over the shards at all, can be used to disable movement of NON_PREFERRED shards. + */ + NonPreferredShardIteratorFactory NOOP = ignored -> Collections.emptyList(); + + /** + * Just iterates over all shards using {@link org.elasticsearch.cluster.routing.RoutingNodes#nodeInterleavedShardIterator()} + */ + NonPreferredShardIteratorFactory NODE_INTERLEAVED = allocation -> () -> allocation.routingNodes().nodeInterleavedShardIterator(); + + /** + * Create an iterator returning all shards to be checked for non-preferred allocation, ordered in + * descending desirability-to-move order + * + * @param allocation the current routing allocation + * @return An iterator containing shards we'd like to move to a preferred allocation + */ + Iterable createNonPreferredShardIterator(RoutingAllocation allocation); +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 3667de9c65e4e..3b31303ae2b6d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -621,7 +621,8 @@ public void testPartitionedClusterWithSeparateWeights() { TEST_WRITE_LOAD_FORECASTER, new PrefixBalancingWeightsFactory( Map.of("shardsOnly", new WeightFunction(1, 0, 0, 0), "weightsOnly", new WeightFunction(0, 0, 1, 0)) - ) + ), + NonPreferredShardIteratorFactory.NOOP ), EmptyClusterInfoService.INSTANCE, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java new file mode 100644 index 0000000000000..cece9903846c6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -0,0 +1,232 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.cluster.routing.allocation.allocator.DefaultNonPreferredShardIteratorFactory.LazilyExpandingIterator; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class DefaultNonPreferredShardIteratorFactoryTests extends ESTestCase { + + public void testLazilyExpandingIterator() { + final List> allValues = new ArrayList<>(); + final List flatValues = new ArrayList<>(); + IntStream.range(0, randomIntBetween(0, 30)).forEach(i -> { + int listSize = randomIntBetween(0, 10); + final var innerList = IntStream.range(0, listSize).mapToObj(j -> (i + "/" + j)).toList(); + allValues.add(innerList); + flatValues.addAll(innerList); + }); + + Iterator iterator = new LazilyExpandingIterator<>(allValues); + + int nextIndex = 0; + while (true) { + if (randomBoolean()) { + assertEquals(iterator.hasNext(), nextIndex < flatValues.size()); + } else { + if (nextIndex < flatValues.size()) { + assertEquals(iterator.next(), flatValues.get(nextIndex++)); + } else { + assertThrows(NoSuchElementException.class, iterator::next); + } + } + if (randomBoolean() && nextIndex == flatValues.size()) { + break; + } + } + } + + public void testShardIterationOrder() { + final var iteratorFactory = new DefaultNonPreferredShardIteratorFactory( + new WriteLoadConstraintSettings( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ) + ) + ); + final RoutingAllocation routingAllocation = createRoutingAllocation(randomIntBetween(2, 20)); + final double maxShardWriteLoad = routingAllocation.clusterInfo().getShardWriteLoads().values().stream().reduce(0.0, Double::max); + final double lowThreshold = maxShardWriteLoad * 0.5; + final double highThreshold = maxShardWriteLoad * 0.8; + final Iterable shards = 
iteratorFactory.createNonPreferredShardIterator(routingAllocation); + + String nodeId = null; + Tier lastTierForNode = null; + long lastNodeQueueLatency = -1; + int totalCount = 0; + for (ShardRouting shardRouting : shards) { + totalCount++; + if (Objects.equals(nodeId, shardRouting.currentNodeId()) == false) { + lastTierForNode = null; + nodeId = shardRouting.currentNodeId(); + } + NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = routingAllocation.clusterInfo() + .getNodeUsageStatsForThreadPools() + .get(nodeId); + // Should not receive shards from nodes with no usage stats + assertNotNull(nodeUsageStatsForThreadPools); + long thisNodeQueueLatency = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE) + .maxThreadPoolQueueLatencyMillis(); + // should receive shards from nodes in descending queue latency order + if (lastNodeQueueLatency != -1) { + assertThat(thisNodeQueueLatency, lessThanOrEqualTo(lastNodeQueueLatency)); + } + lastNodeQueueLatency = thisNodeQueueLatency; + final double shardWriteLoad = routingAllocation.clusterInfo().getShardWriteLoads().getOrDefault(shardRouting.shardId(), 0.0); + // Inside nodes, shards should be delivered in tier order + Tier tier = tierFor(shardWriteLoad, lowThreshold, highThreshold); + if (lastTierForNode != null) { + assertThat(tier, greaterThanOrEqualTo(lastTierForNode)); + } + lastTierForNode = tier; + } + + if (totalCount > 0) { + assertThat(lastNodeQueueLatency, greaterThanOrEqualTo(0L)); + } + } + + public void testThatAllShardsAreReturnedOnce() { + final var iteratorFactory = new DefaultNonPreferredShardIteratorFactory( + new WriteLoadConstraintSettings( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ) + ) + ); + final RoutingAllocation routingAllocation = createRoutingAllocation(randomIntBetween(2, 20)); + final Set allShardRoutings = routingAllocation.routingNodes() + .stream() + .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) + .collect(Collectors.toSet()); + for (ShardRouting shard : iteratorFactory.createNonPreferredShardIterator(routingAllocation)) { + assertTrue(allShardRoutings.remove(shard)); + } + assertThat(allShardRoutings, empty()); + } + + private Tier tierFor(double writeLoad, double lowThreshold, double highThreshold) { + return writeLoad < lowThreshold ? Tier.LOW : writeLoad < highThreshold ? 
Tier.MEDIUM : Tier.HIGH; + } + + private enum Tier { + MEDIUM, + HIGH, + LOW + } + + public void testNoShardsAreReturnedWhenWriteLoadDeciderNotFullyEnabled() { + final var iteratorFactory = new DefaultNonPreferredShardIteratorFactory( + new WriteLoadConstraintSettings( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + randomFrom( + WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED, + WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY + ) + ) + .build() + ) + ) + ); + final RoutingAllocation routingAllocation = createRoutingAllocation(randomIntBetween(2, 20)); + final Iterable shards = iteratorFactory.createNonPreferredShardIterator(routingAllocation); + assertFalse(shards.iterator().hasNext()); + } + + private RoutingAllocation createRoutingAllocation(int numberOfNodes) { + int writeThreadPoolSize = randomIntBetween(4, 32); + final Map nodeUsageStats = new HashMap<>(); + final List allNodeIds = new ArrayList<>(); + IntStream.range(0, numberOfNodes).mapToObj(i -> "node_" + i).forEach(nodeId -> { + // Some have no utilization + if (usually()) { + nodeUsageStats.put( + nodeId, + new NodeUsageStatsForThreadPools( + nodeId, + Map.of( + ThreadPool.Names.WRITE, + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + writeThreadPoolSize, + randomFloatBetween(0.0f, 1.0f, true), + randomLongBetween(0, 5_000) + ) + ) + ) + ); + } + allNodeIds.add(nodeId); + }); + + ClusterInfo.Builder clusterInfo = ClusterInfo.builder().nodeUsageStatsForThreadPools(nodeUsageStats); + + final int numberOfPrimaries = randomIntBetween(1, numberOfNodes * 3); + final ClusterState state = ClusterStateCreationUtils.state( + numberOfNodes, + new String[] { randomIdentifier(), randomIdentifier(), randomIdentifier() }, + numberOfPrimaries + ); + + final Map shardWriteLoads = new HashMap<>(); + for (RoutingNode node : state.getRoutingNodes()) { + for (ShardRouting shardRouting : node) { + // Some have no write-load + if (usually()) { + shardWriteLoads.put(shardRouting.shardId(), randomDoubleBetween(0.0, writeThreadPoolSize, true)); + } + } + } + clusterInfo.shardWriteLoads(shardWriteLoads); + + return new RoutingAllocation(null, state, clusterInfo.build(), null, System.nanoTime()); + } +}
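
For illustration only (not part of the change itself), here is a minimal, self-contained sketch of the ordering the new factory documents: nodes are visited in descending write-pool queue-latency order, and within each node shards are offered medium-write-load tier first, then high, then low, relative to the cluster-wide maximum shard write-load. The `Node` record, node ids, latencies, and write-load values below are hypothetical stand-ins for `RoutingNode`, `ShardRouting`, and `ClusterInfo`; the 0.5 and 0.8 factors mirror `LOW_THRESHOLD` and `HIGH_THRESHOLD` in `NodeShardIterable`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

public class NonPreferredOrderingSketch {

    // Hypothetical stand-in for a node: an id, its write-pool queue latency, and its shards' write-loads.
    record Node(String id, long queueLatencyMillis, List<Double> shardWriteLoads) {}

    public static void main(String[] args) {
        List<Node> nodes = List.of(
            new Node("node_0", 250L, List.of(0.1, 6.0, 9.5)),
            new Node("node_1", 4_000L, List.of(3.0, 10.0)),
            new Node("node_2", 0L, List.of(4.9))
        );

        // The cluster-wide maximum shard write-load defines the tier thresholds.
        double maxWriteLoad = nodes.stream()
            .flatMap(n -> n.shardWriteLoads().stream())
            .mapToDouble(Double::doubleValue)
            .max()
            .orElse(0.0);
        double low = 0.5 * maxWriteLoad;   // below this: low tier
        double high = 0.8 * maxWriteLoad;  // at or above this: high tier

        // Hottest nodes (largest queue latency) first, then medium/high/low tiers within each node,
        // mirroring the descending TreeSet ordering and the three chained streams in NodeShardIterable.
        List<Double> visitOrder = nodes.stream()
            .sorted(Comparator.comparingLong(Node::queueLatencyMillis).reversed())
            .flatMap(n -> Stream.of(
                n.shardWriteLoads().stream().filter(l -> l >= low && l < high), // medium
                n.shardWriteLoads().stream().filter(l -> l >= high),            // high
                n.shardWriteLoads().stream().filter(l -> l < low)               // low
            ).flatMap(Function.identity()))
            .toList();

        // max = 10.0, so low = 5.0 and high = 8.0:
        // node_1 (latency 4000): high -> [10.0], low -> [3.0]
        // node_0 (latency 250):  medium -> [6.0], high -> [9.5], low -> [0.1]
        // node_2 (latency 0):    low -> [4.9]
        System.out.println(visitOrder); // [10.0, 3.0, 6.0, 9.5, 0.1, 4.9]
    }
}
```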