Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f3c380d
Implement failIfAlreadyExists in S3 repositories
kvanerum Aug 16, 2025
35d427c
Test failIfAlreadyExists for concurrent writes
kvanerum Aug 18, 2025
3d54119
Disable testFailIfAlreadyExists for HdfsRepositoryTests
kvanerum Aug 21, 2025
8e8f3e7
Fix race in S3HttpHandler overwrite protection
kvanerum Aug 21, 2025
3c2c833
Replace `failIfAlreadyExists` variable with inline `randomBoolean` in…
kvanerum Aug 21, 2025
c1974ad
Don't remove uploads when precondition failed on S3 completeMultipart…
kvanerum Aug 23, 2025
4c479f4
Test failIfAlreadyExists in S3BlobStoreRepositoryTests with multipart…
kvanerum Aug 23, 2025
a903309
Merge branch 'main' into s3-failIfAlreadyExists
kvanerum Aug 23, 2025
ad360ca
Cleanup
kvanerum Aug 23, 2025
cef778c
Merge branch 'main' into 0reviews/2025/09/01/kvanerum/s3-failIfAlread…
DaveCTurner Sep 1, 2025
6979fbe
Add changelog
DaveCTurner Sep 1, 2025
9c860cd
Inline failIfAlreadyExists in executeMultipartCopy
kvanerum Sep 3, 2025
b5c0088
Throw error on multiple If-None-Match headers in S3HttpHandler
kvanerum Sep 3, 2025
75a91cf
Refactor S3HttpHandlerTests to simplify task creation with helper met…
kvanerum Sep 3, 2025
c975a60
Use constant for safe await timeout in S3HttpHandlerTests
kvanerum Sep 3, 2025
249e8bb
Improve readability in AbstractThirdPartyRepositoryTestCase.testFailI…
kvanerum Sep 3, 2025
884dcc8
Simplify copy object logic in S3HttpHandler by removing unused precon…
kvanerum Sep 4, 2025
a839a47
Merge branch 'main' into s3-failIfAlreadyExists
kvanerum Sep 4, 2025
419f4ab
Merge branch 'main' into s3-failIfAlreadyExists
DaveCTurner Sep 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/133030.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Changelog entry for PR 133030: S3 repositories now honour the failIfAlreadyExists
# flag via S3 conditional writes (If-None-Match: *). Closes issue 128565.
pr: 133030
summary: Implement `failIfAlreadyExists` in S3 repositories
area: Snapshot/Restore
type: enhancement
issues:
 - 128565
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand All @@ -105,6 +108,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
Expand Down Expand Up @@ -425,7 +429,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
if (randomBoolean()) {
repository.blobStore()
.blobContainer(repository.basePath())
.writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true);
.writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, false);
} else {
repository.blobStore()
.blobContainer(repository.basePath())
Expand All @@ -434,7 +438,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
getRepositoryDataBlobName(modifiedRepositoryData.getGenId()),
serialized.streamInput(),
serialized.length(),
true
false
);
}

Expand Down Expand Up @@ -568,6 +572,52 @@ public void match(LogEvent event) {
}
}

public void testFailIfAlreadyExists() throws IOException, InterruptedException {
    try (BlobStore store = newBlobStore()) {
        final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
        final String blobName = randomAlphaOfLengthBetween(8, 12);

        // Pick a payload size on either side of the large-blob threshold so that, across runs,
        // both the single-upload and the multipart-upload code paths are exercised.
        final byte[] data;
        if (randomBoolean()) {
            // single upload
            data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
        } else {
            // multipart upload
            int thresholdInBytes = Math.toIntExact(((S3BlobContainer) container).getLargeBlobThresholdInBytes());
            data = randomBytes(randomIntBetween(thresholdInBytes, thresholdInBytes + scaledRandomIntBetween(1024, 1 << 16)));
        }

        // Race two concurrent writes with failIfAlreadyExists=true: the conditional-write
        // precondition must let exactly one succeed and fail the other with an IOException.
        AtomicInteger exceptionCount = new AtomicInteger(0);
        try (var executor = Executors.newFixedThreadPool(2)) {
            for (int i = 0; i < 2; i++) {
                executor.submit(() -> {
                    try {
                        writeBlob(container, blobName, new BytesArray(data), true);
                    } catch (IOException e) {
                        exceptionCount.incrementAndGet();
                    }
                });
            }
            executor.shutdown();
            // A 1-second bound is flaky here: the two writes may go over the network (e.g. when a
            // subclass runs against a real repository endpoint), so allow a generous timeout.
            var done = executor.awaitTermination(30, TimeUnit.SECONDS);
            assertTrue(done);
        }

        assertEquals(1, exceptionCount.get());

        // overwriting must succeed when failIfAlreadyExists is false
        writeBlob(container, blobName, new BytesArray(data), false);

        // and must be rejected when failIfAlreadyExists is true and the blob exists
        var exception = expectThrows(IOException.class, () -> writeBlob(container, blobName, new BytesArray(data), true));

        assertThat(exception.getMessage(), startsWith("Unable to upload"));

        container.delete(randomPurpose());
    }
}

/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,15 @@ public long readBlobPreferredLength() {
return ByteSizeValue.of(32, ByteSizeUnit.MB).getBytes();
}

/**
* This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
*/
@Override
public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
} else {
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}
}

