From 9865c449dedce26d11ae4ea94c38fa664cdae2e8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 11 Sep 2019 12:01:36 +0100 Subject: [PATCH 01/12] Resume partial download from S3 on connection drop Today if the connection to S3 times out or drops after starting to download an object then the SDK does not attempt to recover or resume the download, causing the restore of the whole shard to fail and retry. This commit allows Elasticsearch to detect such a mid-stream failure and to resume the download from where it failed. --- .../repositories/s3/S3BlobContainer.java | 16 +-- .../repositories/s3/S3BlobStore.java | 4 + .../s3/S3RetryingInputStream.java | 129 ++++++++++++++++++ .../repositories/s3/S3Service.java | 2 +- .../s3/S3BlobContainerRetriesTests.java | 111 ++++++++++++--- 5 files changed, 227 insertions(+), 35 deletions(-) create mode 100644 plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 46910d840cd0f..34475544882f9 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -21,7 +21,6 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; @@ -31,7 +30,6 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; -import 
com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; import org.apache.lucene.util.SetOnce; @@ -48,7 +46,6 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -81,18 +78,7 @@ class S3BlobContainer extends AbstractBlobContainer { @Override public InputStream readBlob(String blobName) throws IOException { - try (AmazonS3Reference clientReference = blobStore.clientReference()) { - final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(blobStore.bucket(), - buildKey(blobName))); - return s3Object.getObjectContent(); - } catch (final AmazonClientException e) { - if (e instanceof AmazonS3Exception) { - if (404 == ((AmazonS3Exception) e).getStatusCode()) { - throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage()); - } - } - throw e; - } + return new S3RetryingInputStream(blobStore, buildKey(blobName)); } /** diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index fcded00553580..a8cb87a5526f4 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -68,6 +68,10 @@ public AmazonS3Reference clientReference() { return service.client(repositoryMetaData); } + int getMaxRetries() { + return service.settings(repositoryMetaData).maxRetries; + } + public String bucket() { return bucket; } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java new file 
mode 100644 index 0000000000000..22faae2dc86ff --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; + +class S3RetryingInputStream extends InputStream { + + private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class); + + private final S3BlobStore blobStore; + private final String blobKey; + private final int maxAttempts; + + private InputStream currentStream; + private long currentOffset; + + S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException { + this.blobStore = blobStore; + this.blobKey = blobKey; + this.maxAttempts = 
blobStore.getMaxRetries() + 1; + currentStream = openStream(); + } + + private InputStream openStream() throws IOException { + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey); + if (currentOffset > 0) { + getObjectRequest.setRange(currentOffset); + } + final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); + return s3Object.getObjectContent(); + } catch (final AmazonClientException e) { + if (e instanceof AmazonS3Exception) { + if (404 == ((AmazonS3Exception) e).getStatusCode()) { + throw new NoSuchFileException("Blob object [" + blobKey + "] not found: " + e.getMessage()); + } + } + throw e; + } + } + + @Override + public int read() throws IOException { + int attempt = 0; + while (true) { + attempt += 1; + try { + final int result = currentStream.read(); + currentOffset += 1; + return result; + } catch (IOException e) { + if (attempt >= maxAttempts) { + throw e; + } + logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", + blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); + currentStream.close(); + currentStream = openStream(); + } + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int attempt = 0; + while (true) { + attempt += 1; + try { + final int bytesRead = currentStream.read(b, off, len); + if (bytesRead == -1) { + return -1; + } + currentOffset += bytesRead; + return bytesRead; + } catch (IOException e) { + if (attempt >= maxAttempts) { + throw e; + } + logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", + blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); + currentStream.close(); + currentStream = openStream(); + } + } + } + + @Override + public void close() throws IOException { + 
currentStream.close(); + super.close(); + } + + @Override + public long skip(long n) { + throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking"); + } + + @Override + public synchronized void reset() { + throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking"); + } +} diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index f7ae303a1c2aa..da0ac1c97222e 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -106,7 +106,7 @@ public AmazonS3Reference client(RepositoryMetaData repositoryMetaData) { * @param repositoryMetaData Repository Metadata * @return S3ClientSettings */ - private S3ClientSettings settings(RepositoryMetaData repositoryMetaData) { + S3ClientSettings settings(RepositoryMetaData repositoryMetaData) { final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetaData.settings()); final S3ClientSettings staticSettings = staticClientSettings.get(clientName); if (staticSettings != null) { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 2c97ae2b5fa04..2fb6696c28da5 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -21,8 +21,11 @@ import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream; import com.amazonaws.util.Base16; +import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; 
+import org.apache.http.ConnectionClosedException; import org.apache.http.HttpStatus; +import org.apache.http.NoHttpResponseException; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; @@ -51,12 +54,15 @@ import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Locale; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING; import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING; @@ -67,6 +73,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; /** * This class tests how a {@link S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs. 
@@ -130,26 +137,42 @@ private BlobContainer createBlobContainer(final @Nullable Integer maxRetries, repositoryMetaData)); } + public void testReadNonexistentBlobThrowsNoSuchFileException() { + final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null); + final Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob")); + assertThat(exception.getMessage().toLowerCase(Locale.ROOT), + containsString("blob object [read_nonexistent_blob] not found")); + } + public void testReadBlobWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); - final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 512)); + final byte[] bytes = randomBlobContent(); httpServer.createContext("/bucket/read_blob_max_retries", exchange -> { Streams.readFully(exchange.getRequestBody()); if (countDown.countDown()) { + final int rangeStart = getRangeStart(exchange); + assertThat(rangeStart, lessThan(bytes.length)); exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length); - exchange.getResponseBody().write(bytes); + exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart); + exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart); exchange.close(); return; } - exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, - HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); - exchange.close(); + if (randomBoolean()) { + exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + } else if (randomBoolean()) { + sendIncompleteContent(exchange, bytes); + } + if (randomBoolean()) { + exchange.close(); + } }); - final BlobContainer 
blobContainer = createBlobContainer(maxRetries, null, null, null); + final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500)); + final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null); try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) { assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); assertThat(countDown.isCountedDown(), is(true)); @@ -157,8 +180,8 @@ public void testReadBlobWithRetries() throws Exception { } public void testReadBlobWithReadTimeouts() { - final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null); + final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); + final BlobContainer blobContainer = createBlobContainer(between(1, 5), readTimeout, null, null); // HTTP server does not send a response httpServer.createContext("/bucket/read_blob_unresponsive", exchange -> {}); @@ -168,15 +191,8 @@ public void testReadBlobWithReadTimeouts() { assertThat(exception.getCause(), instanceOf(SocketTimeoutException.class)); // HTTP server sends a partial response - final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); - httpServer.createContext("/bucket/read_blob_incomplete", exchange -> { - exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length); - exchange.getResponseBody().write(bytes, 0, randomIntBetween(1, bytes.length - 1)); - if (randomBoolean()) { - exchange.getResponseBody().flush(); - } - }); + final byte[] bytes = randomBlobContent(); + httpServer.createContext("/bucket/read_blob_incomplete", exchange -> sendIncompleteContent(exchange, bytes)); exception = expectThrows(SocketTimeoutException.class, () -> { try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) { @@ -186,11 +202,64 @@ 
public void testReadBlobWithReadTimeouts() { assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out")); } + public void testReadBlobWithPrematureConnectionClose() { + final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null); + + // HTTP server closes connection immediately + httpServer.createContext("/bucket/read_blob_no_response", HttpExchange::close); + + Exception exception = expectThrows(SdkClientException.class, () -> blobContainer.readBlob("read_blob_no_response")); + assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("the target server failed to respond")); + assertThat(exception.getCause(), instanceOf(NoHttpResponseException.class)); + + // HTTP server sends a partial response + final byte[] bytes = randomBlobContent(); + httpServer.createContext("/bucket/read_blob_incomplete", exchange -> { + sendIncompleteContent(exchange, bytes); + exchange.close(); + }); + + exception = expectThrows(ConnectionClosedException.class, () -> { + try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) { + Streams.readFully(stream); + } + }); + assertThat(exception.getMessage().toLowerCase(Locale.ROOT), + containsString("premature end of content-length delimited message body")); + } + + private static int getRangeStart(HttpExchange exchange) { + final int rangeStart; + final String rangeHeader = exchange.getRequestHeaders().getFirst("Range"); + if (rangeHeader == null) { + rangeStart = 0; + } else { + final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$").matcher(rangeHeader); + assertTrue(rangeHeader + " matches expected pattern", matcher.matches()); + rangeStart = Math.toIntExact(Long.parseLong(matcher.group(1))); + } + return rangeStart; + } + + private void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException { + final int rangeStart = getRangeStart(exchange); + assertThat(rangeStart, lessThan(bytes.length)); + 
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart); + final int bytesToSend = randomIntBetween(0, bytes.length - rangeStart - 1); + if (bytesToSend > 0) { + exchange.getResponseBody().write(bytes, rangeStart, bytesToSend); + } + if (randomBoolean()) { + exchange.getResponseBody().flush(); + } + } + public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); - final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb + final byte[] bytes = randomBlobContent(); httpServer.createContext("/bucket/write_blob_max_retries", exchange -> { if ("PUT".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getQuery() == null) { if (countDown.countDown()) { @@ -224,6 +293,10 @@ public void testWriteBlobWithRetries() throws Exception { assertThat(countDown.isCountedDown(), is(true)); } + private byte[] randomBlobContent() { + return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 
512 : 1 << 20)); // rarely up to 1mb + } + public void testWriteBlobWithReadTimeouts() { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); From 35fb904417f79f16be5e5378f29d180eeecdeea2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 11 Sep 2019 12:15:35 +0100 Subject: [PATCH 02/12] Simplify --- .../repositories/s3/S3BlobContainerRetriesTests.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 2fb6696c28da5..0bf43ad5c58e7 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -229,16 +229,14 @@ public void testReadBlobWithPrematureConnectionClose() { } private static int getRangeStart(HttpExchange exchange) { - final int rangeStart; final String rangeHeader = exchange.getRequestHeaders().getFirst("Range"); if (rangeHeader == null) { - rangeStart = 0; - } else { - final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$").matcher(rangeHeader); - assertTrue(rangeHeader + " matches expected pattern", matcher.matches()); - rangeStart = Math.toIntExact(Long.parseLong(matcher.group(1))); + return 0; } - return rangeStart; + + final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$").matcher(rangeHeader); + assertTrue(rangeHeader + " matches expected pattern", matcher.matches()); + return Math.toIntExact(Long.parseLong(matcher.group(1))); } private void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException { From ad43a77c519e60be1b12d3e1158d515b93813a6a Mon Sep 17 00:00:00 2001 From: 
David Turner Date: Wed, 11 Sep 2019 13:15:08 +0100 Subject: [PATCH 03/12] inline noop --- .../org/elasticsearch/repositories/s3/S3RetryingInputStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 22faae2dc86ff..48271e350894c 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -114,7 +114,6 @@ public int read(byte[] b, int off, int len) throws IOException { @Override public void close() throws IOException { currentStream.close(); - super.close(); } @Override From ebd4e08e22cebe9547ce0ea99b802f177ac74829 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 11 Sep 2019 13:16:14 +0100 Subject: [PATCH 04/12] Suppress exceptions when closing --- .../elasticsearch/repositories/s3/S3RetryingInputStream.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 48271e350894c..f5092dcbab821 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.core.internal.io.IOUtils; import java.io.IOException; import java.io.InputStream; @@ -81,7 +82,7 @@ public int read() throws IOException { } logger.debug(new ParameterizedMessage("failed reading [{}/{}] at 
offset [{}], attempt [{}] of [{}], retrying", blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); - currentStream.close(); + IOUtils.closeWhileHandlingException(currentStream); currentStream = openStream(); } } @@ -105,7 +106,7 @@ public int read(byte[] b, int off, int len) throws IOException { } logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); - currentStream.close(); + IOUtils.closeWhileHandlingException(currentStream); currentStream = openStream(); } } From f97cec83cd5005f8818de7130889d35a281d0269 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 11 Sep 2019 13:17:36 +0100 Subject: [PATCH 05/12] Extract common handler --- .../s3/S3RetryingInputStream.java | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index f5092dcbab821..22f33c25a5841 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -77,13 +77,7 @@ public int read() throws IOException { currentOffset += 1; return result; } catch (IOException e) { - if (attempt >= maxAttempts) { - throw e; - } - logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", - blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); - IOUtils.closeWhileHandlingException(currentStream); - currentStream = openStream(); + reopenStreamOrFail(attempt, e); } } } @@ -101,17 +95,21 @@ public int read(byte[] b, int off, int len) throws IOException { currentOffset += bytesRead; return bytesRead; } catch (IOException e) { - if 
(attempt >= maxAttempts) { - throw e; - } - logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", - blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); - IOUtils.closeWhileHandlingException(currentStream); - currentStream = openStream(); + reopenStreamOrFail(attempt, e); } } } + private void reopenStreamOrFail(int attempt, IOException e) throws IOException { + if (attempt >= maxAttempts) { + throw e; + } + logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", + blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); + IOUtils.closeWhileHandlingException(currentStream); + currentStream = openStream(); + } + @Override public void close() throws IOException { currentStream.close(); From 9a6f5c6e9845f027c0fcf06eae255dede2a549aa Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 12 Sep 2019 08:07:43 +0100 Subject: [PATCH 06/12] Add timebomb to ensure we remove this when no longer necessary --- .../repositories/s3/S3RetryingInputStream.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 22f33c25a5841..0c3fd44707e6f 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -26,11 +26,17 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.Version; import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; +/** + * Wrapper around an S3 object that will retry the {@link GetObjectRequest} if the 
download fails part-way through, resuming from where + * the failure occurred. This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing + * the {@link Version#V_7_0_0} version constant) and removed when the SDK handles retries itself. + */ class S3RetryingInputStream extends InputStream { private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class); From efb24229c85b4332e9d56ab32ef5019159d39e9e Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 12 Sep 2019 08:08:00 +0100 Subject: [PATCH 07/12] Count retries per blob not per read --- .../repositories/s3/S3RetryingInputStream.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 0c3fd44707e6f..2c937201d7a97 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -46,6 +46,7 @@ class S3RetryingInputStream extends InputStream { private final int maxAttempts; private InputStream currentStream; + private int attempt = 1; private long currentOffset; S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException { @@ -75,24 +76,20 @@ private InputStream openStream() throws IOException { @Override public int read() throws IOException { - int attempt = 0; while (true) { - attempt += 1; try { final int result = currentStream.read(); currentOffset += 1; return result; } catch (IOException e) { - reopenStreamOrFail(attempt, e); + reopenStreamOrFail(e); } } } @Override public int read(byte[] b, int off, int len) throws IOException { - int attempt = 0; while (true) { - attempt += 1; try { final int bytesRead = currentStream.read(b, off, len); 
if (bytesRead == -1) { @@ -101,17 +98,18 @@ public int read(byte[] b, int off, int len) throws IOException { currentOffset += bytesRead; return bytesRead; } catch (IOException e) { - reopenStreamOrFail(attempt, e); + reopenStreamOrFail(e); } } } - private void reopenStreamOrFail(int attempt, IOException e) throws IOException { + private void reopenStreamOrFail(IOException e) throws IOException { if (attempt >= maxAttempts) { throw e; } logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); + attempt += 1; IOUtils.closeWhileHandlingException(currentStream); currentStream = openStream(); } From d9890c6ee12ecc6c24c6189a7f21e5583dfb248e Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 12 Sep 2019 08:12:28 +0100 Subject: [PATCH 08/12] Ensure we do not use the stream after close --- .../repositories/s3/S3RetryingInputStream.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 2c937201d7a97..496552e8f0175 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -48,6 +48,7 @@ class S3RetryingInputStream extends InputStream { private InputStream currentStream; private int attempt = 1; private long currentOffset; + private boolean closed; S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException { this.blobStore = blobStore; @@ -76,6 +77,7 @@ private InputStream openStream() throws IOException { @Override public int read() throws IOException { + ensureOpen(); while (true) { try { final int result = currentStream.read(); @@ -89,6 +91,7 @@ public int 
read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { + ensureOpen(); while (true) { try { final int bytesRead = currentStream.read(b, off, len); @@ -103,6 +106,13 @@ public int read(byte[] b, int off, int len) throws IOException { } } + private void ensureOpen() { + if (closed) { + assert false : "using S3RetryingInputStream after close"; + throw new IllegalStateException("using S3RetryingInputStream after close"); + } + } + private void reopenStreamOrFail(IOException e) throws IOException { if (attempt >= maxAttempts) { throw e; @@ -117,6 +127,7 @@ private void reopenStreamOrFail(IOException e) throws IOException { @Override public void close() throws IOException { currentStream.close(); + closed = true; } @Override From 81f8a355b634dd63aa6005fcb3144fe67ac1a8bb Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 12 Sep 2019 08:14:50 +0100 Subject: [PATCH 09/12] Make test helpers static and collect at bottom --- .../s3/S3BlobContainerRetriesTests.java | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 0bf43ad5c58e7..a1fbc6e5953f0 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -140,8 +140,7 @@ private BlobContainer createBlobContainer(final @Nullable Integer maxRetries, public void testReadNonexistentBlobThrowsNoSuchFileException() { final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null); final Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob")); - 
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), - containsString("blob object [read_nonexistent_blob] not found")); + assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found")); } public void testReadBlobWithRetries() throws Exception { @@ -228,31 +227,6 @@ public void testReadBlobWithPrematureConnectionClose() { containsString("premature end of content-length delimited message body")); } - private static int getRangeStart(HttpExchange exchange) { - final String rangeHeader = exchange.getRequestHeaders().getFirst("Range"); - if (rangeHeader == null) { - return 0; - } - - final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$").matcher(rangeHeader); - assertTrue(rangeHeader + " matches expected pattern", matcher.matches()); - return Math.toIntExact(Long.parseLong(matcher.group(1))); - } - - private void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException { - final int rangeStart = getRangeStart(exchange); - assertThat(rangeStart, lessThan(bytes.length)); - exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart); - final int bytesToSend = randomIntBetween(0, bytes.length - rangeStart - 1); - if (bytesToSend > 0) { - exchange.getResponseBody().write(bytes, rangeStart, bytesToSend); - } - if (randomBoolean()) { - exchange.getResponseBody().flush(); - } - } - public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); @@ -291,10 +265,6 @@ public void testWriteBlobWithRetries() throws Exception { assertThat(countDown.isCountedDown(), is(true)); } - private byte[] randomBlobContent() { - return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 
512 : 1 << 20)); // rarely up to 1mb - } - public void testWriteBlobWithReadTimeouts() { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); @@ -414,6 +384,35 @@ public void testWriteLargeBlob() throws Exception { assertThat(countDownComplete.isCountedDown(), is(true)); } + private static byte[] randomBlobContent() { + return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb + } + + private static int getRangeStart(HttpExchange exchange) { + final String rangeHeader = exchange.getRequestHeaders().getFirst("Range"); + if (rangeHeader == null) { + return 0; + } + + final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$").matcher(rangeHeader); + assertTrue(rangeHeader + " matches expected pattern", matcher.matches()); + return Math.toIntExact(Long.parseLong(matcher.group(1))); + } + + private static void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException { + final int rangeStart = getRangeStart(exchange); + assertThat(rangeStart, lessThan(bytes.length)); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart); + final int bytesToSend = randomIntBetween(0, bytes.length - rangeStart - 1); + if (bytesToSend > 0) { + exchange.getResponseBody().write(bytes, rangeStart, bytesToSend); + } + if (randomBoolean()) { + exchange.getResponseBody().flush(); + } + } + /** * A resettable InputStream that only serves zeros. 
**/ From 410fa62856a6dd053b0b43e218263930cb69c0a5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 12 Sep 2019 08:15:35 +0100 Subject: [PATCH 10/12] Unnecessary throws --- .../repositories/s3/S3BlobContainerRetriesTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index a1fbc6e5953f0..9f03df9f8e2bd 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -483,7 +483,7 @@ public int available() throws IOException { } @Override - public void close() throws IOException { + public void close() { closed.set(true); } From 22f5703205dfde4f298d506bad01a3697efdef38 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 12 Sep 2019 08:56:24 +0100 Subject: [PATCH 11/12] Include a bounded number of suppressed exceptions on failure --- .../s3/S3RetryingInputStream.java | 21 ++++++++++++++++--- .../s3/S3BlobContainerRetriesTests.java | 17 +++++++++++---- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 496552e8f0175..46e9122ad1144 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -31,6 +31,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.List; /** * Wrapper around an S3 object that will retry the 
{@link GetObjectRequest} if the download fails part-way through, resuming from where @@ -41,12 +43,15 @@ class S3RetryingInputStream extends InputStream { private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class); + static final int MAX_SUPPRESSED_EXCEPTIONS = 10; + private final S3BlobStore blobStore; private final String blobKey; private final int maxAttempts; private InputStream currentStream; private int attempt = 1; + private List<IOException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); private long currentOffset; private boolean closed; @@ -68,10 +73,10 @@ private InputStream openStream() throws IOException { } catch (final AmazonClientException e) { if (e instanceof AmazonS3Exception) { if (404 == ((AmazonS3Exception) e).getStatusCode()) { - throw new NoSuchFileException("Blob object [" + blobKey + "] not found: " + e.getMessage()); + throw addSuppressedExceptions(new NoSuchFileException("Blob object [" + blobKey + "] not found: " + e.getMessage())); } } - throw e; + throw addSuppressedExceptions(e); } } @@ -115,11 +120,14 @@ private void ensureOpen() { private void reopenStreamOrFail(IOException e) throws IOException { if (attempt >= maxAttempts) { - throw e; + throw addSuppressedExceptions(e); } logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); attempt += 1; + if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { + failures.add(e); + } IOUtils.closeWhileHandlingException(currentStream); currentStream = openStream(); } @@ -139,4 +147,11 @@ public long skip(long n) { public synchronized void reset() { throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking"); } + + private <T extends Exception> T addSuppressedExceptions(T e) { + for (IOException failure : failures) { + e.addSuppressed(failure); + } + return e; + } } diff --git
a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 9f03df9f8e2bd..7060082ffcdfa 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -179,8 +179,9 @@ public void testReadBlobWithRetries() throws Exception { } public void testReadBlobWithReadTimeouts() { + final int maxRetries = randomInt(5); final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); - final BlobContainer blobContainer = createBlobContainer(between(1, 5), readTimeout, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null); // HTTP server does not send a response httpServer.createContext("/bucket/read_blob_unresponsive", exchange -> {}); @@ -199,10 +200,11 @@ public void testReadBlobWithReadTimeouts() { } }); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out")); + assertThat(exception.getSuppressed().length, equalTo(maxRetries)); } - public void testReadBlobWithPrematureConnectionClose() { - final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null); + public void testReadBlobWithNoHttpResponse() { + final BlobContainer blobContainer = createBlobContainer(randomInt(5), null, null, null); // HTTP server closes connection immediately httpServer.createContext("/bucket/read_blob_no_response", HttpExchange::close); @@ -210,6 +212,12 @@ public void testReadBlobWithPrematureConnectionClose() { Exception exception = expectThrows(SdkClientException.class, () -> blobContainer.readBlob("read_blob_no_response")); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("the target server failed to respond")); 
assertThat(exception.getCause(), instanceOf(NoHttpResponseException.class)); + assertThat(exception.getSuppressed().length, equalTo(0)); + } + + public void testReadBlobWithPrematureConnectionClose() { + final int maxRetries = randomInt(20); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null); // HTTP server sends a partial response final byte[] bytes = randomBlobContent(); @@ -218,13 +226,14 @@ public void testReadBlobWithPrematureConnectionClose() { exchange.close(); }); - exception = expectThrows(ConnectionClosedException.class, () -> { + final Exception exception = expectThrows(ConnectionClosedException.class, () -> { try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) { Streams.readFully(stream); } }); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("premature end of content-length delimited message body")); + assertThat(exception.getSuppressed().length, equalTo(Math.min(S3RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS, maxRetries))); } public void testWriteBlobWithRetries() throws Exception { From 3f8c20e0adf8a93444815ae1c53af51b8551bdaf Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 17 Sep 2019 11:14:00 +0100 Subject: [PATCH 12/12] Review feedback --- .../elasticsearch/repositories/s3/S3RetryingInputStream.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 46e9122ad1144..cb3a89316f6d7 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -38,6 +38,8 @@ * Wrapper around an S3 object that will retry the {@link GetObjectRequest} if the download fails part-way through, resuming from where * 
the failure occurred. This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing * the {@link Version#V_7_0_0} version constant) and removed when the SDK handles retries itself. + * + * See https://github.com/aws/aws-sdk-java/issues/856 for the related SDK issue */ class S3RetryingInputStream extends InputStream { @@ -144,7 +146,7 @@ public long skip(long n) { } @Override - public synchronized void reset() { + public void reset() { throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking"); }