Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -810,12 +810,12 @@ private class MultipartUploadCompareAndExchangeOperation {
this.threadPool = threadPool;
}

void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
innerRun(expected, updated, listener.delegateResponse((delegate, e) -> {
void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) {
ActionListener.run(listener.delegateResponse((delegate, e) -> {
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
if (e instanceof AwsServiceException awsServiceException
&& (awsServiceException.statusCode() == 404
|| awsServiceException.statusCode() == 200
if ((e instanceof AwsServiceException awsServiceException)
&& (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()
|| awsServiceException.statusCode() == RestStatus.OK.getStatus()
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
Expand All @@ -824,7 +824,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
} else {
delegate.onFailure(e);
}
}));
}), l -> innerRun(expected, updated, l));
}

void innerRun(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
Expand Down Expand Up @@ -1120,16 +1120,13 @@ public void compareAndExchangeRegister(
ActionListener<OptionalBytesReference> listener
) {
final var clientReference = blobStore.clientReference();
ActionListener.run(
ActionListener.releaseBefore(clientReference, listener),
l -> new MultipartUploadCompareAndExchangeOperation(
purpose,
clientReference.client(),
blobStore.bucket(),
key,
blobStore.getThreadPool()
).run(expected, updated, l)
);
new MultipartUploadCompareAndExchangeOperation(
purpose,
clientReference.client(),
blobStore.bucket(),
key,
blobStore.getThreadPool()
).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,42 @@ public void testRetryOn403InStateless() {
assertEquals(denyAccessAfterAttempt <= maxRetries ? 1 : 0, accessDeniedResponseCount.get());
}

public void testUploadNotFoundInCompareAndExchange() {
final var blobContainerPath = BlobPath.EMPTY.add(getTestName());
final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath);

@SuppressForbidden(reason = "use a http server")
class RejectsUploadPartRequests extends S3HttpHandler {
RejectsUploadPartRequests() {
super("bucket");
}

@Override
public void handle(HttpExchange exchange) throws IOException {
if (parseRequest(exchange).isUploadPartRequest()) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
super.handle(exchange);
}
}
}

httpServer.createContext("/", new RejectsUploadPartRequests());

safeAwait(
l -> statefulBlobContainer.compareAndExchangeRegister(
randomPurpose(),
"not_found_register",
BytesArray.EMPTY,
new BytesArray(new byte[1]),
l.map(result -> {
assertFalse(result.isPresent());
return null;
})
)
);
}

private static String getBase16MD5Digest(BytesReference bytesReference) {
return MessageDigests.toHexString(MessageDigests.digest(bytesReference, MessageDigests.md5()));
}
Expand Down