From 5e09f65aa7e50294e642aee346dca2da7ab08118 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 25 Nov 2025 11:45:28 +0000 Subject: [PATCH 1/2] Ignore abort-on-cleanup failure in S3 repo If a `CompleteMultipartUpload` request fails then we attempt to clean up by calling the `AbortMultipartUpload` API. This cleanup may also fail, and a failure here will supersede the original failure. The cleanup is really a best-effort thing so we should ignore failures. In particular if the `CompleteMultipartUpload` has an `If-None-Match` precondition then this may fail with a `409 Conflict` and in that case the upload no longer exists, so we would expect the `AbortMultipartUpload` call to fail with a 404. This commit extends `S3HttpHandler` to respond with both 412 and 409s on precondition failures, cleaning up the upload on a 409 but not a 412, triggering the unwanted failure path in some tests, and adds exception handling to deal with it. --- .../repositories/s3/S3BlobContainer.java | 19 ++++++- .../main/java/fixture/s3/S3HttpHandler.java | 50 ++++++++++--------- .../java/fixture/s3/S3HttpHandlerTests.java | 13 ++--- 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 8b00d90e2e051..e613b38fcdde2 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -608,7 +608,24 @@ private void executeMultipart( final String uploadId; try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) { uploadId = clientReference.client().createMultipartUpload(createMultipartUpload(purpose, operation, blobName)).uploadId(); - cleanupOnFailureActions.add(() -> abortMultiPartUpload(purpose, uploadId, blobName)); + cleanupOnFailureActions.add(() -> { + try { + abortMultiPartUpload(purpose, uploadId, blobName); + } catch (Exception e) { + if (e instanceof SdkServiceException sdkServiceException + && sdkServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()) { + // NOT_FOUND is what we wanted + logger.atDebug() + .withThrowable(e) + .log("multipart upload of [{}] with ID [{}] not found on abort", blobName, uploadId); + } else { + // aborting the upload on failure is a best-effort cleanup step - if it fails then we must just move on + logger.atWarn() + .withThrowable(e) + .log("failed to clean up multipart upload of [{}] with ID [{}] after earlier failure", blobName, uploadId); + } + } + }); } if (Strings.isEmpty(uploadId)) { throw new IOException("Failed to initialize multipart operation for " + blobName); diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 619582ba7ab9d..d282c448820a5 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -26,6 +26,7 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.fixture.HttpHeaderParser; import java.io.IOException; @@ -190,14 +191,15 @@ public void handle(final HttpExchange exchange) throws IOException { } else if (request.isCompleteMultipartUploadRequest()) { final byte[] responseBody; - final boolean preconditionFailed; + final RestStatus responseCode; synchronized (uploads) { final var upload = getUpload(request.getQueryParamOnce("uploadId")); if (upload == null) { - preconditionFailed = false; if (Randomness.get().nextBoolean()) { + responseCode = RestStatus.NOT_FOUND; responseBody = null; } else { + responseCode = RestStatus.OK; responseBody = """ @@ -209,10 +211,8 @@ public void handle(final HttpExchange exchange) throws IOException { } } else { final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody()))); - - preconditionFailed = updateBlobContents(exchange, request.path(), blobContents) == false; - - if (preconditionFailed == false) { + responseCode = updateBlobContents(exchange, request.path(), blobContents); + if (responseCode == RestStatus.OK) { responseBody = ("\n" + "\n" + "" @@ -222,20 +222,20 @@ public void handle(final HttpExchange exchange) throws IOException { + request.path() + "\n" + "").getBytes(StandardCharsets.UTF_8); - removeUpload(upload.getUploadId()); } else { responseBody = null; } + if (responseCode != RestStatus.PRECONDITION_FAILED) { + removeUpload(upload.getUploadId()); + } } } - if (preconditionFailed) { - exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1); - } else if (responseBody == null) { - exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); - } else { + if (responseCode == RestStatus.OK) { exchange.getResponseHeaders().add("Content-Type", "application/xml"); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBody.length); exchange.getResponseBody().write(responseBody); + } else { + exchange.sendResponseHeaders(responseCode.getStatus(), -1); } } else if (request.isAbortMultipartUploadRequest()) { final var upload = removeUpload(request.getQueryParamOnce("uploadId")); @@ -264,14 +264,12 @@ public void handle(final HttpExchange exchange) throws IOException { } } else { final Tuple blob = parseRequestBody(exchange); - final var preconditionFailed = updateBlobContents(exchange, request.path(), blob.v2()) == false; + final var updateResponseCode = updateBlobContents(exchange, request.path(), blob.v2()); - if (preconditionFailed) { - exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1); - } else { + if (updateResponseCode == RestStatus.OK) { exchange.getResponseHeaders().add("ETag", blob.v1()); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); } + exchange.sendResponseHeaders(updateResponseCode.getStatus(), -1); } } else if (request.isListObjectsRequest()) { @@ -407,15 +405,21 @@ public void handle(final HttpExchange exchange) throws IOException { /** * Update the blob contents if and only if the preconditions in the request are satisfied. * - * @return whether the blob contents were updated: if {@code false} then a requested precondition was not satisfied. + * @return {@link RestStatus#OK} if the blob contents were updated, or else a different status code to indicate the error: possibly + * {@link RestStatus#CONFLICT} or {@link RestStatus#PRECONDITION_FAILED} if the object exists and the precondition requires it + * not to. + * + * @see AWS docs */ - private boolean updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) { + private RestStatus updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) { if (isProtectOverwrite(exchange)) { - return blobs.putIfAbsent(path, newContents) == null; - } else { - blobs.put(path, newContents); - return true; + return blobs.putIfAbsent(path, newContents) == null + ? RestStatus.OK + : ESTestCase.randomFrom(RestStatus.PRECONDITION_FAILED, RestStatus.CONFLICT); } + + blobs.put(path, newContents); + return RestStatus.OK; } /** diff --git a/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java b/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java index ec7a3e15da707..de4b1d95c0ae2 100644 --- a/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java +++ b/test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java @@ -32,14 +32,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.oneOf; public class S3HttpHandlerTests extends ESTestCase { @@ -412,7 +411,7 @@ public void testExtractPartEtags() { } - public void testPreventObjectOverwrite() throws InterruptedException { + public void testPreventObjectOverwrite() { final var handler = new S3HttpHandler("bucket", "path"); var tasks = List.of( @@ -422,12 +421,7 @@ public void testPreventObjectOverwrite() throws InterruptedException { createMultipartUploadTask(handler) ); - try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { - tasks.forEach(task -> executor.submit(task.consumer)); - executor.shutdown(); - var done = executor.awaitTermination(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS); - assertTrue(done); - } + runInParallel(tasks.size(), i -> tasks.get(i).consumer.run()); List successfulTasks = tasks.stream().filter(task -> task.status == RestStatus.OK).toList(); assertThat(successfulTasks, hasSize(1)); @@ -436,6 +430,7 @@ public void testPreventObjectOverwrite() throws InterruptedException { if (task.status == RestStatus.PRECONDITION_FAILED) { assertNotNull(handler.getUpload(task.uploadId)); } else { + assertThat(task.status, oneOf(RestStatus.OK, RestStatus.CONFLICT)); assertNull(handler.getUpload(task.uploadId)); } }); From 3f2551de2cd872096cdd84b0d8261e39f7cd4a02 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 25 Nov 2025 11:57:06 +0000 Subject: [PATCH 2/2] Update docs/changelog/138569.yaml --- docs/changelog/138569.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/138569.yaml diff --git a/docs/changelog/138569.yaml b/docs/changelog/138569.yaml new file mode 100644 index 0000000000000..521b2b4b14a4c --- /dev/null +++ b/docs/changelog/138569.yaml @@ -0,0 +1,5 @@ +pr: 138569 +summary: Ignore abort-on-cleanup failure in S3 repo +area: Snapshot/Restore +type: bug +issues: []