Skip to content

Commit

Permalink
Continue to retry FsBlobContainer writes
Browse files Browse the repository at this point in the history
Currently writeBlob will delete an existing blob and attempt to write
again if failIfAlreadyExists=false. However, this still introduces
scenarios where multiple writers could delete and fail on the next
write. Instead we should continue to retry until succeeding.
  • Loading branch information
Tim-Brooks committed May 4, 2023
1 parent 6ace408 commit e39f9d3
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,35 @@ public long readBlobPreferredLength() {
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
final Path file = path.resolve(blobName);
try {
writeToPath(inputStream, file, blobSize);
} catch (FileAlreadyExistsException faee) {
if (failIfAlreadyExists) {
throw faee;
while(true) {
try {
writeToPath(inputStream, file, blobSize);
break;
} catch (FileAlreadyExistsException faee) {
if (failIfAlreadyExists) {
throw faee;
}
deleteBlobsIgnoringIfNotExists(Iterators.single(blobName));
writeToPath(inputStream, file, blobSize);
}
deleteBlobsIgnoringIfNotExists(Iterators.single(blobName));
writeToPath(inputStream, file, blobSize);
}
IOUtils.fsync(path, true);
}

@Override
public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
final Path file = path.resolve(blobName);
try {
writeToPath(bytes, file);
} catch (FileAlreadyExistsException faee) {
if (failIfAlreadyExists) {
throw faee;
while (true) {
try {
writeToPath(bytes, file);
break;
} catch (FileAlreadyExistsException faee) {
if (failIfAlreadyExists) {
throw faee;
}
deleteBlobsIgnoringIfNotExists(Iterators.single(blobName));
writeToPath(bytes, file);
}
deleteBlobsIgnoringIfNotExists(Iterators.single(blobName));
writeToPath(bytes, file);
}
IOUtils.fsync(path, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.spi.FileSystemProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.OptionalLong;
import java.util.Set;
Expand Down Expand Up @@ -104,41 +102,6 @@ public void testIsTempBlobName() {
assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true));
}

public void testDeleteIgnoringIfNotExistsDoesNotThrowFileNotFound() throws IOException {
final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT);
final byte[] blobData = randomByteArrayOfLength(512);

final Path path = PathUtils.get(createTempDir().toString());
Files.write(path.resolve(blobName), blobData);

final FsBlobContainer container = new FsBlobContainer(
new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false),
BlobPath.EMPTY,
path
);

ArrayList<PlainActionFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
futures.add(future);
new Thread(() -> {
try {
container.deleteBlobsIgnoringIfNotExists(List.of(blobName).listIterator());
future.onResponse(null);
} catch (IOException e) {
future.onFailure(e);
}
}).start();
}

// Check that none throw
for (PlainActionFuture<Void> future : futures) {
future.actionGet();
}

assertFalse(container.blobExists(blobName));
}

private static long getLongAsync(Consumer<ActionListener<OptionalLong>> consumer) {
return getAsync(consumer).orElseThrow(AssertionError::new);
}
Expand Down

0 comments on commit e39f9d3

Please sign in to comment.