From 258602c3d6399b1c0d3b30ea719ce04c0f184a3b Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 24 Nov 2025 20:01:09 +0000 Subject: [PATCH 1/4] Add full conditional write support to S3 test fixture In #133030 we added limited support for conditional writes in `S3HttpHandler`, allowing callers to prevent overwriting an existing blob with an `If-None-Match: *` precondition header. This commit extends the implementation to include support for the `If-Match: ` precondition header allowing callers to perform atomic compare-and-set operations which overwrite existing objects. --- .../main/java/fixture/s3/S3HttpHandler.java | 50 +++++++++++++++++-- .../java/fixture/s3/S3HttpHandlerTests.java | 41 +++++++++++---- 2 files changed, 78 insertions(+), 13 deletions(-) diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 619582ba7ab9d..b22ee563e5116 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -44,6 +44,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -248,6 +249,9 @@ public void handle(final HttpExchange exchange) throws IOException { if (isProtectOverwrite(exchange)) { throw new AssertionError("If-None-Match: * header is not supported here"); } + if (getRequiredExistingETag(exchange) != null) { + throw new AssertionError("If-Match: * header is not supported here"); + } var sourceBlob = blobs.get(copySource); if (sourceBlob == null) { @@ -412,10 +416,24 @@ public void handle(final HttpExchange exchange) throws IOException { private boolean updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) { if (isProtectOverwrite(exchange)) { return blobs.putIfAbsent(path, newContents) == null; - } else { - blobs.put(path, newContents); - return true; } + + final var requireExistingETag = getRequiredExistingETag(exchange); + if (requireExistingETag != null) { + final var success = new AtomicBoolean(true); + blobs.compute(path, (ignoredPath, existingContents) -> { + if (existingContents != null && requireExistingETag.equals(getEtagFromContents(existingContents))) { + return newContents; + } + + success.set(false); + return existingContents; + }); + return success.get(); + } + + blobs.put(path, newContents); + return true; } /** @@ -594,6 +612,9 @@ private static boolean isProtectOverwrite(final HttpExchange exchange) { return false; } + if (exchange.getRequestHeaders().get("If-Match") != null) { + throw new AssertionError("Handling both If-None-Match and If-Match headers is not supported"); + } if (ifNoneMatch.size() != 1) { throw new AssertionError("multiple If-None-Match headers found: " + ifNoneMatch); } @@ -605,6 +626,29 @@ private static boolean isProtectOverwrite(final HttpExchange exchange) { throw new AssertionError("invalid If-None-Match header: " + ifNoneMatch); } + @Nullable // if no If-Match header found + private static String getRequiredExistingETag(final HttpExchange exchange) { + final var ifMatch = exchange.getRequestHeaders().get("If-Match"); + + if (ifMatch == null) { + return null; + } + + if (exchange.getRequestHeaders().get("If-None-Match") != null) { + throw new AssertionError("Handling both If-None-Match and If-Match headers is not supported"); + } + + final 
var iterator = ifMatch.iterator(); + if (iterator.hasNext()) { + final var result = iterator.next(); + if (iterator.hasNext() == false) { + return result; + } + } + + throw new AssertionError("multiple If-Match headers found: " + ifMatch); + } + MultipartUpload putUpload(String path) { final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), path); synchronized (uploads) { diff --git a/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java b/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java index ec7a3e15da707..a74411a2087b8 100644 --- a/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java +++ b/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java @@ -413,13 +413,29 @@ public void testExtractPartEtags() { } public void testPreventObjectOverwrite() throws InterruptedException { + ensureExactlyOneSuccess(new S3HttpHandler("bucket", "path"), null); + } + + public void testConditionalOverwrite() throws InterruptedException { final var handler = new S3HttpHandler("bucket", "path"); + final var originalBody = new BytesArray(randomAlphaOfLength(50).getBytes(StandardCharsets.UTF_8)); + final var originalETag = S3HttpHandler.getEtagFromContents(originalBody); + assertEquals(RestStatus.OK, handleRequest(handler, "PUT", "/bucket/path/blob", originalBody).status()); + assertEquals( + new TestHttpResponse(RestStatus.OK, originalBody, addETag(originalETag, TestHttpExchange.EMPTY_HEADERS)), + handleRequest(handler, "GET", "/bucket/path/blob") + ); + + ensureExactlyOneSuccess(handler, originalETag); + } + + private static void ensureExactlyOneSuccess(S3HttpHandler handler, String originalETag) throws InterruptedException { var tasks = List.of( - createPutObjectTask(handler), - createPutObjectTask(handler), - createMultipartUploadTask(handler), - createMultipartUploadTask(handler) + createPutObjectTask(handler, originalETag), + createPutObjectTask(handler, originalETag), + createMultipartUploadTask(handler, originalETag), + createMultipartUploadTask(handler, originalETag) ); try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { @@ -450,13 +466,14 @@ public void testPreventObjectOverwrite() throws InterruptedException { ); } - private static TestWriteTask createPutObjectTask(S3HttpHandler handler) { + private static TestWriteTask createPutObjectTask(S3HttpHandler handler, @Nullable String originalETag) { return new TestWriteTask( - (task) -> task.status = handleRequest(handler, "PUT", "/bucket/path/blob", task.body, ifNoneMatchHeader()).status() + (task) -> task.status = handleRequest(handler, "PUT", "/bucket/path/blob", task.body, conditionalWriteHeader(originalETag)) + .status() ); } - private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler) { + private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler, @Nullable String originalETag) { final var multipartUploadTask = new TestWriteTask( (task) -> task.status = handleRequest( handler, @@ -470,7 +487,7 @@ private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler) { 1 """, task.etag)), - ifNoneMatchHeader() + conditionalWriteHeader(originalETag) ).status() ); @@ -599,9 +616,13 @@ private static Headers contentRangeHeader(long start, long end, long length) { return headers; } - private static Headers ifNoneMatchHeader() { + private static Headers conditionalWriteHeader(@Nullable String originalEtag) { var headers = new Headers(); - headers.put("If-None-Match", List.of("*")); + if (originalEtag == 
null) { + headers.put("If-None-Match", List.of("*")); + } else { + headers.put("If-Match", List.of(originalEtag)); + } return headers; } From 3b4477f0862dece60d50d4543c7b283fb275e56c Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 25 Nov 2025 08:53:22 +0000 Subject: [PATCH 2/4] Rework to return 404 and 409 too --- .../main/java/fixture/s3/S3HttpHandler.java | 53 ++++++++++--------- .../java/fixture/s3/S3HttpHandlerTests.java | 43 ++++++++++----- 2 files changed, 60 insertions(+), 36 deletions(-) diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index b22ee563e5116..95b5e9a2d87cf 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -26,6 +26,7 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.fixture.HttpHeaderParser; import java.io.IOException; @@ -44,7 +45,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -191,14 +192,15 @@ public void handle(final HttpExchange exchange) throws IOException { } else if (request.isCompleteMultipartUploadRequest()) { final byte[] responseBody; - final boolean preconditionFailed; + final RestStatus responseCode; synchronized (uploads) { final var upload = getUpload(request.getQueryParamOnce("uploadId")); if (upload == null) { - preconditionFailed = false; if (Randomness.get().nextBoolean()) { + responseCode = RestStatus.NOT_FOUND; responseBody = null; } else { + responseCode = RestStatus.OK; responseBody = """ @@ -210,10 +212,8 @@ public void handle(final HttpExchange exchange) throws IOException { } } else { final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody()))); - - preconditionFailed = updateBlobContents(exchange, request.path(), blobContents) == false; - - if (preconditionFailed == false) { + responseCode = updateBlobContents(exchange, request.path(), blobContents); + if (responseCode == RestStatus.OK) { responseBody = ("\n" + "\n" + "" @@ -223,20 +223,20 @@ public void handle(final HttpExchange exchange) throws IOException { + request.path() + "\n" + "").getBytes(StandardCharsets.UTF_8); - removeUpload(upload.getUploadId()); } else { responseBody = null; } + if (responseCode != RestStatus.PRECONDITION_FAILED) { + removeUpload(upload.getUploadId()); + } } } - if (preconditionFailed) { - exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1); - } else if (responseBody == null) { - exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); - } else { + if (responseCode == RestStatus.OK) { exchange.getResponseHeaders().add("Content-Type", "application/xml"); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBody.length); exchange.getResponseBody().write(responseBody); + } else { + exchange.sendResponseHeaders(responseCode.getStatus(), -1); } } else if (request.isAbortMultipartUploadRequest()) { final var upload = removeUpload(request.getQueryParamOnce("uploadId")); @@ -268,14 +268,12 @@ public void handle(final HttpExchange exchange) throws IOException { } } else { 
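                // Plain PutObject path: updateBlobContents applies any If-None-Match/If-Match precondition and
                // reports the status to send back, so the ETag header is only returned when the write succeeds.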
final Tuple blob = parseRequestBody(exchange); - final var preconditionFailed = updateBlobContents(exchange, request.path(), blob.v2()) == false; + final var updateResponseCode = updateBlobContents(exchange, request.path(), blob.v2()); - if (preconditionFailed) { - exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1); - } else { + if (updateResponseCode == RestStatus.OK) { exchange.getResponseHeaders().add("ETag", blob.v1()); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); } + exchange.sendResponseHeaders(updateResponseCode.getStatus(), -1); } } else if (request.isListObjectsRequest()) { @@ -413,27 +411,34 @@ public void handle(final HttpExchange exchange) throws IOException { * * @return whether the blob contents were updated: if {@code false} then a requested precondition was not satisfied. */ - private boolean updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) { + private RestStatus updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) { if (isProtectOverwrite(exchange)) { - return blobs.putIfAbsent(path, newContents) == null; + return blobs.putIfAbsent(path, newContents) == null + ? RestStatus.OK + : ESTestCase.randomFrom(RestStatus.PRECONDITION_FAILED, RestStatus.CONFLICT); } final var requireExistingETag = getRequiredExistingETag(exchange); if (requireExistingETag != null) { - final var success = new AtomicBoolean(true); + final var responseCode = new AtomicReference<>(RestStatus.OK); blobs.compute(path, (ignoredPath, existingContents) -> { if (existingContents != null && requireExistingETag.equals(getEtagFromContents(existingContents))) { return newContents; } - success.set(false); + responseCode.set( + ESTestCase.randomFrom( + existingContents == null ? 
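                        // Mirror S3's conditional-write behaviour: a missing object is normally reported as 404 and a
                        // stale ETag as 412, but either case may instead surface as 409, so pick one at random here so
                        // that callers are exercised against both responses.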
RestStatus.NOT_FOUND : RestStatus.PRECONDITION_FAILED, + RestStatus.CONFLICT + ) + ); return existingContents; }); - return success.get(); + return responseCode.get(); } blobs.put(path, newContents); - return true; + return RestStatus.OK; } /** diff --git a/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java b/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java index a74411a2087b8..d1df676698d1e 100644 --- a/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java +++ b/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java @@ -32,14 +32,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.oneOf; public class S3HttpHandlerTests extends ESTestCase { @@ -412,11 +411,11 @@ public void testExtractPartEtags() { } - public void testPreventObjectOverwrite() throws InterruptedException { + public void testPreventObjectOverwrite() { ensureExactlyOneSuccess(new S3HttpHandler("bucket", "path"), null); } - public void testConditionalOverwrite() throws InterruptedException { + public void testConditionalOverwrite() { final var handler = new S3HttpHandler("bucket", "path"); final var originalBody = new BytesArray(randomAlphaOfLength(50).getBytes(StandardCharsets.UTF_8)); @@ -430,20 +429,15 @@ public void testConditionalOverwrite() throws InterruptedException { ensureExactlyOneSuccess(handler, originalETag); } - private static void ensureExactlyOneSuccess(S3HttpHandler handler, String originalETag) throws InterruptedException { - var tasks = List.of( + private static void ensureExactlyOneSuccess(S3HttpHandler handler, String originalETag) { + final var tasks = List.of( createPutObjectTask(handler, originalETag), createPutObjectTask(handler, originalETag), createMultipartUploadTask(handler, originalETag), createMultipartUploadTask(handler, originalETag) ); - try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { - tasks.forEach(task -> executor.submit(task.consumer)); - executor.shutdown(); - var done = executor.awaitTermination(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS); - assertTrue(done); - } + runInParallel(tasks.size(), i -> tasks.get(i).consumer.run()); List successfulTasks = tasks.stream().filter(task -> task.status == RestStatus.OK).toList(); assertThat(successfulTasks, hasSize(1)); @@ -452,6 +446,7 @@ private static void ensureExactlyOneSuccess(S3HttpHandler handler, String origin if (task.status == RestStatus.PRECONDITION_FAILED) { assertNotNull(handler.getUpload(task.uploadId)); } else { + assertThat(task.status, oneOf(RestStatus.OK, RestStatus.CONFLICT)); assertNull(handler.getUpload(task.uploadId)); } }); @@ -466,6 +461,30 @@ private static void ensureExactlyOneSuccess(S3HttpHandler handler, String origin ); } + public void testPutObjectIfMatchWithBlobNotFound() { + final var handler = new S3HttpHandler("bucket", "path"); + while (true) { + final var task = createPutObjectTask(handler, randomIdentifier()); + task.consumer.run(); + if (task.status == RestStatus.NOT_FOUND) { + break; + } + assertEquals(RestStatus.CONFLICT, task.status); // chosen randomly so eventually we will escape the loop + } + } + + public void 
void
testCompleteMultipartUploadIfMatchWithBlobNotFound() { + final var handler = new S3HttpHandler("bucket", "path"); + while (true) { + final var task = createMultipartUploadTask(handler, randomIdentifier()); + task.consumer.run(); + if (task.status == RestStatus.NOT_FOUND) { + break; + } + assertEquals(RestStatus.CONFLICT, task.status); // chosen randomly so eventually we will escape the loop + } + } + private static TestWriteTask createPutObjectTask(S3HttpHandler handler, @Nullable String originalETag) { return new TestWriteTask( (task) -> task.status = handleRequest(handler, "PUT", "/bucket/path/blob", task.body, conditionalWriteHeader(originalETag)) From 5a9eace16c3f10d511a7ce509fb8be7f85176f0e Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 25 Nov 2025 09:03:28 +0000 Subject: [PATCH 3/4] Fix comment --- .../s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 95b5e9a2d87cf..eeb35ed637c5d 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -409,7 +409,12 @@ public void handle(final HttpExchange exchange) throws IOException { /** * Update the blob contents if and only if the preconditions in the request are satisfied. * - * @return whether the blob contents were updated: if {@code false} then a requested precondition was not satisfied. + * @return {@link RestStatus#OK} if the blob contents were updated, or else a different status code to indicate the error: possibly + * {@link RestStatus#CONFLICT} in any case, but if not then either {@link RestStatus#PRECONDITION_FAILED} if the object exists + * but doesn't match the specified precondition, or {@link RestStatus#NOT_FOUND} if the object doesn't exist but is required to + * do so by the precondition. 
+ * + * @see AWS docs */ private RestStatus updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) { if (isProtectOverwrite(exchange)) { From 86363d1d83d886ca47f84e50059d7d9c5ab0a136 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 25 Nov 2025 10:19:11 +0000 Subject: [PATCH 4/4] Accept 404 for abort-on-cleanup response --- .../repositories/s3/S3BlobContainer.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 8b00d90e2e051..e613b38fcdde2 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -608,7 +608,24 @@ private void executeMultipart( final String uploadId; try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) { uploadId = clientReference.client().createMultipartUpload(createMultipartUpload(purpose, operation, blobName)).uploadId(); - cleanupOnFailureActions.add(() -> abortMultiPartUpload(purpose, uploadId, blobName)); + cleanupOnFailureActions.add(() -> { + try { + abortMultiPartUpload(purpose, uploadId, blobName); + } catch (Exception e) { + if (e instanceof SdkServiceException sdkServiceException + && sdkServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()) { + // NOT_FOUND is what we wanted + logger.atDebug() + .withThrowable(e) + .log("multipart upload of [{}] with ID [{}] not found on abort", blobName, uploadId); + } else { + // aborting the upload on failure is a best-effort cleanup step - if it fails then we must just move on + logger.atWarn() + .withThrowable(e) + .log("failed to clean up multipart upload of [{}] with ID [{}] after earlier failure", blobName, uploadId); + } + } + }); } if (Strings.isEmpty(uploadId)) { throw new IOException("Failed to initialize multipart operation for " + blobName);