From 496538d2f753b35e6c015b6fecb1d5637fe7c5be Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 16 Sep 2025 19:05:23 +0000 Subject: [PATCH 1/3] fix: fix appendable upload finalization race condition When finalizing an appendable upload depending on how quickly gcs is acking bytes, we could run into a case where finish_write: true was sent before all bytes had been enqueued. Regression introduced in 2.57.0 --- ...pendableUnbufferedWritableByteChannel.java | 3 +- .../google/cloud/storage/BidiUploadState.java | 18 ++- ...ableUnbufferedWritableByteChannelTest.java | 123 ++++++++++++++++++ .../google/cloud/storage/BidiUploadTest.java | 72 +--------- .../cloud/storage/BidiUploadTestUtils.java | 99 ++++++++++++++ 5 files changed, 238 insertions(+), 77 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java index 3910a672b2..8c45da06df 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java @@ -145,6 +145,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th // those bytes to implicitly flag as dirty. rewindableContent.flagDirty(); + long remainingAfterPacking = Buffers.totalRemaining(srcs, srcsOffset, srcsLength); long bytesConsumed = 0; for (int i = 0, len = data.length, lastIdx = len - 1; i < len; i++) { ChunkSegment datum = data[i]; @@ -153,7 +154,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th boolean appended; if (i < lastIdx && !shouldFlush) { appended = stream.append(datum); - } else if (i == lastIdx && nextWriteShouldFinalize) { + } else if (i == lastIdx && remainingAfterPacking == 0 && nextWriteShouldFinalize) { appended = stream.appendAndFinalize(datum); } else { appended = stream.appendAndFlush(datum); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java index 6d64c8b5e0..244391d4a0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java @@ -416,10 +416,10 @@ final boolean offer(ChunkSegmenter.@NonNull ChunkSegment datum) { try { requireNonNull(datum, "data must be non null"); validateCurrentStateIsOneOf(State.allNonTerminal); - checkNotFinalizing(); ByteString b = datum.getB(); - long availableCapacity = availableCapacity(); int size = b.size(); + checkNotFinalizing(size); + long availableCapacity = availableCapacity(); if (size <= availableCapacity) { Crc32cLengthKnown crc32c = datum.getCrc32c(); ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); @@ -449,7 +449,7 @@ public boolean finalFlush(long totalLength) { lock.lock(); try { validateCurrentStateIsOneOf(State.allNonTerminal); - checkNotFinalizing(); + checkNotFinalizing(0); checkArgument( totalLength == totalSentBytes, "(totalLength == totalSentBytes) (%s == %s)", @@ -490,7 +490,7 @@ final boolean offer(@NonNull BidiWriteObjectRequest e) { requireNonNull(e, "e must be non null"); validateCurrentStateIsOneOf(State.allNonTerminal); if (e.hasChecksummedData()) { - checkNotFinalizing(); + checkNotFinalizing(e.getChecksummedData().getContent().size()); } int size = e.getChecksummedData().getContent().size(); long availableCapacity = availableCapacity(); @@ -827,10 +827,16 @@ protected final void validateCurrentStateIsOneOf(State... allowed) { state); } - private void checkNotFinalizing() { + private void checkNotFinalizing(int size) { checkState( finishWriteOffset == -1, - "Attempting to append bytes even though finalization has previously been signaled."); + "Attempting to append bytes even though finalization has previously been signaled." + + " (finishWriteOffset: %s, totalSentBytes: %s, confirmedBytes: %s, size: %s)", + finishWriteOffset, + totalSentBytes, + confirmedBytes, + size + ); } protected final boolean internalOffer(BidiWriteObjectRequest e) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java new file mode 100644 index 0000000000..6723b415f2 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java @@ -0,0 +1,123 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto; +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.cloud.storage.BidiUploadState.AppendableUploadState; +import com.google.cloud.storage.ITAppendableUploadFakeTest.FakeStorage; +import com.google.cloud.storage.it.ChecksummedTestContent; +import com.google.common.collect.ImmutableList; +import com.google.storage.v2.BidiWriteObjectResponse; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.OffsetDateTime; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class BidiAppendableUnbufferedWritableByteChannelTest { + private static final Logger LOGGER = LoggerFactory.getLogger(BidiAppendableUnbufferedWritableByteChannelTest.class); + @Rule public final TestName testName = new TestName(); + + @Test + public void appendAndFinalizeOnlyPerformedIfAllBytesConsumed() throws IOException { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + ChecksummedTestContent ctc = ChecksummedTestContent.gen(27); + AppendableUploadState state = + BidiUploadState.appendableNew( + BidiUploadTest.appendRequestNew, + GrpcCallContext::createDefault, + 16, + SettableApiFuture.create(), + Crc32cValue.zero()); + AtomicLong finishWriteOffset = new AtomicLong(-1); + BidiUploadStreamingStream stream = new BidiUploadStreamingStream( + state, + executor, + BidiUploadTestUtils.adaptOnlySend(respond -> request -> executor.submit(() -> { + LOGGER.info("request = {}", fmtProto(request)); + switch ((int) request.getWriteOffset()) { + case 0: + respond.onResponse(BidiUploadTest.resourceWithSize(0)); + break; + case 4: + case 8: + // do not ack any bytes until we receive 16, this simulates latency on the bytes being + // ack'd. + break; + case 12: + respond.onResponse(BidiUploadTestUtils.incremental(8)); + break; + case 16: + respond.onResponse(BidiUploadTestUtils.incremental(12)); + break; + case 20: + respond.onResponse(BidiUploadTestUtils.incremental(16)); + break; + case 24: + BidiWriteObjectResponse.Builder b = BidiUploadTest.resourceFor(ctc).toBuilder(); + b.getResourceBuilder().setFinalizeTime(Conversions.grpc().timestampCodec.encode( + OffsetDateTime.now() + )); + respond.onResponse(b.build()); + break; + default: + respond.onError(FakeStorage.unexpectedRequest(request, ImmutableList.of())); + break; + } + if (request.getFinishWrite()) { + finishWriteOffset.set(request.getWriteOffset() + request.getChecksummedData().getContent().size()); + } + })), + 3, + RetryContext.neverRetry() + ); + ChunkSegmenter chunkSegmenter = new ChunkSegmenter( + Hasher.enabled(), + ByteStringStrategy.copy(), + 4, + 2 + ); + BidiAppendableUnbufferedWritableByteChannel channel = new BidiAppendableUnbufferedWritableByteChannel( + stream, + chunkSegmenter, + 4, + 0 + ); + + ByteBuffer buf = ctc.asByteBuffer(); + int written1 = channel.write(buf); + // fill up the outbound queue + assertThat(written1).isEqualTo(16); + + // asynchronously bytes will be ack'd 4 at a time, eventually there will be enough space in the + // outbound queue to allow writeAndClose to start consuming bytes. + channel.nextWriteShouldFinalize(); + int written2 = channel.writeAndClose(buf); + assertThat(written2).isEqualTo(11); + assertThat(finishWriteOffset.get()).isEqualTo(ctc.length()); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java index 52b6ebb5fe..fa33f8dced 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java @@ -17,6 +17,8 @@ package com.google.cloud.storage; import static com.google.cloud.storage.BidiUploadState.appendableNew; +import static com.google.cloud.storage.BidiUploadTestUtils.adaptOnlySend; +import static com.google.cloud.storage.BidiUploadTestUtils.alwaysErrorBidiStreamingCallable; import static com.google.cloud.storage.BidiUploadTestUtils.createSegment; import static com.google.cloud.storage.BidiUploadTestUtils.finishAt; import static com.google.cloud.storage.BidiUploadTestUtils.incremental; @@ -39,11 +41,7 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.AbortedException; -import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiExceptionFactory; -import com.google.api.gax.rpc.BidiStreamingCallable; -import com.google.api.gax.rpc.ClientStream; -import com.google.api.gax.rpc.ClientStreamReadyObserver; import com.google.api.gax.rpc.ErrorDetails; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StreamController; @@ -64,7 +62,6 @@ import com.google.common.collect.Range; import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.Message; import com.google.protobuf.TextFormat; import com.google.rpc.Code; import com.google.storage.v2.AppendObjectSpec; @@ -1841,58 +1838,6 @@ public void recordError( } } - private static BidiStreamingCallable - alwaysErrorBidiStreamingCallable(Status status) { - return adaptOnlySend(respond -> request -> respond.onError(status.asRuntimeException())); - } - - private static BidiStreamingCallable adaptOnlySend( - Function, OnlySendClientStream> func) { - return adapt(func::apply); - } - - private static BidiStreamingCallable adapt( - Function, ClientStream> func) { - return adapt( - (respond, onReady, context) -> { - ClientStream clientStream = func.apply(respond); - StreamController controller = TestUtils.nullStreamController(); - respond.onStart(controller); - return clientStream; - }); - } - - /** - * BidiStreamingCallable isn't functional even though it's a single abstract method. - * - *

