Skip to content

Commit

Permalink
Add debug information to ReactiveReason about assigned and unassigned…
Browse files Browse the repository at this point in the history
… shards (#86132)

We would like to see not only the amount of assigned and unassigned shards, but also the ids of the respective shards.

Fixes #85243

* Calculate assigned shards based on storagePreventsRemainOrMove and storagePreventsAllocation
* Correctly set assignedShardIds
* Add a TODO
* Add tests for assigned and non-assigned shard ids for ReactiveStorageDeciderService
* Remove changes in ProactiveStorageDeciderService
* Update docs/changelog/86132.yaml
* Return empty list for unassigned shards if can't make a decision
* Keep a terse constructor for ReactiveReason
* Move assignedBytesUnmovableShards initialization to the original point
* Add tests for JSON representation of ReactiveReason
* Add a missed LICENSE header
* Shard ids are empty if the context is null
* use shardIds helper
* Use ShardId in records
* Use record to represent amount of bytes and shard ids
* Sort shards by their ids
* Limit the amount of assigned shards in the output
* Test that we limit the amount of assigned shards in the output
* Move to SortedSet
* Add tests that shard ids are sorted
* Remove internal package protected methods of allocation info
* Verify shard ids
* Correctly calculate the amount of unmovableShards
* Move assignedBytes
* Extract the max amount of assigned shard ids to a constant
* Extract the shard ids output version to a constant
* Rename BytesShardIds to ShardsSize
* Extract ShardsSize to a top level class
* Keep shard ids closer to the total size
* Add missed license import
* Update docs/changelog/86132.yaml
* Fix the test for assigned shard JSON field names
* Limit the amount of unassigned shards
* Rename the limit to MAX_AMOUNT_OF_SHARDS
* Use toList instead of Collections.toList
* Bring back public accessors
* Precollect unallocated shards to avoid double calculation
* Update version to V_8_4_0
* Randomize index name
* Coalesce numAllocatableSubjectShards and allocatableShards
* Randomize indexName
* Add comment about duplicated shard ids
  • Loading branch information
arteam committed Jun 8, 2022
1 parent f615744 commit 666e85b
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 50 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/86132.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 86132
summary: Add debug information to `ReactiveReason` about assigned and unassigned shards
area: Allocation
type: enhancement
issues:
- 85243
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
diskThresholdSettings,
allocationDeciders
);
long unassignedBytesBeforeForecast = allocationState.storagePreventsAllocation();
long unassignedBytesBeforeForecast = allocationState.storagePreventsAllocation().sizeInBytes();
assert unassignedBytesBeforeForecast >= 0;

TimeValue forecastWindow = FORECAST_WINDOW.get(configuration);
Expand All @@ -72,8 +72,8 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
System.currentTimeMillis()
);

long unassignedBytes = allocationStateAfterForecast.storagePreventsAllocation();
long assignedBytes = allocationStateAfterForecast.storagePreventsRemainOrMove();
long unassignedBytes = allocationStateAfterForecast.storagePreventsAllocation().sizeInBytes();
long assignedBytes = allocationStateAfterForecast.storagePreventsRemainOrMove().sizeInBytes();
long maxShardSize = allocationStateAfterForecast.maxShardSize();
assert assignedBytes >= 0;
assert unassignedBytes >= unassignedBytesBeforeForecast;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.autoscaling.storage;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
Expand Down Expand Up @@ -55,13 +56,16 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -106,9 +110,11 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
return new AutoscalingDeciderResult(null, new ReactiveReason("current capacity not available", -1, -1));
}

