Fix Queued Snapshot Clone not Starting after Data Node Drops Out (#77111)

We have to account for queued-up clones when dealing with nodes dropping out and start them
when they become ready to execute because a node has left the cluster.
Added a test to reproduce the issue in #77101 and another test to verify that the more complex
case of a clone queued after a snapshot queued after a clone still works correctly as well.
The solution here is the most direct fix I could think of and by far the easiest to backport.
That said, I added a TODO that asks for a follow-up that should allow for completely removing
the duplicate code across the handling of shard updates and external changes. The difference between
the two ways of updating the state is a leftover from the time before we had concurrent
operations and has become needless complexity nowadays.

closes #77101
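
For readers skimming the diff below, here is a minimal, self-contained sketch of the core idea, using hypothetical helper names and simplified stand-ins rather than the actual SnapshotsService, ShardSnapshotStatus and RepositoryShardId types: when a shard snapshot fails because its data node dropped out, a clone shard that was queued behind it for the same repository shard can be started on the master right away, reusing the shard generation recorded with the failure.

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical, simplified illustration of the fix; not the actual Elasticsearch classes.
    public class QueuedCloneStartSketch {

        enum State { QUEUED, INIT, FAILED }

        // Minimal stand-in for ShardSnapshotStatus: state, assigned node id and shard generation.
        record ShardStatus(State state, String nodeId, String generation) {}

        // For every clone shard that is still queued and whose repository shard just failed
        // (e.g. because the data node holding it left), start it on the master node, reusing
        // the generation carried by the recorded failure.
        static Map<String, ShardStatus> startReadyCloneShards(
            Map<String, ShardStatus> cloneShards,
            Map<String, ShardStatus> knownFailures,
            String masterNodeId
        ) {
            Map<String, ShardStatus> updated = new HashMap<>(cloneShards);
            knownFailures.forEach((repoShardId, failure) -> {
                ShardStatus existing = updated.get(repoShardId);
                if (existing != null && existing.state() == State.QUEUED) {
                    updated.put(repoShardId, new ShardStatus(State.INIT, masterNodeId, failure.generation()));
                }
            });
            return updated;
        }

        public static void main(String[] args) {
            // A failure recorded while handling the node that dropped out, keyed by repository shard id.
            Map<String, ShardStatus> knownFailures = Map.of("index-test/0", new ShardStatus(State.FAILED, null, "gen-42"));

            // A clone whose shard was queued behind the failed snapshot of the same shard.
            Map<String, ShardStatus> cloneShards = new LinkedHashMap<>();
            cloneShards.put("index-test/0", new ShardStatus(State.QUEUED, null, null));

            // Prints {index-test/0=ShardStatus[state=INIT, nodeId=master, generation=gen-42]}
            System.out.println(startReadyCloneShards(cloneShards, knownFailures, "master"));
        }
    }

The actual change in SnapshotsService below additionally skips this step while a delete is executing for the repository and consults InFlightShardSnapshotStates so that a shard is never assigned to two tasks at once.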
original-brownbear committed Sep 1, 2021
1 parent 007469a commit 8b9c52e
Showing 3 changed files with 163 additions and 19 deletions.
@@ -794,6 +794,34 @@ public void testRemoveFailedCloneFromCSWithQueuedSnapshotInProgress() throws Exception {
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}

public void testCloneAfterFailedShardSnapshot() throws Exception {
final String masterNode = internalCluster().startMasterOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
createIndex(testIndex);
final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);
indexRandomDocs(testIndex, randomIntBetween(1, 100));
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> snapshotFuture = client(masterNode).admin()
.cluster()
.prepareCreateSnapshot(repoName, "full-snapshot")
.execute();
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(dataNode, repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = client(masterNode).admin()
.cluster()
.prepareCloneSnapshot(repoName, sourceSnapshot, "target-snapshot")
.setIndices(testIndex)
.execute();
awaitNumberOfSnapshotsInProgress(2);
internalCluster().stopNode(dataNode);
assertAcked(cloneFuture.get());
assertTrue(snapshotFuture.isDone());
}

private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(
String repoName,
String sourceSnapshot,
@@ -1690,6 +1690,66 @@ public void testIndexDeletedWhileSnapshotQueuedAfterClone() throws Exception {
);
}

public void testIndexDeletedWhileSnapshotAndCloneQueuedAfterClone() throws Exception {
final String master = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
internalCluster().startDataOnlyNode();
final String index1 = "index-1";
final String index2 = "index-2";
createIndexWithContent(index1);
createIndexWithContent(index2);

final String repository = "test-repo";
createRepository(repository, "mock");

final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repository, sourceSnapshot);

final IndexId index1Id = getRepositoryData(repository).resolveIndexId(index1);
blockMasterOnShardLevelSnapshotFile(repository, index1Id.getId());

final String cloneTarget = "target-snapshot";
final ActionFuture<AcknowledgedResponse> cloneSnapshot = clusterAdmin().prepareCloneSnapshot(
repository,
sourceSnapshot,
cloneTarget
).setIndices(index1, index2).execute();
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(master, repository);

final ActionFuture<CreateSnapshotResponse> snapshot3 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-3")
.setIndices(index1, index2)
.setWaitForCompletion(true)
.setPartial(true)
.execute();
final ActionFuture<CreateSnapshotResponse> snapshot2 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-2")
.setIndices(index2)
.setWaitForCompletion(true)
.execute();
assertSuccessful(snapshot2);
awaitNumberOfSnapshotsInProgress(2);
assertFalse(snapshot3.isDone());

final String cloneTarget2 = "target-snapshot-2";
final ActionFuture<AcknowledgedResponse> cloneSnapshot2 = clusterAdmin().prepareCloneSnapshot(
repository,
sourceSnapshot,
cloneTarget2
).setIndices(index1, index2).execute();

assertAcked(admin().indices().prepareDelete(index1).get());
assertSuccessful(snapshot3);
unblockNode(repository, master);

assertAcked(cloneSnapshot.get());
assertAcked(cloneSnapshot2.get());
assertAcked(startDeleteSnapshot(repository, cloneTarget).get());

assertThat(
clusterAdmin().prepareSnapshotStatus().setSnapshots("snapshot-2", "snapshot-3").setRepository(repository).get().getSnapshots(),
hasSize(2)
);
}

public void testQueuedAfterFailedShardSnapshot() throws Exception {
internalCluster().startMasterOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();
@@ -1105,6 +1105,10 @@ private void processExternalChanges(boolean changedNodes, boolean startShards) {
public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final SnapshotDeletionsInProgress deletes = currentState.custom(
SnapshotDeletionsInProgress.TYPE,
SnapshotDeletionsInProgress.EMPTY
);
DiscoveryNodes nodes = currentState.nodes();
boolean changed = false;
final EnumSet<State> statesToUpdate;
@@ -1122,19 +1126,73 @@ public ClusterState execute(ClusterState currentState) {

// We keep a cache of shards that failed in this map. If we fail a shardId for a given repository because of
// a node leaving or shard becoming unassigned for one snapshot, we will also fail it for all subsequent enqueued
// snapshots
// for the same repository
final Map<String, Map<ShardId, ShardSnapshotStatus>> knownFailures = new HashMap<>();

// snapshots for the same repository
// TODO: the code in this state update duplicates large chunks of the logic in #SHARD_STATE_EXECUTOR.
// We should refactor it to ideally also go through #SHARD_STATE_EXECUTOR by hand-crafting shard state updates
// that encapsulate nodes leaving or indices having been deleted and passing them to the executor instead.
final Map<String, Map<RepositoryShardId, ShardSnapshotStatus>> knownFailures = new HashMap<>();
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
if (statesToUpdate.contains(snapshot.state())) {
// Currently initializing clone
if (snapshot.isClone() && snapshot.shardsByRepoShardId().isEmpty()) {
if (initializingClones.contains(snapshot.snapshot())) {
updatedSnapshotEntries.add(snapshot);
if (snapshot.isClone()) {
if (snapshot.shardsByRepoShardId().isEmpty()) {
// Currently initializing clone
if (initializingClones.contains(snapshot.snapshot())) {
updatedSnapshotEntries.add(snapshot);
} else {
logger.debug("removing not yet start clone operation [{}]", snapshot);
changed = true;
}
} else {
logger.debug("removing not yet start clone operation [{}]", snapshot);
changed = true;
// see if any clones may have had a shard become available for execution because of failures
if (deletes.hasExecutingDeletion(snapshot.repository())) {
// Currently executing a delete for this repo, no need to try and update any clone operations.
// The logic for finishing the delete will update running clones with the latest changes.
updatedSnapshotEntries.add(snapshot);
continue;
}
ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clones = null;
InFlightShardSnapshotStates inFlightShardSnapshotStates = null;
for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> failureEntry : knownFailures.getOrDefault(
snapshot.repository(),
Collections.emptyMap()
).entrySet()) {
final RepositoryShardId repositoryShardId = failureEntry.getKey();
final ShardSnapshotStatus existingStatus = snapshot.shardsByRepoShardId().get(repositoryShardId);
if (ShardSnapshotStatus.UNASSIGNED_QUEUED.equals(existingStatus)) {
if (inFlightShardSnapshotStates == null) {
inFlightShardSnapshotStates = InFlightShardSnapshotStates.forRepo(
snapshot.repository(),
updatedSnapshotEntries
);
}
if (inFlightShardSnapshotStates.isActive(
repositoryShardId.indexName(),
repositoryShardId.shardId()
)) {
// we already have this shard assigned to another task
continue;
}
if (clones == null) {
clones = ImmutableOpenMap.builder(snapshot.shardsByRepoShardId());
}
// We can use the generation from the shard failure to start the clone operation here
// because #processWaitingShardsAndRemovedNodes adds generations to failure statuses that allow
// us to start another clone.
// The usual route via InFlightShardSnapshotStates is not viable here because it would require
// a consistent view of the RepositoryData which we don't have here because this state update
// runs over all repositories at once.
clones.put(
repositoryShardId,
new ShardSnapshotStatus(nodes.getLocalNodeId(), failureEntry.getValue().generation())
);
}
}
if (clones != null) {
changed = true;
updatedSnapshotEntries.add(snapshot.withClones(clones.build()));
} else {
updatedSnapshotEntries.add(snapshot);
}
}
} else {
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShardsAndRemovedNodes(
@@ -1236,28 +1294,26 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShardsAndRemovedNodes(
SnapshotsInProgress.Entry entry,
RoutingTable routingTable,
DiscoveryNodes nodes,
Map<ShardId, ShardSnapshotStatus> knownFailures
Map<RepositoryShardId, ShardSnapshotStatus> knownFailures
) {
if (entry.isClone()) {
return null;
}
assert entry.isClone() == false : "clones take a different path";
boolean snapshotChanged = false;
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> shardEntry : entry.shardsByRepoShardId()) {
ShardSnapshotStatus shardStatus = shardEntry.value;
ShardId shardId = entry.shardId(shardEntry.key);
if (shardStatus.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) {
// this shard snapshot is waiting for a previous snapshot to finish execution for this shard
final ShardSnapshotStatus knownFailure = knownFailures.get(shardId);
final ShardSnapshotStatus knownFailure = knownFailures.get(shardEntry.key);
if (knownFailure == null) {
final IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex());
if (indexShardRoutingTable == null) {
// shard became unassigned while queued so we fail as missing here
// shard became unassigned while queued after a delete or clone operation so we can fail as missing here
assert entry.partial();
snapshotChanged = true;
logger.debug("failing snapshot of shard [{}] because index got deleted", shardId);
shards.put(shardId, ShardSnapshotStatus.MISSING);
knownFailures.put(shardId, ShardSnapshotStatus.MISSING);
knownFailures.put(shardEntry.key, ShardSnapshotStatus.MISSING);
} else {
// if no failure is known for the shard we keep waiting
shards.put(shardId, shardStatus);
@@ -1299,7 +1355,7 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShardsAndRemovedNodes(
shardStatus.generation()
);
shards.put(shardId, failedState);
knownFailures.put(shardId, failedState);
knownFailures.put(shardEntry.key, failedState);
} else if (shardStatus.state().completed() == false && shardStatus.nodeId() != null) {
if (nodes.nodeExists(shardStatus.nodeId())) {
shards.put(shardId, shardStatus);
@@ -1314,7 +1370,7 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShardsAndRemovedNodes(
shardStatus.generation()
);
shards.put(shardId, failedState);
knownFailures.put(shardId, failedState);
knownFailures.put(shardEntry.key, failedState);
}
} else {
shards.put(shardId, shardStatus);
