diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index d9fef7e0af8af..896b336d54d7b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -276,55 +276,67 @@ void runOperation(ActionListener listener) { */ private void populateResults(ActionListener listener) { try (var listeners = new RefCountingListener(listener)) { - for (final RepositoryMetadata repository : repositories) { - final String repositoryName = repository.name(); - if (skipRepository(repositoryName)) { - continue; - } - if (listeners.isFailing()) { - return; - } + final BooleanSupplier failFastSupplier = () -> cancellableTask.isCancelled() || listeners.isFailing(); + + final Iterator asyncSnapshotInfoIterators = Iterators.failFast( + Iterators.map( + Iterators.filter( + Iterators.map(repositories.iterator(), RepositoryMetadata::name), + repositoryName -> skipRepository(repositoryName) == false + ), + repositoryName -> asyncRepositoryContentsListener -> SubscribableListener + + .newForked(l -> maybeGetRepositoryData(repositoryName, l)) + .andThenApply(repositoryData -> { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT); + cancellableTask.ensureNotCancelled(); + ensureRequiredNamesPresent(repositoryName, repositoryData); + return getAsyncSnapshotInfoIterator(repositoriesService.repository(repositoryName), repositoryData); + }) + .addListener(asyncRepositoryContentsListener) + ), + failFastSupplier + ); - maybeGetRepositoryData(repositoryName, listeners.acquire(repositoryData -> { - assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT); - cancellableTask.ensureNotCancelled(); - ensureRequiredNamesPresent(repositoryName, repositoryData); - ThrottledIterator.run( - Iterators.failFast( - getAsyncSnapshotInfoIterator(repositoriesService.repository(repositoryName), repositoryData), - () -> cancellableTask.isCancelled() || listeners.isFailing() - ), - (ref, asyncSnapshotInfo) -> ActionListener.run( - ActionListener.runBefore(listeners.acquire(), ref::close), - refListener -> asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() { - @Override - public void onResponse(SnapshotInfo snapshotInfo) { - if (matchesPredicates(snapshotInfo)) { - totalCount.incrementAndGet(); - if (afterPredicate.test(snapshotInfo)) { - allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); + // TODO if the request parameters allow it, modify asyncSnapshotInfoIterators to skip unnecessary GET calls here + + asyncSnapshotInfoIterators.forEachRemaining( + asyncSnapshotInfoIteratorSupplier -> asyncSnapshotInfoIteratorSupplier.getAsyncSnapshotInfoIterator( + listeners.acquire( + asyncSnapshotInfoIterator -> ThrottledIterator.run( + Iterators.failFast(asyncSnapshotInfoIterator, failFastSupplier), + (ref, asyncSnapshotInfo) -> ActionListener.run( + ActionListener.runBefore(listeners.acquire(), ref::close), + refListener -> asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() { + @Override + public void onResponse(SnapshotInfo snapshotInfo) { + if (matchesPredicates(snapshotInfo)) { + totalCount.incrementAndGet(); + if (afterPredicate.test(snapshotInfo)) { + allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); + } } + refListener.onResponse(null); } - refListener.onResponse(null); - } - @Override - public void onFailure(Exception e) { - if (ignoreUnavailable) { - logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e); - refListener.onResponse(null); - } else { - refListener.onFailure(e); + @Override + public void onFailure(Exception e) { + if (ignoreUnavailable) { + logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e); + refListener.onResponse(null); + } else { + refListener.onFailure(e); + } } - } - }) - ), - getSnapshotInfoExecutor.getMaxRunningTasks(), - () -> {} - ); - })); - } + }) + ), + getSnapshotInfoExecutor.getMaxRunningTasks(), + () -> {} + ) + ) + ) + ); } } @@ -383,6 +395,17 @@ private interface AsyncSnapshotInfo { void getSnapshotInfo(ActionListener listener); } + /** + * An asynchronous supplier of the collection of snapshots contained in a repository, as an iterator over snapshots each represented + * as an {@link AsyncSnapshotInfo}. + */ + private interface AsyncSnapshotInfoIterator { + /** + * @param listener completed, possibly asynchronously, with the appropriate iterator over {@link AsyncSnapshotInfo} instances. + */ + void getAsyncSnapshotInfoIterator(ActionListener> listener); + } + /** * @return an {@link AsyncSnapshotInfo} for the given in-progress snapshot entry. */