Skip to content

Commit

Permalink
[7.14] Remove and inline methods in SnapshotsService.deleteSnapshots() (
Browse files Browse the repository at this point in the history
#76151) (#76164)

Backport of #76079
  • Loading branch information
tlrx committed Aug 5, 2021
1 parent 65a79b6 commit b59ee4d
Showing 1 changed file with 62 additions and 85 deletions.
147 changes: 62 additions & 85 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,7 @@ public ClusterState execute(ClusterState currentState) {
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"
);
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(
repositoryName,
snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
);
}
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
// Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from
// a
Expand Down Expand Up @@ -496,7 +489,7 @@ public ClusterState execute(ClusterState currentState) {
if (concurrentOperationsAllowed == false && runningSnapshots.stream().anyMatch(entry -> entry.state() != State.INIT)) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
Expand Down Expand Up @@ -656,7 +649,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<Void> lis
public ClusterState execute(ClusterState currentState) {
ensureRepositoryExists(repositoryName, currentState);
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot");
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
Expand Down Expand Up @@ -729,7 +722,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
}, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
}

private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
private static void ensureNoCleanupInProgress(
final ClusterState currentState,
final String repositoryName,
final String snapshotName,
final String reason
) {
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
RepositoryCleanupInProgress.TYPE,
RepositoryCleanupInProgress.EMPTY
Expand All @@ -738,7 +736,13 @@ private static void ensureNoCleanupInProgress(ClusterState currentState, String
throw new ConcurrentSnapshotExecutionException(
repositoryName,
snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
"cannot "
+ reason
+ " while a repository cleanup is in-progress in "
+ repositoryCleanupInProgress.entries()
.stream()
.map(RepositoryCleanupInProgress.Entry::repository)
.collect(Collectors.toSet())
);
}
}
Expand Down Expand Up @@ -2507,18 +2511,17 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
* @param listener listener
*/
public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener<Void> listener) {

final String repositoryName = request.repository();
final String[] snapshotNames = request.snapshots();
final String repoName = request.repository();
logger.info(
() -> new ParameterizedMessage(
"deleting snapshots [{}] from repository [{}]",
Strings.arrayToCommaDelimitedString(snapshotNames),
repoName
repositoryName
)
);

final Repository repository = repositoriesService.repository(repoName);
final Repository repository = repositoriesService.repository(repositoryName);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {

private Snapshot runningSnapshot;
Expand All @@ -2541,17 +2544,46 @@ public ClusterState execute(ClusterState currentState) throws Exception {
+ "]"
);
}
ensureRepositoryExists(repoName, currentState);
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName);
final List<SnapshotId> snapshotIds = matchingSnapshotIds(
snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()),
repositoryData,
snapshotNames,
repoName
);
ensureRepositoryExists(repositoryName, currentState);
final List<SnapshotId> snapshotIds = new ArrayList<>();
final List<SnapshotsInProgress.Entry> snapshotEntries = new ArrayList<>();

// find in-progress snapshots to delete in cluster state
final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final SnapshotId snapshotId = entry.snapshot().getSnapshotId();
if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, snapshotId.getName())) {
snapshotIds.add(snapshotId);
snapshotEntries.add(entry);
}
}

// find snapshots to delete in repository data
final Map<String, SnapshotId> snapshotsIdsInRepository = repositoryData.getSnapshotIds()
.stream()
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
for (String snapshotOrPattern : snapshotNames) {
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
for (Map.Entry<String, SnapshotId> entry : snapshotsIdsInRepository.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
snapshotIds.add(entry.getValue());
}
}
} else {
final SnapshotId foundId = snapshotsIdsInRepository.get(snapshotOrPattern);
if (foundId == null) {
if (snapshotEntries.stream()
.noneMatch(entry -> entry.snapshot().getSnapshotId().getName().equals(snapshotOrPattern))) {
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
}
} else {
snapshotIds.add(foundId);
}
}
}

if (snapshotEntries.isEmpty() || minNodeVersion.onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) {
deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener);
deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData, Priority.NORMAL, listener);
return deleteFromRepoTask.execute(currentState);
}
assert snapshotEntries.size() == 1 : "Expected just a single running snapshot but saw " + snapshotEntries;
Expand Down Expand Up @@ -2606,7 +2638,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
.putCustom(
SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(
snapshots.entries()
snapshotsInProgress.entries()
.stream()
// remove init state snapshot we found from a previous master if there was one
.filter(existing -> abortedDuringInit == false || existing.equals(snapshotEntry) == false)
Expand Down Expand Up @@ -2646,7 +2678,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
} else {
clusterService.submitStateUpdateTask(
"delete snapshot",
createDeleteStateUpdate(outstandingDeletes, repoName, repositoryData, Priority.IMMEDIATE, listener)
createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData, Priority.IMMEDIATE, listener)
);
}
return;
Expand All @@ -2656,7 +2688,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.debug("deleted snapshot completed - deleting files");
clusterService.submitStateUpdateTask(
"delete snapshot",
createDeleteStateUpdate(outstandingDeletes, repoName, result.v1(), Priority.IMMEDIATE, listener)
createDeleteStateUpdate(outstandingDeletes, repositoryName, result.v1(), Priority.IMMEDIATE, listener)
);
}, e -> {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
Expand All @@ -2674,52 +2706,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}, "delete snapshot", listener::onFailure);
}

private static List<SnapshotId> matchingSnapshotIds(
List<SnapshotId> inProgress,
RepositoryData repositoryData,
String[] snapshotsOrPatterns,
String repositoryName
) {
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds()
.stream()
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
final Set<SnapshotId> foundSnapshots = new HashSet<>(inProgress);
for (String snapshotOrPattern : snapshotsOrPatterns) {
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
foundSnapshots.add(entry.getValue());
}
}
} else {
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
if (foundId == null) {
if (inProgress.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) {
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
}
} else {
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
}
}
}
return Collections.unmodifiableList(new ArrayList<>(foundSnapshots));
}

// Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found
private static List<SnapshotsInProgress.Entry> findInProgressSnapshots(
SnapshotsInProgress snapshots,
String[] snapshotNames,
String repositoryName
) {
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) {
entries.add(entry);
}
}
return entries;
}

private ClusterStateUpdateTask createDeleteStateUpdate(
List<SnapshotId> snapshotIds,
String repoName,
Expand Down Expand Up @@ -2775,16 +2761,7 @@ public ClusterState execute(ClusterState currentState) {
);
}
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
RepositoryCleanupInProgress.TYPE,
RepositoryCleanupInProgress.EMPTY
);
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(
new Snapshot(repoName, snapshotIds.get(0)),
"cannot delete snapshots while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
);
}
ensureNoCleanupInProgress(currentState, repoName, snapshotIds.get(0).getName(), "delete snapshot");
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
// don't allow snapshot deletions while a restore is taking place,
// otherwise we could end up deleting a snapshot that is being restored
Expand Down

0 comments on commit b59ee4d

Please sign in to comment.