diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java index 3910a672b2..8c45da06df 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java @@ -145,6 +145,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th // those bytes to implicitly flag as dirty. rewindableContent.flagDirty(); + long remainingAfterPacking = Buffers.totalRemaining(srcs, srcsOffset, srcsLength); long bytesConsumed = 0; for (int i = 0, len = data.length, lastIdx = len - 1; i < len; i++) { ChunkSegment datum = data[i]; @@ -153,7 +154,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th boolean appended; if (i < lastIdx && !shouldFlush) { appended = stream.append(datum); - } else if (i == lastIdx && nextWriteShouldFinalize) { + } else if (i == lastIdx && remainingAfterPacking == 0 && nextWriteShouldFinalize) { appended = stream.appendAndFinalize(datum); } else { appended = stream.appendAndFlush(datum); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java index 6d64c8b5e0..7894e35f99 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java @@ -416,10 +416,10 @@ final boolean offer(ChunkSegmenter.@NonNull ChunkSegment datum) { try { requireNonNull(datum, "data must be non null"); validateCurrentStateIsOneOf(State.allNonTerminal); - checkNotFinalizing(); ByteString b = datum.getB(); - long availableCapacity = availableCapacity(); int size = b.size(); + checkNotFinalizing(size); + long availableCapacity = availableCapacity(); if (size <= availableCapacity) { Crc32cLengthKnown crc32c = datum.getCrc32c(); ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); @@ -449,7 +449,7 @@ public boolean finalFlush(long totalLength) { lock.lock(); try { validateCurrentStateIsOneOf(State.allNonTerminal); - checkNotFinalizing(); + checkNotFinalizing(0); checkArgument( totalLength == totalSentBytes, "(totalLength == totalSentBytes) (%s == %s)", @@ -490,7 +490,7 @@ final boolean offer(@NonNull BidiWriteObjectRequest e) { requireNonNull(e, "e must be non null"); validateCurrentStateIsOneOf(State.allNonTerminal); if (e.hasChecksummedData()) { - checkNotFinalizing(); + checkNotFinalizing(e.getChecksummedData().getContent().size()); } int size = e.getChecksummedData().getContent().size(); long availableCapacity = availableCapacity(); @@ -827,10 +827,15 @@ protected final void validateCurrentStateIsOneOf(State... allowed) { state); } - private void checkNotFinalizing() { + private void checkNotFinalizing(int size) { checkState( finishWriteOffset == -1, - "Attempting to append bytes even though finalization has previously been signaled."); + "Attempting to append bytes even though finalization has previously been signaled." + + " (finishWriteOffset: %s, totalSentBytes: %s, confirmedBytes: %s, size: %s)", + finishWriteOffset, + totalSentBytes, + confirmedBytes, + size); } protected final boolean internalOffer(BidiWriteObjectRequest e) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java new file mode 100644 index 0000000000..0364ddaf18 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannelTest.java @@ -0,0 +1,121 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.cloud.storage.BidiUploadState.AppendableUploadState; +import com.google.cloud.storage.ITAppendableUploadFakeTest.FakeStorage; +import com.google.cloud.storage.it.ChecksummedTestContent; +import com.google.common.collect.ImmutableList; +import com.google.storage.v2.BidiWriteObjectResponse; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.OffsetDateTime; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public final class BidiAppendableUnbufferedWritableByteChannelTest { + @Rule public final TestName testName = new TestName(); + + @Test + public void appendAndFinalizeOnlyPerformedIfAllBytesConsumed() throws IOException { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + ChecksummedTestContent ctc = ChecksummedTestContent.gen(27); + AppendableUploadState state = + BidiUploadState.appendableNew( + BidiUploadTest.appendRequestNew, + GrpcCallContext::createDefault, + 16, + SettableApiFuture.create(), + Crc32cValue.zero()); + AtomicLong finishWriteOffset = new AtomicLong(-1); + BidiUploadStreamingStream stream = + new BidiUploadStreamingStream( + state, + executor, + BidiUploadTestUtils.adaptOnlySend( + respond -> + request -> { + if (request.getFinishWrite()) { + finishWriteOffset.set( + request.getWriteOffset() + + request.getChecksummedData().getContent().size()); + } + executor.submit( + () -> { + switch ((int) request.getWriteOffset()) { + case 0: + respond.onResponse(BidiUploadTest.resourceWithSize(0)); + break; + case 4: + case 8: + // do not ack any bytes until we receive 16, this simulates + // latency on the bytes being ack'd. + break; + case 12: + respond.onResponse(BidiUploadTestUtils.incremental(8)); + break; + case 16: + respond.onResponse(BidiUploadTestUtils.incremental(12)); + break; + case 20: + respond.onResponse(BidiUploadTestUtils.incremental(16)); + break; + case 24: + BidiWriteObjectResponse.Builder b = + BidiUploadTest.resourceFor(ctc).toBuilder(); + b.getResourceBuilder() + .setFinalizeTime( + Conversions.grpc() + .timestampCodec + .encode(OffsetDateTime.now())); + respond.onResponse(b.build()); + break; + default: + respond.onError( + FakeStorage.unexpectedRequest(request, ImmutableList.of())); + break; + } + }); + }), + 3, + RetryContext.neverRetry()); + ChunkSegmenter chunkSegmenter = + new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 4, 2); + BidiAppendableUnbufferedWritableByteChannel channel = + new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 4, 0); + + ByteBuffer buf = ctc.asByteBuffer(); + int written1 = channel.write(buf); + // fill up the outbound queue + assertThat(written1).isEqualTo(16); + + // asynchronously bytes will be ack'd 4 at a time, eventually there will be enough space in the + // outbound queue to allow writeAndClose to start consuming bytes. + channel.nextWriteShouldFinalize(); + int written2 = channel.writeAndClose(buf); + assertThat(written2).isEqualTo(11); + assertThat(finishWriteOffset.get()).isEqualTo(ctc.length()); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java index 52b6ebb5fe..7e00427fb4 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java @@ -17,6 +17,8 @@ package com.google.cloud.storage; import static com.google.cloud.storage.BidiUploadState.appendableNew; +import static com.google.cloud.storage.BidiUploadTestUtils.adaptOnlySend; +import static com.google.cloud.storage.BidiUploadTestUtils.alwaysErrorBidiStreamingCallable; import static com.google.cloud.storage.BidiUploadTestUtils.createSegment; import static com.google.cloud.storage.BidiUploadTestUtils.finishAt; import static com.google.cloud.storage.BidiUploadTestUtils.incremental; @@ -39,11 +41,7 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.AbortedException; -import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiExceptionFactory; -import com.google.api.gax.rpc.BidiStreamingCallable; -import com.google.api.gax.rpc.ClientStream; -import com.google.api.gax.rpc.ClientStreamReadyObserver; import com.google.api.gax.rpc.ErrorDetails; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StreamController; @@ -64,7 +62,6 @@ import com.google.common.collect.Range; import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.Message; import com.google.protobuf.TextFormat; import com.google.rpc.Code; import com.google.storage.v2.AppendObjectSpec; @@ -1840,59 +1837,6 @@ public void recordError( exec1.shutdownNow(); } } - - private static BidiStreamingCallable - alwaysErrorBidiStreamingCallable(Status status) { - return adaptOnlySend(respond -> request -> respond.onError(status.asRuntimeException())); - } - - private static BidiStreamingCallable adaptOnlySend( - Function, OnlySendClientStream> func) { - return adapt(func::apply); - } - - private static BidiStreamingCallable adapt( - Function, ClientStream> func) { - return adapt( - (respond, onReady, context) -> { - ClientStream clientStream = func.apply(respond); - StreamController controller = TestUtils.nullStreamController(); - respond.onStart(controller); - return clientStream; - }); - } - - /** - * BidiStreamingCallable isn't functional even though it's a single abstract method. - * - *