AllocationState allocationState = new AllocationState(context, diskThresholdSettings, allocationDeciders);
long unassignedBytes = allocationState.storagePreventsAllocation();
long assignedBytes = allocationState.storagePreventsRemainOrMove();
AllocationState allocationState = allocationState(context);
var assignedBytesUnmovableShards = allocationState.storagePreventsRemainOrMove();
long assignedBytes = assignedBytesUnmovableShards.sizeInBytes();
var unassignedBytesUnassignedShards = allocationState.storagePreventsAllocation();
long unassignedBytes = unassignedBytesUnassignedShards.sizeInBytes();
long maxShardSize = allocationState.maxShardSize();
assert assignedBytes >= 0;
assert unassignedBytes >= 0;
Expand All @@ -118,7 +124,20 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
.total(autoscalingCapacity.total().storage().getBytes() + unassignedBytes + assignedBytes, null)
.node(maxShardSize, null)
.build();
return new AutoscalingDeciderResult(requiredCapacity, new ReactiveReason(message, unassignedBytes, assignedBytes));
return new AutoscalingDeciderResult(
requiredCapacity,
new ReactiveReason(
message,
unassignedBytes,
unassignedBytesUnassignedShards.shardIds(),
assignedBytes,
assignedBytesUnmovableShards.shardIds()
)
);
}

/**
 * Creates the {@link AllocationState} for the given context, combining it with the
 * {@code diskThresholdSettings} and {@code allocationDeciders} that are in scope here.
 * NOTE(review): package-private, presumably so tests can override or inspect it - confirm.
 *
 * @param context the autoscaling decider context to build the state from
 * @return a newly constructed allocation state
 */
AllocationState allocationState(AutoscalingDeciderContext context) {
return new AllocationState(context, diskThresholdSettings, allocationDeciders);
}

