Use common retry logic for GCS #138553
Conversation
```java
.setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier())
.setMaxRetryDelay(Duration.ofSeconds(1L))
.setMaxAttempts(0)
.setJittered(false)
```
This test originally configured retries to be time-based (i.e. no limit on the attempts, just keep retrying for some amount of time). I changed it to just make the retry intervals small and depend on the configured retry limits because we don't support time-based retries anymore.
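The difference between the two modes can be sketched as follows. This is an illustrative stand-in, not the actual Elasticsearch or GCS SDK retry code; all names are hypothetical:

```java
// Hypothetical sketch of attempt-bounded retries with small, fixed delays,
// mirroring what the test now configures: there is no wall-clock deadline,
// the configured attempt limit alone decides when we give up.
public class AttemptBoundedRetry {

    interface Action<T> {
        T run() throws Exception;
    }

    // Retry up to maxAttempts times, sleeping a short fixed delay between
    // attempts so tests stay fast.
    static <T> T runWithRetries(Action<T> action, int maxAttempts, long delayMillis) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.run();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // small interval instead of a time budget
                }
            }
        }
        throw last;
    }
}
```

A time-based strategy would instead loop while `System.nanoTime()` is under a deadline, which is what the test can no longer rely on.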
```diff
  container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true);

- try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) {
+ try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), blobKey)) {
```
We have to be careful where we use randomPurpose() now because some purposes no longer retry (e.g. REPOSITORY_ANALYSIS)
```java
@Override
public long getMeaningfulProgressSize() {
    return Math.max(1L, GoogleCloudStorageBlobStore.SDK_DEFAULT_CHUNK_SIZE / 100L);
}
```
The choice of this value is somewhat arbitrary; open to suggestions on whether we should make this consistent across CSPs or use some other value here. The SDK default chunk size is 16MB, so this is about 160KB.
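For reference, the arithmetic works out like this (the 16 MiB figure is taken from the comment above as an assumption, not read from the real SDK):

```java
public class MeaningfulProgress {
    // Assumption from the discussion above: the GCS SDK's default chunk
    // size is 16 MiB (16_777_216 bytes).
    static final long SDK_DEFAULT_CHUNK_SIZE = 16L * 1024 * 1024;

    // 1% of the default chunk size, floored at one byte.
    static long meaningfulProgressSize() {
        return Math.max(1L, SDK_DEFAULT_CHUNK_SIZE / 100L);
    }
}
```

So "meaningful progress" here is 167,772 bytes, roughly 164 KiB; the "160KB" above is this figure rounded.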
```diff
   * will attempt to create a new one of these. If reading from it fails, it should not retry.
   */
- protected abstract static class SingleAttemptInputStream extends InputStream {
+ protected static final class SingleAttemptInputStream<V> extends FilterInputStream {
```
I think the best approach for the SingleAttemptInputStream is to implement it as a decorator. This gives the CSPs more freedom in how they implement the single-attempt stream.
Specifically, if you want to do something on every read, it's much more work to extend FilterInputStream because the default implementations all delegate to the wrapped stream. If you extend InputStream you only need to implement int read() and the defaults are all implemented on top of that.
If we expect everyone to extend SingleAttemptInputStream we force everyone to extend whichever of the above that we extended. This is the inheritance issue I alluded to earlier due to InputStream being an abstract class rather than an interface.
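The trade-off can be sketched with two illustrative counting streams (hypothetical classes, not the actual `S3SingleAttemptInputStream`):

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch of the two base-class choices discussed above (illustrative only).
public class StreamBases {

    // Extending InputStream: implement only int read(); the inherited
    // read(byte[], int, int) is built on top of it, so per-byte
    // bookkeeping needs just this one override.
    static class CountingInputStream extends InputStream {
        private final InputStream in;
        long bytesRead;

        CountingInputStream(InputStream in) {
            this.in = in;
        }

        @Override
        public int read() throws IOException {
            int b = in.read();
            if (b != -1) {
                bytesRead++;
            }
            return b;
        }
    }

    // Extending FilterInputStream: every read variant delegates straight to
    // the wrapped stream, so the bookkeeping must override each one.
    static class CountingFilterStream extends FilterInputStream {
        long bytesRead;

        CountingFilterStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b != -1) {
                bytesRead++;
            }
            return b;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int n = super.read(b, off, len); // bypasses read() above, so count here too
            if (n > 0) {
                bytesRead += n;
            }
            return n;
        }
    }
}
```

Both count correctly, but the `FilterInputStream` version silently under-counts if any delegating overload is forgotten, which is the extra work being described.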
I see that GCS's ContentLengthValidatingInputStream extends FilterInputStream which is a different base class than S3SingleAttemptInputStream. I think making S3SingleAttemptInputStream a FilterInputStream could work as well since it is effectively a delegate to ResponseInputStream. But it may be a problem again for Azure.
I am good to go with your suggestion. Can we maybe rename S3SingleAttemptInputStream to something not suggesting potential inheritance?
```java
PREFIX,
"max_retries",
(key) -> Setting.intSetting(key, 5, 0, Setting.Property.NodeScope)
);
```
We never configured number of retries for GCS previously. The default settings were for 6 attempts (aka 5 retries)
```java
private long currentOffset;
private boolean closed;
private Long lastGeneration;
private static final StorageRetryStrategy STORAGE_RETRY_STRATEGY = GoogleCloudStorageService.createStorageRetryStrategy();
```
The one we use is stateless, we might need to re-think this lifecycle if we switch to one that is not. You can't get it out of the client or StorageOptions as far as I could see.
I don't know how important this is. If necessary, I think we can store the original strategy object on the MeteredStorage object so that we can get it in this class?
```java
    }
    return n;
} catch (IOException e) {
    throw StorageException.translate(e);
```
We translate these for consistency with the existing implementation. When reading we retry any exception, also consistent with the existing implementation, but when something goes wrong the translation may add some more context to the stack trace.
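As a sketch of why the translation helps (an illustrative stand-in; the real code calls the GCS SDK's `StorageException.translate`, which is not reproduced here):

```java
import java.io.IOException;

// Hypothetical stand-in for exception translation: wrapping the IOException
// keeps the original stack trace as the cause while adding storage-level
// context in the message. All names here are illustrative.
public class Translation {

    static class StorageLikeException extends RuntimeException {
        StorageLikeException(IOException cause) {
            super("error reading blob: " + cause.getMessage(), cause);
        }
    }

    static RuntimeException translate(IOException e) {
        return new StorageLikeException(e);
    }
}
```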
Pull request overview
This PR refactors the GCS repository implementation to use common retry logic from RetryingInputStream, introducing blob version tracking for safe-resume functionality. This is the second step in breaking up and merging #136663, following S3's earlier adoption of the common retry pattern.
Key changes:
- Generalized `RetryingInputStream` to support blob versioning with type parameter `<V>` for version tracking
- Implemented GCS-specific retry logic using blob generation headers for safe-resume
- Added a `RetryBehaviour` enum to control GCS client retry configuration
- Enhanced test utilities with `randomRetryingPurpose()` and `randomFiniteRetryingPurpose()` helpers
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated no comments.
Summary per file:

| File | Description |
|---|---|
| `RetryingInputStream.java` | Made generic with version type parameter `<V>`, added `StreamAction` enum, moved `SingleAttemptInputStream` to use `FilterInputStream`, added `isRetryableException()` to the `BlobStoreServices` interface |
| `GoogleCloudStorageRetryingInputStream.java` | Completely refactored to extend `RetryingInputStream<Long>`, using GCS generation headers for version tracking; delegates to the `GoogleCloudStorageBlobStoreServices` implementation |
| `GoogleCloudStorageService.java` | Added `RetryBehaviour` enum for controlling client retry configuration; updated client caching to include retry behaviour in the cache key |
| `GoogleCloudStorageBlobStore.java` | Split client retrieval into `client()` and `clientNoRetries()` methods; added a `getMaxRetries()` accessor |
| `GoogleCloudStorageClientSettings.java` | Added the `MAX_RETRIES_SETTING` configuration option with a default value of 5 |
| `S3RetryingInputStream.java` | Updated to use `RetryingInputStream<Void>` since S3 doesn't implement version tracking yet |
| `BlobStoreTestUtil.java` | Added `randomRetryingPurpose()` and `randomFiniteRetryingPurpose()` test utilities |
| `RetryingInputStreamTests.java` | Updated tests to use the parameterized version type; added a test for blob version tracking behaviour |
| Various GCS test files | Updated to accommodate the new `RetryBehaviour` parameter in client creation |
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

The PR has both labels of

Hi @nicktindall, I've created a changelog YAML for you.

@ywangd I think it's probably the former, because the
ywangd left a comment
I skimmed through production code changes. They mostly look good to me. I have some small comments.
The GCS retry has some quirkiness in that the openStream method is always retried the default number of times (with RetryHelper.runWithRetries) regardless of whether some of the retries have already been consumed. I think this is a bug and the PR should fix it. In the meantime, I think we should preserve the inner retry plus outer retry as commented.
```java
@Override
public SingleAttemptInputStream<Long> getInputStream(@Nullable Long lastGeneration, long start, long end) throws IOException {
    final MeteredStorage client = blobStore.clientNoRetries();
```
I am not sure whether this is necessary for now. The current S3 implementation also has nested retries: an inner layer provided by the SDK and an outer layer with our own implementation. That seems to be the existing behaviour of the GCS retrying input stream as well. Maybe we should keep it as is for now. I am slightly concerned that this reduces the number of total retries in production.
We can change it in future for all CSP together if it is necessary.
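The concern about reduced retries comes down to simple multiplication; a back-of-the-envelope sketch (illustrative only, not real configuration code):

```java
// If the SDK client retries each openStream call up to innerAttempts times,
// and our own retrying-input-stream loop retries up to outerAttempts times,
// the worst case is their product.
public class NestedRetries {
    static int worstCaseAttempts(int innerAttempts, int outerAttempts) {
        return innerAttempts * outerAttempts;
    }
}
```

For example, with 6 attempts at both layers the nested arrangement allows up to 36 attempts in the worst case, whereas routing opens through a no-retries client caps the total at the outer layer's 6.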
This is the second step in breaking up and merging #136663.
I chose to do GCS next because it introduces safe-resume (where we remember the version of the blob we were downloading so we can request specifically that one when we resume). This will mean less refactoring than if we'd done Azure first.
I didn't implement that logic for S3, although it's trivial. I will do that in a subsequent change.
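The safe-resume idea can be sketched like this. All names (`BlobSource`, `OpenedBlob`, `readFully`) are hypothetical; the real implementation pins the GCS blob generation inside `RetryingInputStream<Long>` rather than using this toy interface:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch of "safe resume": remember the blob version seen on the
// first successful open, and on every retry ask for exactly that version so we
// never splice bytes from a newer copy of the blob into the output.
public class SafeResume {

    interface BlobSource {
        // Open the blob starting at `offset`; if `generation` is non-null the
        // source must serve exactly that generation or throw.
        OpenedBlob open(Long generation, long offset) throws IOException;
    }

    interface OpenedBlob {
        long generation();
        InputStream stream();
    }

    static byte[] readFully(BlobSource source, int maxAttempts) throws IOException {
        Long generation = null; // pinned after the first successful open
        long offset = 0;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        IOException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                OpenedBlob blob = source.open(generation, offset);
                generation = blob.generation(); // remember which version we started reading
                int b;
                while ((b = blob.stream().read()) != -1) {
                    out.write(b);
                    offset++; // resume point if this stream fails later
                }
                return out.toByteArray();
            } catch (IOException e) {
                last = e; // keep `generation` and `offset` so the resume is pinned
            }
        }
        throw last;
    }
}
```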