Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6704f4f
allocation: add balancer round summary as metrics
schase-es Oct 6, 2025
99844ad
[CI] Auto commit changes from spotless
Oct 6, 2025
dbce2bd
Update docs/changelog/136043.yaml
schase-es Oct 8, 2025
840b003
Added enableSending flag in, and some renames
schase-es Oct 9, 2025
5a94657
Added metrics consolidation, correct diff calculation, and some tests.
schase-es Oct 13, 2025
c7195fc
[CI] Auto commit changes from spotless
Oct 13, 2025
fec6131
Merge branch 'main' into ES-10343_balancer-round-apm-export
schase-es Oct 27, 2025
4d1b9db
Changing metrics for NodeWeightsChanges and NodeWeightsDiff to use a …
schase-es Oct 27, 2025
1b149ef
Remove extra meter registry
schase-es Nov 3, 2025
41a2c72
Send absolute value of diff instead of last + diff
schase-es Nov 3, 2025
26f9c14
Adding shard moves histogram
schase-es Nov 7, 2025
40bf700
[CI] Auto commit changes from spotless
Nov 7, 2025
dad8264
Fixes to metrics names and summaries
schase-es Nov 10, 2025
28dbf90
Move of DiscoveryNode as key in BalancingRoundSummary.nodeToWeightCha…
schase-es Nov 10, 2025
d89a03c
Renaming nodeNameToWeightChanges to nodeToWeightChanges
schase-es Nov 10, 2025
ba786de
[CI] Auto commit changes from spotless
Nov 10, 2025
85e4919
CombinedBalancingRoundSummary prints DiscoveryNode name instead of en…
schase-es Nov 11, 2025
52c1ce7
Math.abs on a long is forbidden. Use *= -1 instead
schase-es Nov 11, 2025
cb6a196
Style fixes
schase-es Nov 11, 2025
d0bb2d3
Move of Math.abs on long into suppressed method
schase-es Nov 11, 2025
a650365
Use DiscoveryNodeUtils to make a node
schase-es Nov 11, 2025
ac9ee89
Formatting fixes
schase-es Nov 11, 2025
adb1741
Adding assertion for Long.MIN_VALUE, better comments and names
schase-es Nov 12, 2025
6eefe31
Merge branch 'main' into ES-10343_balancer-round-apm-export
schase-es Nov 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136043.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136043
summary: "Allocation: add balancer round summary as metrics"
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
Expand Down Expand Up @@ -141,6 +142,7 @@ public class ClusterModule extends AbstractModule {
private final AllocationStatsService allocationStatsService;
private final TelemetryProvider telemetryProvider;
private final DesiredBalanceMetrics desiredBalanceMetrics;
private final AllocationBalancingRoundMetrics balancingRoundMetrics;

public ClusterModule(
Settings settings,
Expand Down Expand Up @@ -168,6 +170,7 @@ public ClusterModule(
balancingWeightsFactory
);
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
this.balancingRoundMetrics = new AllocationBalancingRoundMetrics(telemetryProvider.getMeterRegistry());
this.shardsAllocator = createShardsAllocator(
settings,
clusterService.getClusterSettings(),
Expand All @@ -180,7 +183,8 @@ public ClusterModule(
writeLoadForecaster,
nodeAllocationStatsAndWeightsCalculator,
this::explainShardAllocation,
desiredBalanceMetrics
desiredBalanceMetrics,
balancingRoundMetrics
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver);
Expand Down Expand Up @@ -521,7 +525,8 @@ private static ShardsAllocator createShardsAllocator(
WriteLoadForecaster writeLoadForecaster,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
ShardAllocationExplainer shardAllocationExplainer,
DesiredBalanceMetrics desiredBalanceMetrics
DesiredBalanceMetrics desiredBalanceMetrics,
AllocationBalancingRoundMetrics balancingRoundMetrics
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(
Expand All @@ -538,7 +543,8 @@ private static ShardsAllocator createShardsAllocator(
reconciler,
nodeAllocationStatsAndWeightsCalculator,
shardAllocationExplainer,
desiredBalanceMetrics
desiredBalanceMetrics,
balancingRoundMetrics
)
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingRoundSummary.NodesWeightsChanges;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.telemetry.metric.DoubleHistogram;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Map;

/**
* A telemetry metrics sender for {@link BalancingRoundSummary}
*/
public class AllocationBalancingRoundMetrics {

// counters that measure rounds and moves from the last balancing round
public static final String NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME = "es.allocator.balancing_round.balancing_rounds.total";
public static final String NUMBER_OF_SHARD_MOVES_METRIC_NAME = "es.allocator.balancing_round.shard_moves.total";
public static final String NUMBER_OF_SHARD_MOVES_HISTOGRAM_METRIC_NAME = "es.allocator.balancing_round.shard_moves.histogram";

// histograms that measure current utilization
public static final String NUMBER_OF_SHARDS_METRIC_NAME = "es.allocator.balancing_round.shard_count.histogram";
public static final String DISK_USAGE_BYTES_METRIC_NAME = "es.allocator.balancing_round.disk_usage_bytes.histogram";
public static final String WRITE_LOAD_METRIC_NAME = "es.allocator.balancing_round.write_load.histogram";
public static final String TOTAL_WEIGHT_METRIC_NAME = "es.allocator.balancing_round.total_weight.histogram";

private final LongCounter balancingRoundCounter;
private final LongCounter shardMovesCounter;
private final LongHistogram shardMovesHistogram;

private final LongHistogram shardCountHistogram;
private final DoubleHistogram diskUsageHistogram;
private final DoubleHistogram writeLoadHistogram;
private final DoubleHistogram totalWeightHistogram;

public static AllocationBalancingRoundMetrics NOOP = new AllocationBalancingRoundMetrics(MeterRegistry.NOOP);

public AllocationBalancingRoundMetrics(MeterRegistry meterRegistry) {
this.balancingRoundCounter = meterRegistry.registerLongCounter(
NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME,
"Total number of balancing rounds",
"unit"
);
this.shardMovesCounter = meterRegistry.registerLongCounter(
NUMBER_OF_SHARD_MOVES_METRIC_NAME,
"Total number of shard moves",
"unit"
);

this.shardMovesHistogram = meterRegistry.registerLongHistogram(
NUMBER_OF_SHARD_MOVES_HISTOGRAM_METRIC_NAME,
"Number of shard movements executed in a balancing round",
"unit"
);
this.shardCountHistogram = meterRegistry.registerLongHistogram(
NUMBER_OF_SHARDS_METRIC_NAME,
"change in node shard count per balancing round",
"unit"
);
this.diskUsageHistogram = meterRegistry.registerDoubleHistogram(
DISK_USAGE_BYTES_METRIC_NAME,
"change in disk usage in bytes per balancing round",
"unit"
);
this.writeLoadHistogram = meterRegistry.registerDoubleHistogram(
WRITE_LOAD_METRIC_NAME,
"change in write load per balancing round",
"1.0"
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we discuss having a histogram for shard moves? To avoid the combine effect, merging multiple balancing rounds together 🤔 I can't recall what we decided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nick and I talked about it last week again, and we decided to send the absolute difference with each summary. (The comment I made that seemed out of place was to create a link for Nick to see.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a histogram for shard moves as well, those differences are per-node, so we'll see how many shards moved on/off each node, but I think having a histogram of total shard move count would be good too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have the shard moves per node as a stat right now, so we can't have that :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The high level conclusion Simon and I were discussing is that generally histograms are going to be more useful for the balancer round summary metrics because our goal is to see per balancing round granularity, as best we can. We don't have a use case for wanting to see how many total shard moves take place over time, but we do want to see roughly how many shard moves happen per balancing round, and the histogram will give us that.

So let's go with a histogram and remove the counter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a histogram for shard moves as well, those differences are per-node, so we'll see how many shards moved on/off each node.

Right now, these metrics aren't available in the balancing round summary. We can certainly try to get them out of the Desired Balancer and get them in through here.

this.totalWeightHistogram = meterRegistry.registerDoubleHistogram(
TOTAL_WEIGHT_METRIC_NAME,
"change in total weight per balancing round",
"1.0"
);
}

@SuppressForbidden(reason = "ForbiddenAPIs bans Math.abs(long) because of overflow on Long.MIN_VALUE, but this is impossible here")
private long longAbsNegativeSafe(long value) {
assert value != Long.MIN_VALUE : "value must not be Long.MIN_VALUE";
return Math.abs(value);
}

public void addBalancingRoundSummary(BalancingRoundSummary summary) {
balancingRoundCounter.increment();
shardMovesCounter.incrementBy(summary.numberOfShardsToMove());
shardMovesHistogram.record(summary.numberOfShardsToMove());

for (Map.Entry<DiscoveryNode, NodesWeightsChanges> changesEntry : summary.nodeToWeightChanges().entrySet()) {
DiscoveryNode node = changesEntry.getKey();
NodesWeightsChanges weightChanges = changesEntry.getValue();
BalancingRoundSummary.NodeWeightsDiff weightsDiff = weightChanges.weightsDiff();

shardCountHistogram.record(longAbsNegativeSafe(weightsDiff.shardCountDiff()), getNodeAttributes(node));
diskUsageHistogram.record(Math.abs(weightsDiff.diskUsageInBytesDiff()), getNodeAttributes(node));
writeLoadHistogram.record(Math.abs(weightsDiff.writeLoadDiff()), getNodeAttributes(node));
totalWeightHistogram.record(Math.abs(weightsDiff.totalWeightDiff()), getNodeAttributes(node));
}
}

private Map<String, Object> getNodeAttributes(DiscoveryNode node) {
return Map.of("node_name", node.getName(), "node_id", node.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class AllocationBalancingRoundSummaryService {
private final ThreadPool threadPool;
private volatile boolean enableBalancerRoundSummaries;
private volatile TimeValue summaryReportInterval;
private final AllocationBalancingRoundMetrics balancingRoundMetrics;

/**
* A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally
Expand All @@ -68,12 +70,17 @@ public class AllocationBalancingRoundSummaryService {
/** This reference is set when reporting is scheduled. If it is null, then reporting is inactive. */
private final AtomicReference<Scheduler.Cancellable> scheduledReportFuture = new AtomicReference<>();

public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings) {
public AllocationBalancingRoundSummaryService(
ThreadPool threadPool,
ClusterSettings clusterSettings,
AllocationBalancingRoundMetrics balancingRoundMetrics
) {
this.threadPool = threadPool;
// Initialize the local setting values to avoid a null access when ClusterSettings#initializeAndWatch is called on each setting:
// updating enableBalancerRoundSummaries accesses summaryReportInterval.
this.enableBalancerRoundSummaries = clusterSettings.get(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING);
this.summaryReportInterval = clusterSettings.get(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING);
this.balancingRoundMetrics = balancingRoundMetrics;

clusterSettings.initializeAndWatch(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING, value -> {
this.enableBalancerRoundSummaries = value;
Expand All @@ -99,14 +106,14 @@ public static BalancingRoundSummary createBalancerRoundSummary(DesiredBalance ol
* Creates a summary of the node weight changes from {@code oldDesiredBalance} to {@code newDesiredBalance}.
* See {@link BalancingRoundSummary.NodesWeightsChanges} for content details.
*/
private static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeightsSummary(
private static Map<DiscoveryNode, BalancingRoundSummary.NodesWeightsChanges> createWeightsSummary(
DesiredBalance oldDesiredBalance,
DesiredBalance newDesiredBalance
) {
var oldWeightsPerNode = oldDesiredBalance.weightsPerNode();
var newWeightsPerNode = newDesiredBalance.weightsPerNode();

Map<String, BalancingRoundSummary.NodesWeightsChanges> nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size());
Map<DiscoveryNode, BalancingRoundSummary.NodesWeightsChanges> nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size());
for (var nodeAndWeights : oldWeightsPerNode.entrySet()) {
var discoveryNode = nodeAndWeights.getKey();
var oldNodeWeightStats = nodeAndWeights.getValue();
Expand All @@ -116,7 +123,7 @@ private static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeig
var newNodeWeightStats = newWeightsPerNode.getOrDefault(discoveryNode, DesiredBalanceMetrics.NodeWeightStats.ZERO);

nodeNameToWeightInfo.put(
discoveryNode.getName(),
discoveryNode,
new BalancingRoundSummary.NodesWeightsChanges(
oldNodeWeightStats,
BalancingRoundSummary.NodeWeightsDiff.create(oldNodeWeightStats, newNodeWeightStats)
Expand All @@ -128,11 +135,11 @@ private static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeig
// the new DesiredBalance to check.
for (var nodeAndWeights : newWeightsPerNode.entrySet()) {
var discoveryNode = nodeAndWeights.getKey();
if (nodeNameToWeightInfo.containsKey(discoveryNode.getName()) == false) {
if (nodeNameToWeightInfo.containsKey(discoveryNode) == false) {
// This node is new in the new DesiredBalance, there was no entry added during iteration of the nodes in the old
// DesiredBalance. So we'll make a new entry with a base of zero value weights and a weights diff of the new node's weights.
nodeNameToWeightInfo.put(
discoveryNode.getName(),
discoveryNode,
new BalancingRoundSummary.NodesWeightsChanges(
DesiredBalanceMetrics.NodeWeightStats.ZERO,
BalancingRoundSummary.NodeWeightsDiff.create(DesiredBalanceMetrics.NodeWeightStats.ZERO, nodeAndWeights.getValue())
Expand Down Expand Up @@ -164,6 +171,7 @@ public void addBalancerRoundSummary(BalancingRoundSummary summary) {
}

summaries.add(summary);
balancingRoundMetrics.addBalancingRoundSummary(summary);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Strings;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Summarizes the impact to the cluster as a result of a rebalancing round.
*
* @param nodeNameToWeightChanges The shard balance weight changes for each node (by name), comparing a previous DesiredBalance shard
* @param nodeToWeightChanges The shard balance weight changes for each DiscoveryNode, comparing a previous DesiredBalance shard
* allocation to a new DesiredBalance allocation.
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one. Does not include
* new (index creation) or removed (index deletion) shard assignements.
*/
public record BalancingRoundSummary(Map<String, NodesWeightsChanges> nodeNameToWeightChanges, long numberOfShardsToMove) {
public record BalancingRoundSummary(Map<DiscoveryNode, NodesWeightsChanges> nodeToWeightChanges, long numberOfShardsToMove) {

/**
* Represents the change in weights for a node going from an old DesiredBalance to a new DesiredBalance
Expand Down Expand Up @@ -75,8 +78,8 @@ public NodeWeightsDiff combine(NodeWeightsDiff otherDiff) {
@Override
public String toString() {
return "BalancingRoundSummary{"
+ "nodeNameToWeightChanges"
+ nodeNameToWeightChanges
+ "nodeToWeightChanges"
+ nodeToWeightChanges
+ ", numberOfShardsToMove="
+ numberOfShardsToMove
+ '}';
Expand All @@ -93,17 +96,34 @@ public String toString() {
* latest desired balance.
*
* @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
* @param nodeNameToWeightChanges
* @param nodeToWeightChanges
* @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
*/
public record CombinedBalancingRoundSummary(
int numberOfBalancingRounds,
Map<String, NodesWeightsChanges> nodeNameToWeightChanges,
Map<DiscoveryNode, NodesWeightsChanges> nodeToWeightChanges,
long numberOfShardMoves
) {

public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, new HashMap<>(), 0);

/**
* Serialize the CombinedBalancingRoundSummary to a compact log representation, where {@link DiscoveryNode#getName()} is used
* instead of the entire {@link DiscoveryNode#toString()} method.
*/
@Override
public String toString() {
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = new HashMap<>(nodeToWeightChanges.size());
nodeToWeightChanges.forEach((node, nodesWeightChanges) -> nodeNameToWeightChanges.put(node.getName(), nodesWeightChanges));

return Strings.format(
"CombinedBalancingRoundSummary[numberOfBalancingRounds=%d, nodeToWeightChange=%s, numberOfShardMoves=%d]",
numberOfBalancingRounds,
nodeNameToWeightChanges,
numberOfShardMoves
);
}

/**
* Merges multiple {@link BalancingRoundSummary} summaries into a single {@link CombinedBalancingRoundSummary}.
*/
Expand All @@ -113,7 +133,7 @@ public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary>
}

// We will loop through the summaries and sum the weight diffs for each node entry.
Map<String, NodesWeightsChanges> combinedNodeNameToWeightChanges = new HashMap<>();
Map<DiscoveryNode, NodesWeightsChanges> combinedNodeNameToWeightChanges = new HashMap<>();

// Number of shards moves are simply summed across summaries. Each new balancing round is built upon the last one, so it is
// possible that a shard is reassigned back to a node before it even moves away, and that will still be counted as 2 moves here.
Expand All @@ -128,7 +148,7 @@ public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary>

// We'll build the weight changes by keeping the node weight base from the first summary in which a node appears and then
// summing the weight diffs in each summary to get total weight diffs across summaries.
for (var nodeNameAndWeights : summary.nodeNameToWeightChanges.entrySet()) {
for (var nodeNameAndWeights : summary.nodeToWeightChanges.entrySet()) {
var combined = combinedNodeNameToWeightChanges.get(nodeNameAndWeights.getKey());
if (combined == null) {
// Either this is the first summary, and combinedNodeNameToWeightChanges hasn't been initialized yet for this node;
Expand Down
Loading