Skip to content

Commit

Permalink
Fix over-allocation of mounted indices on a cold/frozen node (#86331)
Browse files Browse the repository at this point in the history
Preserve the expected shard size in ShardRouting when starting a new shard if it was recovered from a searchable snapshot. This allows DiskThresholdDecider to account for shards that have not yet been synced with the cluster service info.

Fixes #85018
  • Loading branch information
arteam committed Sep 19, 2022
1 parent 0bbe17a commit 7dc8806
Show file tree
Hide file tree
Showing 47 changed files with 629 additions and 181 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/86331.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 86331
summary: Fix over-allocation of mounted indices on a cold/frozen node
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustom
String nodeId = ESTestCase.randomAlphaOfLength(8);
shardRouting = shardRouting.initialize(nodeId, null, shardRouting.getExpectedShardSize());
if (noStartedShards == false) {
shardRouting = shardRouting.moveToStarted();
shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
}
return ClusterState.builder(new ClusterName("name"))
.metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).put(idxMeta))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,10 @@ public static final IndexShard recoverShard(IndexShard newShard) throws IOExcept
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
recoverFromStore(newShard);
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
IndexShardTestCase.updateRoutingEntry(
newShard,
newShard.routingEntry().moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
);
return newShard;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class RoutingNode implements Iterable<ShardRouting> {

private final LinkedHashSet<ShardRouting> relocatingShards;

private final LinkedHashSet<ShardRouting> startedShards;

private final Map<Index, Set<ShardRouting>> shardsByIndex;

/**
Expand All @@ -57,6 +59,7 @@ public class RoutingNode implements Iterable<ShardRouting> {
this.shards = Maps.newLinkedHashMapWithExpectedSize(sizeGuess);
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
this.startedShards = new LinkedHashSet<>();
this.shardsByIndex = Maps.newHashMapWithExpectedSize(sizeGuess);
assert invariant();
}
Expand All @@ -67,6 +70,7 @@ private RoutingNode(RoutingNode original) {
this.shards = new LinkedHashMap<>(original.shards);
this.relocatingShards = new LinkedHashSet<>(original.relocatingShards);
this.initializingShards = new LinkedHashSet<>(original.initializingShards);
this.startedShards = new LinkedHashSet<>(original.startedShards);
this.shardsByIndex = Maps.copyOf(original.shardsByIndex, HashSet::new);
assert invariant();
}
Expand Down Expand Up @@ -145,6 +149,8 @@ private void addInternal(ShardRouting shard, boolean validate) {
initializingShards.add(shard);
} else if (shard.relocating()) {
relocatingShards.add(shard);
} else if (shard.started()) {
startedShards.add(shard);
}
shardsByIndex.computeIfAbsent(shard.index(), k -> new HashSet<>()).add(shard);
assert validate == false || invariant();
Expand All @@ -165,6 +171,9 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
} else if (oldShard.relocating()) {
boolean exist = relocatingShards.remove(oldShard);
assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
} else if (oldShard.started()) {
boolean exist = startedShards.remove(oldShard);
assert exist : "expected shard " + oldShard + " to exist in startedShards";
}
final Set<ShardRouting> byIndex = shardsByIndex.get(oldShard.index());
byIndex.remove(oldShard);
Expand All @@ -173,6 +182,8 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
initializingShards.add(newShard);
} else if (newShard.relocating()) {
relocatingShards.add(newShard);
} else if (newShard.started()) {
startedShards.add(newShard);
}
assert invariant();
}
Expand All @@ -186,6 +197,9 @@ void remove(ShardRouting shard) {
} else if (shard.relocating()) {
boolean exist = relocatingShards.remove(shard);
assert exist : "expected shard " + shard + " to exist in relocatingShards";
} else if (shard.started()) {
boolean exist = startedShards.remove(shard);
assert exist : "expected shard " + shard + " to exist in startedShards";
}
final Set<ShardRouting> byIndex = shardsByIndex.get(shard.index());
byIndex.remove(shard);
Expand All @@ -206,6 +220,8 @@ public int numberOfShardsWithState(ShardRoutingState... states) {
return initializingShards.size();
} else if (states[0] == ShardRoutingState.RELOCATING) {
return relocatingShards.size();
} else if (states[0] == ShardRoutingState.STARTED) {
return startedShards.size();
}
}

Expand All @@ -230,6 +246,8 @@ public List<ShardRouting> shardsWithState(ShardRoutingState state) {
return new ArrayList<>(initializingShards);
} else if (state == ShardRoutingState.RELOCATING) {
return new ArrayList<>(relocatingShards);
} else if (state == ShardRoutingState.STARTED) {
return new ArrayList<>(startedShards);
}
List<ShardRouting> shards = new ArrayList<>();
for (ShardRouting shardEntry : this) {
Expand All @@ -250,6 +268,10 @@ public ShardRouting[] relocating() {
return relocatingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
}

/**
 * Returns the shards currently held in {@code startedShards} as an array.
 *
 * @return an array containing every shard tracked as started on this node
 */
public ShardRouting[] started() {
    final ShardRouting[] startedArray = startedShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
    return startedArray;
}

/**
* Determine the shards of an index with a specific state
* @param index id of the index
Expand All @@ -276,6 +298,14 @@ public List<ShardRouting> shardsWithState(String index, ShardRoutingState... sta
shards.add(shardEntry);
}
return shards;
} else if (states[0] == ShardRoutingState.STARTED) {
for (ShardRouting shardEntry : startedShards) {
if (shardEntry.getIndexName().equals(index) == false) {
continue;
}
shards.add(shardEntry);
}
return shards;
}
}

Expand Down Expand Up @@ -348,21 +378,20 @@ public boolean isEmpty() {
boolean invariant() {
var shardRoutingsInitializing = new ArrayList<ShardRouting>(shards.size());
var shardRoutingsRelocating = new ArrayList<ShardRouting>(shards.size());
var shardRoutingsStarted = new ArrayList<ShardRouting>(shards.size());
// this guess assumes 1 shard per index, this is not precise, but okay for assertion
var shardRoutingsByIndex = Maps.<Index, Set<ShardRouting>>newHashMapWithExpectedSize(shards.size());

for (var shard : shards.values()) {
if (shard.initializing()) {
shardRoutingsInitializing.add(shard);
}
if (shard.relocating()) {
shardRoutingsRelocating.add(shard);
switch (shard.state()) {
case INITIALIZING -> shardRoutingsInitializing.add(shard);
case RELOCATING -> shardRoutingsRelocating.add(shard);
case STARTED -> shardRoutingsStarted.add(shard);
}
shardRoutingsByIndex.computeIfAbsent(shard.index(), k -> new HashSet<>(10)).add(shard);
}

assert initializingShards.size() == shardRoutingsInitializing.size() && initializingShards.containsAll(shardRoutingsInitializing);
assert relocatingShards.size() == shardRoutingsRelocating.size() && relocatingShards.containsAll(shardRoutingsRelocating);
assert startedShards.size() == shardRoutingsStarted.size() && startedShards.containsAll(shardRoutingsStarted);
assert shardRoutingsByIndex.equals(shardsByIndex);

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,14 @@ public Tuple<ShardRouting, ShardRouting> relocateShard(
*
* @return the started shard
*/
public ShardRouting startShard(Logger logger, ShardRouting initializingShard, RoutingChangesObserver routingChangesObserver) {
public ShardRouting startShard(
Logger logger,
ShardRouting initializingShard,
RoutingChangesObserver routingChangesObserver,
long startedExpectedShardSize
) {
ensureMutable();
ShardRouting startedShard = started(initializingShard);
ShardRouting startedShard = started(initializingShard, startedExpectedShardSize);
logger.trace("{} marked shard as started (routing: {})", initializingShard.shardId(), initializingShard);
routingChangesObserver.shardStarted(initializingShard, startedShard);

Expand Down Expand Up @@ -679,7 +684,7 @@ private void promoteReplicaToPrimary(ShardRouting activeReplica, RoutingChangesO
*
* @return the started shard
*/
private ShardRouting started(ShardRouting shard) {
private ShardRouting started(ShardRouting shard, long expectedShardSize) {
assert shard.initializing() : "expected an initializing shard " + shard;
if (shard.relocatingNodeId() == null) {
// if this is not a target shard for relocation, we need to update statistics
Expand All @@ -689,7 +694,7 @@ private ShardRouting started(ShardRouting shard) {
}
}
removeRecovery(shard);
ShardRouting startedShard = shard.moveToStarted();
ShardRouting startedShard = shard.moveToStarted(expectedShardSize);
updateAssigned(shard, startedShard);
return startedShard;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand All @@ -34,6 +35,7 @@ public final class ShardRouting implements Writeable, ToXContentObject {
* Used if shard size is not available
*/
public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1;
private static final Version EXPECTED_SHARD_SIZE_FOR_STARTED_VERSION = Version.V_8_5_0;

private final ShardId shardId;
private final String currentNodeId;
Expand Down Expand Up @@ -72,9 +74,6 @@ public final class ShardRouting implements Writeable, ToXContentObject {
this.allocationId = allocationId;
this.expectedShardSize = expectedShardSize;
this.targetRelocatingShard = initializeTargetRelocatingShard();
assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE
|| state == ShardRoutingState.INITIALIZING
|| state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING
: expectedShardSize + " state: " + state;
assert (state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) == false : "unassigned shard must be created with meta";
Expand Down Expand Up @@ -285,7 +284,9 @@ public ShardRouting(ShardId shardId, StreamInput in) throws IOException {
unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new);
allocationId = in.readOptionalWriteable(AllocationId::new);
final long shardSize;
if (state == ShardRoutingState.RELOCATING || state == ShardRoutingState.INITIALIZING) {
if (state == ShardRoutingState.RELOCATING
|| state == ShardRoutingState.INITIALIZING
|| (state == ShardRoutingState.STARTED && in.getVersion().onOrAfter(EXPECTED_SHARD_SIZE_FOR_STARTED_VERSION))) {
shardSize = in.readLong();
} else {
shardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
Expand Down Expand Up @@ -314,7 +315,9 @@ public void writeToThin(StreamOutput out) throws IOException {
}
out.writeOptionalWriteable(unassignedInfo);
out.writeOptionalWriteable(allocationId);
if (state == ShardRoutingState.RELOCATING || state == ShardRoutingState.INITIALIZING) {
if (state == ShardRoutingState.RELOCATING
|| state == ShardRoutingState.INITIALIZING
|| (state == ShardRoutingState.STARTED && out.getVersion().onOrAfter(EXPECTED_SHARD_SIZE_FOR_STARTED_VERSION))) {
out.writeLong(expectedShardSize);
}
}
Expand Down Expand Up @@ -485,13 +488,14 @@ public ShardRouting reinitializeReplicaShard() {
* <code>INITIALIZING</code> or <code>RELOCATING</code>. Any relocation will be
* canceled.
*/
public ShardRouting moveToStarted() {
public ShardRouting moveToStarted(long expectedShardSize) {
assert state == ShardRoutingState.INITIALIZING : "expected an initializing shard " + this;
AllocationId allocationId = this.allocationId;
if (allocationId.getRelocationId() != null) {
// relocation target
allocationId = AllocationId.finishRelocation(allocationId);
}

return new ShardRouting(
shardId,
currentNodeId,
Expand All @@ -501,7 +505,7 @@ public ShardRouting moveToStarted() {
null,
null,
allocationId,
UNAVAILABLE_EXPECTED_SHARD_SIZE
expectedShardSize
);
}

Expand Down Expand Up @@ -759,7 +763,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("relocating_node", relocatingNodeId())
.field("shard", id())
.field("index", getIndexName());
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) {
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE && state != ShardRoutingState.STARTED) {
builder.field("expected_shard_size_in_bytes", expectedShardSize);
}
if (recoverySource != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,10 @@ private static void applyStartedShards(RoutingAllocation routingAllocation, List
+ startedShard
+ " but was: "
+ routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());

routingNodes.startShard(logger, startedShard, routingAllocation.changes());
long expectedShardSize = routingAllocation.metadata().getIndexSafe(startedShard.index()).isSearchableSnapshot()
? startedShard.getExpectedShardSize()
: ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE;
routingNodes.startShard(logger, startedShard, routingAllocation.changes(), expectedShardSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,13 +423,14 @@ public void onNewInfo(ClusterInfo info) {

// exposed for tests to override
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
return DiskThresholdDecider.sizeOfRelocatingShards(
return DiskThresholdDecider.sizeOfUnaccountedShards(
routingNode,
true,
diskUsage.getPath(),
info,
reroutedClusterState.metadata(),
reroutedClusterState.routingTable()
reroutedClusterState.routingTable(),
0L
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.DesiredNodes;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -26,6 +28,7 @@
import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -77,6 +80,9 @@ public class RoutingAllocation {
@Nullable
private final DesiredNodes desiredNodes;

// Total size of the searchable snapshot shards that aren't yet registered in ClusterInfo, keyed by cluster node id
private final Map<String, Long> unaccountedSearchableSnapshotSizes;

public RoutingAllocation(
AllocationDeciders deciders,
ClusterState clusterState,
Expand All @@ -100,7 +106,7 @@ public RoutingAllocation(
AllocationDeciders deciders,
@Nullable RoutingNodes routingNodes,
ClusterState clusterState,
ClusterInfo clusterInfo,
@Nullable ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime
) {
Expand All @@ -119,6 +125,30 @@ public RoutingAllocation(
}
this.nodeReplacementTargets = Map.copyOf(targetNameToShutdown);
this.desiredNodes = DesiredNodes.latestFromClusterState(clusterState);
unaccountedSearchableSnapshotSizes = unaccountedSearchableSnapshotSizes(clusterState, clusterInfo);
}

/**
 * Builds a per-node total of the expected sizes of started searchable snapshot shards that the given
 * {@link ClusterInfo} does not know about yet.
 * <p>
 * A started shard contributes its expected size when all of the following hold, checked in this order:
 * its expected size is positive, its index is a searchable snapshot, it is not part of the reserved space
 * looked up for the node (using the path of the node's most-available disk usage, or an empty path when no
 * usage is known), and {@code clusterInfo} has no size for it.
 *
 * @param clusterState the cluster state supplying the routing nodes and index metadata
 * @param clusterInfo  the cluster info to check against; when {@code null} the result is empty
 * @return an unmodifiable map from node id to the summed size in bytes; nodes whose sum is zero are omitted
 */
private static Map<String, Long> unaccountedSearchableSnapshotSizes(ClusterState clusterState, ClusterInfo clusterInfo) {
    final Map<String, Long> sizesByNodeId = new HashMap<>();
    if (clusterInfo == null) {
        // without cluster info there is nothing to compare against
        return Collections.unmodifiableMap(sizesByNodeId);
    }

    for (RoutingNode routingNode : clusterState.getRoutingNodes()) {
        final String nodeId = routingNode.nodeId();
        final DiskUsage mostAvailable = clusterInfo.getNodeMostAvailableDiskUsages().get(nodeId);
        final String dataPath = mostAvailable == null ? "" : mostAvailable.getPath();
        final ClusterInfo.ReservedSpace reserved = clusterInfo.getReservedSpace(nodeId, dataPath);

        long unaccountedBytes = 0;
        for (ShardRouting startedShard : routingNode.started()) {
            final long expectedBytes = startedShard.getExpectedShardSize();
            if (expectedBytes <= 0) {
                continue;
            }
            if (clusterState.metadata().getIndexSafe(startedShard.index()).isSearchableSnapshot() == false) {
                continue;
            }
            if (reserved.containsShardId(startedShard.shardId())) {
                continue;
            }
            if (clusterInfo.getShardSize(startedShard) != null) {
                continue;
            }
            unaccountedBytes += expectedBytes;
        }

        if (unaccountedBytes > 0) {
            sizesByNodeId.put(nodeId, unaccountedBytes);
        }
    }
    return Collections.unmodifiableMap(sizesByNodeId);
}

/** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */
Expand Down Expand Up @@ -333,6 +363,13 @@ public void setHasPendingAsyncFetch() {
this.hasPendingAsyncFetch = true;
}

/**
 * Looks up the approximate size (in bytes) of the unaccounted searchable snapshots that was recorded
 * for the given node before the allocation.
 *
 * @param routingNode the node whose id is used for the lookup
 * @return the recorded size in bytes, or {@code 0} when nothing is recorded for the node
 */
public long unaccountedSearchableSnapshotSize(RoutingNode routingNode) {
    final String nodeId = routingNode.nodeId();
    return unaccountedSearchableSnapshotSizes.getOrDefault(nodeId, 0L);
}

public enum DebugMode {
/**
* debug mode is off
Expand Down

0 comments on commit 7dc8806

Please sign in to comment.