Skip to content

Commit

Permalink
Remove Blocking on GENERIC Pool in GET Snapshots (#69101)
Browse files Browse the repository at this point in the history
This aligns the way get snapshots works in `7.x` with how it works
in `master` exactly (except for masters ability to load snapshots across repos)
and thus fixes the deadlock in #69099.

Closes #69099
  • Loading branch information
original-brownbear committed Feb 17, 2021
1 parent ac48469 commit 27b60e5
Showing 1 changed file with 86 additions and 74 deletions.
Expand Up @@ -13,8 +13,9 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
Expand Down Expand Up @@ -62,7 +63,7 @@ public TransportGetSnapshotsAction(TransportService transportService, ClusterSer
ThreadPool threadPool, RepositoriesService repositoriesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(GetSnapshotsAction.NAME, transportService, clusterService, threadPool, actionFilters,
GetSnapshotsRequest::new, indexNameExpressionResolver, GetSnapshotsResponse::new, ThreadPool.Names.GENERIC);
GetSnapshotsRequest::new, indexNameExpressionResolver, GetSnapshotsResponse::new, ThreadPool.Names.SAME);
this.repositoriesService = repositoriesService;
}

Expand All @@ -74,72 +75,27 @@ protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterS
@Override
protected void masterOperation(final GetSnapshotsRequest request, final ClusterState state,
final ActionListener<GetSnapshotsResponse> listener) {
try {
final String repository = request.repository();
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repository)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
}

final RepositoryData repositoryData;
if (isCurrentSnapshotsOnly(request.snapshots()) == false) {
repositoryData = PlainActionFuture.get(fut -> repositoriesService.getRepositoryData(repository, fut));
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
}
} else {
repositoryData = null;
}

final Set<SnapshotId> toResolve = new HashSet<>();
if (isAllSnapshots(request.snapshots())) {
toResolve.addAll(allSnapshotIds.values());
} else {
for (String snapshotOrPattern : request.snapshots()) {
if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) {
toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList()));
} else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
if (allSnapshotIds.containsKey(snapshotOrPattern)) {
toResolve.add(allSnapshotIds.get(snapshotOrPattern));
} else if (request.ignoreUnavailable() == false) {
throw new SnapshotMissingException(repository, snapshotOrPattern);
}
} else {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
toResolve.add(entry.getValue());
}
}
}
}

if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) {
throw new SnapshotMissingException(repository, request.snapshots()[0]);
}
}
final String repo = request.repository();
final String[] snapshots = request.snapshots();
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repo)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
}

final List<SnapshotInfo> snapshotInfos;
if (request.verbose()) {
snapshotInfos = snapshots(
snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
} else {
// only want current snapshots
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
}
}
listener.onResponse(new GetSnapshotsResponse(snapshotInfos));
} catch (Exception e) {
listener.onFailure(e);
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
if (isCurrentSnapshotsOnly(snapshots)) {
repositoryDataListener.onResponse(null);
} else {
repositoriesService.getRepositoryData(repo, repositoryDataListener);
}

repositoryDataListener.whenComplete(repositoryData -> loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
request.ignoreUnavailable(), request.verbose(), allSnapshotIds, currentSnapshots, repositoryData,
listener.map(GetSnapshotsResponse::new)), listener::onFailure);
}

/**
Expand All @@ -149,17 +105,72 @@ protected void masterOperation(final GetSnapshotsRequest request, final ClusterS
* @param repositoryName repository name
* @return list of snapshots
*/
private static List<SnapshotInfo> sortedCurrentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName) {
private static List<SnapshotInfo> sortedCurrentSnapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName) {
List<SnapshotInfo> snapshotList = new ArrayList<>();
List<SnapshotsInProgress.Entry> entries =
SnapshotsService.currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
SnapshotsService.currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(new SnapshotInfo(entry));
}
CollectionUtil.timSort(snapshotList);
return unmodifiableList(snapshotList);
}


private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, Map<String, SnapshotId> allSnapshotIds,
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData,
ActionListener<List<SnapshotInfo>> listener) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
}
}

final Set<SnapshotId> toResolve = new HashSet<>();
if (isAllSnapshots(snapshots)) {
toResolve.addAll(allSnapshotIds.values());
} else {
for (String snapshotOrPattern : snapshots) {
if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) {
toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList()));
} else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
if (allSnapshotIds.containsKey(snapshotOrPattern)) {
toResolve.add(allSnapshotIds.get(snapshotOrPattern));
} else if (ignoreUnavailable == false) {
throw new SnapshotMissingException(repo, snapshotOrPattern);
}
} else {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
toResolve.add(entry.getValue());
}
}
}
}

if (toResolve.isEmpty() && ignoreUnavailable == false && isCurrentSnapshotsOnly(snapshots) == false) {
throw new SnapshotMissingException(repo, snapshots[0]);
}
}

if (verbose) {
threadPool.generic().execute(ActionRunnable.supply(
listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable)));
} else {
final List<SnapshotInfo> snapshotInfos;
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
} else {
// only want current snapshots
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
}
listener.onResponse(snapshotInfos);
}
}

/**
* Returns a list of snapshots from repository sorted by snapshot creation date
*
Expand All @@ -170,16 +181,17 @@ private static List<SnapshotInfo> sortedCurrentSnapshots(@Nullable SnapshotsInPr
* if false, they will throw an error
* @return list of snapshots
*/
private List<SnapshotInfo> snapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName,
private List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName,
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
final List<SnapshotsInProgress.Entry> entries = SnapshotsService.currentSnapshots(
snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(new SnapshotInfo(entry));
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) {
snapshotSet.add(new SnapshotInfo(entry));
}
}
// then, look in the repository
final Repository repository = repositoriesService.repository(repositoryName);
Expand Down Expand Up @@ -210,7 +222,7 @@ private boolean isCurrentSnapshotsOnly(String[] snapshots) {
return (snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0]));
}

private List<SnapshotInfo> buildSimpleSnapshotInfos(final Set<SnapshotId> toResolve,
private static List<SnapshotInfo> buildSimpleSnapshotInfos(final Set<SnapshotId> toResolve,
final RepositoryData repositoryData,
final List<SnapshotInfo> currentSnapshots) {
List<SnapshotInfo> snapshotInfos = new ArrayList<>();
Expand All @@ -224,7 +236,7 @@ private List<SnapshotInfo> buildSimpleSnapshotInfos(final Set<SnapshotId> toReso
for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) {
if (toResolve.contains(snapshotId)) {
snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>())
.add(indexId.getName());
.add(indexId.getName());
}
}
}
Expand All @@ -235,6 +247,6 @@ private List<SnapshotInfo> buildSimpleSnapshotInfos(final Set<SnapshotId> toReso
repositoryData.getSnapshotState(snapshotId)));
}
CollectionUtil.timSort(snapshotInfos);
return unmodifiableList(snapshotInfos);
return Collections.unmodifiableList(snapshotInfos);
}
}

0 comments on commit 27b60e5

Please sign in to comment.