Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more desired balance stats #102065

Merged
merged 13 commits into from
Nov 15, 2023
5 changes: 5 additions & 0 deletions docs/changelog/102065.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102065
summary: Add more desired balance stats
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,18 @@ setup:
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.max'
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.average'
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.std_dev'

---
"Test unassigned_shards, total_allocations, undesired_allocations and undesired_allocations_fraction":

- skip:
version: " - 8.11.99"
reason: "undesired_shard_allocation_count added in 8.12.0"

- do:
_internal.get_desired_balance: { }

- gte: { 'stats.unassigned_shards' : 0 }
kingherc marked this conversation as resolved.
Show resolved Hide resolved
- gte: { 'stats.total_allocations' : 0 }
- gte: { 'stats.undesired_allocations' : 0 }
- gte: { 'stats.undesired_allocations_fraction' : 0.0 }
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,18 @@ setup:
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.max'
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.average'
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.std_dev'

---
"Test unassigned_shards, total_allocations, undesired_allocations and undesired_allocations_fraction":

- skip:
version: " - 8.11.99"
reason: "undesired_shard_allocation_count added in 8.12.0"

- do:
_internal.get_desired_balance: { }

- gte: { 'stats.unassigned_shards' : 0 }
- gte: { 'stats.total_allocations' : 0 }
- gte: { 'stats.undesired_allocations' : 0 }
- gte: { 'stats.undesired_allocations_fraction' : 0.0 }
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ static TransportVersion def(int id) {
public static final TransportVersion UPDATE_NON_DYNAMIC_SETTINGS_ADDED = def(8_533_00_0);
public static final TransportVersion REPO_ANALYSIS_REGISTER_OP_COUNT_ADDED = def(8_534_00_0);
public static final TransportVersion ML_TRAINED_MODEL_PREFIX_STRINGS_ADDED = def(8_535_00_0);
public static final TransportVersion ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS = def(8_536_00_0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's now another 8_536 in main, so please re-adjust before merging.

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class DesiredBalanceComputer {
private final ThreadPool threadPool;
private final ShardsAllocator delegateAllocator;

// stats
protected final MeanMetric iterations = new MeanMetric();

public static final Setting<TimeValue> PROGRESS_LOG_INTERVAL_SETTING = Setting.timeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -69,13 +70,29 @@ public class DesiredBalanceReconciler {
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();

// stats
private final DesiredBalanceReconcilerStats desiredBalanceReconcilerStats;
/**
* Number of unassigned shards during last reconciliation
*/
protected final AtomicInteger unassignedShards = new AtomicInteger();
/**
* Total number of assigned shards during last reconciliation
*/
protected final AtomicInteger totalAllocations = new AtomicInteger();
/**
* Number of assigned shards during last reconciliation that are not allocated on desired node and need to be moved
*/
protected final AtomicInteger undesiredAllocations = new AtomicInteger();

/**
 * Creates a reconciler and wires its logging-related state.
 *
 * @param clusterSettings settings registry on which {@code UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING} and
 *                        {@code UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING} are registered via {@code initializeAndWatch}
 * @param threadPool      handed to the {@code FrequencyCappedAction} and to the {@code DesiredBalanceReconcilerStats} created here
 */
public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
// Must be assigned first: the watcher registered on the next line targets this instance.
this.undesiredAllocationLogInterval = new FrequencyCappedAction(threadPool);
// The interval setting's value is forwarded to the action's minimum interval.
// NOTE(review): initializeAndWatch presumably applies the current value immediately and again on every update - confirm.
clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval);
// The threshold setting's value is stored in a field of this reconciler.
clusterSettings.initializeAndWatch(
UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
value -> this.undesiredAllocationsLogThreshold = value
);
// Helper used to log reconciliation metrics; it creates its own FrequencyCappedAction from the same thread pool.
this.desiredBalanceReconcilerStats = new DesiredBalanceReconcilerStats(threadPool);
}

public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
Expand Down Expand Up @@ -445,16 +462,17 @@ private void balance() {
return;
}

long allAllocations = 0;
long undesiredAllocations = 0;
int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size();
int totalAllocations = 0;
int undesiredAllocations = 0;

// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
// shards.
for (final var iterator = OrderedShardsIterator.create(routingNodes, moveOrdering); iterator.hasNext();) {
final var shardRouting = iterator.next();

allAllocations++;
totalAllocations++;

if (shardRouting.started() == false) {
// can only rebalance started shards
Expand Down Expand Up @@ -504,10 +522,15 @@ private void balance() {
}
}

maybeLogUndesiredAllocationsWarning(allAllocations, undesiredAllocations, routingNodes.size());
DesiredBalanceReconciler.this.unassignedShards.set(unassignedShards);
DesiredBalanceReconciler.this.undesiredAllocations.set(undesiredAllocations);
DesiredBalanceReconciler.this.totalAllocations.set(totalAllocations);
kingherc marked this conversation as resolved.
Show resolved Hide resolved

desiredBalanceReconcilerStats.logUndesiredAllocationsMetrics(totalAllocations, undesiredAllocations);
maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocations, routingNodes.size());
}

private void maybeLogUndesiredAllocationsWarning(long allAllocations, long undesiredAllocations, int nodeCount) {
private void maybeLogUndesiredAllocationsWarning(int allAllocations, int undesiredAllocations, int nodeCount) {
// more shards than cluster can relocate with one reroute
final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount;
final boolean warningThresholdReached = undesiredAllocations > undesiredAllocationsLogThreshold * allAllocations;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.ESLogMessage;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

/**
 * Emits a debug-level structured log message describing the outcome of a desired balance reconciliation:
 * the total number of allocations, the number of undesired allocations and the fraction between the two.
 * <p>
 * Logging goes through a {@link FrequencyCappedAction} configured with a fixed minimum interval, so calling
 * {@link #logUndesiredAllocationsMetrics(int, int)} on every reconciliation is expected to be cheap.
 */
public class DesiredBalanceReconcilerStats {

    private static final Logger logger = LogManager.getLogger(DesiredBalanceReconcilerStats.class);

    /**
     * Minimum interval configured on the {@link FrequencyCappedAction} that guards the stats log message.
     */
    private static final TimeValue LOG_INTERVAL = TimeValue.timeValueMinutes(30);

    private final FrequencyCappedAction logReconciliationMetrics;

    /**
     * @param threadPool passed to the {@link FrequencyCappedAction} that rate-limits the log output
     */
    public DesiredBalanceReconcilerStats(ThreadPool threadPool) {
        this.logReconciliationMetrics = new FrequencyCappedAction(threadPool);
        this.logReconciliationMetrics.setMinInterval(LOG_INTERVAL);
    }

    /**
     * Logs the reconciliation metrics, subject to the frequency cap.
     *
     * @param allAllocations       total number of allocations seen during the reconciliation
     * @param undesiredAllocations number of those allocations that are not on their desired node
     */
    public void logUndesiredAllocationsMetrics(int allAllocations, int undesiredAllocations) {
        logReconciliationMetrics.maybeExecute(
            () -> logger.debug(
                new ESLogMessage("DesiredBalanceReconciler stats") //
                    .field(
                        "allocator.desired_balance.reconciliation.undesired_allocations_fraction",
                        undesiredAllocationsFraction(allAllocations, undesiredAllocations)
                    )
                    .field("allocator.desired_balance.reconciliation.undesired_allocations", undesiredAllocations)
                    .field("allocator.desired_balance.reconciliation.total_allocations", allAllocations)
            )
        );
    }

    /**
     * Computes the share of allocations that are undesired.
     * <p>
     * Returns {@code 0.0} when there are no allocations (or the total is negative) instead of performing a
     * floating-point division by zero, which would otherwise put {@code NaN} or {@code Infinity} into the log record.
     */
    private static double undesiredAllocationsFraction(int allAllocations, int undesiredAllocations) {
        if (allAllocations <= 0) {
            return 0.0;
        }
        return (double) undesiredAllocations / allAllocations;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,10 @@ public DesiredBalanceStats getStats() {
desiredBalanceComputer.iterations.sum(),
computedShardMovements.sum(),
cumulativeComputationTime.count(),
cumulativeReconciliationTime.count()
cumulativeReconciliationTime.count(),
desiredBalanceReconciler.unassignedShards.get(),
desiredBalanceReconciler.totalAllocations.get(),
desiredBalanceReconciler.undesiredAllocations.get()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.IOException;

import static org.elasticsearch.TransportVersions.ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS;

public record DesiredBalanceStats(
long lastConvergedIndex,
boolean computationActive,
Expand All @@ -28,7 +30,10 @@ public record DesiredBalanceStats(
long computationIterations,
long computedShardMovements,
long cumulativeComputationTime,
long cumulativeReconciliationTime
long cumulativeReconciliationTime,
int unassignedShards,
int totalAllocations,
int undesiredAllocations
) implements Writeable, ToXContentObject {

private static final TransportVersion COMPUTED_SHARD_MOVEMENTS_VERSION = TransportVersions.V_8_8_0;
Expand All @@ -50,7 +55,10 @@ public static DesiredBalanceStats readFrom(StreamInput in) throws IOException {
in.readVLong(),
in.getTransportVersion().onOrAfter(COMPUTED_SHARD_MOVEMENTS_VERSION) ? in.readVLong() : -1,
in.readVLong(),
in.readVLong()
in.readVLong(),
in.getTransportVersion().onOrAfter(ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS) ? in.readVInt() : -1,
in.getTransportVersion().onOrAfter(ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS) ? in.readVInt() : -1,
in.getTransportVersion().onOrAfter(ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS) ? in.readVInt() : -1
);
}

Expand All @@ -67,6 +75,11 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeVLong(cumulativeComputationTime);
out.writeVLong(cumulativeReconciliationTime);
if (out.getTransportVersion().onOrAfter(ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS)) {
out.writeVInt(unassignedShards);
out.writeVInt(totalAllocations);
out.writeVInt(undesiredAllocations);
}
}

@Override
Expand All @@ -81,7 +94,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("computed_shard_movements", computedShardMovements);
builder.humanReadableField("computation_time_in_millis", "computation_time", new TimeValue(cumulativeComputationTime));
builder.humanReadableField("reconciliation_time_in_millis", "reconciliation_time", new TimeValue(cumulativeReconciliationTime));
builder.field("unassigned_shards", unassignedShards);
builder.field("total_allocations", totalAllocations);
builder.field("undesired_allocations", undesiredAllocations);
builder.field("undesired_allocations_fraction", undesiredAllocationsFraction());
builder.endObject();
return builder;
}

public double undesiredAllocationsFraction() {
return (double) undesiredAllocations / totalAllocations;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: for versions before ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS, this will be -1/-1 = 1 = 100%. I do not think it is a problem, but if we do not want that, we could extend the logic to return 0% in case of negative values. Similar comment for logUndesiredAllocationsMetrics().

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.allocator.ClusterBalanceStats;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStats;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStatsTests;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
Expand All @@ -30,6 +30,7 @@

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStatsTests.randomDesiredBalanceStats;
import static org.hamcrest.Matchers.containsInAnyOrder;

public class DesiredBalanceResponseTests extends AbstractWireSerializingTestCase<DesiredBalanceResponse> {
Expand All @@ -49,20 +50,6 @@ protected DesiredBalanceResponse createTestInstance() {
);
}

private DesiredBalanceStats randomDesiredBalanceStats() {
return new DesiredBalanceStats(
randomNonNegativeLong(),
randomBoolean(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
}

private ClusterBalanceStats randomClusterBalanceStats() {
return new ClusterBalanceStats(
randomNonNegativeInt(),
Expand Down Expand Up @@ -156,7 +143,7 @@ private Map<String, Map<Integer, DesiredBalanceResponse.DesiredShards>> randomRo
protected DesiredBalanceResponse mutateInstance(DesiredBalanceResponse instance) {
return switch (randomInt(4)) {
case 0 -> new DesiredBalanceResponse(
randomValueOtherThan(instance.getStats(), this::randomDesiredBalanceStats),
randomValueOtherThan(instance.getStats(), DesiredBalanceStatsTests::randomDesiredBalanceStats),
instance.getClusterBalanceStats(),
instance.getRoutingTable(),
instance.getClusterInfo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.elasticsearch.cluster.ClusterModule.BALANCED_ALLOCATOR;
import static org.elasticsearch.cluster.ClusterModule.DESIRED_BALANCE_ALLOCATOR;
import static org.elasticsearch.cluster.ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStatsTests.randomDesiredBalanceStats;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -219,17 +220,7 @@ public void testGetDesiredBalance() throws Exception {
}

when(desiredBalanceShardsAllocator.getDesiredBalance()).thenReturn(new DesiredBalance(randomInt(1024), shardAssignments));
DesiredBalanceStats desiredBalanceStats = new DesiredBalanceStats(
randomInt(Integer.MAX_VALUE),
randomBoolean(),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE)
);
DesiredBalanceStats desiredBalanceStats = randomDesiredBalanceStats();
when(desiredBalanceShardsAllocator.getStats()).thenReturn(desiredBalanceStats);
ClusterInfo clusterInfo = ClusterInfo.EMPTY;
when(clusterInfoService.getClusterInfo()).thenReturn(clusterInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.util.Locale;

import static org.hamcrest.Matchers.equalTo;

public class DesiredBalanceStatsTests extends AbstractWireSerializingTestCase<DesiredBalanceStats> {
Expand All @@ -25,6 +23,10 @@ protected Writeable.Reader<DesiredBalanceStats> instanceReader() {

@Override
protected DesiredBalanceStats createTestInstance() {
return randomDesiredBalanceStats();
}

public static DesiredBalanceStats randomDesiredBalanceStats() {
return new DesiredBalanceStats(
randomNonNegativeLong(),
randomBoolean(),
Expand All @@ -34,7 +36,10 @@ protected DesiredBalanceStats createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
randomNonNegativeLong(),
randomNonNegativeInt(),
randomNonNegativeInt(),
randomNonNegativeInt()
);
}

Expand All @@ -48,8 +53,7 @@ public void testToXContent() {
assertThat(
Strings.toString(instance, true, false),
equalTo(
String.format(
Locale.ROOT,
Strings.format(
"""
{
"computation_converged_index" : %d,
Expand All @@ -60,9 +64,12 @@ public void testToXContent() {
"computation_iterations" : %d,
"computed_shard_movements" : %d,
"computation_time_in_millis" : %d,
"reconciliation_time_in_millis" : %d
"reconciliation_time_in_millis" : %d,
"unassigned_shards" : %d,
"total_allocations" : %d,
"undesired_allocations" : %d,
"undesired_allocations_fraction" : %s
kingherc marked this conversation as resolved.
Show resolved Hide resolved
}""",

instance.lastConvergedIndex(),
instance.computationActive(),
instance.computationSubmitted(),
Expand All @@ -71,7 +78,11 @@ public void testToXContent() {
instance.computationIterations(),
instance.computedShardMovements(),
instance.cumulativeComputationTime(),
instance.cumulativeReconciliationTime()
instance.cumulativeReconciliationTime(),
instance.unassignedShards(),
instance.totalAllocations(),
instance.undesiredAllocations(),
Double.toString(instance.undesiredAllocationsFraction())
)
)
);
Expand Down