From 12484ef62397176caabea1fb132896ddc1b0b957 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 22 Aug 2025 15:45:11 -0400 Subject: [PATCH] feat: add AppendableUploadWriteableByteChannel#flush() Allows blocking the invoking thread until the number of bytes acknowledged by GCS matches the number of written bytes prior to calling flush(). --- .../clirr-ignored-differences.xml | 8 ++++ ...pendableUnbufferedWritableByteChannel.java | 5 +++ .../google/cloud/storage/BidiUploadState.java | 38 ++++++++++++++--- .../storage/BidiUploadStreamingStream.java | 4 ++ .../cloud/storage/BlobAppendableUpload.java | 13 ++++++ .../storage/BlobAppendableUploadImpl.java | 9 ++++ .../BufferedWritableByteChannelSession.java | 2 + .../DefaultBufferedWritableByteChannel.java | 2 +- .../MinFlushBufferedWritableByteChannel.java | 2 +- .../cloud/storage/OtelStorageDecorator.java | 8 ++++ .../UnbufferedWritableByteChannelSession.java | 6 +++ .../google/cloud/storage/BidiUploadTest.java | 42 +++++++++++++++++++ ...efaultBufferedWritableByteChannelTest.java | 2 +- .../cloud/storage/ITAppendableUploadTest.java | 28 +++++++++++++ ...nFlushBufferedWritableByteChannelTest.java | 2 +- 15 files changed, 161 insertions(+), 10 deletions(-) diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index bdc578c4c1..9cb223aebc 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -184,4 +184,12 @@ int write(java.nio.ByteBuffer) + + + 7012 + com/google/cloud/storage/BlobAppendableUpload$AppendableUploadWriteableByteChannel + void flush() + + + diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java index 7f105d758b..28663f813b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java @@ -106,6 +106,11 @@ public void nextWriteShouldFinalize() { this.nextWriteShouldFinalize = true; } + void flush() throws InterruptedException { + stream.flush(); + stream.awaitAckOf(writeOffset); + } + private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { if (!open) { throw new ClosedChannelException(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java index 151ff402cb..08ed0c414f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java @@ -254,6 +254,10 @@ public void awaitTakeoverStateReconciliation(Runnable restart) { unimplemented(); } + public void awaitAck(long writeOffset) throws InterruptedException { + unimplemented(); + } + enum State { INITIALIZING, TAKEOVER, @@ -286,6 +290,7 @@ abstract static class BaseUploadState extends BidiUploadState { protected final Supplier baseCallContext; protected final ReentrantLock lock; protected final Condition stateUpdated; + protected final Condition confirmedBytesUpdated; /** The maximum number of bytes allowed to be enqueued in {@link #queue} across all messages. */ protected final long maxBytes; @@ -345,6 +350,7 @@ private BaseUploadState( this.enqueuedBytes = 0; this.lock = new ReentrantLock(); this.stateUpdated = lock.newCondition(); + this.confirmedBytesUpdated = lock.newCondition(); this.lastSentRequestIndex = -1; this.minByteOffset = 0; this.totalSentBytes = 0; @@ -501,6 +507,11 @@ final boolean offer(@NonNull BidiWriteObjectRequest e) { } } + protected void setConfirmedBytes(long newConfirmedBytes) { + this.confirmedBytes = newConfirmedBytes; + this.confirmedBytesUpdated.signalAll(); + } + @Override final void updateStateFromResponse(BidiWriteObjectResponse response) { lock.lock(); @@ -525,7 +536,7 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) { // todo: test more permutations where this might be true // 1. retry, object not yet created if (state == State.INITIALIZING) { - confirmedBytes = persistedSize; + setConfirmedBytes(persistedSize); totalSentBytes = Math.max(totalSentBytes, persistedSize); } if (state == State.INITIALIZING || state == State.RETRYING) { @@ -541,7 +552,7 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) { long endOffset = peek.getWriteOffset() + size; if (endOffset <= persistedSize) { poll(); - confirmedBytes = endOffset; + setConfirmedBytes(endOffset); enqueuedBytes -= size; minByteOffset = peek.getWriteOffset(); } else { @@ -551,11 +562,11 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) { poll(); } else if (peek.getFlush()) { if (finalFlushSent && persistedSize == totalSentBytes) { - confirmedBytes = persistedSize; + setConfirmedBytes(persistedSize); signalTerminalSuccess = true; poll(); } else if (persistedSize >= peek.getWriteOffset()) { - confirmedBytes = persistedSize; + setConfirmedBytes(persistedSize); poll(); } else { break; @@ -565,7 +576,7 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) { enqueuedBytes == 0, "attempting to evict finish_write: true while bytes are still enqueued"); if (response.hasResource() && persistedSize == totalSentBytes) { - confirmedBytes = persistedSize; + setConfirmedBytes(persistedSize); if (response.getResource().hasFinalizeTime()) { signalTerminalSuccess = true; poll(); @@ -883,6 +894,21 @@ public void awaitTakeoverStateReconciliation(Runnable restart) { throw StorageException.coalesce(e); } } + + @Override + public void awaitAck(long writeOffset) throws InterruptedException { + lock.lock(); + try { + while (confirmedBytes < writeOffset + && !confirmedBytesUpdated.await(5, TimeUnit.MILLISECONDS)) { + if (resultFuture.isDone()) { + return; + } + } + } finally { + lock.unlock(); + } + } } abstract static class AppendableUploadState extends BaseUploadState { @@ -950,7 +976,7 @@ private AppendableUploadState( checkState(persistedSize > -1, "persistedSize > -1 (%s > -1)", persistedSize); if (state == State.TAKEOVER || stateToReturnToAfterRetry == State.TAKEOVER) { totalSentBytes = persistedSize; - confirmedBytes = persistedSize; + setConfirmedBytes(persistedSize); if (response.hasResource() && response.getResource().hasChecksums() && response.getResource().getChecksums().hasCrc32C()) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java index 3cdfb76e2e..6da243e541 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java @@ -218,6 +218,10 @@ public void awaitTakeoverStateReconciliation() { state.awaitTakeoverStateReconciliation(this::restart); } + void awaitAckOf(long writeOffset) throws InterruptedException { + state.awaitAck(writeOffset); + } + /** * It is possible for this value to change after reading, however it is guaranteed that the amount * of available capacity will only ever increase. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java index e6f9167ac7..056f665ab6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java @@ -124,6 +124,19 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel { @Override int write(ByteBuffer src) throws IOException; + /** + * This method is blocking + * + *

