From 6966fa5df46d04d97cc146cdf78f66ed687e6dbe Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Thu, 10 Nov 2016 21:45:02 -0500 Subject: [PATCH] Ensures cleanup of temporary index-* generational blobs during snapshotting (#21469) Ensures pending index-* blobs are deleted when snapshotting. The index-* blobs are generational files that maintain the snapshots in the repository. To write these atomically, we first write a `pending-index-*` blob, then move it to `index-*`, which also deletes `pending-index-*` in case its not a file-system level move (e.g. S3 repositories) . For example, to write the 5th generation of the index blob for the repository, we would first write the bytes to `pending-index-5` and then move `pending-index-5` to `index-5`. It is possible that we fail after writing `pending-index-5`, but before moving it to `index-5` or deleting `pending-index-5`. In this case, we will have a dangling `pending-index-5` blob laying around. Since snapshot #5 would have failed, the next snapshot assumes a generation number of 5, so it tries to write to `index-5`, which first tries to write to `pending-index-5` before moving the blob to `index-5`. Since `pending-index-5` is leftover from the previous failure, the snapshot fails as it cannot overwrite this blob. This commit solves the problem by first, adding a UUID to the `pending-index-*` blobs, and secondly, strengthen the logic around failure to write the `index-*` generational blob to ensure pending files are deleted on cleanup. Closes #21462 --- .../common/blobstore/BlobContainer.java | 7 ++- .../blobstore/BlobStoreRepository.java | 12 +++-- .../SharedClusterSnapshotRestoreIT.java | 49 +++++++++++++++++++ .../snapshots/mockstore/MockRepository.java | 8 ++- 4 files changed, 68 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 1427f34a642a7..a04c75941e7b6 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -105,8 +105,11 @@ public interface BlobContainer { Map listBlobsByPrefix(String blobNamePrefix) throws IOException; /** - * Atomically renames the source blob into the target blob. If the source blob does not exist or the - * target blob already exists, an exception is thrown. + * Renames the source blob into the target blob. If the source blob does not exist or the + * target blob already exists, an exception is thrown. Atomicity of the move operation + * can only be guaranteed on an implementation-by-implementation basis. The only current + * implementation of {@link BlobContainer} for which atomicity can be guaranteed is the + * {@link org.elasticsearch.common.blobstore.fs.FsBlobContainer}. * * @param sourceBlobName * The blob to rename. diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index a1bad51626c19..3db9d4340ef93 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -867,15 +867,17 @@ private long listBlobsToGetLatestIndexId() throws IOException { } private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException { - final String tempBlobName = "pending-" + blobName; + final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID(); try (InputStream stream = bytesRef.streamInput()) { snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length()); - } - try { snapshotsBlobContainer.move(tempBlobName, blobName); } catch (IOException ex) { - // Move failed - try cleaning up - snapshotsBlobContainer.deleteBlob(tempBlobName); + // temporary blob creation or move failed - try cleaning up + try { + snapshotsBlobContainer.deleteBlob(tempBlobName); + } catch (IOException e) { + ex.addSuppressed(e); + } throw ex; } } diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 5b5e04cde0607..430a0e6d34c41 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2672,4 +2672,53 @@ public void testSnapshotCanceledOnRemovedShard() throws Exception { assertEquals("IndexShardSnapshotFailedException[Aborted]", snapshotInfo.shardFailures().get(0).reason()); } + public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception { + logger.info("--> creating repository"); + final Path repoPath = randomRepoPath(); + assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("mock").setVerify(false).setSettings( + Settings.builder().put("location", repoPath).put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f))); + + logger.info("--> indexing some data"); + createIndex("test-idx"); + ensureGreen(); + final int numDocs = randomIntBetween(1, 5); + for (int i = 0; i < numDocs; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client().prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo((long) numDocs)); + + logger.info("--> snapshot with potential I/O failures"); + try { + CreateSnapshotResponse createSnapshotResponse = + client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + .setWaitForCompletion(true) + .setIndices("test-idx") + .get(); + if (createSnapshotResponse.getSnapshotInfo().totalShards() != createSnapshotResponse.getSnapshotInfo().successfulShards()) { + assertThat(getFailureCount("test-repo"), greaterThan(0L)); + assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), greaterThan(0)); + for (SnapshotShardFailure shardFailure : createSnapshotResponse.getSnapshotInfo().shardFailures()) { + assertThat(shardFailure.reason(), containsString("Random IOException")); + } + } + } catch (Exception ex) { + // sometimes, the snapshot will fail with a top level I/O exception + assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException")); + } + + logger.info("--> snapshot with no I/O failures"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo-2").setType("mock").setVerify(false).setSettings( + Settings.builder().put("location", repoPath))); + CreateSnapshotResponse createSnapshotResponse = + client().admin().cluster().prepareCreateSnapshot("test-repo-2", "test-snap-2") + .setWaitForCompletion(true) + .setIndices("test-idx") + .get(); + assertEquals(0, createSnapshotResponse.getSnapshotInfo().failedShards()); + GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo-2") + .addSnapshots("test-snap-2").get(); + assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state()); + } + } diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index ca3aeb674bde6..ccf94b093dcb5 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -321,14 +321,20 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws @Override public void move(String sourceBlob, String targetBlob) throws IOException { + // simulate a non-atomic move, since many blob container implementations + // will not have an atomic move, and we should be able to handle that maybeIOExceptionOrBlock(targetBlob); - super.move(sourceBlob, targetBlob); + super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L); + super.deleteBlob(sourceBlob); } @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { maybeIOExceptionOrBlock(blobName); super.writeBlob(blobName, inputStream, blobSize); + // for network based repositories, the blob may have been written but we may still + // get an error with the client connection, so an IOException here simulates this + maybeIOExceptionOrBlock(blobName); } } }