From 622d41f556c65319eabadbe92f8b56edd3fb0d0a Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 10 Oct 2025 15:31:42 -0400 Subject: [PATCH] fix: update BlobReadSession channels to not implicitly close once EOF is observed This allows a channel to be read, and EOF(-1) returned multiple times rather than receiving a ClosedChannelException after the first EOF is returned. --- .../BaseObjectReadSessionStreamRead.java | 18 +-- .../com/google/cloud/storage/GrpcUtils.java | 3 + .../ObjectReadSessionStreamReadTest.java | 28 +---- .../it/ITReadableByteChannelBehaviorTest.java | 104 ++++++++++++++++++ 4 files changed, 121 insertions(+), 32 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITReadableByteChannelBehaviorTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java index 3ad80a1986..dc71350d70 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java @@ -343,12 +343,8 @@ public boolean canShareStreamWith(ObjectReadSessionStreamRead other) { @Override public void internalClose() throws IOException { if (!closed) { - retryContext.reset(); closed = true; - if (leftovers != null) { - leftovers.ref.close(); - } - GrpcUtils.closeAll(queue); + internalCleanup(); } } @@ -378,7 +374,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { throw new ClosedChannelException(); } if (complete) { - close(); + internalCleanup(); return -1; } @@ -406,7 +402,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { } else if (poll == EofMarker.INSTANCE) { complete = true; if (read == 0) { - close(); + internalCleanup(); return -1; } break; @@ -442,6 +438,14 @@ private void offer(Closeable offer) throws InterruptedIOException { } } + private void internalCleanup() throws IOException { + retryContext.reset(); + if (leftovers != null) { + leftovers.ref.close(); + } + GrpcUtils.closeAll(queue); + } + /** * The queue items are added to is a queue of {@link Closeable}. This class smuggles a Throwable * in a no-op Closable, such that the throwable can be in the queue. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java index 7717972012..cbf5d3172a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java @@ -70,6 +70,9 @@ static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext * them all as suppressed exceptions on the first occurrence. */ static void closeAll(Collection closeables) throws IOException { + if (closeables.isEmpty()) { + return; + } IOException ioException = closeables.stream() .filter(Objects::nonNull) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamReadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamReadTest.java index 3e70f9c048..9db08fd638 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamReadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamReadTest.java @@ -356,31 +356,9 @@ public void streamingRead_eofShouldBeReturnedIfNoOtherBytesRead() throws Excepti 1, RangeSpec.of(0, 137), Hasher.enabled(), RetryContext.neverRetry())) { read.eof(); assertThat(read.read(ByteBuffer.allocate(1))).isEqualTo(-1); - - assertAll( - () -> assertThrows(ClosedChannelException.class, () -> read.read((ByteBuffer) null)), - () -> assertThat(read.isOpen()).isFalse()); - } - } - - @Test - public void streamingRead_closedOnceEofIsRead() throws Exception { - try (StreamingRead read = - ObjectReadSessionStreamRead.streamingRead( - 1, RangeSpec.of(0, 137), Hasher.enabled(), RetryContext.neverRetry())) { - ByteString bytes1 = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(62)); - try (ResponseContentLifecycleHandle handle = noopContentHandle(bytes1)) { - read.accept(handle.borrow(Function.identity())); - } - - ByteBuffer buf = ByteBuffer.allocate(512); - read.read(buf); - read.eof(); - assertThat(read.read(buf)).isEqualTo(-1); - - assertAll( - () -> assertThrows(ClosedChannelException.class, () -> read.read(buf)), - () -> assertThat(read.isOpen()).isFalse()); + assertThat(read.isOpen()).isTrue(); + read.close(); + assertThat(read.isOpen()).isFalse(); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITReadableByteChannelBehaviorTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITReadableByteChannelBehaviorTest.java new file mode 100644 index 0000000000..b4082ed4fc --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITReadableByteChannelBehaviorTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.it; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; + +import com.google.cloud.ReadChannel; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobReadSession; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.ReadProjectionConfig; +import com.google.cloud.storage.ReadProjectionConfigs; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.CrossRun; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.registry.Generator; +import com.google.cloud.storage.it.runner.registry.ObjectsFixture; +import com.google.common.io.ByteStreams; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@CrossRun( + backends = {Backend.PROD}, + transports = {Transport.HTTP, Transport.GRPC}) +public final class ITReadableByteChannelBehaviorTest { + + @Inject public Storage storage; + @Inject public BucketInfo bucket; + @Inject public Generator generator; + @Inject public ObjectsFixture objectsFixture; + + @Test + public void eofReturnedMultipleTimes_reader() throws IOException { + BlobId id = objectsFixture.getObj512KiB().getInfo().getBlobId(); + + try (ReadChannel reader = storage.reader(id)) { + eofReturnedMultipleTimes_doTest(reader); + } + } + + @Test + @CrossRun.Exclude(transports = Transport.HTTP) + public void eofReturnedMultipleTimes_blobReadSession_channel() + throws ExecutionException, InterruptedException, TimeoutException, IOException { + eofReturnedMultipleTimes_doTestBlobReadSession(ReadProjectionConfigs.asChannel()); + } + + @Test + @CrossRun.Exclude(transports = Transport.HTTP) + public void eofReturnedMultipleTimes_blobReadSession_seekableChannel() + throws ExecutionException, InterruptedException, TimeoutException, IOException { + eofReturnedMultipleTimes_doTestBlobReadSession(ReadProjectionConfigs.asSeekableChannel()); + } + + private void eofReturnedMultipleTimes_doTestBlobReadSession( + ReadProjectionConfig config) + throws IOException, ExecutionException, InterruptedException, TimeoutException { + BlobId id = objectsFixture.getObj512KiB().getInfo().getBlobId(); + + try (BlobReadSession session = storage.blobReadSession(id).get(3, TimeUnit.SECONDS)) { + try (ReadableByteChannel c = session.readAs(config)) { + eofReturnedMultipleTimes_doTest(c); + } + } + } + + private void eofReturnedMultipleTimes_doTest(ReadableByteChannel c) throws IOException { + long copy = ByteStreams.copy(c, Channels.newChannel(ByteStreams.nullOutputStream())); + assertThat(copy).isEqualTo(objectsFixture.getObj512KiB().getInfo().getSize()); + + ByteBuffer buf = ByteBuffer.allocate(8); + int i = ThreadLocalRandom.current().nextInt(3, 10); + for (int j = 0; j < i; j++) { + assertWithMessage("expected EOF " + j).that(c.read(buf)).isEqualTo(-1); + } + } +}