From e000138e3ee226503007564715c58882a20cac19 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 21 Nov 2025 13:44:40 +0000 Subject: [PATCH] Rename S3 `CompareAndExchangeOperation` As a preparatory step to providing an alternative implementation of the compare-and-exchange operation on S3, this commit renames the existing implementation and shifts the necessary exception-handling logic inside its `run` method for improved encapsulation. --- .../repositories/s3/S3BlobContainer.java | 58 ++++++++++++------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 4ff6c9291079f..d5a380b2e3b69 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -783,7 +783,10 @@ static Tuple numberOfMultiparts(final long totalSize, final long par } } - private class CompareAndExchangeOperation { + /** + * An implementation of {@link BlobContainer#compareAndExchangeRegister} based on strongly-consistent multipart upload APIs. + */ + private class MultipartUploadCompareAndExchangeOperation { private final OperationPurpose purpose; private final S3Client client; @@ -792,7 +795,13 @@ private class CompareAndExchangeOperation { private final String blobKey; private final ThreadPool threadPool; - CompareAndExchangeOperation(OperationPurpose purpose, S3Client client, String bucket, String key, ThreadPool threadPool) { + MultipartUploadCompareAndExchangeOperation( + OperationPurpose purpose, + S3Client client, + String bucket, + String key, + ThreadPool threadPool + ) { this.purpose = purpose; this.client = client; this.bucket = bucket; @@ -802,6 +811,23 @@ private class CompareAndExchangeOperation { } void run(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { + innerRun(expected, updated, listener.delegateResponse((delegate, e) -> { + logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e); + if (e instanceof AwsServiceException awsServiceException + && (awsServiceException.statusCode() == 404 + || awsServiceException.statusCode() == 200 + && "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) { + // An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it. + // Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an + // NoSuchUpload... in the response. Either way, this means that our write encountered contention: + delegate.onResponse(OptionalBytesReference.MISSING); + } else { + delegate.onFailure(e); + } + })); + } + + void innerRun(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { BlobContainerUtils.ensureValidRegisterContent(updated); if (hasPreexistingUploads()) { @@ -1094,25 +1120,15 @@ public void compareAndExchangeRegister( ActionListener listener ) { final var clientReference = blobStore.clientReference(); - ActionListener.run(ActionListener.releaseAfter(listener.delegateResponse((delegate, e) -> { - logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", key), e); - if (e instanceof AwsServiceException awsServiceException - && (awsServiceException.statusCode() == 404 - || awsServiceException.statusCode() == 200 - && "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) { - // An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it. - // Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an - // NoSuchUpload... in the response. Either way, this means that our write encountered contention: - delegate.onResponse(OptionalBytesReference.MISSING); - } else { - delegate.onFailure(e); - } - }), clientReference), - l -> new CompareAndExchangeOperation(purpose, clientReference.client(), blobStore.bucket(), key, blobStore.getThreadPool()).run( - expected, - updated, - l - ) + ActionListener.run( + ActionListener.releaseBefore(clientReference, listener), + l -> new MultipartUploadCompareAndExchangeOperation( + purpose, + clientReference.client(), + blobStore.bucket(), + key, + blobStore.getThreadPool() + ).run(expected, updated, l) ); }