Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,12 @@
<method>int write(java.nio.ByteBuffer)</method>
</difference>

<!-- @InternalExtensionOnly -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/BlobAppendableUpload$AppendableUploadWriteableByteChannel</className>
<method>void flush()</method>
</difference>


</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public void nextWriteShouldFinalize() {
this.nextWriteShouldFinalize = true;
}

void flush() throws InterruptedException {
stream.flush();
stream.awaitAckOf(writeOffset);
}

private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
if (!open) {
throw new ClosedChannelException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ public void awaitTakeoverStateReconciliation(Runnable restart) {
unimplemented();
}

public void awaitAck(long writeOffset) throws InterruptedException {
unimplemented();
}

enum State {
INITIALIZING,
TAKEOVER,
Expand Down Expand Up @@ -286,6 +290,7 @@ abstract static class BaseUploadState extends BidiUploadState {
protected final Supplier<GrpcCallContext> baseCallContext;
protected final ReentrantLock lock;
protected final Condition stateUpdated;
protected final Condition confirmedBytesUpdated;

/** The maximum number of bytes allowed to be enqueued in {@link #queue} across all messages. */
protected final long maxBytes;
Expand Down Expand Up @@ -345,6 +350,7 @@ private BaseUploadState(
this.enqueuedBytes = 0;
this.lock = new ReentrantLock();
this.stateUpdated = lock.newCondition();
this.confirmedBytesUpdated = lock.newCondition();
this.lastSentRequestIndex = -1;
this.minByteOffset = 0;
this.totalSentBytes = 0;
Expand Down Expand Up @@ -501,6 +507,11 @@ final boolean offer(@NonNull BidiWriteObjectRequest e) {
}
}

protected void setConfirmedBytes(long newConfirmedBytes) {
this.confirmedBytes = newConfirmedBytes;
this.confirmedBytesUpdated.signalAll();
}

@Override
final void updateStateFromResponse(BidiWriteObjectResponse response) {
lock.lock();
Expand All @@ -525,7 +536,7 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) {
// todo: test more permutations where this might be true
// 1. retry, object not yet created
if (state == State.INITIALIZING) {
confirmedBytes = persistedSize;
setConfirmedBytes(persistedSize);
totalSentBytes = Math.max(totalSentBytes, persistedSize);
}
if (state == State.INITIALIZING || state == State.RETRYING) {
Expand All @@ -541,7 +552,7 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) {
long endOffset = peek.getWriteOffset() + size;
if (endOffset <= persistedSize) {
poll();
confirmedBytes = endOffset;
setConfirmedBytes(endOffset);
enqueuedBytes -= size;
minByteOffset = peek.getWriteOffset();
} else {
Expand All @@ -551,11 +562,11 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) {
poll();
} else if (peek.getFlush()) {
if (finalFlushSent && persistedSize == totalSentBytes) {
confirmedBytes = persistedSize;
setConfirmedBytes(persistedSize);
signalTerminalSuccess = true;
poll();
} else if (persistedSize >= peek.getWriteOffset()) {
confirmedBytes = persistedSize;
setConfirmedBytes(persistedSize);
poll();
} else {
break;
Expand All @@ -565,7 +576,7 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) {
enqueuedBytes == 0,
"attempting to evict finish_write: true while bytes are still enqueued");
if (response.hasResource() && persistedSize == totalSentBytes) {
confirmedBytes = persistedSize;
setConfirmedBytes(persistedSize);
if (response.getResource().hasFinalizeTime()) {
signalTerminalSuccess = true;
poll();
Expand Down Expand Up @@ -883,6 +894,21 @@ public void awaitTakeoverStateReconciliation(Runnable restart) {
throw StorageException.coalesce(e);
}
}

@Override
public void awaitAck(long writeOffset) throws InterruptedException {
lock.lock();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: why are we using lock here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Condition confirmedBytesUpdated is tied to the lock instance.

Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true. Because access to this shared state information occurs in different threads, it must be protected, so a lock of some form is associated with the condition. The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait. [1]

When we await the condition on line 903, the lock will automatically be released while the invoking thread waits for the call of await to return.

try {
while (confirmedBytes < writeOffset
&& !confirmedBytesUpdated.await(5, TimeUnit.MILLISECONDS)) {
if (resultFuture.isDone()) {
return;
}
}
} finally {
lock.unlock();
}
}
}

abstract static class AppendableUploadState extends BaseUploadState {
Expand Down Expand Up @@ -950,7 +976,7 @@ private AppendableUploadState(
checkState(persistedSize > -1, "persistedSize > -1 (%s > -1)", persistedSize);
if (state == State.TAKEOVER || stateToReturnToAfterRetry == State.TAKEOVER) {
totalSentBytes = persistedSize;
confirmedBytes = persistedSize;
setConfirmedBytes(persistedSize);
if (response.hasResource()
&& response.getResource().hasChecksums()
&& response.getResource().getChecksums().hasCrc32C()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ public void awaitTakeoverStateReconciliation() {
state.awaitTakeoverStateReconciliation(this::restart);
}

void awaitAckOf(long writeOffset) throws InterruptedException {
state.awaitAck(writeOffset);
}

/**
* It is possible for this value to change after reading, however it is guaranteed that the amount
* of available capacity will only ever increase.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel {
@Override
int write(ByteBuffer src) throws IOException;

/**
* <b>This method is blocking</b>
*
* <p>Block the invoking thread, waiting until the number of bytes written so far has been
* acknowledged by Google Cloud Storage.
*
* @throws IOException if an error happens while waiting for the flush to complete
* @throws java.io.InterruptedIOException if the current thread is interrupted while waiting
* @since 2.56.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
void flush() throws IOException;

/**
* <b>This method is blocking</b>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -82,6 +83,14 @@ public void flush() throws IOException {
lock.lock();
try {
buffered.flush();
try {
unbuffered.flush();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
InterruptedIOException interruptedIOException = new InterruptedIOException();
interruptedIOException.initCause(e);
throw interruptedIOException;
}
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ interface BufferedWritableByteChannelSession<ResultT>
extends WritableByteChannelSession<BufferedWritableByteChannel, ResultT> {

interface BufferedWritableByteChannel extends WritableByteChannel {

/** Block the invoking thread until all written bytes are accepted by the lower layer */
void flush() throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void close() throws IOException {

@Override
public void flush() throws IOException {
if (enqueuedBytes()) {
while (enqueuedBytes()) {
ByteBuffer buffer = handle.get();
Buffers.flip(buffer);
channel.write(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void close() throws IOException {

@Override
public void flush() throws IOException {
if (enqueuedBytes()) {
while (enqueuedBytes()) {
ByteBuffer buffer = handle.get();
Buffers.flip(buffer);
channel.write(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2217,6 +2217,7 @@ private OtelDecoratingAppendableUploadWriteableByteChannel(
@Override
@BetaApi
public void finalizeAndClose() throws IOException {
setScope();
try {
delegate.finalizeAndClose();
} catch (IOException | RuntimeException e) {
Expand All @@ -2235,6 +2236,7 @@ public void finalizeAndClose() throws IOException {
@Override
@BetaApi
public void closeWithoutFinalizing() throws IOException {
setScope();
try {
delegate.closeWithoutFinalizing();
} catch (IOException | RuntimeException e) {
Expand Down Expand Up @@ -2269,6 +2271,12 @@ public void close() throws IOException {
}
}

@Override
public void flush() throws IOException {
setScope();
delegate.flush();
}

@Override
public int write(ByteBuffer src) throws IOException {
setScope();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,30 @@ interface UnbufferedWritableByteChannelSession<ResultT>
extends WritableByteChannelSession<UnbufferedWritableByteChannel, ResultT> {

interface UnbufferedWritableByteChannel extends WritableByteChannel, GatheringByteChannel {

/** Default assumed to be blocking, non-blocking allowed but must be documented. */
@Override
default int write(ByteBuffer src) throws IOException {
return Math.toIntExact(write(new ByteBuffer[] {src}, 0, 1));
}

/** Default assumed to be blocking, non-blocking allowed but must be documented. */
@Override
default long write(ByteBuffer[] srcs) throws IOException {
return write(srcs, 0, srcs.length);
}

/** This method must block until terminal state is reached. */
default int writeAndClose(ByteBuffer src) throws IOException {
return Math.toIntExact(writeAndClose(new ByteBuffer[] {src}, 0, 1));
}

/** This method must block until terminal state is reached. */
default long writeAndClose(ByteBuffer[] srcs) throws IOException {
return writeAndClose(srcs, 0, srcs.length);
}

/** This method must block until terminal state is reached. */
default long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
long write = write(srcs, offset, length);
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package com.google.cloud.storage;

import static com.google.cloud.storage.BidiUploadState.appendableNew;
import static com.google.cloud.storage.BidiUploadTestUtils.createSegment;
import static com.google.cloud.storage.BidiUploadTestUtils.finishAt;
import static com.google.cloud.storage.BidiUploadTestUtils.incremental;
import static com.google.cloud.storage.BidiUploadTestUtils.makeRedirect;
import static com.google.cloud.storage.BidiUploadTestUtils.packRedirectIntoAbortedException;
import static com.google.cloud.storage.BidiUploadTestUtils.timestampNow;
Expand Down Expand Up @@ -84,6 +86,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -480,6 +483,45 @@ public void redirectToken_appendable_noPreviousSuccessfulFlush() throws Exceptio
() -> assertThat(actualCtx.getExtraHeaders()).isEqualTo(expectedHeaders));
}

@Test
public void awaitAck_alreadyThere() throws InterruptedException {
BidiUploadState state = factory.createInitialized(17);

assertThat(state.offer(createSegment(2))).isTrue();
assertThat(state.onResponse(incremental(2))).isNull();

state.awaitAck(2);
}

@Test
public void awaitAck_multipleResponses()
throws InterruptedException, ExecutionException, TimeoutException {
BidiUploadState state = factory.createInitialized(17);

assertThat(state.offer(createSegment(4))).isTrue();
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
Future<Integer> f =
exec.submit(
() -> {
try {
Thread.sleep(10);
assertThat(state.onResponse(incremental(2))).isNull();
Thread.sleep(10);
assertThat(state.onResponse(incremental(4))).isNull();
return 3;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

state.awaitAck(4);
assertThat(f.get(3, TimeUnit.SECONDS)).isEqualTo(3);
} finally {
exec.shutdownNow();
}
}

private abstract static class BidiUploadStateFactory {
final BidiUploadState createInitialized() {
return createInitialized(25);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ void manualFlushingIsAccurate() throws IOException {

assertWithMessage("Unexpected total flushed length")
.that(adapter.writeEndPoints)
.isEqualTo(ImmutableList.of(3L, 5L, 10L, 12L));
.isEqualTo(ImmutableList.of(3L, 5L, 6L, 11L, 12L));
assertThat(baos.toByteArray()).isEqualTo(allData);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,34 @@ public void appendableUpload_bytes()
assertThat(xxd(actualBytes)).isEqualTo(xxd(a1_a2.getBytes()));
}

@Test
public void explicitFlush()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
checkTestbenchIssue733();

BlobAppendableUpload upload =
storage.blobAppendableUpload(
BlobInfo.newBuilder(bucket, UUID.randomUUID().toString()).build(), p.uploadConfig);

try (AppendableUploadWriteableByteChannel channel = upload.open()) {
ByteBuffer src = p.content.asByteBuffer();
ByteBuffer zed = src.slice();
zed.limit(zed.position() + 1);
src.position(src.position() + 1);

int written = channel.write(zed);
assertThat(written).isEqualTo(1);
channel.flush();

written = StorageChannelUtils.blockingEmptyTo(src, channel);
assertThat(written).isEqualTo(p.content.length() - 1);
}

BlobInfo gen1 = upload.getResult().get(3, TimeUnit.SECONDS);
assertThat(gen1.getSize()).isEqualTo(p.content.length());
assertThat(gen1.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(p.content.getCrc32c()));
}

@Test
// Pending work in testbench: https://github.com/googleapis/storage-testbench/issues/723
// manually verified internally on 2025-03-25
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ void manualFlushingIsAccurate() throws IOException {

assertWithMessage("Unexpected total flushed length")
.that(adapter.writeEndPoints)
.isEqualTo(ImmutableList.of(3L, 5L, 12L));
.isEqualTo(ImmutableList.of(3L, 5L, 6L, 12L));
assertThat(baos.toByteArray()).isEqualTo(allData);
}
}
Expand Down
Loading