Also reroute after shard snapshot size fetch failure (#66008)
In #61906 we added the possibility for the master node to
fetch the size of a shard snapshot before allocating the
shard to a data node with enough disk space to host it.
When merging this change we agreed that any failure during
size fetching should not prevent the shard from being
allocated.
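
As a rough sketch of how a consumer uses those fetched
sizes (illustrative only: the wrapper class and method
name below are made up, while getShardSize and
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE are the real
accessors that appear later in this diff; the import paths
are assumed from the Elasticsearch source tree):

    import org.elasticsearch.cluster.routing.ShardRouting;
    import org.elasticsearch.snapshots.SnapshotShardSizeInfo;

    class ShardSizeLookup {
        /** Returns the fetched snapshot size for the shard, or the fallback if unavailable. */
        static long expectedShardSize(SnapshotShardSizeInfo sizes, ShardRouting shard, long fallback) {
            // For a shard whose size fetch failed (or has not completed), the
            // service reports UNAVAILABLE_EXPECTED_SHARD_SIZE, so getShardSize
            // returns the fallback and allocation can still move forward.
            return sizes.getShardSize(shard, fallback);
        }
    }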

Sadly, it does not work as expected: the service only
triggers reroutes when fetching the size succeeds, never
when it fails. This means that a shard might stay
unassigned until another cluster state update triggers a
new allocation (as in #64372). Worse, the test I wrote was
wrong, as it explicitly triggered a reroute itself.

This commit changes the InternalSnapshotsInfoService so
that it also triggers a reroute when fetching the snapshot
shard size fails, ensuring that the allocation can move
forward by using an UNAVAILABLE_EXPECTED_SHARD_SIZE shard
size. This unknown shard size is kept around in the
snapshot info service until no corresponding unassigned
shard needs the information anymore.
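
Because RerouteService is a functional interface, this
guarantee is easy to observe with a counting stub, which
is what the updated test at the bottom of this diff does.
A self-contained sketch of that pattern (the helper class
and method name are made up; the import paths are assumed
from the Elasticsearch source tree):

    import java.util.concurrent.atomic.AtomicInteger;

    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.cluster.routing.RerouteService;

    class CountingRerouteStub {
        /** A RerouteService that counts reroute requests and completes immediately. */
        static RerouteService counting(AtomicInteger reroutes, ClusterState state) {
            return (reason, priority, listener) -> {
                reroutes.incrementAndGet();  // one increment per reroute request
                listener.onResponse(state);  // complete the listener right away
            };
        }
    }

With this change, such a counter must advance for failed
size fetches as well as for successful ones.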

Backport of #65436
tlrx committed Dec 8, 2020
1 parent 063db03 commit 16fae5d
Showing 4 changed files with 95 additions and 97 deletions.
InternalSnapshotsInfoService.java
@@ -72,7 +72,7 @@ public class InternalSnapshotsInfoService implements ClusterStateListener, Snaps
     private final Supplier<RerouteService> rerouteService;
 
     /** contains the snapshot shards for which the size is known **/
-    private volatile ImmutableOpenMap<SnapshotShard, Long> knownSnapshotShardSizes;
+    private volatile ImmutableOpenMap<SnapshotShard, Long> knownSnapshotShards;
 
     private volatile boolean isMaster;
 
@@ -99,7 +99,7 @@ public InternalSnapshotsInfoService(
         this.threadPool = clusterService.getClusterApplierService().threadPool();
         this.repositoriesService = repositoriesServiceSupplier;
         this.rerouteService = rerouteServiceSupplier;
-        this.knownSnapshotShardSizes = ImmutableOpenMap.of();
+        this.knownSnapshotShards = ImmutableOpenMap.of();
         this.unknownSnapshotShards = new LinkedHashSet<>();
         this.failedSnapshotShards = new LinkedHashSet<>();
         this.queue = new LinkedList<>();
@@ -120,10 +120,12 @@ private void setMaxConcurrentFetches(Integer maxConcurrentFetches) {
     @Override
     public SnapshotShardSizeInfo snapshotShardSizes() {
         synchronized (mutex){
-            final ImmutableOpenMap.Builder<SnapshotShard, Long> snapshotShardSizes = ImmutableOpenMap.builder(knownSnapshotShardSizes);
-            for (SnapshotShard snapshotShard : failedSnapshotShards) {
-                Long previous = snapshotShardSizes.put(snapshotShard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
-                assert previous == null : "snapshot shard size already known for " + snapshotShard;
+            final ImmutableOpenMap.Builder<SnapshotShard, Long> snapshotShardSizes = ImmutableOpenMap.builder(knownSnapshotShards);
+            if (failedSnapshotShards.isEmpty() == false) {
+                for (SnapshotShard snapshotShard : failedSnapshotShards) {
+                    Long previous = snapshotShardSizes.put(snapshotShard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
+                    assert previous == null : "snapshot shard size already known for " + snapshotShard;
+                }
             }
             return new SnapshotShardSizeInfo(snapshotShardSizes.build());
         }
@@ -139,10 +141,9 @@ public void clusterChanged(ClusterChangedEvent event) {
             isMaster = true;
             for (SnapshotShard snapshotShard : onGoingSnapshotRecoveries) {
                 // check if already populated entry
-                if (knownSnapshotShardSizes.containsKey(snapshotShard) == false) {
+                if (knownSnapshotShards.containsKey(snapshotShard) == false && failedSnapshotShards.contains(snapshotShard) == false) {
                     // check if already fetching snapshot info in progress
                     if (unknownSnapshotShards.add(snapshotShard)) {
-                        failedSnapshotShards.remove(snapshotShard); // retry the failed shard
                         queue.add(snapshotShard);
                         unknownShards += 1;
                     }
@@ -162,7 +163,7 @@ public void clusterChanged(ClusterChangedEvent event) {
             // have to repopulate the data over and over in an unstable master situation?
             synchronized (mutex) {
                 // information only needed on current master
-                knownSnapshotShardSizes = ImmutableOpenMap.of();
+                knownSnapshotShards = ImmutableOpenMap.of();
                 failedSnapshotShards.clear();
                 isMaster = false;
                 SnapshotShard snapshotShard;
@@ -175,7 +176,7 @@ public void clusterChanged(ClusterChangedEvent event) {
         } else {
             synchronized (mutex) {
                 assert unknownSnapshotShards.isEmpty() || unknownSnapshotShards.size() == activeFetches;
-                assert knownSnapshotShardSizes.isEmpty();
+                assert knownSnapshotShards.isEmpty();
                 assert failedSnapshotShards.isEmpty();
                 assert isMaster == false;
                 assert queue.isEmpty();
@@ -228,10 +229,10 @@ protected void doRun() throws Exception {
                 assert removed : "snapshot shard to remove does not exist " + snapshotShardSize;
                 if (isMaster) {
                     final ImmutableOpenMap.Builder<SnapshotShard, Long> newSnapshotShardSizes =
-                        ImmutableOpenMap.builder(knownSnapshotShardSizes);
+                        ImmutableOpenMap.builder(knownSnapshotShards);
                     updated = newSnapshotShardSizes.put(snapshotShard, snapshotShardSize) == null;
                     assert updated : "snapshot shard size already exists for " + snapshotShard;
-                    knownSnapshotShardSizes = newSnapshotShardSizes.build();
+                    knownSnapshotShards = newSnapshotShardSizes.build();
                 }
                 activeFetches -= 1;
                 assert invariant();
@@ -244,17 +245,21 @@ protected void doRun() throws Exception {
         @Override
         public void onFailure(Exception e) {
             logger.warn(() -> new ParameterizedMessage("failed to retrieve shard size for {}", snapshotShard), e);
+            boolean failed = false;
             synchronized (mutex) {
                 if (isMaster) {
-                    final boolean added = failedSnapshotShards.add(snapshotShard);
-                    assert added : "snapshot shard size already failed for " + snapshotShard;
+                    failed = failedSnapshotShards.add(snapshotShard);
+                    assert failed : "snapshot shard size already failed for " + snapshotShard;
                 }
                 if (removed == false) {
                     unknownSnapshotShards.remove(snapshotShard);
                 }
                 activeFetches -= 1;
                 assert invariant();
             }
+            if (failed) {
+                rerouteService.get().reroute("snapshot shard size failed", Priority.HIGH, REROUTE_LISTENER);
+            }
         }
 
         @Override
@@ -266,16 +271,16 @@ public void onAfter() {
     private void cleanUpSnapshotShardSizes(Set<SnapshotShard> requiredSnapshotShards) {
         assert Thread.holdsLock(mutex);
         ImmutableOpenMap.Builder<SnapshotShard, Long> newSnapshotShardSizes = null;
-        for (ObjectCursor<SnapshotShard> shard : knownSnapshotShardSizes.keys()) {
+        for (ObjectCursor<SnapshotShard> shard : knownSnapshotShards.keys()) {
             if (requiredSnapshotShards.contains(shard.value) == false) {
                 if (newSnapshotShardSizes == null) {
-                    newSnapshotShardSizes = ImmutableOpenMap.builder(knownSnapshotShardSizes);
+                    newSnapshotShardSizes = ImmutableOpenMap.builder(knownSnapshotShards);
                 }
                 newSnapshotShardSizes.remove(shard.value);
             }
         }
         if (newSnapshotShardSizes != null) {
-            knownSnapshotShardSizes = newSnapshotShardSizes.build();
+            knownSnapshotShards = newSnapshotShardSizes.build();
         }
         failedSnapshotShards.retainAll(requiredSnapshotShards);
     }
@@ -284,16 +289,16 @@ private boolean invariant() {
         assert Thread.holdsLock(mutex);
         assert activeFetches >= 0 : "active fetches should be greater than or equal to zero but got: " + activeFetches;
         assert activeFetches <= maxConcurrentFetches : activeFetches + " <= " + maxConcurrentFetches;
-        for (ObjectCursor<SnapshotShard> cursor : knownSnapshotShardSizes.keys()) {
+        for (ObjectCursor<SnapshotShard> cursor : knownSnapshotShards.keys()) {
             assert unknownSnapshotShards.contains(cursor.value) == false : "cannot be known and unknown at same time: " + cursor.value;
             assert failedSnapshotShards.contains(cursor.value) == false : "cannot be known and failed at same time: " + cursor.value;
         }
         for (SnapshotShard shard : unknownSnapshotShards) {
-            assert knownSnapshotShardSizes.keys().contains(shard) == false : "cannot be unknown and known at same time: " + shard;
+            assert knownSnapshotShards.keys().contains(shard) == false : "cannot be unknown and known at same time: " + shard;
             assert failedSnapshotShards.contains(shard) == false : "cannot be unknown and failed at same time: " + shard;
         }
         for (SnapshotShard shard : failedSnapshotShards) {
-            assert knownSnapshotShardSizes.keys().contains(shard) == false : "cannot be failed and known at same time: " + shard;
+            assert knownSnapshotShards.keys().contains(shard) == false : "cannot be failed and known at same time: " + shard;
             assert unknownSnapshotShards.contains(shard) == false : "cannot be failed and unknown at same time: " + shard;
         }
         return true;
@@ -315,7 +320,7 @@ int numberOfFailedSnapshotShardSizes() {
 
     // used in tests
     int numberOfKnownSnapshotShardSizes() {
-        return knownSnapshotShardSizes.size();
+        return knownSnapshotShards.size();
     }
 
     private static Set<SnapshotShard> listOfSnapshotShards(final ClusterState state) {
InternalSnapshotsInfoServiceTests.java
@@ -179,6 +179,12 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
     }
 
     public void testErroneousSnapshotShardSizes() throws Exception {
+        final AtomicInteger reroutes = new AtomicInteger();
+        final RerouteService rerouteService = (reason, priority, listener) -> {
+            reroutes.incrementAndGet();
+            listener.onResponse(clusterService.state());
+        };
+
         final InternalSnapshotsInfoService snapshotsInfoService =
             new InternalSnapshotsInfoService(Settings.builder()
                 .put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10))
@@ -244,6 +250,9 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
             assertThat(snapshotShardSizeInfo.getShardSize(shardRouting, defaultValue),
                 success ? equalTo(results.get(snapshotShard.getKey())) : equalTo(defaultValue));
         }
+
+        assertThat("Expecting all snapshot shard size fetches to provide a size", results.size(), equalTo(maxShardsToCreate));
+        assertThat("Expecting all snapshot shard size fetches to execute a Reroute", reroutes.get(), equalTo(maxShardsToCreate));
     }
 
     public void testNoLongerMaster() throws Exception {