Expand Down Expand Up @@ -545,7 +542,8 @@ void executeSingleUpload(
final S3BlobStore s3BlobStore,
final String blobName,
final InputStream input,
final long blobSize
final long blobSize,
final boolean failIfAlreadyExists
) throws IOException {
try (var clientReference = s3BlobStore.clientReference()) {
// Extra safety checks
Expand All @@ -565,6 +563,9 @@ void executeSingleUpload(
if (s3BlobStore.serverSideEncryption()) {
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
if (failIfAlreadyExists) {
putRequestBuilder.ifNoneMatch("*");
}
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);

final var putRequest = putRequestBuilder.build();
Expand All @@ -586,7 +587,8 @@ private void executeMultipart(
final String blobName,
final long partSize,
final long blobSize,
final PartOperation partOperation
final PartOperation partOperation,
final boolean failIfAlreadyExists
) throws IOException {

ensureMultiPartUploadSize(blobSize);
Expand Down Expand Up @@ -639,6 +641,11 @@ private void executeMultipart(
.key(blobName)
.uploadId(uploadId)
.multipartUpload(b -> b.parts(parts));

if (failIfAlreadyExists) {
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
}

S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose);
final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build();
try (var clientReference = s3BlobStore.clientReference()) {
Expand All @@ -663,7 +670,8 @@ void executeMultipartUpload(
final S3BlobStore s3BlobStore,
final String blobName,
final InputStream input,
final long blobSize
final long blobSize,
final boolean failIfAlreadyExists
) throws IOException {
executeMultipart(
purpose,
Expand All @@ -680,7 +688,8 @@ void executeMultipartUpload(
.uploadPart(uploadRequest, RequestBody.fromInputStream(input, partSize));
return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build();
}
}
},
failIfAlreadyExists
);
}

Expand Down Expand Up @@ -727,7 +736,8 @@ void executeMultipartCopy(
final var uploadPartCopyResponse = clientReference.client().uploadPartCopy(uploadPartCopyRequest);
return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
}
})
}),
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> blobContainer.executeSingleUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
() -> blobContainer.executeSingleUpload(
randomPurpose(),
blobStore,
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
)
);
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
}
Expand All @@ -88,7 +95,8 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
blobStore,
blobName,
new ByteArrayInputStream(new byte[0]),
ByteSizeUnit.MB.toBytes(2)
ByteSizeUnit.MB.toBytes(2),
randomBoolean()
)
);
assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
Expand Down Expand Up @@ -123,6 +131,8 @@ public void testExecuteSingleUpload() throws IOException {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}

final boolean failIfAlreadyExists = randomBoolean();

final S3Client client = configureMockClient(blobStore);

final ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
Expand All @@ -131,7 +141,7 @@ public void testExecuteSingleUpload() throws IOException {
when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());

final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);

final PutObjectRequest request = requestCaptor.getValue();
assertEquals(bucketName, request.bucket());
Expand All @@ -147,6 +157,10 @@ public void testExecuteSingleUpload() throws IOException {
);
}

if (failIfAlreadyExists) {
assertEquals("*", request.ifNoneMatch());
}

final RequestBody requestBody = bodyCaptor.getValue();
try (var contentStream = requestBody.contentStreamProvider().newStream()) {
assertEquals(inputStream.available(), blobSize);
Expand All @@ -164,7 +178,14 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
() -> blobContainer.executeMultipartUpload(
randomPurpose(),
blobStore,
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
}
Expand All @@ -176,7 +197,14 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
() -> blobContainer.executeMultipartUpload(
randomPurpose(),
blobStore,
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
}
Expand Down Expand Up @@ -225,6 +253,8 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}

final boolean failIfAlreadyExists = doCopy ? false : randomBoolean();

final S3Client client = configureMockClient(blobStore);

final var uploadId = randomIdentifier();
Expand Down Expand Up @@ -273,7 +303,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
if (doCopy) {
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
} else {
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
}

final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue();
Expand Down Expand Up @@ -340,6 +370,10 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
assertEquals(blobPath.buildAsString() + blobName, compRequest.key());
assertEquals(uploadId, compRequest.uploadId());

if (failIfAlreadyExists) {
assertEquals("*", compRequest.ifNoneMatch());
}

final List<String> actualETags = compRequest.multipartUpload()
.parts()
.stream()
Expand Down Expand Up @@ -419,7 +453,14 @@ public void close() {}

final IOException e = expectThrows(IOException.class, () -> {
final S3BlobContainer blobContainer = new S3BlobContainer(BlobPath.EMPTY, blobStore);
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize);
blobContainer.executeMultipartUpload(
randomPurpose(),
blobStore,
blobName,
new ByteArrayInputStream(new byte[0]),
blobSize,
randomBoolean()
);
});

assertEquals("Unable to upload or copy object [" + blobName + "] using multipart upload", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ protected void assertCleanupResponse(CleanupRepositoryResponse response, long by
assertThat(response.result().blobs(), equalTo(0L));
}
}

@Override
public void testFailIfAlreadyExists() {
    // Intentional no-op: disables the inherited third-party repository test because the HDFS
    // repository does not implement the failIfAlreadyExists contract correctly.
    // NOTE(review): consider referencing a tracking issue here so this test can be re-enabled
    // once HDFS gains proper conditional-write support — TODO confirm one exists.
}
}
Loading