From 83e2fb80756421ec32c4c08e454ea8cb922bcb42 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 10 Jan 2025 17:32:27 -0500 Subject: [PATCH 1/5] Extract metric handling from the desired balancer reconciler The DesiredBalanceReconciler applies desired balance to the routing table cluster metadata. This patch extract logic from the reconciler that pushes updates to the desired balance metrics. This removes extra parameters passed into the DesiredBalanceReconciler, and enables expanding the metrics that are collected without passing more objects into the reconciler class. --- .../allocator/ClusterAllocationStats.java | 17 ++++++ .../allocator/DesiredBalanceMetrics.java | 18 ++---- .../allocator/DesiredBalanceReconciler.java | 56 ++++--------------- .../DesiredBalanceShardsAllocator.java | 43 +++++++++++--- .../allocator/DesiredBalanceMetricsTests.java | 7 +-- .../DesiredBalanceReconcilerTests.java | 17 +----- 6 files changed, 76 insertions(+), 82 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationStats.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationStats.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationStats.java new file mode 100644 index 0000000000000..9c1ccf23b440d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationStats.java @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +/** + * Data structure to pass allocation statistics between the desired balance classes. + */ +public record ClusterAllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { + public static final ClusterAllocationStats EMPTY_ALLOCATION_STATS = new ClusterAllocationStats(-1, -1, -1); +}; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index fddf9267cdbb1..fd656b24f7e23 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -28,12 +28,8 @@ */ public class DesiredBalanceMetrics { - public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {} - public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {} - public static final DesiredBalanceMetrics NOOP = new DesiredBalanceMetrics(MeterRegistry.NOOP); - public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current"; public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current"; public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current"; @@ -56,8 +52,6 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.forecasted_disk_usage_bytes.current"; - public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1); - private volatile boolean nodeIsMaster = false; /** * Number of unassigned shards during last reconciliation @@ -80,16 +74,16 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w ); public void updateMetrics( - AllocationStats allocationStats, + ClusterAllocationStats clusterAllocationStats, Map weightStatsPerNode, Map nodeAllocationStats ) { - assert allocationStats != null : "allocation stats cannot be null"; + assert clusterAllocationStats != null : "allocation stats cannot be null"; assert weightStatsPerNode != null : "node balance weight stats cannot be null"; - if (allocationStats != EMPTY_ALLOCATION_STATS) { - this.unassignedShards = allocationStats.unassignedShards; - this.totalAllocations = allocationStats.totalAllocations; - this.undesiredAllocations = allocationStats.undesiredAllocationsExcludingShuttingDownNodes; + if (clusterAllocationStats != ClusterAllocationStats.EMPTY_ALLOCATION_STATS) { + this.unassignedShards = clusterAllocationStats.unassignedShards(); + this.totalAllocations = clusterAllocationStats.totalAllocations(); + this.undesiredAllocations = clusterAllocationStats.undesiredAllocationsExcludingShuttingDownNodes(); } weightStatsPerNodeRef.set(weightStatsPerNode); allocationStatsPerNodeRef.set(nodeAllocationStats); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 83b370c1a7928..af56e72ac7cab 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -20,10 +20,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; @@ -36,9 +33,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -74,16 +69,8 @@ public class DesiredBalanceReconciler { private double undesiredAllocationsLogThreshold; private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering(); private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering(); - private final DesiredBalanceMetrics desiredBalanceMetrics; - private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; - - public DesiredBalanceReconciler( - ClusterSettings clusterSettings, - ThreadPool threadPool, - DesiredBalanceMetrics desiredBalanceMetrics, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator - ) { - this.desiredBalanceMetrics = desiredBalanceMetrics; + + public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) { this.undesiredAllocationLogInterval = new FrequencyCappedAction( threadPool.relativeTimeInMillisSupplier(), TimeValue.timeValueMinutes(5) @@ -93,14 +80,13 @@ public DesiredBalanceReconciler( UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING, value -> this.undesiredAllocationsLogThreshold = value ); - this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; } - public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + public ClusterAllocationStats reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { var nodeIds = allocation.routingNodes().getAllNodeIds(); allocationOrdering.retainNodes(nodeIds); moveOrdering.retainNodes(nodeIds); - new Reconciliation(desiredBalance, allocation).run(); + return new Reconciliation(desiredBalance, allocation).run(); } public void clear() { @@ -120,7 +106,7 @@ private class Reconciliation { this.routingNodes = allocation.routingNodes(); } - void run() { + ClusterAllocationStats run() { try (var ignored = allocation.withReconcilingFlag()) { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); @@ -129,13 +115,13 @@ void run() { // no data nodes, so fail allocation to report red health failAllocationOfNewPrimaries(allocation); logger.trace("no nodes available, nothing to reconcile"); - return; + return ClusterAllocationStats.EMPTY_ALLOCATION_STATS; } if (desiredBalance.assignments().isEmpty()) { // no desired state yet but it is on its way and we'll reroute again when it is ready logger.trace("desired balance is empty, nothing to reconcile"); - return; + return ClusterAllocationStats.EMPTY_ALLOCATION_STATS; } // compute next moves towards current desired balance: @@ -153,28 +139,8 @@ void run() { var allocationStats = balance(); logger.debug("Reconciliation is complete"); - - updateDesireBalanceMetrics(allocationStats); - } - } - - private void updateDesireBalanceMetrics(AllocationStats allocationStats) { - var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( - allocation.metadata(), - allocation.routingNodes(), - allocation.clusterInfo(), - desiredBalance - ); - Map filteredNodeAllocationStatsAndWeights = new HashMap<>( - nodesStatsAndWeights.size() - ); - for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) { - var node = allocation.nodes().get(nodeStatsAndWeight.getKey()); - if (node != null) { - filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue()); - } + return allocationStats; } - desiredBalanceMetrics.updateMetrics(allocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights); } private boolean allocateUnassignedInvariant() { @@ -497,9 +463,9 @@ private void moveShards() { } } - private AllocationStats balance() { + private ClusterAllocationStats balance() { if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { - return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS; + return ClusterAllocationStats.EMPTY_ALLOCATION_STATS; } int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size(); @@ -567,7 +533,7 @@ private AllocationStats balance() { maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); - return new AllocationStats(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes); + return new ClusterAllocationStats(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes); } private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 5be26f0b3e8c7..93c5de0220a39 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -67,6 +69,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private final AtomicReference currentDesiredBalanceRef = new AtomicReference<>(DesiredBalance.NOT_MASTER); private volatile boolean resetCurrentDesiredBalance = false; private final Set processedNodeShutdowns = new HashSet<>(); + private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; private final DesiredBalanceMetrics desiredBalanceMetrics; // stats @@ -112,16 +115,12 @@ public DesiredBalanceShardsAllocator( NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); + this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; this.delegateAllocator = delegateAllocator; this.threadPool = threadPool; this.reconciler = reconciler; this.desiredBalanceComputer = desiredBalanceComputer; - this.desiredBalanceReconciler = new DesiredBalanceReconciler( - clusterService.getClusterSettings(), - threadPool, - desiredBalanceMetrics, - nodeAllocationStatsAndWeightsCalculator - ); + this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool); this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) { @Override @@ -324,7 +323,11 @@ protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation alloca } else { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); } - recordTime(cumulativeReconciliationTime, () -> desiredBalanceReconciler.reconcile(desiredBalance, allocation)); + recordTime(cumulativeReconciliationTime, () -> { + ClusterAllocationStats clusterAllocationStats = desiredBalanceReconciler.reconcile(desiredBalance, allocation); + updateDesireBalanceMetrics(desiredBalance, allocation, clusterAllocationStats); + }); + if (logger.isTraceEnabled()) { logger.trace("Reconciled desired balance: {}", desiredBalance); } else { @@ -358,6 +361,32 @@ public void resetDesiredBalance() { resetCurrentDesiredBalance = true; } + private void updateDesireBalanceMetrics( + DesiredBalance desiredBalance, + RoutingAllocation routingAllocation, + ClusterAllocationStats clusterAllocationStats + ) { + if (clusterAllocationStats == ClusterAllocationStats.EMPTY_ALLOCATION_STATS) { + return; + } + + var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( + routingAllocation.metadata(), + routingAllocation.routingNodes(), + routingAllocation.clusterInfo(), + desiredBalance + ); + Map filteredNodeAllocationStatsAndWeights = + new HashMap<>(nodesStatsAndWeights.size()); + for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) { + var node = routingAllocation.nodes().get(nodeStatsAndWeight.getKey()); + if (node != null) { + filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue()); + } + } + desiredBalanceMetrics.updateMetrics(clusterAllocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights); + } + public DesiredBalanceStats getStats() { return new DesiredBalanceStats( Math.max(currentDesiredBalanceRef.get().lastConvergedIndex(), 0L), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java index 9e6e080f38216..e6dbbdea6bd79 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.telemetry.metric.MeterRegistry; @@ -27,7 +26,7 @@ public void testZeroAllMetrics() { long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomNonNegativeLong(); long undesiredAllocations = randomNonNegativeLong(); - metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); + metrics.updateMetrics(new ClusterAllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); assertEquals(totalAllocations, metrics.totalAllocations()); assertEquals(unassignedShards, metrics.unassignedShards()); assertEquals(undesiredAllocations, metrics.undesiredAllocations()); @@ -44,7 +43,7 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() { long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomLongBetween(100, 10000000); long undesiredAllocations = randomLongBetween(0, totalAllocations); - metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); + metrics.updateMetrics(new ClusterAllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); // Collect when not master meterRegistry.getRecorder().collect(); @@ -104,7 +103,7 @@ public void testUndesiredAllocationRatioIsZeroWhenTotalShardsIsZero() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry); long unassignedShards = randomNonNegativeLong(); - metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0), Map.of(), Map.of()); + metrics.updateMetrics(new ClusterAllocationStats(unassignedShards, 0, 0), Map.of(), Map.of()); metrics.setNodeIsMaster(true); meterRegistry.getRecorder().collect(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index cd94c87bb4b57..81aa1a60eb45e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -1212,12 +1212,7 @@ public void testRebalanceDoesNotCauseHotSpots() { new ConcurrentRebalanceAllocationDecider(clusterSettings), new ThrottlingAllocationDecider(clusterSettings) }; - var reconciler = new DesiredBalanceReconciler( - clusterSettings, - new DeterministicTaskQueue().getThreadPool(), - DesiredBalanceMetrics.NOOP, - EMPTY_NODE_ALLOCATION_STATS - ); + var reconciler = new DesiredBalanceReconciler(clusterSettings, new DeterministicTaskQueue().getThreadPool()); var totalOutgoingMoves = new HashMap(); for (int i = 0; i < numberOfNodes; i++) { @@ -1299,12 +1294,7 @@ public void testShouldLogOnTooManyUndesiredAllocations() { final var timeInMillisSupplier = new AtomicLong(); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(timeInMillisSupplier::incrementAndGet); - var reconciler = new DesiredBalanceReconciler( - createBuiltInClusterSettings(), - threadPool, - DesiredBalanceMetrics.NOOP, - EMPTY_NODE_ALLOCATION_STATS - ); + var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool); final long initialDelayInMillis = TimeValue.timeValueMinutes(5).getMillis(); timeInMillisSupplier.addAndGet(randomLongBetween(initialDelayInMillis, 2 * initialDelayInMillis)); @@ -1356,8 +1346,7 @@ public void testShouldLogOnTooManyUndesiredAllocations() { private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) { final var threadPool = mock(ThreadPool.class); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet); - new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool, DesiredBalanceMetrics.NOOP, EMPTY_NODE_ALLOCATION_STATS) - .reconcile(desiredBalance, routingAllocation); + new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation); } private static boolean isReconciled(RoutingNode node, DesiredBalance balance) { From e59e0f54531d18ef0a9baedae58b749c9736d74d Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 9 Jan 2025 12:04:53 -0500 Subject: [PATCH 2/5] Add documentation around desired balance --- .../routing/allocation/AllocationService.java | 9 ++--- .../allocation/allocator/DesiredBalance.java | 21 +++++++++++- .../allocator/DesiredBalanceComputer.java | 13 +++++--- .../allocator/DesiredBalanceReconciler.java | 8 +++++ .../DesiredBalanceShardsAllocator.java | 33 +++++++++++++++---- .../allocator/PendingListenersQueue.java | 4 +++ .../allocation/allocator/ShardAssignment.java | 14 +++++++- .../allocator/DesiredBalanceTests.java | 10 +++--- 8 files changed, 88 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 5d1e6741c5e22..2f2fd4ef453f6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -158,9 +158,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List ass * The placeholder value for {@link DesiredBalance} when the node stands down as master. */ public static final DesiredBalance NOT_MASTER = new DesiredBalance(-2, Map.of()); + /** * The starting value for {@link DesiredBalance} when the node becomes the master. */ @@ -57,6 +65,10 @@ public static boolean hasChanges(DesiredBalance a, DesiredBalance b) { return Objects.equals(a.assignments, b.assignments) == false; } + /** + * Returns the sum of shard movements needed to reach the new desired balance. Doesn't count new shard copies as a move, nor removal or + * unassignment of a shard copy. + */ public static int shardMovements(DesiredBalance old, DesiredBalance updated) { var intersection = Sets.intersection(old.assignments().keySet(), updated.assignments().keySet()); int movements = 0; @@ -70,8 +82,15 @@ public static int shardMovements(DesiredBalance old, DesiredBalance updated) { return movements; } + /** + * Returns the number of shard movements needed to reach the new shard assignment. Doesn't count new shard copies as a move, nor removal + * or unassignment of a shard copy. + */ private static int shardMovements(ShardAssignment old, ShardAssignment updated) { - var movements = Math.min(0, old.assigned() - updated.assigned());// compensate newly started shards + // A shard move should retain the same number of assigned nodes, just swap out one node for another. We will compensate for newly + // started shards -- adding a shard copy is not a move -- by initializing the count with a negative value so that incrementing later + // for a new node zeros out. + var movements = Math.min(0, old.assigned() - updated.assigned()); for (String nodeId : updated.nodeIds()) { if (old.nodeIds().contains(nodeId) == false) { movements++; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java index 3b22221ea7db4..03630c284fa30 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java @@ -415,11 +415,14 @@ boolean hasEnoughIterations(int currentIteration) { } private static Map collectShardAssignments(RoutingNodes routingNodes) { - final var entries = routingNodes.getAssignedShards().entrySet(); - assert entries.stream().flatMap(t -> t.getValue().stream()).allMatch(ShardRouting::started) : routingNodes; - final Map res = Maps.newHashMapWithExpectedSize(entries.size()); - for (var shardAndAssignments : entries) { - res.put(shardAndAssignments.getKey(), ShardAssignment.ofAssignedShards(shardAndAssignments.getValue())); + final var allAssignedShards = routingNodes.getAssignedShards().entrySet(); + assert allAssignedShards.stream().flatMap(t -> t.getValue().stream()).allMatch(ShardRouting::started) : routingNodes; + final Map res = Maps.newHashMapWithExpectedSize(allAssignedShards.size()); + for (var shardIdAndShardRoutings : allAssignedShards) { + res.put( + shardIdAndShardRoutings.getKey(), + ShardAssignment.createFromAssignedShardRoutingsList(shardIdAndShardRoutings.getValue()) + ); } return res; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index af56e72ac7cab..2f3089062036e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -49,6 +49,10 @@ public class DesiredBalanceReconciler { private static final Logger logger = LogManager.getLogger(DesiredBalanceReconciler.class); + /** + * The minimum interval that log messages will be written if the number of undesired shard allocations reaches the percentage of total + * shards set by {@link #UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING}. + */ public static final Setting UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING = Setting.timeSetting( "cluster.routing.allocation.desired_balance.undesired_allocations.log_interval", TimeValue.timeValueHours(1), @@ -57,6 +61,10 @@ public class DesiredBalanceReconciler { Setting.Property.NodeScope ); + /** + * Warning log messages may be periodically written if the number of shards that are on undesired nodes reaches this percentage setting. + * Works together with {@link #UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING} to log on a periodic basis. + */ public static final Setting UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING = Setting.doubleSetting( "cluster.routing.allocation.desired_balance.undesired_allocations.threshold", 0.1, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 93c5de0220a39..1725b361baff9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -58,11 +59,31 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private final ShardsAllocator delegateAllocator; private final ThreadPool threadPool; + /** + * This is a callback to run {@link AllocationService#executeWithRoutingAllocation(ClusterState, String, RerouteStrategy)}, which + * produces a new ClusterState with the changes made by {@link DesiredBalanceReconciler#reconcile}. The {@link RerouteStrategy} provided + * to the callback calls into {@link #desiredBalanceReconciler} for the changes. The {@link #masterServiceTaskQueue} will apply the + * cluster state update. + */ private final DesiredBalanceReconcilerAction reconciler; private final DesiredBalanceComputer desiredBalanceComputer; + /** + * Reconciliation ({@link DesiredBalanceReconciler#reconcile(DesiredBalance, RoutingAllocation)}) takes the {@link DesiredBalance} + * output of {@link DesiredBalanceComputer#compute} and identifies how shards need to be added, moved or removed to go from the current + * cluster shard allocation to the new desired allocation. + */ private final DesiredBalanceReconciler desiredBalanceReconciler; private final ContinuousComputation desiredBalanceComputation; - private final PendingListenersQueue queue; + /** + * Accepts listeners with an index value (see {#link #indexGenerator}) and run them whenever a DesiredBalance computation completes with + * an equal or greater index value. + */ + private final PendingListenersQueue pendingListenersQueue; + /** + * Each reroute request gets assigned a monotonically increasing sequence number. Many reroute requests may arrive before the balancer + * asynchronously runs a computation. The balancer will use the latest request and save this sequence number to track back to the + * request. + */ private final AtomicLong indexGenerator = new AtomicLong(-1); private final ConcurrentLinkedQueue> pendingDesiredBalanceMoves = new ConcurrentLinkedQueue<>(); private final MasterServiceTaskQueue masterServiceTaskQueue; @@ -198,7 +219,7 @@ public String toString() { return "DesiredBalanceShardsAllocator#allocate"; } }; - this.queue = new PendingListenersQueue(); + this.pendingListenersQueue = new PendingListenersQueue(); this.masterServiceTaskQueue = clusterService.createTaskQueue( "reconcile-desired-balance", Priority.URGENT, @@ -234,7 +255,7 @@ public void allocate(RoutingAllocation allocation, ActionListener listener var index = indexGenerator.incrementAndGet(); logger.debug("Executing allocate for [{}]", index); - queue.add(index, listener); + pendingListenersQueue.add(index, listener); // This can only run on master, so unset not-master if exists if (currentDesiredBalanceRef.compareAndSet(DesiredBalance.NOT_MASTER, DesiredBalance.BECOME_MASTER_INITIAL)) { logger.debug("initialized desired balance for becoming master"); @@ -407,7 +428,7 @@ public DesiredBalanceStats getStats() { private void onNoLongerMaster() { if (indexGenerator.getAndSet(-1) != -1) { currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER); - queue.completeAllAsNotMaster(); + pendingListenersQueue.completeAllAsNotMaster(); pendingDesiredBalanceMoves.clear(); desiredBalanceReconciler.clear(); desiredBalanceMetrics.zeroAllMetrics(); @@ -457,7 +478,7 @@ private ClusterState applyBalance( batchExecutionContext.initialState(), createReconcileAllocationAction(latest.getTask().desiredBalance) ); - latest.success(() -> queue.complete(latest.getTask().desiredBalance.lastConvergedIndex())); + latest.success(() -> pendingListenersQueue.complete(latest.getTask().desiredBalance.lastConvergedIndex())); return newState; } } @@ -476,7 +497,7 @@ private static void discardSupersededTasks( // only for tests - in production, this happens after reconciliation protected final void completeToLastConvergedIndex() { - queue.complete(currentDesiredBalanceRef.get().lastConvergedIndex()); + pendingListenersQueue.complete(currentDesiredBalanceRef.get().lastConvergedIndex()); } private void recordTime(CounterMetric metric, Runnable action) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java index e1b58cf79ac09..5b14277f2c651 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java @@ -24,6 +24,10 @@ import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME; import static org.elasticsearch.cluster.service.MasterService.MASTER_UPDATE_THREAD_NAME; +/** + * Registers listeners with an `index` number ({@link #add(long, ActionListener)}) and then completes them whenever the latest index number + * is greater or equal to a listener's index value ({@link #complete(long)}). + */ public class PendingListenersQueue { private static final Logger logger = LogManager.getLogger(PendingListenersQueue.class); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java index 4fb9137cb4544..2bd1b9bb2bb64 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java @@ -17,6 +17,14 @@ import static java.util.Collections.unmodifiableSet; +/** + * Simple shard assignment summary of shard copies for a particular index shard. + * + * @param nodeIds The node IDs of nodes holding a shard copy. + * @param total The total number of shard copies. + * @param unassigned The number of unassigned shard copies. + * @param ignored The number of ignored shard copies. + */ public record ShardAssignment(Set nodeIds, int total, int unassigned, int ignored) { public ShardAssignment { @@ -28,9 +36,13 @@ public int assigned() { return nodeIds.size(); } - public static ShardAssignment ofAssignedShards(List routings) { + /** + * Helper method to instantiate a new ShardAssignment from a given list of ShardRouting instances. Assumes all shards are assigned. + */ + public static ShardAssignment createFromAssignedShardRoutingsList(List routings) { var nodeIds = new LinkedHashSet(); for (ShardRouting routing : routings) { + assert routing.unassignedInfo() == null : "Expected assigned shard copies only, unassigned info: " + routing.unassignedInfo(); nodeIds.add(routing.currentNodeId()); } return new ShardAssignment(unmodifiableSet(nodeIds), routings.size(), 0, 0); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java index 760900817780a..2c15addfe217b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java @@ -46,19 +46,19 @@ public void testShardMovements() { ); assertThat( - "1 shard movements when existing shard is moved and new shard copy is unassigned", + "1 shard movements when an existing shard copy is moved and new shard copy is unassigned", shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("a", "c"), 3, 1, 0)), equalTo(1) ); assertThat( - "1 shard movement", + "1 shard movement when an existing shard copy is moved", shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("a", "c"), 2, 0, 0)), equalTo(1) ); assertThat( - "2 shard movement", + "2 shard movements when both shard copies are move to new nodes", shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("c", "d"), 2, 0, 0)), equalTo(2) ); @@ -77,10 +77,10 @@ public void testShardMovements() { } private static int shardMovements(ShardAssignment old, ShardAssignment updated) { - return DesiredBalance.shardMovements(of(old), of(updated)); + return DesiredBalance.shardMovements(createDesiredBalanceWith(old), createDesiredBalanceWith(updated)); } - private static DesiredBalance of(ShardAssignment assignment) { + private static DesiredBalance createDesiredBalanceWith(ShardAssignment assignment) { return new DesiredBalance(1, Map.of(new ShardId("index", "_na_", 0), assignment)); } } From 4c29f978ebb5fac65423b552170411f6d1ae958f Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 10 Jan 2025 14:22:21 -0500 Subject: [PATCH 3/5] improvements per review --- .../routing/allocation/allocator/DesiredBalance.java | 2 +- .../allocation/allocator/DesiredBalanceReconciler.java | 9 ++++++++- .../allocator/DesiredBalanceShardsAllocator.java | 3 +-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java index e29c0d4aebe90..daa3666af4525 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java @@ -20,7 +20,7 @@ * The desired balance of the cluster, indicating which nodes should hold a copy of each shard. * * @param lastConvergedIndex Identifies what input data the balancer computation round used to produce this {@link DesiredBalance}. See - * {@link DesiredBalanceInput#index()} for details. Each reroute request gets assigned a monotonically increasing + * {@link DesiredBalanceInput#index()} for details. Each reroute request gets assigned a strictly increasing * sequence number, and the balancer, which runs async to reroute, uses the latest request's data to compute the * desired balance. * @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 2f3089062036e..08a25a3d012a8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -90,7 +90,14 @@ public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool thre ); } - public ClusterAllocationStats reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + /** + * Applies a desired shard allocation to the routing table by initializing and relocating shards in the cluster state. + * + * @param desiredBalance The new desired cluster shard allocation + * @param allocation Cluster state information with which to make decisions, contains routing table metadata that will be modified to + * reach the given desired balance. + */ + public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { var nodeIds = allocation.routingNodes().getAllNodeIds(); allocationOrdering.retainNodes(nodeIds); moveOrdering.retainNodes(nodeIds); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 1725b361baff9..fdd78f481e28c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -75,8 +75,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private final DesiredBalanceReconciler desiredBalanceReconciler; private final ContinuousComputation desiredBalanceComputation; /** - * Accepts listeners with an index value (see {#link #indexGenerator}) and run them whenever a DesiredBalance computation completes with - * an equal or greater index value. + * Saves and runs listeners after DesiredBalance computations complete. */ private final PendingListenersQueue pendingListenersQueue; /** From b12dad78bcf7e85a8985f4c9d68ef4b46d872971 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 9 Jan 2025 22:45:07 -0500 Subject: [PATCH 4/5] Create balancer summary scaffolding Lays out the balancer summary information that we want to collect and eventually report. Only the class scaffolding, still needs implementation. --- ...llocationBalancingRoundSummaryService.java | 55 +++++++ .../allocator/BalancingSummary.java | 149 ++++++++++++++++++ .../DesiredBalanceShardsAllocator.java | 5 + 3 files changed, 209 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java new file mode 100644 index 0000000000000..21830798a119e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Manages the lifecycle of {@link BalancingSummary} data structures tracking allocation balancing round results. There are many balancing + * rounds and this class manages their reporting. + * + * Summarizing balancer rounds and reporting the results will provide information with which to do a cost-benefit analysis of the work that + * the allocation rebalancing performs. + */ +public class AllocationBalancingRoundSummaryService { + + /** Value to return if no balancing rounds have occurred in the requested time period. */ + private final BalancingSummary.CombinedClusterBalancingRoundSummary EMPTY_RESULTS = + new BalancingSummary.CombinedClusterBalancingRoundSummary( + 0, + 0, + new LinkedList<>(), + new BalancingSummary.ClusterShardMovements(0, 0, 0, 0, 0), + new HashMap<>() + ); + + /** + * A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally + * progress from newer to older results. + */ + private ConcurrentLinkedQueue summaries = new ConcurrentLinkedQueue<>(); + + /** + * Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none. + * + * @return returns {@link #EMPTY_RESULTS} if there are no unreported balancing rounds. + */ + public BalancingSummary.CombinedClusterBalancingRoundSummary combineSummaries() { + // TODO: implement + return EMPTY_RESULTS; + } + + public void addBalancerRoundSummary(BalancingSummary.BalancingRoundSummary summary) { + summaries.add(summary); + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java new file mode 100644 index 0000000000000..e72d6eaf8f7fa --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import java.util.List; +import java.util.Map; + +/** + * Data structures defining the results of allocation balancing rounds. + */ +public class BalancingSummary { + + /** + * Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes + * across all those events: what allocation work was done across some period of time. + * + * @param eventsStartTime The earliest start time of all the combined balancing rounds. + * @param eventsEndTime The latest end time of all the combined balancing rounds. + * @param events A list of all the cluster events that started the balancing rounds. + * @param shardMovements The sum of all shard movements across all combined balancing rounds. + * @param nodeChanges The total change stats per node in the cluster from the earliest balancing round to the latest one. + */ + record CombinedClusterBalancingRoundSummary( + long eventsStartTime, + long eventsEndTime, + List events, + ClusterShardMovements shardMovements, + Map nodeChanges + ) {}; + + /** + * Summarizes the impact to the cluster as a result of a rebalancing round. + * + * @param eventStartTime Time at which the desired balance calculation began due to a cluster event. + * @param computationEndTime Time at which the new desired balance calculation was finished. + * @param event Reports what provoked the rebalancing round. The rebalancer only runs when requested, not on a periodic basis. + * @param computationFinishReason Whether the balancing round converged to a final allocation, or exiting early for some reason. + * @param shardMovements Lists the total number of shard moves, and breaks down the total into number shards moved by category, + * like node shutdown + * @param nodeChanges A Map of node name to {@link IndividualNodeRebalancingChangeStats} to describe what each node gained and how much + * work each node performed for the balancing round. + */ + record BalancingRoundSummary( + long eventStartTime, + long computationEndTime, + ClusterRebalancingEvent event, + DesiredBalance.ComputationFinishReason computationFinishReason, + ClusterShardMovements shardMovements, + Map nodeChanges + ) { + @Override + public String toString() { + return "BalancingRoundSummary{" + + "ClusterRebalancingEvent=" + + event + + ", ClusterShardMovements=" + + shardMovements + + ", NodeChangeStats={" + + nodeChanges + + "}" + + '}'; + } + }; + + /** + * General cost-benefit information on the node-level. Describes how each node was improved by a balancing round, and how much work that + * node did to achieve the shard rebalancing. + * + * @param nodeWeightBeforeRebalancing + * @param nodeWeightAfterRebalancing + * @param dataMovedToNodeInMB + * @param dataMovedAwayFromNodeInMB + */ + record IndividualNodeRebalancingChangeStats( + long nodeWeightBeforeRebalancing, + long nodeWeightAfterRebalancing, + long dataMovedToNodeInMB, + long dataMovedAwayFromNodeInMB + ) { + @Override + public String toString() { + return "IndividualNodeRebalancingChangeStats{" + + "nodeWeightBeforeRebalancing=" + + nodeWeightBeforeRebalancing + + ", nodeWeightAfterRebalancing=" + + nodeWeightAfterRebalancing + + ", dataMovedToNodeInMB=" + + dataMovedToNodeInMB + + ", dataMovedAwayFromNodeInMB=" + + dataMovedAwayFromNodeInMB + + '}'; + } + }; + + /** + * Tracks and summarizes the more granular reasons why shards are moved between nodes. + * + * @param numShardMoves total number of shard moves + * @param numAllocationDeciderForcedShardMoves total number of shards that must be moved because they violate an AllocationDecider rule + * @param numRebalancingShardMoves total number of shards moved to improve cluster balance and are not otherwise required to move + * @param numShutdownForcedShardMoves total number of shards that must move off of a node because it is shutting down + * @param numStuckShards total number of shards violating an AllocationDecider on their current node and on every other cluster node + */ + record ClusterShardMovements( + long numShardMoves, + long numAllocationDeciderForcedShardMoves, + long numRebalancingShardMoves, + long numShutdownForcedShardMoves, + long numStuckShards + ) { + @Override + public String toString() { + return "ClusterShardMovements{" + + "numShardMoves=" + + numShardMoves + + ", numAllocationDeciderForcedShardMoves=" + + numAllocationDeciderForcedShardMoves + + ", numRebalancingShardMoves=" + + numRebalancingShardMoves + + ", numShutdownForcedShardMoves=" + + numShutdownForcedShardMoves + + ", numStuckShards=" + + numStuckShards + + '}'; + } + }; + + /** + * The cluster event that initiated a rebalancing round. This will tell us what initiated the balancer doing some amount of rebalancing + * work. + */ + enum ClusterRebalancingEvent { + // TODO (Dianna): go through the reroute methods and identify the causes -- many reroute methods accept a 'reason' string -- and + // replace them with this enum to be saved later in a balancing summary. + RerouteCommand, + IndexCreation, + IndexDeletion, + NodeShutdownAndRemoval, + NewNodeAdded + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index fdd78f481e28c..25de51f2365d9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -91,6 +91,10 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private final Set processedNodeShutdowns = new HashSet<>(); private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; private final DesiredBalanceMetrics desiredBalanceMetrics; + /** + * Manages balancer round results in order to report metrics on the balancer activity in a configurable manner. + */ + private final AllocationBalancingRoundSummaryService balancerRoundSummaryService; // stats protected final CounterMetric computationsSubmitted = new CounterMetric(); @@ -135,6 +139,7 @@ public DesiredBalanceShardsAllocator( NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); + this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(); this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; this.delegateAllocator = delegateAllocator; this.threadPool = threadPool; From 8faa756f097f9b207b8d6016d30cdc71c7d9b7bf Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 13 Jan 2025 10:55:32 -0500 Subject: [PATCH 5/5] wip --- ...llocationBalancingRoundSummaryService.java | 11 +- .../allocator/BalancingRoundStats.java | 95 ++++++++++++++++ .../allocator/BalancingSummary.java | 37 +++--- .../allocator/ClusterAllocationStats.java | 17 --- .../allocator/DesiredBalanceInput.java | 10 +- .../allocator/DesiredBalanceMetrics.java | 12 +- .../allocator/DesiredBalanceReconciler.java | 45 ++++---- .../DesiredBalanceShardsAllocator.java | 105 ++++++++++++++---- .../DesiredBalanceComputerTests.java | 20 +++- .../allocator/DesiredBalanceMetricsTests.java | 38 ++++++- .../DesiredBalanceReconcilerTests.java | 65 +++++++---- .../DesiredBalanceShardsAllocatorTests.java | 8 +- .../cluster/ESAllocationTestCase.java | 11 +- 13 files changed, 349 insertions(+), 125 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundStats.java delete mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationStats.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java index 21830798a119e..41f53edd64a62 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java @@ -19,16 +19,19 @@ * * Summarizing balancer rounds and reporting the results will provide information with which to do a cost-benefit analysis of the work that * the allocation rebalancing performs. + * + * TODO (Dianna): how to handle master step down. Probably refuse to take further add*() calls, but return any previous results that have + * not yet been drained for reporting */ public class AllocationBalancingRoundSummaryService { + public static final AllocationBalancingRoundSummaryService NOOP = new AllocationBalancingRoundSummaryService(); + /** Value to return if no balancing rounds have occurred in the requested time period. */ - private final BalancingSummary.CombinedClusterBalancingRoundSummary EMPTY_RESULTS = + public static final BalancingSummary.CombinedClusterBalancingRoundSummary EMPTY_RESULTS = new BalancingSummary.CombinedClusterBalancingRoundSummary( - 0, - 0, new LinkedList<>(), - new BalancingSummary.ClusterShardMovements(0, 0, 0, 0, 0), + new BalancingSummary.ClusterShardAssignments(0L, 0L, 0L, 0L, 0L, 0L), new HashMap<>() ); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundStats.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundStats.java new file mode 100644 index 0000000000000..6c6abda9099ee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundStats.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +/** + * Data structure to pass allocation statistics between the desired balance classes. + */ +public record BalancingRoundStats( + long startTime, + long endTime, + long newlyAssignedShards, + long unassignedShards, + long totalAllocations, + long undesiredAllocationsExcludingShuttingDownNodes, + boolean executedReconciliation +) { + + public static final BalancingRoundStats EMPTY_BALANCING_ROUND_STATS = new BalancingRoundStats(-1, -1, -1, -1, -1, -1, false); + + public static class Builder { + private long startTime = 0; + private long endTime = 0; + private long newlyAssignedShards = 0; + private long unassignedShards = 0; + long totalAllocations = 0; + long undesiredAllocationsExcludingShuttingDownNodes = 0; + boolean executedReconciliation = false; + + public BalancingRoundStats build() { + return new BalancingRoundStats( + startTime, + endTime, + newlyAssignedShards, + unassignedShards, + totalAllocations, + undesiredAllocationsExcludingShuttingDownNodes, + + executedReconciliation + ); + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public void incNewlyAssignedShards() { + ++this.newlyAssignedShards; + } + + public void setUnassignedShards(long numUnassignedShards) { + this.unassignedShards = numUnassignedShards; + } + + public void incTotalAllocations() { + ++this.totalAllocations; + } + + public void incUndesiredAllocationsExcludingShuttingDownNodes() { + ++this.undesiredAllocationsExcludingShuttingDownNodes; + } + + public long getStartTime() { + return this.startTime; + } + + public long getEndTime() { + return this.endTime; + } + + public long getTotalAllocations() { + return this.totalAllocations; + } + + public long getUndesiredAllocationsExcludingShuttingDownNodes() { + return this.undesiredAllocationsExcludingShuttingDownNodes; + } + + public void setExecutedReconciliation() { + this.executedReconciliation = executedReconciliation; + } + + }; + +}; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java index e72d6eaf8f7fa..e5d6c7ae24590 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java @@ -9,6 +9,8 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.core.Tuple; + import java.util.List; import java.util.Map; @@ -21,17 +23,14 @@ public class BalancingSummary { * Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes * across all those events: what allocation work was done across some period of time. * - * @param eventsStartTime The earliest start time of all the combined balancing rounds. - * @param eventsEndTime The latest end time of all the combined balancing rounds. - * @param events A list of all the cluster events that started the balancing rounds. - * @param shardMovements The sum of all shard movements across all combined balancing rounds. + * @param events A list of all the cluster events that started the balancing rounds and time duration for computation + reconciliation + * of each event. + * @param shardAssignments The sum of all shard movements across all combined balancing rounds. * @param nodeChanges The total change stats per node in the cluster from the earliest balancing round to the latest one. */ record CombinedClusterBalancingRoundSummary( - long eventsStartTime, - long eventsEndTime, - List events, - ClusterShardMovements shardMovements, + List> events, + ClusterShardAssignments shardAssignments, Map nodeChanges ) {}; @@ -39,7 +38,7 @@ record CombinedClusterBalancingRoundSummary( * Summarizes the impact to the cluster as a result of a rebalancing round. * * @param eventStartTime Time at which the desired balance calculation began due to a cluster event. - * @param computationEndTime Time at which the new desired balance calculation was finished. + * @param eventEndTime Time at which the new desired balance calculation was finished. * @param event Reports what provoked the rebalancing round. The rebalancer only runs when requested, not on a periodic basis. * @param computationFinishReason Whether the balancing round converged to a final allocation, or exiting early for some reason. * @param shardMovements Lists the total number of shard moves, and breaks down the total into number shards moved by category, @@ -49,10 +48,10 @@ record CombinedClusterBalancingRoundSummary( */ record BalancingRoundSummary( long eventStartTime, - long computationEndTime, + long eventEndTime, ClusterRebalancingEvent event, DesiredBalance.ComputationFinishReason computationFinishReason, - ClusterShardMovements shardMovements, + ClusterShardAssignments shardMovements, Map nodeChanges ) { @Override @@ -79,8 +78,8 @@ public String toString() { * @param dataMovedAwayFromNodeInMB */ record IndividualNodeRebalancingChangeStats( - long nodeWeightBeforeRebalancing, - long nodeWeightAfterRebalancing, + float nodeWeightBeforeRebalancing, + float nodeWeightAfterRebalancing, long dataMovedToNodeInMB, long dataMovedAwayFromNodeInMB ) { @@ -102,24 +101,26 @@ public String toString() { /** * Tracks and summarizes the more granular reasons why shards are moved between nodes. * - * @param numShardMoves total number of shard moves + * @param numShardsMoved total number of shard moves between nodes * @param numAllocationDeciderForcedShardMoves total number of shards that must be moved because they violate an AllocationDecider rule * @param numRebalancingShardMoves total number of shards moved to improve cluster balance and are not otherwise required to move * @param numShutdownForcedShardMoves total number of shards that must move off of a node because it is shutting down + * @param numNewlyAssignedShardsNotMoved * @param numStuckShards total number of shards violating an AllocationDecider on their current node and on every other cluster node */ - record ClusterShardMovements( - long numShardMoves, + public record ClusterShardAssignments( + long numShardsMoved, long numAllocationDeciderForcedShardMoves, long numRebalancingShardMoves, long numShutdownForcedShardMoves, + long numNewlyAssignedShardsNotMoved, long numStuckShards ) { @Override public String toString() { return "ClusterShardMovements{" - + "numShardMoves=" - + numShardMoves + + "numShardsMoved=" + + numShardsMoved + ", numAllocationDeciderForcedShardMoves=" + numAllocationDeciderForcedShardMoves + ", numRebalancingShardMoves=" diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationStats.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationStats.java deleted file mode 100644 index 9c1ccf23b440d..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationStats.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.cluster.routing.allocation.allocator; - -/** - * Data structure to pass allocation statistics between the desired balance classes. - */ -public record ClusterAllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { - public static final ClusterAllocationStats EMPTY_ALLOCATION_STATS = new ClusterAllocationStats(-1, -1, -1); -}; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceInput.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceInput.java index 3b3d893bc6848..3089fa2f9d602 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceInput.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceInput.java @@ -23,13 +23,19 @@ * @param routingAllocation a copy of (the immutable parts of) the context for the allocation decision process * @param ignoredShards a list of the shards for which earlier allocators have claimed responsibility */ -public record DesiredBalanceInput(long index, RoutingAllocation routingAllocation, List ignoredShards) { +public record DesiredBalanceInput( + long index, + RoutingAllocation routingAllocation, + List ignoredShards, + BalancingRoundStats.Builder clusterAllocationStatsBuilder +) { public static DesiredBalanceInput create(long index, RoutingAllocation routingAllocation) { return new DesiredBalanceInput( index, routingAllocation.immutableClone(), - List.copyOf(routingAllocation.routingNodes().unassigned().ignored()) + List.copyOf(routingAllocation.routingNodes().unassigned().ignored()), + new BalancingRoundStats.Builder() ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index fd656b24f7e23..57d3b845a7226 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -74,16 +74,16 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w ); public void updateMetrics( - ClusterAllocationStats clusterAllocationStats, + BalancingRoundStats balancingRoundStats, Map weightStatsPerNode, Map nodeAllocationStats ) { - assert clusterAllocationStats != null : "allocation stats cannot be null"; + assert balancingRoundStats != null : "allocation stats cannot be null"; assert weightStatsPerNode != null : "node balance weight stats cannot be null"; - if (clusterAllocationStats != ClusterAllocationStats.EMPTY_ALLOCATION_STATS) { - this.unassignedShards = clusterAllocationStats.unassignedShards(); - this.totalAllocations = clusterAllocationStats.totalAllocations(); - this.undesiredAllocations = clusterAllocationStats.undesiredAllocationsExcludingShuttingDownNodes(); + if (balancingRoundStats.executedReconciliation()) { + this.unassignedShards = balancingRoundStats.unassignedShards(); + this.totalAllocations = balancingRoundStats.totalAllocations(); + this.undesiredAllocations = balancingRoundStats.undesiredAllocationsExcludingShuttingDownNodes(); } weightStatsPerNodeRef.set(weightStatsPerNode); allocationStatsPerNodeRef.set(nodeAllocationStats); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 08a25a3d012a8..bc10c85b6d8ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -97,11 +97,11 @@ public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool thre * @param allocation Cluster state information with which to make decisions, contains routing table metadata that will be modified to * reach the given desired balance. */ - public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation, BalancingRoundStats.Builder statsBuilder) { var nodeIds = allocation.routingNodes().getAllNodeIds(); allocationOrdering.retainNodes(nodeIds); moveOrdering.retainNodes(nodeIds); - return new Reconciliation(desiredBalance, allocation).run(); + new Reconciliation(desiredBalance, allocation).run(statsBuilder); } public void clear() { @@ -121,7 +121,7 @@ private class Reconciliation { this.routingNodes = allocation.routingNodes(); } - ClusterAllocationStats run() { + void run(BalancingRoundStats.Builder statsBuilder) { try (var ignored = allocation.withReconcilingFlag()) { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); @@ -130,31 +130,29 @@ ClusterAllocationStats run() { // no data nodes, so fail allocation to report red health failAllocationOfNewPrimaries(allocation); logger.trace("no nodes available, nothing to reconcile"); - return ClusterAllocationStats.EMPTY_ALLOCATION_STATS; } if (desiredBalance.assignments().isEmpty()) { // no desired state yet but it is on its way and we'll reroute again when it is ready logger.trace("desired balance is empty, nothing to reconcile"); - return ClusterAllocationStats.EMPTY_ALLOCATION_STATS; } + statsBuilder.setExecutedReconciliation(); // compute next moves towards current desired balance: // 1. allocate unassigned shards first logger.trace("Reconciler#allocateUnassigned"); - allocateUnassigned(); + allocateUnassigned(statsBuilder); assert allocateUnassignedInvariant(); // 2. move any shards that cannot remain where they are logger.trace("Reconciler#moveShards"); - moveShards(); + moveShards(statsBuilder); // 3. move any other shards that are desired elsewhere logger.trace("Reconciler#balance"); - var allocationStats = balance(); + balance(statsBuilder); logger.debug("Reconciliation is complete"); - return allocationStats; } } @@ -219,7 +217,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { } } - private void allocateUnassigned() { + private void allocateUnassigned(BalancingRoundStats.Builder statsBuilder) { RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); if (logger.isTraceEnabled()) { logger.trace("Start allocating unassigned shards: {}", routingNodes.toString()); @@ -293,6 +291,7 @@ private void allocateUnassigned() { logger.debug("Assigning shard [{}] to {} [{}]", shard, nodeIdsIterator.source, nodeId); long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation); routingNodes.initializeShard(shard, nodeId, null, shardSize, allocation.changes()); + statsBuilder.incNewlyAssignedShards(); allocationOrdering.recordAllocation(nodeId); if (shard.primary() == false) { // copy over the same replica shards to the secondary array so they will get allocated @@ -427,7 +426,7 @@ private boolean isIgnored(RoutingNodes routingNodes, ShardRouting shard, ShardAs return assignment.total() - assignment.ignored() <= assigned; } - private void moveShards() { + private void moveShards(BalancingRoundStats.Builder statsBuilder) { // Iterate over all started shards and check if they can remain. In the presence of throttling shard movements, // the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the shards. for (final var iterator = OrderedShardsIterator.createForNecessaryMoves(allocation, moveOrdering); iterator.hasNext();) { @@ -450,6 +449,8 @@ private void moveShards() { } if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) { + // TODO (Dianna): are these 'stuck' shards that cannot be allocated anyplace due to restrictions? + // cannot allocate anywhere, no point in looking for a target node continue; } @@ -478,14 +479,12 @@ private void moveShards() { } } - private ClusterAllocationStats balance() { + private void balance(BalancingRoundStats.Builder statsBuilder) { if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { - return ClusterAllocationStats.EMPTY_ALLOCATION_STATS; + return; } - int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size(); - int totalAllocations = 0; - int undesiredAllocationsExcludingShuttingDownNodes = 0; + statsBuilder.setUnassignedShards(routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size()); // Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard // movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the @@ -493,7 +492,7 @@ private ClusterAllocationStats balance() { for (final var iterator = OrderedShardsIterator.createForBalancing(allocation, moveOrdering); iterator.hasNext();) { final var shardRouting = iterator.next(); - totalAllocations++; + statsBuilder.incTotalAllocations(); if (shardRouting.started() == false) { // can only rebalance started shards @@ -512,7 +511,7 @@ private ClusterAllocationStats balance() { } if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) { - undesiredAllocationsExcludingShuttingDownNodes++; + statsBuilder.incUndesiredAllocationsExcludingShuttingDownNodes(); } if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) { @@ -546,12 +545,14 @@ private ClusterAllocationStats balance() { } } - maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); - - return new ClusterAllocationStats(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes); + maybeLogUndesiredAllocationsWarning( + statsBuilder.getTotalAllocations(), + statsBuilder.getUndesiredAllocationsExcludingShuttingDownNodes(), + routingNodes.size() + ); } - private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) { + private void maybeLogUndesiredAllocationsWarning(long totalAllocations, long undesiredAllocations, int nodeCount) { // more shards than cluster can relocate with one reroute final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount; final boolean warningThresholdReached = undesiredAllocations > undesiredAllocationsLogThreshold * totalAllocations; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 25de51f2365d9..fc87b1d10adf6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -68,9 +68,9 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private final DesiredBalanceReconcilerAction reconciler; private final DesiredBalanceComputer desiredBalanceComputer; /** - * Reconciliation ({@link DesiredBalanceReconciler#reconcile(DesiredBalance, RoutingAllocation)}) takes the {@link DesiredBalance} - * output of {@link DesiredBalanceComputer#compute} and identifies how shards need to be added, moved or removed to go from the current - * cluster shard allocation to the new desired allocation. + * Reconciliation ({@link DesiredBalanceReconciler#reconcile(DesiredBalance, RoutingAllocation, BalancingRoundStats.Builder)}) takes + * the {@link DesiredBalance} output of {@link DesiredBalanceComputer#compute} and identifies how shards need to be added, moved or + * removed to go from the current cluster shard allocation to the new desired allocation. */ private final DesiredBalanceReconciler desiredBalanceReconciler; private final ContinuousComputation desiredBalanceComputation; @@ -139,7 +139,7 @@ public DesiredBalanceShardsAllocator( NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); - this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(); + this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(); this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; this.delegateAllocator = delegateAllocator; this.threadPool = threadPool; @@ -161,6 +161,9 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) { return; } + BalancingRoundStats.Builder statsBuilder = new BalancingRoundStats.Builder(); + statsBuilder.setStartTime(threadPool.relativeTimeInMillis()); + recordTime( cumulativeComputationTime, // We set currentDesiredBalance back to INITIAL when the node stands down as master in onNoLongerMaster. @@ -193,13 +196,13 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) { "Desired balance computation for [{}] terminated early with partial result, scheduling reconciliation", index ); - submitReconcileTask(currentDesiredBalance); + submitReconcileTask(currentDesiredBalance, statsBuilder); var newInput = DesiredBalanceInput.create(indexGenerator.incrementAndGet(), desiredBalanceInput.routingAllocation()); desiredBalanceComputation.compareAndEnqueue(desiredBalanceInput, newInput); } else if (isFresh(desiredBalanceInput)) { logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index); computationsConverged.inc(); - submitReconcileTask(currentDesiredBalance); + submitReconcileTask(currentDesiredBalance, statsBuilder); } else { logger.debug("Desired balance computation for [{}] is discarded as newer one is submitted", index); } @@ -264,7 +267,8 @@ public void allocate(RoutingAllocation allocation, ActionListener listener if (currentDesiredBalanceRef.compareAndSet(DesiredBalance.NOT_MASTER, DesiredBalance.BECOME_MASTER_INITIAL)) { logger.debug("initialized desired balance for becoming master"); } - desiredBalanceComputation.onNewInput(DesiredBalanceInput.create(index, allocation)); + var desiredBalanceInput = DesiredBalanceInput.create(index, allocation); + desiredBalanceComputation.onNewInput(desiredBalanceInput); if (allocation.routingTable().indicesRouting().isEmpty()) { logger.debug("No eager reconciliation needed for empty routing table"); @@ -273,7 +277,7 @@ public void allocate(RoutingAllocation allocation, ActionListener listener // Starts reconciliation towards desired balance that might have not been updated with a recent calculation yet. // This is fine as balance should have incremental rather than radical changes. // This should speed up achieving the desired balance in cases current state is still different from it (due to THROTTLING). - reconcile(currentDesiredBalanceRef.get(), allocation); + reconcile(currentDesiredBalanceRef.get(), allocation, desiredBalanceInput.clusterAllocationStatsBuilder()); } private void processNodeShutdowns(ClusterState clusterState) { @@ -338,19 +342,78 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) { } } - protected void submitReconcileTask(DesiredBalance desiredBalance) { - masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance), null); + /** + * Submits the desired balance to be reconciled (applies the desired changes to the routing table) and creates and publishes a new + * cluster state. The data nodes will receive and apply the new cluster state to start/move/remove shards. + */ + protected void submitReconcileTask(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { + masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance, statsBuilder), null); } - protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation, BalancingRoundStats.Builder statsBuilder) { if (logger.isTraceEnabled()) { logger.trace("Reconciling desired balance: {}", desiredBalance); } else { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); } recordTime(cumulativeReconciliationTime, () -> { - ClusterAllocationStats clusterAllocationStats = desiredBalanceReconciler.reconcile(desiredBalance, allocation); - updateDesireBalanceMetrics(desiredBalance, allocation, clusterAllocationStats); + var nodesStatsAndWeightsBeforeReconciliationChanges = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( + allocation.metadata(), + allocation.routingNodes(), + allocation.clusterInfo(), + desiredBalance + ); + + desiredBalanceReconciler.reconcile(desiredBalance, allocation, statsBuilder); + + // TODO (Dianna): dedupe changes and filtering with updateDesireBalanceMetrics work + var nodesStatsAndWeightsAfterReconciliationChanges = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( + allocation.metadata(), + allocation.routingNodes(), + allocation.clusterInfo(), + desiredBalance + ); + Map nodeChanges = new HashMap<>(); + for (var nodeStatsAndWeightBefore : nodesStatsAndWeightsBeforeReconciliationChanges.entrySet()) { + var node = allocation.nodes().get(nodeStatsAndWeightBefore.getKey()); + var nodeStatsAndWeightAfter = nodesStatsAndWeightsAfterReconciliationChanges.get(nodeStatsAndWeightBefore.getKey()); + // TODO (Dianna): is this node!=null check even necessary? + if (node != null && nodeStatsAndWeightAfter != null) { + var nodeChange = new BalancingSummary.IndividualNodeRebalancingChangeStats( + nodeStatsAndWeightBefore.getValue().currentNodeWeight(), + nodeStatsAndWeightAfter.currentNodeWeight(), + // TODO (Dianna): dataMovedTo/AwayFromNodeInMB + 0, + 0 + ); + nodeChanges.put(nodeStatsAndWeightBefore.getKey(), nodeChange); + } + } + + statsBuilder.setEndTime(threadPool.relativeTimeInMillis()); + balancerRoundSummaryService.addBalancerRoundSummary( + new BalancingSummary.BalancingRoundSummary( + statsBuilder.getStartTime(), + statsBuilder.getEndTime(), + // TODO: non dummy value + BalancingSummary.ClusterRebalancingEvent.RerouteCommand, + desiredBalance.finishReason(), + new BalancingSummary.ClusterShardAssignments( + 1, //// TODO numShardsMoved + 1, //// TODO numAllocationDeciderForcedShardMoves + 1, //// TODO numRebalancingShardMoves + 1, //// TODO numShutdownForcedShardMoves + 1, //// TODO numNewlyAssignedShardsNotMoved + //// TODO numStuckShards - the current val is wrong. + statsBuilder.getUndesiredAllocationsExcludingShuttingDownNodes() + ), + nodeChanges + ) + ); + + // TODO (Dianna): revisit how I want to use ClusterAllocationStats.Builder - I don't really want to call build, but I do want to + // hang things off of it to grab later + updateDesireBalanceMetrics(desiredBalance, allocation, statsBuilder.build()); }); if (logger.isTraceEnabled()) { @@ -360,7 +423,7 @@ protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation alloca } } - private RerouteStrategy createReconcileAllocationAction(DesiredBalance desiredBalance) { + private RerouteStrategy createReconcileAllocationAction(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { return new RerouteStrategy() { @Override public void removeDelayMarkers(RoutingAllocation allocation) { @@ -373,7 +436,7 @@ public void removeDelayMarkers(RoutingAllocation allocation) { @Override public void execute(RoutingAllocation allocation) { - reconcile(desiredBalance, allocation); + reconcile(desiredBalance, allocation, statsBuilder); } }; } @@ -389,9 +452,9 @@ public void resetDesiredBalance() { private void updateDesireBalanceMetrics( DesiredBalance desiredBalance, RoutingAllocation routingAllocation, - ClusterAllocationStats clusterAllocationStats + BalancingRoundStats balancingRoundStats ) { - if (clusterAllocationStats == ClusterAllocationStats.EMPTY_ALLOCATION_STATS) { + if (balancingRoundStats == BalancingRoundStats.EMPTY_BALANCING_ROUND_STATS) { return; } @@ -409,7 +472,7 @@ private void updateDesireBalanceMetrics( filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue()); } } - desiredBalanceMetrics.updateMetrics(clusterAllocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights); + desiredBalanceMetrics.updateMetrics(balancingRoundStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights); } public DesiredBalanceStats getStats() { @@ -441,9 +504,11 @@ private void onNoLongerMaster() { private static final class ReconcileDesiredBalanceTask implements ClusterStateTaskListener { private final DesiredBalance desiredBalance; + private final BalancingRoundStats.Builder statsBuilder; - private ReconcileDesiredBalanceTask(DesiredBalance desiredBalance) { + private ReconcileDesiredBalanceTask(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { this.desiredBalance = desiredBalance; + this.statsBuilder = statsBuilder; } @Override @@ -480,7 +545,7 @@ private ClusterState applyBalance( try (var ignored = batchExecutionContext.dropHeadersContext()) { var newState = reconciler.apply( batchExecutionContext.initialState(), - createReconcileAllocationAction(latest.getTask().desiredBalance) + createReconcileAllocationAction(latest.getTask().desiredBalance, latest.getTask().statsBuilder) ); latest.success(() -> pendingListenersQueue.complete(latest.getTask().desiredBalance.lastConvergedIndex())); return newState; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index 679d04224aefe..262115240487d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -685,7 +685,12 @@ public void testDesiredBalanceShouldConvergeInABigCluster() { var settings = Settings.EMPTY; - var input = new DesiredBalanceInput(randomInt(), routingAllocationWithDecidersOf(clusterState, clusterInfo, settings), List.of()); + var input = new DesiredBalanceInput( + randomInt(), + routingAllocationWithDecidersOf(clusterState, clusterInfo, settings), + List.of(), + new BalancingRoundStats.Builder() + ); var desiredBalance = createDesiredBalanceComputer(new BalancedShardsAllocator(settings)).compute( DesiredBalance.BECOME_MASTER_INITIAL, input, @@ -834,7 +839,12 @@ public void testComputeConsideringShardSizes() { var desiredBalance = createDesiredBalanceComputer(new BalancedShardsAllocator(settings)).compute( initial, - new DesiredBalanceInput(randomInt(), routingAllocationWithDecidersOf(clusterState, clusterInfo, settings), List.of()), + new DesiredBalanceInput( + randomInt(), + routingAllocationWithDecidersOf(clusterState, clusterInfo, settings), + List.of(), + new BalancingRoundStats.Builder() + ), queue(), input -> true ); @@ -994,7 +1004,7 @@ public void testAccountForSizeOfMisplacedShardsDuringNewComputation() { ); var nextDesiredBalance = createDesiredBalanceComputer(new BalancedShardsAllocator()).compute( initialDesiredBalance, - new DesiredBalanceInput(2, allocation, List.of()), + new DesiredBalanceInput(2, allocation, List.of(), new BalancingRoundStats.Builder()), queue(), input -> true ); @@ -1120,7 +1130,7 @@ public void testAccountForSizeOfAllInitializingShardsDuringAllocation() { ); var nextDesiredBalance = createDesiredBalanceComputer(new BalancedShardsAllocator()).compute( initialDesiredBalance, - new DesiredBalanceInput(2, allocation, List.of()), + new DesiredBalanceInput(2, allocation, List.of(), new BalancingRoundStats.Builder()), queue(), input -> true ); @@ -1379,7 +1389,7 @@ private static void assertDesiredAssignments(DesiredBalance desiredBalance, Map< } private static DesiredBalanceInput createInput(ClusterState clusterState, ShardRouting... ignored) { - return new DesiredBalanceInput(randomInt(), routingAllocationOf(clusterState), List.of(ignored)); + return new DesiredBalanceInput(randomInt(), routingAllocationOf(clusterState), List.of(ignored), new BalancingRoundStats.Builder()); } private static RoutingAllocation routingAllocationOf(ClusterState clusterState) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java index e6dbbdea6bd79..842604ebe76e0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java @@ -23,10 +23,25 @@ public class DesiredBalanceMetricsTests extends ESTestCase { public void testZeroAllMetrics() { DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(MeterRegistry.NOOP); + long startTime = randomNonNegativeLong(); + long endTime = randomNonNegativeLong(); + long newlyAssignedShards = randomNonNegativeLong(); long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomNonNegativeLong(); long undesiredAllocations = randomNonNegativeLong(); - metrics.updateMetrics(new ClusterAllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); + metrics.updateMetrics( + new BalancingRoundStats( + startTime, + endTime, + newlyAssignedShards, + unassignedShards, + totalAllocations, + undesiredAllocations, + true + ), + Map.of(), + Map.of() + ); assertEquals(totalAllocations, metrics.totalAllocations()); assertEquals(unassignedShards, metrics.unassignedShards()); assertEquals(undesiredAllocations, metrics.undesiredAllocations()); @@ -40,10 +55,25 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry); + long startTime = randomNonNegativeLong(); + long endTime = randomNonNegativeLong(); + long newlyAssignedShards = randomNonNegativeLong(); long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomLongBetween(100, 10000000); long undesiredAllocations = randomLongBetween(0, totalAllocations); - metrics.updateMetrics(new ClusterAllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); + metrics.updateMetrics( + new BalancingRoundStats( + startTime, + endTime, + newlyAssignedShards, + unassignedShards, + totalAllocations, + undesiredAllocations, + true + ), + Map.of(), + Map.of() + ); // Collect when not master meterRegistry.getRecorder().collect(); @@ -102,8 +132,10 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() { public void testUndesiredAllocationRatioIsZeroWhenTotalShardsIsZero() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry); + long startTime = randomNonNegativeLong(); + long endTime = randomNonNegativeLong(); long unassignedShards = randomNonNegativeLong(); - metrics.updateMetrics(new ClusterAllocationStats(unassignedShards, 0, 0), Map.of(), Map.of()); + metrics.updateMetrics(new BalancingRoundStats(startTime, endTime, 0, unassignedShards, 0, 0, true), Map.of(), Map.of()); metrics.setNodeIsMaster(true); meterRegistry.getRecorder().collect(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 81aa1a60eb45e..89ad040b18713 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -110,11 +110,13 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase { + private BalancingRoundStats.Builder statsBuilderDummy = new BalancingRoundStats.Builder(); + public void testNoChangesOnEmptyDesiredBalance() { final var clusterState = DesiredBalanceComputerTests.createInitialClusterState(3); final var routingAllocation = createRoutingAllocationFrom(clusterState); - reconcile(routingAllocation, new DesiredBalance(1, Map.of())); + reconcile(routingAllocation, new DesiredBalance(1, Map.of()), statsBuilderDummy); assertFalse(routingAllocation.routingNodesChanged()); } @@ -186,7 +188,8 @@ public void testFailsNewPrimariesIfNoDataNodes() { new ShardId(clusterState.metadata().index(DesiredBalanceComputerTests.TEST_INDEX).getIndex(), 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0) ) - ) + ), + statsBuilderDummy ); assertTrue(routingAllocation.routingNodesChanged()); @@ -236,7 +239,7 @@ public void testUnassignedPrimariesBeforeUnassignedReplicas() { ); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -319,7 +322,7 @@ public void testUnassignedShardsInterleaving() { final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings) @@ -409,7 +412,7 @@ public void testUnassignedShardsPriority() { final var assignReplicas = new AtomicBoolean(false); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -517,7 +520,7 @@ public void testUnassignedRespectsDesiredBalance() { final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider() ); @@ -608,7 +611,7 @@ public void testUnassignedAllocationPredictsDiskUsage() { final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), () -> clusterInfo, () -> snapshotShardSizeInfo, new SameShardAllocationDecider(clusterSettings), @@ -648,7 +651,7 @@ public void testUnassignedSkipsEquivalentReplicas() { final var replicaDecision = randomFrom(Decision.THROTTLE, Decision.NO); final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new AllocationDecider() { @@ -707,7 +710,7 @@ public void testUnassignedSetsAllocationStatusOnUnassignedShards() { final var nonYesDecision = randomFrom(Decision.THROTTLE, Decision.NO); final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new AllocationDecider() { @@ -765,7 +768,7 @@ public void testUnassignedPrimariesThrottlingAndFallback() { final var allocationFilter = new AtomicReference>(); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -843,7 +846,7 @@ public void testMoveShards() { final var desiredBalance = new AtomicReference<>(desiredBalance(clusterState, (shardId, nodeId) -> true)); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance.get()), + routingAllocation -> reconcile(routingAllocation, desiredBalance.get(), statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -964,7 +967,7 @@ public void testRebalance() { desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-0") || nodeId.equals("node-1")) ); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance.get()), + routingAllocation -> reconcile(routingAllocation, desiredBalance.get(), statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -1044,7 +1047,7 @@ public void testDoNotRebalanceToTheNodeThatNoLongerExists() { Map.of(shardId, new ShardAssignment(Set.of("node-1"), 1, 0, 0)) // shard is assigned to the node that has left ); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); assertThat(allocation.routingNodes().node("node-0"), notNullValue()); assertThat(allocation.routingNodes().node("node-1"), nullValue()); @@ -1070,7 +1073,7 @@ public void testDoNotAllocateIgnoredShards() { Map.of(shardId, new ShardAssignment(Set.of(), 1, 1, 1)) // shard is ignored ); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); assertThat(allocation.routingNodes().node("node-0").size(), equalTo(0)); assertThat(allocation.routingNodes().unassigned().ignored(), hasSize(1)); @@ -1100,7 +1103,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var allocation = createRoutingAllocationFrom(clusterState, initialForcedAllocationDecider); final var balance = new DesiredBalance(1, Map.of(shardId, new ShardAssignment(desiredNodeIds, 2, 0, 0))); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); // only primary is allocated to the fallback node, replica stays unassigned assertThat(allocation.routingNodes().node("node-0").size() + allocation.routingNodes().node("node-1").size(), equalTo(0)); @@ -1130,7 +1133,7 @@ public Optional> getForcedInitialShardAllocationToNodes(ShardRouting final var allocation = createRoutingAllocationFrom(clusterState, allocationIsNotPossibleOnDesiredNodeDesiredNode); final var balance = new DesiredBalance(1, Map.of(shardId, new ShardAssignment(Set.of("node-0"), 1, 0, 0))); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); assertThat(allocation.routingNodes().node("node-0").size(), equalTo(0)); assertThat(allocation.routingNodes().node("node-1").size(), equalTo(1)); @@ -1164,7 +1167,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var allocation = createRoutingAllocationFrom(clusterState, initialForcedAllocationDecider); final var balance = new DesiredBalance(1, Map.of(shardId, new ShardAssignment(Set.of("node-0"), 1, 0, 0))); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); assertThat(allocation.routingNodes().node("node-0").size(), equalTo(0)); assertThat(allocation.routingNodes().node("node-1").size(), equalTo(0)); @@ -1225,7 +1228,7 @@ public void testRebalanceDoesNotCauseHotSpots() { while (true) { var allocation = createRoutingAllocationFrom(clusterState, deciders); - reconciler.reconcile(balance, allocation); + reconciler.reconcile(balance, allocation, statsBuilderDummy); var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING); if (initializing.isEmpty()) { @@ -1305,7 +1308,11 @@ public void testShouldLogOnTooManyUndesiredAllocations() { + ") are not on their desired nodes, which exceeds the warn threshold of [10%]"; // Desired assignment matches current routing table assertThatLogger( - () -> reconciler.reconcile(new DesiredBalance(1, allShardsDesiredOnDataNode1), createRoutingAllocationFrom(clusterState)), + () -> reconciler.reconcile( + new DesiredBalance(1, allShardsDesiredOnDataNode1), + createRoutingAllocationFrom(clusterState), + statsBuilderDummy + ), DesiredBalanceReconciler.class, new MockLog.UnseenEventExpectation( "Should not log if all shards on desired location", @@ -1315,7 +1322,11 @@ public void testShouldLogOnTooManyUndesiredAllocations() { ) ); assertThatLogger( - () -> reconciler.reconcile(new DesiredBalance(1, allShardsDesiredOnDataNode2), createRoutingAllocationFrom(clusterState)), + () -> reconciler.reconcile( + new DesiredBalance(1, allShardsDesiredOnDataNode2), + createRoutingAllocationFrom(clusterState), + statsBuilderDummy + ), DesiredBalanceReconciler.class, node1ShuttingDown ? new MockLog.UnseenEventExpectation( @@ -1332,7 +1343,11 @@ public void testShouldLogOnTooManyUndesiredAllocations() { ) ); assertThatLogger( - () -> reconciler.reconcile(new DesiredBalance(1, allShardsDesiredOnDataNode2), createRoutingAllocationFrom(clusterState)), + () -> reconciler.reconcile( + new DesiredBalance(1, allShardsDesiredOnDataNode2), + createRoutingAllocationFrom(clusterState), + statsBuilderDummy + ), DesiredBalanceReconciler.class, new MockLog.UnseenEventExpectation( "Should not log immediate second too many shards on undesired locations", @@ -1343,10 +1358,14 @@ public void testShouldLogOnTooManyUndesiredAllocations() { ); } - private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) { + private static void reconcile( + RoutingAllocation routingAllocation, + DesiredBalance desiredBalance, + BalancingRoundStats.Builder statsBuilder + ) { final var threadPool = mock(ThreadPool.class); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet); - new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation); + new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation, statsBuilder); } private static boolean isReconciled(RoutingNode node, DesiredBalance balance) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index 21d547c1593b8..94e0373dc8dcd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -958,12 +958,16 @@ public void allocate(RoutingAllocation allocation, ActionListener listener } @Override - protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + protected void reconcile( + DesiredBalance desiredBalance, + RoutingAllocation allocation, + BalancingRoundStats.Builder statsBuilder + ) { fail("should not call reconcile"); } @Override - protected void submitReconcileTask(DesiredBalance desiredBalance) { + protected void submitReconcileTask(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { assertThat(desiredBalance.lastConvergedIndex(), equalTo(0L)); reconciliationTaskSubmitted.set(true); lastListener.onResponse(null); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 132dd4b119469..5b21fc90f402c 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancingRoundStats; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; @@ -183,14 +184,18 @@ public void allocate(RoutingAllocation allocation, ActionListener listener } @Override - protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + protected void reconcile( + DesiredBalance desiredBalance, + RoutingAllocation allocation, + BalancingRoundStats.Builder statsBuilder + ) { // do nothing as balance is not computed yet (during allocate) } @Override - protected void submitReconcileTask(DesiredBalance desiredBalance) { + protected void submitReconcileTask(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { // reconcile synchronously rather than in cluster state update task - super.reconcile(desiredBalance, lastAllocation); + super.reconcile(desiredBalance, lastAllocation, statsBuilder); } }; }