Define a method that can adapt a TriFunc as the required implementation of {@link - * BidiStreamingCallable#internalCall(ResponseObserver, ClientStreamReadyObserver, - * ApiCallContext)}. - * - *

Saves several lines of boilerplate in each test. - */ - private static BidiStreamingCallable adapt( - StreamingStreamTest.TriFunc< - ResponseObserver, - ClientStreamReadyObserver, - ApiCallContext, - ClientStream> - func) { - return new BidiStreamingCallable() { - @Override - public ClientStream internalCall( - ResponseObserver respond, - ClientStreamReadyObserver onReady, - ApiCallContext context) { - return func.apply(respond, onReady, context); - } - }; - } - - @FunctionalInterface - interface TriFunc { - R apply(A a, B b, C c); - } } public static final class BidiUploadStreamingStreamResponseObserverTest { @@ -2289,18 +2233,4 @@ static BidiWriteObjectRequest flushOffset(long offset) { } return BidiWriteObjectResponse.newBuilder().setResource(f.apply(b)).build(); } - - @FunctionalInterface - private interface OnlySendClientStream extends ClientStream { - @Override - default void closeSendWithError(Throwable t) {} - - @Override - default void closeSend() {} - - @Override - default boolean isSendReady() { - return true; - } - } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java index 827f1320d3..110fdd4510 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTestUtils.java @@ -21,12 +21,19 @@ import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.AbortedException; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.ClientStream; +import com.google.api.gax.rpc.ClientStreamReadyObserver; import com.google.api.gax.rpc.ErrorDetails; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; import com.google.cloud.storage.it.ChecksummedTestContent; import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import com.google.protobuf.Timestamp; import com.google.rpc.Code; import com.google.storage.v2.BidiWriteObjectRedirectedError; @@ -39,6 +46,7 @@ import java.nio.ByteBuffer; import java.time.OffsetDateTime; import java.util.List; +import java.util.function.Function; import org.checkerframework.checker.nullness.qual.NonNull; final class BidiUploadTestUtils { @@ -122,4 +130,71 @@ static BidiWriteObjectRequest withFlushAndStateLookup(BidiWriteObjectRequest ori static Timestamp timestampNow() { return Conversions.grpc().timestampCodec.encode(OffsetDateTime.now()); } + + static BidiStreamingCallable + alwaysErrorBidiStreamingCallable(Status status) { + return adaptOnlySend(respond -> request -> respond.onError(status.asRuntimeException())); + } + + static BidiStreamingCallable adaptOnlySend( + Function, OnlySendClientStream> func) { + return adapt(func::apply); + } + + static BidiStreamingCallable adapt( + Function, ClientStream> func) { + return adapt( + (respond, onReady, context) -> { + ClientStream clientStream = func.apply(respond); + StreamController controller = TestUtils.nullStreamController(); + respond.onStart(controller); + return clientStream; + }); + } + + /** + * BidiStreamingCallable isn't functional even though it's a single abstract method. + * + *

Define a method that can adapt a TriFunc as the required implementation of {@link + * BidiStreamingCallable#internalCall(ResponseObserver, ClientStreamReadyObserver, + * ApiCallContext)}. + * + *

Saves several lines of boilerplate in each test. + */ + static BidiStreamingCallable adapt( + TriFunc< + ResponseObserver, + ClientStreamReadyObserver, + ApiCallContext, + ClientStream> + func) { + return new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver respond, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + return func.apply(respond, onReady, context); + } + }; + } + + @FunctionalInterface + interface TriFunc { + R apply(A a, B b, C c); + } + + @FunctionalInterface + interface OnlySendClientStream extends ClientStream { + @Override + default void closeSendWithError(Throwable t) {} + + @Override + default void closeSend() {} + + @Override + default boolean isSendReady() { + return true; + } + } }