Skip to content

Commit

Permalink
Refactor SnapshotsInProgress to Use RepositoryId for Concurrency Logic (
Browse files Browse the repository at this point in the history
…#75501) (#76539)

This refactors the snapshots-in-progress logic to work from `RepositoryShardId` when working out which parts of the repository are in use by snapshot writes, for snapshot concurrency safety. This change does not go all the way yet on this topic, and there are a number of possible follow-up improvements to further simplify the logic that I'd work through over time.
But for now this allows fixing the remaining known issues that snapshot stress testing surfaced when combined with the fix in #75530.

These issues all come from the fact that `ShardId` is not a stable key across multiple snapshots if snapshots are partial. The scenarios that are broken are all roughly this:
* snapshot-1 for index-A with uuid-A runs and is partial
* index-A is deleted and re-created and now has uuid-B
* snapshot-2 for index-A is started and we now have it queued up behind snapshot-1 for the index
* snapshot-1 finishes and the logic tries to start the next snapshot for the same shard-id
  * this fails because the shard-id is not the same, we can't compare index uuids, just index name + shard id
  * this change fixes all these spots by always taking the round trip via `RepositoryShardId`

planned follow-ups here are:
* dry up logic across cloning and snapshotting more as both now essentially run the same code in many state-machine steps
* serialize snapshots-in-progress efficiently instead of re-computing the index and by-repository-shard-id lookups in the constructor every time
    * refactor the logic in snapshots-in-progress away from maps keyed by shard-id in almost all spots to this end, just keep an index name to `Index` map to work out what exactly is being snapshotted
 * refactoring snapshots-in-progress to be a map of list of operations keyed by repository shard id instead of a list of maps as it currently is to make the concurrency simpler and more obviously correct

closes #75423

relates to #75339 (this change should also fix that issue, but I have to verify by testing with a backport to 7.x)
  • Loading branch information
original-brownbear committed Aug 16, 2021
1 parent 8fe2ee7 commit 57092cc
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throw
awaitClusterState(clusterState -> {
final List<SnapshotsInProgress.Entry> entries = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.entries();
return entries.size() == 2 && entries.get(1).clones().isEmpty() == false;
return entries.size() == 2 && entries.get(1).shardsByRepoShardId().isEmpty() == false;
});
assertFalse(blockedSnapshot.isDone());
} finally {
Expand Down Expand Up @@ -684,9 +684,9 @@ public void testStartCloneDuringRunningDelete() throws Exception {
logger.info("--> waiting for snapshot clone to be fully initialized");
awaitClusterState(state -> {
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.clones().isEmpty() == false) {
if (entry.shardsByRepoShardId().isEmpty() == false) {
assertEquals(sourceSnapshot, entry.source().getName());
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> value : entry.clones().values()) {
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> value : entry.shardsByRepoShardId().values()) {
assertSame(value.value, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -131,7 +132,7 @@ protected void masterOperation(

Set<String> nodesIds = new HashSet<>();
for (SnapshotsInProgress.Entry entry : currentSnapshots) {
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> status : entry.shards().values()) {
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> status : entry.shardsByRepoShardId().values()) {
if (status.value.nodeId() != null) {
nodesIds.add(status.value.nodeId());
}
Expand Down Expand Up @@ -188,15 +189,17 @@ private void buildResponse(
for (SnapshotsInProgress.Entry entry : currentSnapshotEntries) {
currentSnapshotNames.add(entry.snapshot().getSnapshotId().getName());
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry.shards()) {
for (ObjectObjectCursor<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry
.shardsByRepoShardId()) {
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.value;
if (status.nodeId() != null) {
// We should have information about this shard from the shard:
TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId());
if (nodeStatus != null) {
Map<ShardId, SnapshotIndexShardStatus> shardStatues = nodeStatus.status().get(entry.snapshot());
if (shardStatues != null) {
SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.key);
final ShardId sid = entry.shardId(shardEntry.key);
SnapshotIndexShardStatus shardStatus = shardStatues.get(sid);
if (shardStatus != null) {
// We have full information about this shard
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE && shardEntry.value.state() != SUCCESS) {
Expand All @@ -206,7 +209,7 @@ private void buildResponse(
// technically if the data node failed before successfully reporting DONE state to master, then
// this shards state would jump to a failed state.
shardStatus = new SnapshotIndexShardStatus(
shardEntry.key,
sid,
SnapshotIndexShardStage.FINALIZE,
shardStatus.getStats(),
shardStatus.getNodeId(),
Expand Down Expand Up @@ -245,7 +248,7 @@ private void buildResponse(
if (stage == SnapshotIndexShardStage.DONE) {
// Shard snapshot completed successfully so we should be able to load the exact statistics for this
// shard from the repository already.
final ShardId shardId = shardEntry.key;
final ShardId shardId = entry.shardId(shardEntry.key);
shardStatus = new SnapshotIndexShardStatus(
shardId,
repositoriesService.repository(entry.repository())
Expand All @@ -257,7 +260,7 @@ private void buildResponse(
.asCopy()
);
} else {
shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage);
shardStatus = new SnapshotIndexShardStatus(entry.shardId(shardEntry.key), stage);
}
shardStatusBuilder.add(shardStatus);
}
Expand Down

0 comments on commit 57092cc

Please sign in to comment.