Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,10 @@ static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long par
}
}

private class CompareAndExchangeOperation {
/**
* An implementation of {@link BlobContainer#compareAndExchangeRegister} based on strongly-consistent multipart upload APIs.
*/
private class MultipartUploadCompareAndExchangeOperation {

private final OperationPurpose purpose;
private final S3Client client;
Expand All @@ -792,7 +795,13 @@ private class CompareAndExchangeOperation {
private final String blobKey;
private final ThreadPool threadPool;

CompareAndExchangeOperation(OperationPurpose purpose, S3Client client, String bucket, String key, ThreadPool threadPool) {
MultipartUploadCompareAndExchangeOperation(
OperationPurpose purpose,
S3Client client,
String bucket,
String key,
ThreadPool threadPool
) {
this.purpose = purpose;
this.client = client;
this.bucket = bucket;
Expand All @@ -802,6 +811,23 @@ private class CompareAndExchangeOperation {
}

void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
innerRun(expected, updated, listener.delegateResponse((delegate, e) -> {
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
if (e instanceof AwsServiceException awsServiceException
&& (awsServiceException.statusCode() == 404
|| awsServiceException.statusCode() == 200
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
delegate.onResponse(OptionalBytesReference.MISSING);
} else {
delegate.onFailure(e);
}
}));
}

void innerRun(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
BlobContainerUtils.ensureValidRegisterContent(updated);

if (hasPreexistingUploads()) {
Expand Down Expand Up @@ -1094,25 +1120,15 @@ public void compareAndExchangeRegister(
ActionListener<OptionalBytesReference> listener
) {
final var clientReference = blobStore.clientReference();
ActionListener.run(ActionListener.releaseAfter(listener.delegateResponse((delegate, e) -> {
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", key), e);
if (e instanceof AwsServiceException awsServiceException
&& (awsServiceException.statusCode() == 404
|| awsServiceException.statusCode() == 200
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
delegate.onResponse(OptionalBytesReference.MISSING);
} else {
delegate.onFailure(e);
}
}), clientReference),
l -> new CompareAndExchangeOperation(purpose, clientReference.client(), blobStore.bucket(), key, blobStore.getThreadPool()).run(
expected,
updated,
l
)
ActionListener.run(
ActionListener.releaseBefore(clientReference, listener),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only difference in behaviour is that this used to be releaseAfter, but I think that was only because we didn't have releaseBefore when the code was originally written.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this is lies, there's another more important change in behaviour which #138488 addresses. 🤦

l -> new MultipartUploadCompareAndExchangeOperation(
purpose,
clientReference.client(),
blobStore.bucket(),
key,
blobStore.getThreadPool()
).run(expected, updated, l)
);
}

Expand Down