From 69b288d1870bd9d2b4d67bcbc59a2965beef0a3a Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 24 Nov 2025 10:44:55 +0000 Subject: [PATCH 1/3] Fix exception handling in S3 `compareAndExchangeRegister` In #138422 we shifted the exception handling for `compareAndExchangeRegister` into its `run()` method, but this introduced a subtle bug: an `AwsServiceException` may be thrown synchronously, bypassing the handling added to the listener with the `delegateResponse` call. This commit reinstates the missing exception handling. --- .../repositories/s3/S3BlobContainer.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index d5a380b2e3b69..8b00d90e2e051 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -810,12 +810,12 @@ private class MultipartUploadCompareAndExchangeOperation { this.threadPool = threadPool; } - void run(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { - innerRun(expected, updated, listener.delegateResponse((delegate, e) -> { + void run(BytesReference expected, BytesReference updated, ActionListener listener) { + ActionListener.run(listener.delegateResponse((delegate, e) -> { logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e); - if (e instanceof AwsServiceException awsServiceException - && (awsServiceException.statusCode() == 404 - || awsServiceException.statusCode() == 200 + if ((e instanceof AwsServiceException awsServiceException) + && (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus() + || awsServiceException.statusCode() == RestStatus.OK.getStatus() && "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) { // An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it. // Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an @@ -824,7 +824,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener innerRun(expected, updated, l)); } void innerRun(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { @@ -1120,16 +1120,13 @@ public void compareAndExchangeRegister( ActionListener listener ) { final var clientReference = blobStore.clientReference(); - ActionListener.run( - ActionListener.releaseBefore(clientReference, listener), - l -> new MultipartUploadCompareAndExchangeOperation( - purpose, - clientReference.client(), - blobStore.bucket(), - key, - blobStore.getThreadPool() - ).run(expected, updated, l) - ); + new MultipartUploadCompareAndExchangeOperation( + purpose, + clientReference.client(), + blobStore.bucket(), + key, + blobStore.getThreadPool() + ).run(expected, updated, ActionListener.releaseBefore(clientReference, listener)); } @Override From 0c405c538fc041bbf5a0ea20a3e93682130854f8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 24 Nov 2025 11:42:47 +0000 Subject: [PATCH 2/3] Add test --- .../s3/S3BlobContainerRetriesTests.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index e217cffc6d3aa..bc08d42628839 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -1346,6 +1346,35 @@ public void testRetryOn403InStateless() { assertEquals(denyAccessAfterAttempt <= maxRetries ? 1 : 0, accessDeniedResponseCount.get()); } + public void testUploadNotFoundInCompareAndExchange() { + final var blobContainerPath = BlobPath.EMPTY.add(getTestName()); + final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath); + + httpServer.createContext("/", new S3HttpHandler("bucket") { + @Override + public void handle(HttpExchange exchange) throws IOException { + if (parseRequest(exchange).isUploadPartRequest()) { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } else { + super.handle(exchange); + } + } + }); + + safeAwait( + l -> statefulBlobContainer.compareAndExchangeRegister( + randomPurpose(), + "not_found_register", + BytesArray.EMPTY, + new BytesArray(new byte[1]), + l.map(result -> { + assertFalse(result.isPresent()); + return null; + }) + ) + ); + } + private static String getBase16MD5Digest(BytesReference bytesReference) { return MessageDigests.toHexString(MessageDigests.digest(bytesReference, MessageDigests.md5())); } From 5cfcf3fe31e54d3d91744f19df1ed7b2a412ee12 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 24 Nov 2025 13:05:11 +0000 Subject: [PATCH 3/3] SuppressForbidden --- .../repositories/s3/S3BlobContainerRetriesTests.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index bc08d42628839..65b4f58f6cb55 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -1350,7 +1350,12 @@ public void testUploadNotFoundInCompareAndExchange() { final var blobContainerPath = BlobPath.EMPTY.add(getTestName()); final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath); - httpServer.createContext("/", new S3HttpHandler("bucket") { + @SuppressForbidden(reason = "use a http server") + class RejectsUploadPartRequests extends S3HttpHandler { + RejectsUploadPartRequests() { + super("bucket"); + } + @Override public void handle(HttpExchange exchange) throws IOException { if (parseRequest(exchange).isUploadPartRequest()) { @@ -1359,7 +1364,9 @@ public void handle(HttpExchange exchange) throws IOException { super.handle(exchange); } } - }); + } + + httpServer.createContext("/", new RejectsUploadPartRequests()); safeAwait( l -> statefulBlobContainer.compareAndExchangeRegister(