Define a method that can adapt a TriFunc as the required implementation of {@link - * BidiStreamingCallable#internalCall(ResponseObserver, ClientStreamReadyObserver, - * ApiCallContext)}. - * - *

Saves several lines of boilerplate in each test. - */ - private static BidiStreamingCallable adapt( - StreamingStreamTest.TriFunc< - ResponseObserver, - ClientStreamReadyObserver, - ApiCallContext, - ClientStream> - func) { - return new BidiStreamingCallable() { - @Override - public ClientStream internalCall( - ResponseObserver respond, - ClientStreamReadyObserver onReady, - ApiCallContext context) { - return func.apply(respond, onReady, context); - } - }; - } - - @FunctionalInterface - interface TriFunc { - R apply(A a, B b, C c); - } } public static final class BidiUploadStreamingStreamResponseObserverTest { @@ -2290,17 +2235,4 @@ static BidiWriteObjectRequest flushOffset(long offset) { return BidiWriteObjectResponse.newBuilder().setResource(f.apply(b)).build(); } - @FunctionalInterface - private interface OnlySendClientStream extends ClientStream { - @Override - default void closeSendWithError(Throwable t) {} - - @Override - default void closeSend() {} - - @Override - default boolean isSendReady() { - return true; - } - } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java index 827f1320d3..a1986c2723 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java @@ -21,12 +21,19 @@ import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.AbortedException; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.ClientStream; +import com.google.api.gax.rpc.ClientStreamReadyObserver; import com.google.api.gax.rpc.ErrorDetails; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; import com.google.cloud.storage.it.ChecksummedTestContent; import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import com.google.protobuf.Timestamp; import com.google.rpc.Code; import com.google.storage.v2.BidiWriteObjectRedirectedError; @@ -39,7 +46,10 @@ import java.nio.ByteBuffer; import java.time.OffsetDateTime; import java.util.List; +import java.util.function.Function; import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class BidiUploadTestUtils { @@ -122,4 +132,93 @@ static BidiWriteObjectRequest withFlushAndStateLookup(BidiWriteObjectRequest ori static Timestamp timestampNow() { return Conversions.grpc().timestampCodec.encode(OffsetDateTime.now()); } + + static BidiStreamingCallable + alwaysErrorBidiStreamingCallable(Status status) { + return adaptOnlySend(respond -> request -> respond.onError(status.asRuntimeException())); + } + + static BidiStreamingCallable adaptOnlySend( + Function, OnlySendClientStream> func) { + return adapt(func::apply); + } + + static BidiStreamingCallable adapt( + Function, ClientStream> func) { + return adapt( + (respond, onReady, context) -> { + ClientStream clientStream = func.apply(respond); + StreamController controller = TestUtils.nullStreamController(); + respond.onStart(controller); + return clientStream; + }); + } + + private static final Logger LOGGER = LoggerFactory.getLogger(BidiUploadTestUtils.class); + /** + * BidiStreamingCallable isn't functional even though it's a single abstract method. + * + *

Define a method that can adapt a TriFunc as the required implementation of {@link + * BidiStreamingCallable#internalCall(ResponseObserver, ClientStreamReadyObserver, + * ApiCallContext)}. + * + *

Saves several lines of boilerplate in each test. + */ + static BidiStreamingCallable adapt( + TriFunc< + ResponseObserver, + ClientStreamReadyObserver, + ApiCallContext, + ClientStream> + func) { + return new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver respond, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + return func.apply(new ResponseObserver() { + @Override + public void onStart(StreamController controller) { + respond.onStart(controller); + } + + @Override + public void onResponse(ResT response) { + LOGGER.info("response = {}", fmtProto(response)); + respond.onResponse(response); + } + + @Override + public void onError(Throwable t) { + respond.onError(t); + } + + @Override + public void onComplete() { + respond.onComplete(); + } + }, onReady, context); + } + }; + } + + @FunctionalInterface + interface TriFunc { + R apply(A a, B b, C c); + } + + @FunctionalInterface + interface OnlySendClientStream extends ClientStream { + @Override + default void closeSendWithError(Throwable t) {} + + @Override + default void closeSend() {} + + @Override + default boolean isSendReady() { + return true; + } + } } From a2907b53bcf92c26cb4f0b064917bd7cbb7f3cc8 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 16 Sep 2025 19:46:58 +0000 Subject: [PATCH 2/3] chore: mvn-fmt --- .../google/cloud/storage/BidiUploadState.java | 3 +- ...ableUnbufferedWritableByteChannelTest.java | 111 +++++++++--------- .../google/cloud/storage/BidiUploadTest.java | 2 - .../cloud/storage/BidiUploadTestUtils.java | 34 +----- 4 files changed, 60 insertions(+), 90 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java index 244391d4a0..7894e35f99 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java @@ -835,8 +835,7 @@ private void checkNotFinalizing(int size) { finishWriteOffset, totalSentBytes, confirmedBytes, - size - ); + size); } protected final boolean internalOffer(BidiWriteObjectRequest e) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java index 6723b415f2..4c571ab0dc 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java @@ -16,7 +16,6 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto; import static com.google.common.truth.Truth.assertThat; import com.google.api.core.SettableApiFuture; @@ -35,11 +34,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public final class BidiAppendableUnbufferedWritableByteChannelTest { - private static final Logger LOGGER = LoggerFactory.getLogger(BidiAppendableUnbufferedWritableByteChannelTest.class); @Rule public final TestName testName = new TestName(); @Test @@ -54,59 +50,60 @@ public void appendAndFinalizeOnlyPerformedIfAllBytesConsumed() throws IOExceptio SettableApiFuture.create(), Crc32cValue.zero()); AtomicLong finishWriteOffset = new AtomicLong(-1); - BidiUploadStreamingStream stream = new BidiUploadStreamingStream( - state, - executor, - BidiUploadTestUtils.adaptOnlySend(respond -> request -> executor.submit(() -> { - LOGGER.info("request = {}", fmtProto(request)); - switch ((int) request.getWriteOffset()) { - case 0: - respond.onResponse(BidiUploadTest.resourceWithSize(0)); - break; - case 4: - case 8: - // do not ack any bytes until we receive 16, this simulates latency on the bytes being - // ack'd. - break; - case 12: - respond.onResponse(BidiUploadTestUtils.incremental(8)); - break; - case 16: - respond.onResponse(BidiUploadTestUtils.incremental(12)); - break; - case 20: - respond.onResponse(BidiUploadTestUtils.incremental(16)); - break; - case 24: - BidiWriteObjectResponse.Builder b = BidiUploadTest.resourceFor(ctc).toBuilder(); - b.getResourceBuilder().setFinalizeTime(Conversions.grpc().timestampCodec.encode( - OffsetDateTime.now() - )); - respond.onResponse(b.build()); - break; - default: - respond.onError(FakeStorage.unexpectedRequest(request, ImmutableList.of())); - break; - } - if (request.getFinishWrite()) { - finishWriteOffset.set(request.getWriteOffset() + request.getChecksummedData().getContent().size()); - } - })), - 3, - RetryContext.neverRetry() - ); - ChunkSegmenter chunkSegmenter = new ChunkSegmenter( - Hasher.enabled(), - ByteStringStrategy.copy(), - 4, - 2 - ); - BidiAppendableUnbufferedWritableByteChannel channel = new BidiAppendableUnbufferedWritableByteChannel( - stream, - chunkSegmenter, - 4, - 0 - ); + BidiUploadStreamingStream stream = + new BidiUploadStreamingStream( + state, + executor, + BidiUploadTestUtils.adaptOnlySend( + respond -> + request -> + executor.submit( + () -> { + switch ((int) request.getWriteOffset()) { + case 0: + respond.onResponse(BidiUploadTest.resourceWithSize(0)); + break; + case 4: + case 8: + // do not ack any bytes until we receive 16, this simulates + // latency on the bytes being ack'd. + break; + case 12: + respond.onResponse(BidiUploadTestUtils.incremental(8)); + break; + case 16: + respond.onResponse(BidiUploadTestUtils.incremental(12)); + break; + case 20: + respond.onResponse(BidiUploadTestUtils.incremental(16)); + break; + case 24: + BidiWriteObjectResponse.Builder b = + BidiUploadTest.resourceFor(ctc).toBuilder(); + b.getResourceBuilder() + .setFinalizeTime( + Conversions.grpc() + .timestampCodec + .encode(OffsetDateTime.now())); + respond.onResponse(b.build()); + break; + default: + respond.onError( + FakeStorage.unexpectedRequest(request, ImmutableList.of())); + break; + } + if (request.getFinishWrite()) { + finishWriteOffset.set( + request.getWriteOffset() + + request.getChecksummedData().getContent().size()); + } + })), + 3, + RetryContext.neverRetry()); + ChunkSegmenter chunkSegmenter = + new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 4, 2); + BidiAppendableUnbufferedWritableByteChannel channel = + new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 4, 0); ByteBuffer buf = ctc.asByteBuffer(); int written1 = channel.write(buf); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java index fa33f8dced..7e00427fb4 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java @@ -1837,7 +1837,6 @@ public void recordError( exec1.shutdownNow(); } } - } public static final class BidiUploadStreamingStreamResponseObserverTest { @@ -2234,5 +2233,4 @@ static BidiWriteObjectRequest flushOffset(long offset) { } return BidiWriteObjectResponse.newBuilder().setResource(f.apply(b)).build(); } - } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java index a1986c2723..110fdd4510 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java @@ -48,8 +48,6 @@ import java.util.List; import java.util.function.Function; import org.checkerframework.checker.nullness.qual.NonNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; final class BidiUploadTestUtils { @@ -154,7 +152,6 @@ static BidiStreamingCallable adapt( }); } - private static final Logger LOGGER = LoggerFactory.getLogger(BidiUploadTestUtils.class); /** * BidiStreamingCallable isn't functional even though it's a single abstract method. * @@ -166,10 +163,10 @@ static BidiStreamingCallable adapt( */ static BidiStreamingCallable adapt( TriFunc< - ResponseObserver, - ClientStreamReadyObserver, - ApiCallContext, - ClientStream> + ResponseObserver, + ClientStreamReadyObserver, + ApiCallContext, + ClientStream> func) { return new BidiStreamingCallable() { @Override @@ -177,28 +174,7 @@ public ClientStream internalCall( ResponseObserver respond, ClientStreamReadyObserver onReady, ApiCallContext context) { - return func.apply(new ResponseObserver() { - @Override - public void onStart(StreamController controller) { - respond.onStart(controller); - } - - @Override - public void onResponse(ResT response) { - LOGGER.info("response = {}", fmtProto(response)); - respond.onResponse(response); - } - - @Override - public void onError(Throwable t) { - respond.onError(t); - } - - @Override - public void onComplete() { - respond.onComplete(); - } - }, onReady, context); + return func.apply(respond, onReady, context); } }; } From 82ae5de6cb5c519ce94dde7c1b1bc3b1169e1e3d Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 16 Sep 2025 20:25:09 +0000 Subject: [PATCH 3/3] chore: move finish_write offset tracking before response handling --- ...ableUnbufferedWritableByteChannelTest.java | 85 ++++++++++--------- 1 file changed, 43 insertions(+), 42 deletions(-) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java index 4c571ab0dc..0364ddaf18 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java @@ -56,48 +56,49 @@ public void appendAndFinalizeOnlyPerformedIfAllBytesConsumed() throws IOExceptio executor, BidiUploadTestUtils.adaptOnlySend( respond -> - request -> - executor.submit( - () -> { - switch ((int) request.getWriteOffset()) { - case 0: - respond.onResponse(BidiUploadTest.resourceWithSize(0)); - break; - case 4: - case 8: - // do not ack any bytes until we receive 16, this simulates - // latency on the bytes being ack'd. - break; - case 12: - respond.onResponse(BidiUploadTestUtils.incremental(8)); - break; - case 16: - respond.onResponse(BidiUploadTestUtils.incremental(12)); - break; - case 20: - respond.onResponse(BidiUploadTestUtils.incremental(16)); - break; - case 24: - BidiWriteObjectResponse.Builder b = - BidiUploadTest.resourceFor(ctc).toBuilder(); - b.getResourceBuilder() - .setFinalizeTime( - Conversions.grpc() - .timestampCodec - .encode(OffsetDateTime.now())); - respond.onResponse(b.build()); - break; - default: - respond.onError( - FakeStorage.unexpectedRequest(request, ImmutableList.of())); - break; - } - if (request.getFinishWrite()) { - finishWriteOffset.set( - request.getWriteOffset() - + request.getChecksummedData().getContent().size()); - } - })), + request -> { + if (request.getFinishWrite()) { + finishWriteOffset.set( + request.getWriteOffset() + + request.getChecksummedData().getContent().size()); + } + executor.submit( + () -> { + switch ((int) request.getWriteOffset()) { + case 0: + respond.onResponse(BidiUploadTest.resourceWithSize(0)); + break; + case 4: + case 8: + // do not ack any bytes until we receive 16, this simulates + // latency on the bytes being ack'd. + break; + case 12: + respond.onResponse(BidiUploadTestUtils.incremental(8)); + break; + case 16: + respond.onResponse(BidiUploadTestUtils.incremental(12)); + break; + case 20: + respond.onResponse(BidiUploadTestUtils.incremental(16)); + break; + case 24: + BidiWriteObjectResponse.Builder b = + BidiUploadTest.resourceFor(ctc).toBuilder(); + b.getResourceBuilder() + .setFinalizeTime( + Conversions.grpc() + .timestampCodec + .encode(OffsetDateTime.now())); + respond.onResponse(b.build()); + break; + default: + respond.onError( + FakeStorage.unexpectedRequest(request, ImmutableList.of())); + break; + } + }); + }), 3, RetryContext.neverRetry()); ChunkSegmenter chunkSegmenter =