static String message(long unassignedBytes, long assignedBytes) {
Expand Down Expand Up @@ -214,16 +233,19 @@ public static class AllocationState {
this.roles = roles;
}

public long storagePreventsAllocation() {
public ShardsSize storagePreventsAllocation() {
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, info, shardSizeInfo, System.nanoTime());
return StreamSupport.stream(state.getRoutingNodes().unassigned().spliterator(), false)
List<ShardRouting> unassignedShards = StreamSupport.stream(state.getRoutingNodes().unassigned().spliterator(), false)
.filter(shard -> canAllocate(shard, allocation) == false)
.filter(shard -> cannotAllocateDueToStorage(shard, allocation))
.mapToLong(this::sizeOf)
.sum();
.toList();
return new ShardsSize(
unassignedShards.stream().mapToLong(this::sizeOf).sum(),
unassignedShards.stream().map(ShardRouting::shardId).collect(Collectors.toCollection(TreeSet::new))
);
}

public long storagePreventsRemainOrMove() {
public ShardsSize storagePreventsRemainOrMove() {
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, info, shardSizeInfo, System.nanoTime());

List<ShardRouting> candidates = new LinkedList<>();
Expand All @@ -249,13 +271,18 @@ && canAllocate(shard, allocation) == false) {
.mapToLong(e -> unmovableSize(e.getKey(), e.getValue()))
.sum();

long unallocatableBytes = candidates.stream()
List<ShardRouting> unallocatedShards = candidates.stream()
.filter(Predicate.not(unmovableShards::contains))
.filter(s1 -> cannotAllocateDueToStorage(s1, allocation))
.mapToLong(this::sizeOf)
.sum();

return unallocatableBytes + unmovableBytes;
.toList();
long unallocatableBytes = unallocatedShards.stream().mapToLong(this::sizeOf).sum();

return new ShardsSize(
unallocatableBytes + unmovableBytes,
Stream.concat(unmovableShards.stream(), unallocatedShards.stream())
.map(ShardRouting::shardId)
.collect(Collectors.toCollection(TreeSet::new))
);
}

/**
Expand Down Expand Up @@ -668,20 +695,45 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

public static class ReactiveReason implements AutoscalingDeciderResult.Reason {

static final int MAX_AMOUNT_OF_SHARDS = 512;
private static final Version SHARD_IDS_OUTPUT_VERSION = Version.V_8_4_0;

private final String reason;
private final long unassigned;
private final long assigned;
private final SortedSet<ShardId> unassignedShardIds;
private final SortedSet<ShardId> assignedShardIds;

/**
 * Terse constructor for a reason that carries only byte totals: both shard id sets
 * are set to the empty sorted set.
 *
 * @param reason     the textual reason
 * @param unassigned the unassigned byte total
 * @param assigned   the assigned byte total
 */
public ReactiveReason(String reason, long unassigned, long assigned) {
this(reason, unassigned, Collections.emptySortedSet(), assigned, Collections.emptySortedSet());
}

/**
 * Full constructor: stores the message, both byte totals and both shard id sets.
 * The sets are kept by reference; no copy is made.
 *
 * @param reason             the textual reason
 * @param unassigned         the unassigned byte total
 * @param unassignedShardIds the sorted ids reported alongside {@code unassigned}
 * @param assigned           the assigned byte total
 * @param assignedShardIds   the sorted ids reported alongside {@code assigned}
 */
ReactiveReason(
    String reason,
    long unassigned,
    SortedSet<ShardId> unassignedShardIds,
    long assigned,
    SortedSet<ShardId> assignedShardIds
) {
    this.reason = reason;

    // keep each total next to the shard ids it is reported with
    this.unassigned = unassigned;
    this.unassignedShardIds = unassignedShardIds;

    this.assigned = assigned;
    this.assignedShardIds = assignedShardIds;
}

public ReactiveReason(StreamInput in) throws IOException {
this.reason = in.readString();
this.unassigned = in.readLong();
this.assigned = in.readLong();
if (in.getVersion().onOrAfter(SHARD_IDS_OUTPUT_VERSION)) {
unassignedShardIds = Collections.unmodifiableSortedSet(new TreeSet<>(in.readSet(ShardId::new)));
assignedShardIds = Collections.unmodifiableSortedSet(new TreeSet<>(in.readSet(ShardId::new)));
} else {
unassignedShardIds = Collections.emptySortedSet();
assignedShardIds = Collections.emptySortedSet();
}
}

@Override
Expand All @@ -697,6 +749,14 @@ public long assigned() {
return assigned;
}

/**
 * Returns the sorted set of unassigned shard ids held by this reason.
 * The stored set is returned directly, without copying.
 *
 * @return the unassigned shard ids
 */
public SortedSet<ShardId> unassignedShardIds() {
return unassignedShardIds;
}

/**
 * Returns the sorted set of assigned shard ids held by this reason.
 * The stored set is returned directly, without copying.
 *
 * @return the assigned shard ids
 */
public SortedSet<ShardId> assignedShardIds() {
return assignedShardIds;
}

@Override
public String getWriteableName() {
return ReactiveStorageDeciderService.NAME;
Expand All @@ -707,14 +767,22 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(reason);
out.writeLong(unassigned);
out.writeLong(assigned);
if (out.getVersion().onOrAfter(SHARD_IDS_OUTPUT_VERSION)) {
out.writeCollection(unassignedShardIds);
out.writeCollection(assignedShardIds);
}
}

/**
 * Writes this reason as a single object. For each of the unassigned and assigned groups
 * it emits the byte total, at most {@code MAX_AMOUNT_OF_SHARDS} shard ids (the first ones
 * in the set's order), and the full, untruncated number of shard ids.
 *
 * @param builder the builder to write to
 * @param params  rendering parameters (not consulted here)
 * @return the same builder that was passed in
 * @throws IOException if writing to the builder fails
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    builder.startObject()
        .field("reason", reason)
        .field("unassigned", unassigned)
        // the list is capped, the count is not
        .field("unassigned_shards", unassignedShardIds.stream().limit(MAX_AMOUNT_OF_SHARDS).toList())
        .field("unassigned_shards_count", unassignedShardIds.size())
        .field("assigned", assigned)
        .field("assigned_shards", assignedShardIds.stream().limit(MAX_AMOUNT_OF_SHARDS).toList())
        .field("assigned_shards_count", assignedShardIds.size())
        .endObject();
    return builder;
}
Expand All @@ -724,12 +792,16 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReactiveReason that = (ReactiveReason) o;
return unassigned == that.unassigned && assigned == that.assigned && reason.equals(that.reason);
return unassigned == that.unassigned
&& assigned == that.assigned
&& reason.equals(that.reason)
&& unassignedShardIds.equals(that.unassignedShardIds)
&& assignedShardIds.equals(that.assignedShardIds);
}

@Override
public int hashCode() {
return Objects.hash(reason, unassigned, assigned);
return Objects.hash(reason, unassigned, assigned, unassignedShardIds, assignedShardIds);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling.storage;

import org.elasticsearch.index.shard.ShardId;

import java.util.SortedSet;

/**
 * A size in bytes paired with a sorted set of shard ids.
 * The set reference is stored as given; the record makes no defensive copy.
 *
 * @param sizeInBytes the size in bytes
 * @param shardIds    the shard ids, in sorted order
 */
record ShardsSize(long sizeInBytes, SortedSet<ShardId> shardIds) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling.storage;

import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.greaterThan;

/**
 * Tests for the JSON rendering of {@code ReactiveStorageDeciderService.ReactiveReason}.
 */
public class ReactiveReasonTests extends ESTestCase {

    /**
     * Renders a reason holding 600 unassigned and 600 assigned shard ids and checks that the
     * scalar fields round-trip, that each shard list is truncated to
     * {@code MAX_AMOUNT_OF_SHARDS} entries in sorted order, and that each count field still
     * reports the full set size.
     */
    public void testXContent() throws IOException {
        // the order of the random draws below is kept stable on purpose
        String message = randomAlphaOfLength(10);
        long unassignedBytes = randomNonNegativeLong();
        long assignedBytes = randomNonNegativeLong();
        String uuid = UUIDs.randomBase64UUID();
        String index = randomAlphaOfLength(10);
        SortedSet<ShardId> unassignedIds = randomShardIds(index, uuid);
        SortedSet<ShardId> assignedIds = randomShardIds(index, uuid);

        var reactiveReason = new ReactiveStorageDeciderService.ReactiveReason(
            message,
            unassignedBytes,
            unassignedIds,
            assignedBytes,
            assignedIds
        );
        BytesReference json = BytesReference.bytes(reactiveReason.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));

        try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) {
            Map<String, Object> map = parser.map();
            assertEquals(message, map.get("reason"));
            assertEquals(unassignedBytes, map.get("unassigned"));
            assertEquals(assignedBytes, map.get("assigned"));

            assertShardIds(unassignedIds, map, "unassigned_shards", "unassigned_shards_count");
            assertShardIds(assignedIds, map, "assigned_shards", "assigned_shards_count");
        }
    }

    /** Builds a sorted set of 600 distinct shard ids for one index, shard numbers drawn via {@code randomInt(1000)}. */
    private SortedSet<ShardId> randomShardIds(String index, String uuid) {
        return new TreeSet<>(randomUnique(() -> new ShardId(index, uuid, randomInt(1000)), 600));
    }

    /**
     * Checks one shard-id group of the parsed output: the list under {@code listField} must equal
     * the first {@code MAX_AMOUNT_OF_SHARDS} expected ids (as strings) and be strictly ascending,
     * and the value under {@code countField} must equal the size of the whole expected set.
     */
    @SuppressWarnings("unchecked")
    private void assertShardIds(SortedSet<ShardId> expected, Map<String, Object> map, String listField, String countField) {
        List<String> rendered = (List<String>) map.get(listField);
        List<String> expectedRendered = expected.stream()
            .limit(ReactiveStorageDeciderService.ReactiveReason.MAX_AMOUNT_OF_SHARDS)
            .map(ShardId::toString)
            .collect(Collectors.toList());

        assertEquals(expectedRendered, rendered);
        assertSorted(rendered.stream().map(ShardId::fromString).toList());
        assertEquals(expected.size(), map.get(countField));
    }

    /** Asserts that every element is strictly greater than the one before it in iteration order. */
    private static void assertSorted(Collection<ShardId> collection) {
        List<ShardId> ordered = collection.stream().toList();
        for (int i = 1; i < ordered.size(); i++) {
            assertThat(ordered.get(i), greaterThan(ordered.get(i - 1)));
        }
    }
}

0 comments on commit 666e85b

Please sign in to comment.