From ea3f020206da5a75f92fc81c2d593df2ae72200b Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 15 Dec 2025 16:47:35 -0500 Subject: [PATCH] fix: update appendable upload retry logic to be able to more gracefully handle slow uploads Prior to this change, the retry context for an appendable upload would start its retry timeout clock at stream opening time, if an upload is proceeding slowly and a retryable error is delivered the retry might not retry even if there had been previous successful incremental response messages more recently. This change updates the logic to reset the retry timeout clock each time a successful response message is received. This maps logically to what is done with resumable uploads today where the timer is live for each PUT. --- .../storage/BidiUploadStreamingStream.java | 2 + .../google/cloud/storage/BidiUploadTest.java | 39 +++++++++++ .../storage/ITAppendableUploadFakeTest.java | 66 +++++++++++++++++++ 3 files changed, 107 insertions(+) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java index 6da243e541..f6b9ae399e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java @@ -408,6 +408,8 @@ public void onResponse(BidiWriteObjectResponse response) { @Nullable StorageException se = state.onResponse(response); if (se != null) { retryContext.recordError(se, onSuccess, onFailure); + } else { + retryContext.reset(); } } catch (Throwable t) { // catch an error that might happen while processing and forward it to our retry context diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java index 7e00427fb4..2774981ecd 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java @@ -2195,6 +2195,45 @@ public void recordError( assertThat(recordErrorCalled.get()).isTrue(); } + + @Test + public void onResponse_resetsRetryContextToEnsureRetriesArePossibleForLongWrites() { + AtomicBoolean resetCalled = new AtomicBoolean(false); + AtomicBoolean recordErrorCalled = new AtomicBoolean(false); + StreamingResponseObserver obs = + new StreamingResponseObserver( + new BidiUploadState(name.getMethodName()) { + @Override + StorageException onResponse(BidiWriteObjectResponse response) { + return null; + } + }, + new RetryContext() { + @Override + public boolean inBackoff() { + return false; + } + + @Override + public void reset() { + resetCalled.set(true); + } + + @Override + public void recordError( + T t, OnSuccess onSuccess, OnFailure onFailure) { + recordErrorCalled.set(true); + } + }, + RetryContextTest.failOnSuccess(), + RetryContextTest.failOnFailure()); + + obs.onStart(TestUtils.nullStreamController()); + obs.onResponse(resourceWithSize(0)); + + assertThat(resetCalled.get()).isTrue(); + assertThat(recordErrorCalled.get()).isFalse(); + } } static BidiWriteObjectRequest flushOffset(long offset) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java index b6a549aa98..78d07c4d94 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java @@ -921,6 +921,68 @@ public void crc32cWorks() throws Exception { } } + /** + * If a stream is held open for an extended period (i.e. longer than the configured retry timeout) + * and the server returns an error, we want to make sure the currently pending request is able to + * be retried. To accomplish this, the retry context needs to reset it's attempt elapsed timer + * each time a successful response from the server is received. + * + *

This test simulates (using our {@link TestApiClock} the server pausing 60 seconds before + * delivering an ACK. After the ACK, we raise an Unavailable error, the client's retries should be + * able to handle this and pick up where it left off. + */ + @Test + public void + receivingASuccessfulMessageOnTheStreamShouldResetTheElapsedTimerForRetryBudgetCalculation() + throws Exception { + + TestApiClock testClock = TestApiClock.of(0, TestApiClock.addExact(Duration.ofSeconds(1))); + FakeStorage fake = + FakeStorage.of( + ImmutableMap.of( + flush(open_abc), + respond -> respond.onNext(res_abc), + flush(def), + respond -> { + // when receiving the second message, simulate it taking one minute to process + testClock.advance(Duration.ofMinutes(1)); + // then return the incremental response before erroring with a retryable error + respond.onNext(incrementalResponse(6)); + respond.onError(TestUtils.apiException(Status.Code.UNAVAILABLE, "Unavailable")); + }, + reconnect, + respond -> { + BidiWriteObjectResponse.Builder b = res_abc.toBuilder(); + b.getResourceBuilder() + .setSize(6) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(content.slice(0, 6).getCrc32c()) + .build()); + respond.onNext(b.build()); + }, + flush(ghi), + respond -> respond.onNext(incrementalResponse(9)), + j_finish, + respond -> respond.onNext(resource_10))); + try (FakeServer fakeServer = FakeServer.of(fake); + Storage storage = + fakeServer.getGrpcStorageOptions().toBuilder() + .setClock(testClock) + .build() + .getService()) { + BlobId id = BlobId.of("b", "o"); + + BlobAppendableUpload upload = + storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), UPLOAD_CONFIG); + try (AppendableUploadWriteableByteChannel channel = upload.open()) { + StorageChannelUtils.blockingEmptyTo(content.asByteBuffer(), channel); + } + ApiFuture result = upload.getResult(); + result.get(5, TimeUnit.SECONDS); + } + } + private static Consumer> maxRetries( @NonNull BidiWriteObjectRequest req, Map<@NonNull BidiWriteObjectRequest, Integer> retryMap, @@ -1020,6 +1082,10 @@ private static BidiWriteObjectRequest finishMessage(long offset) { return BidiWriteObjectRequest.newBuilder().setWriteOffset(offset).setFinishWrite(true).build(); } + private static BidiWriteObjectRequest flush(BidiWriteObjectRequest req) { + return req.toBuilder().setStateLookup(true).setFlush(true).build(); + } + private static void runTestFlushMultipleSegments(FakeStorage fake) throws Exception { try (FakeServer fakeServer = FakeServer.of(fake); GrpcStorageImpl storage =