
Track Shard-Snapshot Index Generation at Repository Root #46250

Conversation

@original-brownbear
Member

commented Sep 3, 2019

Changes to Root-Level index-N (RepositoryData)

This change adds a new field `"shards"` to `RepositoryData` that contains a mapping of `IndexId` to a `String[]`. This string array can be accessed by shard id to get the generation of a shard's shard folder (i.e. the `N` in the name of the currently valid `/indices/${indexId}/${shardId}/index-${N}` for the shard in question).
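For illustration, here is a minimal sketch (plain Java with hypothetical class and method names, not the actual `RepositoryData` implementation) of what the new mapping looks like and how a shard's current generation would be looked up:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the new "shards" mapping: index id -> per-shard generation tokens.
// In the real repository this mapping is serialized as part of the root-level index-N blob.
public class ShardGenerationsSketch {

    // Keyed by the ${indexId} path segment; the array is indexed by shard id.
    private final Map<String, String[]> shardGenerations = new HashMap<>();

    public void putIndex(String indexId, String[] generations) {
        shardGenerations.put(indexId, generations);
    }

    // Returns the token ${N} of the currently valid /indices/${indexId}/${shardId}/index-${N}
    // blob, or null if no generation is tracked for that shard yet.
    public String getGeneration(String indexId, int shardId) {
        final String[] generations = shardGenerations.get(indexId);
        return generations == null ? null : generations[shardId];
    }

    public static void main(String[] args) {
        ShardGenerationsSketch sketch = new ShardGenerationsSketch();
        sketch.putIndex("my-index-uuid", new String[]{"0", "3", "1"});
        System.out.println(sketch.getGeneration("my-index-uuid", 1)); // prints 3
    }
}
```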

Benefits

This allows for creating a new snapshot in the shard without doing any LIST operations on the shard's folder. In the case of AWS S3, this saves about 1/3 of the cost for updating an empty shard (see #45736) and removes one out of two remaining potential issues with eventually consistent blob stores (see #38941 ... now only the root `index-${N}` is determined by listing).
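As a rough sketch of the before/after (hypothetical helper methods, not Elasticsearch APIs): previously the current shard generation had to be discovered by listing the shard folder, whereas now it can simply be read from the mapping carried in `RepositoryData`:

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Set;

public final class ShardGenerationLookup {

    // Old path: a LIST of the shard folder is needed to find the highest index-${N} blob,
    // which costs a request per shard and is exposed to eventual-consistency issues.
    static String generationByListing(Set<String> shardFolderBlobNames) {
        return shardFolderBlobNames.stream()
            .filter(name -> name.startsWith("index-"))
            .map(name -> name.substring("index-".length()))
            .max(Comparator.comparingLong(Long::parseLong))
            .orElse(null);
    }

    // New path: the generation is read from the "shards" mapping in the root-level
    // RepositoryData, so no request against the shard folder is needed at all.
    static String generationFromRepositoryData(Map<String, String[]> shardGenerations,
                                               String indexId, int shardId) {
        final String[] generations = shardGenerations.get(indexId);
        return generations == null ? null : generations[shardId];
    }
}
```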

Also, and equally if not more important, a number of possible failure modes on eventually consistent blob stores like AWS S3 are eliminated by moving all delete operations to the master node and by moving from incremental naming of the shard-level `index-N` blobs to uuid suffixes for these blobs.

Only Master Deletes Blobs

This change moves the deletion of the previous shard-level `index-${uuid}` blob to the master node instead of the data node, allowing for a safe and consistent update of the shard's generation in the `RepositoryData` by first updating `RepositoryData` and then deleting the now unreferenced `index-${newUUID}` blob.
No deletes are executed on the data nodes at all for any operation with this change.
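A minimal sketch of that ordering, under an assumed toy blob-store interface (the names below are hypothetical; the real logic lives in `BlobStoreRepository`): the new shard-level blob is written first, then the root-level `RepositoryData` is updated to reference it, and only then does master delete the now-unreferenced blob, so a failure at any earlier point leaves the old generation fully intact:

```java
import java.util.UUID;

public final class MasterOnlyDeleteSketch {

    // Toy blob-store abstraction for illustration only.
    interface BlobStore {
        void write(String path, byte[] bytes);
        void delete(String path);
    }

    /**
     * Returns the uuid of the newly written shard-level index blob.
     * Step 3 only runs after steps 1 and 2 have succeeded, and it runs on master,
     * so data nodes never issue deletes.
     */
    static String updateShardGeneration(BlobStore store, String shardPath, long newRootGeneration,
                                        String previousShardGeneration, byte[] newShardIndexBlob,
                                        byte[] newRepositoryDataBlob) {
        // 1. Write the new shard-level index blob under a fresh uuid (done by the data node).
        final String newShardGeneration = UUID.randomUUID().toString();
        store.write(shardPath + "/index-" + newShardGeneration, newShardIndexBlob);

        // 2. Master writes the new root-level index-${N} (RepositoryData) referencing the uuid.
        //    Until this write succeeds, the previous shard generation stays the effective one.
        store.write("index-" + newRootGeneration, newRepositoryDataBlob);

        // 3. Only now does master delete the previous, no-longer-referenced shard-level blob.
        if (previousShardGeneration != null) {
            store.delete(shardPath + "/index-" + previousShardGeneration);
        }
        return newShardGeneration;
    }
}
```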

Note also: the previous issues with hanging data nodes interfering with master nodes can no longer occur, even on S3 (see the next section for details).

Why Move from `index-${N}` to `index-${uuid}` at the Shard Level

This change renames the shard-level `index-${N}` blobs to use a uuid suffix, `index-${UUID}`. The reason for this is that writing a new shard-level `index-` generation blob is no longer atomic in its effect. Not only does the blob have to be written to have an effect, it must also be referenced by the root-level `index-N` (`RepositoryData`) to become an effective part of the snapshot repository.
This leads to a problem if we were to use incrementing names like we did before. If a blob `index-${N+1}` is written but the root-level `RepositoryData` is not updated because the node/network/cluster/... crashes, then a future operation will determine the shard's generation to be `N` and try to write a new `index-${N+1}` to the already existing path. Updates like that are problematic on S3 for consistency reasons, but they also create numerous issues when thinking about stuck data nodes.
Previously, stuck data nodes that were tasked to write `index-${N+1}` but only tried to do so after some other node had already written `index-${N+1}` were prevented from doing so (except on S3) by us not allowing overwrites for that blob, and thus no corruption could occur.
Were we to continue using incrementing names, we could not do this. The stuck-node scenario would either allow overwriting the `N+1` generation or force us to keep using a `LIST` operation to figure out the next `N` (which would make this change pointless).
With uuid naming and all deletes moved to `master`, this becomes a non-issue. Data nodes write updated shard generations as `index-${uuid}` blobs, and `master` makes those `index-${uuid}` blobs part of the `RepositoryData` it deems correct and cleans up all `index-` blobs that are unused.
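To make the naming argument concrete, here is a small illustrative sketch (hypothetical names, not the production code): an incrementing name is derived from the previous generation and can therefore collide with a blob another node already wrote, while a uuid-suffixed name never collides, and a uuid blob that never gets referenced by `RepositoryData` is simply unused data that master cleans up later:

```java
import java.util.Set;
import java.util.UUID;

public final class ShardIndexBlobNaming {

    // Old scheme: the next name is computed from the previous numeric generation, so a stuck
    // node retrying an old write can target the exact name another node has already used.
    static String nextNumericName(long currentGeneration) {
        return "index-" + (currentGeneration + 1);
    }

    // New scheme: every write gets a fresh uuid, so stale or concurrent writers can never
    // overwrite each other's blobs.
    static String nextUuidName() {
        return "index-" + UUID.randomUUID();
    }

    // A shard-level blob only "counts" once the root-level RepositoryData references its
    // generation; anything else is garbage that master deletes during cleanup.
    static boolean isEffective(String blobName, Set<String> generationsReferencedByRoot) {
        return generationsReferencedByRoot.contains(blobName.substring("index-".length()));
    }
}
```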


@original-brownbear original-brownbear changed the title Track Shard-Snapshot Index Generationat Repository Root Track Shard-Snapshot Index Generation at Repository Root Sep 3, 2019
@ywelsch ywelsch added the v7.6.0 label Oct 17, 2019
Contributor

left a comment

I've left some more comments, we're converging though.

@@ -384,47 +388,154 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
* @param listener Listener to invoke once finished
*/
private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData,
Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData, Version version,

@ywelsch
Contributor

commented Oct 17, 2019

I think the duplication is ok, as it's nicely isolated there for the 7.x BWC path. In master, this code can go away. I don't expect many changes in this high-level logic (it just coordinates what happens after what; the actual logic is implemented in helper methods). The important thing is that the series of actions is well understood. With the more callback-oriented approach that you've chosen, it's very difficult for me to understand the order of the various steps that the deletion is doing.

@original-brownbear

Member Author

commented Oct 18, 2019

@ywelsch how about 77eeb7d for making the order clearer? :) Now we really only have callbacks that force you to look back a few lines to get the execution order in a single place, and I didn't have to mix try-catch with listener handling.

@original-brownbear original-brownbear requested a review from ywelsch Oct 18, 2019
@ywelsch

Contributor

commented Oct 21, 2019

how about 77eeb7d for making the order clearer?

I find it still as complex to understand. It still defines 4 abstract things and plugs them together in various ways. It's impossible to tell which listener will be invoked, and how often it is invoked.

Here's another patch which imho makes it easier to understand what's going on:

diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index c5bd70f586b..7e9234436bb 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -62,7 +62,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
@@ -109,7 +108,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -390,78 +388,77 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      */
     private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
                                         Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
-                                        ActionListener<Void> listener) {
-
-        final ExecutorService executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
-
-        // Listener to invoke once an updated ShardGenerations instance is ready to be written
-        final ListenableFuture<ShardGenerations> shardGenerationsListener = new ListenableFuture<>();
-
-        // Listener to invoke once we've written the latest repository data and updated all shard metadata.
-        // Any failure past this point will only be logged.
-        final ListenableFuture<Collection<ShardSnapshotMetaDeleteResult>> afterUpdateAllMetadata = new ListenableFuture<>();
-
-        final ListenableFuture<RepositoryData> repositoryDataWrittenListener = new ListenableFuture<>();
+                                        ActionListener<Void> listener) throws IOException {
 
         if (writeShardGens) {
-            // New path that updates the pointer to each deleted shard's generation in the root RepositoryData
-
-            // Once we are done removing the snapshot from the shard-level metadata of each affected shard we can update the repository
-            // metadata as follows:
+            // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
+            final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
+            writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, writeShardMetaDataAndComputeDeletesStep);
+            // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
             // 1. Remove the snapshot from the list of existing snapshots
             // 2. Update the index shard generations of all updated shard folders
             //
             // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
             //       index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
             //       written if all shard paths have been successfully updated.
-            final StepListener<Collection<ShardSnapshotMetaDeleteResult>> afterUpdateShardMeta = new StepListener<>();
-
-            writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, afterUpdateShardMeta);
-
-            afterUpdateShardMeta.whenComplete(res -> {
-                final ShardGenerations.Builder builder = ShardGenerations.builder();
-                for (ShardSnapshotMetaDeleteResult newGen : res) {
-                    builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
-                }
-                shardGenerationsListener.onResponse(builder.build());
-                repositoryDataWrittenListener.addListener(ActionListener.delegateFailure(listener,
-                    (v, l) -> afterUpdateAllMetadata.onResponse(res)), executor);
+            final StepListener<RepositoryData> writeUpdatedRepoDataStep = new StepListener<>();
+            writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> {
+                    final ShardGenerations.Builder builder = ShardGenerations.builder();
+                    for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
+                        builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
+                    }
+                    final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
+                    writeIndexGen(updatedRepoData, repositoryStateId, true);
+                    writeUpdatedRepoDataStep.onResponse(updatedRepoData);
+                }, writeUpdatedRepoDataStep::onFailure);
+            // Once we have updated the repository, run the clean-ups
+            writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
+                // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
+                final ActionListener<Void> afterCleanupsListener =
+                    new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
+                asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
+                asyncCleanupUnlinkedShardLevelBlobs(snapshotId, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener);
             }, listener::onFailure);
         } else {
-            // Writing the new repository data first, without tracking any shard generations in the BwC path
-            shardGenerationsListener.onResponse(ShardGenerations.EMPTY);
-            // We've already written the new RepositoryData so updating all shard-level metadata will mean that all metadata is updated
-            repositoryDataWrittenListener.addListener(ActionListener.delegateFailure(listener,
-                (v, l) -> writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, afterUpdateAllMetadata)),
-                executor);
+            // Write the new repository data first (with the removed snapshot), using no shard generations
+            final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
+            writeIndexGen(updatedRepoData, repositoryStateId, false);
+            // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
+            final ActionListener<Void> afterCleanupsListener =
+                new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
+            asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
+            writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, ActionListener.map(afterCleanupsListener,
+                deleteResults -> {
+                    asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener);
+                    return null;
+                }));
         }
+    }
+
+    private void asyncCleanupUnlinkedRootAndIndicesBlobs(Map<String, BlobContainer> foundIndices, Map<String, BlobMetaData> rootBlobs,
+        RepositoryData updatedRepoData, ActionListener<Void> listener) {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
+            listener,
+            l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
+    }
 
-        shardGenerationsListener.addListener(ActionListener.wrap(shardGens -> {
-            final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, shardGens);
-            // Write out new RepositoryData
-            writeIndexGen(updatedRepoData, repositoryStateId, writeShardGens);
-            repositoryDataWrittenListener.onResponse(updatedRepoData);
-        }, listener::onFailure), executor);
-
-        // Listener that resolves the given listener passed to this method once both cleanup steps have completed.
-        // Any failures encountered by this listener are ignored as they were already logged by the throwing code.
-        final ActionListener<Void> afterCleanupsListener =
-            new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
-
-        // Run unreferenced blobs cleanup if we were able to write updated repository data
-        repositoryDataWrittenListener.addListener(
-            ActionListener.wrap(updatedRepoData -> cleanupStaleBlobs(
-                foundIndices, rootBlobs, updatedRepoData, ActionListener.map(afterCleanupsListener, ignored -> null)), e -> {}), executor);
-
-        afterUpdateAllMetadata.addListener(ActionListener.runAfter(
-            ActionListener.wrap(
-                // Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths)
-                // has been updated we can execute the delete operations for all blobs that have become unreferenced as a result
-                deleteResults -> blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults)),
-                // Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request
-                e -> logger.warn(
-                    () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)),
-            () -> afterCleanupsListener.onResponse(null)), executor);
+    private void asyncCleanupUnlinkedShardLevelBlobs(SnapshotId snapshotId, Collection<ShardSnapshotMetaDeleteResult> deleteResults,
+        ActionListener<Void> listener) {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
+            listener,
+            l -> {
+                try {
+                    blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(
+                        snapshotId,
+                        deleteResults));
+                    listener.onResponse(null);
+                } catch (Exception e) {
+                    logger.warn(
+                        () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId),
+                        e);
+                    throw e;
+                }
+            }));
     }
 
     // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
@original-brownbear

Member Author

commented Oct 21, 2019

Thanks @ywelsch, I like that patch :) Applied it now in 1bdbf2c, with a minor change in 68e407c (to make it a little more obvious what fails the overall request and what doesn't) :)

@original-brownbear

Member Author

commented Oct 21, 2019

Jenkins run elasticsearch-ci/2 (unrelated xpack security failure)

Contributor

left a comment

I've left one important comment that needs addressing, o.w. LGTM

@original-brownbear original-brownbear merged commit 8c13cf7 into elastic:master Oct 22, 2019
8 checks passed
CLA All commits in pull request signed
elasticsearch-ci/1 Build finished.
elasticsearch-ci/2 Build finished.
elasticsearch-ci/bwc Build finished.
elasticsearch-ci/default-distro Build finished.
elasticsearch-ci/docs Build finished.
elasticsearch-ci/oss-distro-docs Build finished.
elasticsearch-ci/packaging-sample-matrix Build finished.
@original-brownbear original-brownbear deleted the original-brownbear:smarter-incrementality-snapshots branch Oct 22, 2019
@original-brownbear

Member Author

commented Oct 22, 2019

Thanks so much for grinding through this @tlrx and @ywelsch, much appreciated!

original-brownbear added a commit to original-brownbear/elasticsearch that referenced this pull request Oct 23, 2019
* Track Shard Snapshot Generation in CS

Adds communication of new shard generations from data nodes to master
and tracking of those generations in the cluster state (CS).
This is a preliminary step for elastic#46250
original-brownbear added a commit to original-brownbear/elasticsearch that referenced this pull request Oct 23, 2019
Co-authored-by: Yannick Welsch <yannick@welsch.lu>
Co-authored-by: Tanguy Leroux <tlrx.dev@gmail.com>