Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,55 +276,67 @@ void runOperation(ActionListener<GetSnapshotsResponse> listener) {
*/
private void populateResults(ActionListener<Void> listener) {
try (var listeners = new RefCountingListener(listener)) {
for (final RepositoryMetadata repository : repositories) {
final String repositoryName = repository.name();
if (skipRepository(repositoryName)) {
continue;
}

if (listeners.isFailing()) {
return;
}
final BooleanSupplier failFastSupplier = () -> cancellableTask.isCancelled() || listeners.isFailing();

final Iterator<AsyncSnapshotInfoIterator> asyncSnapshotInfoIterators = Iterators.failFast(
Iterators.map(
Iterators.filter(
Iterators.map(repositories.iterator(), RepositoryMetadata::name),
repositoryName -> skipRepository(repositoryName) == false
),
repositoryName -> asyncRepositoryContentsListener -> SubscribableListener

.<RepositoryData>newForked(l -> maybeGetRepositoryData(repositoryName, l))
.andThenApply(repositoryData -> {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
cancellableTask.ensureNotCancelled();
ensureRequiredNamesPresent(repositoryName, repositoryData);
return getAsyncSnapshotInfoIterator(repositoriesService.repository(repositoryName), repositoryData);
})
.addListener(asyncRepositoryContentsListener)
),
failFastSupplier
);

maybeGetRepositoryData(repositoryName, listeners.acquire(repositoryData -> {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
cancellableTask.ensureNotCancelled();
ensureRequiredNamesPresent(repositoryName, repositoryData);
ThrottledIterator.run(
Iterators.failFast(
getAsyncSnapshotInfoIterator(repositoriesService.repository(repositoryName), repositoryData),
() -> cancellableTask.isCancelled() || listeners.isFailing()
),
(ref, asyncSnapshotInfo) -> ActionListener.run(
ActionListener.runBefore(listeners.acquire(), ref::close),
refListener -> asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() {
@Override
public void onResponse(SnapshotInfo snapshotInfo) {
if (matchesPredicates(snapshotInfo)) {
totalCount.incrementAndGet();
if (afterPredicate.test(snapshotInfo)) {
allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices));
// TODO if the request parameters allow it, modify asyncSnapshotInfoIterators to skip unnecessary GET calls here

asyncSnapshotInfoIterators.forEachRemaining(
asyncSnapshotInfoIteratorSupplier -> asyncSnapshotInfoIteratorSupplier.getAsyncSnapshotInfoIterator(
listeners.acquire(
asyncSnapshotInfoIterator -> ThrottledIterator.run(
Iterators.failFast(asyncSnapshotInfoIterator, failFastSupplier),
(ref, asyncSnapshotInfo) -> ActionListener.run(
ActionListener.runBefore(listeners.acquire(), ref::close),
refListener -> asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() {
@Override
public void onResponse(SnapshotInfo snapshotInfo) {
if (matchesPredicates(snapshotInfo)) {
totalCount.incrementAndGet();
if (afterPredicate.test(snapshotInfo)) {
allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices));
}
}
refListener.onResponse(null);
}
refListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
if (ignoreUnavailable) {
logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e);
refListener.onResponse(null);
} else {
refListener.onFailure(e);
@Override
public void onFailure(Exception e) {
if (ignoreUnavailable) {
logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e);
refListener.onResponse(null);
} else {
refListener.onFailure(e);
}
}
}
})
),
getSnapshotInfoExecutor.getMaxRunningTasks(),
() -> {}
);
}));
}
})
),
getSnapshotInfoExecutor.getMaxRunningTasks(),
() -> {}
)
)
)
);
}
}

Expand Down Expand Up @@ -383,6 +395,17 @@ private interface AsyncSnapshotInfo {
void getSnapshotInfo(ActionListener<SnapshotInfo> listener);
}

/**
* An asynchronous supplier of the collection of snapshots contained in a repository, as an iterator over snapshots each represented
* as an {@link AsyncSnapshotInfo}.
*/
private interface AsyncSnapshotInfoIterator {
/**
* @param listener completed, possibly asynchronously, with the appropriate iterator over {@link AsyncSnapshotInfo} instances.
*/
void getAsyncSnapshotInfoIterator(ActionListener<Iterator<AsyncSnapshotInfo>> listener);
}

/**
* @return an {@link AsyncSnapshotInfo} for the given in-progress snapshot entry.
*/
Expand Down