Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Blocking on GENERIC Pool in GET Snapshots #69101

Merged
merged 2 commits into from Feb 17, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -13,8 +13,9 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
Expand Down Expand Up @@ -62,7 +63,7 @@ public TransportGetSnapshotsAction(TransportService transportService, ClusterSer
ThreadPool threadPool, RepositoriesService repositoriesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(GetSnapshotsAction.NAME, transportService, clusterService, threadPool, actionFilters,
GetSnapshotsRequest::new, indexNameExpressionResolver, GetSnapshotsResponse::new, ThreadPool.Names.GENERIC);
GetSnapshotsRequest::new, indexNameExpressionResolver, GetSnapshotsResponse::new, ThreadPool.Names.SAME);
this.repositoriesService = repositoriesService;
}

Expand All @@ -74,72 +75,27 @@ protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterS
@Override
protected void masterOperation(final GetSnapshotsRequest request, final ClusterState state,
final ActionListener<GetSnapshotsResponse> listener) {
try {
final String repository = request.repository();
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repository)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
}

final RepositoryData repositoryData;
if (isCurrentSnapshotsOnly(request.snapshots()) == false) {
repositoryData = PlainActionFuture.get(fut -> repositoriesService.getRepositoryData(repository, fut));
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
}
} else {
repositoryData = null;
}

final Set<SnapshotId> toResolve = new HashSet<>();
if (isAllSnapshots(request.snapshots())) {
toResolve.addAll(allSnapshotIds.values());
} else {
for (String snapshotOrPattern : request.snapshots()) {
if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) {
toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList()));
} else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
if (allSnapshotIds.containsKey(snapshotOrPattern)) {
toResolve.add(allSnapshotIds.get(snapshotOrPattern));
} else if (request.ignoreUnavailable() == false) {
throw new SnapshotMissingException(repository, snapshotOrPattern);
}
} else {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
toResolve.add(entry.getValue());
}
}
}
}

if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) {
throw new SnapshotMissingException(repository, request.snapshots()[0]);
}
}
final String repo = request.repository();
final String[] snapshots = request.snapshots();
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repo)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
}

final List<SnapshotInfo> snapshotInfos;
if (request.verbose()) {
snapshotInfos = snapshots(
snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
} else {
// only want current snapshots
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
}
}
listener.onResponse(new GetSnapshotsResponse(snapshotInfos));
} catch (Exception e) {
listener.onFailure(e);
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
if (isCurrentSnapshotsOnly(snapshots)) {
repositoryDataListener.onResponse(null);
} else {
repositoriesService.getRepositoryData(repo, repositoryDataListener);
}

repositoryDataListener.whenComplete(repositoryData -> loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
request.ignoreUnavailable(), request.verbose(), allSnapshotIds, currentSnapshots, repositoryData,
listener.map(GetSnapshotsResponse::new)), listener::onFailure);
}

/**
Expand All @@ -149,17 +105,72 @@ protected void masterOperation(final GetSnapshotsRequest request, final ClusterS
* @param repositoryName repository name
* @return list of snapshots
*/
private static List<SnapshotInfo> sortedCurrentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName) {
private static List<SnapshotInfo> sortedCurrentSnapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName) {
List<SnapshotInfo> snapshotList = new ArrayList<>();
List<SnapshotsInProgress.Entry> entries =
SnapshotsService.currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
SnapshotsService.currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(new SnapshotInfo(entry));
}
CollectionUtil.timSort(snapshotList);
return unmodifiableList(snapshotList);
}


private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, Map<String, SnapshotId> allSnapshotIds,
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData,
ActionListener<List<SnapshotInfo>> listener) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
}
}

final Set<SnapshotId> toResolve = new HashSet<>();
if (isAllSnapshots(snapshots)) {
toResolve.addAll(allSnapshotIds.values());
} else {
for (String snapshotOrPattern : snapshots) {
if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) {
toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList()));
} else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
if (allSnapshotIds.containsKey(snapshotOrPattern)) {
toResolve.add(allSnapshotIds.get(snapshotOrPattern));
} else if (ignoreUnavailable == false) {
throw new SnapshotMissingException(repo, snapshotOrPattern);
}
} else {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
toResolve.add(entry.getValue());
}
}
}
}

if (toResolve.isEmpty() && ignoreUnavailable == false && isCurrentSnapshotsOnly(snapshots) == false) {
throw new SnapshotMissingException(repo, snapshots[0]);
}
}

if (verbose) {
threadPool.generic().execute(ActionRunnable.supply(
listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable)));
} else {
final List<SnapshotInfo> snapshotInfos;
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
} else {
// only want current snapshots
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
}
listener.onResponse(snapshotInfos);
}
}

/**
* Returns a list of snapshots from repository sorted by snapshot creation date
*
Expand All @@ -170,16 +181,17 @@ private static List<SnapshotInfo> sortedCurrentSnapshots(@Nullable SnapshotsInPr
* if false, they will throw an error
* @return list of snapshots
*/
private List<SnapshotInfo> snapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName,
private List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName,
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: spurious whitespace deviation from master :)

Suggested change
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The version here is actually nicer/more correct :D that's why I deviated. I'd rather fix this in master next time a change to the file is made there :)

final Set<SnapshotInfo> snapshotSet = new HashSet<>();
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
final List<SnapshotsInProgress.Entry> entries = SnapshotsService.currentSnapshots(
snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(new SnapshotInfo(entry));
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) {
snapshotSet.add(new SnapshotInfo(entry));
}
}
// then, look in the repository
final Repository repository = repositoriesService.repository(repositoryName);
Expand Down Expand Up @@ -210,7 +222,7 @@ private boolean isCurrentSnapshotsOnly(String[] snapshots) {
return (snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0]));
}

private List<SnapshotInfo> buildSimpleSnapshotInfos(final Set<SnapshotId> toResolve,
private static List<SnapshotInfo> buildSimpleSnapshotInfos(final Set<SnapshotId> toResolve,
final RepositoryData repositoryData,
final List<SnapshotInfo> currentSnapshots) {
List<SnapshotInfo> snapshotInfos = new ArrayList<>();
Expand All @@ -224,7 +236,7 @@ private List<SnapshotInfo> buildSimpleSnapshotInfos(final Set<SnapshotId> toReso
for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) {
if (toResolve.contains(snapshotId)) {
snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>())
.add(indexId.getName());
.add(indexId.getName());
}
}
}
Expand All @@ -235,6 +247,6 @@ private List<SnapshotInfo> buildSimpleSnapshotInfos(final Set<SnapshotId> toReso
repositoryData.getSnapshotState(snapshotId)));
}
CollectionUtil.timSort(snapshotInfos);
return unmodifiableList(snapshotInfos);
return Collections.unmodifiableList(snapshotInfos);
}
}