Skip to content

Commit

Permalink
[8.6] Simplify and optimize deduplication of RepositoryData for a non…
Browse files Browse the repository at this point in the history
…-caching repository instance (#91851) (#91866) (#92661)

* Simplify and optimize deduplication of RepositoryData for a non-caching repository instance (#91851)

This makes use of the new deduplicator infrastructure to move to more
efficient deduplication mechanics.
The existing solution hardly ever deduplicated because it would only
deduplicate after the repository entered a consistent state. The
adjusted solution is much simpler, in that it simply deduplicates such
that only a single loading of `RepositoryData` will ever happen at a
time, fixing memory issues from massively concurrent loading of the repo
data as described in #89952.

closes #89952

* fix compile
  • Loading branch information
original-brownbear committed Jan 4, 2023
1 parent cd55fd4 commit 7221c6a
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 88 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/91851.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 91851
summary: Simplify and optimize deduplication of `RepositoryData` for a non-caching
repository instance
area: Snapshot/Restore
type: bug
issues:
- 89952
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action;

import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
*
* Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
* share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers
* stronger guarantees of not seeing a stale result ever. Concretely, every invocation of {@link #execute(ActionListener)} is guaranteed to
* be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This allows this class to
* be used to deduplicate results from actions that produce results that change over time transparently.
*
* @param <T> Result type
*/
public final class SingleResultDeduplicator<T> {

private final ThreadContext threadContext;

/**
* List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in
* progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued
* up here once done.
*/
private List<ActionListener<T>> waitingListeners;

private final Consumer<ActionListener<T>> executeAction;

public SingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
this.threadContext = threadContext;
this.executeAction = executeAction;
}

/**
* Execute the action for the given {@code listener}.
* @param listener listener to resolve with execution result
*/
public void execute(ActionListener<T> listener) {
synchronized (this) {
if (waitingListeners == null) {
// no queued up listeners, just execute this one directly without deduplication and instantiate the list so that
// subsequent executions will wait
waitingListeners = new ArrayList<>();
} else {
// already running an execution, queue this one up
waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
return;
}
}
doExecute(listener);
}

private void doExecute(ActionListener<T> listener) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final List<ActionListener<T>> listeners;
synchronized (this) {
if (waitingListeners.isEmpty()) {
// no listeners were queued up while this execution ran, so we just reset the state to not having a running execution
waitingListeners = null;
return;
} else {
// we have queued up listeners, so we create a fresh list for the next execution and execute once to handle the
// listeners currently queued up
listeners = waitingListeners;
waitingListeners = new ArrayList<>();
}
}
doExecute(new ActionListener<T>() {
@Override
public void onResponse(T response) {
ActionListener.onResponse(listeners, response);
}

@Override
public void onFailure(Exception e) {
ActionListener.onFailure(listeners, e);
}
});
});
try {
executeAction.accept(wrappedListener);
} catch (Exception e) {
wrappedListener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
Expand Down Expand Up @@ -406,6 +406,11 @@ protected BlobStoreRepository(
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
threadPool.getThreadContext(),
listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))
);
}

@Override
Expand Down Expand Up @@ -1722,7 +1727,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached.getGenId() == latestKnownRepoGen.get()) {
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> l.onResponse(cached));
listener.onResponse(cached);
return;
}
if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN
Expand All @@ -1740,19 +1745,7 @@ && isReadOnly() == false
metadata.name(),
latestKnownRepoGen
);
// Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
// Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
// generation may change
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
if (bestEffortConsistency || cacheRepositoryData == false) {
executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
} else {
repoDataDeduplicator.executeOnce(
metadata,
listener,
(metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))
);
}
repoDataLoadDeduplicator.execute(listener);
}
}

Expand Down Expand Up @@ -1799,78 +1792,70 @@ private void initializeRepoGenerationTracking(ActionListener<RepositoryData> lis
}
existingListener.onFailure(e);
};
threadPool.generic()
.execute(
ActionRunnable.wrap(
ActionListener.wrap(
repoData -> clusterService.submitStateUpdateTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
currentState.metadata()
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(
metadata.name(),
repoData.getGenId(),
repoData.getGenId()
)
)
repoDataLoadDeduplicator.execute(
ActionListener.wrap(
repoData -> clusterService.submitStateUpdateTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
currentState.metadata()
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId())
)
.build();
}
)
.build();
}

@Override
public void onFailure(String source, Exception e) {
onFailure.accept(e);
}
@Override
public void onFailure(String source, Exception e) {
onFailure.accept(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
}
),
onFailure
),
this::doGetRepositoryData
)
);
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
}
}
),
onFailure
)
);
} else {
logger.trace(
"[{}] waiting for existing initialization of repository metadata generation in cluster state",
Expand All @@ -1882,11 +1867,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

/**
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
* unique for a given value of {@link #metadata} at any point in time.
* Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage.
*/
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator = new ResultDeduplicator<>();
private final SingleResultDeduplicator<RepositoryData> repoDataLoadDeduplicator;

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
Expand All @@ -1908,7 +1891,7 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
);
return;
}
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
genToLoad = latestKnownRepoGen.accumulateAndGet(generation, Math::max);
if (genToLoad > generation) {
logger.info(
"Determined repository generation [{}] from repository contents but correct generation must be at " + "least [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ public static ClusterService mockClusterService(RepositoryMetadata metadata) {
private static ClusterService mockClusterService(ClusterState initialState) {
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.executor(ThreadPool.Names.SNAPSHOT_META)).thenReturn(new SameThreadExecutorService());
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))
Expand Down

0 comments on commit 7221c6a

Please sign in to comment.