Skip to content

Commit

Permalink
Fail S3 repository analysis on partial reads (#102840)
Browse files Browse the repository at this point in the history
Today when reading a blob from an S3 repository we will resume a download
on a partial success. If this happens concurrently with a blob overwrite
then we may resume the download against a blob with updated contents,
which causes a checksum mismatch. A checksum mismatch during an
overwrite suggests an atomicity failure, which can be misleading to
users. With this commit we treat partial downloads during repository
analysis as immediate errors instead, clarifying the repository problem.

Relates #101100
  • Loading branch information
DaveCTurner committed Dec 4, 2023
1 parent 5a4d4b3 commit aedbe68
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 7 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102840.yaml
@@ -0,0 +1,5 @@
pr: 102840
summary: Fail S3 repository analysis on partial reads
area: Snapshot/Restore
type: enhancement
issues: []
Expand Up @@ -166,6 +166,12 @@ private void ensureOpen() {
}

private void reopenStreamOrFail(IOException e) throws IOException {
if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
logger.warn(() -> format("""
failed reading [%s/%s] at offset [%s]""", blobStore.bucket(), blobKey, start + currentOffset), e);
throw e;
}

final int maxAttempts = blobStore.getMaxRetries() + 1;

final long meaningfulProgressSize = Math.max(1L, blobStore.bufferSizeInBytes() / 100L);
Expand Down
Expand Up @@ -14,6 +14,7 @@
import com.sun.net.httpserver.HttpHandler;

import org.apache.http.HttpStatus;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -36,6 +37,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.hamcrest.Matcher;
import org.junit.After;
Expand Down Expand Up @@ -519,7 +521,7 @@ public void handle(HttpExchange exchange) throws IOException {

httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), new FlakyReadHandler());

try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_max_retries")) {
try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_blob_max_retries")) {
final int readLimit;
final InputStream wrappedStream;
if (randomBoolean()) {
Expand All @@ -535,12 +537,53 @@ public void handle(HttpExchange exchange) throws IOException {
}
}

public void testReadDoesNotRetryForRepositoryAnalysis() {
final int maxRetries = between(0, 5);
final int bufferSizeBytes = scaledRandomIntBetween(
0,
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));

final byte[] bytes = randomBlobContent();

@SuppressForbidden(reason = "use a http server")
class FlakyReadHandler implements HttpHandler {
private int failureCount;

@Override
public void handle(HttpExchange exchange) throws IOException {
if (failureCount != 0) {
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError("failureCount=" + failureCount));
}
failureCount += 1;
Streams.readFully(exchange.getRequestBody());
sendIncompleteContent(exchange, bytes);
exchange.close();
}
}

httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_repo_analysis"), new FlakyReadHandler());

expectThrows(Exception.class, () -> {
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.REPOSITORY_ANALYSIS, "read_blob_repo_analysis")) {
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
assertArrayEquals(Arrays.copyOfRange(bytes, 0, bytes.length), bytesRead);
}
});
}

@Override
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
// some attempts make meaningful progress and do not count towards the max retry limit
return allOf(greaterThanOrEqualTo(maxRetries), lessThanOrEqualTo(S3RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS));
}

@Override
protected OperationPurpose randomRetryingPurpose() {
return randomValueOtherThan(OperationPurpose.REPOSITORY_ANALYSIS, BlobStoreTestUtil::randomPurpose);
}

/**
* Asserts that an InputStream is fully consumed, or aborted, when it is closed
*/
Expand Down
Expand Up @@ -14,6 +14,7 @@
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpStatus;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -146,7 +147,7 @@ public void testReadBlobWithRetries() throws Exception {
}
});

try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_max_retries")) {
try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_blob_max_retries")) {
final int readLimit;
final InputStream wrappedStream;
if (randomBoolean()) {
Expand Down Expand Up @@ -212,7 +213,7 @@ public void testReadRangeBlobWithRetries() throws Exception {

final int position = randomIntBetween(0, bytes.length - 1);
final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_range_blob_max_retries", position, length)) {
try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_range_blob_max_retries", position, length)) {
final int readLimit;
final InputStream wrappedStream;
if (randomBoolean()) {
Expand Down Expand Up @@ -269,8 +270,8 @@ public void testReadBlobWithReadTimeouts() {
exception = expectThrows(Exception.class, () -> {
try (
InputStream stream = randomBoolean()
? blobContainer.readBlob(randomPurpose(), "read_blob_incomplete")
: blobContainer.readBlob(randomPurpose(), "read_blob_incomplete", position, length)
? blobContainer.readBlob(randomRetryingPurpose(), "read_blob_incomplete")
: blobContainer.readBlob(randomRetryingPurpose(), "read_blob_incomplete", position, length)
) {
Streams.readFully(stream);
}
Expand All @@ -289,6 +290,10 @@ protected org.hamcrest.Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
return equalTo(maxRetries);
}

protected OperationPurpose randomRetryingPurpose() {
return randomPurpose();
}

public void testReadBlobWithNoHttpResponse() {
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null);
Expand Down Expand Up @@ -323,8 +328,8 @@ public void testReadBlobWithPrematureConnectionClose() {
final Exception exception = expectThrows(Exception.class, () -> {
try (
InputStream stream = randomBoolean()
? blobContainer.readBlob(randomPurpose(), "read_blob_incomplete", 0, 1)
: blobContainer.readBlob(randomPurpose(), "read_blob_incomplete")
? blobContainer.readBlob(randomRetryingPurpose(), "read_blob_incomplete", 0, 1)
: blobContainer.readBlob(randomRetryingPurpose(), "read_blob_incomplete")
) {
Streams.readFully(stream);
}
Expand Down

0 comments on commit aedbe68

Please sign in to comment.