From e39f9d3d8be42833866dec35e13ad067d32534a5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 3 May 2023 22:27:06 -0600 Subject: [PATCH] Continue to retry FsBlobContainer writes Currently writeBlob will delete an existing blob and attempt to write again if failIfAlreadyExists=false. However, this still introduces scenarios where multiple writers could delete and fail on the next write. Instead we should continue to retry until succeeding. --- .../common/blobstore/fs/FsBlobContainer.java | 34 ++++++++++------- .../blobstore/fs/FsBlobContainerTests.java | 37 ------------------- 2 files changed, 20 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 954cb483a38e0..1dd7822860ed8 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -240,14 +240,17 @@ public long readBlobPreferredLength() { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { final Path file = path.resolve(blobName); - try { - writeToPath(inputStream, file, blobSize); - } catch (FileAlreadyExistsException faee) { - if (failIfAlreadyExists) { - throw faee; + while(true) { + try { + writeToPath(inputStream, file, blobSize); + break; + } catch (FileAlreadyExistsException faee) { + if (failIfAlreadyExists) { + throw faee; + } + deleteBlobsIgnoringIfNotExists(Iterators.single(blobName)); + writeToPath(inputStream, file, blobSize); } - deleteBlobsIgnoringIfNotExists(Iterators.single(blobName)); - writeToPath(inputStream, file, blobSize); } IOUtils.fsync(path, true); } @@ -255,14 +258,17 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b @Override public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { final Path file = path.resolve(blobName); - try { - writeToPath(bytes, file); - } catch (FileAlreadyExistsException faee) { - if (failIfAlreadyExists) { - throw faee; + while (true) { + try { + writeToPath(bytes, file); + break; + } catch (FileAlreadyExistsException faee) { + if (failIfAlreadyExists) { + throw faee; + } + deleteBlobsIgnoringIfNotExists(Iterators.single(blobName)); + writeToPath(bytes, file); } - deleteBlobsIgnoringIfNotExists(Iterators.single(blobName)); - writeToPath(bytes, file); } IOUtils.fsync(path, true); } diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java index 95dc6c4428838..5c270123d903a 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java @@ -36,8 +36,6 @@ import java.nio.file.StandardCopyOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.spi.FileSystemProvider; -import java.util.ArrayList; -import java.util.List; import java.util.Locale; import java.util.OptionalLong; import java.util.Set; @@ -104,41 +102,6 @@ public void testIsTempBlobName() { assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true)); } - public void testDeleteIgnoringIfNotExistsDoesNotThrowFileNotFound() throws IOException { - final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT); - final byte[] blobData = randomByteArrayOfLength(512); - - final Path path = PathUtils.get(createTempDir().toString()); - Files.write(path.resolve(blobName), blobData); - - final FsBlobContainer container = new FsBlobContainer( - new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false), - BlobPath.EMPTY, - path - ); - - ArrayList> futures = new ArrayList<>(); - for (int i = 0; i < 5; ++i) { - PlainActionFuture future = PlainActionFuture.newFuture(); - futures.add(future); - new Thread(() -> { - try { - container.deleteBlobsIgnoringIfNotExists(List.of(blobName).listIterator()); - future.onResponse(null); - } catch (IOException e) { - future.onFailure(e); - } - }).start(); - } - - // Check that none throw - for (PlainActionFuture future : futures) { - future.actionGet(); - } - - assertFalse(container.blobExists(blobName)); - } - private static long getLongAsync(Consumer> consumer) { return getAsync(consumer).orElseThrow(AssertionError::new); }