Skip to content

Commit

Permalink
Refactor SnapshotsInProgress to Track Snapshots By Repository (#77984) (#79328)
Browse files Browse the repository at this point in the history

First step in making `SnapshotsInProgress` easier to work with
by tracking snapshots per repository. This simplifies the
concurrency logic in a couple of places. It also sets up a follow-up
that would invert the current list of maps for snapshots, which is
very hard to reason about in the concurrency logic, into a map
of lists from repo-shard to snapshots, making the logic
more obviously correct.
  • Loading branch information
original-brownbear committed Oct 18, 2021
1 parent ca24a64 commit f37d528
Show file tree
Hide file tree
Showing 26 changed files with 720 additions and 679 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ public ClusterState.Builder remove(ClusterState.Builder builder, String name) {
public ClusterState.Custom randomCreate(String name) {
switch (randomIntBetween(0, 1)) {
case 0:
return SnapshotsInProgress.of(Collections.singletonList(new SnapshotsInProgress.Entry(
return SnapshotsInProgress.EMPTY.withAddedEntry(new SnapshotsInProgress.Entry(
new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())),
randomBoolean(),
randomBoolean(),
Expand All @@ -711,7 +711,7 @@ public ClusterState.Custom randomCreate(String name) {
ImmutableOpenMap.of(),
null,
SnapshotInfoTestUtils.randomUserMetadata(),
randomVersion(random()))));
randomVersion(random())));
case 1:
return new RestoreInProgress.Builder().add(
new RestoreInProgress.Entry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public void testDisruptionAfterFinalization() throws Exception {

createRandomIndex(idxName);

createRepository("test-repo", "fs");
final String repoName = "test-repo";
createRepository(repoName, "fs");

final String masterNode1 = internalCluster().getMasterName();

Expand All @@ -82,12 +83,12 @@ public void testDisruptionAfterFinalization() throws Exception {
@Override
public void clusterChanged(ClusterChangedEvent event) {
SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshots != null && snapshots.entries().size() > 0) {
final SnapshotsInProgress.Entry snapshotEntry = snapshots.entries().get(0);
if (snapshots != null && snapshots.isEmpty() == false) {
final SnapshotsInProgress.Entry snapshotEntry = snapshots.forRepo(repoName).get(0);
if (snapshotEntry.state() == SnapshotsInProgress.State.SUCCESS) {
final RepositoriesMetadata repoMeta =
event.state().metadata().custom(RepositoriesMetadata.TYPE);
final RepositoryMetadata metadata = repoMeta.repository("test-repo");
final RepositoryMetadata metadata = repoMeta.repository(repoName);
if (metadata.pendingGeneration() > snapshotEntry.repositoryStateId()) {
logger.info("--> starting disruption");
networkDisruption.startDisrupting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throw
try {
awaitClusterState(clusterState -> {
final List<SnapshotsInProgress.Entry> entries = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.entries();
.forRepo(repoName);
return entries.size() == 2 && entries.get(1).shardsByRepoShardId().isEmpty() == false;
});
assertFalse(blockedSnapshot.isDone());
Expand Down Expand Up @@ -679,7 +679,7 @@ public void testStartCloneDuringRunningDelete() throws Exception {
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, "target-snapshot", indexName);
logger.info("--> waiting for snapshot clone to be fully initialized");
awaitClusterState(state -> {
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).forRepo(repoName)) {
if (entry.shardsByRepoShardId().isEmpty() == false) {
assertEquals(sourceSnapshot, entry.source().getName());
for (SnapshotsInProgress.ShardSnapshotStatus value : entry.shardsByRepoShardId().values()) {
Expand Down Expand Up @@ -722,7 +722,7 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
awaitNumberOfSnapshotsInProgress(2);
awaitClusterState(
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.entries()
.forRepo(repoName)
.stream()
.anyMatch(entry -> entry.state().completed())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void testAbortOneOfMultipleSnapshots() throws Exception {
logger.info("--> wait for snapshot on second data node to finish");
awaitClusterState(state -> {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress);
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
});

final ActionFuture<AcknowledgedResponse> deleteSnapshotsResponse = startDeleteSnapshot(repoName, firstSnapshot);
Expand Down Expand Up @@ -375,7 +375,7 @@ public void testCascadedAborts() throws Exception {
logger.info("--> wait for snapshot on second data node to finish");
awaitClusterState(state -> {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress);
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
});

final ActionFuture<AcknowledgedResponse> deleteSnapshotsResponse = startDeleteSnapshot(repoName, firstSnapshot);
Expand All @@ -395,7 +395,7 @@ public void testCascadedAborts() throws Exception {
assertBusy(() -> {
assertThat(currentSnapshots(repoName), hasSize(1));
final SnapshotsInProgress snapshotsInProgress = clusterService().state().custom(SnapshotsInProgress.TYPE);
assertThat(snapshotsInProgress.entries().get(0).state(), is(SnapshotsInProgress.State.ABORTED));
assertThat(snapshotsInProgress.forRepo(repoName).get(0).state(), is(SnapshotsInProgress.State.ABORTED));
}, 30L, TimeUnit.SECONDS);

unblockNode(repoName, dataNode);
Expand Down Expand Up @@ -438,7 +438,7 @@ public void testMasterFailOverWithQueuedDeletes() throws Exception {
logger.info("--> wait for snapshot on second data node to finish");
awaitClusterState(state -> {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress);
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
});

final ActionFuture<AcknowledgedResponse> firstDeleteFuture = startDeleteFromNonMasterClient(repoName, firstSnapshot);
Expand Down Expand Up @@ -466,7 +466,7 @@ public void testMasterFailOverWithQueuedDeletes() throws Exception {
assertThat(currentSnapshots(repoName), hasSize(2));
for (SnapshotsInProgress.Entry entry : clusterService().state()
.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.entries()) {
.forRepo(repoName)) {
assertThat(entry.state(), is(SnapshotsInProgress.State.ABORTED));
assertThat(entry.snapshot().getSnapshotId().getName(), not(secondSnapshot));
}
Expand Down Expand Up @@ -1500,12 +1500,15 @@ public void testOutOfOrderAndConcurrentFinalization() throws Exception {
.execute();

awaitClusterState(state -> {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
return snapshotsInProgress.entries().size() == 2 && snapshotsInProgress.entries().get(1).state().completed();
final List<SnapshotsInProgress.Entry> snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.forRepo(repository);
return snapshotsInProgress.size() == 2 && snapshotsInProgress.get(1).state().completed();
});

unblockAllDataNodes(repository);
awaitClusterState(state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().get(0).state().completed());
awaitClusterState(
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).forRepo(repository).get(0).state().completed()
);

unblockNode(repository, master);
assertSuccessful(snapshot2);
Expand Down Expand Up @@ -1987,8 +1990,8 @@ private void createIndexWithContent(String indexName, String nodeInclude, String
);
}

private static boolean snapshotHasCompletedShard(String snapshot, SnapshotsInProgress snapshotsInProgress) {
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
private static boolean snapshotHasCompletedShard(String repoName, String snapshot, SnapshotsInProgress snapshotsInProgress) {
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repoName)) {
if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) {
for (SnapshotsInProgress.ShardSnapshotStatus shard : entry.shards().values()) {
if (shard.state().completed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ public void onRequestSent(
logger,
otherDataNode,
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.entries()
.forRepo(repoName)
.stream()
.anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public ClusterState execute(ClusterState currentState) {
);
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
if (snapshots.entries().isEmpty() == false) {
if (snapshots.isEmpty() == false) {
throw new IllegalStateException(
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"
);
Expand Down

0 comments on commit f37d528

Please sign in to comment.