5 changes: 5 additions & 0 deletions docs/changelog/138663.yaml
@@ -0,0 +1,5 @@
pr: 138663
summary: Support weaker consistency model for S3 MPUs
area: Snapshot/Restore
type: bug
issues: []
2 changes: 1 addition & 1 deletion docs/release-notes/known-issues.md
@@ -89,4 +89,4 @@ This issue will be fixed in a future patch release (see [PR #126990](https://git
* switching the order of the grouping keys (eg. `STATS ... BY keyword2, keyword1`, if the `keyword2` has a lower cardinality)
* reducing the grouping key cardinality, by filtering out values before STATS

* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using an affected version of {{es}} and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue currently affects all supported versions of {{es}}. The plan to address it is described in [#137197](https://github.com/elastic/elasticsearch/issues/137197).
* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using a version of {{es}} prior to 9.3.0 and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue is fixed in {{es}} version 9.3.0 by [#138663](https://github.com/elastic/elasticsearch/pull/138663).
@@ -20,6 +20,7 @@
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
@@ -56,6 +57,7 @@
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -832,11 +834,14 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
if ((e instanceof AwsServiceException awsServiceException)
&& (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()
|| awsServiceException.statusCode() == RestStatus.CONFLICT.getStatus()
|| awsServiceException.statusCode() == RestStatus.PRECONDITION_FAILED.getStatus()
|| awsServiceException.statusCode() == RestStatus.OK.getStatus()
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention.
// Also if something else changed the blob out from under us then the If-Match check results in a 409 or a 412.
delegate.onResponse(OptionalBytesReference.MISSING);
} else {
delegate.onFailure(e);
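The branch above folds four distinct S3 responses into a single "contention" outcome: a 404 (the upload was aborted concurrently), a 409 or 412 (an `If-Match`/`If-None-Match` precondition failed), or the rare 200 OK whose body nonetheless carries `<Error><Code>NoSuchUpload</Code>`. A minimal standalone sketch of that classification (a hypothetical helper, not the PR's actual code):

```java
class CasContentionCheck {
    // Mirrors the status-code branch above: each of these responses means the
    // compare-and-exchange lost a race with a concurrent operation, not that the
    // repository is broken, so the caller should report MISSING rather than fail.
    static boolean isContention(int statusCode, String awsErrorCode) {
        return statusCode == 404                                          // upload aborted concurrently
            || statusCode == 409                                          // conditional-write conflict
            || statusCode == 412                                          // precondition failed
            || (statusCode == 200 && "NoSuchUpload".equals(awsErrorCode)); // 200 OK with error body
    }

    public static void main(String[] args) {
        // the surprising case: HTTP success that still signals contention
        System.out.println(isContention(200, "NoSuchUpload"));
    }
}
```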
@@ -891,20 +896,20 @@ void innerRun(BytesReference expected, BytesReference updated, ActionListener<Op
// cannot have observed a stale value, whereas if our operation ultimately fails then it doesn't matter what this read
// observes.

.<OptionalBytesReference>andThen(l -> getRegister(purpose, rawKey, l))
.<RegisterAndEtag>andThen(l -> getRegisterAndEtag(purpose, rawKey, l))

// Step 5: Perform the compare-and-swap by completing our upload iff the witnessed value matches the expected value.

.andThenApply(currentValue -> {
if (currentValue.isPresent() && currentValue.bytesReference().equals(expected)) {
.andThenApply(currentValueAndEtag -> {
if (currentValueAndEtag.registerContents().equals(expected)) {
logger.trace("[{}] completing upload [{}]", blobKey, uploadId);
completeMultipartUpload(uploadId, partETag);
completeMultipartUpload(uploadId, partETag, currentValueAndEtag.eTag());
} else {
// Best-effort attempt to clean up after ourselves.
logger.trace("[{}] aborting upload [{}]", blobKey, uploadId);
safeAbortMultipartUpload(uploadId);
}
return currentValue;
return OptionalBytesReference.of(currentValueAndEtag.registerContents());
})

// Step 6: Complete the listener.
@@ -1111,7 +1116,7 @@ private void abortMultipartUploadIfExists(String uploadId) {
}
}

private void completeMultipartUpload(String uploadId, String partETag) {
private void completeMultipartUpload(String uploadId, String partETag, String existingEtag) {
final var completeMultipartUploadRequestBuilder = CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.key(blobKey)
@@ -1123,6 +1128,14 @@ private void completeMultipartUpload(String uploadId, String partETag) {
Operation.PUT_MULTIPART_OBJECT,
purpose
);
if (blobStore.supportsConditionalWrites(purpose)) {
if (existingEtag == null) {
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
} else {
completeMultipartUploadRequestBuilder.ifMatch(existingEtag);
}
}

final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build();
client.completeMultipartUpload(completeMultipartUploadRequest);
}
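On stores that support conditional writes, the builder logic above attaches `If-None-Match: *` when the register blob was absent (the write must only succeed as a fresh create) and `If-Match: <etag>` otherwise (the write must only replace the exact version whose contents the CAS compared against). A hypothetical standalone sketch of that choice, with illustrative names rather than the SDK's:

```java
class ConditionalCompletionSketch {
    // A null etag means the preceding read saw no blob, so the completion must
    // fail if any object has appeared since; a non-null etag pins the completion
    // to the exact object version whose contents were compared.
    static String conditionalHeader(String existingEtag) {
        return existingEtag == null ? "If-None-Match: *" : "If-Match: " + existingEtag;
    }

    public static void main(String[] args) {
        System.out.println(conditionalHeader(null));
        System.out.println(conditionalHeader("\"9b2cf535f27731c974343645a3985328\""));
    }
}
```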
@@ -1146,8 +1159,23 @@ public void compareAndExchangeRegister(
).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
}

/**
* @param registerContents Contents of the register blob; {@link BytesArray#EMPTY} if the blob is absent.
* @param eTag Etag of the register blob; {@code null} if and only if the blob is absent.
*/
private record RegisterAndEtag(BytesReference registerContents, String eTag) {
/**
* Sentinel value to indicate that the register blob is absent.
*/
static RegisterAndEtag ABSENT = new RegisterAndEtag(BytesArray.EMPTY, null);
}
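The record above bundles the register contents with the ETag observed on the same read, with `(BytesArray.EMPTY, null)` doubling as the absent-blob sentinel. A simplified standalone mirror (using `byte[]` instead of `BytesReference` so the sketch is self-contained):

```java
// Illustrative mirror of the PR's RegisterAndEtag record: per its contract,
// the etag is null if and only if the blob is absent, so absence can be
// detected without wrapping the pair in an Optional.
record RegisterAndEtagSketch(byte[] registerContents, String eTag) {
    // Sentinel for an absent blob: empty contents and no etag.
    static final RegisterAndEtagSketch ABSENT = new RegisterAndEtagSketch(new byte[0], null);

    boolean isAbsent() {
        return eTag == null;
    }
}
```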

@Override
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
getRegisterAndEtag(purpose, key, listener.map(registerAndEtag -> OptionalBytesReference.of(registerAndEtag.registerContents())));
}

void getRegisterAndEtag(OperationPurpose purpose, String key, ActionListener<RegisterAndEtag> listener) {
ActionListener.completeWith(listener, () -> {
final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS
? BackoffPolicy.noBackoff()
@@ -1163,11 +1191,14 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener<Opt
var clientReference = blobStore.clientReference();
var s3Object = clientReference.client().getObject(getObjectRequest);
) {
return OptionalBytesReference.of(getRegisterUsingConsistentRead(s3Object, keyPath, key));
return new RegisterAndEtag(
getRegisterUsingConsistentRead(s3Object, keyPath, key),
getRequiredEtag(purpose, s3Object.response())
);
} catch (Exception attemptException) {
logger.trace(() -> Strings.format("[%s]: getRegister failed", key), attemptException);
if (attemptException instanceof SdkServiceException sdkException && sdkException.statusCode() == 404) {
return OptionalBytesReference.EMPTY;
return RegisterAndEtag.ABSENT;
} else if (finalException == null) {
finalException = attemptException;
} else if (finalException != attemptException) {
@@ -1191,6 +1222,23 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener<Opt
});
}

/**
* @return the {@code ETag} header from a {@link GetObjectResponse}, failing with an exception if it is omitted (unless not required
* for the given {@link OperationPurpose}).
*/
private String getRequiredEtag(OperationPurpose purpose, GetObjectResponse getObjectResponse) {
final var etag = getObjectResponse.eTag();
if (Strings.hasText(etag)) {
return etag;
} else if (blobStore.supportsConditionalWrites(purpose)) {
throw new UnsupportedOperationException("GetObject response contained no ETag header, cannot perform conditional write");
} else {
// blob stores which do not support conditional writes may also not return ETag headers, but we won't use it anyway so return
// a non-null dummy value
return "es-missing-but-ignored-etag";
}
}
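The fallback above only works because the ETag is about to be ignored: if conditional writes are supported, a missing ETag is a hard error (no `If-Match` header can be built), while otherwise any placeholder will do since it is never sent. A hypothetical standalone version of the same decision:

```java
class RequiredEtagSketch {
    // Mirrors the decision in getRequiredEtag above (hypothetical helper).
    static String requiredEtag(String etag, boolean supportsConditionalWrites) {
        if (etag != null && etag.isBlank() == false) {
            return etag; // normal case: S3 returns an ETag on every GetObject
        } else if (supportsConditionalWrites) {
            // without an ETag we cannot build an If-Match header, so fail loudly
            throw new UnsupportedOperationException("GetObject response contained no ETag header");
        } else {
            // the value is never sent, so substitute a recognisable dummy
            return "es-missing-but-ignored-etag";
        }
    }
}
```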

ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCountingRunnable refs) {
try (var clientReference = blobStore.clientReference()) {
final var bucket = blobStore.bucket();
@@ -93,6 +93,7 @@
import java.util.regex.Pattern;

import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
@@ -248,7 +249,7 @@ protected BlobContainer createBlobContainer(
S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY),
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY),
S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY) == Boolean.FALSE,
repositoryMetadata,
BigArrays.NON_RECYCLING_INSTANCE,
new DeterministicTaskQueue().getThreadPool(),
@@ -1381,6 +1382,76 @@ public void handle(HttpExchange exchange) throws IOException {
);
}

public void testCompareAndExchangeWithConcurrentPutObject() throws Exception {
final var blobContainerPath = BlobPath.EMPTY.add(getTestName());
final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath);

final var objectContentsRequestedLatch = new CountDownLatch(1);

@SuppressForbidden(reason = "use a http server")
class AwaitsListMultipartUploads extends S3HttpHandler {
AwaitsListMultipartUploads() {
super("bucket");
}

@Override
public void handle(HttpExchange exchange) throws IOException {
if (parseRequest(exchange).isGetObjectRequest()) {
// delay the overwrite until the CAS is checking the object contents, forcing a race
objectContentsRequestedLatch.countDown();
}
super.handle(exchange);
}
}

httpServer.createContext("/", new AwaitsListMultipartUploads());

final var blobName = randomIdentifier();
final var initialValue = randomBytesReference(8);
final var overwriteValue = randomValueOtherThan(initialValue, () -> randomBytesReference(8));
final var casTargetValue = randomValueOtherThanMany(
v -> v.equals(initialValue) || v.equals(overwriteValue),
() -> randomBytesReference(8)
);

statefulBlobContainer.writeBlobAtomic(randomPurpose(), blobName, initialValue, randomBoolean());

runInParallel(
() -> safeAwait(
l -> statefulBlobContainer.compareAndExchangeRegister(
randomPurpose(),
blobName,
initialValue,
casTargetValue,
l.map(result -> {
// Really anything can happen here: success (sees initialValue) or failure (sees overwriteValue), or contention
if (result.isPresent()) {
assertThat(
result.toString(),
result.bytesReference(),
anyOf(equalBytes(initialValue), equalBytes(overwriteValue))
);
}
return null;
})
)
),
() -> {
try {
safeAwait(objectContentsRequestedLatch);
statefulBlobContainer.writeBlobAtomic(randomPurpose(), blobName, overwriteValue, false);
} catch (IOException e) {
throw new AssertionError("writeBlobAtomic failed", e);
}
}
);

        // If the CAS happens before the overwrite then we'll see the overwritten value; if the CAS happens second then it
        // fails because the value was overwritten, again leaving the overwritten value in place. If, however, the CAS were
        // not atomic with respect to other non-CAS-based writes, then we would see casTargetValue here:
assertThat(Streams.readFully(statefulBlobContainer.readBlob(randomPurpose(), blobName)), equalBytes(overwriteValue));
}

@Override
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
// some attempts make meaningful progress and do not count towards the max retry limit
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package fixture.s3;

/**
* AWS S3 has weaker consistency for its multipart upload APIs than initially claimed (see support cases 10837136441 and 176070774900712)
> Contributor: Side note, are these weaker consistencies to be fixed, or "features" now?
>
> Author: Amazon have not indicated any interest in making changes in this area so we have to consider this a (somewhat-undocumented) feature of AWS S3 to be dealt with on our side.

* but strong consistency of conditional writes based on the {@code If-Match} and {@code If-None-Match} headers. Other object storage
* suppliers have decided instead to implement strongly-consistent multipart upload APIs and ignore the conditional writes headers. We
* verify Elasticsearch's behaviour against both models.
*/
public enum S3ConsistencyModel {
/**
* The model implemented by AWS S3: multipart upload APIs are somewhat weak (e.g. aborts may return while the write operation is still
* in flight) but conditional writes work as expected.
*/
AWS_DEFAULT(true, false),

/**
* The alternative model verified by these tests: the multipart upload APIs are strongly consistent, but the {@code If-Match} and
* {@code If-None-Match} headers are ignored and all writes are unconditional.
*/
STRONG_MPUS(false, true);
> Contributor: Is it possible for a model to have both?
>
> Author: Yes, but that's not a very interesting case to test.
>
> Contributor: Uninteresting because it should just work?
>
> Author: Yes pretty much: if both of the cases here work then the (true,true) case will work automatically. Each of these flags excludes certain kinds of misbehaviour, and the point is that excluding either of those classes yields a correct result. Excluding both classes is stronger than excluding either one of them.


private final boolean conditionalWrites;
private final boolean strongMultipartUploads;

S3ConsistencyModel(boolean conditionalWrites, boolean strongMultipartUploads) {
this.conditionalWrites = conditionalWrites;
this.strongMultipartUploads = strongMultipartUploads;
}

public boolean hasStrongMultipartUploads() {
return strongMultipartUploads;
}

public boolean hasConditionalWrites() {
return conditionalWrites;
}
}
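The review discussion above makes the key point: each flag excludes one class of misbehaviour, and excluding either class on its own is already sufficient for the register implementation to be correct, so a hypothetical (true, true) model is strictly stronger and needs no separate test. That sufficiency condition can be stated as a one-liner (illustrative only, not part of the fixture):

```java
class ConsistencyModelSketch {
    // The CAS implementation is safe as long as at least one mechanism is
    // strongly consistent: conditional writes, or the multipart upload APIs.
    static boolean casSafe(boolean conditionalWrites, boolean strongMultipartUploads) {
        return conditionalWrites || strongMultipartUploads;
    }
}
```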
@@ -70,6 +70,11 @@ public void handle(final HttpExchange exchange) throws IOException {
throw e;
}
}

@Override
protected S3ConsistencyModel consistencyModel() {
return S3HttpFixture.this.consistencyModel();
}
};
}

@@ -109,4 +114,8 @@ protected void after() {
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
}
}

protected S3ConsistencyModel consistencyModel() {
return S3ConsistencyModel.AWS_DEFAULT;
> Contributor: Is there a benefit in randomising this between AWS_DEFAULT and STRONG_MPUS? (Not here, because we'd want multiple consistencyModel calls to be consistent, but as a class variable?). Would this remove the need to have the S3RepositoryAnalysisStrongMpusRestIT?
>
> Author: I prefer to use randomisation for parameters that positively shouldn't matter for the purposes of the test, but this one really does matter so it's better to test both. I tried to do tricks with parameterisation etc. but it didn't work out very nicely since we construct the s3Fixture statically. There are some other options (e.g. we could make it mutable and set it during the test run) but nothing seemed totally ideal.

}
}