diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 8f33b1e5ca..ce41620d33 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -163,41 +163,42 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { readObjectObserver.request(); ReadObjectResponse resp = (ReadObjectResponse) take; - ResponseContentLifecycleHandle handle = - read.getResponseContentLifecycleManager().get(resp); - ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle); - if (resp.hasMetadata()) { - Object respMetadata = resp.getMetadata(); - if (metadata == null) { - metadata = respMetadata; - } else if (metadata.getGeneration() != respMetadata.getGeneration()) { - throw closeWithError( - String.format( - Locale.US, - "Mismatch Generation between subsequent reads. Expected %d but received %d", - metadata.getGeneration(), - respMetadata.getGeneration())); + try (ResponseContentLifecycleHandle handle = + read.getResponseContentLifecycleManager().get(resp)) { + ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle); + if (resp.hasMetadata()) { + Object respMetadata = resp.getMetadata(); + if (metadata == null) { + metadata = respMetadata; + } else if (metadata.getGeneration() != respMetadata.getGeneration()) { + throw closeWithError( + String.format( + Locale.US, + "Mismatch Generation between subsequent reads. Expected %d but received %d", + metadata.getGeneration(), + respMetadata.getGeneration())); + } } - } - ChecksummedData checksummedData = resp.getChecksummedData(); - ByteString content = checksummedData.getContent(); - int contentSize = content.size(); - // Very important to know whether a crc32c value is set. Without checking, protobuf will - // happily return 0, which is a valid crc32c value. - if (checksummedData.hasCrc32C()) { - Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize); - try { - hasher.validate(expected, content); - } catch (IOException e) { - close(); - throw e; + ChecksummedData checksummedData = resp.getChecksummedData(); + ByteString content = checksummedData.getContent(); + int contentSize = content.size(); + // Very important to know whether a crc32c value is set. Without checking, protobuf will + // happily return 0, which is a valid crc32c value. + if (checksummedData.hasCrc32C()) { + Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize); + try { + hasher.validate(expected, content); + } catch (IOException e) { + close(); + throw e; + } + } + ref.copy(c, dsts, offset, length); + if (ref.hasRemaining()) { + leftovers = ref; + } else { + ref.close(); } - } - ref.copy(c, dsts, offset, length); - if (ref.hasRemaining()) { - leftovers = ref; - } else { - ref.close(); } } long read = c.read(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java new file mode 100644 index 0000000000..f74b21fb28 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.TestUtils.xxd; +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable; +import com.google.cloud.storage.Retrying.Retrier; +import com.google.cloud.storage.it.ChecksummedTestContent; +import com.google.storage.v2.ReadObjectRequest; +import com.google.storage.v2.ReadObjectResponse; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Test; + +public final class GapicUnbufferedReadableByteChannelTest { + + @Test + public void ensureResponseAreClosed() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + AtomicBoolean close = new AtomicBoolean(false); + + ResponseContentLifecycleManager manager = + resp -> ResponseContentLifecycleHandle.create(resp, () -> close.compareAndSet(false, true)); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(new NullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.asChecksummedData()) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.noop(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + c.read(buffer); + assertThat(xxd(buffer)).isEqualTo(xxd(testContent.getBytes())); + assertThat(close.get()).isTrue(); + } + } + + private static class NullStreamController implements StreamController { + + @Override + public void cancel() {} + + @Override + public void disableAutoInboundFlowControl() {} + + @Override + public void request(int count) {} + } +}