Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/138569.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138569
summary: Ignore abort-on-cleanup failure in S3 repo
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,24 @@ private void executeMultipart(
final String uploadId;
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
uploadId = clientReference.client().createMultipartUpload(createMultipartUpload(purpose, operation, blobName)).uploadId();
cleanupOnFailureActions.add(() -> abortMultiPartUpload(purpose, uploadId, blobName));
cleanupOnFailureActions.add(() -> {
try {
abortMultiPartUpload(purpose, uploadId, blobName);
} catch (Exception e) {
if (e instanceof SdkServiceException sdkServiceException
&& sdkServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()) {
// NOT_FOUND is what we wanted
logger.atDebug()
.withThrowable(e)
.log("multipart upload of [{}] with ID [{}] not found on abort", blobName, uploadId);
} else {
// aborting the upload on failure is a best-effort cleanup step - if it fails then we must just move on
logger.atWarn()
.withThrowable(e)
.log("failed to clean up multipart upload of [{}] with ID [{}] after earlier failure", blobName, uploadId);
}
}
});
}
if (Strings.isEmpty(uploadId)) {
throw new IOException("Failed to initialize multipart operation for " + blobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.fixture.HttpHeaderParser;

import java.io.IOException;
Expand Down Expand Up @@ -190,14 +191,15 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (request.isCompleteMultipartUploadRequest()) {
final byte[] responseBody;
final boolean preconditionFailed;
final RestStatus responseCode;
synchronized (uploads) {
final var upload = getUpload(request.getQueryParamOnce("uploadId"));
if (upload == null) {
preconditionFailed = false;
if (Randomness.get().nextBoolean()) {
responseCode = RestStatus.NOT_FOUND;
responseBody = null;
} else {
responseCode = RestStatus.OK;
responseBody = """
<?xml version="1.0" encoding="UTF-8"?>
<Error>
Expand All @@ -209,10 +211,8 @@ public void handle(final HttpExchange exchange) throws IOException {
}
} else {
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));

preconditionFailed = updateBlobContents(exchange, request.path(), blobContents) == false;

if (preconditionFailed == false) {
responseCode = updateBlobContents(exchange, request.path(), blobContents);
if (responseCode == RestStatus.OK) {
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult>\n"
+ "<Bucket>"
Expand All @@ -222,20 +222,20 @@ public void handle(final HttpExchange exchange) throws IOException {
+ request.path()
+ "</Key>\n"
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
removeUpload(upload.getUploadId());
} else {
responseBody = null;
}
if (responseCode != RestStatus.PRECONDITION_FAILED) {
removeUpload(upload.getUploadId());
}
}
}
if (preconditionFailed) {
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
} else if (responseBody == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
if (responseCode == RestStatus.OK) {
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBody.length);
exchange.getResponseBody().write(responseBody);
} else {
exchange.sendResponseHeaders(responseCode.getStatus(), -1);
}
} else if (request.isAbortMultipartUploadRequest()) {
final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
Expand Down Expand Up @@ -264,14 +264,12 @@ public void handle(final HttpExchange exchange) throws IOException {
}
} else {
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
final var preconditionFailed = updateBlobContents(exchange, request.path(), blob.v2()) == false;
final var updateResponseCode = updateBlobContents(exchange, request.path(), blob.v2());

if (preconditionFailed) {
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
} else {
if (updateResponseCode == RestStatus.OK) {
exchange.getResponseHeaders().add("ETag", blob.v1());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
}
exchange.sendResponseHeaders(updateResponseCode.getStatus(), -1);
}

} else if (request.isListObjectsRequest()) {
Expand Down Expand Up @@ -407,15 +405,21 @@ public void handle(final HttpExchange exchange) throws IOException {
/**
* Update the blob contents if and only if the preconditions in the request are satisfied.
*
* @return whether the blob contents were updated: if {@code false} then a requested precondition was not satisfied.
* @return {@link RestStatus#OK} if the blob contents were updated, or else a different status code to indicate the error: possibly
* {@link RestStatus#CONFLICT} or {@link RestStatus#PRECONDITION_FAILED} if the object exists and the precondition requires it
* not to.
*
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html#conditional-error-response">AWS docs</a>
*/
private boolean updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) {
private RestStatus updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) {
if (isProtectOverwrite(exchange)) {
return blobs.putIfAbsent(path, newContents) == null;
} else {
blobs.put(path, newContents);
return true;
return blobs.putIfAbsent(path, newContents) == null
? RestStatus.OK
: ESTestCase.randomFrom(RestStatus.PRECONDITION_FAILED, RestStatus.CONFLICT);
}

blobs.put(path, newContents);
return RestStatus.OK;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.oneOf;

public class S3HttpHandlerTests extends ESTestCase {

Expand Down Expand Up @@ -412,7 +411,7 @@ public void testExtractPartEtags() {

}

public void testPreventObjectOverwrite() throws InterruptedException {
public void testPreventObjectOverwrite() {
final var handler = new S3HttpHandler("bucket", "path");

var tasks = List.of(
Expand All @@ -422,12 +421,7 @@ public void testPreventObjectOverwrite() throws InterruptedException {
createMultipartUploadTask(handler)
);

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
tasks.forEach(task -> executor.submit(task.consumer));
executor.shutdown();
var done = executor.awaitTermination(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertTrue(done);
}
runInParallel(tasks.size(), i -> tasks.get(i).consumer.run());

List<TestWriteTask> successfulTasks = tasks.stream().filter(task -> task.status == RestStatus.OK).toList();
assertThat(successfulTasks, hasSize(1));
Expand All @@ -436,6 +430,7 @@ public void testPreventObjectOverwrite() throws InterruptedException {
if (task.status == RestStatus.PRECONDITION_FAILED) {
assertNotNull(handler.getUpload(task.uploadId));
} else {
assertThat(task.status, oneOf(RestStatus.OK, RestStatus.CONFLICT));
assertNull(handler.getUpload(task.uploadId));
}
});
Expand Down