Skip to content

Commit

Permalink
Add more desired balance stats (#102065)
Browse files Browse the repository at this point in the history
This change expose amount of total and desired allocations reconciled during
last reroute.
  • Loading branch information
idegtiarenko committed Nov 15, 2023
1 parent 35a7d99 commit 842e563
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 42 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102065.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102065
summary: Add more desired balance stats
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,18 @@ setup:
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.max'
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.average'
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.std_dev'

---
"Test unassigned_shards, total_allocations, undesired_allocations and undesired_allocations_fraction":

- skip:
version: " - 8.11.99"
reason: "undesired_shard_allocation_count added in in 8.12.0"

- do:
_internal.get_desired_balance: { }

- gte: { 'stats.unassigned_shards' : 0 }
- gte: { 'stats.total_allocations' : 0 }
- gte: { 'stats.undesired_allocations' : 0 }
- gte: { 'stats.undesired_allocations_fraction' : 0.0 }
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,18 @@ setup:
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.max'
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.average'
- exists: 'cluster_balance_stats.tiers.data_content.undesired_shard_allocation_count.std_dev'

---
"Test unassigned_shards, total_allocations, undesired_allocations and undesired_allocations_fraction":

- skip:
version: " - 8.11.99"
reason: "undesired_shard_allocation_count added in in 8.12.0"

- do:
_internal.get_desired_balance: { }

- gte: { 'stats.unassigned_shards' : 0 }
- gte: { 'stats.total_allocations' : 0 }
- gte: { 'stats.undesired_allocations' : 0 }
- gte: { 'stats.undesired_allocations_fraction' : 0.0 }
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ static TransportVersion def(int id) {
public static final TransportVersion COUNTED_KEYWORD_ADDED = def(8_536_00_0);
public static final TransportVersion SHAPE_VALUE_SERIALIZATION_ADDED = def(8_537_00_0);
public static final TransportVersion INFERENCE_MULTIPLE_INPUTS = def(8_538_00_0);
public static final TransportVersion ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS = def(8_539_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class DesiredBalanceComputer {
private final ThreadPool threadPool;
private final ShardsAllocator delegateAllocator;

// stats
protected final MeanMetric iterations = new MeanMetric();

public static final Setting<TimeValue> PROGRESS_LOG_INTERVAL_SETTING = Setting.timeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -69,6 +70,20 @@ public class DesiredBalanceReconciler {
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();

// stats
/**
* Number of unassigned shards during last reconciliation
*/
protected final AtomicLong unassignedShards = new AtomicLong();
/**
* Total number of assigned shards during last reconciliation
*/
protected final AtomicLong totalAllocations = new AtomicLong();
/**
* Number of assigned shards during last reconciliation that are not allocated on desired node and need to be moved
*/
protected final AtomicLong undesiredAllocations = new AtomicLong();

public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
this.undesiredAllocationLogInterval = new FrequencyCappedAction(threadPool);
clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval);
Expand Down Expand Up @@ -445,16 +460,17 @@ private void balance() {
return;
}

long allAllocations = 0;
long undesiredAllocations = 0;
int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size();
int totalAllocations = 0;
int undesiredAllocations = 0;

// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
// shards.
for (final var iterator = OrderedShardsIterator.create(routingNodes, moveOrdering); iterator.hasNext();) {
final var shardRouting = iterator.next();

allAllocations++;
totalAllocations++;

if (shardRouting.started() == false) {
// can only rebalance started shards
Expand Down Expand Up @@ -504,10 +520,14 @@ private void balance() {
}
}

maybeLogUndesiredAllocationsWarning(allAllocations, undesiredAllocations, routingNodes.size());
DesiredBalanceReconciler.this.unassignedShards.set(unassignedShards);
DesiredBalanceReconciler.this.undesiredAllocations.set(undesiredAllocations);
DesiredBalanceReconciler.this.totalAllocations.set(totalAllocations);

maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocations, routingNodes.size());
}

private void maybeLogUndesiredAllocationsWarning(long allAllocations, long undesiredAllocations, int nodeCount) {
private void maybeLogUndesiredAllocationsWarning(int allAllocations, int undesiredAllocations, int nodeCount) {
// more shards than cluster can relocate with one reroute
final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount;
final boolean warningThresholdReached = undesiredAllocations > undesiredAllocationsLogThreshold * allAllocations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,10 @@ public DesiredBalanceStats getStats() {
desiredBalanceComputer.iterations.sum(),
computedShardMovements.sum(),
cumulativeComputationTime.count(),
cumulativeReconciliationTime.count()
cumulativeReconciliationTime.count(),
desiredBalanceReconciler.unassignedShards.get(),
desiredBalanceReconciler.totalAllocations.get(),
desiredBalanceReconciler.undesiredAllocations.get()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.IOException;

import static org.elasticsearch.TransportVersions.ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS;

public record DesiredBalanceStats(
long lastConvergedIndex,
boolean computationActive,
Expand All @@ -28,7 +30,10 @@ public record DesiredBalanceStats(
long computationIterations,
long computedShardMovements,
long cumulativeComputationTime,
long cumulativeReconciliationTime
long cumulativeReconciliationTime,
long unassignedShards,
long totalAllocations,
long undesiredAllocations
) implements Writeable, ToXContentObject {

private static final TransportVersion COMPUTED_SHARD_MOVEMENTS_VERSION = TransportVersions.V_8_8_0;
Expand All @@ -50,7 +55,10 @@ public static DesiredBalanceStats readFrom(StreamInput in) throws IOException {
in.readVLong(),
in.getTransportVersion().onOrAfter(COMPUTED_SHARD_MOVEMENTS_VERSION) ? in.readVLong() : -1,
in.readVLong(),
in.readVLong()
in.readVLong(),
in.getTransportVersion().onOrAfter(ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS) ? in.readVLong() : -1,
in.getTransportVersion().onOrAfter(ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS) ? in.readVLong() : -1,
in.getTransportVersion().onOrAfter(ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS) ? in.readVLong() : -1
);
}

Expand All @@ -67,6 +75,11 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeVLong(cumulativeComputationTime);
out.writeVLong(cumulativeReconciliationTime);
if (out.getTransportVersion().onOrAfter(ADDITIONAL_DESIRED_BALANCE_RECONCILIATION_STATS)) {
out.writeVLong(unassignedShards);
out.writeVLong(totalAllocations);
out.writeVLong(undesiredAllocations);
}
}

@Override
Expand All @@ -81,7 +94,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("computed_shard_movements", computedShardMovements);
builder.humanReadableField("computation_time_in_millis", "computation_time", new TimeValue(cumulativeComputationTime));
builder.humanReadableField("reconciliation_time_in_millis", "reconciliation_time", new TimeValue(cumulativeReconciliationTime));
builder.field("unassigned_shards", unassignedShards);
builder.field("total_allocations", totalAllocations);
builder.field("undesired_allocations", undesiredAllocations);
builder.field("undesired_allocations_fraction", undesiredAllocationsFraction());
builder.endObject();
return builder;
}

public double undesiredAllocationsFraction() {
if (unassignedShards == -1 || totalAllocations == -1 || undesiredAllocations == -1) {
return -1.0;
} else if (totalAllocations == 0) {
return 0.0;
} else {
return (double) undesiredAllocations / totalAllocations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.allocator.ClusterBalanceStats;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStats;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStatsTests;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
Expand All @@ -30,6 +30,7 @@

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStatsTests.randomDesiredBalanceStats;
import static org.hamcrest.Matchers.containsInAnyOrder;

public class DesiredBalanceResponseTests extends AbstractWireSerializingTestCase<DesiredBalanceResponse> {
Expand All @@ -49,20 +50,6 @@ protected DesiredBalanceResponse createTestInstance() {
);
}

private DesiredBalanceStats randomDesiredBalanceStats() {
return new DesiredBalanceStats(
randomNonNegativeLong(),
randomBoolean(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
}

private ClusterBalanceStats randomClusterBalanceStats() {
return new ClusterBalanceStats(
randomNonNegativeInt(),
Expand Down Expand Up @@ -156,7 +143,7 @@ private Map<String, Map<Integer, DesiredBalanceResponse.DesiredShards>> randomRo
protected DesiredBalanceResponse mutateInstance(DesiredBalanceResponse instance) {
return switch (randomInt(4)) {
case 0 -> new DesiredBalanceResponse(
randomValueOtherThan(instance.getStats(), this::randomDesiredBalanceStats),
randomValueOtherThan(instance.getStats(), DesiredBalanceStatsTests::randomDesiredBalanceStats),
instance.getClusterBalanceStats(),
instance.getRoutingTable(),
instance.getClusterInfo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.elasticsearch.cluster.ClusterModule.BALANCED_ALLOCATOR;
import static org.elasticsearch.cluster.ClusterModule.DESIRED_BALANCE_ALLOCATOR;
import static org.elasticsearch.cluster.ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceStatsTests.randomDesiredBalanceStats;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -219,17 +220,7 @@ public void testGetDesiredBalance() throws Exception {
}

when(desiredBalanceShardsAllocator.getDesiredBalance()).thenReturn(new DesiredBalance(randomInt(1024), shardAssignments));
DesiredBalanceStats desiredBalanceStats = new DesiredBalanceStats(
randomInt(Integer.MAX_VALUE),
randomBoolean(),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE),
randomInt(Integer.MAX_VALUE)
);
DesiredBalanceStats desiredBalanceStats = randomDesiredBalanceStats();
when(desiredBalanceShardsAllocator.getStats()).thenReturn(desiredBalanceStats);
ClusterInfo clusterInfo = ClusterInfo.EMPTY;
when(clusterInfoService.getClusterInfo()).thenReturn(clusterInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.util.Locale;

import static org.hamcrest.Matchers.equalTo;

public class DesiredBalanceStatsTests extends AbstractWireSerializingTestCase<DesiredBalanceStats> {
Expand All @@ -25,6 +23,10 @@ protected Writeable.Reader<DesiredBalanceStats> instanceReader() {

@Override
protected DesiredBalanceStats createTestInstance() {
return randomDesiredBalanceStats();
}

public static DesiredBalanceStats randomDesiredBalanceStats() {
return new DesiredBalanceStats(
randomNonNegativeLong(),
randomBoolean(),
Expand All @@ -34,6 +36,9 @@ protected DesiredBalanceStats createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
}
Expand All @@ -48,8 +53,7 @@ public void testToXContent() {
assertThat(
Strings.toString(instance, true, false),
equalTo(
String.format(
Locale.ROOT,
Strings.format(
"""
{
"computation_converged_index" : %d,
Expand All @@ -60,9 +64,12 @@ public void testToXContent() {
"computation_iterations" : %d,
"computed_shard_movements" : %d,
"computation_time_in_millis" : %d,
"reconciliation_time_in_millis" : %d
"reconciliation_time_in_millis" : %d,
"unassigned_shards" : %d,
"total_allocations" : %d,
"undesired_allocations" : %d,
"undesired_allocations_fraction" : %s
}""",

instance.lastConvergedIndex(),
instance.computationActive(),
instance.computationSubmitted(),
Expand All @@ -71,7 +78,11 @@ public void testToXContent() {
instance.computationIterations(),
instance.computedShardMovements(),
instance.cumulativeComputationTime(),
instance.cumulativeReconciliationTime()
instance.cumulativeReconciliationTime(),
instance.unassignedShards(),
instance.totalAllocations(),
instance.undesiredAllocations(),
Double.toString(instance.undesiredAllocationsFraction())
)
)
);
Expand Down

0 comments on commit 842e563

Please sign in to comment.