diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 1427f34a642a7..a04c75941e7b6 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -105,8 +105,11 @@ public interface BlobContainer { Map listBlobsByPrefix(String blobNamePrefix) throws IOException; /** - * Atomically renames the source blob into the target blob. If the source blob does not exist or the - * target blob already exists, an exception is thrown. + * Renames the source blob into the target blob. If the source blob does not exist or the + * target blob already exists, an exception is thrown. Atomicity of the move operation + * can only be guaranteed on an implementation-by-implementation basis. The only current + * implementation of {@link BlobContainer} for which atomicity can be guaranteed is the + * {@link org.elasticsearch.common.blobstore.fs.FsBlobContainer}. * * @param sourceBlobName * The blob to rename. diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index a1bad51626c19..3db9d4340ef93 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -867,15 +867,17 @@ private long listBlobsToGetLatestIndexId() throws IOException { } private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException { - final String tempBlobName = "pending-" + blobName; + final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID(); try (InputStream stream = bytesRef.streamInput()) { snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length()); - } - try { snapshotsBlobContainer.move(tempBlobName, blobName); } catch (IOException ex) { - // Move failed - try cleaning up - snapshotsBlobContainer.deleteBlob(tempBlobName); + // temporary blob creation or move failed - try cleaning up + try { + snapshotsBlobContainer.deleteBlob(tempBlobName); + } catch (IOException e) { + ex.addSuppressed(e); + } throw ex; } } diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 657e5bbf0180b..c3686c0b98395 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2592,4 +2592,53 @@ public void testGetSnapshotsRequest() throws Exception { waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60)); } + public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception { + logger.info("--> creating repository"); + final Path repoPath = randomRepoPath(); + assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("mock").setVerify(false).setSettings( + Settings.builder().put("location", repoPath).put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f))); + + logger.info("--> indexing some data"); + createIndex("test-idx"); + ensureGreen(); + final int numDocs = randomIntBetween(1, 5); + for (int i = 0; i < numDocs; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client().prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo((long) numDocs)); + + logger.info("--> snapshot with potential I/O failures"); + try { + CreateSnapshotResponse createSnapshotResponse = + client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + .setWaitForCompletion(true) + .setIndices("test-idx") + .get(); + if (createSnapshotResponse.getSnapshotInfo().totalShards() != createSnapshotResponse.getSnapshotInfo().successfulShards()) { + assertThat(getFailureCount("test-repo"), greaterThan(0L)); + assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), greaterThan(0)); + for (SnapshotShardFailure shardFailure : createSnapshotResponse.getSnapshotInfo().shardFailures()) { + assertThat(shardFailure.reason(), containsString("Random IOException")); + } + } + } catch (Exception ex) { + // sometimes, the snapshot will fail with a top level I/O exception + assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException")); + } + + logger.info("--> snapshot with no I/O failures"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo-2").setType("mock").setVerify(false).setSettings( + Settings.builder().put("location", repoPath))); + CreateSnapshotResponse createSnapshotResponse = + client().admin().cluster().prepareCreateSnapshot("test-repo-2", "test-snap-2") + .setWaitForCompletion(true) + .setIndices("test-idx") + .get(); + assertEquals(0, createSnapshotResponse.getSnapshotInfo().failedShards()); + GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo-2") + .addSnapshots("test-snap-2").get(); + assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state()); + } + } diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 06c4ec10af056..ae2070d9ce371 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -313,14 +313,20 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws @Override public void move(String sourceBlob, String targetBlob) throws IOException { + // simulate a non-atomic move, since many blob container implementations + // will not have an atomic move, and we should be able to handle that maybeIOExceptionOrBlock(targetBlob); - super.move(sourceBlob, targetBlob); + super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L); + super.deleteBlob(sourceBlob); } @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { maybeIOExceptionOrBlock(blobName); super.writeBlob(blobName, inputStream, blobSize); + // for network based repositories, the blob may have been written but we may still + // get an error with the client connection, so an IOException here simulates this + maybeIOExceptionOrBlock(blobName); } } }