Block the invoking thread, waiting until the number of bytes written so far has been + * acknowledged by Google Cloud Storage. + * + * @throws IOException if an error happens while waiting for the flush to complete + * @throws java.io.InterruptedIOException if the current thread is interrupted while waiting + * @since 2.56.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + void flush() throws IOException; + /** * This method is blocking * diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java index 909d11dfa2..cc3bac3f1a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java @@ -21,6 +21,7 @@ import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; import com.google.common.base.Preconditions; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.concurrent.locks.ReentrantLock; @@ -82,6 +83,14 @@ public void flush() throws IOException { lock.lock(); try { buffered.flush(); + try { + unbuffered.flush(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + InterruptedIOException interruptedIOException = new InterruptedIOException(); + interruptedIOException.initCause(e); + throw interruptedIOException; + } } finally { lock.unlock(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferedWritableByteChannelSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferedWritableByteChannelSession.java index 67cf231333..cf9c19602f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferedWritableByteChannelSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferedWritableByteChannelSession.java @@ -24,6 +24,8 @@ interface BufferedWritableByteChannelSession extends WritableByteChannelSession { interface BufferedWritableByteChannel extends WritableByteChannel { + + /** Block the invoking thread until all written bytes are accepted by the lower layer */ void flush() throws IOException; } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java index 7b92b25724..4e9a7c107f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java @@ -200,7 +200,7 @@ public void close() throws IOException { @Override public void flush() throws IOException { - if (enqueuedBytes()) { + while (enqueuedBytes()) { ByteBuffer buffer = handle.get(); Buffers.flip(buffer); channel.write(buffer); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MinFlushBufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MinFlushBufferedWritableByteChannel.java index 30e8206ea6..4bd4d9eaa5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/MinFlushBufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MinFlushBufferedWritableByteChannel.java @@ -155,7 +155,7 @@ public void close() throws IOException { @Override public void flush() throws IOException { - if (enqueuedBytes()) { + while (enqueuedBytes()) { ByteBuffer buffer = handle.get(); Buffers.flip(buffer); channel.write(buffer); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java index 1742833ecb..0a5eae9577 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java @@ -2217,6 +2217,7 @@ private OtelDecoratingAppendableUploadWriteableByteChannel( @Override @BetaApi public void finalizeAndClose() throws IOException { + setScope(); try { delegate.finalizeAndClose(); } catch (IOException | RuntimeException e) { @@ -2235,6 +2236,7 @@ public void finalizeAndClose() throws IOException { @Override @BetaApi public void closeWithoutFinalizing() throws IOException { + setScope(); try { delegate.closeWithoutFinalizing(); } catch (IOException | RuntimeException e) { @@ -2269,6 +2271,12 @@ public void close() throws IOException { } } + @Override + public void flush() throws IOException { + setScope(); + delegate.flush(); + } + @Override public int write(ByteBuffer src) throws IOException { setScope(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/UnbufferedWritableByteChannelSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/UnbufferedWritableByteChannelSession.java index d7a5fcef60..2210822fa0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/UnbufferedWritableByteChannelSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/UnbufferedWritableByteChannelSession.java @@ -26,24 +26,30 @@ interface UnbufferedWritableByteChannelSession extends WritableByteChannelSession { interface UnbufferedWritableByteChannel extends WritableByteChannel, GatheringByteChannel { + + /** Default assumed to be blocking, non-blocking allowed but must be documented. */ @Override default int write(ByteBuffer src) throws IOException { return Math.toIntExact(write(new ByteBuffer[] {src}, 0, 1)); } + /** Default assumed to be blocking, non-blocking allowed but must be documented. */ @Override default long write(ByteBuffer[] srcs) throws IOException { return write(srcs, 0, srcs.length); } + /** This method must block until terminal state is reached. */ default int writeAndClose(ByteBuffer src) throws IOException { return Math.toIntExact(writeAndClose(new ByteBuffer[] {src}, 0, 1)); } + /** This method must block until terminal state is reached. */ default long writeAndClose(ByteBuffer[] srcs) throws IOException { return writeAndClose(srcs, 0, srcs.length); } + /** This method must block until terminal state is reached. */ default long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException { long write = write(srcs, offset, length); close(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java index bfb561b593..52b6ebb5fe 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BidiUploadTest.java @@ -17,7 +17,9 @@ package com.google.cloud.storage; import static com.google.cloud.storage.BidiUploadState.appendableNew; +import static com.google.cloud.storage.BidiUploadTestUtils.createSegment; import static com.google.cloud.storage.BidiUploadTestUtils.finishAt; +import static com.google.cloud.storage.BidiUploadTestUtils.incremental; import static com.google.cloud.storage.BidiUploadTestUtils.makeRedirect; import static com.google.cloud.storage.BidiUploadTestUtils.packRedirectIntoAbortedException; import static com.google.cloud.storage.BidiUploadTestUtils.timestampNow; @@ -84,6 +86,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -480,6 +483,45 @@ public void redirectToken_appendable_noPreviousSuccessfulFlush() throws Exceptio () -> assertThat(actualCtx.getExtraHeaders()).isEqualTo(expectedHeaders)); } + @Test + public void awaitAck_alreadyThere() throws InterruptedException { + BidiUploadState state = factory.createInitialized(17); + + assertThat(state.offer(createSegment(2))).isTrue(); + assertThat(state.onResponse(incremental(2))).isNull(); + + state.awaitAck(2); + } + + @Test + public void awaitAck_multipleResponses() + throws InterruptedException, ExecutionException, TimeoutException { + BidiUploadState state = factory.createInitialized(17); + + assertThat(state.offer(createSegment(4))).isTrue(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + Future f = + exec.submit( + () -> { + try { + Thread.sleep(10); + assertThat(state.onResponse(incremental(2))).isNull(); + Thread.sleep(10); + assertThat(state.onResponse(incremental(4))).isNull(); + return 3; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + state.awaitAck(4); + assertThat(f.get(3, TimeUnit.SECONDS)).isEqualTo(3); + } finally { + exec.shutdownNow(); + } + } + private abstract static class BidiUploadStateFactory { final BidiUploadState createInitialized() { return createInitialized(25); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java index fa88ec76ec..954d2917fd 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java @@ -259,7 +259,7 @@ void manualFlushingIsAccurate() throws IOException { assertWithMessage("Unexpected total flushed length") .that(adapter.writeEndPoints) - .isEqualTo(ImmutableList.of(3L, 5L, 10L, 12L)); + .isEqualTo(ImmutableList.of(3L, 5L, 6L, 11L, 12L)); assertThat(baos.toByteArray()).isEqualTo(allData); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java index b21896a1c7..eeb24f59c5 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java @@ -134,6 +134,34 @@ public void appendableUpload_bytes() assertThat(xxd(actualBytes)).isEqualTo(xxd(a1_a2.getBytes())); } + @Test + public void explicitFlush() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + checkTestbenchIssue733(); + + BlobAppendableUpload upload = + storage.blobAppendableUpload( + BlobInfo.newBuilder(bucket, UUID.randomUUID().toString()).build(), p.uploadConfig); + + try (AppendableUploadWriteableByteChannel channel = upload.open()) { + ByteBuffer src = p.content.asByteBuffer(); + ByteBuffer zed = src.slice(); + zed.limit(zed.position() + 1); + src.position(src.position() + 1); + + int written = channel.write(zed); + assertThat(written).isEqualTo(1); + channel.flush(); + + written = StorageChannelUtils.blockingEmptyTo(src, channel); + assertThat(written).isEqualTo(p.content.length() - 1); + } + + BlobInfo gen1 = upload.getResult().get(3, TimeUnit.SECONDS); + assertThat(gen1.getSize()).isEqualTo(p.content.length()); + assertThat(gen1.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(p.content.getCrc32c())); + } + @Test // Pending work in testbench: https://github.com/googleapis/storage-testbench/issues/723 // manually verified internally on 2025-03-25 diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/MinFlushBufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/MinFlushBufferedWritableByteChannelTest.java index 255d0e4bea..6ccd5c669f 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/MinFlushBufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/MinFlushBufferedWritableByteChannelTest.java @@ -338,7 +338,7 @@ void manualFlushingIsAccurate() throws IOException { assertWithMessage("Unexpected total flushed length") .that(adapter.writeEndPoints) - .isEqualTo(ImmutableList.of(3L, 5L, 12L)); + .isEqualTo(ImmutableList.of(3L, 5L, 6L, 12L)); assertThat(baos.toByteArray()).isEqualTo(allData); } }