Skip to content

Commit

Permalink
Autoscaling during shrink (#88292)
Browse files Browse the repository at this point in the history
Fix autoscaling during shrink to disregard pinning to nodes for the total tier size.
Instead signal that we need a minimum node size to hold the entire shrink
operation. This avoids scaling far higher than necessary when cluster
balancing does not allow a shrink to proceed. It is considered a
(separate) balancing issue when a shrink cannot complete with enough
space in the tier.

This changes autoscaling in general for node pinning filters (based on
`_id`, `_name` or `name` filters).

Clone and split also pins to the shards they clone or split, similarly
this is changed to ignore that pinning during autoscaling.

Closes #85480
  • Loading branch information
henningandersen committed Jul 8, 2022
1 parent d4f78db commit 66e750c
Show file tree
Hide file tree
Showing 7 changed files with 585 additions and 27 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/88292.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 88292
summary: Autoscaling during shrink
area: Autoscaling
type: bug
issues:
- 85480
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

public class DiscoveryNodeFilters {

public static final Set<String> SINGLE_NODE_NAMES = Set.of("_id", "_name", "name");
static final Set<String> NON_ATTRIBUTE_NAMES = Set.of("_ip", "_host_ip", "_publish_ip", "host", "_id", "_name", "name");

public enum OpType {
Expand Down Expand Up @@ -234,6 +235,20 @@ public boolean isOnlyAttributeValueFilter() {
return filters.keySet().stream().anyMatch(NON_ATTRIBUTE_NAMES::contains) == false;
}

/**
* @return true if filter is for a single node
*/
public boolean isSingleNodeFilter() {
return withoutTierPreferences != null && withoutTierPreferences.isSingleNodeFilterInternal();
}

private boolean isSingleNodeFilterInternal() {
return (filters.size() == 1
&& NON_ATTRIBUTE_NAMES.contains(filters.keySet().iterator().next())
&& (filters.values().iterator().next().length == 1 || opType == OpType.AND))
|| (filters.size() > 1 && opType == OpType.AND && NON_ATTRIBUTE_NAMES.containsAll(filters.keySet()));
}

/**
* Generates a human-readable string for the DiscoverNodeFilters.
* Example: {@code _id:"id1 OR blah",name:"blah OR name2"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ public void testScaleUp() throws IOException, InterruptedException {
response.results().get(policyName).requiredCapacity().total().storage().getBytes(),
Matchers.greaterThanOrEqualTo(enoughSpace + used)
);
assertThat(response.results().get(policyName).requiredCapacity().node().storage().getBytes(), Matchers.equalTo(maxShardSize));
assertThat(
response.results().get(policyName).requiredCapacity().node().storage().getBytes(),
Matchers.equalTo(maxShardSize + ReactiveStorageDeciderService.NODE_DISK_OVERHEAD + LOW_WATERMARK_BYTES)
);

// with 0 window, we expect just current.
putAutoscalingPolicy(
Expand All @@ -101,7 +104,10 @@ public void testScaleUp() throws IOException, InterruptedException {
assertThat(response.results().keySet(), Matchers.equalTo(Set.of(policyName)));
assertThat(response.results().get(policyName).currentCapacity().total().storage().getBytes(), Matchers.equalTo(enoughSpace));
assertThat(response.results().get(policyName).requiredCapacity().total().storage().getBytes(), Matchers.equalTo(enoughSpace));
assertThat(response.results().get(policyName).requiredCapacity().node().storage().getBytes(), Matchers.equalTo(maxShardSize));
assertThat(
response.results().get(policyName).requiredCapacity().node().storage().getBytes(),
Matchers.equalTo(maxShardSize + ReactiveStorageDeciderService.NODE_DISK_OVERHEAD + LOW_WATERMARK_BYTES)
);
}

private void putAutoscalingPolicy(String policyName, Settings settings) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand All @@ -32,6 +34,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
Expand Down Expand Up @@ -73,10 +76,33 @@

public class ReactiveStorageDeciderService implements AutoscalingDeciderService {
public static final String NAME = "reactive_storage";
/**
* An estimate of what space other things than accounted for by shard sizes in ClusterInfo use on disk.
* Set conservatively low for now.
*/
static final long NODE_DISK_OVERHEAD = ByteSizeValue.ofMb(10).getBytes();

private final DiskThresholdSettings diskThresholdSettings;
private final AllocationDeciders allocationDeciders;

private static final Predicate<String> REMOVE_NODE_LOCKED_FILTER_INITIAL = removeNodeLockedFilterPredicate(
IndexMetadata.INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getKey()
);

private static final Predicate<String> REMOVE_NODE_LOCKED_FILTER_REQUIRE = removeNodeLockedFilterPredicate(
IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey()
);

private static final Predicate<String> REMOVE_NODE_LOCKED_FILTER_INCLUDE = removeNodeLockedFilterPredicate(
IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey()
);

private static Predicate<String> removeNodeLockedFilterPredicate(String settingPrefix) {
return Predicate.not(
DiscoveryNodeFilters.SINGLE_NODE_NAMES.stream().map(settingPrefix::concat).collect(Collectors.toSet())::contains
);
}

public ReactiveStorageDeciderService(Settings settings, ClusterSettings clusterSettings, AllocationDeciders allocationDeciders) {
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.allocationDeciders = allocationDeciders;
Expand Down Expand Up @@ -116,13 +142,16 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
var unassignedBytesUnassignedShards = allocationState.storagePreventsAllocation();
long unassignedBytes = unassignedBytesUnassignedShards.sizeInBytes();
long maxShardSize = allocationState.maxShardSize();
long maxNodeLockedSize = allocationState.maxNodeLockedSize();
long minimumNodeSize = nodeSizeForDataBelowLowWatermark(Math.max(maxShardSize, maxNodeLockedSize), diskThresholdSettings)
+ NODE_DISK_OVERHEAD;
assert assignedBytes >= 0;
assert unassignedBytes >= 0;
assert maxShardSize >= 0;
String message = message(unassignedBytes, assignedBytes);
AutoscalingCapacity requiredCapacity = AutoscalingCapacity.builder()
.total(autoscalingCapacity.total().storage().getBytes() + unassignedBytes + assignedBytes, null, null)
.node(maxShardSize, null, null)
.node(minimumNodeSize, null, null)
.build();
return new AutoscalingDeciderResult(
requiredCapacity,
Expand Down Expand Up @@ -150,6 +179,10 @@ static boolean isDiskOnlyNoDecision(Decision decision) {
return singleNoDecision(decision, single -> true).map(DiskThresholdDecider.NAME::equals).orElse(false);
}

static boolean isResizeOnlyNoDecision(Decision decision) {
return singleNoDecision(decision, single -> true).map(ResizeAllocationDecider.NAME::equals).orElse(false);
}

static boolean isFilterTierOnlyDecision(Decision decision, IndexMetadata indexMetadata) {
// only primary shards are handled here, allowing us to disregard same shard allocation decider.
return singleNoDecision(decision, single -> SameShardAllocationDecider.NAME.equals(single.label()) == false).filter(
Expand Down Expand Up @@ -185,9 +218,24 @@ static Optional<String> singleNoDecision(Decision decision, Predicate<Decision>
}
}

static long nodeSizeForDataBelowLowWatermark(long bytes, DiskThresholdSettings thresholdSettings) {
ByteSizeValue bytesThreshold = thresholdSettings.getFreeBytesThresholdLow();
if (bytesThreshold.getBytes() != 0) {
return bytesThreshold.getBytes() + bytes;
} else {
double percentThreshold = thresholdSettings.getFreeDiskThresholdLow();
if (percentThreshold >= 0.0 && percentThreshold < 100.0) {
return (long) (bytes / ((100.0 - percentThreshold) / 100));
} else {
return bytes;
}
}
}

// todo: move this to top level class.
public static class AllocationState {
private final ClusterState state;
private final ClusterState originalState;
private final AllocationDeciders allocationDeciders;
private final DiskThresholdSettings diskThresholdSettings;
private final ClusterInfo info;
Expand Down Expand Up @@ -222,7 +270,8 @@ public static class AllocationState {
Set<DiscoveryNode> nodes,
Set<DiscoveryNodeRole> roles
) {
this.state = state;
this.state = removeNodeLockFilters(state);
this.originalState = state;
this.allocationDeciders = allocationDeciders;
this.diskThresholdSettings = diskThresholdSettings;
this.info = info;
Expand Down Expand Up @@ -324,8 +373,16 @@ private boolean cannotAllocateDueToStorage(ShardRouting shard, RoutingAllocation
// enable debug decisions to see all decisions and preserve the allocation decision label
allocation.debugDecision(true);
try {
return nodesInTier(allocation.routingNodes()).map(node -> allocationDeciders.canAllocate(shard, node, allocation))
.anyMatch(ReactiveStorageDeciderService::isDiskOnlyNoDecision);
boolean diskOnly = nodesInTier(allocation.routingNodes()).map(
node -> allocationDeciders.canAllocate(shard, node, allocation)
).anyMatch(ReactiveStorageDeciderService::isDiskOnlyNoDecision);
if (diskOnly && shard.unassigned() && shard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
// For resize shards only allow autoscaling if there is no other node where the shard could fit had it not been
// a resize shard. Notice that we already removed any initial_recovery filters.
diskOnly = nodesInTier(allocation.routingNodes()).map(node -> allocationDeciders.canAllocate(shard, node, allocation))
.anyMatch(ReactiveStorageDeciderService::isResizeOnlyNoDecision) == false;
}
return diskOnly;
} finally {
allocation.debugDecision(false);
}
Expand Down Expand Up @@ -395,16 +452,25 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {

private boolean isAssignedToTier(ShardRouting shard, RoutingAllocation allocation) {
IndexMetadata indexMetadata = indexMetadata(shard, allocation);
return DataTierAllocationDecider.shouldFilter(indexMetadata, roles, this::highestPreferenceTier, allocation) != Decision.NO;
return isAssignedToTier(indexMetadata, roles);
}

private static boolean isAssignedToTier(IndexMetadata indexMetadata, Set<DiscoveryNodeRole> roles) {
List<String> tierPreference = indexMetadata.getTierPreference();
return tierPreference.isEmpty() || DataTierAllocationDecider.allocationAllowed(highestPreferenceTier(tierPreference), roles);
}

private IndexMetadata indexMetadata(ShardRouting shard, RoutingAllocation allocation) {
return allocation.metadata().getIndexSafe(shard.index());
}

private Optional<String> highestPreferenceTier(List<String> preferredTiers, DiscoveryNodes unused, DesiredNodes desiredNodes) {
return Optional.of(highestPreferenceTier(preferredTiers));
}

private static String highestPreferenceTier(List<String> preferredTiers) {
assert preferredTiers.isEmpty() == false;
return Optional.of(preferredTiers.get(0));
return preferredTiers.get(0);
}

public long maxShardSize() {
Expand All @@ -414,6 +480,49 @@ public long maxShardSize() {
.orElse(0L);
}

public long maxNodeLockedSize() {
Metadata metadata = originalState.getMetadata();
return metadata.indices().values().stream().mapToLong(imd -> nodeLockedSize(imd, metadata)).max().orElse(0L);
}

private long nodeLockedSize(IndexMetadata indexMetadata, Metadata metadata) {
if (isNodeLocked(indexMetadata)) {
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexMetadata.getIndex());
long sum = 0;
for (int s = 0; s < indexMetadata.getNumberOfShards(); ++s) {
ShardRouting shard = indexRoutingTable.shard(s).primaryShard();
long size = sizeOf(shard);
sum += size;
}
if (indexMetadata.getResizeSourceIndex() != null) {
// since we only report the max size for an index, count a shrink/clone/split 2x if it is node locked.
sum = sum * 2;
}
return sum;
} else {
Index resizeSourceIndex = indexMetadata.getResizeSourceIndex();
if (resizeSourceIndex != null) {
IndexMetadata sourceIndexMetadata = metadata.getIndexSafe(resizeSourceIndex);
// ResizeAllocationDecider only handles clone or split, do the same here.

if (indexMetadata.getNumberOfShards() >= sourceIndexMetadata.getNumberOfShards()) {
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(resizeSourceIndex);
long max = 0;
for (int s = 0; s < sourceIndexMetadata.getNumberOfShards(); ++s) {
ShardRouting shard = indexRoutingTable.shard(s).primaryShard();
long size = sizeOf(shard);
max = Math.max(max, size);
}

// 2x to account for the extra copy residing on the same node
return max * 2;
}
}
}

return 0;
}

long sizeOf(ShardRouting shard) {
long expectedShardSize = getExpectedShardSize(shard);
if (expectedShardSize == 0L && shard.primary() == false) {
Expand Down Expand Up @@ -638,6 +747,48 @@ ClusterInfo info() {
return info;
}

private static ClusterState removeNodeLockFilters(ClusterState state) {
ClusterState.Builder builder = ClusterState.builder(state);
builder.metadata(removeNodeLockFilters(state.metadata()));
return builder.build();
}

private static Metadata removeNodeLockFilters(Metadata metadata) {
Metadata.Builder builder = Metadata.builder(metadata);
metadata.stream()
.filter(AllocationState::isNodeLocked)
.map(AllocationState::removeNodeLockFilters)
.forEach(imd -> builder.put(imd, false));
return builder.build();
}

private static IndexMetadata removeNodeLockFilters(IndexMetadata indexMetadata) {
Settings settings = indexMetadata.getSettings();
settings = removeNodeLockFilters(settings, REMOVE_NODE_LOCKED_FILTER_INITIAL, indexMetadata.getInitialRecoveryFilters());
settings = removeNodeLockFilters(settings, REMOVE_NODE_LOCKED_FILTER_REQUIRE, indexMetadata.requireFilters());
settings = removeNodeLockFilters(settings, REMOVE_NODE_LOCKED_FILTER_INCLUDE, indexMetadata.includeFilters());
return IndexMetadata.builder(indexMetadata).settings(settings).build();
}

private static Settings removeNodeLockFilters(Settings settings, Predicate<String> predicate, DiscoveryNodeFilters filters) {
// only filter if it is a single node filter - otherwise removing it risks narrowing legal nodes for OR filters.
if (filters != null && filters.isSingleNodeFilter()) {
return settings.filter(predicate);
} else {
return settings;
}
}

private static boolean isNodeLocked(IndexMetadata indexMetadata) {
return isNodeLocked(indexMetadata.requireFilters())
|| isNodeLocked(indexMetadata.includeFilters())
|| isNodeLocked(indexMetadata.getInitialRecoveryFilters());
}

private static boolean isNodeLocked(DiscoveryNodeFilters filters) {
return filters != null && filters.isSingleNodeFilter();
}

private static class ExtendedClusterInfo extends ClusterInfo {
private final ClusterInfo delegate;

Expand Down

0 comments on commit 66e750c

Please sign in to comment.