Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
33c85bc
Factor out non-CSP-specific retrying logic
nicktindall Oct 16, 2025
419ea29
Restore original exception throwing
nicktindall Oct 16, 2025
7b61719
Merge remote-tracking branch 'origin/main' into retrying_blobstore_st…
nicktindall Oct 16, 2025
643883f
First pass AzureRetryingInputStream
nicktindall Oct 16, 2025
82f871a
Move not-found and range-not-satisfied logic into AzureRetryingInputS…
nicktindall Oct 20, 2025
c42cc7f
Merge branch 'main' into retrying_blobstore_streams
nicktindall Oct 20, 2025
7e936c2
Create BlobRequestConditions eagerly
nicktindall Oct 20, 2025
4629790
Always set ifMatch (null check is redundant)
nicktindall Oct 20, 2025
832141e
Use retrying stream to do retries
nicktindall Oct 21, 2025
968d89b
visibility
nicktindall Oct 21, 2025
e06d79a
tidy
nicktindall Oct 21, 2025
972a505
Add test
nicktindall Oct 21, 2025
c4fcaa1
Use common retry logic for GCS
nicktindall Nov 7, 2025
3670b99
Fix tests
nicktindall Nov 7, 2025
cec7b7f
Merge branch 'main' into retrying_blobstore_streams
nicktindall Nov 7, 2025
2128fd1
Merge branch 'main' into retrying_blobstore_streams
nicktindall Nov 9, 2025
66900b3
Fix GCS tests
nicktindall Nov 10, 2025
d775068
Fix test for exposed settings
nicktindall Nov 10, 2025
36ed417
Merge branch 'main' into retrying_blobstore_streams
nicktindall Nov 10, 2025
6ccd458
Use randomRetryingPurpose whenever we readBlob and care about the res…
nicktindall Nov 11, 2025
7b2ee50
Merge branch 'main' into retrying_blobstore_streams
nicktindall Nov 11, 2025
dd86c6d
Merge branch 'main' into retrying_blobstore_streams
nicktindall Nov 14, 2025
eecda28
Opt out of GCS client retries when reading via retryable stream
nicktindall Nov 18, 2025
6fcef80
Increase max retries to accommodate random failures
nicktindall Nov 19, 2025
eb239b7
Merge branch 'main' into retrying_blobstore_streams
nicktindall Nov 19, 2025
7634e66
Fix test
nicktindall Nov 19, 2025
e4b18ef
Fix tests again
nicktindall Nov 19, 2025
3d2c246
First pass on unit test
nicktindall Nov 19, 2025
c88f8e2
Tidy up tests, fix log message
nicktindall Nov 19, 2025
14cff38
Minimise change
nicktindall Nov 19, 2025
167b29c
[CI] Auto commit changes from spotless
Nov 19, 2025
ce4bd1f
Test that version is requested for retries
nicktindall Nov 19, 2025
0e3ab79
Put some values in for "meaningful progress"
nicktindall Nov 20, 2025
a49cbed
Add comment indicating that getInputStream won't retry on failures
nicktindall Nov 20, 2025
47f1834
Minimise change
nicktindall Nov 20, 2025
5b23aa9
Use default RetrySettings from the GCS client code
nicktindall Nov 20, 2025
af8bc55
Make GoogleCloudStorageClientsManagerTests retry-behaviour aware
nicktindall Nov 20, 2025
86aa89f
Tidy
nicktindall Nov 20, 2025
e3d54da
Tidy
nicktindall Nov 20, 2025
792e6de
Merge branch 'main' into retrying_blobstore_streams
nicktindall Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_REQUESTS_TOTAL;
import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand Down Expand Up @@ -360,7 +361,7 @@ public void testReadByteByByte() throws Exception {
container.writeBlob(randomPurpose(), blobName, new ByteArrayInputStream(data), data.length, true);

var originalDataInputStream = new ByteArrayInputStream(data);
try (var azureInputStream = container.readBlob(randomPurpose(), blobName)) {
try (var azureInputStream = container.readBlob(randomRetryingPurpose(), blobName)) {
for (int i = 0; i < data.length; i++) {
assertThat(originalDataInputStream.read(), is(equalTo(azureInputStream.read())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@

package org.elasticsearch.repositories.azure;

import com.azure.core.exception.HttpResponseException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand All @@ -26,8 +23,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -67,20 +62,7 @@ private InputStream openInputStream(OperationPurpose purpose, String blobName, l
// stream to it.
throw new NoSuchFileException("Blob [" + blobKey + "] not found");
}
try {
return blobStore.getInputStream(purpose, blobKey, position, length);
} catch (Exception e) {
if (ExceptionsHelper.unwrap(e, HttpResponseException.class) instanceof HttpResponseException httpResponseException) {
final var httpStatusCode = httpResponseException.getResponse().getStatusCode();
if (httpStatusCode == RestStatus.NOT_FOUND.getStatus()) {
throw new NoSuchFileException("Blob [" + blobKey + "] not found");
}
if (httpStatusCode == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
throw new RequestedRangeNotSatisfiedException(blobKey, position, length == null ? -1 : length, e);
}
}
throw new IOException("Unable to get input stream for blob [" + blobKey + "]", e);
}
return new AzureRetryingInputStream(blobStore, purpose, blobKey, position, length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import reactor.core.scheduler.Schedulers;

import com.azure.core.http.HttpMethod;
import com.azure.core.http.rest.ResponseBase;
import com.azure.core.util.BinaryData;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
Expand Down Expand Up @@ -345,7 +344,21 @@ private static boolean isIgnorableBatchDeleteException(Throwable exception) {
return false;
}

public InputStream getInputStream(OperationPurpose purpose, String blob, long position, final @Nullable Long length) {
int getMaxReadRetries() {
return service.getMaxReadRetries(projectId, clientName);
}

/**
* Get an {@link InputStream} for reading the specified blob. The returned input stream will not retry on a read failure,
* to get an input stream that implements retries use {@link AzureBlobContainer#readBlob(OperationPurpose, String, long, long)}
*/
AzureInputStream getInputStream(
OperationPurpose purpose,
String blob,
long position,
final @Nullable Long length,
@Nullable String eTag
) throws IOException {
logger.trace(() -> format("reading container [%s], blob [%s]", container, blob));
final AzureBlobServiceClient azureBlobServiceClient = getAzureBlobServiceClientClient(purpose);
final BlobServiceClient syncClient = azureBlobServiceClient.getSyncClient();
Expand All @@ -360,19 +373,15 @@ public InputStream getInputStream(OperationPurpose purpose, String blob, long po
totalSize = position + length;
}
BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container).getBlobAsyncClient(blob);
int maxReadRetries = service.getMaxReadRetries(projectId, clientName);
try {
return new AzureInputStream(
blobAsyncClient,
position,
length == null ? totalSize : length,
totalSize,
maxReadRetries,
azureBlobServiceClient.getAllocator()
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return new AzureInputStream(
blobAsyncClient,
position,
length == null ? totalSize : length,
totalSize,
0,
Copy link
Contributor Author

@nicktindall nicktindall Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We disable Azure client retries in these downloads so we can control them in the RetryingInputStream

azureBlobServiceClient.getAllocator(),
eTag
);
}

public Map<String, BlobMetadata> listBlobsByPrefix(OperationPurpose purpose, String keyPath, String prefix) throws IOException {
Expand Down Expand Up @@ -1076,27 +1085,34 @@ RequestMetricsRecorder getMetricsRecorder() {
return requestMetricsRecorder;
}

private static class AzureInputStream extends InputStream {
static class AzureInputStream extends InputStream {
private final CancellableRateLimitedFluxIterator<ByteBuf> cancellableRateLimitedFluxIterator;
private ByteBuf byteBuf;
private boolean closed;
private final ByteBufAllocator allocator;
private final String eTag;

private AzureInputStream(
final BlobAsyncClient client,
long rangeOffset,
long rangeLength,
long contentLength,
int maxRetries,
ByteBufAllocator allocator
ByteBufAllocator allocator,
@Nullable String ifMatchETag
) throws IOException {
rangeLength = Math.min(rangeLength, contentLength - rangeOffset);
final BlobRange range = new BlobRange(rangeOffset, rangeLength);
DownloadRetryOptions downloadRetryOptions = new DownloadRetryOptions().setMaxRetryRequests(maxRetries);
Flux<ByteBuf> byteBufFlux = client.downloadWithResponse(range, downloadRetryOptions, null, false)
final DownloadRetryOptions downloadRetryOptions = new DownloadRetryOptions().setMaxRetryRequests(maxRetries);
final BlobRequestConditions requestConditions = new BlobRequestConditions().setIfMatch(ifMatchETag);
final AtomicReference<String> eTagRef = new AtomicReference<>();
Flux<ByteBuf> byteBufFlux = client.downloadWithResponse(range, downloadRetryOptions, requestConditions, false)
.flux()
.concatMap(ResponseBase::getValue) // it's important to use concatMap, since flatMap doesn't provide ordering
// guarantees and that's not fun to debug :(
.concatMap(response -> {
eTagRef.set(response.getDeserializedHeaders().getETag());
return response.getValue();
}) // it's important to use concatMap, since flatMap doesn't provide ordering
// guarantees and that's not fun to debug :(
.filter(Objects::nonNull)
.map(this::copyBuffer); // Sadly we have to copy the buffers since the memory is released after the flux execution
// ends and we need that the byte buffer outlives that lifecycle. Since the SDK provides an
Expand All @@ -1112,6 +1128,9 @@ private AzureInputStream(
// blob doesn't exist
byteBufFlux.subscribe(cancellableRateLimitedFluxIterator);
getNextByteBuf();
assert eTagRef.get() != null : "eTag should have been set";
assert ifMatchETag == null || eTagRef.get().equals(ifMatchETag) : "eTag mismatch";
this.eTag = eTagRef.get();
}

private ByteBuf copyBuffer(ByteBuffer buffer) {
Expand Down Expand Up @@ -1173,6 +1192,10 @@ public void close() {
}
}

public String getETag() {
return eTag;
}

private void releaseByteBuf(ByteBuf buf) {
ReferenceCountUtil.safeRelease(buf);
this.byteBuf = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.azure;

import com.azure.core.exception.HttpResponseException;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.RetryingInputStream;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.nio.file.NoSuchFileException;

public class AzureRetryingInputStream extends RetryingInputStream<String> {

protected AzureRetryingInputStream(AzureBlobStore azureBlobStore, OperationPurpose purpose, String blob, long position, Long length)
throws IOException {
super(
new AzureBlobStoreServices(azureBlobStore, purpose, blob),
purpose,
position,
length == null ? Long.MAX_VALUE - 1 : position + length
);
}

private record AzureBlobStoreServices(AzureBlobStore blobStore, OperationPurpose purpose, String blob)
implements
RetryingInputStream.BlobStoreServices<String> {

@Override
public InputStreamAtVersion<String> getInputStreamAtVersion(@Nullable String version, long start, long end) throws IOException {
try {
final Long length = end < Long.MAX_VALUE - 1 ? end - start : null;
final AzureBlobStore.AzureInputStream inputStream = blobStore.getInputStream(purpose, blob, start, length, version);
return new InputStreamAtVersion<>(inputStream, inputStream.getETag());
} catch (Exception e) {
if (ExceptionsHelper.unwrap(e, HttpResponseException.class) instanceof HttpResponseException httpResponseException) {
final var httpStatusCode = httpResponseException.getResponse().getStatusCode();
if (httpStatusCode == RestStatus.NOT_FOUND.getStatus()) {
throw new NoSuchFileException("Blob [" + blob + "] not found");
}
if (httpStatusCode == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
throw new RequestedRangeNotSatisfiedException(blob, start, end == Long.MAX_VALUE - 1 ? -1 : end - start, e);
}
}
switch (e) {
case RuntimeException runtimeException -> throw runtimeException;
case IOException ioException -> throw ioException;
default -> throw new IOException("Unable to get input stream for blob [" + blob + "]", e);
}
}
}

@Override
public void onRetryStarted(String action) {
// No metrics for Azure
}

@Override
public void onRetrySucceeded(String action, long numberOfRetries) {
// No metrics for Azure
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S3 has its own special metrics for these, we can probably make that consistent now, but I wonder if we want to do that in a separate PR to keep the volume down


@Override
public long getMeaningfulProgressSize() {
return Math.max(1L, blobStore.getReadChunkSize() / 100L);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value seems kind-of arbitrary.

The Azure value calculates to about 320KiB, the GCS to 160KiB and the S3 one to 1MiB, they are all functions of various loosely-related thresholds. Perhaps it makes sense to make this a first-class setting and consistent across the CSPs?


@Override
public int getMaxRetries() {
return blobStore.getMaxReadRetries();
}

@Override
public String getBlobDescription() {
return blob;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import fixture.azure.AzureHttpHandler;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -43,6 +44,7 @@
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresentWith;
Expand Down Expand Up @@ -114,13 +116,73 @@ public void testReadBlobWithRetries() throws Exception {
});

final BlobContainer blobContainer = createBlobContainer(maxRetries);
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_max_retries")) {
try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_blob_max_retries")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
assertThat(countDownHead.isCountedDown(), is(true));
assertThat(countDownGet.isCountedDown(), is(true));
}
}

public void testReadBlobWithFailuresMidDownload() throws IOException {
final int responsesToSend = randomIntBetween(3, 5);
final AtomicInteger responseCounter = new AtomicInteger(responsesToSend);
final byte[] blobContents = randomBlobContent();
final String eTag = UUIDs.base64UUID();
httpServer.createContext("/account/container/read_blob_fail_mid_stream", exchange -> {
try {
Streams.readFully(exchange.getRequestBody());
if ("HEAD".equals(exchange.getRequestMethod())) {
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blobContents.length));
exchange.getResponseHeaders().add("Content-Length", String.valueOf(blobContents.length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else if ("GET".equals(exchange.getRequestMethod())) {
if (responseCounter.decrementAndGet() > 0) {
switch (randomIntBetween(1, 3)) {
case 1 -> {
final Integer rCode = randomFrom(
RestStatus.INTERNAL_SERVER_ERROR.getStatus(),
RestStatus.SERVICE_UNAVAILABLE.getStatus(),
RestStatus.TOO_MANY_REQUESTS.getStatus()
);
logger.info("---> sending error: {}", rCode);
exchange.sendResponseHeaders(rCode, -1);
}
case 2 -> logger.info("---> sending no response");
case 3 -> sendResponse(eTag, blobContents, exchange, true);
}
} else {
sendResponse(eTag, blobContents, exchange, false);
}
}
} finally {
exchange.close();
}
});

final BlobContainer blobContainer = createBlobContainer(responsesToSend * 2);
try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_blob_fail_mid_stream")) {
assertArrayEquals(blobContents, BytesReference.toBytes(Streams.readFully(inputStream)));
}
}

private void sendResponse(String eTag, byte[] blobContents, HttpExchange exchange, boolean partial) throws IOException {
final var ranges = getRanges(exchange);
final int start = ranges.v1().intValue();
final int end = partial ? randomIntBetween(start, ranges.v2().intValue()) : ranges.v2().intValue();
final var contents = Arrays.copyOfRange(blobContents, start, end + 1);

logger.info("---> responding to: {} -> {} (sending chunk of size {})", ranges.v1(), ranges.v2(), contents.length);
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blobContents.length));
exchange.getResponseHeaders().add("Content-Length", String.valueOf(blobContents.length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.getResponseHeaders().add("ETag", eTag);
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blobContents.length - ranges.v1().intValue());
exchange.getResponseBody().write(contents, 0, contents.length);
}

public void testReadRangeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(1, 5);
final CountDown countDownGet = new CountDown(maxRetries);
Expand Down Expand Up @@ -466,7 +528,7 @@ public void testRetryFromSecondaryLocationPolicies() throws Exception {
}

final BlobContainer blobContainer = createBlobContainer(maxRetries, secondaryHost, locationMode);
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_from_secondary")) {
try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "read_blob_from_secondary")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));

// It does round robin, first tries on the primary, then on the secondary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import static org.elasticsearch.repositories.azure.AzureStorageSettings.ACCOUNT_SETTING;
import static org.elasticsearch.repositories.azure.AzureStorageSettings.SAS_TOKEN_SETTING;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase.randomRetryingPurpose;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void testSasTokenIsUsedAsProvidedInSettings() throws Exception {
});

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, LocationMode.PRIMARY_ONLY, clientName, secureSettings);
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "sas_test")) {
try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "sas_test")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
}
}
Expand Down
Loading