diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index d5a380b2e3b69..8b00d90e2e051 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -810,12 +810,12 @@ private class MultipartUploadCompareAndExchangeOperation { this.threadPool = threadPool; } - void run(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { - innerRun(expected, updated, listener.delegateResponse((delegate, e) -> { + void run(BytesReference expected, BytesReference updated, ActionListener listener) { + ActionListener.run(listener.delegateResponse((delegate, e) -> { logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e); - if (e instanceof AwsServiceException awsServiceException - && (awsServiceException.statusCode() == 404 - || awsServiceException.statusCode() == 200 + if ((e instanceof AwsServiceException awsServiceException) + && (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus() + || awsServiceException.statusCode() == RestStatus.OK.getStatus() && "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) { // An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it. // Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an @@ -824,7 +824,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener innerRun(expected, updated, l)); } void innerRun(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { @@ -1120,16 +1120,13 @@ public void compareAndExchangeRegister( ActionListener listener ) { final var clientReference = blobStore.clientReference(); - ActionListener.run( - ActionListener.releaseBefore(clientReference, listener), - l -> new MultipartUploadCompareAndExchangeOperation( - purpose, - clientReference.client(), - blobStore.bucket(), - key, - blobStore.getThreadPool() - ).run(expected, updated, l) - ); + new MultipartUploadCompareAndExchangeOperation( + purpose, + clientReference.client(), + blobStore.bucket(), + key, + blobStore.getThreadPool() + ).run(expected, updated, ActionListener.releaseBefore(clientReference, listener)); } @Override diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index e217cffc6d3aa..65b4f58f6cb55 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -1346,6 +1346,42 @@ public void testRetryOn403InStateless() { assertEquals(denyAccessAfterAttempt <= maxRetries ? 1 : 0, accessDeniedResponseCount.get()); } + public void testUploadNotFoundInCompareAndExchange() { + final var blobContainerPath = BlobPath.EMPTY.add(getTestName()); + final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath); + + @SuppressForbidden(reason = "use a http server") + class RejectsUploadPartRequests extends S3HttpHandler { + RejectsUploadPartRequests() { + super("bucket"); + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + if (parseRequest(exchange).isUploadPartRequest()) { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } else { + super.handle(exchange); + } + } + } + + httpServer.createContext("/", new RejectsUploadPartRequests()); + + safeAwait( + l -> statefulBlobContainer.compareAndExchangeRegister( + randomPurpose(), + "not_found_register", + BytesArray.EMPTY, + new BytesArray(new byte[1]), + l.map(result -> { + assertFalse(result.isPresent()); + return null; + }) + ) + ); + } + private static String getBase16MD5Digest(BytesReference bytesReference) { return MessageDigests.toHexString(MessageDigests.digest(bytesReference, MessageDigests.md5())); }