diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index f852284d21b2a..4792de1427ce1 100644 --- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -64,6 +64,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_REQUESTS_TOTAL; +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -360,7 +361,7 @@ public void testReadByteByByte() throws Exception { container.writeBlob(randomPurpose(), blobName, new ByteArrayInputStream(data), data.length, true); var originalDataInputStream = new ByteArrayInputStream(data); - try (var azureInputStream = container.readBlob(randomPurpose(), blobName)) { + try (var azureInputStream = container.readBlob(randomRetryingPurpose(), blobName)) { for (int i = 0; i < data.length; i++) { assertThat(originalDataInputStream.read(), is(equalTo(azureInputStream.read()))); } diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index ab8e10ce9de27..276c7e96284c4 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ 
b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -9,11 +9,8 @@ package org.elasticsearch.repositories.azure; -import com.azure.core.exception.HttpResponseException; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; @@ -26,8 +23,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; -import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; -import org.elasticsearch.rest.RestStatus; import java.io.IOException; import java.io.InputStream; @@ -67,20 +62,7 @@ private InputStream openInputStream(OperationPurpose purpose, String blobName, l // stream to it. throw new NoSuchFileException("Blob [" + blobKey + "] not found"); } - try { - return blobStore.getInputStream(purpose, blobKey, position, length); - } catch (Exception e) { - if (ExceptionsHelper.unwrap(e, HttpResponseException.class) instanceof HttpResponseException httpResponseException) { - final var httpStatusCode = httpResponseException.getResponse().getStatusCode(); - if (httpStatusCode == RestStatus.NOT_FOUND.getStatus()) { - throw new NoSuchFileException("Blob [" + blobKey + "] not found"); - } - if (httpStatusCode == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { - throw new RequestedRangeNotSatisfiedException(blobKey, position, length == null ? 
-1 : length, e); - } - } - throw new IOException("Unable to get input stream for blob [" + blobKey + "]", e); - } + return new AzureRetryingInputStream(blobStore, purpose, blobKey, position, length); } @Override diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index bb330be6266d4..07d3762c7fdb1 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -17,7 +17,6 @@ import reactor.core.scheduler.Schedulers; import com.azure.core.http.HttpMethod; -import com.azure.core.http.rest.ResponseBase; import com.azure.core.util.BinaryData; import com.azure.core.util.FluxUtil; import com.azure.core.util.logging.ClientLogger; @@ -345,7 +344,21 @@ private static boolean isIgnorableBatchDeleteException(Throwable exception) { return false; } - public InputStream getInputStream(OperationPurpose purpose, String blob, long position, final @Nullable Long length) { + int getMaxReadRetries() { + return service.getMaxReadRetries(projectId, clientName); + } + + /** + * Get an {@link InputStream} for reading the specified blob. 
The returned input stream will not retry on a read failure, + * to get an input stream that implements retries use {@link AzureBlobContainer#readBlob(OperationPurpose, String, long, long)} + */ + AzureInputStream getInputStream( + OperationPurpose purpose, + String blob, + long position, + final @Nullable Long length, + @Nullable String eTag + ) throws IOException { logger.trace(() -> format("reading container [%s], blob [%s]", container, blob)); final AzureBlobServiceClient azureBlobServiceClient = getAzureBlobServiceClientClient(purpose); final BlobServiceClient syncClient = azureBlobServiceClient.getSyncClient(); @@ -360,19 +373,15 @@ public InputStream getInputStream(OperationPurpose purpose, String blob, long po totalSize = position + length; } BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container).getBlobAsyncClient(blob); - int maxReadRetries = service.getMaxReadRetries(projectId, clientName); - try { - return new AzureInputStream( - blobAsyncClient, - position, - length == null ? totalSize : length, - totalSize, - maxReadRetries, - azureBlobServiceClient.getAllocator() - ); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return new AzureInputStream( + blobAsyncClient, + position, + length == null ? 
totalSize : length, + totalSize, + 0, + azureBlobServiceClient.getAllocator(), + eTag + ); } public Map listBlobsByPrefix(OperationPurpose purpose, String keyPath, String prefix) throws IOException { @@ -1076,11 +1085,12 @@ RequestMetricsRecorder getMetricsRecorder() { return requestMetricsRecorder; } - private static class AzureInputStream extends InputStream { + static class AzureInputStream extends InputStream { private final CancellableRateLimitedFluxIterator cancellableRateLimitedFluxIterator; private ByteBuf byteBuf; private boolean closed; private final ByteBufAllocator allocator; + private final String eTag; private AzureInputStream( final BlobAsyncClient client, @@ -1088,15 +1098,21 @@ private AzureInputStream( long rangeLength, long contentLength, int maxRetries, - ByteBufAllocator allocator + ByteBufAllocator allocator, + @Nullable String ifMatchETag ) throws IOException { rangeLength = Math.min(rangeLength, contentLength - rangeOffset); final BlobRange range = new BlobRange(rangeOffset, rangeLength); - DownloadRetryOptions downloadRetryOptions = new DownloadRetryOptions().setMaxRetryRequests(maxRetries); - Flux byteBufFlux = client.downloadWithResponse(range, downloadRetryOptions, null, false) + final DownloadRetryOptions downloadRetryOptions = new DownloadRetryOptions().setMaxRetryRequests(maxRetries); + final BlobRequestConditions requestConditions = new BlobRequestConditions().setIfMatch(ifMatchETag); + final AtomicReference eTagRef = new AtomicReference<>(); + Flux byteBufFlux = client.downloadWithResponse(range, downloadRetryOptions, requestConditions, false) .flux() - .concatMap(ResponseBase::getValue) // it's important to use concatMap, since flatMap doesn't provide ordering - // guarantees and that's not fun to debug :( + .concatMap(response -> { + eTagRef.set(response.getDeserializedHeaders().getETag()); + return response.getValue(); + }) // it's important to use concatMap, since flatMap doesn't provide ordering + // guarantees and that's not 
fun to debug :( .filter(Objects::nonNull) .map(this::copyBuffer); // Sadly we have to copy the buffers since the memory is released after the flux execution // ends and we need that the byte buffer outlives that lifecycle. Since the SDK provides an @@ -1112,6 +1128,9 @@ private AzureInputStream( // blob doesn't exist byteBufFlux.subscribe(cancellableRateLimitedFluxIterator); getNextByteBuf(); + assert eTagRef.get() != null : "eTag should have been set"; + assert ifMatchETag == null || eTagRef.get().equals(ifMatchETag) : "eTag mismatch"; + this.eTag = eTagRef.get(); } private ByteBuf copyBuffer(ByteBuffer buffer) { @@ -1173,6 +1192,10 @@ public void close() { } } + public String getETag() { + return eTag; + } + private void releaseByteBuf(ByteBuf buf) { ReferenceCountUtil.safeRelease(buf); this.byteBuf = null; diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRetryingInputStream.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRetryingInputStream.java new file mode 100644 index 0000000000000..03a165f2a8695 --- /dev/null +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRetryingInputStream.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.repositories.azure; + +import com.azure.core.exception.HttpResponseException; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.RetryingInputStream; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; + +public class AzureRetryingInputStream extends RetryingInputStream { + + protected AzureRetryingInputStream(AzureBlobStore azureBlobStore, OperationPurpose purpose, String blob, long position, Long length) + throws IOException { + super( + new AzureBlobStoreServices(azureBlobStore, purpose, blob), + purpose, + position, + length == null ? Long.MAX_VALUE - 1 : position + length + ); + } + + private record AzureBlobStoreServices(AzureBlobStore blobStore, OperationPurpose purpose, String blob) + implements + RetryingInputStream.BlobStoreServices { + + @Override + public InputStreamAtVersion getInputStreamAtVersion(@Nullable String version, long start, long end) throws IOException { + try { + final Long length = end < Long.MAX_VALUE - 1 ? 
end - start : null; + final AzureBlobStore.AzureInputStream inputStream = blobStore.getInputStream(purpose, blob, start, length, version); + return new InputStreamAtVersion<>(inputStream, inputStream.getETag()); + } catch (Exception e) { + if (ExceptionsHelper.unwrap(e, HttpResponseException.class) instanceof HttpResponseException httpResponseException) { + final var httpStatusCode = httpResponseException.getResponse().getStatusCode(); + if (httpStatusCode == RestStatus.NOT_FOUND.getStatus()) { + throw new NoSuchFileException("Blob [" + blob + "] not found"); + } + if (httpStatusCode == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { + throw new RequestedRangeNotSatisfiedException(blob, start, end == Long.MAX_VALUE - 1 ? -1 : end - start, e); + } + } + switch (e) { + case RuntimeException runtimeException -> throw runtimeException; + case IOException ioException -> throw ioException; + default -> throw new IOException("Unable to get input stream for blob [" + blob + "]", e); + } + } + } + + @Override + public void onRetryStarted(String action) { + // No metrics for Azure + } + + @Override + public void onRetrySucceeded(String action, long numberOfRetries) { + // No metrics for Azure + } + + @Override + public long getMeaningfulProgressSize() { + return Math.max(1L, blobStore.getReadChunkSize() / 100L); + } + + @Override + public int getMaxRetries() { + return blobStore.getMaxReadRetries(); + } + + @Override + public String getBlobDescription() { + return blob; + } + } +} diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index 9e9313249e967..6481c8250c5b3 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ 
b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -10,6 +10,7 @@ import fixture.azure.AzureHttpHandler; +import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import org.elasticsearch.common.Strings; @@ -43,6 +44,7 @@ import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes; import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresentWith; @@ -114,13 +116,73 @@ public void testReadBlobWithRetries() throws Exception { }); final BlobContainer blobContainer = createBlobContainer(maxRetries); - try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_max_retries")) { + try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_blob_max_retries")) { assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); assertThat(countDownHead.isCountedDown(), is(true)); assertThat(countDownGet.isCountedDown(), is(true)); } } + public void testReadBlobWithFailuresMidDownload() throws IOException { + final int responsesToSend = randomIntBetween(3, 5); + final AtomicInteger responseCounter = new AtomicInteger(responsesToSend); + final byte[] blobContents = randomBlobContent(); + final String eTag = UUIDs.base64UUID(); + httpServer.createContext("/account/container/read_blob_fail_mid_stream", exchange -> { + try { + Streams.readFully(exchange.getRequestBody()); + if ("HEAD".equals(exchange.getRequestMethod())) { + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("x-ms-blob-content-length", 
String.valueOf(blobContents.length)); + exchange.getResponseHeaders().add("Content-Length", String.valueOf(blobContents.length)); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + } else if ("GET".equals(exchange.getRequestMethod())) { + if (responseCounter.decrementAndGet() > 0) { + switch (randomIntBetween(1, 3)) { + case 1 -> { + final Integer rCode = randomFrom( + RestStatus.INTERNAL_SERVER_ERROR.getStatus(), + RestStatus.SERVICE_UNAVAILABLE.getStatus(), + RestStatus.TOO_MANY_REQUESTS.getStatus() + ); + logger.info("---> sending error: {}", rCode); + exchange.sendResponseHeaders(rCode, -1); + } + case 2 -> logger.info("---> sending no response"); + case 3 -> sendResponse(eTag, blobContents, exchange, true); + } + } else { + sendResponse(eTag, blobContents, exchange, false); + } + } + } finally { + exchange.close(); + } + }); + + final BlobContainer blobContainer = createBlobContainer(responsesToSend * 2); + try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_blob_fail_mid_stream")) { + assertArrayEquals(blobContents, BytesReference.toBytes(Streams.readFully(inputStream))); + } + } + + private void sendResponse(String eTag, byte[] blobContents, HttpExchange exchange, boolean partial) throws IOException { + final var ranges = getRanges(exchange); + final int start = ranges.v1().intValue(); + final int end = partial ? 
randomIntBetween(start, ranges.v2().intValue()) : ranges.v2().intValue(); + final var contents = Arrays.copyOfRange(blobContents, start, end + 1); + + logger.info("---> responding to: {} -> {} (sending chunk of size {})", ranges.v1(), ranges.v2(), contents.length); + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blobContents.length)); + exchange.getResponseHeaders().add("Content-Length", String.valueOf(blobContents.length)); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.getResponseHeaders().add("ETag", eTag); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blobContents.length - ranges.v1().intValue()); + exchange.getResponseBody().write(contents, 0, contents.length); + } + public void testReadRangeBlobWithRetries() throws Exception { final int maxRetries = randomIntBetween(1, 5); final CountDown countDownGet = new CountDown(maxRetries); @@ -466,7 +528,7 @@ public void testRetryFromSecondaryLocationPolicies() throws Exception { } final BlobContainer blobContainer = createBlobContainer(maxRetries, secondaryHost, locationMode); - try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_from_secondary")) { + try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_blob_from_secondary")) { assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); // It does round robin, first tries on the primary, then on the secondary diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSasTokenTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSasTokenTests.java index b10472688e3b0..0144345ce489a 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSasTokenTests.java +++ 
b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSasTokenTests.java @@ -24,7 +24,7 @@ import static org.elasticsearch.repositories.azure.AzureStorageSettings.ACCOUNT_SETTING; import static org.elasticsearch.repositories.azure.AzureStorageSettings.SAS_TOKEN_SETTING; -import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -78,7 +78,7 @@ public void testSasTokenIsUsedAsProvidedInSettings() throws Exception { }); final BlobContainer blobContainer = createBlobContainer(maxRetries, null, LocationMode.PRIMARY_ONLY, clientName, secureSettings); - try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "sas_test")) { + try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "sas_test")) { assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); } } diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 2532a9212f9e2..3fec77c962b87 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -13,7 +13,6 @@ import fixture.gcs.GoogleCloudStorageHttpHandler; import fixture.gcs.TestUtils; -import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageOptions; import 
com.google.cloud.storage.StorageRetryStrategy; @@ -60,9 +59,11 @@ import java.util.Map; import static org.elasticsearch.common.io.Streams.readFully; +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET; @@ -118,6 +119,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { settings.put(super.nodeSettings(nodeOrdinal, otherSettings)); settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()); settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl() + "/token"); + settings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), 6); final MockSecureSettings secureSettings = new MockSecureSettings(); final byte[] serviceAccount = TestUtils.createServiceAccount(random()); @@ -195,7 +197,7 @@ public void testWriteReadLarge() throws IOException { random().nextBytes(data); writeBlob(container, "foobar", new BytesArray(data), false); } - try (InputStream stream = container.readBlob(randomPurpose(), "foobar")) { + try (InputStream stream = container.readBlob(randomRetryingPurpose(), "foobar")) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { byte[] buffer = new byte[scaledRandomIntBetween(1, 
data.length - target.length())]; @@ -218,7 +220,7 @@ public void testWriteFileMultipleOfChunkSize() throws IOException { byte[] initialValue = randomByteArrayOfLength(uploadSize); container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true); - BytesReference reference = readFully(container.readBlob(randomPurpose(), key)); + BytesReference reference = readFully(container.readBlob(randomRetryingPurpose(), key)); assertEquals(new BytesArray(initialValue), reference); container.deleteBlobsIgnoringIfNotExists(randomPurpose(), Iterators.single(key)); @@ -242,24 +244,18 @@ protected GoogleCloudStorageService createStorageService(ClusterService clusterS @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { - StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions); + StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions, retryBehaviour); return options.toBuilder() .setStorageRetryStrategy(StorageRetryStrategy.getLegacyStorageRetryStrategy()) - .setHost(options.getHost()) - .setCredentials(options.getCredentials()) .setRetrySettings( - RetrySettings.newBuilder() - .setTotalTimeout(options.getRetrySettings().getTotalTimeout()) + options.getRetrySettings() + .toBuilder() .setInitialRetryDelay(Duration.ofMillis(10L)) - .setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier()) .setMaxRetryDelay(Duration.ofSeconds(1L)) - .setMaxAttempts(0) .setJittered(false) - .setInitialRpcTimeout(options.getRetrySettings().getInitialRpcTimeout()) - .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier()) - .setMaxRpcTimeout(options.getRetrySettings().getMaxRpcTimeout()) .build() ) .build(); diff --git 
a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java index 520f922280eb0..0572912e96429 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java @@ -33,6 +33,7 @@ import java.util.Collection; import static org.elasticsearch.common.io.Streams.readFully; +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.hamcrest.Matchers.blankOrNullString; import static org.hamcrest.Matchers.containsString; @@ -115,7 +116,7 @@ public void testResumeAfterUpdate() { executeOnBlobStore(repo, container -> { container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true); - try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) { + try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), blobKey)) { // Trigger the first request for the blob, partially read it int read = inputStream.read(); assert read != -1; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 2af940c19bbac..f02692a742a29 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -139,8 +139,32 @@ class GoogleCloudStorageBlobStore implements 
BlobStore { this.casBackoffPolicy = casBackoffPolicy; } - private MeteredStorage client() throws IOException { - return storageService.client(projectId, clientName, repositoryName, statsCollector); + /** + * Get a client that will retry according to its configured settings + * + * @return A client + */ + MeteredStorage client() throws IOException { + return storageService.client( + projectId, + clientName, + repositoryName, + statsCollector, + GoogleCloudStorageService.RetryBehaviour.ClientConfigured + ); + } + + /** + * Get a client that will not retry on failure + * + * @return A client with max retries configured to zero + */ + MeteredStorage clientNoRetries() throws IOException { + return storageService.client(projectId, clientName, repositoryName, statsCollector, GoogleCloudStorageService.RetryBehaviour.None); + } + + int getMaxRetries() { + return storageService.clientSettings(projectId, clientName).getMaxRetries(); } @Override @@ -229,7 +253,7 @@ boolean blobExists(OperationPurpose purpose, String blobName) throws IOException * @return the InputStream used to read the blob's content */ InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { - return new GoogleCloudStorageRetryingInputStream(purpose, client(), BlobId.of(bucketName, blobName)); + return new GoogleCloudStorageRetryingInputStream(this, purpose, BlobId.of(bucketName, blobName)); } /** @@ -252,8 +276,8 @@ InputStream readBlob(OperationPurpose purpose, String blobName, long position, l return new ByteArrayInputStream(new byte[0]); } else { return new GoogleCloudStorageRetryingInputStream( + this, purpose, - client(), BlobId.of(bucketName, blobName), position, Math.addExact(position, length - 1) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java index 582e0b48121fa..10d557ce2a8aa 100644 --- 
a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java @@ -120,6 +120,17 @@ public class GoogleCloudStorageClientSettings { () -> PROXY_HOST_SETTING ); + /** + * The maximum number of retries to use when a GCS request fails. + *

+ * Default to 5 to match {@link com.google.cloud.ServiceOptions#getDefaultRetrySettings()} + */ + static final Setting.AffixSetting MAX_RETRIES_SETTING = Setting.affixKeySetting( + PREFIX, + "max_retries", + (key) -> Setting.intSetting(key, 5, 0, Setting.Property.NodeScope) + ); + /** The credentials used by the client to connect to the Storage endpoint. */ private final ServiceAccountCredentials credential; @@ -144,6 +155,8 @@ public class GoogleCloudStorageClientSettings { @Nullable private final Proxy proxy; + private final int maxRetries; + GoogleCloudStorageClientSettings( final ServiceAccountCredentials credential, final String endpoint, @@ -152,7 +165,8 @@ public class GoogleCloudStorageClientSettings { final TimeValue readTimeout, final String applicationName, final URI tokenUri, - final Proxy proxy + final Proxy proxy, + final int maxRetries ) { this.credential = credential; this.endpoint = endpoint; @@ -162,6 +176,7 @@ public class GoogleCloudStorageClientSettings { this.applicationName = applicationName; this.tokenUri = tokenUri; this.proxy = proxy; + this.maxRetries = maxRetries; } public ServiceAccountCredentials getCredential() { @@ -197,6 +212,10 @@ public Proxy getProxy() { return proxy; } + public int getMaxRetries() { + return maxRetries; + } + @Override public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; @@ -249,7 +268,8 @@ static GoogleCloudStorageClientSettings getClientSettings(final Settings setting getConfigValue(settings, clientName, READ_TIMEOUT_SETTING), getConfigValue(settings, clientName, APPLICATION_NAME_SETTING), getConfigValue(settings, clientName, TOKEN_URI_SETTING), - proxy + proxy, + getConfigValue(settings, clientName, MAX_RETRIES_SETTING) ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 
2c35aac1a46e8..5b930337bc447 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -93,7 +93,8 @@ public List> getSettings() { GoogleCloudStorageClientSettings.TOKEN_URI_SETTING, GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING, GoogleCloudStorageClientSettings.PROXY_HOST_SETTING, - GoogleCloudStorageClientSettings.PROXY_PORT_SETTING + GoogleCloudStorageClientSettings.PROXY_PORT_SETTING, + GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index a74e86d8ee677..e3b4cf55c1594 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -9,15 +9,14 @@ package org.elasticsearch.repositories.gcs; import com.google.api.client.http.HttpResponse; -import com.google.cloud.BaseService; -import com.google.cloud.RetryHelper; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.core.IOUtils; +import org.elasticsearch.common.blobstore.RetryingInputStream; +import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; import org.elasticsearch.rest.RestStatus; @@ -25,129 +24,131 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.util.ArrayList; 
-import java.util.List; - -import static org.elasticsearch.core.Strings.format; /** * Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred. * This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing * the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future. */ -class GoogleCloudStorageRetryingInputStream extends InputStream { +class GoogleCloudStorageRetryingInputStream extends RetryingInputStream { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class); - static final int MAX_SUPPRESSED_EXCEPTIONS = 10; - - private final OperationPurpose purpose; - private final MeteredStorage client; - private final BlobId blobId; - private final long start; - private final long end; - private final int maxAttempts; - private InputStream currentStream; - private int attempt = 1; - private List failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); - private long currentOffset; - private boolean closed; - private Long lastGeneration; - // Used for testing only - GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException { - this(purpose, client, blobId, 0, Long.MAX_VALUE - 1); + GoogleCloudStorageRetryingInputStream(GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, BlobId blobId) + throws IOException { + this(blobStore, purpose, blobId, 0, Long.MAX_VALUE - 1); } // Used for testing only - GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId, long start, long end) - throws IOException { - if (start < 0L) { - throw new IllegalArgumentException("start must be non-negative"); - } - if (end < start || end == Long.MAX_VALUE) { - throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE"); - } - 
this.purpose = purpose; - this.client = client; - this.blobId = blobId; - this.start = start; - this.end = end; - this.maxAttempts = client.getOptions().getRetrySettings().getMaxAttempts(); - this.currentStream = openStream(); + GoogleCloudStorageRetryingInputStream( + GoogleCloudStorageBlobStore blobStore, + OperationPurpose purpose, + BlobId blobId, + long start, + long end + ) throws IOException { + super(new GoogleCloudStorageBlobStoreServices(blobStore, purpose, blobId), purpose, start, end); } - private InputStream openStream() throws IOException { - try { + private static class GoogleCloudStorageBlobStoreServices implements BlobStoreServices { + + private final GoogleCloudStorageBlobStore blobStore; + private final OperationPurpose purpose; + private final BlobId blobId; + + private GoogleCloudStorageBlobStoreServices(GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, BlobId blobId) { + this.blobStore = blobStore; + this.purpose = purpose; + this.blobId = blobId; + } + + @Override + public InputStreamAtVersion getInputStreamAtVersion(@Nullable Long lastGeneration, long start, long end) throws IOException { + final MeteredStorage client = blobStore.clientNoRetries(); try { - return RetryHelper.runWithRetries(() -> { - try { - final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName()); - meteredGet.setReturnRawInputStream(true); - if (lastGeneration != null) { - meteredGet.setGeneration(lastGeneration); - } + try { + final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName()); + meteredGet.setReturnRawInputStream(true); + if (lastGeneration != null) { + meteredGet.setGeneration(lastGeneration); + } - if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { - if (meteredGet.getRequestHeaders() != null) { - meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end); - } - } - final HttpResponse resp = meteredGet.executeMedia(); - 
// Store the generation of the first response we received, so we can detect - // if the file has changed if we need to resume - if (lastGeneration == null) { - lastGeneration = parseGenerationHeader(resp); + if (start > 0 || end < Long.MAX_VALUE - 1) { + if (meteredGet.getRequestHeaders() != null) { + meteredGet.getRequestHeaders().setRange("bytes=" + start + "-" + end); } + } + final HttpResponse resp = meteredGet.executeMedia(); + // Store the generation of the first response we received, so we can detect + // if the file has changed if we need to resume + if (lastGeneration == null) { + lastGeneration = parseGenerationHeader(resp); + } - final Long contentLength = resp.getHeaders().getContentLength(); - InputStream content = resp.getContent(); - if (contentLength != null) { - content = new ContentLengthValidatingInputStream(content, contentLength); - } - return content; - } catch (IOException e) { - throw StorageException.translate(e); + final Long contentLength = resp.getHeaders().getContentLength(); + InputStream content = resp.getContent(); + if (contentLength != null) { + content = new ContentLengthValidatingInputStream(content, contentLength); } - }, client.getOptions().getRetrySettings(), BaseService.EXCEPTION_HANDLER, client.getOptions().getClock()); - } catch (RetryHelper.RetryHelperException e) { - throw StorageException.translateAndThrow(e); - } - } catch (StorageException storageException) { - if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) { - if (lastGeneration != null) { - throw addSuppressedExceptions( - new NoSuchFileException( + return new InputStreamAtVersion<>(content, lastGeneration); + } catch (IOException e) { + throw StorageException.translate(e); + } + } catch (StorageException storageException) { + if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) { + if (lastGeneration != null) { + throw new NoSuchFileException( "Blob object [" + blobId.getName() + "] generation [" + lastGeneration + "] unavailable 
on resume (contents changed, or object deleted): " + storageException.getMessage() - ) - ); - } else { - throw addSuppressedExceptions( - new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage()) - ); + ); + } else { + throw new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage()); + } } - } - if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { - long currentPosition = Math.addExact(start, currentOffset); - throw addSuppressedExceptions( - new RequestedRangeNotSatisfiedException( + if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { + throw new RequestedRangeNotSatisfiedException( blobId.getName(), - currentPosition, - (end < Long.MAX_VALUE - 1) ? end - currentPosition + 1 : end, + start, + (end < Long.MAX_VALUE - 1) ? end - start + 1 : end, storageException - ) - ); + ); + } + throw storageException; } - throw addSuppressedExceptions(storageException); + } + + @Override + public void onRetryStarted(String action) { + // No retry metrics for GCS + } + + @Override + public void onRetrySucceeded(String action, long numberOfRetries) { + // No retry metrics for GCS + } + + @Override + public long getMeaningfulProgressSize() { + return Math.max(1L, GoogleCloudStorageBlobStore.SDK_DEFAULT_CHUNK_SIZE / 100L); + } + + @Override + public int getMaxRetries() { + return blobStore.getMaxRetries(); + } + + @Override + public String getBlobDescription() { + return blobId.toString(); } } - private Long parseGenerationHeader(HttpResponse response) { + private static Long parseGenerationHeader(HttpResponse response) { final String generationHeader = response.getHeaders().getFirstHeaderStringValue("x-goog-generation"); if (generationHeader != null) { try { @@ -213,90 +214,11 @@ private void checkContentLengthOnEOF() throws IOException { } } - @Override - public int read() throws IOException { - 
ensureOpen(); - while (true) { - try { - final int result = currentStream.read(); - currentOffset += 1; - return result; - } catch (IOException e) { - reopenStreamOrFail(StorageException.translate(e)); - } - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - ensureOpen(); - while (true) { - try { - final int bytesRead = currentStream.read(b, off, len); - if (bytesRead == -1) { - return -1; - } - currentOffset += bytesRead; - return bytesRead; - } catch (IOException e) { - reopenStreamOrFail(StorageException.translate(e)); - } - } - } - /** * Close the current stream, used to test resume */ // @VisibleForTesting void closeCurrentStream() throws IOException { - currentStream.close(); - } - - private void ensureOpen() { - if (closed) { - assert false : "using GoogleCloudStorageRetryingInputStream after close"; - throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close"); - } - } - - private void reopenStreamOrFail(StorageException e) throws IOException { - if (attempt >= maxAttempts) { - throw addSuppressedExceptions(e); - } - logger.debug( - () -> format("failed reading [%s] at offset [%s], attempt [%s] of [%s], retrying", blobId, currentOffset, attempt, maxAttempts), - e - ); - attempt += 1; - if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { - failures.add(e); - } - IOUtils.closeWhileHandlingException(currentStream); - currentStream = openStream(); - } - - @Override - public void close() throws IOException { - currentStream.close(); - closed = true; - } - - @Override - public long skip(long n) throws IOException { - // This could be optimized on a failure by re-opening stream directly to the preferred location. However, it is rarely called, - // so for now we will rely on the default implementation which just discards bytes by reading. 
- return super.skip(n); - } - - @Override - public void reset() { - throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking"); - } - - private T addSuppressedExceptions(T e) { - for (StorageException failure : failures) { - e.addSuppressed(failure); - } - return e; + currentStream.inputStream().close(); } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index cb6b8b334181e..b7f8423f53c2a 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -15,6 +15,7 @@ import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.client.util.SecurityUtils; import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.ServiceOptions; @@ -94,6 +95,23 @@ public void refreshAndClearCache(Map c clientsManager.refreshAndClearCacheForClusterClients(clientsSettings); } + public enum RetryBehaviour { + ClientConfigured { + @Override + public RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings) { + return ServiceOptions.getDefaultRetrySettings().toBuilder().setMaxAttempts(settings.getMaxRetries() + 1).build(); + } + }, + None { + @Override + public RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings) { + return ServiceOptions.getNoRetrySettings(); + } + }; + + public abstract RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings); + } + /** * Attempts to retrieve a client from the cache. 
If the client does not exist it * will be created from the latest settings and will populate the cache. The @@ -111,9 +129,14 @@ public MeteredStorage client( @Nullable final ProjectId projectId, final String clientName, final String repositoryName, - final GcsRepositoryStatsCollector statsCollector + final GcsRepositoryStatsCollector statsCollector, + final RetryBehaviour retryBehaviour ) throws IOException { - return clientsManager.client(projectId, clientName, repositoryName, statsCollector); + return clientsManager.client(projectId, clientName, repositoryName, statsCollector, retryBehaviour); + } + + GoogleCloudStorageClientSettings clientSettings(@Nullable ProjectId projectId, final String clientName) { + return clientsManager.clientSettings(projectId, clientName); } /** @@ -138,8 +161,11 @@ GoogleCloudStorageClientsManager getClientsManager() { * @return a new client storage instance that can be used to manage objects * (blobs) */ - private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector) - throws IOException { + private MeteredStorage createClient( + GoogleCloudStorageClientSettings gcsClientSettings, + GcsRepositoryStatsCollector statsCollector, + RetryBehaviour retryBehaviour + ) throws IOException { final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); // requires java.lang.RuntimePermission "setFactory" @@ -183,13 +209,14 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser } }; - final StorageOptions storageOptions = createStorageOptions(gcsClientSettings, httpTransportOptions); + final StorageOptions storageOptions = createStorageOptions(gcsClientSettings, httpTransportOptions, retryBehaviour); return new MeteredStorage(storageOptions.getService(), statsCollector); } StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final 
HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { final StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder() .setStorageRetryStrategy(getRetryStrategy()) @@ -198,7 +225,8 @@ StorageOptions createStorageOptions( return Strings.hasLength(gcsClientSettings.getApplicationName()) ? Map.of("user-agent", gcsClientSettings.getApplicationName()) : Map.of(); - }); + }) + .setRetrySettings(retryBehaviour.createRetrySettings(gcsClientSettings)); if (Strings.hasLength(gcsClientSettings.getHost())) { storageOptionsBuilder.setHost(gcsClientSettings.getHost()); } @@ -403,12 +431,25 @@ void refreshAndClearCacheForClusterClients(Map clientCache = emptyMap(); + protected volatile Map clientCache = emptyMap(); /** * Get the current client settings for all clients in this holder. @@ -476,46 +519,57 @@ abstract class ClientsHolder { * @return a cached client storage instance that can be used to manage objects * (blobs) */ - MeteredStorage client(final String clientName, final String repositoryName, final GcsRepositoryStatsCollector statsCollector) - throws IOException { + MeteredStorage client( + final String clientName, + final String repositoryName, + final GcsRepositoryStatsCollector statsCollector, + final RetryBehaviour retryBehaviour + ) throws IOException { + final var cacheKey = new CacheKey(repositoryName, retryBehaviour); { - final MeteredStorage storage = clientCache.get(repositoryName); + final MeteredStorage storage = clientCache.get(cacheKey); if (storage != null) { return storage; } } synchronized (this) { - final MeteredStorage existing = clientCache.get(repositoryName); + final MeteredStorage existing = clientCache.get(cacheKey); if (existing != null) { return existing; } - final GoogleCloudStorageClientSettings settings = allClientSettings().get(clientName); - - if (settings == null) { - throw new IllegalArgumentException( - "Unknown client name [" + clientName + "]. 
Existing client configs: " + allClientSettings().keySet() - ); - } + final GoogleCloudStorageClientSettings settings = clientSettings(clientName); logger.debug(() -> format("creating GCS client with client_name [%s], endpoint [%s]", clientName, settings.getHost())); - final MeteredStorage storage = createClient(settings, statsCollector); - clientCache = Maps.copyMapWithAddedEntry(clientCache, repositoryName, storage); + final MeteredStorage storage = createClient(settings, statsCollector, retryBehaviour); + clientCache = Maps.copyMapWithAddedEntry(clientCache, cacheKey, storage); return storage; } } + public GoogleCloudStorageClientSettings clientSettings(String clientName) { + final GoogleCloudStorageClientSettings settings = allClientSettings().get(clientName); + + if (settings == null) { + throw new IllegalArgumentException( + "Unknown client name [" + clientName + "]. Existing client configs: " + allClientSettings().keySet() + ); + } + + return settings; + } + synchronized void closeRepositoryClients(String repositoryName) { clientCache = clientCache.entrySet() .stream() - .filter(entry -> entry.getKey().equals(repositoryName) == false) + .filter(entry -> entry.getKey().repositoryName().equals(repositoryName) == false) .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); } // package private for tests final boolean hasCachedClientForRepository(String repositoryName) { - return clientCache.containsKey(repositoryName); + return clientCache.keySet().stream().anyMatch(key -> key.repositoryName().equals(repositoryName)); } } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index a09aa8142ad63..48cf6b5d96d90 100644 --- 
a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -85,6 +85,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; import static org.hamcrest.Matchers.anyOf; @@ -146,6 +147,9 @@ protected BlobContainer createBlobContainer( if (readTimeout != null) { clientSettings.put(READ_TIMEOUT_SETTING.getConcreteSettingForNamespace(client).getKey(), readTimeout); } + if (maxRetries != null) { + clientSettings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace(client).getKey(), maxRetries); + } final MockSecureSettings secureSettings = new MockSecureSettings(); secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace(client).getKey(), createServiceAccount(random())); @@ -155,7 +159,8 @@ protected BlobContainer createBlobContainer( @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { final HttpTransportOptions requestCountingHttpTransportOptions = new HttpTransportOptions( HttpTransportOptions.newBuilder() @@ -182,7 +187,11 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser }; } }; 
- final StorageOptions options = super.createStorageOptions(gcsClientSettings, requestCountingHttpTransportOptions); + final StorageOptions options = super.createStorageOptions( + gcsClientSettings, + requestCountingHttpTransportOptions, + retryBehaviour + ); final RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder() .setTotalTimeout(options.getRetrySettings().getTotalTimeout()) .setInitialRetryDelay(Duration.ofMillis(10L)) @@ -191,10 +200,8 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser .setJittered(false) .setInitialRpcTimeout(Duration.ofSeconds(1)) .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier()) - .setMaxRpcTimeout(Duration.ofSeconds(1)); - if (maxRetries != null) { - retrySettingsBuilder.setMaxAttempts(maxRetries + 1); - } + .setMaxRpcTimeout(Duration.ofSeconds(1)) + .setMaxAttempts(options.getRetrySettings().getMaxAttempts()); return options.toBuilder() .setStorageRetryStrategy(getRetryStrategy()) .setHost(options.getHost()) @@ -273,7 +280,7 @@ public void testReadLargeBlobWithRetries() throws Exception { exchange.close(); }); - try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "large_blob_retries")) { + try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "large_blob_retries")) { assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); } } @@ -600,10 +607,10 @@ public void testContentsChangeWhileStreaming() throws IOException { byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered); container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true); - BytesReference reference = readFully(container.readBlob(randomPurpose(), key)); + BytesReference reference = readFully(container.readBlob(randomRetryingPurpose(), key)); assertEquals(new BytesArray(initialValue), reference); - try (InputStream inputStream = container.readBlob(randomPurpose(), key)) { + try (InputStream 
inputStream = container.readBlob(randomRetryingPurpose(), key)) { // Trigger the first chunk to load int read = inputStream.read(); assert read != -1; diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java index 7e2b4348823fd..6938393077b0b 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java @@ -53,6 +53,7 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.StorageOperation.GET; @@ -253,7 +254,8 @@ private ContainerAndBlobStore createBlobContainer(final String repositoryName) t READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(getEndpointForServer(httpServer) + "/token"), - null + null, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); googleCloudStorageService.refreshAndClearCache(Map.of(clientName, clientSettings)); final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore( diff --git 
a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index 849c09c506078..e6c854c23be9d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -82,8 +82,15 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti final MeteredStorage meteredStorage = new MeteredStorage(storage, storageRpc, new GcsRepositoryStatsCollector()); final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class); - when(storageService.client(eq(ProjectId.DEFAULT), any(String.class), any(String.class), any(GcsRepositoryStatsCollector.class))) - .thenReturn(meteredStorage); + when( + storageService.client( + eq(ProjectId.DEFAULT), + any(String.class), + any(String.class), + any(GcsRepositoryStatsCollector.class), + any(GoogleCloudStorageService.RetryBehaviour.class) + ) + ).thenReturn(meteredStorage); try ( BlobStore store = new GoogleCloudStorageBlobStore( diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java index ac69afe7def6a..e0dcb5040ff8d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java @@ -40,6 +40,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; import static 
org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.getClientSettings; @@ -123,7 +124,8 @@ public void testProjectIdDefaultsToCredentials() throws Exception { READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(""), - null + null, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); assertEquals(credential.getProjectId(), googleCloudStorageClientSettings.getProjectId()); } @@ -140,7 +142,8 @@ public void testLoadsProxySettings() throws Exception { READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(""), - proxy + proxy, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); assertEquals(proxy, googleCloudStorageClientSettings.getProxy()); } @@ -278,6 +281,13 @@ private static GoogleCloudStorageClientSettings randomClient( applicationName = APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY); } + int maxRetries; + if (randomBoolean()) { + maxRetries = randomIntBetween(0, 5); + } else { + maxRetries = MAX_RETRIES_SETTING.getDefault(Settings.EMPTY); + } + return new GoogleCloudStorageClientSettings( credential, endpoint, @@ -286,7 +296,8 @@ private static GoogleCloudStorageClientSettings randomClient( readTimeout, applicationName, new URI(""), - null + null, + maxRetries ); } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java 
b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java index a3a538b82aa6f..b1c9e85834f3e 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java @@ -147,32 +147,42 @@ public void testClientsLifeCycleForSingleProject() throws Exception { final ProjectId projectId = randomUniqueProjectId(); final String clientName = randomFrom(clientNames); final String anotherClientName = randomValueOtherThan(clientName, () -> randomFrom(clientNames)); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); + final GoogleCloudStorageService.RetryBehaviour otherRetryBehaviour = randomValueOtherThan( + retryBehaviour, + GoogleCloudStorageClientsManagerTests::randomRetryBehaviour + ); // Configure project secrets for one client - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); { assertProjectClientSettings(projectId, clientName); // Retrieve client for the 1st time - final var initialClient = getClientFromManager(projectId, clientName); + final var initialClient = getClientFromManager(projectId, clientName, retryBehaviour); assertClientCredentials(projectId, clientName, initialClient); // Client is cached when retrieved again - assertThat(initialClient, sameInstance(getClientFromManager(projectId, clientName))); + assertThat(initialClient, sameInstance(getClientFromManager(projectId, clientName, retryBehaviour))); // Client not configured cannot be accessed, - assertClientNotFound(projectId, anotherClientName); + assertClientNotFound(projectId, anotherClientName, retryBehaviour); // Update client secrets should release and recreate the client 
updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName, anotherClientName)); assertProjectClientSettings(projectId, clientName, anotherClientName); - final var clientUpdated = getClientFromManager(projectId, clientName); + final var clientUpdated = getClientFromManager(projectId, clientName, retryBehaviour); assertThat(clientUpdated, not(sameInstance(initialClient))); // A different client for a different client name - final var anotherClient = getClientFromManager(projectId, anotherClientName); + final var anotherClient = getClientFromManager(projectId, anotherClientName, retryBehaviour); assertClientCredentials(projectId, anotherClientName, anotherClient); assertThat(anotherClient, not(sameInstance(clientUpdated))); + + // A different client for a different retry behaviour + final var anotherRetryBehaviour = getClientFromManager(projectId, clientName, otherRetryBehaviour); + assertClientCredentials(projectId, clientName, anotherRetryBehaviour); + assertThat(anotherRetryBehaviour, not(sameInstance(clientUpdated))); } // Remove project secrets or the entire project @@ -181,16 +191,17 @@ public void testClientsLifeCycleForSingleProject() throws Exception { } else { removeProjectFromClusterState(projectId); } - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); assertThat(gcsClientsManager.getPerProjectClientsHolders(), not(hasKey(projectId))); } public void testClientsWithNoCredentialsAreFilteredOut() throws IOException { final ProjectId projectId = randomUniqueProjectId(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientNames.toArray(String[]::new))); for (var clientName : clientNames) { - assertNotNull(getClientFromManager(projectId, clientName)); + assertNotNull(getClientFromManager(projectId, clientName, retryBehaviour)); } final List 
clientsWithIncorrectSecretsConfig = randomNonEmptySubsetOf(clientNames); @@ -205,15 +216,16 @@ public void testClientsWithNoCredentialsAreFilteredOut() throws IOException { for (var clientName : clientNames) { if (clientsWithIncorrectSecretsConfig.contains(clientName)) { - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); } else { - assertNotNull(getClientFromManager(projectId, clientName)); + assertNotNull(getClientFromManager(projectId, clientName, retryBehaviour)); } } } public void testClientsForMultipleProjects() throws InterruptedException { final List projectIds = randomList(2, 8, ESTestCase::randomUniqueProjectId); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); final List threads = projectIds.stream().map(projectId -> new Thread(() -> { final int iterations = between(1, 3); @@ -224,7 +236,7 @@ public void testClientsForMultipleProjects() throws InterruptedException { assertProjectClientSettings(projectId, clientNames.toArray(String[]::new)); for (var clientName : shuffledList(clientNames)) { try { - final var meteredStorage = getClientFromManager(projectId, clientName); + final var meteredStorage = getClientFromManager(projectId, clientName, retryBehaviour); assertClientCredentials(projectId, clientName, meteredStorage); } catch (IOException e) { fail(e); @@ -237,7 +249,7 @@ public void testClientsForMultipleProjects() throws InterruptedException { removeProjectFromClusterState(projectId); } assertThat(gcsClientsManager.getPerProjectClientsHolders(), not(hasKey(projectId))); - clientNames.forEach(clientName -> assertClientNotFound(projectId, clientName)); + clientNames.forEach(clientName -> assertClientNotFound(projectId, clientName, retryBehaviour)); } })).toList(); @@ -251,16 +263,17 @@ public void testClusterAndProjectClients() throws IOException { final ProjectId projectId = randomUniqueProjectId(); final String clientName = randomFrom(clientNames); 
final boolean configureProjectClientsFirst = randomBoolean(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); if (configureProjectClientsFirst) { updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); } - final var clusterClient = getClientFromService(projectIdForClusterClient(), clientName); + final var clusterClient = getClientFromService(projectIdForClusterClient(), clientName, retryBehaviour); if (configureProjectClientsFirst == false) { assertThat(gcsClientsManager.getPerProjectClientsHolders(), anEmptyMap()); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); } - final var projectClient = getClientFromService(projectId, clientName); + final var projectClient = getClientFromService(projectId, clientName, retryBehaviour); assertThat(projectClient, not(sameInstance(clusterClient))); // Release the cluster client @@ -290,15 +303,27 @@ public void testProjectClientsDisabled() throws IOException { // Cluster client still works final String clientName = randomFrom(clientNames); - assertNotNull(getClientFromService(projectIdForClusterClient(), clientName)); + assertNotNull(getClientFromService(projectIdForClusterClient(), clientName, randomRetryBehaviour())); } - private MeteredStorage getClientFromManager(ProjectId projectId, String clientName) throws IOException { - return gcsClientsManager.client(projectId, clientName, repoNameForClient(clientName), statsCollector); + private MeteredStorage getClientFromManager( + ProjectId projectId, + String clientName, + GoogleCloudStorageService.RetryBehaviour retryBehaviour + ) throws IOException { + return gcsClientsManager.client(projectId, clientName, repoNameForClient(clientName), statsCollector, retryBehaviour); } - private MeteredStorage getClientFromService(ProjectId projectId, String clientName) throws IOException { - return googleCloudStorageService.client(projectId, clientName, 
repoNameForClient(clientName), statsCollector); + private static GoogleCloudStorageService.RetryBehaviour randomRetryBehaviour() { + return randomFrom(GoogleCloudStorageService.RetryBehaviour.values()); + } + + private MeteredStorage getClientFromService( + ProjectId projectId, + String clientName, + GoogleCloudStorageService.RetryBehaviour retryBehaviour + ) throws IOException { + return googleCloudStorageService.client(projectId, clientName, repoNameForClient(clientName), statsCollector, retryBehaviour); } private ProjectId projectIdForClusterClient() { @@ -332,8 +357,11 @@ private void assertClientCredentials(ProjectId projectId, String clientName, Met assertThat(credentials.getPrivateKeyId(), equalTo(projectClientPrivateKeyId(projectId, clientName))); } - private void assertClientNotFound(ProjectId projectId, String clientName) { - final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> getClientFromManager(projectId, clientName)); + private void assertClientNotFound(ProjectId projectId, String clientName, GoogleCloudStorageService.RetryBehaviour retryBehaviour) { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> getClientFromManager(projectId, clientName, retryBehaviour) + ); assertThat( e.getMessage(), anyOf( diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java index 7b9ef189b7459..e96746dabb536 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java @@ -42,7 +42,8 @@ public void testExposedSettings() { "gcs.client.*.token_uri", "gcs.client.*.proxy.type", "gcs.client.*.proxy.host", - "gcs.client.*.proxy.port" + "gcs.client.*.proxy.port", + 
"gcs.client.*.max_retries" ), settings.stream().map(Setting::getKey).toList() ); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java index 2b223c7a16342..5908f29bb194d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java @@ -131,8 +131,11 @@ private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] d HttpRequest httpRequest = transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); HttpResponse httpResponse = httpRequest.execute(); when(get.executeMedia()).thenReturn(httpResponse); + final GoogleCloudStorageBlobStore mockBlobStore = mock(GoogleCloudStorageBlobStore.class); + when(mockBlobStore.clientNoRetries()).thenReturn(meteredStorage); + when(mockBlobStore.getMaxRetries()).thenReturn(6); - return new GoogleCloudStorageRetryingInputStream(OperationPurpose.SNAPSHOT_DATA, meteredStorage, blobId); + return new GoogleCloudStorageRetryingInputStream(mockBlobStore, OperationPurpose.SNAPSHOT_DATA, blobId); } private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] data, int position, int length) throws IOException { @@ -148,9 +151,13 @@ private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] d when(get.executeMedia()).thenReturn(httpResponse); } + final GoogleCloudStorageBlobStore mockBlobStore = mock(GoogleCloudStorageBlobStore.class); + when(mockBlobStore.clientNoRetries()).thenReturn(meteredStorage); + when(mockBlobStore.getMaxRetries()).thenReturn(6); + return new GoogleCloudStorageRetryingInputStream( + mockBlobStore, OperationPurpose.SNAPSHOT_DATA, - meteredStorage, 
blobId, position, position + length - 1 diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index bf4745a16b41c..369d820a638a0 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -40,6 +40,7 @@ import java.util.Locale; import java.util.UUID; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageService.RetryBehaviour.ClientConfigured; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -58,6 +59,7 @@ public void testClientInitializer() throws Exception { + ":" + randomIntBetween(1, 65535); final String projectIdName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT); + final int maxRetries = randomIntBetween(0, 10); final Settings settings = Settings.builder() .put( GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), @@ -76,6 +78,7 @@ public void testClientInitializer() throws Exception { .put(GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "HTTP") .put(GoogleCloudStorageClientSettings.PROXY_HOST_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "192.168.52.15") .put(GoogleCloudStorageClientSettings.PROXY_PORT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), 8080) + .put(GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxRetries) .build(); SetOnce proxy = new SetOnce<>(); final var clusterService = ClusterServiceUtils.createClusterService(new 
DeterministicTaskQueue().getThreadPool()); @@ -89,17 +92,18 @@ void notifyProxyIsSet(Proxy p) { var statsCollector = new GcsRepositoryStatsCollector(); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> service.client(projectIdForClusterClient(), "another_client", "repo", statsCollector) + () -> service.client(projectIdForClusterClient(), "another_client", "repo", statsCollector, randomRetryBehaviour()) ); assertThat(e.getMessage(), Matchers.startsWith("Unknown client name")); assertSettingDeprecationsAndWarnings( new Setting[] { GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING.getConcreteSettingForNamespace(clientName) } ); - final var storage = service.client(projectIdForClusterClient(), clientName, "repo", statsCollector); + final var storage = service.client(projectIdForClusterClient(), clientName, "repo", statsCollector, ClientConfigured); assertThat(storage.getOptions().getApplicationName(), Matchers.containsString(applicationName)); assertThat(storage.getOptions().getHost(), Matchers.is(endpoint)); assertThat(storage.getOptions().getProjectId(), Matchers.is(projectIdName)); assertThat(storage.getOptions().getTransportOptions(), Matchers.instanceOf(HttpTransportOptions.class)); + assertThat(storage.getOptions().getRetrySettings().getMaxAttempts(), equalTo(maxRetries + 1)); assertThat( ((HttpTransportOptions) storage.getOptions().getTransportOptions()).getConnectTimeout(), Matchers.is((int) connectTimeValue.millis()) @@ -125,18 +129,19 @@ public void testReinitClientSettings() throws Exception { ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()) ); when(pluginServices.projectResolver()).thenReturn(TestProjectResolvers.DEFAULT_PROJECT_ONLY); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) { plugin.createComponents(pluginServices); final 
GoogleCloudStorageService storageService = plugin.storageService.get(); var statsCollector = new GcsRepositoryStatsCollector(); - final var client11 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector); + final var client11 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector, retryBehaviour); assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); - final var client12 = storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector); + final var client12 = storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector, retryBehaviour); assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // client 3 is missing final IllegalArgumentException e1 = expectThrows( IllegalArgumentException.class, - () -> storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector) + () -> storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector, retryBehaviour) ); assertThat(e1.getMessage(), containsString("Unknown client name [gcs3].")); // update client settings @@ -144,18 +149,18 @@ public void testReinitClientSettings() throws Exception { // old client 1 not changed assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); // new client 1 is changed - final var client21 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector); + final var client21 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector, retryBehaviour); assertThat(client21.getOptions().getProjectId(), equalTo("project_gcs21")); // old client 2 not changed assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // new client2 is gone final IllegalArgumentException e2 = expectThrows( IllegalArgumentException.class, - () -> storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector) + () -> 
storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector, retryBehaviour) ); assertThat(e2.getMessage(), containsString("Unknown client name [gcs2].")); // client 3 emerged - final var client23 = storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector); + final var client23 = storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector, retryBehaviour); assertThat(client23.getOptions().getProjectId(), equalTo("project_gcs23")); } } @@ -173,23 +178,27 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception { plugin.createComponents(pluginServices); final GoogleCloudStorageService storageService = plugin.storageService.get(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); final MeteredStorage repo1Client = storageService.client( projectIdForClusterClient(), "gcs1", "repo1", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); final MeteredStorage repo2Client = storageService.client( projectIdForClusterClient(), "gcs1", "repo2", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); final MeteredStorage repo1ClientSecondInstance = storageService.client( projectIdForClusterClient(), "gcs1", "repo1", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); assertNotSame(repo1Client, repo2Client); @@ -240,4 +249,8 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte private ProjectId projectIdForClusterClient() { return randomBoolean() ? 
ProjectId.DEFAULT : null; } + + private static GoogleCloudStorageService.RetryBehaviour randomRetryBehaviour() { + return randomFrom(GoogleCloudStorageService.RetryBehaviour.values()); + } } diff --git a/modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index f6b37b3f34c89..0f113e8253cc6 100644 --- a/modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.blankOrNullString; @@ -253,7 +254,7 @@ public void testCopy() { blobBytes.length() ); - return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes(); + return destinationBlobContainer.readBlob(randomRetryingPurpose(), destinationBlobName).readAllBytes(); }); assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes); @@ -280,7 +281,7 @@ public void testMultipartCopy() { blobBytes.length() ); - return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes(); + return destinationBlobContainer.readBlob(randomRetryingPurpose(), destinationBlobName).readAllBytes(); }); assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java 
b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index ed06e7f594ca0..4eb973f4ad0a1 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -14,12 +14,12 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.core.IOUtils; +import org.elasticsearch.common.blobstore.RetryingInputStream; +import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; import org.elasticsearch.repositories.s3.S3BlobStore.Operation; import org.elasticsearch.rest.RestStatus; @@ -27,11 +27,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.repositories.s3.S3BlobStore.configureRequestForMetrics; /** @@ -41,107 +38,110 @@ * * See https://github.com/aws/aws-sdk-java/issues/856 for the related SDK issue */ -class S3RetryingInputStream extends InputStream { +class S3RetryingInputStream extends RetryingInputStream { private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class); - static final int MAX_SUPPRESSED_EXCEPTIONS = 10; - - private final OperationPurpose purpose; - private final S3BlobStore blobStore; - private final String blobKey; - private final long start; - private final long end; - private final List failures; - - private ResponseInputStream currentStream; - 
private long currentStreamFirstOffset; - private long currentStreamLastOffset; - private int attempt = 1; - private int failuresAfterMeaningfulProgress = 0; - private long currentOffset; - private boolean closed; - private boolean eof; - private boolean aborted = false; - S3RetryingInputStream(OperationPurpose purpose, S3BlobStore blobStore, String blobKey) throws IOException { this(purpose, blobStore, blobKey, 0, Long.MAX_VALUE - 1); } // both start and end are inclusive bounds, following the definition in GetObjectRequest.setRange S3RetryingInputStream(OperationPurpose purpose, S3BlobStore blobStore, String blobKey, long start, long end) throws IOException { - if (start < 0L) { - throw new IllegalArgumentException("start must be non-negative"); - } - if (end < start || end == Long.MAX_VALUE) { - throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE"); - } - this.purpose = purpose; - this.blobStore = blobStore; - this.blobKey = blobKey; - this.failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); - this.start = start; - this.end = end; - final int initialAttempt = attempt; - openStreamWithRetry(); - maybeLogAndRecordMetricsForSuccess(initialAttempt, "open"); + super(new S3BlobStoreServices(blobStore, blobKey, purpose), purpose, start, end); } - private void openStreamWithRetry() throws IOException { - while (true) { + private record S3BlobStoreServices(S3BlobStore blobStore, String blobKey, OperationPurpose purpose) + implements + BlobStoreServices { + + @Override + public InputStreamAtVersion getInputStreamAtVersion(@Nullable String version, long start, long end) throws IOException { try (AmazonS3Reference clientReference = blobStore.clientReference()) { final var getObjectRequestBuilder = GetObjectRequest.builder().bucket(blobStore.bucket()).key(blobKey); configureRequestForMetrics(getObjectRequestBuilder, blobStore, Operation.GET_OBJECT, purpose); - if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { - assert start + 
currentOffset <= end - : "requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end; - getObjectRequestBuilder.range("bytes=" + Math.addExact(start, currentOffset) + "-" + end); + if (start > 0 || end < Long.MAX_VALUE - 1) { + assert start <= end : "requesting beyond end, start = " + start + " end=" + end; + getObjectRequestBuilder.range("bytes=" + start + "-" + end); + } + if (version != null) { + // This is a second or subsequent request, ensure the object hasn't changed since the first request + getObjectRequestBuilder.ifMatch(version); } - this.currentStreamFirstOffset = Math.addExact(start, currentOffset); final var getObjectRequest = getObjectRequestBuilder.build(); final var getObjectResponse = clientReference.client().getObject(getObjectRequest); - this.currentStreamLastOffset = Math.addExact(currentStreamFirstOffset, getStreamLength(getObjectResponse.response())); - this.currentStream = getObjectResponse; - return; + return new InputStreamAtVersion<>(new S3InputStream(getObjectResponse, start, end), getObjectResponse.response().eTag()); } catch (SdkException e) { if (e instanceof SdkServiceException sdkServiceException) { if (sdkServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()) { - throw addSuppressedExceptions( - new NoSuchFileException("Blob object [" + blobKey + "] not found: " + sdkServiceException.getMessage()) - ); + throw new NoSuchFileException("Blob object [" + blobKey + "] not found: " + sdkServiceException.getMessage()); } if (sdkServiceException.statusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { - throw addSuppressedExceptions( - new RequestedRangeNotSatisfiedException( - blobKey, - currentStreamFirstOffset, - (end < Long.MAX_VALUE - 1) ? end - currentStreamFirstOffset + 1 : end, - sdkServiceException - ) + throw new RequestedRangeNotSatisfiedException( + blobKey, + start, + (end < Long.MAX_VALUE - 1) ? 
end - start + 1 : end, + sdkServiceException ); } } - - if (attempt == 1) { - blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("open")); - } - final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); - delayBeforeRetry(delayInMillis); + throw e; } } + + @Override + public void onRetryStarted(String action) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes(action)); + } + + @Override + public void onRetrySucceeded(String action, long numberOfRetries) { + final Map attributes = metricAttributes(action); + blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes); + blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes); + } + + @Override + public long getMeaningfulProgressSize() { + return Math.max(1L, blobStore.bufferSizeInBytes() / 100L); + } + + @Override + public int getMaxRetries() { + return blobStore.getMaxRetries(); + } + + @Override + public String getBlobDescription() { + return blobStore.bucket() + "/" + blobKey; + } + + private Map metricAttributes(String action) { + return Map.of( + "repo_type", + S3Repository.TYPE, + "repo_name", + blobStore.getRepositoryMetadata().name(), + "operation", + Operation.GET_OBJECT.getKey(), + "purpose", + purpose.getKey(), + "action", + action + ); + } } - private long getStreamLength(final GetObjectResponse getObjectResponse) { + private static long getStreamLength(final GetObjectResponse getObjectResponse, long expectedStart, long expectedEnd) { try { - return tryGetStreamLength(getObjectResponse); + return tryGetStreamLength(getObjectResponse, expectedStart, expectedEnd); } catch (Exception e) { assert false : e; return Long.MAX_VALUE - 1L; // assume a large stream so that the underlying stream is aborted on closing, unless eof is reached } } - // exposed for testing - long tryGetStreamLength(GetObjectResponse getObjectResponse) { + private static long 
tryGetStreamLength(GetObjectResponse getObjectResponse, long expectedStart, long expectedEnd) { // Returns the content range of the object if response contains the Content-Range header. final var rangeString = getObjectResponse.contentRange(); if (rangeString != null) { @@ -164,14 +164,14 @@ long tryGetStreamLength(GetObjectResponse getObjectResponse) { if (rangeEnd < rangeStart) { throw new IllegalArgumentException("invalid Content-range header [" + rangeString + "]"); } - if (rangeStart != start + currentOffset) { + if (rangeStart != expectedStart) { throw new IllegalArgumentException( - "unexpected Content-range header [" + rangeString + "], should have started at " + (start + currentOffset) + "unexpected Content-range header [" + rangeString + "], should have started at " + expectedStart ); } - if (rangeEnd > end) { + if (rangeEnd > expectedEnd) { throw new IllegalArgumentException( - "unexpected Content-range header [" + rangeString + "], should have ended no later than " + end + "unexpected Content-range header [" + rangeString + "], should have ended no later than " + expectedEnd ); } return rangeEnd - rangeStart + 1L; @@ -179,250 +179,124 @@ long tryGetStreamLength(GetObjectResponse getObjectResponse) { return getObjectResponse.contentLength(); } - @Override - public int read() throws IOException { - ensureOpen(); - final int initialAttempt = attempt; - while (true) { - try { - final int result = currentStream.read(); - if (result == -1) { - eof = true; - } else { - currentOffset += 1; - } - maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); - return result; - } catch (IOException e) { - if (attempt == initialAttempt) { - blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); - } - reopenStreamOrFail(e); - } + private static class S3InputStream extends InputStream { + + private final ResponseInputStream responseStream; + private final long start; + private final long end; + private final long 
lastOffset; + private int offset = 0; + private boolean closed; + private boolean eof; + private boolean aborted; + + private S3InputStream(ResponseInputStream responseStream, long start, long end) { + this.responseStream = responseStream; + this.start = start; + this.end = end; + lastOffset = getStreamLength(responseStream.response(), start, end); } - } - @Override - public int read(byte[] b, int off, int len) throws IOException { - ensureOpen(); - final int initialAttempt = attempt; - while (true) { - try { - final int bytesRead = currentStream.read(b, off, len); - if (bytesRead == -1) { - eof = true; - } else { - currentOffset += bytesRead; - } - maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); - return bytesRead; - } catch (IOException e) { - if (attempt == initialAttempt) { - blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); - } - reopenStreamOrFail(e); + @Override + public int read() throws IOException { + ensureOpen(); + int result = responseStream.read(); + if (result == -1) { + eof = true; + } else { + offset++; } + return result; } - } - - private void ensureOpen() { - if (closed) { - assert false : "using S3RetryingInputStream after close"; - throw new IllegalStateException("using S3RetryingInputStream after close"); - } - } - private void reopenStreamOrFail(IOException e) throws IOException { - final long meaningfulProgressSize = Math.max(1L, blobStore.bufferSizeInBytes() / 100L); - if (currentStreamProgress() >= meaningfulProgressSize) { - failuresAfterMeaningfulProgress += 1; + @Override + public int read(byte[] b, int off, int len) throws IOException { + ensureOpen(); + final int bytesRead = responseStream.read(b, off, len); + if (bytesRead == -1) { + eof = true; + } else { + offset += bytesRead; + } + return bytesRead; } - final long delayInMillis = maybeLogAndComputeRetryDelay("reading", e); - maybeAbort(currentStream); - IOUtils.closeWhileHandlingException(currentStream); - 
delayBeforeRetry(delayInMillis); - openStreamWithRetry(); - } - - // The method throws if the operation should *not* be retried. Otherwise, it keeps a record for the attempt and associated failure - // and compute the delay before retry. - private long maybeLogAndComputeRetryDelay(String action, T e) throws T { - if (shouldRetry(attempt) == false) { - final var finalException = addSuppressedExceptions(e); - logForFailure(action, finalException); - throw finalException; + private void ensureOpen() { + if (closed) { + assert false : "using S3InputStream after close"; + throw new IllegalStateException("using S3InputStream after close"); + } } - // Log at info level for the 1st retry and then exponentially less - logForRetry(Integer.bitCount(attempt) == 1 ? Level.INFO : Level.DEBUG, action, e); - if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { - failures.add(e); + @Override + public void close() throws IOException { + maybeAbort(responseStream); + try { + responseStream.close(); + } finally { + closed = true; + } } - final long delayInMillis = getRetryDelayInMillis(); - attempt += 1; // increment after computing delay because attempt affects the result - return delayInMillis; - } - - private void logForFailure(String action, Exception e) { - logger.warn( - () -> format( - "failed %s [%s/%s] at offset [%s] with purpose [%s]", - action, - blobStore.bucket(), - blobKey, - start + currentOffset, - purpose.getKey() - ), - e - ); - } - - private void logForRetry(Level level, String action, Exception e) { - logger.log( - level, - () -> format( - """ - failed %s [%s/%s] at offset [%s] with purpose [%s]; \ - this was attempt [%s] to read this blob which yielded [%s] bytes; in total \ - [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \ - retries; the maximum number of read attempts which do not make meaningful progress is [%s]""", - action, - blobStore.bucket(), - blobKey, - start + currentOffset, - 
purpose.getKey(), - attempt, - currentStreamProgress(), - failuresAfterMeaningfulProgress, - maxRetriesForNoMeaningfulProgress() - ), - e - ); - } - private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) { - if (attempt > initialAttempt) { - final int numberOfRetries = attempt - initialAttempt; - logger.info( - "successfully {} input stream for [{}/{}] with purpose [{}] after [{}] retries", - action, - blobStore.bucket(), - blobKey, - purpose.getKey(), - numberOfRetries - ); - final Map attributes = metricAttributes(action); - blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes); - blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes); + /** + * Abort the {@link ResponseInputStream} if it wasn't read completely at the time this method is called, + * suppressing all thrown exceptions. + */ + private void maybeAbort(ResponseInputStream stream) { + if (isEof()) { + return; + } + try { + if (offset < lastOffset) { + stream.abort(); + aborted = true; + } + } catch (Exception e) { + logger.warn("Failed to abort stream before closing", e); + } } - } - private long currentStreamProgress() { - return Math.subtractExact(Math.addExact(start, currentOffset), currentStreamFirstOffset); - } - - private boolean shouldRetry(int attempt) { - if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) { - return false; - } - if (purpose == OperationPurpose.INDICES) { - return true; + @Override + public long skip(long n) throws IOException { + // This could be optimized on a failure by re-opening stream directly to the preferred location. However, it is rarely called, + // so for now we will rely on the default implementation which just discards bytes by reading. 
+ return super.skip(n); } - final int maxAttempts = blobStore.getMaxRetries() + 1; - return attempt < maxAttempts + failuresAfterMeaningfulProgress; - } - - private int maxRetriesForNoMeaningfulProgress() { - return purpose == OperationPurpose.INDICES ? Integer.MAX_VALUE : (blobStore.getMaxRetries() + 1); - } - private void delayBeforeRetry(long delayInMillis) { - try { - assert shouldRetry(attempt - 1) : "should not have retried"; - Thread.sleep(delayInMillis); - } catch (InterruptedException e) { - logger.info("s3 input stream delay interrupted", e); - Thread.currentThread().interrupt(); + @Override + public void reset() { + throw new UnsupportedOperationException("S3InputStream does not support seeking"); } - } - - // protected access for testing - protected long getRetryDelayInMillis() { - // Initial delay is 10 ms and cap max delay at 10 * 1024 millis, i.e. it retries every ~10 seconds at a minimum - return 10L << (Math.min(attempt - 1, 10)); - } - - private Map metricAttributes(String action) { - return Map.of( - "repo_type", - S3Repository.TYPE, - "repo_name", - blobStore.getRepositoryMetadata().name(), - "operation", - Operation.GET_OBJECT.getKey(), - "purpose", - purpose.getKey(), - "action", - action - ); - } - @Override - public void close() throws IOException { - maybeAbort(currentStream); - try { - currentStream.close(); - } finally { - closed = true; + // exposed for testing + private boolean isEof() { + return eof || offset == lastOffset; } - } - /** - * Abort the {@link ResponseInputStream} if it wasn't read completely at the time this method is called, - * suppressing all thrown exceptions. 
- */ - private void maybeAbort(ResponseInputStream stream) { - if (isEof()) { - return; - } - try { - if (start + currentOffset < currentStreamLastOffset) { - stream.abort(); - aborted = true; - } - } catch (Exception e) { - logger.warn("Failed to abort stream before closing", e); + // exposed for testing + private boolean isAborted() { + // just expose whether abort() was called, we cannot tell if the stream is really aborted + return aborted; } - } - - @Override - public long skip(long n) throws IOException { - // This could be optimized on a failure by re-opening stream directly to the preferred location. However, it is rarely called, - // so for now we will rely on the default implementation which just discards bytes by reading. - return super.skip(n); - } - @Override - public void reset() { - throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking"); - } - - private T addSuppressedExceptions(T e) { - for (Exception failure : failures) { - e.addSuppressed(failure); + // exposed for testing + private long tryGetStreamLength(GetObjectResponse response) { + return S3RetryingInputStream.tryGetStreamLength(response, start, end); } - return e; } - // package-private for tests + // exposed for testing boolean isEof() { - return eof || start + currentOffset == currentStreamLastOffset; + return ((S3InputStream) currentStream.inputStream()).isEof(); } - // package-private for tests + // exposed for testing boolean isAborted() { - // just expose whether abort() was called, we cannot tell if the stream is really aborted - return aborted; + return ((S3InputStream) currentStream.inputStream()).isAborted(); + } + + // exposed for testing + long tryGetStreamLength(GetObjectResponse getObjectResponse) { + return ((S3InputStream) currentStream.inputStream()).tryGetStreamLength(getObjectResponse); } } diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java 
b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 0a208c5241cf1..d19e65e7b5419 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -51,7 +51,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; -import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.Measurement; @@ -60,7 +59,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.watcher.ResourceWatcherService; -import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; import org.mockito.Mockito; @@ -102,19 +100,16 @@ import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.s3.S3ClientSettingsTests.DEFAULT_REGION_UNAVAILABLE; -import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; /** * This class tests how a {@link 
S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs.
@@ -1361,25 +1356,6 @@ private static void assertBase16MD5Digest(String input, String expectedDigestStr
         assertEquals(expectedDigestString, getBase16MD5Digest(new BytesArray(input)));
     }
 
-    @Override
-    protected Matcher getMaxRetriesMatcher(int maxRetries) {
-        // some attempts make meaningful progress and do not count towards the max retry limit
-        return allOf(greaterThanOrEqualTo(maxRetries), lessThanOrEqualTo(S3RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS));
-    }
-
-    @Override
-    protected OperationPurpose randomRetryingPurpose() {
-        return randomValueOtherThan(OperationPurpose.REPOSITORY_ANALYSIS, BlobStoreTestUtil::randomPurpose);
-    }
-
-    @Override
-    protected OperationPurpose randomFiniteRetryingPurpose() {
-        return randomValueOtherThanMany(
-            purpose -> purpose == OperationPurpose.REPOSITORY_ANALYSIS || purpose == OperationPurpose.INDICES,
-            BlobStoreTestUtil::randomPurpose
-        );
-    }
-
     private void assertMetricsForOpeningStream() {
         final long numberOfOperations = getOperationMeasurements();
         // S3 client sdk internally also retries within the configured maxRetries for retryable errors.
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java
new file mode 100644
index 0000000000000..87da8d05cd8c6
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java
@@ -0,0 +1,378 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.blobstore;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.core.Strings.format;
+
+/**
+ * An {@link InputStream} over a range of a blob that re-opens the underlying stream, resuming at the current offset, when
+ * opening or reading fails. All blob-store-specific behaviour is delegated to a {@link BlobStoreServices} implementation.
+ * <p>
+ * Retries depend on the {@link OperationPurpose}: {@link OperationPurpose#REPOSITORY_ANALYSIS} is never retried,
+ * {@link OperationPurpose#INDICES} is retried without limit, and every other purpose is retried up to
+ * {@link BlobStoreServices#getMaxRetries()} times, not counting failures of streams that made meaningful progress.
+ *
+ * @param <V> The type of the blob version; the version returned with one stream is requested again when re-opening
+ */
+public abstract class RetryingInputStream<V> extends InputStream {
+
+    private static final Logger logger = LogManager.getLogger(RetryingInputStream.class);
+
+    // Upper bound on the number of earlier failures attached as suppressed exceptions to the final failure
+    public static final int MAX_SUPPRESSED_EXCEPTIONS = 10;
+
+    private final BlobStoreServices<V> blobStoreServices;
+    private final OperationPurpose purpose;
+    private final long start;
+    private final long end;
+    private final List<Exception> failures;
+
+    protected InputStreamAtVersion<V> currentStream;
+    private long currentStreamFirstOffset;
+    private int attempt = 1;
+    private int failuresAfterMeaningfulProgress = 0;
+    // Number of bytes consumed so far, relative to start; a long so that reads of more than 2GiB do not overflow
+    private long currentOffset = 0;
+    private boolean closed = false;
+
+    /**
+     * Opens a stream from offset 0 with the {@code Long.MAX_VALUE - 1} end marker, i.e. up to the end of the blob.
+     *
+     * @throws IOException if the stream cannot be opened and the failure is not, or is no longer, retried
+     */
+    protected RetryingInputStream(BlobStoreServices<V> blobStoreServices, OperationPurpose purpose) throws IOException {
+        this(blobStoreServices, purpose, 0L, Long.MAX_VALUE - 1L);
+    }
+
+    /**
+     * Opens a stream over the given range of the blob, retrying the initial open as permitted by {@code purpose}.
+     *
+     * @param blobStoreServices the blob-store-specific operations used to open streams and to report retries
+     * @param purpose           the purpose of the read, which determines the retry policy
+     * @param start             the first offset to read; must not be negative
+     * @param end               the end of the range, passed through to {@link BlobStoreServices#getInputStreamAtVersion};
+     *                          must be at least {@code start} and must not be {@code Long.MAX_VALUE}
+     * @throws IllegalArgumentException if {@code start} or {@code end} is out of bounds
+     * @throws IOException if the stream cannot be opened and the failure is not, or is no longer, retried
+     */
+    @SuppressWarnings("this-escape") // TODO: We can do better than this but I don't want to touch the tests for the first implementation
+    protected RetryingInputStream(BlobStoreServices<V> blobStoreServices, OperationPurpose purpose, long start, long end)
+        throws IOException {
+        if (start < 0L) {
+            throw new IllegalArgumentException("start must be non-negative");
+        }
+        if (end < start || end == Long.MAX_VALUE) {
+            throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE");
+        }
+        this.blobStoreServices = blobStoreServices;
+        this.purpose = purpose;
+        this.failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
+        this.start = start;
+        this.end = end;
+        final int initialAttempt = attempt;
+        openStreamWithRetry();
+        maybeLogAndRecordMetricsForSuccess(initialAttempt, "open");
+    }
+
+    /**
+     * Opens (or re-opens) the underlying stream at {@code start + currentOffset}, looping until it succeeds or
+     * {@link #maybeLogAndComputeRetryDelay} rethrows the failure. Missing blobs and unsatisfiable ranges are never retried.
+     */
+    private void openStreamWithRetry() throws IOException {
+        while (true) {
+            if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
+                assert start + currentOffset <= end
+                    : "requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end;
+            }
+            try {
+                currentStream = blobStoreServices.getInputStreamAtVersion(
+                    currentStream != null ? currentStream.version : null,
+                    start + currentOffset,
+                    end
+                );
+                this.currentStreamFirstOffset = Math.addExact(start, currentOffset);
+                return;
+            } catch (NoSuchFileException | RequestedRangeNotSatisfiedException e) {
+                throw e;
+            } catch (RuntimeException | IOException e) {
+                if (attempt == 1) {
+                    blobStoreServices.onRetryStarted("open");
+                }
+                final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e);
+                delayBeforeRetry(delayInMillis);
+            }
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        ensureOpen();
+        final int initialAttempt = attempt;
+        while (true) {
+            try {
+                final int result = currentStream.inputStream.read();
+                if (result != -1) {
+                    currentOffset += 1;
+                }
+                maybeLogAndRecordMetricsForSuccess(initialAttempt, "read");
+                return result;
+            } catch (IOException e) {
+                // attempt is incremented on every retry, so this only reports the first failure of this call
+                if (attempt == initialAttempt) {
+                    blobStoreServices.onRetryStarted("read");
+                }
+                reopenStreamOrFail(e);
+            }
+        }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        ensureOpen();
+        final int initialAttempt = attempt;
+        while (true) {
+            try {
+                final int bytesRead = currentStream.inputStream.read(b, off, len);
+                if (bytesRead != -1) {
+                    currentOffset += bytesRead;
+                }
+                maybeLogAndRecordMetricsForSuccess(initialAttempt, "read");
+                return bytesRead;
+            } catch (IOException e) {
+                // attempt is incremented on every retry, so this only reports the first failure of this call
+                if (attempt == initialAttempt) {
+                    blobStoreServices.onRetryStarted("read");
+                }
+                reopenStreamOrFail(e);
+            }
+        }
+    }
+
+    private void ensureOpen() {
+        if (closed) {
+            assert false : "using RetryingInputStream after close";
+            throw new IllegalStateException("Stream is closed");
+        }
+    }
+
+    /**
+     * Handles a failed read: records whether the failed stream made meaningful progress, then closes it and re-opens at the
+     * current offset after the retry delay, or throws if the failure should not be retried.
+     */
+    private void reopenStreamOrFail(IOException e) throws IOException {
+        final long meaningfulProgressSize = blobStoreServices.getMeaningfulProgressSize();
+        if (currentStreamProgress() >= meaningfulProgressSize) {
+            failuresAfterMeaningfulProgress += 1;
+        }
+        final long delayInMillis = maybeLogAndComputeRetryDelay("reading", e);
+        IOUtils.closeWhileHandlingException(currentStream.inputStream);
+
+        delayBeforeRetry(delayInMillis);
+        openStreamWithRetry();
+    }
+
+    // The method throws if the operation should *not* be retried. Otherwise, it keeps a record for the attempt and associated failure
+    // and compute the delay before retry.
+    private long maybeLogAndComputeRetryDelay(String action, Exception e) throws IOException {
+        if (shouldRetry(attempt) == false) {
+            final var finalException = addSuppressedExceptions(e);
+            logForFailure(action, finalException);
+            switch (finalException) {
+                case RuntimeException runtimeException:
+                    throw runtimeException;
+                case IOException ioException:
+                    throw ioException;
+                default:
+                    // not reached by the callers in this class, which only pass a RuntimeException or an IOException
+                    throw new IOException("Error " + action + " blob", finalException);
+            }
+        }
+
+        // Log at info level for the 1st retry and then exponentially less
+        logForRetry(Integer.bitCount(attempt) == 1 ? Level.INFO : Level.DEBUG, action, e);
+        if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
+            failures.add(e);
+        }
+        final long delayInMillis = getRetryDelayInMillis();
+        attempt += 1; // increment after computing delay because attempt affects the result
+        return delayInMillis;
+    }
+
+    private void logForFailure(String action, Exception e) {
+        logger.warn(
+            () -> format(
+                "failed %s [%s] at offset [%s] with purpose [%s]",
+                action,
+                blobStoreServices.getBlobDescription(),
+                start + currentOffset,
+                purpose.getKey()
+            ),
+            e
+        );
+    }
+
+    private void logForRetry(Level level, String action, Exception e) {
+        logger.log(
+            level,
+            () -> format(
+                """
+                    failed %s [%s] at offset [%s] with purpose [%s]; \
+                    this was attempt [%s] to read this blob which yielded [%s] bytes; in total \
+                    [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \
+                    retries; the maximum number of read attempts which do not make meaningful progress is [%s]""",
+                action,
+                blobStoreServices.getBlobDescription(),
+                start + currentOffset,
+                purpose.getKey(),
+                attempt,
+                currentStreamProgress(),
+                failuresAfterMeaningfulProgress,
+                maxRetriesForNoMeaningfulProgress()
+            ),
+            e
+        );
+    }
+
+    private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) {
+        if (attempt > initialAttempt) {
+            final int numberOfRetries = attempt - initialAttempt;
+            logger.info(
+                "successfully {} input stream for [{}] with purpose [{}] after [{}] retries",
+                action,
+                blobStoreServices.getBlobDescription(),
+                purpose.getKey(),
+                numberOfRetries
+            );
+            blobStoreServices.onRetrySucceeded(action, numberOfRetries);
+        }
+    }
+
+    private long currentStreamProgress() {
+        return Math.subtractExact(Math.addExact(start, currentOffset), currentStreamFirstOffset);
+    }
+
+    private boolean shouldRetry(int attempt) {
+        if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
+            return false;
+        }
+        if (purpose == OperationPurpose.INDICES) {
+            return true;
+        }
+        final int maxAttempts = blobStoreServices.getMaxRetries() + 1;
+        return attempt < maxAttempts + failuresAfterMeaningfulProgress;
+    }
+
+    private int maxRetriesForNoMeaningfulProgress() {
+        return purpose == OperationPurpose.INDICES ? Integer.MAX_VALUE : (blobStoreServices.getMaxRetries() + 1);
+    }
+
+    private void delayBeforeRetry(long delayInMillis) {
+        try {
+            assert shouldRetry(attempt - 1) : "should not have retried";
+            Thread.sleep(delayInMillis);
+        } catch (InterruptedException e) {
+            logger.info("retrying input stream delay interrupted", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    // protected access for testing
+    protected long getRetryDelayInMillis() {
+        // Initial delay is 10 ms and cap max delay at 10 * 1024 millis, i.e. it retries every ~10 seconds at a minimum
+        return 10L << (Math.min(attempt - 1, 10));
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            currentStream.inputStream.close();
+        } finally {
+            closed = true;
+        }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        ensureOpen();
+        // Skip by reading and discarding (the InputStream default) so that skipped bytes advance currentOffset and failures are
+        // retried like any other read; skipping on the delegate directly would make a later re-open resume at a stale offset.
+        return super.skip(n);
+    }
+
+    @Override
+    public void reset() {
+        throw new UnsupportedOperationException("RetryingInputStream does not support seeking");
+    }
+
+    private <T extends Exception> T addSuppressedExceptions(T e) {
+        for (Exception failure : failures) {
+            e.addSuppressed(failure);
+        }
+        return e;
+    }
+
+    /**
+     * This implements all the behavior that is blob-store-specific
+     *
+     * @param <V> The type of the version used
+     */
+    protected interface BlobStoreServices<V> {
+
+        /**
+         * Get an input stream for the given version
+         *
+         * @param version The version to request, or null if the latest version should be used
+         * @param start The start of the range to read, inclusive
+         * @param end The end of the range to read, exclusive, or {@code Long.MAX_VALUE - 1} if the end of the blob should be used
+         * @return An input stream for the given version
+         * @throws IOException if a retryable error occurs while opening the stream
+         * @throws NoSuchFileException if the blob does not exist, this is not retry-able
+         * @throws RequestedRangeNotSatisfiedException if the requested range is not valid, this is not retry-able
+         */
+        InputStreamAtVersion<V> getInputStreamAtVersion(@Nullable V version, long start, long end) throws IOException;
+
+        /** Called when an open or read first fails, before it is decided whether the failure is retried. */
+        void onRetryStarted(String action);
+
+        /** Called when an open or read succeeds after {@code numberOfRetries} (at least one) retries. */
+        void onRetrySucceeded(String action, long numberOfRetries);
+
+        /** The number of bytes a stream must have yielded for its failure not to count towards the retry limit. */
+        long getMeaningfulProgressSize();
+
+        /** The number of retries allowed for purposes that are neither never retried nor retried without limit. */
+        int getMaxRetries();
+
+        /** A description of the blob being read, used in log messages. */
+        String getBlobDescription();
+    }
+
+    /**
+     * An open stream together with the blob version it reads; the version is passed back when the stream is re-opened.
+     */
+    protected record InputStreamAtVersion<V>(InputStream inputStream, V version) {
+        // Make the default constructor public
+        public InputStreamAtVersion {}
+
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java
new file mode 100644
index 0000000000000..8e059e23c0f23
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */ + +package org.elasticsearch.common.blobstore; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Streams; +import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; +import org.elasticsearch.test.ESTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomFiniteRetryingPurpose; +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose; +import static org.hamcrest.Matchers.empty; + +public class RetryingInputStreamTests extends ESTestCase { + + public void testRetryableErrorsWhenReadingAreRetried() throws IOException { + final var retryableFailures = randomIntBetween(1, 5); + final var failureCounter = new AtomicInteger(retryableFailures); + final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final var eTag = randomUUID(); + + final var services = new BlobStoreServicesAdapter(retryableFailures * 2) { + @Override + public RetryingInputStream.InputStreamAtVersion doGetInputStream(String version, long start, long end) + throws IOException { + final var inputStream = new FailureAtIndexInputStream(resourceBytes, (int) start, failureCounter.getAndDecrement() > 0); + return new RetryingInputStream.InputStreamAtVersion<>(inputStream, eTag); + } + }; + + byte[] results = copyToBytes(new RetryingInputStream<>(services, randomRetryingPurpose()) { + }); + assertEquals(resourceBytes.length(), results.length); + assertEquals(resourceBytes, new BytesArray(results)); + 
assertEquals(retryableFailures + 1, services.getAttempts()); + assertEquals(Stream.generate(() -> "read").limit(retryableFailures).toList(), services.getRetryStarted()); + } + + public void testReadWillFailWhenRetryableErrorsExceedMaxRetries() { + final var maxRetries = randomIntBetween(1, 5); + final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(10, 100)).getBytes()); + final var eTag = randomUUID(); + + final var services = new BlobStoreServicesAdapter(maxRetries) { + @Override + public RetryingInputStream.InputStreamAtVersion doGetInputStream(String version, long start, long end) + throws IOException { + final var inputStream = new FailureAtIndexInputStream(resourceBytes, (int) start, true); + return new RetryingInputStream.InputStreamAtVersion<>(inputStream, eTag); + } + }; + + final var ioException = assertThrows( + IOException.class, + () -> copyToBytes(new RetryingInputStream<>(services, randomFiniteRetryingPurpose()) { + }) + ); + assertEquals("This is retry-able", ioException.getMessage()); + assertEquals(maxRetries + 1, services.getAttempts()); + assertEquals(Stream.generate(() -> "read").limit(maxRetries + 1).toList(), services.getRetryStarted()); + } + + public void testReadWillFailWhenRetryableErrorsOccurDuringRepositoryAnalysis() { + final var maxRetries = randomIntBetween(2, 5); + final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final var eTag = randomUUID(); + + final var services = new BlobStoreServicesAdapter(maxRetries) { + @Override + public RetryingInputStream.InputStreamAtVersion doGetInputStream(String version, long start, long end) + throws IOException { + final var inputStream = new FailureAtIndexInputStream(resourceBytes, (int) start, true); + return new RetryingInputStream.InputStreamAtVersion<>(inputStream, eTag); + } + }; + + final var ioException = assertThrows( + IOException.class, + () -> copyToBytes(new 
RetryingInputStream<>(services, OperationPurpose.REPOSITORY_ANALYSIS) { + }) + ); + assertEquals("This is retry-able", ioException.getMessage()); + assertEquals(1, services.getAttempts()); + assertEquals(List.of("read"), services.getRetryStarted()); + } + + public void testReadWillRetryIndefinitelyWhenErrorsOccurDuringIndicesOperation() throws IOException { + final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final int numberOfFailures = randomIntBetween(1, 10); + final AtomicInteger failureCounter = new AtomicInteger(numberOfFailures); + final var eTag = randomUUID(); + + final var services = new BlobStoreServicesAdapter(0) { + @Override + public RetryingInputStream.InputStreamAtVersion doGetInputStream(String version, long start, long end) + throws IOException { + final var inputStream = new FailureAtIndexInputStream(resourceBytes, (int) start, failureCounter.getAndDecrement() > 0); + return new RetryingInputStream.InputStreamAtVersion<>(inputStream, eTag); + } + }; + + byte[] result = copyToBytes(new RetryingInputStream<>(services, OperationPurpose.INDICES) { + }); + assertEquals(resourceBytes, new BytesArray(result)); + assertEquals(numberOfFailures + 1, services.getAttempts()); + assertEquals(Stream.generate(() -> "read").limit(numberOfFailures).toList(), services.getRetryStarted()); + } + + public void testRetriesWillBeExtendedWhenMeaningfulProgressIsMade() { + final var maxRetries = randomIntBetween(1, 5); + final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(100, 150)).getBytes()); + final var meaningfulProgressSize = randomIntBetween(1024, 4096); + final var meaningfulProgressAttempts = randomIntBetween(1, 3); + final var meaningfulProgressAttemptsCounter = new AtomicInteger(meaningfulProgressAttempts); + final var eTag = randomUUID(); + + final var services = new BlobStoreServicesAdapter(maxRetries) { + @Override + public 
RetryingInputStream.InputStreamAtVersion doGetInputStream(String version, long start, long end) + throws IOException { + final var inputStream = meaningfulProgressAttemptsCounter.decrementAndGet() > 0 + ? new FailureAtIndexInputStream(resourceBytes, (int) start, true, meaningfulProgressSize, Integer.MAX_VALUE) + : new FailureAtIndexInputStream(resourceBytes, (int) start, true, 1, meaningfulProgressSize - 1); + return new RetryingInputStream.InputStreamAtVersion<>(inputStream, eTag); + } + + @Override + public long getMeaningfulProgressSize() { + return meaningfulProgressSize; + } + }; + + final var ioException = assertThrows( + IOException.class, + () -> copyToBytes(new RetryingInputStream<>(services, randomFiniteRetryingPurpose()) { + }) + ); + assertEquals("This is retry-able", ioException.getMessage()); + assertEquals(maxRetries + meaningfulProgressAttempts, services.getAttempts()); + assertEquals(Stream.generate(() -> "read").limit(maxRetries + meaningfulProgressAttempts).toList(), services.getRetryStarted()); + } + + public void testNoSuchFileExceptionAndRangeNotSatisfiedTerminatesWithoutRetry() { + final var notRetriableException = randomBoolean() + ? 
new NoSuchFileException("This is not retry-able") + : new RequestedRangeNotSatisfiedException("This is not retry-able", randomLong(), randomLong()); + final var retryableFailures = randomIntBetween(1, 5); + final var failureCounter = new AtomicInteger(retryableFailures); + + final var services = new BlobStoreServicesAdapter(retryableFailures * 2) { + @Override + public RetryingInputStream.InputStreamAtVersion doGetInputStream(String version, long start, long end) + throws IOException { + if (failureCounter.getAndDecrement() > 0) { + if (randomBoolean()) { + throw new RuntimeException("This is retry-able"); + } else { + throw new IOException("This is retry-able"); + } + } + throw notRetriableException; + } + }; + final IOException ioException = assertThrows( + IOException.class, + () -> copyToBytes(new RetryingInputStream<>(services, randomRetryingPurpose()) { + }) + ); + assertSame(notRetriableException, ioException); + assertEquals(retryableFailures + 1, services.getAttempts()); + assertEquals(List.of("open"), services.getRetryStarted()); + assertThat(services.getRetrySucceeded(), empty()); + } + + public void testBlobVersionIsRequestedForSecondAndSubsequentAttempts() throws IOException { + final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final int numberOfFailures = randomIntBetween(1, 10); + final AtomicInteger failureCounter = new AtomicInteger(numberOfFailures); + final var eTag = randomUUID(); + + final var services = new BlobStoreServicesAdapter(0) { + @Override + public RetryingInputStream.InputStreamAtVersion doGetInputStream(String version, long start, long end) + throws IOException { + if (getAttempts() > 1) { + assertEquals(eTag, version); + } else { + assertNull(version); + } + final var inputStream = new FailureAtIndexInputStream(resourceBytes, (int) start, failureCounter.getAndDecrement() > 0); + return new RetryingInputStream.InputStreamAtVersion<>(inputStream, eTag); + } + }; + + 
copyToBytes(new RetryingInputStream<>(services, OperationPurpose.INDICES) { + }); + } + + private static byte[] copyToBytes(InputStream inputStream) throws IOException { + final var outputStream = new ByteArrayOutputStream(); + if (randomBoolean()) { + Streams.copy(inputStream, outputStream); + } else { + while (true) { + final int read = inputStream.read(); + if (read == -1) { + break; + } + outputStream.write(read); + } + } + return outputStream.toByteArray(); + } + + private abstract static class BlobStoreServicesAdapter implements RetryingInputStream.BlobStoreServices { + + private final AtomicInteger attemptCounter = new AtomicInteger(); + private final List retryStarted = new ArrayList<>(); + private final List retrySucceeded = new ArrayList<>(); + private final int maxRetries; + + private BlobStoreServicesAdapter(int maxRetries) { + this.maxRetries = maxRetries; + } + + @Override + public final RetryingInputStream.InputStreamAtVersion getInputStreamAtVersion(String version, long start, long end) + throws IOException { + attemptCounter.incrementAndGet(); + return doGetInputStream(version, start, end); + } + + protected abstract RetryingInputStream.InputStreamAtVersion doGetInputStream(String version, long start, long end) + throws IOException; + + @Override + public void onRetryStarted(String action) { + retryStarted.add(action); + } + + @Override + public void onRetrySucceeded(String action, long numberOfRetries) { + retrySucceeded.add(new Success(action, numberOfRetries)); + } + + @Override + public long getMeaningfulProgressSize() { + return Long.MAX_VALUE; + } + + @Override + public final int getMaxRetries() { + return maxRetries; + } + + @Override + public String getBlobDescription() { + return ""; + } + + record Success(String action, long numberOfRetries) {}; + + public int getAttempts() { + return attemptCounter.get(); + } + + public List getRetryStarted() { + return retryStarted; + } + + public List getRetrySucceeded() { + return retrySucceeded; + } 
+ } + + private static class FailureAtIndexInputStream extends InputStream { + + private final InputStream delegate; + private int readRemaining; + + private FailureAtIndexInputStream(BytesReference bytesReference, int startIndex, boolean failBeforeEnd) throws IOException { + this(bytesReference, startIndex, failBeforeEnd, 1, Integer.MAX_VALUE); + } + + private FailureAtIndexInputStream( + BytesReference bytesReference, + int startIndex, + boolean failBeforeEnd, + int minimumSuccess, + int maximumSuccess + ) throws IOException { + final int remainingBytes = bytesReference.length() - startIndex; + this.delegate = bytesReference.slice(startIndex, remainingBytes).streamInput(); + if (failBeforeEnd) { + this.readRemaining = randomIntBetween(Math.max(1, minimumSuccess), Math.min(maximumSuccess, remainingBytes / 2)); + } else { + this.readRemaining = Integer.MAX_VALUE; + } + } + + @Override + public int read() throws IOException { + if (readRemaining > 0) { + readRemaining--; + return delegate.read(); + } else { + throw new IOException("This is retry-able"); + } + } + + @Override + public String toString() { + return "Failing after " + readRemaining; + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java index a70ce9340ad00..dda77b31f9982 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.RetryingInputStream; import org.elasticsearch.common.bytes.BytesReference; 
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.unit.ByteSizeValue; @@ -27,6 +28,7 @@ import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.fixture.HttpHeaderParser; +import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; @@ -44,10 +46,10 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.test.NeverMatcher.never; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; @@ -291,16 +293,20 @@ public void testReadBlobWithReadTimeouts() { assertThat(exception.getSuppressed().length, getMaxRetriesMatcher(maxRetries)); } - protected org.hamcrest.Matcher getMaxRetriesMatcher(int maxRetries) { - return equalTo(maxRetries); + protected Matcher getMaxRetriesMatcher(int maxRetries) { + // some attempts make meaningful progress and do not count towards the max retry limit + return allOf(greaterThanOrEqualTo(maxRetries), lessThanOrEqualTo(RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS)); } - protected OperationPurpose randomRetryingPurpose() { - return randomPurpose(); + public static OperationPurpose randomRetryingPurpose() { + return randomValueOtherThan(OperationPurpose.REPOSITORY_ANALYSIS, BlobStoreTestUtil::randomPurpose); } - protected OperationPurpose randomFiniteRetryingPurpose() { - return randomPurpose(); + public static OperationPurpose randomFiniteRetryingPurpose() { + return randomValueOtherThanMany( + purpose -> purpose == OperationPurpose.REPOSITORY_ANALYSIS || purpose == OperationPurpose.INDICES, + BlobStoreTestUtil::randomPurpose + ); } public void 
testReadBlobWithNoHttpResponse() { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 8870c79218780..309133dbaddb9 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -60,6 +60,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_INDEX_NAME_FORMAT; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT; @@ -130,7 +131,7 @@ public void testReadNonExistingPath() throws IOException { try (BlobStore store = newBlobStore()) { final BlobContainer container = store.blobContainer(BlobPath.EMPTY); expectThrows(NoSuchFileException.class, () -> { - try (InputStream is = container.readBlob(randomPurpose(), "non-existing")) { + try (InputStream is = container.readBlob(randomRetryingPurpose(), "non-existing")) { is.read(); } }); @@ -157,7 +158,7 @@ public void testWriteMaybeCopyRead() throws IOException { readBlobName = destinationBlobName; } catch (UnsupportedOperationException ignored) {} } - try (InputStream stream = container.readBlob(randomPurpose(), readBlobName)) { + try (InputStream stream = container.readBlob(randomRetryingPurpose(), readBlobName)) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())]; @@ -285,7 +286,7 @@ public 
static byte[] writeRandomBlob(BlobContainer container, String name, int l public static byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException { byte[] data = new byte[length]; - try (InputStream inputStream = container.readBlob(randomPurpose(), name)) { + try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), name)) { assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length)); assertThat(inputStream.read(), CoreMatchers.equalTo(-1)); }