Retry after all S3 get failures that made progress (#88015)
S3 sometimes enters a state where blob downloads repeatedly fail but make
nontrivial progress between failures; often each attempt yields tens or
hundreds of MBs of data. Today we abort a download after three (by default)
such failures, but this may not be enough to retrieve a large blob in full
during one of these flaky patches.

With this commit we no longer count download attempts that retrieved at
least 1% of the configured `buffer_size` (typically 1MB) towards the maximum
number of retries.

Closes #87243
DaveCTurner committed Jun 30, 2022
1 parent c2f4571 commit 71aeebe
Showing 4 changed files with 114 additions and 27 deletions.
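
Before the diff, a quick illustration of the mechanism involved. S3RetryingInputStream keeps its place in the blob and, after a failure, re-opens the object with a ranged GET from the first unread byte, so whatever progress an attempt made is never thrown away. Below is a minimal sketch of that resume pattern; it uses the same AWS SDK v1 calls as the diff that follows (`GetObjectRequest#setRange`, `AmazonS3#getObject`), but the surrounding scaffolding (`ResumingReader` and its methods) is hypothetical, not the commit's code.

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

import java.io.InputStream;

/** Sketch: resume a blob download from the last good offset after a failure. */
class ResumingReader {
    private final AmazonS3 client; // assumed to be configured elsewhere
    private final String bucket;
    private final String key;
    private long currentOffset;    // bytes successfully consumed so far

    ResumingReader(AmazonS3 client, String bucket, String key) {
        this.client = client;
        this.bucket = bucket;
        this.key = key;
    }

    /** Re-open the object starting at the first byte we have not yet read. */
    InputStream reopen() {
        final GetObjectRequest request = new GetObjectRequest(bucket, key);
        if (currentOffset > 0) {
            request.setRange(currentOffset); // ranged GET: skip what we already have
        }
        final S3Object object = client.getObject(request);
        return object.getObjectContent();
    }

    void onBytesRead(int n) {
        currentOffset += n; // progress survives the next retry
    }
}
```

The question the commit answers is not how to resume but how many resumptions to allow: the change below stops charging attempts that made meaningful progress against the retry budget.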
6 changes: 6 additions & 0 deletions docs/changelog/88015.yaml
@@ -0,0 +1,6 @@
+pr: 88015
+summary: Retry after all S3 get failures that made progress
+area: Snapshot/Restore
+type: enhancement
+issues:
+ - 87243
S3RetryingInputStream.java
@@ -16,6 +16,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.Version;
 import org.elasticsearch.core.IOUtils;
@@ -44,12 +45,13 @@ class S3RetryingInputStream extends InputStream {
     private final String blobKey;
     private final long start;
     private final long end;
-    private final int maxAttempts;
     private final List<IOException> failures;
 
     private S3ObjectInputStream currentStream;
+    private long currentStreamFirstOffset;
     private long currentStreamLastOffset;
     private int attempt = 1;
+    private int failuresAfterMeaningfulProgress = 0;
     private long currentOffset;
     private boolean closed;
     private boolean eof;
@@ -68,7 +70,6 @@ class S3RetryingInputStream extends InputStream {
         }
         this.blobStore = blobStore;
         this.blobKey = blobKey;
-        this.maxAttempts = blobStore.getMaxRetries() + 1;
         this.failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
         this.start = start;
         this.end = end;
@@ -85,7 +86,8 @@ private void openStream() throws IOException {
                 getObjectRequest.setRange(Math.addExact(start, currentOffset), end);
             }
             final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
-            this.currentStreamLastOffset = Math.addExact(Math.addExact(start, currentOffset), getStreamLength(s3Object));
+            this.currentStreamFirstOffset = Math.addExact(start, currentOffset);
+            this.currentStreamLastOffset = Math.addExact(currentStreamFirstOffset, getStreamLength(s3Object));
             this.currentStream = s3Object.getObjectContent();
         } catch (final AmazonClientException e) {
             if (e instanceof AmazonS3Exception amazonS3Exception) {
@@ -160,31 +162,32 @@ private void ensureOpen() {
     }
 
     private void reopenStreamOrFail(IOException e) throws IOException {
-        if (attempt >= maxAttempts) {
-            logger.debug(
-                () -> format(
-                    "failed reading [%s/%s] at offset [%s], attempt [%s] of [%s], giving up",
-                    blobStore.bucket(),
-                    blobKey,
-                    start + currentOffset,
-                    attempt,
-                    maxAttempts
-                ),
-                e
-            );
-            throw addSuppressedExceptions(e);
+        final int maxAttempts = blobStore.getMaxRetries() + 1;
+
+        final long meaningfulProgressSize = Math.max(1L, blobStore.bufferSizeInBytes() / 100L);
+        final long currentStreamProgress = Math.subtractExact(Math.addExact(start, currentOffset), currentStreamFirstOffset);
+        if (currentStreamProgress >= meaningfulProgressSize) {
+            failuresAfterMeaningfulProgress += 1;
         }
-        logger.debug(
-            () -> format(
-                "failed reading [%s/%s] at offset [%s], attempt [%s] of [%s], retrying",
-                blobStore.bucket(),
-                blobKey,
-                start + currentOffset,
-                attempt,
-                maxAttempts
-            ),
-            e
+        final Supplier<String> messageSupplier = () -> format(
+            """
+                failed reading [%s/%s] at offset [%s]; this was attempt [%s] to read this blob which yielded [%s] bytes; in total \
+                [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \
+                retries; the maximum number of read attempts which do not make meaningful progress is [%s]""",
+            blobStore.bucket(),
+            blobKey,
+            start + currentOffset,
+            attempt,
+            currentStreamProgress,
+            failuresAfterMeaningfulProgress,
+            maxAttempts
         );
+        if (attempt >= maxAttempts + failuresAfterMeaningfulProgress) {
+            final var finalException = addSuppressedExceptions(e);
+            logger.warn(messageSupplier, finalException);
+            throw finalException;
+        }
+        logger.debug(messageSupplier, e);
         attempt += 1;
         if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
             failures.add(e);
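
The effect of the rewritten check: `meaningfulProgressSize` is 1% of `buffer_size` (but at least one byte), and every failed attempt that read at least that much grants one extra attempt, because the give-up condition is `attempt >= maxAttempts + failuresAfterMeaningfulProgress`. With the default three retries (`maxAttempts` = 4), for example, a download that suffered two progress-making failures gets six attempts in total rather than four. A generalized sketch of this progress-aware retry loop follows, detached from the S3 specifics; all names here (such as `OffsetOpener`) are hypothetical, and the real class streams incrementally rather than buffering the whole blob.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/** Sketch: a retry loop where failures that made meaningful progress are "free". */
final class ProgressAwareRetry {

    interface OffsetOpener {
        InputStream openAt(long offset) throws IOException; // e.g. a ranged GET
    }

    static byte[] readFully(OffsetOpener opener, int maxAttempts, long meaningfulProgress) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        int attempt = 1;
        int progressCredits = 0; // failures that still yielded meaningful progress
        while (true) {
            final long before = out.size();
            try (InputStream in = opener.openAt(before)) { // resume where we left off
                in.transferTo(out);
                return out.toByteArray(); // reached the end of the stream
            } catch (IOException e) {
                if (out.size() - before >= meaningfulProgress) {
                    progressCredits += 1; // does not consume the base retry budget
                }
                if (attempt >= maxAttempts + progressCredits) {
                    throw e; // budget exhausted without meaningful progress
                }
                attempt += 1;
            }
        }
    }
}
```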
S3BlobContainerRetriesTests.java
@@ -10,6 +10,8 @@
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
 import com.amazonaws.util.Base16;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
 
 import org.apache.http.HttpStatus;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -43,6 +45,7 @@
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,6 +60,7 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 
 /**
  * This class tests how a {@link S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs.
@@ -439,6 +443,79 @@ public void testWriteLargeBlobStreaming() throws Exception {
         assertEquals(blobSize, bytesReceived.get());
     }
 
+    public void testReadRetriesAfterMeaningfulProgress() throws Exception {
+        final int maxRetries = between(0, 5);
+        final int bufferSizeBytes = scaledRandomIntBetween(
+            0,
+            randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
+        );
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, new ByteSizeValue(bufferSizeBytes));
+        final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
+
+        final byte[] bytes = randomBlobContent();
+
+        @SuppressForbidden(reason = "use a http server")
+        class FlakyReadHandler implements HttpHandler {
+            private int failuresWithoutProgress;
+
+            @Override
+            public void handle(HttpExchange exchange) throws IOException {
+                Streams.readFully(exchange.getRequestBody());
+                if (failuresWithoutProgress >= maxRetries) {
+                    final int rangeStart = getRangeStart(exchange);
+                    assertThat(rangeStart, lessThan(bytes.length));
+                    exchange.getResponseHeaders().add("Content-Type", bytesContentType());
+                    final var remainderLength = bytes.length - rangeStart;
+                    exchange.sendResponseHeaders(HttpStatus.SC_OK, remainderLength);
+                    exchange.getResponseBody()
+                        .write(
+                            bytes,
+                            rangeStart,
+                            remainderLength < meaningfulProgressBytes ? remainderLength : between(meaningfulProgressBytes, remainderLength)
+                        );
+                } else if (randomBoolean()) {
+                    failuresWithoutProgress += 1;
+                    exchange.sendResponseHeaders(
+                        randomFrom(
+                            HttpStatus.SC_INTERNAL_SERVER_ERROR,
+                            HttpStatus.SC_BAD_GATEWAY,
+                            HttpStatus.SC_SERVICE_UNAVAILABLE,
+                            HttpStatus.SC_GATEWAY_TIMEOUT
+                        ),
+                        -1
+                    );
+                } else if (randomBoolean()) {
+                    final var bytesSent = sendIncompleteContent(exchange, bytes);
+                    if (bytesSent < meaningfulProgressBytes) {
+                        failuresWithoutProgress += 1;
+                    } else {
+                        exchange.getResponseBody().flush();
+                    }
+                } else {
+                    failuresWithoutProgress += 1;
+                }
+                exchange.close();
+            }
+        }
+
+        httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), new FlakyReadHandler());
+
+        try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
+            final int readLimit;
+            final InputStream wrappedStream;
+            if (randomBoolean()) {
+                // read stream only partly
+                readLimit = randomIntBetween(0, bytes.length);
+                wrappedStream = Streams.limitStream(inputStream, readLimit);
+            } else {
+                readLimit = bytes.length;
+                wrappedStream = inputStream;
+            }
+            final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream));
+            assertArrayEquals(Arrays.copyOfRange(bytes, 0, readLimit), bytesRead);
+        }
+    }
+
     /**
      * Asserts that an InputStream is fully consumed, or aborted, when it is closed
      */
AbstractBlobContainerRetriesTestCase.java
@@ -369,7 +369,7 @@ protected static OptionalInt getRangeEnd(HttpExchange exchange) {
         return OptionalInt.of(Math.toIntExact(rangeEnd));
     }
 
-    protected void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
+    protected int sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
         final int rangeStart = getRangeStart(exchange);
         assertThat(rangeStart, lessThan(bytes.length));
         final OptionalInt rangeEnd = getRangeEnd(exchange);
@@ -391,6 +391,7 @@ protected void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws
         if (randomBoolean()) {
             exchange.getResponseBody().flush();
         }
+        return bytesToSend;
     }
 
     /**
