diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java index e0795b39de..b79a290969 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java @@ -19,10 +19,12 @@ import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.core.InternalExtensionOnly; +import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction; import com.google.cloud.storage.Storage.BlobWriteOption; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import java.util.concurrent.TimeUnit; /** * Interface representing those methods which can be used to write to and interact with an @@ -32,37 +34,120 @@ */ @BetaApi @InternalExtensionOnly -public interface BlobAppendableUpload extends AutoCloseable, WritableByteChannel { +public interface BlobAppendableUpload extends BlobWriteSession { /** - * Write some bytes to the appendable session. Whether a flush happens will depend on how many - * bytes have been written prior, how many bytes are being written now and what {@link - * BlobAppendableUploadConfig} was provided when creating the {@link BlobAppendableUpload}. + * Open the {@link AppendableUploadWriteableByteChannel AppendableUploadWriteableByteChannel} for + * this session. * - *

This method can block the invoking thread in order to ensure written bytes are acknowledged - * by Google Cloud Storage. + *

A session may only be {@code open}ed once. If multiple calls to open are made, an illegal + * state exception will be thrown * - * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...) + *

The returned {@code AppendableUploadWriteableByteChannel} can throw IOExceptions from any of + * its usual methods. Any {@link IOException} thrown can have a cause of a {@link + * StorageException}. However, not all {@code IOExceptions} will have {@code StorageException}s. + * + * @throws IOException When creating the {@link AppendableUploadWriteableByteChannel} if an + * unrecoverable underlying IOException occurs it can be rethrown + * @throws IllegalStateException if open is called more than once + * @since 2.51.0 This new api is in preview and is subject to breaking changes. */ @Override - int write(ByteBuffer src) throws IOException; + AppendableUploadWriteableByteChannel open() throws IOException; /** - * Close this instance to further {@link #write(ByteBuffer)}ing. This will close any underlying - * stream and release any releasable resources once out of scope. + * Return an {@link ApiFuture}{@code } which will represent the state of the object in + * Google Cloud Storage. + * + *

This future will not resolve until: + * + *

    + *
  1. The object is successfully finalized in Google Cloud Storage by calling {@link + * AppendableUploadWriteableByteChannel#finalizeAndClose() + * AppendableUploadWriteableByteChannel#finalizeAndClose()} + *
  2. This session is detached from the upload without finalizing by calling {@link + * AppendableUploadWriteableByteChannel#closeWithoutFinalizing() + * AppendableUploadWriteableByteChannel#closeWithoutFinalizing()} + *
  3. The session is closed by calling {@link AppendableUploadWriteableByteChannel#close() + * AppendableUploadWriteableByteChannel#close()} + *
  4. A terminal failure occurs, the terminal failure will become the exception result + *
+ * + *

NOTICE: Some fields may not be populated unless finalization has completed. + * + *

If a terminal failure is encountered, calling either {@link ApiFuture#get()} or {@link + * ApiFuture#get(long, TimeUnit)} will result in an {@link + * java.util.concurrent.ExecutionException} with the cause. * - *

{@link #finalizeUpload()} can be called after this method, but it will not carry any bytes - * with it. + * @since 2.51.0 This new api is in preview and is subject to breaking changes. */ @Override - void close() throws IOException; + ApiFuture getResult(); /** - * Finalize the appendable upload, close any underlying stream and release any releasable - * resources once out of scope. + * The {@link WritableByteChannel} returned from {@link BlobAppendableUpload#open()}. * - *

Once this method is called, and returns no more writes to the object will be allowed by GCS. + *

This interface allows writing bytes to an Appendable Upload, and provides methods to close + * this channel -- optionally finalizing the upload. + * + * @since 2.51.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - ApiFuture finalizeUpload() throws IOException; + @InternalExtensionOnly + interface AppendableUploadWriteableByteChannel extends WritableByteChannel { + + /** + * Finalize the upload and close this instance to further {@link #write(ByteBuffer)}ing. This + * will close any underlying stream and release any releasable resources once out of scope. + * + *

Once this method is called, and returns no more writes to the object will be allowed by + * GCS. + * + *

This method and {@link #close()} are mutually exclusive. If one of the other methods are + * called before this method, this method will be a no-op. + * + * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...) + * @see BlobAppendableUploadConfig.CloseAction#FINALIZE_WHEN_CLOSING + * @see BlobAppendableUploadConfig#getCloseAction() + * @see BlobAppendableUploadConfig#withCloseAction(CloseAction) + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + void finalizeAndClose() throws IOException; + + /** + * Close this instance to further {@link #write(ByteBuffer)}ing without finalizing the upload. + * This will close any underlying stream and release any releasable resources once out of scope. + * + *

This method, {@link AppendableUploadWriteableByteChannel#finalizeAndClose()} and {@link + * AppendableUploadWriteableByteChannel#close()} are mutually exclusive. If one of the other + * methods are called before this method, this method will be a no-op. + * + * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...) + * @see BlobAppendableUploadConfig.CloseAction#CLOSE_WITHOUT_FINALIZING + * @see BlobAppendableUploadConfig#getCloseAction() + * @see BlobAppendableUploadConfig#withCloseAction(CloseAction) + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + void closeWithoutFinalizing() throws IOException; + + /** + * Close this instance to further {@link #write(ByteBuffer)}ing. + * + *

Whether the upload is finalized during this depends on the {@link + * BlobAppendableUploadConfig#getCloseAction()} provided to create the {@link + * BlobAppendableUpload}. If {@link BlobAppendableUploadConfig#getCloseAction()}{@code == + * }{@link CloseAction#FINALIZE_WHEN_CLOSING}, {@link #finalizeAndClose()} will be called. If + * {@link BlobAppendableUploadConfig#getCloseAction()}{@code == }{@link + * CloseAction#CLOSE_WITHOUT_FINALIZING}, {@link #closeWithoutFinalizing()} will be called. + * + * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...) + * @see BlobAppendableUploadConfig#getCloseAction() + * @see BlobAppendableUploadConfig#withCloseAction(CloseAction) + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + void close() throws IOException; + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java index 6000f50320..ae95356d74 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java @@ -19,10 +19,21 @@ import static com.google.cloud.storage.ByteSizeConstants._256KiB; import static java.util.Objects.requireNonNull; +import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; +import com.google.api.gax.rpc.AbortedException; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel; +import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; +import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.BidiWriteObjectResponse; +import com.google.storage.v2.Object; import javax.annotation.concurrent.Immutable; /** @@ -39,14 +50,20 @@ public final class BlobAppendableUploadConfig { private static final BlobAppendableUploadConfig INSTANCE = - new BlobAppendableUploadConfig(FlushPolicy.minFlushSize(_256KiB), Hasher.enabled()); + new BlobAppendableUploadConfig( + FlushPolicy.minFlushSize(_256KiB), + Hasher.enabled(), + CloseAction.CLOSE_WITHOUT_FINALIZING); private final FlushPolicy flushPolicy; private final Hasher hasher; + private final CloseAction closeAction; - private BlobAppendableUploadConfig(FlushPolicy flushPolicy, Hasher hasher) { + private BlobAppendableUploadConfig( + FlushPolicy flushPolicy, Hasher hasher, CloseAction closeAction) { this.flushPolicy = flushPolicy; this.hasher = hasher; + this.closeAction = closeAction; } /** @@ -77,7 +94,37 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) { if (this.flushPolicy.equals(flushPolicy)) { return this; } - return new BlobAppendableUploadConfig(flushPolicy, hasher); + return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction); + } + + /** + * The {@link CloseAction} which will dictate the behavior of {@link + * AppendableUploadWriteableByteChannel#close()}. + * + *

Default: {@link CloseAction#CLOSE_WITHOUT_FINALIZING} + * + * @see #withCloseAction(CloseAction) + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public CloseAction getCloseAction() { + return closeAction; + } + + /** + * Return an instance with the {@code CloseAction} set to be the specified value. Default: + * {@link CloseAction#CLOSE_WITHOUT_FINALIZING} + * + * @see #getCloseAction() + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) { + requireNonNull(closeAction, "closeAction must be non null"); + if (this.closeAction == closeAction) { + return this; + } + return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction); } /** @@ -108,7 +155,8 @@ BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) { } else if (!enabled && Hasher.noop().equals(hasher)) { return this; } - return new BlobAppendableUploadConfig(flushPolicy, enabled ? Hasher.enabled() : Hasher.noop()); + return new BlobAppendableUploadConfig( + flushPolicy, enabled ? Hasher.enabled() : Hasher.noop(), closeAction); } /** Never to be made public until {@link Hasher} is public */ @@ -125,6 +173,7 @@ Hasher getHasher() { *

{@code
    * BlobAppendableUploadConfig.of()
    *   .withFlushPolicy(FlushPolicy.minFlushSize(256 * 1024))
+   *   .withCloseAction(CloseAction.CLOSE_WITHOUT_FINALIZING)
    * }
* * @since 2.51.0 This new api is in preview and is subject to breaking changes. @@ -134,4 +183,89 @@ Hasher getHasher() { public static BlobAppendableUploadConfig of() { return INSTANCE; } + + /** + * Enum providing the possible actions which can be taken during the {@link + * AppendableUploadWriteableByteChannel#close()} call. + * + * @see AppendableUploadWriteableByteChannel#close() + * @see BlobAppendableUploadConfig#withCloseAction(CloseAction) + * @see BlobAppendableUploadConfig#getCloseAction() + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public enum CloseAction { + /** + * Designate that when {@link AppendableUploadWriteableByteChannel#close()} is called, the + * appendable upload should be finalized. + * + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + * @see AppendableUploadWriteableByteChannel#finalizeAndClose() + */ + @BetaApi + FINALIZE_WHEN_CLOSING, + /** + * Designate that when {@link AppendableUploadWriteableByteChannel#close()} is called, the + * appendable upload should NOT be finalized, allowing for takeover by another session or + * client. + * + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + * @see AppendableUploadWriteableByteChannel#closeWithoutFinalizing() + */ + @BetaApi + CLOSE_WITHOUT_FINALIZING + } + + BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts opts) { + boolean takeOver = info.getGeneration() != null; + BidiWriteObjectRequest req = + takeOver + ? storage.getBidiWriteObjectRequestForTakeover(info, opts) + : storage.getBidiWriteObjectRequest(info, opts); + + BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver); + + WritableByteChannelSession + build = + ResumableMedia.gapic() + .write() + .bidiByteChannel(storage.storageClient.bidiWriteObjectCallable()) + .setHasher(this.getHasher()) + .setByteStringStrategy(ByteStringStrategy.copy()) + .appendable() + .withRetryConfig( + storage.retrier.withAlg( + new BasicResultRetryAlgorithm() { + @Override + public boolean shouldRetry( + Throwable previousThrowable, Object previousResponse) { + // TODO: remove this later once the redirects are not handled by the + // retry loop + ApiException apiEx = null; + if (previousThrowable instanceof StorageException) { + StorageException se = (StorageException) previousThrowable; + Throwable cause = se.getCause(); + if (cause instanceof ApiException) { + apiEx = (ApiException) cause; + } + } + if (apiEx instanceof AbortedException) { + return true; + } + return storage + .retryAlgorithmManager + .idempotent() + .shouldRetry(previousThrowable, null); + } + })) + .buffered(this.getFlushPolicy()) + .setStartAsync(ApiFutures.immediateFuture(baw)) + .setGetCallable(storage.storageClient.getObjectCallable()) + .setFinalizeOnClose(this.closeAction == CloseAction.FINALIZE_WHEN_CLOSING) + .build(); + + return new BlobAppendableUploadImpl( + new DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<>( + build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER)); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java index b9c88c0b84..cedfbcba58 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java @@ -19,60 +19,35 @@ import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; +import com.google.common.base.Preconditions; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.locks.ReentrantLock; @BetaApi final class BlobAppendableUploadImpl implements BlobAppendableUpload { - private final AppendableObjectBufferedWritableByteChannel channel; - private final ApiFuture result; - - private BlobAppendableUploadImpl(BlobInfo blob, BlobWriteSession session, boolean takeover) - throws IOException { - channel = (AppendableObjectBufferedWritableByteChannel) (session.open()); - result = session.getResult(); - if (takeover) { - channel.startTakeoverStream(); - } - } - - static BlobAppendableUpload createNewAppendableBlob(BlobInfo blob, BlobWriteSession session) - throws IOException { - return new BlobAppendableUploadImpl(blob, session, false); - } - - static BlobAppendableUpload resumeAppendableUpload(BlobInfo blob, BlobWriteSession session) - throws IOException { - return new BlobAppendableUploadImpl(blob, session, true); - } - - void startTakeoverStream() { - channel.startTakeoverStream(); - } - - @BetaApi - public ApiFuture finalizeUpload() throws IOException { - channel.finalizeWrite(); - close(); - return result; + private final WritableByteChannelSession + delegate; + private boolean open; + + BlobAppendableUploadImpl( + WritableByteChannelSession delegate) { + this.delegate = delegate; + this.open = false; } @Override - public int write(ByteBuffer buffer) throws IOException { - return channel.write(buffer); - } - - @Override - public boolean isOpen() { - return channel.isOpen(); + public AppendableUploadWriteableByteChannel open() throws IOException { + synchronized (this) { + Preconditions.checkState(!open, "already open"); + open = true; + return delegate.open(); + } } @Override - public void close() throws IOException { - if (channel.isOpen()) { - channel.close(); - } + public ApiFuture getResult() { + return delegate.getResult(); } /** @@ -85,16 +60,20 @@ public void close() throws IOException { * wrap it over this one. */ static final class AppendableObjectBufferedWritableByteChannel - implements BufferedWritableByteChannel { + implements BufferedWritableByteChannel, + BlobAppendableUpload.AppendableUploadWriteableByteChannel { private final BufferedWritableByteChannel buffered; private final GapicBidiUnbufferedAppendableWritableByteChannel unbuffered; + private final boolean finalizeOnClose; private final ReentrantLock lock; AppendableObjectBufferedWritableByteChannel( BufferedWritableByteChannel buffered, - GapicBidiUnbufferedAppendableWritableByteChannel unbuffered) { + GapicBidiUnbufferedAppendableWritableByteChannel unbuffered, + boolean finalizeOnClose) { this.buffered = buffered; this.unbuffered = unbuffered; + this.finalizeOnClose = finalizeOnClose; lock = new ReentrantLock(); } @@ -129,27 +108,38 @@ public boolean isOpen() { } @Override - public void close() throws IOException { + public void finalizeAndClose() throws IOException { lock.lock(); try { - buffered.close(); + if (buffered.isOpen()) { + buffered.flush(); + unbuffered.finalizeWrite(); + buffered.close(); + } } finally { lock.unlock(); } } - public void finalizeWrite() throws IOException { + @Override + public void closeWithoutFinalizing() throws IOException { lock.lock(); try { - buffered.flush(); - unbuffered.finalizeWrite(); + if (buffered.isOpen()) { + buffered.close(); + } } finally { lock.unlock(); } } - void startTakeoverStream() { - unbuffered.startAppendableTakeoverStream(); + @Override + public void close() throws IOException { + if (finalizeOnClose) { + finalizeAndClose(); + } else { + closeWithoutFinalizing(); + } } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ChannelSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ChannelSession.java index 92964cf002..25ff1e40e5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ChannelSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ChannelSession.java @@ -37,7 +37,7 @@ class ChannelSession { private volatile ApiFuture channelFuture; - private ChannelSession( + ChannelSession( ApiFuture startFuture, BiFunction, ChannelT> f) { this.startFuture = startFuture; this.resultFuture = SettableApiFuture.create(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedAppendableWriteableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedAppendableWriteableByteChannel.java index f60b1fe7bc..b5e907bae7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedAppendableWriteableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedAppendableWriteableByteChannel.java @@ -376,7 +376,7 @@ private void processRetryingMessages() { private class BidiObserver implements ApiStreamObserver { private final Semaphore sem; - private volatile BidiWriteObjectResponse last; + private volatile BidiWriteObjectResponse lastResponseWithResource; private volatile StorageException clientDetectedError; private volatile RuntimeException previousError; @@ -504,14 +504,18 @@ public void onError(Throwable t) { @Override public void onCompleted() { - if (last != null) { - resultFuture.set(last); + if (lastResponseWithResource != null) { + BidiWriteObjectResponse.Builder withSize = lastResponseWithResource.toBuilder(); + withSize.getResourceBuilder().setSize(writeCtx.getConfirmedBytes().longValue()); + resultFuture.set(withSize.build()); } sem.release(); } private void ok(BidiWriteObjectResponse value) { - last = value; + if (value.hasResource()) { + lastResponseWithResource = value; + } first = false; sem.release(); } @@ -558,7 +562,7 @@ void await() { public void reset() { sem.drainPermits(); - last = null; + lastResponseWithResource = null; clientDetectedError = null; previousError = null; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java index 02cca773d8..1e32da70ec 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java @@ -22,6 +22,7 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel; import com.google.cloud.storage.ChannelSession.BufferedWriteSession; import com.google.cloud.storage.Retrying.RetrierWithAlg; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; @@ -184,6 +185,7 @@ BufferedAppendableUploadBuilder buffered(FlushPolicy flushPolicy) { final class BufferedAppendableUploadBuilder { private final FlushPolicy flushPolicy; + private boolean finalizeOnClose; private ApiFuture start; private UnaryCallable get; @@ -191,6 +193,11 @@ final class BufferedAppendableUploadBuilder { this.flushPolicy = flushPolicy; } + BufferedAppendableUploadBuilder setFinalizeOnClose(boolean finalizeOnClose) { + this.finalizeOnClose = finalizeOnClose; + return this; + } + /** * Set the Future which will contain the AppendableWrite information necessary to open the * Write stream. @@ -206,7 +213,9 @@ public BufferedAppendableUploadBuilder setGetCallable( return this; } - BufferedWritableByteChannelSession build() { + WritableByteChannelSession< + AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse> + build() { // it is theoretically possible that the setter methods for the following variables could // be called again between when this method is invoked and the resulting function is // invoked. @@ -218,7 +227,8 @@ BufferedWritableByteChannelSession build() { RetrierWithAlg boundRetrier = retrier; UnaryCallable boundGet = requireNonNull(get, "get must be non null"); - return new BufferedWriteSession<>( + boolean boundFinalizeOnClose = finalizeOnClose; + return new AppendableSession( requireNonNull(start, "start must be non null"), ((BiFunction< BidiAppendableWrite, @@ -235,10 +245,32 @@ BufferedWritableByteChannelSession build() { new BidiWriteCtx<>(start), Retrying::newCallContext)) .andThen( - c -> - new BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel( - flushPolicy.createBufferedChannel(c), c))); + c -> { + boolean takeOver = + c.getWriteCtx().getRequestFactory().getReq().hasAppendObjectSpec(); + if (takeOver) { + c.startAppendableTakeoverStream(); + } + return new AppendableObjectBufferedWritableByteChannel( + flushPolicy.createBufferedChannel(c), c, boundFinalizeOnClose); + })); } } } + + private static final class AppendableSession + extends ChannelSession< + BidiAppendableWrite, BidiWriteObjectResponse, AppendableObjectBufferedWritableByteChannel> + implements WritableByteChannelSession< + AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse> { + private AppendableSession( + ApiFuture startFuture, + BiFunction< + BidiAppendableWrite, + SettableApiFuture, + AppendableObjectBufferedWritableByteChannel> + f) { + super(startFuture, f); + } + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 653c840913..9a01926ce7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -35,9 +35,7 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.paging.AbstractPage; import com.google.api.gax.paging.Page; -import com.google.api.gax.retrying.BasicResultRetryAlgorithm; import com.google.api.gax.retrying.ResultRetryAlgorithm; -import com.google.api.gax.rpc.AbortedException; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ClientStreamingCallable; @@ -87,7 +85,6 @@ import com.google.storage.v2.BidiReadObjectRequest; import com.google.storage.v2.BidiReadObjectSpec; import com.google.storage.v2.BidiWriteObjectRequest; -import com.google.storage.v2.BidiWriteObjectResponse; import com.google.storage.v2.BucketAccessControl; import com.google.storage.v2.ComposeObjectRequest; import com.google.storage.v2.ComposeObjectRequest.SourceObject; @@ -1424,60 +1421,9 @@ public BlobWriteSession blobWriteSession(BlobInfo info, BlobWriteOption... optio @BetaApi @Override public BlobAppendableUpload blobAppendableUpload( - BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) - throws IOException { - boolean takeOver = blobInfo.getGeneration() != null; + BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); - BidiWriteObjectRequest req = - takeOver - ? getBidiWriteObjectRequestForTakeover(blobInfo, opts) - : getBidiWriteObjectRequest(blobInfo, opts); - BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver); - ApiFuture startAppendableWrite = ApiFutures.immediateFuture(baw); - WritableByteChannelSession build = - ResumableMedia.gapic() - .write() - .bidiByteChannel(storageClient.bidiWriteObjectCallable()) - .setHasher(uploadConfig.getHasher()) - .setByteStringStrategy(ByteStringStrategy.copy()) - .appendable() - .withRetryConfig( - retrier.withAlg( - new BasicResultRetryAlgorithm() { - @Override - public boolean shouldRetry( - Throwable previousThrowable, Object previousResponse) { - // TODO: remove this later once the redirects are not handled by the retry - // loop - ApiException apiEx = null; - if (previousThrowable instanceof StorageException) { - StorageException se = (StorageException) previousThrowable; - Throwable cause = se.getCause(); - if (cause instanceof ApiException) { - apiEx = (ApiException) cause; - } - } - if (apiEx instanceof AbortedException) { - return true; - } - return retryAlgorithmManager - .idempotent() - .shouldRetry(previousThrowable, null); - } - })) - .buffered(uploadConfig.getFlushPolicy()) - .setStartAsync(startAppendableWrite) - .setGetCallable(storageClient.getObjectCallable()) - .build(); - DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession< - BufferedWritableByteChannel, BidiWriteObjectResponse> - dec = - new DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<>( - build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER); - BlobWriteSession session = BlobWriteSessions.of(dec); - return takeOver - ? BlobAppendableUploadImpl.resumeAppendableUpload(blobInfo, session) - : BlobAppendableUploadImpl.createNewAppendableBlob(blobInfo, session); + return uploadConfig.create(this, blobInfo, opts); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java index 6e665c4de6..6733631091 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java @@ -1506,19 +1506,25 @@ public ApiFuture blobReadSession(BlobId id, BlobSourceOption... @Override public BlobAppendableUpload blobAppendableUpload( - BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) - throws IOException { - Span span = tracer.spanBuilder("appendableBlobUpload").startSpan(); + BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) { + + Span span = + tracer + .spanBuilder("appendableBlobUpload") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); try (Scope ignore = span.makeCurrent()) { return new OtelDecoratingBlobAppendableUpload( - delegate.blobAppendableUpload(blobInfo, uploadConfig, options), span, Context.current()); + delegate.blobAppendableUpload(blobInfo, uploadConfig, options)); } catch (Throwable t) { span.recordException(t); span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); span.end(); throw t; + } finally { + span.end(); } } @@ -2104,57 +2110,98 @@ public String toString() { final class OtelDecoratingBlobAppendableUpload implements BlobAppendableUpload { private final BlobAppendableUpload delegate; - private final Span openSpan; - private final Context openContext; + private final Tracer tracer; - private OtelDecoratingBlobAppendableUpload( - BlobAppendableUpload delegate, Span openSpan, Context openContext) { + private OtelDecoratingBlobAppendableUpload(BlobAppendableUpload delegate) { this.delegate = delegate; - this.openSpan = openSpan; - this.openContext = openContext; - } - - @Override - public int write(ByteBuffer src) throws IOException { - return delegate.write(src); + this.tracer = + TracerDecorator.decorate( + Context.current(), + otel, + OtelStorageDecorator.this.baseAttributes, + BlobAppendableUpload.class.getName() + "/"); } @Override - public void close() throws IOException { - try { - delegate.close(); + public AppendableUploadWriteableByteChannel open() throws IOException { + Span openSpan = tracer.spanBuilder("open").startSpan(); + try (Scope ignore = openSpan.makeCurrent()) { + AppendableUploadWriteableByteChannel delegate = this.delegate.open(); + return new OtelDecoratingAppendableUploadWriteableByteChannel(delegate, openSpan); } catch (Throwable t) { openSpan.recordException(t); openSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); throw t; - } finally { - openSpan.end(); } } @Override - @BetaApi - public ApiFuture finalizeUpload() throws IOException { - Span span = - tracer - .spanBuilder("appendableBlobUpload/finalizeUpload") - .setParent(openContext) - .startSpan(); - try (Scope ignore = span.makeCurrent()) { - return delegate.finalizeUpload(); - } catch (Throwable t) { - span.recordException(t); - span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); - throw t; - } finally { - span.end(); - openSpan.end(); - } + public ApiFuture getResult() { + return delegate.getResult(); } - @Override - public boolean isOpen() { - return delegate.isOpen(); + private final class OtelDecoratingAppendableUploadWriteableByteChannel + implements AppendableUploadWriteableByteChannel { + private final AppendableUploadWriteableByteChannel delegate; + private final Span openSpan; + + private OtelDecoratingAppendableUploadWriteableByteChannel( + AppendableUploadWriteableByteChannel delegate, Span openSpan) { + this.delegate = delegate; + this.openSpan = openSpan; + } + + @Override + @BetaApi + public void finalizeAndClose() throws IOException { + try { + delegate.finalizeAndClose(); + } catch (IOException | RuntimeException e) { + openSpan.recordException(e); + openSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); + throw e; + } finally { + openSpan.end(); + } + } + + @Override + @BetaApi + public void closeWithoutFinalizing() throws IOException { + try { + delegate.closeWithoutFinalizing(); + } catch (IOException | RuntimeException e) { + openSpan.recordException(e); + openSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); + throw e; + } finally { + openSpan.end(); + } + } + + @Override + @BetaApi + public void close() throws IOException { + try { + delegate.close(); + } catch (IOException | RuntimeException e) { + openSpan.recordException(e); + openSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); + throw e; + } finally { + openSpan.end(); + } + } + + @Override + public int write(ByteBuffer src) throws IOException { + return delegate.write(src); + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java index 07cf74001d..64f7d4a5dd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java @@ -5901,21 +5901,21 @@ default ApiFuture blobReadSession(BlobId id, BlobSourceOption.. * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); * ReadableByteChannel readableByteChannel = ...; * - * try ( - * BlobAppendableUpload blobAppendableUpload = storage.blobAppendableUpload( + * BlobAppendableUpload uploadSession = storage.blobAppendableUpload( * blobInfo, * BlobAppendableUploadConfig.of() - * ) - * ) { - * // copy all bytes - * ByteStreams.copy(readableByteChannel, blobAppendableUpload); + * ); + * try (AppendableUploadWriteableByteChannel channel = uploadSession.open()) { + * // copy all bytes + * ByteStreams.copy(readableByteChannel, channel); + * channel.finalizeAndClose(); + * } catch (IOException ex) { + * // handle IOException + * } + * * // get the resulting object metadata - * ApiFuture resultFuture = blobAppendableUpload.finalizeUpload(); + * ApiFuture resultFuture = uploadSession.getResult(); * BlobInfo gen1 = resultFuture.get(); - * - * } catch (IOException e) { - * // handle IOException - * } * } * * @param blobInfo blobInfo to create @@ -5927,8 +5927,7 @@ default ApiFuture blobReadSession(BlobId id, BlobSourceOption.. @BetaApi @TransportCompatibility({Transport.GRPC}) default BlobAppendableUpload blobAppendableUpload( - BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) - throws IOException { + BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) { return throwGrpcOnly( fmtMethodName("appendableBlobUpload", BlobId.class, BlobWriteOption.class)); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java index 6153e02554..5528e004fe 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java @@ -23,6 +23,8 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.AbortedException; +import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel; +import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction; import com.google.cloud.storage.it.ChecksummedTestContent; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -49,6 +51,7 @@ import io.grpc.protobuf.ProtoUtils; import io.grpc.stub.StreamObserver; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -90,7 +93,8 @@ public class ITAppendableUploadFakeTest { private static final BlobAppendableUploadConfig UPLOAD_CONFIG = BlobAppendableUploadConfig.of() .withFlushPolicy(FlushPolicy.maxFlushSize(5)) - .withCrc32cValidationEnabled(false); + .withCrc32cValidationEnabled(false) + .withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING); /** * @@ -205,8 +209,10 @@ public void bidiWriteObjectRedirectedError() throws Exception { BlobId id = BlobId.of("b", "o"); BlobAppendableUpload b = storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), UPLOAD_CONFIG); - b.write(ByteBuffer.wrap(content.getBytes())); - BlobInfo bi = b.finalizeUpload().get(5, TimeUnit.SECONDS); + try (AppendableUploadWriteableByteChannel channel = b.open()) { + channel.write(ByteBuffer.wrap(content.getBytes())); + } + BlobInfo bi = b.getResult().get(5, TimeUnit.SECONDS); assertThat(bi.getSize()).isEqualTo(10); } } @@ -264,18 +270,36 @@ public void bidiWriteObjectRedirectedError_maxAttempts() throws Exception { respond -> respond.onError(statusRuntimeException))); try (FakeServer fakeServer = FakeServer.of(fake); - Storage storage = fakeServer.getGrpcStorageOptions().toBuilder().build().getService()) { + Storage storage = + fakeServer + .getGrpcStorageOptions() + .toBuilder() + .setRetrySettings( + fakeServer + .getGrpcStorageOptions() + .getRetrySettings() + .toBuilder() + .setRetryDelayMultiplier(1.0) + .setInitialRetryDelayDuration(Duration.ofMillis(10)) + .build()) + .build() + .getService()) { BlobId id = BlobId.of("b", "o"); BlobAppendableUpload b = storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), UPLOAD_CONFIG); - StorageException e = - assertThrows( - StorageException.class, - () -> { - b.write(ByteBuffer.wrap("ABCDE".getBytes())); - }); - assertThat(e).hasCauseThat().isInstanceOf(AbortedException.class); + AppendableUploadWriteableByteChannel channel = b.open(); + try { + StorageException e = + assertThrows( + StorageException.class, + () -> { + channel.write(ByteBuffer.wrap("ABCDE".getBytes())); + }); + assertThat(e).hasCauseThat().isInstanceOf(AbortedException.class); + } finally { + channel.close(); + } } } @@ -384,8 +408,10 @@ public void bidiWriteObjectRetryableError() throws Exception { BlobAppendableUpload b = storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), UPLOAD_CONFIG); ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10); - b.write(ByteBuffer.wrap(content.getBytes())); - BlobInfo bi = b.finalizeUpload().get(5, TimeUnit.SECONDS); + try (AppendableUploadWriteableByteChannel channel = b.open()) { + channel.write(ByteBuffer.wrap(content.getBytes())); + } + BlobInfo bi = b.getResult().get(5, TimeUnit.SECONDS); assertThat(bi.getSize()).isEqualTo(10); } } @@ -497,8 +523,10 @@ public void retryableErrorIncompleteFlush() throws Exception { BlobAppendableUpload b = storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), UPLOAD_CONFIG); ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10); - b.write(ByteBuffer.wrap(content.getBytes())); - BlobInfo bi = b.finalizeUpload().get(5, TimeUnit.SECONDS); + try (AppendableUploadWriteableByteChannel channel = b.open()) { + channel.write(ByteBuffer.wrap(content.getBytes())); + } + BlobInfo bi = b.getResult().get(5, TimeUnit.SECONDS); assertThat(bi.getSize()).isEqualTo(10); } } @@ -1432,8 +1460,10 @@ public void takeoverRedirectError() throws Exception { BlobAppendableUpload b = storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), UPLOAD_CONFIG); ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10); - b.write(ByteBuffer.wrap(content.getBytes())); - BlobInfo bi = b.finalizeUpload().get(5, TimeUnit.SECONDS); + try (AppendableUploadWriteableByteChannel channel = b.open()) { + channel.write(ByteBuffer.wrap(content.getBytes())); + } + BlobInfo bi = b.getResult().get(5, TimeUnit.SECONDS); assertThat(bi.getSize()).isEqualTo(20); } } @@ -1485,8 +1515,10 @@ req1, retryableErrorOnce(req1, res, map, 2), req2, maxRetries(req2, res, map, 1) BlobAppendableUpload b = storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), UPLOAD_CONFIG); ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 5); - b.write(ByteBuffer.wrap(content.getBytes())); - BlobInfo bi = b.finalizeUpload().get(5, TimeUnit.SECONDS); + try (AppendableUploadWriteableByteChannel channel = b.open()) { + channel.write(ByteBuffer.wrap(content.getBytes())); + } + BlobInfo bi = b.getResult().get(5, TimeUnit.SECONDS); assertThat(bi.getSize()).isEqualTo(5); assertThat(map.get(req1)).isEqualTo(2); @@ -1577,11 +1609,12 @@ public void crc32cWorks() throws Exception { BlobId id = BlobId.of("b", "o"); BlobAppendableUploadConfig uploadConfig = UPLOAD_CONFIG.withCrc32cValidationEnabled(true); - try (BlobAppendableUpload upload = - storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), uploadConfig)) { - upload.write(ByteBuffer.wrap(b)); - upload.finalizeUpload().get(5, TimeUnit.SECONDS); + BlobAppendableUpload upload = + storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), uploadConfig); + try (AppendableUploadWriteableByteChannel channel = upload.open()) { + channel.write(ByteBuffer.wrap(b)); } + upload.getResult().get(5, TimeUnit.SECONDS); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java index dc12c32c7a..c07e7b44da 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java @@ -16,10 +16,11 @@ package com.google.cloud.storage; import static com.google.cloud.storage.ByteSizeConstants._2MiB; +import static com.google.cloud.storage.TestUtils.assertAll; import static com.google.common.truth.Truth.assertThat; -import com.google.cloud.storage.Storage.BlobField; -import com.google.cloud.storage.Storage.BlobGetOption; +import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel; +import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.it.runner.StorageITRunner; import com.google.cloud.storage.it.runner.annotations.Backend; @@ -60,7 +61,9 @@ public final class ITAppendableUploadTest { public void testAppendableBlobUpload() throws IOException, ExecutionException, InterruptedException, TimeoutException { BlobAppendableUploadConfig uploadConfig = - BlobAppendableUploadConfig.of().withFlushPolicy(FlushPolicy.maxFlushSize(2000)); + BlobAppendableUploadConfig.of() + .withFlushPolicy(FlushPolicy.maxFlushSize(2000)) + .withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING); BlobAppendableUpload upload = storage.blobAppendableUpload( BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(), uploadConfig); @@ -68,30 +71,46 @@ public void testAppendableBlobUpload() byte[] bytes = DataGenerator.base64Characters().genBytes(512 * 1024); byte[] a1 = Arrays.copyOfRange(bytes, 0, bytes.length / 2); byte[] a2 = Arrays.copyOfRange(bytes, bytes.length / 2 + 1, bytes.length); - - upload.write(ByteBuffer.wrap(a1)); - upload.write(ByteBuffer.wrap(a2)); - BlobInfo blob = upload.finalizeUpload().get(5, TimeUnit.SECONDS); + try (AppendableUploadWriteableByteChannel channel = upload.open()) { + channel.write(ByteBuffer.wrap(a1)); + channel.write(ByteBuffer.wrap(a2)); + } + BlobInfo blob = upload.getResult().get(5, TimeUnit.SECONDS); assertThat(blob.getSize()).isEqualTo(a1.length + a2.length); + + BlobInfo actual = upload.getResult().get(5, TimeUnit.SECONDS); + BlobInfo blob1 = storage.get(actual.getBlobId()); + assertThat(actual).isEqualTo(blob1); } @Test - public void appendableBlobUploadWithoutFinalizing() throws IOException { + public void appendableBlobUploadWithoutFinalizing() throws Exception { BlobAppendableUploadConfig uploadConfig = BlobAppendableUploadConfig.of().withFlushPolicy(FlushPolicy.maxFlushSize(256 * 1024)); - BlobAppendableUpload upload = - storage.blobAppendableUpload( - BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(), uploadConfig); + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + BlobAppendableUpload upload = storage.blobAppendableUpload(info, uploadConfig); byte[] bytes = DataGenerator.base64Characters().genBytes(512 * 1024); byte[] a1 = Arrays.copyOfRange(bytes, 0, bytes.length / 2); byte[] a2 = Arrays.copyOfRange(bytes, bytes.length / 2 + 1, bytes.length); - upload.write(ByteBuffer.wrap(a1)); - upload.write(ByteBuffer.wrap(a2)); - - upload.close(); + try (AppendableUploadWriteableByteChannel channel = upload.open()) { + channel.write(ByteBuffer.wrap(a1)); + channel.write(ByteBuffer.wrap(a2)); + } + BlobInfo actual = upload.getResult().get(5, TimeUnit.SECONDS); + assertAll( + () -> assertThat(actual).isNotNull(), + () -> assertThat(actual.getSize()).isEqualTo(512 * 1024 - 1), + () -> { + String crc32c = actual.getCrc32c(); + // prod is null + boolean crc32cNull = crc32c == null; + // testbench is 0 + boolean crc32cZero = Utils.crc32cCodec.encode(0).equalsIgnoreCase(crc32c); + assertThat(crc32cNull || crc32cZero).isTrue(); + }); } @Test @@ -106,16 +125,18 @@ public void appendableBlobUploadTakeover() throws Exception { byte[] bytes = "ABCDEFGHIJ".getBytes(); - upload.write(ByteBuffer.wrap(bytes)); - upload.close(); - - Blob blob = storage.get(bid); + try (AppendableUploadWriteableByteChannel channel = upload.open()) { + channel.write(ByteBuffer.wrap(bytes)); + } + BlobInfo blob = upload.getResult().get(5, TimeUnit.SECONDS); byte[] bytes2 = "KLMNOPQRST".getBytes(); BlobAppendableUpload takeOver = storage.blobAppendableUpload(BlobInfo.newBuilder(blob.getBlobId()).build(), uploadConfig); - takeOver.write(ByteBuffer.wrap(bytes2)); - BlobInfo i = takeOver.finalizeUpload().get(5, TimeUnit.SECONDS); + try (AppendableUploadWriteableByteChannel channel = takeOver.open()) { + channel.write(ByteBuffer.wrap(bytes2)); + } + BlobInfo i = takeOver.getResult().get(5, TimeUnit.SECONDS); assertThat(i.getSize()).isEqualTo(20); } @@ -128,35 +149,19 @@ public void testUploadFileUsingAppendable() throws Exception { try (TmpFile tmpFile = DataGenerator.base64Characters() .tempFile(Paths.get(System.getProperty("java.io.tmpdir")), 100 * 1024 * 1024)) { - try (BlobAppendableUpload appendable = - storage.blobAppendableUpload(BlobInfo.newBuilder(bid).build(), uploadConfig); + + BlobAppendableUpload appendable = + storage.blobAppendableUpload(BlobInfo.newBuilder(bid).build(), uploadConfig); + try (AppendableUploadWriteableByteChannel channel = appendable.open(); SeekableByteChannel r = Files.newByteChannel(tmpFile.getPath(), StandardOpenOption.READ)) { - - ByteStreams.copy(r, appendable); - BlobInfo bi = appendable.finalizeUpload().get(5, TimeUnit.SECONDS); - assertThat(bi.getSize()).isEqualTo(100 * 1024 * 1024); + ByteStreams.copy(r, channel); } + BlobInfo bi = appendable.getResult().get(5, TimeUnit.SECONDS); + assertThat(bi.getSize()).isEqualTo(100 * 1024 * 1024); } } - @Test - // Pending work in testbench, manually verified internally on 2025-03-25 - @CrossRun.Ignore(backends = {Backend.TEST_BENCH}) - public void finalizeAfterCloseWorks() throws Exception { - BlobAppendableUploadConfig uploadConfig = - BlobAppendableUploadConfig.of().withFlushPolicy(FlushPolicy.maxFlushSize(1024)); - BlobId bid = BlobId.of(bucket.getName(), generator.randomObjectName()); - - BlobAppendableUpload appendable = - storage.blobAppendableUpload(BlobInfo.newBuilder(bid).build(), uploadConfig); - appendable.write(DataGenerator.base64Characters().genByteBuffer(3587)); - - appendable.close(); - BlobInfo bi = appendable.finalizeUpload().get(5, TimeUnit.SECONDS); - assertThat(bi.getSize()).isEqualTo(3587); - } - @Test // Pending work in testbench, manually verified internally on 2025-03-25 @CrossRun.Ignore(backends = {Backend.TEST_BENCH}) @@ -168,16 +173,22 @@ public void takeoverJustToFinalizeWorks() throws Exception { BlobAppendableUpload upload = storage.blobAppendableUpload(BlobInfo.newBuilder(bid).build(), uploadConfig); - upload.write(DataGenerator.base64Characters().genByteBuffer(20)); - upload.close(); + try (AppendableUploadWriteableByteChannel channel = upload.open()) { + channel.write(DataGenerator.base64Characters().genByteBuffer(20)); + } - Blob blob = - storage.get( - bid, BlobGetOption.fields(BlobField.BUCKET, BlobField.NAME, BlobField.GENERATION)); + BlobInfo blob = upload.getResult().get(5, TimeUnit.SECONDS); BlobAppendableUpload takeOver = storage.blobAppendableUpload(BlobInfo.newBuilder(blob.getBlobId()).build(), uploadConfig); - BlobInfo i = takeOver.finalizeUpload().get(5, TimeUnit.SECONDS); + takeOver.open().finalizeAndClose(); + BlobInfo i = takeOver.getResult().get(5, TimeUnit.SECONDS); assertThat(i.getSize()).isEqualTo(20); + + BlobInfo actual = takeOver.getResult().get(5, TimeUnit.SECONDS); + assertAll( + () -> assertThat(actual).isNotNull(), + () -> assertThat(actual.getSize()).isEqualTo(20), + () -> assertThat(actual.getCrc32c()).isNotNull()); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionTest.java index 52f4ad383a..fe6e1f5021 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionTest.java @@ -26,6 +26,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.OutOfRangeException; +import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.TransportCompatibility.Transport; @@ -351,11 +352,14 @@ public void outOfRange() private BlobInfo create(ChecksummedTestContent content) throws IOException, ExecutionException, InterruptedException, TimeoutException { BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); - try (BlobAppendableUpload upload = + + BlobAppendableUpload upload = storage.blobAppendableUpload( - info, BlobAppendableUploadConfig.of(), BlobWriteOption.doesNotExist())) { - upload.write(ByteBuffer.wrap(content.getBytes())); - return upload.finalizeUpload().get(30, TimeUnit.SECONDS); + info, BlobAppendableUploadConfig.of(), BlobWriteOption.doesNotExist()); + try (AppendableUploadWriteableByteChannel channel = upload.open(); ) { + channel.write(ByteBuffer.wrap(content.getBytes())); + channel.finalizeAndClose(); } + return upload.getResult().get(5, TimeUnit.SECONDS); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/AbstractStorageProxy.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/AbstractStorageProxy.java index b325948bde..eeae233825 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/AbstractStorageProxy.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/AbstractStorageProxy.java @@ -500,8 +500,7 @@ public ApiFuture blobReadSession(BlobId id, BlobSourceOption... @Override public BlobAppendableUpload blobAppendableUpload( - BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) - throws IOException { + BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) { return delegate.blobAppendableUpload(blobInfo, uploadConfig, options); }