From 96e40491fb31c8986f0785a24f6846a9c2aa012c Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 10 Sep 2024 16:43:46 -0400 Subject: [PATCH 1/2] chore: attempt to drain the stream iterator Related #2696 --- .../GapicUnbufferedReadableByteChannel.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index ecaa3bc878..7b680dc448 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -201,8 +201,23 @@ public ReadObjectResponse next() { @Override public void close() { if (serverStream != null) { - // todo: do we need to "drain" anything? serverStream.cancel(); + if (responseIterator != null) { + IOException ioException = null; + while (responseIterator.hasNext()) { + try { + ReadObjectResponse next = responseIterator.next(); + ResponseContentLifecycleHandle handle = rclm.get(next); + handle.close(); + } catch (IOException e) { + if (ioException == null) { + ioException = e; + } else { + ioException.addSuppressed(e); + } + } + } + } } } From d0ffc843ab57649ffc81578ed95e66677426fbf6 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 10 Sep 2024 17:18:18 -0400 Subject: [PATCH 2/2] chore: attempt to drain gRPC ReadObject stream iterator Related #2696 --- .../storage/GapicUnbufferedReadableByteChannel.java | 10 +++++++++- .../com/google/cloud/storage/GrpcStorageOptions.java | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 7b680dc448..8904dce664 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -182,6 +182,7 @@ public boolean hasNext() { if (!result.isDone()) { result.setException(StorageException.coalesce(e)); } + reset(); throw e; } } @@ -194,6 +195,7 @@ public ReadObjectResponse next() { if (!result.isDone()) { result.setException(StorageException.coalesce(e)); } + reset(); throw e; } } @@ -212,7 +214,7 @@ public void close() { } catch (IOException e) { if (ioException == null) { ioException = e; - } else { + } else if (ioException != e) { ioException.addSuppressed(e); } } @@ -238,5 +240,11 @@ private Iterator ensureResponseIteratorOpen() { } } } + + private void reset() { + serverStream = null; + responseIterator = null; + streamInitialized = false; + } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index 07616d0ef1..90f16bca68 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -1098,7 +1098,7 @@ static void closeAllStreams(Iterable inputStreams) throws IOExcepti } catch (IOException e) { if (ioException == null) { ioException = e; - } else { + } else if (ioException != e) { ioException.addSuppressed(e); } }