diff --git a/docs/changelog/136043.yaml b/docs/changelog/136043.yaml new file mode 100644 index 0000000000000..baee8094be783 --- /dev/null +++ b/docs/changelog/136043.yaml @@ -0,0 +1,5 @@ +pr: 136043 +summary: "Allocation: add balancer round summary as metrics" +area: Allocation +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index f1ca84bccdfa4..e1e9444d983fb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory; @@ -141,6 +142,7 @@ public class ClusterModule extends AbstractModule { private final AllocationStatsService allocationStatsService; private final TelemetryProvider telemetryProvider; private final DesiredBalanceMetrics desiredBalanceMetrics; + private final AllocationBalancingRoundMetrics balancingRoundMetrics; public ClusterModule( Settings settings, @@ -168,6 +170,7 @@ public ClusterModule( balancingWeightsFactory ); this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); + this.balancingRoundMetrics = new AllocationBalancingRoundMetrics(telemetryProvider.getMeterRegistry()); this.shardsAllocator = createShardsAllocator( settings, clusterService.getClusterSettings(), @@ -180,7 +183,8 @@ public ClusterModule( writeLoadForecaster, nodeAllocationStatsAndWeightsCalculator, this::explainShardAllocation, - desiredBalanceMetrics + desiredBalanceMetrics, + balancingRoundMetrics ); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver); @@ -521,7 +525,8 @@ private static ShardsAllocator createShardsAllocator( WriteLoadForecaster writeLoadForecaster, NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, ShardAllocationExplainer shardAllocationExplainer, - DesiredBalanceMetrics desiredBalanceMetrics + DesiredBalanceMetrics desiredBalanceMetrics, + AllocationBalancingRoundMetrics balancingRoundMetrics ) { Map> allocators = new HashMap<>(); allocators.put( @@ -538,7 +543,8 @@ private static ShardsAllocator createShardsAllocator( reconciler, nodeAllocationStatsAndWeightsCalculator, shardAllocationExplainer, - desiredBalanceMetrics + desiredBalanceMetrics, + balancingRoundMetrics ) ); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundMetrics.java new file mode 100644 index 0000000000000..3e1bd01048874 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundMetrics.java @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancingRoundSummary.NodesWeightsChanges; +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.telemetry.metric.DoubleHistogram; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.MeterRegistry; + +import java.util.Map; + +/** + * A telemetry metrics sender for {@link BalancingRoundSummary} + */ +public class AllocationBalancingRoundMetrics { + + // counters that measure rounds and moves from the last balancing round + public static final String NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME = "es.allocator.balancing_round.balancing_rounds.total"; + public static final String NUMBER_OF_SHARD_MOVES_METRIC_NAME = "es.allocator.balancing_round.shard_moves.total"; + public static final String NUMBER_OF_SHARD_MOVES_HISTOGRAM_METRIC_NAME = "es.allocator.balancing_round.shard_moves.histogram"; + + // histograms that measure current utilization + public static final String NUMBER_OF_SHARDS_METRIC_NAME = "es.allocator.balancing_round.shard_count.histogram"; + public static final String DISK_USAGE_BYTES_METRIC_NAME = "es.allocator.balancing_round.disk_usage_bytes.histogram"; + public static final String WRITE_LOAD_METRIC_NAME = "es.allocator.balancing_round.write_load.histogram"; + public static final String TOTAL_WEIGHT_METRIC_NAME = "es.allocator.balancing_round.total_weight.histogram"; + + private final LongCounter balancingRoundCounter; + private final LongCounter shardMovesCounter; + private final LongHistogram shardMovesHistogram; + + private final LongHistogram shardCountHistogram; + private final DoubleHistogram diskUsageHistogram; + private final DoubleHistogram writeLoadHistogram; + private final DoubleHistogram totalWeightHistogram; + + public static AllocationBalancingRoundMetrics NOOP = new AllocationBalancingRoundMetrics(MeterRegistry.NOOP); + + public AllocationBalancingRoundMetrics(MeterRegistry meterRegistry) { + this.balancingRoundCounter = meterRegistry.registerLongCounter( + NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME, + "Total number of balancing rounds", + "unit" + ); + this.shardMovesCounter = meterRegistry.registerLongCounter( + NUMBER_OF_SHARD_MOVES_METRIC_NAME, + "Total number of shard moves", + "unit" + ); + + this.shardMovesHistogram = meterRegistry.registerLongHistogram( + NUMBER_OF_SHARD_MOVES_HISTOGRAM_METRIC_NAME, + "Number of shard movements executed in a balancing round", + "unit" + ); + this.shardCountHistogram = meterRegistry.registerLongHistogram( + NUMBER_OF_SHARDS_METRIC_NAME, + "change in node shard count per balancing round", + "unit" + ); + this.diskUsageHistogram = meterRegistry.registerDoubleHistogram( + DISK_USAGE_BYTES_METRIC_NAME, + "change in disk usage in bytes per balancing round", + "unit" + ); + this.writeLoadHistogram = meterRegistry.registerDoubleHistogram( + WRITE_LOAD_METRIC_NAME, + "change in write load per balancing round", + "1.0" + ); + this.totalWeightHistogram = meterRegistry.registerDoubleHistogram( + TOTAL_WEIGHT_METRIC_NAME, + "change in total weight per balancing round", + "1.0" + ); + } + + @SuppressForbidden(reason = "ForbiddenAPIs bans Math.abs(long) because of overflow on Long.MIN_VALUE, but this is impossible here") + private long longAbsNegativeSafe(long value) { + assert value != Long.MIN_VALUE : "value must not be Long.MIN_VALUE"; + return Math.abs(value); + } + + public void addBalancingRoundSummary(BalancingRoundSummary summary) { + balancingRoundCounter.increment(); + shardMovesCounter.incrementBy(summary.numberOfShardsToMove()); + shardMovesHistogram.record(summary.numberOfShardsToMove()); + + for (Map.Entry changesEntry : summary.nodeToWeightChanges().entrySet()) { + DiscoveryNode node = changesEntry.getKey(); + NodesWeightsChanges weightChanges = changesEntry.getValue(); + BalancingRoundSummary.NodeWeightsDiff weightsDiff = weightChanges.weightsDiff(); + + shardCountHistogram.record(longAbsNegativeSafe(weightsDiff.shardCountDiff()), getNodeAttributes(node)); + diskUsageHistogram.record(Math.abs(weightsDiff.diskUsageInBytesDiff()), getNodeAttributes(node)); + writeLoadHistogram.record(Math.abs(weightsDiff.writeLoadDiff()), getNodeAttributes(node)); + totalWeightHistogram.record(Math.abs(weightsDiff.totalWeightDiff()), getNodeAttributes(node)); + } + } + + private Map getNodeAttributes(DiscoveryNode node) { + return Map.of("node_name", node.getName(), "node_id", node.getId()); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java index 39ecf066096eb..c3a2853daa1dc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.core.TimeValue; @@ -58,6 +59,7 @@ public class AllocationBalancingRoundSummaryService { private final ThreadPool threadPool; private volatile boolean enableBalancerRoundSummaries; private volatile TimeValue summaryReportInterval; + private final AllocationBalancingRoundMetrics balancingRoundMetrics; /** * A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally @@ -68,12 +70,17 @@ public class AllocationBalancingRoundSummaryService { /** This reference is set when reporting is scheduled. If it is null, then reporting is inactive. */ private final AtomicReference scheduledReportFuture = new AtomicReference<>(); - public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings) { + public AllocationBalancingRoundSummaryService( + ThreadPool threadPool, + ClusterSettings clusterSettings, + AllocationBalancingRoundMetrics balancingRoundMetrics + ) { this.threadPool = threadPool; // Initialize the local setting values to avoid a null access when ClusterSettings#initializeAndWatch is called on each setting: // updating enableBalancerRoundSummaries accesses summaryReportInterval. this.enableBalancerRoundSummaries = clusterSettings.get(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING); this.summaryReportInterval = clusterSettings.get(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING); + this.balancingRoundMetrics = balancingRoundMetrics; clusterSettings.initializeAndWatch(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING, value -> { this.enableBalancerRoundSummaries = value; @@ -99,14 +106,14 @@ public static BalancingRoundSummary createBalancerRoundSummary(DesiredBalance ol * Creates a summary of the node weight changes from {@code oldDesiredBalance} to {@code newDesiredBalance}. * See {@link BalancingRoundSummary.NodesWeightsChanges} for content details. */ - private static Map createWeightsSummary( + private static Map createWeightsSummary( DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance ) { var oldWeightsPerNode = oldDesiredBalance.weightsPerNode(); var newWeightsPerNode = newDesiredBalance.weightsPerNode(); - Map nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size()); + Map nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size()); for (var nodeAndWeights : oldWeightsPerNode.entrySet()) { var discoveryNode = nodeAndWeights.getKey(); var oldNodeWeightStats = nodeAndWeights.getValue(); @@ -116,7 +123,7 @@ private static Map createWeig var newNodeWeightStats = newWeightsPerNode.getOrDefault(discoveryNode, DesiredBalanceMetrics.NodeWeightStats.ZERO); nodeNameToWeightInfo.put( - discoveryNode.getName(), + discoveryNode, new BalancingRoundSummary.NodesWeightsChanges( oldNodeWeightStats, BalancingRoundSummary.NodeWeightsDiff.create(oldNodeWeightStats, newNodeWeightStats) @@ -128,11 +135,11 @@ private static Map createWeig // the new DesiredBalance to check. for (var nodeAndWeights : newWeightsPerNode.entrySet()) { var discoveryNode = nodeAndWeights.getKey(); - if (nodeNameToWeightInfo.containsKey(discoveryNode.getName()) == false) { + if (nodeNameToWeightInfo.containsKey(discoveryNode) == false) { // This node is new in the new DesiredBalance, there was no entry added during iteration of the nodes in the old // DesiredBalance. So we'll make a new entry with a base of zero value weights and a weights diff of the new node's weights. nodeNameToWeightInfo.put( - discoveryNode.getName(), + discoveryNode, new BalancingRoundSummary.NodesWeightsChanges( DesiredBalanceMetrics.NodeWeightStats.ZERO, BalancingRoundSummary.NodeWeightsDiff.create(DesiredBalanceMetrics.NodeWeightStats.ZERO, nodeAndWeights.getValue()) @@ -164,6 +171,7 @@ public void addBalancerRoundSummary(BalancingRoundSummary summary) { } summaries.add(summary); + balancingRoundMetrics.addBalancingRoundSummary(summary); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummary.java index 62331019937bb..22ad18a330f59 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummary.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummary.java @@ -9,6 +9,9 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.core.Strings; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -16,12 +19,12 @@ /** * Summarizes the impact to the cluster as a result of a rebalancing round. * - * @param nodeNameToWeightChanges The shard balance weight changes for each node (by name), comparing a previous DesiredBalance shard + * @param nodeToWeightChanges The shard balance weight changes for each DiscoveryNode, comparing a previous DesiredBalance shard * allocation to a new DesiredBalance allocation. * @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one. Does not include * new (index creation) or removed (index deletion) shard assignements. */ -public record BalancingRoundSummary(Map nodeNameToWeightChanges, long numberOfShardsToMove) { +public record BalancingRoundSummary(Map nodeToWeightChanges, long numberOfShardsToMove) { /** * Represents the change in weights for a node going from an old DesiredBalance to a new DesiredBalance @@ -75,8 +78,8 @@ public NodeWeightsDiff combine(NodeWeightsDiff otherDiff) { @Override public String toString() { return "BalancingRoundSummary{" - + "nodeNameToWeightChanges" - + nodeNameToWeightChanges + + "nodeToWeightChanges" + + nodeToWeightChanges + ", numberOfShardsToMove=" + numberOfShardsToMove + '}'; @@ -93,17 +96,34 @@ public String toString() { * latest desired balance. * * @param numberOfBalancingRounds How many balancing round summaries are combined in this report. - * @param nodeNameToWeightChanges + * @param nodeToWeightChanges * @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary. */ public record CombinedBalancingRoundSummary( int numberOfBalancingRounds, - Map nodeNameToWeightChanges, + Map nodeToWeightChanges, long numberOfShardMoves ) { public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, new HashMap<>(), 0); + /** + * Serialize the CombinedBalancingRoundSummary to a compact log representation, where {@link DiscoveryNode#getName()} is used + * instead of the entire {@link DiscoveryNode#toString()} method. + */ + @Override + public String toString() { + Map nodeNameToWeightChanges = new HashMap<>(nodeToWeightChanges.size()); + nodeToWeightChanges.forEach((node, nodesWeightChanges) -> nodeNameToWeightChanges.put(node.getName(), nodesWeightChanges)); + + return Strings.format( + "CombinedBalancingRoundSummary[numberOfBalancingRounds=%d, nodeToWeightChange=%s, numberOfShardMoves=%d]", + numberOfBalancingRounds, + nodeNameToWeightChanges, + numberOfShardMoves + ); + } + /** * Merges multiple {@link BalancingRoundSummary} summaries into a single {@link CombinedBalancingRoundSummary}. */ @@ -113,7 +133,7 @@ public static CombinedBalancingRoundSummary combine(List } // We will loop through the summaries and sum the weight diffs for each node entry. - Map combinedNodeNameToWeightChanges = new HashMap<>(); + Map combinedNodeNameToWeightChanges = new HashMap<>(); // Number of shards moves are simply summed across summaries. Each new balancing round is built upon the last one, so it is // possible that a shard is reassigned back to a node before it even moves away, and that will still be counted as 2 moves here. @@ -128,7 +148,7 @@ public static CombinedBalancingRoundSummary combine(List // We'll build the weight changes by keeping the node weight base from the first summary in which a node appears and then // summing the weight diffs in each summary to get total weight diffs across summaries. - for (var nodeNameAndWeights : summary.nodeNameToWeightChanges.entrySet()) { + for (var nodeNameAndWeights : summary.nodeToWeightChanges.entrySet()) { var combined = combinedNodeNameToWeightChanges.get(nodeNameAndWeights.getKey()); if (combined == null) { // Either this is the first summary, and combinedNodeNameToWeightChanges hasn't been initialized yet for this node; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 4d20e88fd5d9c..927e0158e29ae 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -90,6 +90,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private final Set processedNodeShutdowns = new HashSet<>(); private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; private final DesiredBalanceMetrics desiredBalanceMetrics; + private final AllocationBalancingRoundMetrics balancingRoundMetrics; /** * Manages balancer round results in order to report on the balancer activity in a configurable manner. */ @@ -121,7 +122,8 @@ public DesiredBalanceShardsAllocator( DesiredBalanceReconcilerAction reconciler, NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, ShardAllocationExplainer shardAllocationExplainer, - DesiredBalanceMetrics desiredBalanceMetrics + DesiredBalanceMetrics desiredBalanceMetrics, + AllocationBalancingRoundMetrics balancingRoundMetrics ) { this( delegateAllocator, @@ -130,7 +132,8 @@ public DesiredBalanceShardsAllocator( new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator, shardAllocationExplainer), reconciler, nodeAllocationStatsAndWeightsCalculator, - desiredBalanceMetrics + desiredBalanceMetrics, + balancingRoundMetrics ); } @@ -141,11 +144,17 @@ public DesiredBalanceShardsAllocator( DesiredBalanceComputer desiredBalanceComputer, DesiredBalanceReconcilerAction reconciler, NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, - DesiredBalanceMetrics desiredBalanceMetrics + DesiredBalanceMetrics desiredBalanceMetrics, + AllocationBalancingRoundMetrics balancingRoundMetrics ) { this.desiredBalanceMetrics = desiredBalanceMetrics; + this.balancingRoundMetrics = balancingRoundMetrics; this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; - this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(threadPool, clusterService.getClusterSettings()); + this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService( + threadPool, + clusterService.getClusterSettings(), + balancingRoundMetrics + ); this.delegateAllocator = delegateAllocator; this.threadPool = threadPool; this.reconciler = reconciler; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java index f9dacdb1b1e1f..19f2f4df6974b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer; @@ -121,7 +122,8 @@ public DesiredBalance compute( computer, (state, action) -> state, EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ); var allocationService = new MockAllocationService( randomAllocationDeciders(settings, clusterSettings), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 08ebdbea6e6c9..987999caa468e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics; import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics; @@ -177,7 +178,8 @@ public void testUndesiredShardCount() { (innerState, strategy) -> innerState, EMPTY_NODE_ALLOCATION_STATS, TEST_ONLY_EXPLAINER, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ) { @Override public DesiredBalance getDesiredBalance() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryServiceTests.java index 8345d3261139d..af5956414e508 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryServiceTests.java @@ -13,48 +13,57 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.MetricRecorder; +import org.elasticsearch.telemetry.RecordingMeterRegistry; +import org.elasticsearch.telemetry.metric.Instrument; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; + +import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.DISK_USAGE_BYTES_METRIC_NAME; +import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME; +import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.NUMBER_OF_SHARDS_METRIC_NAME; +import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.NUMBER_OF_SHARD_MOVES_HISTOGRAM_METRIC_NAME; +import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.NUMBER_OF_SHARD_MOVES_METRIC_NAME; +import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.TOTAL_WEIGHT_METRIC_NAME; +import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.WRITE_LOAD_METRIC_NAME; public class AllocationBalancingRoundSummaryServiceTests extends ESTestCase { private static final Logger logger = LogManager.getLogger(AllocationBalancingRoundSummaryServiceTests.class); private static final String BALANCING_SUMMARY_MSG_PREFIX = "Balancing round summaries:*"; - private static final Map NODE_NAME_TO_WEIGHT_CHANGES = Map.of( - "node1", + static final DiscoveryNode NODE_1 = DiscoveryNodeUtils.create("node1", "node1_id"); + static final DiscoveryNode NODE_2 = DiscoveryNodeUtils.create("node2", "node2_id"); + + private static final Map NODE_NAME_TO_WEIGHT_CHANGES = Map.of( + NODE_1, new BalancingRoundSummary.NodesWeightsChanges( new DesiredBalanceMetrics.NodeWeightStats(1L, 2, 3, 4), new BalancingRoundSummary.NodeWeightsDiff(1, 2, 3, 4) ), - "node2", + NODE_2, new BalancingRoundSummary.NodesWeightsChanges( new DesiredBalanceMetrics.NodeWeightStats(1L, 2, 3, 4), new BalancingRoundSummary.NodeWeightsDiff(1, 2, 3, 4) ) ); - final DiscoveryNode DUMMY_NODE = new DiscoveryNode("node1Name", "node1Id", "eph-node1", "abc", "abc", null, Map.of(), Set.of(), null); - final DiscoveryNode SECOND_DUMMY_NODE = new DiscoveryNode( - "node2Name", - "node2Id", - "eph-node2", - "def", - "def", - null, - Map.of(), - Set.of(), - null - ); + final DiscoveryNode DUMMY_NODE = DiscoveryNodeUtils.create("dummy1Name", "dummy1Id"); + final DiscoveryNode SECOND_DUMMY_NODE = DiscoveryNodeUtils.create("dummy2Name", "dummy2Id"); final String INDEX_NAME = "index"; final String INDEX_UUID = "_indexUUID_"; @@ -86,7 +95,13 @@ public void setUpThreadPool() { * {@link AllocationBalancingRoundSummaryService#ENABLE_BALANCER_ROUND_SUMMARIES_SETTING} defaults to false. */ public void testServiceDisabledByDefault() { - var service = new AllocationBalancingRoundSummaryService(testThreadPool, disabledDefaultEmptyClusterSettings); + var recordingMeterRegistry = new RecordingMeterRegistry(); + var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry); + var service = new AllocationBalancingRoundSummaryService( + testThreadPool, + disabledDefaultEmptyClusterSettings, + balancingRoundMetrics + ); try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) { /** @@ -110,11 +125,15 @@ public void testServiceDisabledByDefault() { deterministicTaskQueue.runAllRunnableTasks(); mockLog.awaitAllExpectationsMatched(); service.verifyNumberOfSummaries(0); + + assertMetricsCollected(recordingMeterRegistry, List.of(), List.of(), Map.of(), Map.of(), Map.of(), Map.of()); } } public void testEnabledService() { - var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings); + var recordingMeterRegistry = new RecordingMeterRegistry(); + var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry); + var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings, balancingRoundMetrics); try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) { /** @@ -156,6 +175,16 @@ public void testEnabledService() { deterministicTaskQueue.runAllRunnableTasks(); mockLog.awaitAllExpectationsMatched(); service.verifyNumberOfSummaries(0); + + assertMetricsCollected( + recordingMeterRegistry, + List.of(1L, 1L), + List.of(50L, 200L), + Map.of("node1", List.of(1L, 1L), "node2", List.of(1L, 1L)), + Map.of("node1", List.of(2.0, 2.0), "node2", List.of(2.0, 2.0)), + Map.of("node1", List.of(3.0, 3.0), "node2", List.of(3.0, 3.0)), + Map.of("node1", List.of(4.0, 4.0), "node2", List.of(4.0, 4.0)) + ); } } @@ -163,7 +192,9 @@ public void testEnabledService() { * The service should combine multiple summaries together into a single report when multiple summaries were added since the last report. */ public void testCombinedSummary() { - var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings); + var recordingMeterRegistry = new RecordingMeterRegistry(); + var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry); + var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings, balancingRoundMetrics); try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) { service.addBalancerRoundSummary(new BalancingRoundSummary(NODE_NAME_TO_WEIGHT_CHANGES, 50)); @@ -182,6 +213,16 @@ public void testCombinedSummary() { deterministicTaskQueue.runAllRunnableTasks(); mockLog.awaitAllExpectationsMatched(); service.verifyNumberOfSummaries(0); + + assertMetricsCollected( + recordingMeterRegistry, + List.of(1L, 1L), + List.of(50L, 100L), + Map.of("node1", List.of(1L, 1L), "node2", List.of(1L, 1L)), + Map.of("node1", List.of(2.0, 2.0), "node2", List.of(2.0, 2.0)), + Map.of("node1", List.of(3.0, 3.0), "node2", List.of(3.0, 3.0)), + Map.of("node1", List.of(4.0, 4.0), "node2", List.of(4.0, 4.0)) + ); } } @@ -189,7 +230,9 @@ public void testCombinedSummary() { * The service shouldn't log anything when there haven't been any summaries added since the last report. */ public void testNoSummariesToReport() { - var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings); + var recordingMeterRegistry = new RecordingMeterRegistry(); + var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry); + var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings, balancingRoundMetrics); try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) { /** @@ -207,6 +250,16 @@ public void testNoSummariesToReport() { ) ); + assertMetricsCollected( + recordingMeterRegistry, + List.of(1L), + List.of(50L), + Map.of("node1", List.of(1L), "node2", List.of(1L)), + Map.of("node1", List.of(2.0), "node2", List.of(2.0)), + Map.of("node1", List.of(3.0), "node2", List.of(3.0)), + Map.of("node1", List.of(4.0), "node2", List.of(4.0)) + ); + deterministicTaskQueue.advanceTime(); deterministicTaskQueue.runAllRunnableTasks(); mockLog.awaitAllExpectationsMatched(); @@ -229,6 +282,16 @@ public void testNoSummariesToReport() { deterministicTaskQueue.runAllRunnableTasks(); mockLog.awaitAllExpectationsMatched(); service.verifyNumberOfSummaries(0); + + assertMetricsCollected( + recordingMeterRegistry, + List.of(1L), + List.of(50L), + Map.of("node1", List.of(1L), "node2", List.of(1L)), + Map.of("node1", List.of(2.0), "node2", List.of(2.0)), + Map.of("node1", List.of(3.0), "node2", List.of(3.0)), + Map.of("node1", List.of(4.0), "node2", List.of(4.0)) + ); } } @@ -237,11 +300,13 @@ public void testNoSummariesToReport() { * to false. */ public void testEnableAndThenDisableService() { + var recordingMeterRegistry = new RecordingMeterRegistry(); + var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry); var disabledSettingsUpdate = Settings.builder() .put(AllocationBalancingRoundSummaryService.ENABLE_BALANCER_ROUND_SUMMARIES_SETTING.getKey(), false) .build(); ClusterSettings clusterSettings = new ClusterSettings(enabledSummariesSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - var service = new AllocationBalancingRoundSummaryService(testThreadPool, clusterSettings); + var service = new AllocationBalancingRoundSummaryService(testThreadPool, clusterSettings, balancingRoundMetrics); try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) { /** @@ -255,6 +320,16 @@ public void testEnableAndThenDisableService() { clusterSettings.applySettings(disabledSettingsUpdate); service.verifyNumberOfSummaries(0); + assertMetricsCollected( + recordingMeterRegistry, + List.of(1L), + List.of(50L), + Map.of("node1", List.of(1L), "node2", List.of(1L)), + Map.of("node1", List.of(2.0), "node2", List.of(2.0)), + Map.of("node1", List.of(3.0), "node2", List.of(3.0)), + Map.of("node1", List.of(4.0), "node2", List.of(4.0)) + ); + /** * Verify that any additional summaries are not retained, since the service is disabled. */ @@ -275,6 +350,16 @@ public void testEnableAndThenDisableService() { deterministicTaskQueue.runAllRunnableTasks(); mockLog.awaitAllExpectationsMatched(); service.verifyNumberOfSummaries(0); + + assertMetricsCollected( + recordingMeterRegistry, + List.of(1L), + List.of(50L), + Map.of("node1", List.of(1L), "node2", List.of(1L)), + Map.of("node1", List.of(2.0), "node2", List.of(2.0)), + Map.of("node1", List.of(3.0), "node2", List.of(3.0)), + Map.of("node1", List.of(4.0), "node2", List.of(4.0)) + ); } } @@ -311,8 +396,8 @@ public void testCreateBalancerRoundSummary() { var secondSummary = AllocationBalancingRoundSummaryService.createBalancerRoundSummary(secondDesiredBalance, thirdDesiredBalance); assertEquals(2, firstSummary.numberOfShardsToMove()); - assertEquals(1, firstSummary.nodeNameToWeightChanges().size()); - var firstSummaryWeights = firstSummary.nodeNameToWeightChanges().get(DUMMY_NODE.getName()); + assertEquals(1, firstSummary.nodeToWeightChanges().size()); + var firstSummaryWeights = firstSummary.nodeToWeightChanges().get(DUMMY_NODE); assertEquals(10, firstSummaryWeights.baseWeights().shardCount()); assertDoublesEqual(20, firstSummaryWeights.baseWeights().diskUsageInBytes()); assertDoublesEqual(30, firstSummaryWeights.baseWeights().writeLoad()); @@ -323,8 +408,8 @@ public void testCreateBalancerRoundSummary() { assertDoublesEqual(40, firstSummaryWeights.weightsDiff().totalWeightDiff()); assertEquals(1, secondSummary.numberOfShardsToMove()); - assertEquals(1, secondSummary.nodeNameToWeightChanges().size()); - var secondSummaryWeights = secondSummary.nodeNameToWeightChanges().get(DUMMY_NODE.getName()); + assertEquals(1, secondSummary.nodeToWeightChanges().size()); + var secondSummaryWeights = secondSummary.nodeToWeightChanges().get(DUMMY_NODE); assertEquals(20, secondSummaryWeights.baseWeights().shardCount()); assertDoublesEqual(40, secondSummaryWeights.baseWeights().diskUsageInBytes()); assertDoublesEqual(60, secondSummaryWeights.baseWeights().writeLoad()); @@ -365,9 +450,9 @@ public void testCreateBalancerRoundSummaryWithRemovedNode() { var summary = AllocationBalancingRoundSummaryService.createBalancerRoundSummary(firstDesiredBalance, secondDesiredBalance); assertEquals(0, summary.numberOfShardsToMove()); - assertEquals(2, summary.nodeNameToWeightChanges().size()); + assertEquals(2, summary.nodeToWeightChanges().size()); - var summaryDummyNodeWeights = summary.nodeNameToWeightChanges().get(DUMMY_NODE.getName()); + var summaryDummyNodeWeights = summary.nodeToWeightChanges().get(DUMMY_NODE); assertEquals(10, summaryDummyNodeWeights.baseWeights().shardCount()); assertDoublesEqual(20, summaryDummyNodeWeights.baseWeights().diskUsageInBytes()); assertDoublesEqual(30, summaryDummyNodeWeights.baseWeights().writeLoad()); @@ -377,7 +462,7 @@ public void testCreateBalancerRoundSummaryWithRemovedNode() { assertDoublesEqual(30, summaryDummyNodeWeights.weightsDiff().writeLoadDiff()); assertDoublesEqual(40, summaryDummyNodeWeights.weightsDiff().totalWeightDiff()); - var summarySecondDummyNodeWeights = summary.nodeNameToWeightChanges().get(SECOND_DUMMY_NODE.getName()); + var summarySecondDummyNodeWeights = summary.nodeToWeightChanges().get(SECOND_DUMMY_NODE); assertEquals(5, summarySecondDummyNodeWeights.baseWeights().shardCount()); assertDoublesEqual(15, summarySecondDummyNodeWeights.baseWeights().diskUsageInBytes()); assertDoublesEqual(25, summarySecondDummyNodeWeights.baseWeights().writeLoad()); @@ -418,9 +503,9 @@ public void testCreateBalancerRoundSummaryWithAddedNode() { var summary = AllocationBalancingRoundSummaryService.createBalancerRoundSummary(firstDesiredBalance, secondDesiredBalance); assertEquals(1, summary.numberOfShardsToMove()); - assertEquals(2, summary.nodeNameToWeightChanges().size()); + assertEquals(2, summary.nodeToWeightChanges().size()); - var summaryDummyNodeWeights = summary.nodeNameToWeightChanges().get(DUMMY_NODE.getName()); + var summaryDummyNodeWeights = summary.nodeToWeightChanges().get(DUMMY_NODE); assertEquals(10, summaryDummyNodeWeights.baseWeights().shardCount()); assertDoublesEqual(20, summaryDummyNodeWeights.baseWeights().diskUsageInBytes()); assertDoublesEqual(30, summaryDummyNodeWeights.baseWeights().writeLoad()); @@ -430,7 +515,7 @@ public void testCreateBalancerRoundSummaryWithAddedNode() { assertDoublesEqual(30, summaryDummyNodeWeights.weightsDiff().writeLoadDiff()); assertDoublesEqual(40, summaryDummyNodeWeights.weightsDiff().totalWeightDiff()); - var summarySecondDummyNodeWeights = summary.nodeNameToWeightChanges().get(SECOND_DUMMY_NODE.getName()); + var summarySecondDummyNodeWeights = summary.nodeToWeightChanges().get(SECOND_DUMMY_NODE); assertEquals(0, summarySecondDummyNodeWeights.baseWeights().shardCount()); assertDoublesEqual(0, summarySecondDummyNodeWeights.baseWeights().diskUsageInBytes()); assertDoublesEqual(0, summarySecondDummyNodeWeights.baseWeights().writeLoad()); @@ -447,4 +532,63 @@ public void testCreateBalancerRoundSummaryWithAddedNode() { private void assertDoublesEqual(double expected, double actual) { assertEquals(expected, actual, 0.00001); } + + private void assertMetricsCollected( + RecordingMeterRegistry recordingMeterRegistry, + List roundCounts, + List shardMoves, + Map> shardCountTelemetry, + Map> diskUsageTelemetry, + Map> writeLoadTelemetry, + Map> totalWeightTelemetry + ) { + MetricRecorder metricRecorder = recordingMeterRegistry.getRecorder(); + + List measuredRoundCounts = metricRecorder.getMeasurements( + InstrumentType.LONG_COUNTER, + NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME + ); + List measuredRoundCountValues = Measurement.getMeasurementValues(measuredRoundCounts, (measurement -> measurement.getLong())); + assertEquals(measuredRoundCountValues, roundCounts); + + List measuredShardMoves = metricRecorder.getMeasurements( + InstrumentType.LONG_COUNTER, + NUMBER_OF_SHARD_MOVES_METRIC_NAME + ); + List measuredShardMoveValues = Measurement.getMeasurementValues(measuredShardMoves, (measurement -> measurement.getLong())); + assertEquals(measuredShardMoveValues, shardMoves); + + List measuredShardMoveHistogram = metricRecorder.getMeasurements( + InstrumentType.LONG_HISTOGRAM, + NUMBER_OF_SHARD_MOVES_HISTOGRAM_METRIC_NAME + ); + List measuredShardMoveHistogramValues = Measurement.getMeasurementValues( + measuredShardMoveHistogram, + (measurement -> measurement.getLong()) + ); + assertEquals(measuredShardMoveHistogramValues, shardMoves); + + List measuredShardCounts = metricRecorder.getMeasurements(InstrumentType.LONG_HISTOGRAM, NUMBER_OF_SHARDS_METRIC_NAME); + var shardCountsByNode = groupMeasurementsByAttribute(measuredShardCounts, (measurement -> measurement.getLong())); + assertEquals(shardCountTelemetry, shardCountsByNode); + + List measuredDiskUsage = metricRecorder.getMeasurements(InstrumentType.DOUBLE_HISTOGRAM, DISK_USAGE_BYTES_METRIC_NAME); + var diskUsageByNode = groupMeasurementsByAttribute(measuredDiskUsage, (measurement -> measurement.getDouble())); + assertEquals(diskUsageTelemetry, diskUsageByNode); + + List measuredWriteLoad = metricRecorder.getMeasurements(InstrumentType.DOUBLE_HISTOGRAM, WRITE_LOAD_METRIC_NAME); + var writeLoadByNode = groupMeasurementsByAttribute(measuredWriteLoad, (measurement -> measurement.getDouble())); + assertEquals(writeLoadTelemetry, writeLoadByNode); + + List measuredTotalWeight = metricRecorder.getMeasurements(InstrumentType.DOUBLE_HISTOGRAM, TOTAL_WEIGHT_METRIC_NAME); + var totalWeightByNode = groupMeasurementsByAttribute(measuredTotalWeight, (measurement -> measurement.getDouble())); + assertEquals(totalWeightTelemetry, totalWeightByNode); + } + + private Map> groupMeasurementsByAttribute( + List measurements, + Function getMeasurementValue + ) { + return Measurement.groupMeasurementsByAttribute(measurements, (attrs -> (String) attrs.get("node_name")), getMeasurementValue); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummaryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummaryTests.java index 6291c629281dc..56764ac31afe4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummaryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummaryTests.java @@ -9,6 +9,8 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.routing.allocation.allocator.BalancingRoundSummary.CombinedBalancingRoundSummary; import org.elasticsearch.test.ESTestCase; @@ -22,8 +24,8 @@ public class BalancingRoundSummaryTests extends ESTestCase { * Tests the {@link BalancingRoundSummary.CombinedBalancingRoundSummary#combine(List)} method. */ public void testCombine() { - final String NODE_1 = "node1"; - final String NODE_2 = "node2"; + final DiscoveryNode NODE_1 = DiscoveryNodeUtils.create("node1", "node1Id"); + final DiscoveryNode NODE_2 = DiscoveryNodeUtils.create("node2", "node2Id"); final var node1BaseWeights = new DesiredBalanceMetrics.NodeWeightStats(10, 20, 30, 40); final var node2BaseWeights = new DesiredBalanceMetrics.NodeWeightStats(100, 200, 300, 400); final var commonDiff = new BalancingRoundSummary.NodeWeightsDiff(1, 2, 3, 4); @@ -78,10 +80,10 @@ public void testCombine() { assertEquals(2, combined.numberOfBalancingRounds()); assertEquals(shardMovesSummary1 + shardMovesSummary2, combined.numberOfShardMoves()); - assertEquals(2, combined.nodeNameToWeightChanges().size()); + assertEquals(2, combined.nodeToWeightChanges().size()); - var combinedNode1WeightsChanges = combined.nodeNameToWeightChanges().get(NODE_1); - var combinedNode2WeightsChanges = combined.nodeNameToWeightChanges().get(NODE_2); + var combinedNode1WeightsChanges = combined.nodeToWeightChanges().get(NODE_1); + var combinedNode2WeightsChanges = combined.nodeToWeightChanges().get(NODE_2); // The base weights for each node should match the first BalancingRoundSummary's base weight values. The diff weights will be summed // across all BalancingRoundSummary entries (in this case, there are two BalancingRoundSummary entries). diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index e164a848a9f23..6d2d27a5e7b10 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -493,7 +493,8 @@ private Map.Entry createNewAllocationSer .executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", routingAllocationAction), EMPTY_NODE_ALLOCATION_STATS, TEST_ONLY_EXPLAINER, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ) { @Override public void allocate(RoutingAllocation allocation, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index ec5059bd3e224..7096d0f5399dc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -185,7 +185,8 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo reconcileAction, EMPTY_NODE_ALLOCATION_STATS, TEST_ONLY_EXPLAINER, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ); assertValidStats(desiredBalanceShardsAllocator.getStats()); var allocationService = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator(allocateUnassigned)); @@ -314,7 +315,8 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo reconcileAction, EMPTY_NODE_ALLOCATION_STATS, TEST_ONLY_EXPLAINER, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ); var allocationService = new AllocationService( new AllocationDeciders(List.of()), @@ -433,7 +435,8 @@ boolean hasEnoughIterations(int currentIteration) { }, reconcileAction, EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); allocationServiceRef.set(allocationService); @@ -561,7 +564,8 @@ public DesiredBalance compute( }, reconcileAction, EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); allocationServiceRef.set(allocationService); @@ -665,7 +669,8 @@ public DesiredBalance compute( }, reconcileAction, EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); @@ -758,7 +763,8 @@ public DesiredBalance compute( desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ); var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator()); @@ -868,7 +874,8 @@ public ShardAllocationDecision explainShardAllocation(ShardRouting shard, Routin (reconcilerClusterState, rerouteStrategy) -> allocationServiceRef.get() .executeWithRoutingAllocation(reconcilerClusterState, "reconcile-desired-balance", rerouteStrategy), EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ) { @Override protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { @@ -1073,7 +1080,8 @@ public void testResetDesiredBalanceOnNoLongerMaster() { desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ); var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator()); @@ -1128,7 +1136,8 @@ public void testResetDesiredBalanceOnNodeShutdown() { desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ) { @Override public void resetDesiredBalance() { @@ -1224,7 +1233,8 @@ public DesiredBalance compute( }, (clusterState, rerouteStrategy) -> null, EMPTY_NODE_ALLOCATION_STATS, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ) { private ActionListener lastListener; diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 933649ca20c6e..34fc441389137 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; @@ -211,7 +212,8 @@ private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator null, EMPTY_NODE_ALLOCATION_STATS, TEST_ONLY_EXPLAINER, - DesiredBalanceMetrics.NOOP + DesiredBalanceMetrics.NOOP, + AllocationBalancingRoundMetrics.NOOP ) { private RoutingAllocation lastAllocation; diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/Measurement.java b/test/framework/src/main/java/org/elasticsearch/telemetry/Measurement.java index 4b769a25b295f..d45ff79f05ab1 100644 --- a/test/framework/src/main/java/org/elasticsearch/telemetry/Measurement.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/Measurement.java @@ -9,11 +9,13 @@ package org.elasticsearch.telemetry; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -66,4 +68,40 @@ public static List combine(List measurements) { .map(entry -> new Measurement(entry.getValue(), entry.getKey(), isDouble)) .collect(Collectors.toList()); } + + /** + * Turn a list of {@link org.elasticsearch.telemetry.Measurement} into a list of its Long or Double + * + * @param measurements The measurements in question + * @param getMeasurementValue The measurement -> value (Long or Double) retrieval function + */ + public static List getMeasurementValues(List measurements, Function getMeasurementValue) { + List measurementValues = new ArrayList(measurements.size()); + for (Measurement measurement : measurements) { + T measurementValue = getMeasurementValue.apply(measurement); + measurementValues.add(measurementValue); + } + return measurementValues; + } + + /** + * Groups a list of {@link org.elasticsearch.telemetry.Measurement} by their attribute values + * + * @param measurements The measurements + * @param getAttribute The attribute retrieval function. This must cast from Object to its return type + * @param getMeasurementValue The measurement -> value (Long or Double) retrieval function + */ + public static Map> groupMeasurementsByAttribute( + List measurements, + Function, Attr> getAttribute, + Function getMeasurementValue + ) { + Map> measurementsByNode = new HashMap<>(); + for (Measurement measurement : measurements) { + Attr attr = getAttribute.apply(measurement.attributes()); + List nodeMeasurements = measurementsByNode.computeIfAbsent(attr, (k -> new ArrayList<>())); + nodeMeasurements.add(getMeasurementValue.apply(measurement)); + } + return measurementsByNode; + } }