Skip to content

Commit

Permalink
Fix Inconsistent RepositoryData on First Repository Mount (#67647) (#…
Browse files Browse the repository at this point in the history
…67660)

Fix Inconsistent RepositoryData on First Repository Mount

Add a mechanism that forces the repository generation to be tracked in the
cluster state before the repository is first used. This avoids races between
the first write of repository data and the next operation (e.g. when doing
concurrent snapshot creation right after mounting a repository) in
extreme corner cases.

Closes #66663
Closes #67199
  • Loading branch information
original-brownbear committed Jan 18, 2021
1 parent 06512ea commit 2474dcd
Showing 1 changed file with 86 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
Expand Down Expand Up @@ -1335,8 +1336,91 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
}
return;
}
// Slow path if we were not able to safely read the repository data from cache
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false &&
clusterService.state().nodes().getMinNodeVersion().onOrAfter(RepositoryMetadata.REPO_GEN_IN_CS_VERSION)) {
logger.debug("[{}] loading repository metadata for the first time, trying to determine correct generation and to store " +
"it in the cluster state", metadata.name());
initializeRepoGenerationTracking(listener);
} else {
logger.trace("[{}] loading un-cached repository data with best known repository generation [{}]", metadata.name(),
latestKnownRepoGen);
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
}
}

// Listener used to ensure that repository data is only initialized once in the cluster state by #initializeRepoGenerationTracking.
// It is non-null only while an initialization is in flight: it is created, and reset to null again on success or failure, while
// holding the monitor of this instance. Callers arriving during an in-flight initialization add their listener to it instead of
// starting another initialization.
private PlainListenableActionFuture<RepositoryData> repoDataInitialized;

/**
 * Method used to set the current repository generation in the cluster state's {@link RepositoryMetadata} to the latest generation that
 * can be physically found in the repository before passing the latest {@link RepositoryData} to the given listener.
 * This ensures that operations using {@link #executeConsistentStateUpdate} right after mounting a fresh repository will have a
 * consistent view of the {@link RepositoryData} before any data has been written to the repository.
 * <p>
 * Only one initialization runs at a time: the first caller creates {@link #repoDataInitialized} and starts the work, later callers
 * only register their listener on the in-flight future. If the repository generation is found to be already known once the monitor
 * is held, no initialization is started and the request is handed back to {@link #getRepositoryData}.
 *
 * @param listener listener to resolve with new repository data, or to fail if loading the repository data or updating the cluster
 *                 state fails
 */
private void initializeRepoGenerationTracking(ActionListener<RepositoryData> listener) {
    synchronized (this) {
        if (repoDataInitialized == null) {
            // Double check the generation: the caller checked it without holding the monitor, so a concurrent initialization may
            // have completed (and reset repoDataInitialized to null) in the meantime. Starting a second initialization in that case
            // would fail in the cluster state update below with "Found unexpected initialized repo metadata", so load the
            // repository data through the regular path instead.
            if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
                getRepositoryData(listener);
                return;
            }
            logger.trace("[{}] initializing repository generation in cluster state", metadata.name());
            repoDataInitialized = PlainListenableActionFuture.newListenableFuture();
            repoDataInitialized.addListener(listener);
            // Shared failure path: detach the in-flight future under the monitor so that a later call can retry the
            // initialization, then fail all listeners that were waiting on it outside of the monitor.
            final Consumer<Exception> onFailure = e -> {
                logger.warn(new ParameterizedMessage("[{}] Exception when initializing repository generation in cluster state",
                    metadata.name()), e);
                final ActionListener<RepositoryData> existingListener;
                synchronized (BlobStoreRepository.this) {
                    existingListener = repoDataInitialized;
                    repoDataInitialized = null;
                }
                existingListener.onFailure(e);
            };
            threadPool.generic().execute(() -> doGetRepositoryData(
                ActionListener.wrap(repoData -> clusterService.submitStateUpdateTask(
                    // Describe the task by the generation being set rather than by the string form of the whole RepositoryData
                    "set safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
                    new ClusterStateUpdateTask() {
                        @Override
                        public ClusterState execute(ClusterState currentState) {
                            final RepositoryMetadata repoMetadata = getRepoMetadata(currentState);
                            // No update to the repository generation should have occurred concurrently in general except for
                            // extreme corner cases like failing over to an older version master node and back to the current
                            // node concurrently
                            if (repoMetadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
                                throw new RepositoryException(
                                    repoMetadata.name(), "Found unexpected initialized repo metadata [" + repoMetadata + "]");
                            }
                            // Both generation arguments are set to the generation that was just read from the repository
                            return ClusterState.builder(currentState)
                                .metadata(Metadata.builder(currentState.getMetadata()).putCustom(
                                    RepositoriesMetadata.TYPE,
                                    currentState.metadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
                                        .withUpdatedGeneration(repoMetadata.name(),
                                            repoData.getGenId(), repoData.getGenId()))).build();
                        }

                        @Override
                        public void onFailure(String source, Exception e) {
                            onFailure.accept(e);
                        }

                        @Override
                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                            // Resolve listeners on generic pool since some callbacks for repository data do additional IO
                            threadPool.generic().execute(() -> {
                                final ActionListener<RepositoryData> existingListener;
                                synchronized (BlobStoreRepository.this) {
                                    existingListener = repoDataInitialized;
                                    repoDataInitialized = null;
                                }
                                existingListener.onResponse(repoData);
                            });
                        }
                    }), onFailure)));
        } else {
            logger.trace("[{}] waiting for existing initialization of repository metadata generation in cluster state",
                metadata.name());
            repoDataInitialized.addListener(listener);
        }
    }
}

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
Expand Down

0 comments on commit 2474dcd

Please sign in to comment.