From 8aa2e41cdd1da9b622e994da5b7d06d09c37c0fc Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 26 Nov 2025 09:34:34 +0000 Subject: [PATCH 1/5] Support weaker consistency model for S3 MPUs Adjusts the implementation of linearizable registers in S3 repositories to allow for the weaker multipart upload API semantics observed in practice. Also adjusts the S3 test fixture to (optionally) simulate the weaker semantics, and extends the repository analysis REST tests to cover both cases. --- .../repositories/s3/S3BlobContainer.java | 66 +++++++- .../s3/S3BlobContainerRetriesTests.java | 73 ++++++++- .../java/fixture/s3/S3ConsistencyModel.java | 46 ++++++ .../main/java/fixture/s3/S3HttpFixture.java | 9 + .../main/java/fixture/s3/S3HttpHandler.java | 154 +++++++++++------- .../common/bytes/BytesReferenceTestUtils.java | 73 +++++++++ .../org/elasticsearch/test/ESTestCase.java | 4 + ...tractS3RepositoryAnalysisRestTestCase.java | 124 ++++++++++++++ .../analyze/S3RepositoryAnalysisRestIT.java | 100 +----------- .../S3RepositoryAnalysisStrongMpusRestIT.java | 37 +++++ 10 files changed, 521 insertions(+), 165 deletions(-) create mode 100644 test/fixtures/s3-fixture/src/main/java/fixture/s3/S3ConsistencyModel.java create mode 100644 test/framework/src/main/java/org/elasticsearch/common/bytes/BytesReferenceTestUtils.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AbstractS3RepositoryAnalysisRestTestCase.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisStrongMpusRestIT.java diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index e613b38fcdde2..affed10b0255e 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -20,6 +20,7 @@ import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; @@ -56,6 +57,7 @@ import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.BlobContainerUtils; import org.elasticsearch.common.blobstore.support.BlobMetadata; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -832,11 +834,14 @@ void run(BytesReference expected, BytesReference updated, ActionListener Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e); if ((e instanceof AwsServiceException awsServiceException) && (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus() + || awsServiceException.statusCode() == RestStatus.CONFLICT.getStatus() + || awsServiceException.statusCode() == RestStatus.PRECONDITION_FAILED.getStatus() || awsServiceException.statusCode() == RestStatus.OK.getStatus() && "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) { // An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it. // Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an - // NoSuchUpload... in the response. Either way, this means that our write encountered contention: + // NoSuchUpload... in the response. Either way, this means that our write encountered contention. + // Also if something else changed the blob out from under us then the If-Match check results in a 409 or a 412. delegate.onResponse(OptionalBytesReference.MISSING); } else { delegate.onFailure(e); @@ -891,20 +896,20 @@ void innerRun(BytesReference expected, BytesReference updated, ActionListenerandThen(l -> getRegister(purpose, rawKey, l)) + .andThen(l -> getRegisterAndEtag(purpose, rawKey, l)) // Step 5: Perform the compare-and-swap by completing our upload iff the witnessed value matches the expected value. - .andThenApply(currentValue -> { - if (currentValue.isPresent() && currentValue.bytesReference().equals(expected)) { + .andThenApply(currentValueAndEtag -> { + if (currentValueAndEtag.registerContents().equals(expected)) { logger.trace("[{}] completing upload [{}]", blobKey, uploadId); - completeMultipartUpload(uploadId, partETag); + completeMultipartUpload(uploadId, partETag, currentValueAndEtag.eTag()); } else { // Best-effort attempt to clean up after ourselves. logger.trace("[{}] aborting upload [{}]", blobKey, uploadId); safeAbortMultipartUpload(uploadId); } - return currentValue; + return OptionalBytesReference.of(currentValueAndEtag.registerContents()); }) // Step 6: Complete the listener. @@ -1111,7 +1116,7 @@ private void abortMultipartUploadIfExists(String uploadId) { } } - private void completeMultipartUpload(String uploadId, String partETag) { + private void completeMultipartUpload(String uploadId, String partETag, String existingEtag) { final var completeMultipartUploadRequestBuilder = CompleteMultipartUploadRequest.builder() .bucket(bucket) .key(blobKey) @@ -1123,6 +1128,14 @@ private void completeMultipartUpload(String uploadId, String partETag) { Operation.PUT_MULTIPART_OBJECT, purpose ); + if (blobStore.supportsConditionalWrites(purpose)) { + if (existingEtag == null) { + completeMultipartUploadRequestBuilder.ifNoneMatch("*"); + } else { + completeMultipartUploadRequestBuilder.ifMatch(existingEtag); + } + } + final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build(); client.completeMultipartUpload(completeMultipartUploadRequest); } @@ -1146,8 +1159,23 @@ public void compareAndExchangeRegister( ).run(expected, updated, ActionListener.releaseBefore(clientReference, listener)); } + /** + * @param registerContents Contents of the register blob; {@link BytesArray#EMPTY} if the blob is absent. + * @param eTag Etag of the register blob; {@code null} if and only if the blob is absent. + */ + private record RegisterAndEtag(BytesReference registerContents, String eTag) { + /** + * Sentinel value to indicate that the register blob is absent. + */ + static RegisterAndEtag ABSENT = new RegisterAndEtag(BytesArray.EMPTY, null); + } + @Override public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { + getRegisterAndEtag(purpose, key, listener.map(registerAndEtag -> OptionalBytesReference.of(registerAndEtag.registerContents()))); + } + + void getRegisterAndEtag(OperationPurpose purpose, String key, ActionListener listener) { ActionListener.completeWith(listener, () -> { final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS ? BackoffPolicy.noBackoff() @@ -1163,11 +1191,14 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener Strings.format("[%s]: getRegister failed", key), attemptException); if (attemptException instanceof SdkServiceException sdkException && sdkException.statusCode() == 404) { - return OptionalBytesReference.EMPTY; + return RegisterAndEtag.ABSENT; } else if (finalException == null) { finalException = attemptException; } else if (finalException != attemptException) { @@ -1191,6 +1222,23 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener getMultipartUploadCleanupListener(int maxUploads, RefCountingRunnable refs) { try (var clientReference = blobStore.clientReference()) { final var bucket = blobStore.bucket(); diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index d07fa80551a24..a34bb21174288 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -93,6 +93,7 @@ import java.util.regex.Pattern; import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; +import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING; @@ -248,7 +249,7 @@ protected BlobContainer createBlobContainer( S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY), S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY), S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY), - S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY), + S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY) == Boolean.FALSE, repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE, new DeterministicTaskQueue().getThreadPool(), @@ -1381,6 +1382,76 @@ public void handle(HttpExchange exchange) throws IOException { ); } + public void testCompareAndExchangeWithConcurrentPutObject() throws Exception { + final var blobContainerPath = BlobPath.EMPTY.add(getTestName()); + final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath); + + final var objectContentsRequestedLatch = new CountDownLatch(1); + + @SuppressForbidden(reason = "use a http server") + class AwaitsListMultipartUploads extends S3HttpHandler { + AwaitsListMultipartUploads() { + super("bucket"); + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + if (parseRequest(exchange).isGetObjectRequest()) { + // delay the overwrite until the CAS is checking the object contents, forcing a race + objectContentsRequestedLatch.countDown(); + } + super.handle(exchange); + } + } + + httpServer.createContext("/", new AwaitsListMultipartUploads()); + + final var blobName = randomIdentifier(); + final var initialValue = randomBytesReference(8); + final var overwriteValue = randomValueOtherThan(initialValue, () -> randomBytesReference(8)); + final var casTargetValue = randomValueOtherThanMany( + v -> v.equals(initialValue) || v.equals(overwriteValue), + () -> randomBytesReference(8) + ); + + statefulBlobContainer.writeBlobAtomic(randomPurpose(), blobName, initialValue, randomBoolean()); + + runInParallel( + () -> safeAwait( + l -> statefulBlobContainer.compareAndExchangeRegister( + randomPurpose(), + blobName, + initialValue, + casTargetValue, + l.map(result -> { + // Really anything can happen here: success (sees initialValue) or failure (sees overwriteValue), or contention + if (result.isPresent()) { + assertThat( + result.toString(), + result.bytesReference(), + anyOf(equalBytes(initialValue), equalBytes(overwriteValue)) + ); + } + return null; + }) + ) + ), + () -> { + try { + safeAwait(objectContentsRequestedLatch); + statefulBlobContainer.writeBlobAtomic(randomPurpose(), blobName, overwriteValue, false); + } catch (IOException e) { + throw new AssertionError("writeBlobAtomic failed", e); + } + } + ); + + // If the CAS happens before the overwrite then we'll see the overwritten value, whereas if the CAS happens second then it will + // fail because the value was overwritten leaving the overwritten value in place. If, however, the CAS were not atomic with respect + // to other non-CAS-based writes, then we would see casTargetValue here: + assertThat(Streams.readFully(statefulBlobContainer.readBlob(randomPurpose(), blobName)), equalBytes(overwriteValue)); + } + @Override protected Matcher getMaxRetriesMatcher(int maxRetries) { // some attempts make meaningful progress and do not count towards the max retry limit diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3ConsistencyModel.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3ConsistencyModel.java new file mode 100644 index 0000000000000..2558bc24c0e3a --- /dev/null +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3ConsistencyModel.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package fixture.s3; + +/** + * AWS S3 has weaker consistency for its multipart upload APIs than initially claimed (see support cases 10837136441 and 176070774900712) + * but strong consistency of conditional writes based on the {@code If-Match} and {@code If-None-Match} headers. Other object storage + * suppliers have decided instead to implement strongly-consistent multipart upload APIs and ignore the conditional writes headers. We + * verify Elasticsearch's behaviour against both models. + */ +public enum S3ConsistencyModel { + /** + * The model implemented by AWS S3: multipart upload APIs are somewhat weak (e.g. aborts may return while the write operation is still + * in flight) but conditional writes work as expected. + */ + AWS_DEFAULT(true, false), + + /** + * The alternative model verified by these tests: the multipart upload APIs are strongly consistent, but the {@code If-Match} and + * {@code If-None-Match} headers are ignored and all writes are unconditional. + */ + STRONG_MPUS(false, true); + + private final boolean conditionalWrites; + private final boolean strongMultipartUploads; + + S3ConsistencyModel(boolean conditionalWrites, boolean strongMultipartUploads) { + this.conditionalWrites = conditionalWrites; + this.strongMultipartUploads = strongMultipartUploads; + } + + public boolean hasStrongMultipartUploads() { + return strongMultipartUploads; + } + + public boolean hasConditionalWrites() { + return conditionalWrites; + } +} diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java index 2174afbe82c92..455ea93e18437 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java @@ -70,6 +70,11 @@ public void handle(final HttpExchange exchange) throws IOException { throw e; } } + + @Override + protected S3ConsistencyModel consistencyModel() { + return S3HttpFixture.this.consistencyModel(); + } }; } @@ -109,4 +114,8 @@ protected void after() { ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS); } } + + protected S3ConsistencyModel consistencyModel() { + return S3ConsistencyModel.AWS_DEFAULT; + } } diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index eeb35ed637c5d..873ead7dbdf7c 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.io.Streams; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.core.XmlUtils; @@ -45,6 +46,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -70,6 +72,7 @@ public class S3HttpHandler implements HttpHandler { private final ConcurrentMap blobs = new ConcurrentHashMap<>(); private final ConcurrentMap uploads = new ConcurrentHashMap<>(); + private final ConcurrentMap completingUploads = new ConcurrentHashMap<>(); public S3HttpHandler(final String bucket) { this(bucket, null); @@ -191,43 +194,46 @@ public void handle(final HttpExchange exchange) throws IOException { } } else if (request.isCompleteMultipartUploadRequest()) { + final var uploadId = request.getQueryParamOnce("uploadId"); final byte[] responseBody; final RestStatus responseCode; - synchronized (uploads) { - final var upload = getUpload(request.getQueryParamOnce("uploadId")); - if (upload == null) { - if (Randomness.get().nextBoolean()) { - responseCode = RestStatus.NOT_FOUND; - responseBody = null; + try (var ignoredCompletingUploadRef = setUploadCompleting(uploadId)) { + synchronized (uploads) { + final var upload = getUpload(request.getQueryParamOnce("uploadId")); + if (upload == null) { + if (Randomness.get().nextBoolean()) { + responseCode = RestStatus.NOT_FOUND; + responseBody = null; + } else { + responseCode = RestStatus.OK; + responseBody = """ + + + NoSuchUpload + No such upload + test-request-id + test-host-id + """.getBytes(StandardCharsets.UTF_8); + } } else { - responseCode = RestStatus.OK; - responseBody = """ - - - NoSuchUpload - No such upload - test-request-id - test-host-id - """.getBytes(StandardCharsets.UTF_8); - } - } else { - final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody()))); - responseCode = updateBlobContents(exchange, request.path(), blobContents); - if (responseCode == RestStatus.OK) { - responseBody = ("\n" - + "\n" - + "" - + bucket - + "\n" - + "" - + request.path() - + "\n" - + "").getBytes(StandardCharsets.UTF_8); - } else { - responseBody = null; - } - if (responseCode != RestStatus.PRECONDITION_FAILED) { - removeUpload(upload.getUploadId()); + final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody()))); + responseCode = updateBlobContents(exchange, request.path(), blobContents); + if (responseCode == RestStatus.OK) { + responseBody = ("\n" + + "\n" + + "" + + bucket + + "\n" + + "" + + request.path() + + "\n" + + "").getBytes(StandardCharsets.UTF_8); + } else { + responseBody = null; + } + if (responseCode != RestStatus.PRECONDITION_FAILED) { + removeUpload(upload.getUploadId()); + } } } } @@ -238,9 +244,16 @@ public void handle(final HttpExchange exchange) throws IOException { } else { exchange.sendResponseHeaders(responseCode.getStatus(), -1); } + } else if (request.isAbortMultipartUploadRequest()) { - final var upload = removeUpload(request.getQueryParamOnce("uploadId")); - exchange.sendResponseHeaders((upload == null ? RestStatus.NOT_FOUND : RestStatus.NO_CONTENT).getStatus(), -1); + final var uploadId = request.getQueryParamOnce("uploadId"); + if (consistencyModel().hasStrongMultipartUploads() == false && completingUploads.containsKey(uploadId)) { + // See AWS support case 176070774900712: aborts may sometimes return early if complete is already in progress + exchange.sendResponseHeaders(RestStatus.NO_CONTENT.getStatus(), -1); + } else { + final var upload = removeUpload(request.getQueryParamOnce("uploadId")); + exchange.sendResponseHeaders((upload == null ? RestStatus.NOT_FOUND : RestStatus.NO_CONTENT).getStatus(), -1); + } } else if (request.isPutObjectRequest()) { // a copy request is a put request with an X-amz-copy-source header @@ -417,29 +430,31 @@ public void handle(final HttpExchange exchange) throws IOException { * @see AWS docs */ private RestStatus updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) { - if (isProtectOverwrite(exchange)) { - return blobs.putIfAbsent(path, newContents) == null - ? RestStatus.OK - : ESTestCase.randomFrom(RestStatus.PRECONDITION_FAILED, RestStatus.CONFLICT); - } - - final var requireExistingETag = getRequiredExistingETag(exchange); - if (requireExistingETag != null) { - final var responseCode = new AtomicReference<>(RestStatus.OK); - blobs.compute(path, (ignoredPath, existingContents) -> { - if (existingContents != null && requireExistingETag.equals(getEtagFromContents(existingContents))) { - return newContents; - } + if (consistencyModel().hasConditionalWrites()) { + if (isProtectOverwrite(exchange)) { + return blobs.putIfAbsent(path, newContents) == null + ? RestStatus.OK + : ESTestCase.randomFrom(RestStatus.PRECONDITION_FAILED, RestStatus.CONFLICT); + } - responseCode.set( - ESTestCase.randomFrom( - existingContents == null ? RestStatus.NOT_FOUND : RestStatus.PRECONDITION_FAILED, - RestStatus.CONFLICT - ) - ); - return existingContents; - }); - return responseCode.get(); + final var requireExistingETag = getRequiredExistingETag(exchange); + if (requireExistingETag != null) { + final var responseCode = new AtomicReference<>(RestStatus.OK); + blobs.compute(path, (ignoredPath, existingContents) -> { + if (existingContents != null && requireExistingETag.equals(getEtagFromContents(existingContents))) { + return newContents; + } + + responseCode.set( + ESTestCase.randomFrom( + existingContents == null ? RestStatus.NOT_FOUND : RestStatus.PRECONDITION_FAILED, + RestStatus.CONFLICT + ) + ); + return existingContents; + }); + return responseCode.get(); + } } blobs.put(path, newContents); @@ -625,6 +640,7 @@ private static boolean isProtectOverwrite(final HttpExchange exchange) { if (exchange.getRequestHeaders().get("If-Match") != null) { throw new AssertionError("Handling both If-None-Match and If-Match headers is not supported"); } + if (ifNoneMatch.size() != 1) { throw new AssertionError("multiple If-None-Match headers found: " + ifNoneMatch); } @@ -679,6 +695,28 @@ MultipartUpload removeUpload(String uploadId) { } } + private Releasable setUploadCompleting(String uploadId) { + completingUploads.computeIfAbsent(uploadId, ignored -> new AtomicInteger()).incrementAndGet(); + return () -> clearUploadCompleting(uploadId); + } + + private void clearUploadCompleting(String uploadId) { + completingUploads.compute(uploadId, (ignored, uploadCount) -> { + if (uploadCount == null) { + throw new AssertionError("upload [" + uploadId + "] not tracked"); + } + if (uploadCount.decrementAndGet() == 0) { + return null; + } else { + return uploadCount; + } + }); + } + + protected S3ConsistencyModel consistencyModel() { + return S3ConsistencyModel.AWS_DEFAULT; + } + public S3Request parseRequest(HttpExchange exchange) { final String queryString = exchange.getRequestURI().getQuery(); final Map> queryParameters; diff --git a/test/framework/src/main/java/org/elasticsearch/common/bytes/BytesReferenceTestUtils.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/BytesReferenceTestUtils.java new file mode 100644 index 0000000000000..da4e8cf2b7268 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/BytesReferenceTestUtils.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.bytes; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.Strings; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; + +import java.io.IOException; +import java.util.Objects; + +public enum BytesReferenceTestUtils { + ; + + /** + * Like {@link Matchers#equalTo} except it reports the contents of the respective {@link BytesReference} instances on mismatch. + */ + public static Matcher equalBytes(BytesReference expected) { + return new BaseMatcher<>() { + @Override + public boolean matches(Object actual) { + return Objects.equals(expected, actual); + } + + @Override + public void describeTo(Description description) { + appendBytesReferenceDescription(expected, description); + } + + private void appendBytesReferenceDescription(BytesReference bytesReference, Description description) { + final var stringBuilder = new StringBuilder("BytesReference["); + final var iterator = bytesReference.iterator(); + BytesRef bytesRef; + boolean first = true; + try { + while ((bytesRef = iterator.next()) != null) { + for (int i = 0; i < bytesRef.length; i++) { + if (first) { + first = false; + } else { + stringBuilder.append(' '); + } + stringBuilder.append(Strings.format("%02x", bytesRef.bytes[bytesRef.offset + i])); + } + } + description.appendText(stringBuilder.append(']').toString()); + } catch (IOException e) { + throw new AssertionError("no IO happens here", e); + } + } + + @Override + public void describeMismatch(Object item, Description description) { + description.appendText("was "); + if (item instanceof BytesReference bytesReference) { + appendBytesReferenceDescription(bytesReference, description); + } else { + description.appendValue(item); + } + } + }; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 2b3513f764065..06140c1f8141e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -2925,6 +2925,10 @@ public static void runInParallel(int numberOfTasks, IntConsumer taskFactory) { } } + public static void runInParallel(Runnable... tasks) { + runInParallel(tasks.length, i -> tasks[i].run()); + } + public static void ensureAllContextsReleased(SearchService searchService) { try { assertBusy(() -> { diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AbstractS3RepositoryAnalysisRestTestCase.java b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AbstractS3RepositoryAnalysisRestTestCase.java new file mode 100644 index 0000000000000..e9d962a947da1 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AbstractS3RepositoryAnalysisRestTestCase.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.analyze; + +import fixture.aws.DynamicRegionSupplier; +import fixture.s3.S3HttpFixture; +import fixture.s3.S3HttpHandler; + +import com.sun.net.httpserver.HttpHandler; + +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; + +import java.util.function.Supplier; + +import static fixture.aws.AwsCredentialsUtils.fixedAccessKey; +import static org.hamcrest.Matchers.blankOrNullString; +import static org.hamcrest.Matchers.not; + +public abstract class AbstractS3RepositoryAnalysisRestTestCase extends AbstractRepositoryAnalysisRestTestCase { + protected static final Supplier regionSupplier = new DynamicRegionSupplier(); + + protected static class RepositoryAnalysisHttpFixture extends S3HttpFixture { + RepositoryAnalysisHttpFixture(boolean enabled) { + super(enabled, "bucket", "base_path_integration_tests", fixedAccessKey("s3_test_access_key", regionSupplier, "s3")); + } + + private volatile boolean repoAnalysisStarted; + + @Override + protected HttpHandler createHandler() { + final var delegateHandler = asInstanceOf(S3HttpHandler.class, super.createHandler()); + return exchange -> { + ensurePurposeParameterPresent(delegateHandler.parseRequest(exchange)); + delegateHandler.handle(exchange); + }; + } + + private void ensurePurposeParameterPresent(S3HttpHandler.S3Request request) { + if (request.path().startsWith("/bucket/base_path_integration_tests/temp-analysis-")) { + repoAnalysisStarted = true; + } + if (repoAnalysisStarted == false) { + if (Regex.simpleMatch("/bucket/base_path_integration_tests/tests-*/master.dat", request.path()) + || Regex.simpleMatch("/bucket/base_path_integration_tests/tests-*/data-*.dat", request.path()) + || (request.isListObjectsRequest() + && request.getQueryParamOnce("prefix").startsWith("base_path_integration_tests/tests-")) + || (request.isMultiObjectDeleteRequest())) { + // verify repository is not part of repo analysis so will have different/missing x-purpose parameter + return; + } + if (request.isListObjectsRequest() && request.getQueryParamOnce("prefix").equals("base_path_integration_tests/index-")) { + // getRepositoryData looking for root index-N blob will have different/missing x-purpose parameter + return; + } + repoAnalysisStarted = true; + } + assertTrue(request.toString(), request.hasQueryParamOnce("x-purpose")); + assertEquals(request.toString(), "RepositoryAnalysis", request.getQueryParamOnce("x-purpose")); + } + } + + protected static final String CLIENT_NAME = "repo_test_kit"; + + protected static ElasticsearchCluster buildCluster(S3HttpFixture s3HttpFixture, boolean enabled) { + final var clientPrefix = "s3.client." + CLIENT_NAME + "."; + return ElasticsearchCluster.local() + .distribution(DistributionType.DEFAULT) + .keystore(clientPrefix + "access_key", System.getProperty("s3AccessKey")) + .keystore(clientPrefix + "secret_key", System.getProperty("s3SecretKey")) + .setting(clientPrefix + "protocol", () -> "http", (n) -> enabled) + .setting(clientPrefix + "region", regionSupplier, (n) -> enabled) + .setting(clientPrefix + "add_purpose_custom_query_parameter", () -> randomFrom("true", "false"), n -> randomBoolean()) + .setting(clientPrefix + "endpoint", s3HttpFixture::getAddress, (n) -> enabled) + .setting( + "repository_s3.compare_and_exchange.anti_contention_delay", + () -> randomFrom("1s" /* == default */, "1ms"), + n -> randomBoolean() + ) + .setting("xpack.security.enabled", "false") + .setting("thread_pool.snapshot.max", "10") + .build(); + } + + @Override + protected Settings repositorySettings() { + final String bucket = System.getProperty("test.s3.bucket"); + assertThat(bucket, not(blankOrNullString())); + + final String basePath = System.getProperty("test.s3.base_path"); + assertThat(basePath, not(blankOrNullString())); + + return Settings.builder() + .put("client", CLIENT_NAME) + .put("bucket", bucket) + .put("base_path", basePath) + .put("delete_objects_max_size", between(1, 1000)) + .put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones + .put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5)) + // verify we always set the x-purpose header even if disabled for other repository operations + .put(randomBooleanSetting("add_purpose_custom_query_parameter")) + // this parameter is ignored for repo analysis + .put(randomBooleanSetting("unsafely_incompatible_with_s3_conditional_writes")) + .build(); + } + + private static Settings randomBooleanSetting(String settingKey) { + return randomFrom(Settings.EMPTY, Settings.builder().put(settingKey, randomBoolean()).build()); + } + + @Override + protected String repositoryType() { + return "s3"; + } + +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java index d85479ee5be46..ad9894c07892d 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java @@ -6,84 +6,21 @@ */ package org.elasticsearch.repositories.blobstore.testkit.analyze; -import fixture.aws.DynamicRegionSupplier; import fixture.s3.S3HttpFixture; -import fixture.s3.S3HttpHandler; -import com.sun.net.httpserver.HttpHandler; - -import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Booleans; import org.elasticsearch.test.cluster.ElasticsearchCluster; -import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.junit.ClassRule; import org.junit.rules.RuleChain; import org.junit.rules.TestRule; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; - -import static fixture.aws.AwsCredentialsUtils.fixedAccessKey; -import static org.hamcrest.Matchers.blankOrNullString; -import static org.hamcrest.Matchers.not; - -public class S3RepositoryAnalysisRestIT extends AbstractRepositoryAnalysisRestTestCase { +public class S3RepositoryAnalysisRestIT extends AbstractS3RepositoryAnalysisRestTestCase { static final boolean USE_FIXTURE = Booleans.parseBoolean(System.getProperty("tests.use.fixture", "true")); - private static final Supplier regionSupplier = new DynamicRegionSupplier(); - public static final S3HttpFixture s3Fixture = new S3HttpFixture( - USE_FIXTURE, - "bucket", - "base_path_integration_tests", - fixedAccessKey("s3_test_access_key", regionSupplier, "s3") - ) { - @Override - protected HttpHandler createHandler() { - final var delegateHandler = asInstanceOf(S3HttpHandler.class, super.createHandler()); - final var repoAnalysisStarted = new AtomicBoolean(); - return exchange -> { - ensurePurposeParameterPresent(delegateHandler.parseRequest(exchange), repoAnalysisStarted); - delegateHandler.handle(exchange); - }; - } - }; - - private static void ensurePurposeParameterPresent(S3HttpHandler.S3Request request, AtomicBoolean repoAnalysisStarted) { - if (request.path().startsWith("/bucket/base_path_integration_tests/temp-analysis-")) { - repoAnalysisStarted.set(true); - } - if (repoAnalysisStarted.get() == false) { - if (Regex.simpleMatch("/bucket/base_path_integration_tests/tests-*/master.dat", request.path()) - || Regex.simpleMatch("/bucket/base_path_integration_tests/tests-*/data-*.dat", request.path()) - || (request.isListObjectsRequest() && request.getQueryParamOnce("prefix").startsWith("base_path_integration_tests/tests-")) - || (request.isMultiObjectDeleteRequest())) { - // verify repository is not part of repo analysis so will have different/missing x-purpose parameter - return; - } - if (request.isListObjectsRequest() && request.getQueryParamOnce("prefix").equals("base_path_integration_tests/index-")) { - // getRepositoryData looking for root index-N blob will have different/missing x-purpose parameter - return; - } - repoAnalysisStarted.set(true); - } - assertTrue(request.toString(), request.hasQueryParamOnce("x-purpose")); - assertEquals(request.toString(), "RepositoryAnalysis", request.getQueryParamOnce("x-purpose")); - } + public static final S3HttpFixture s3Fixture = new RepositoryAnalysisHttpFixture(USE_FIXTURE); - public static ElasticsearchCluster cluster = ElasticsearchCluster.local() - .distribution(DistributionType.DEFAULT) - .keystore("s3.client.repo_test_kit.access_key", System.getProperty("s3AccessKey")) - .keystore("s3.client.repo_test_kit.secret_key", System.getProperty("s3SecretKey")) - .setting("s3.client.repo_test_kit.protocol", () -> "http", (n) -> USE_FIXTURE) - .setting("s3.client.repo_test_kit.endpoint", s3Fixture::getAddress, (n) -> USE_FIXTURE) - .setting("s3.client.repo_test_kit.region", regionSupplier, (n) -> USE_FIXTURE) - .setting("s3.client.repo-test_kit.add_purpose_custom_query_parameter", () -> randomFrom("true", "false"), n -> randomBoolean()) - .setting("xpack.security.enabled", "false") - .setting("thread_pool.snapshot.max", "10") - .build(); + public static final ElasticsearchCluster cluster = buildCluster(s3Fixture, USE_FIXTURE); @ClassRule public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster); @@ -92,35 +29,4 @@ private static void ensurePurposeParameterPresent(S3HttpHandler.S3Request reques protected String getTestRestCluster() { return cluster.getHttpAddresses(); } - - @Override - protected String repositoryType() { - return "s3"; - } - - @Override - protected Settings repositorySettings() { - final String bucket = System.getProperty("test.s3.bucket"); - assertThat(bucket, not(blankOrNullString())); - - final String basePath = System.getProperty("test.s3.base_path"); - assertThat(basePath, not(blankOrNullString())); - - return Settings.builder() - .put("client", "repo_test_kit") - .put("bucket", bucket) - .put("base_path", basePath) - .put("delete_objects_max_size", between(1, 1000)) - .put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones - .put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5)) - // verify we always set the x-purpose header even if disabled for other repository operations - .put(randomBooleanSetting("add_purpose_custom_query_parameter")) - // this parameter is ignored for repo analysis - .put(randomBooleanSetting("unsafely_incompatible_with_s3_conditional_writes")) - .build(); - } - - private Settings randomBooleanSetting(String settingKey) { - return randomFrom(Settings.EMPTY, Settings.builder().put(settingKey, randomBoolean()).build()); - } } diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisStrongMpusRestIT.java b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisStrongMpusRestIT.java new file mode 100644 index 0000000000000..e9c38b9468983 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisStrongMpusRestIT.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.analyze; + +import fixture.s3.S3ConsistencyModel; +import fixture.s3.S3HttpFixture; + +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +public class S3RepositoryAnalysisStrongMpusRestIT extends AbstractS3RepositoryAnalysisRestTestCase { + + public static final S3HttpFixture s3Fixture = new RepositoryAnalysisHttpFixture(true) { + @Override + protected S3ConsistencyModel consistencyModel() { + return S3ConsistencyModel.STRONG_MPUS; + } + }; + + public static final ElasticsearchCluster cluster = buildCluster(s3Fixture, true); + + @ClassRule + public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + +} From 937263147cc08de76a631176e3949548409853ff Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 26 Nov 2025 12:21:59 +0000 Subject: [PATCH 2/5] Update docs/changelog/138663.yaml --- docs/changelog/138663.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/138663.yaml diff --git a/docs/changelog/138663.yaml b/docs/changelog/138663.yaml new file mode 100644 index 0000000000000..225d52420e4b7 --- /dev/null +++ b/docs/changelog/138663.yaml @@ -0,0 +1,5 @@ +pr: 138663 +summary: Support weaker consistency model for S3 MPUs +area: Snapshot/Restore +type: bug +issues: [] From a62babd0abfe73342df6c4abe8c656badde94c98 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 26 Nov 2025 12:23:31 +0000 Subject: [PATCH 3/5] Adjust known issues docs --- docs/release-notes/known-issues.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/release-notes/known-issues.md b/docs/release-notes/known-issues.md index 871b1197d4537..83441c4c66d47 100644 --- a/docs/release-notes/known-issues.md +++ b/docs/release-notes/known-issues.md @@ -89,4 +89,4 @@ This issue will be fixed in a future patch release (see [PR #126990](https://git * switching the order of the grouping keys (eg. `STATS ... BY keyword2, keyword1`, if the `keyword2` has a lower cardinality) * reducing the grouping key cardinality, by filtering out values before STATS -* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using an affected version of {{es}} and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue currently affects all supported versions of {{es}}. The plan to address it is described in [#137197](https://github.com/elastic/elasticsearch/issues/137197). +* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using an affected version of {{es}} and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue is fixed in {{es}} version 9.3.0 by [#138663](https://github.com/elastic/elasticsearch/pull/138663). From 5b93838bd52606caf67fb4c741ddb849bcda1d67 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 26 Nov 2025 13:05:24 +0000 Subject: [PATCH 4/5] Clarify "affected" --- docs/release-notes/known-issues.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/release-notes/known-issues.md b/docs/release-notes/known-issues.md index 83441c4c66d47..f450661122bfb 100644 --- a/docs/release-notes/known-issues.md +++ b/docs/release-notes/known-issues.md @@ -89,4 +89,4 @@ This issue will be fixed in a future patch release (see [PR #126990](https://git * switching the order of the grouping keys (eg. `STATS ... BY keyword2, keyword1`, if the `keyword2` has a lower cardinality) * reducing the grouping key cardinality, by filtering out values before STATS -* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using an affected version of {{es}} and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue is fixed in {{es}} version 9.3.0 by [#138663](https://github.com/elastic/elasticsearch/pull/138663). +* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using a version of {{es}} prior to 9.3.0 and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue is fixed in {{es}} version 9.3.0 by [#138663](https://github.com/elastic/elasticsearch/pull/138663). From 1fc58058ff859765f7833c99b369c0a9778d7185 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 27 Nov 2025 13:02:59 +0000 Subject: [PATCH 5/5] Add javadoc --- .../src/main/java/org/elasticsearch/test/ESTestCase.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 06140c1f8141e..c31da20973bf2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -2925,6 +2925,10 @@ public static void runInParallel(int numberOfTasks, IntConsumer taskFactory) { } } + /** + * Run the given tasks in parallel. One of the tasks will be run on the calling thread, and each of the others will run on its own + * fresh thread. + */ public static void runInParallel(Runnable... tasks) { runInParallel(tasks.length, i -> tasks[i].run()); }