Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
import com.google.cloud.storage.Storage.BlobWriteOption;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.TimeUnit;

/**
* Interface representing those methods which can be used to write to and interact with an
Expand All @@ -32,37 +34,120 @@
*/
@BetaApi
@InternalExtensionOnly
public interface BlobAppendableUpload extends AutoCloseable, WritableByteChannel {
public interface BlobAppendableUpload extends BlobWriteSession {

/**
* Write some bytes to the appendable session. Whether a flush happens will depend on how many
* bytes have been written prior, how many bytes are being written now and what {@link
* BlobAppendableUploadConfig} was provided when creating the {@link BlobAppendableUpload}.
* Open the {@link AppendableUploadWriteableByteChannel AppendableUploadWriteableByteChannel} for
* this session.
*
* <p>This method can block the invoking thread in order to ensure written bytes are acknowledged
* by Google Cloud Storage.
* <p>A session may only be {@code open}ed once. If multiple calls to open are made, an illegal
* state exception will be thrown
*
* @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
* <p>The returned {@code AppendableUploadWriteableByteChannel} can throw IOExceptions from any of
* its usual methods. Any {@link IOException} thrown can have a cause of a {@link
* StorageException}. However, not all {@code IOExceptions} will have {@code StorageException}s.
*
* @throws IOException When creating the {@link AppendableUploadWriteableByteChannel} if an
* unrecoverable underlying IOException occurs it can be rethrown
* @throws IllegalStateException if open is called more than once
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@Override
int write(ByteBuffer src) throws IOException;
AppendableUploadWriteableByteChannel open() throws IOException;

/**
* Close this instance to further {@link #write(ByteBuffer)}ing. This will close any underlying
* stream and release any releasable resources once out of scope.
* Return an {@link ApiFuture}{@code <BlobInfo>} which will represent the state of the object in
* Google Cloud Storage.
*
* <p>This future will not resolve until:
*
* <ol>
* <li>The object is successfully finalized in Google Cloud Storage by calling {@link
* AppendableUploadWriteableByteChannel#finalizeAndClose()
* AppendableUploadWriteableByteChannel#finalizeAndClose()}
* <li>This session is detached from the upload without finalizing by calling {@link
* AppendableUploadWriteableByteChannel#closeWithoutFinalizing()
* AppendableUploadWriteableByteChannel#closeWithoutFinalizing()}
* <li>The session is closed by calling {@link AppendableUploadWriteableByteChannel#close()
* AppendableUploadWriteableByteChannel#close()}
* <li>A terminal failure occurs, the terminal failure will become the exception result
* </ol>
*
* <p><i>NOTICE:</i> Some fields may not be populated unless finalization has completed.
*
* <p>If a terminal failure is encountered, calling either {@link ApiFuture#get()} or {@link
* ApiFuture#get(long, TimeUnit)} will result in an {@link
* java.util.concurrent.ExecutionException} with the cause.
*
* <p>{@link #finalizeUpload()} can be called after this method, but it will not carry any bytes
* with it.
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@Override
void close() throws IOException;
ApiFuture<BlobInfo> getResult();

/**
* Finalize the appendable upload, close any underlying stream and release any releasable
* resources once out of scope.
* The {@link WritableByteChannel} returned from {@link BlobAppendableUpload#open()}.
*
* <p>Once this method is called, and returns no more writes to the object will be allowed by GCS.
* <p>This interface allows writing bytes to an Appendable Upload, and provides methods to close
* this channel -- optionally finalizing the upload.
*
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
ApiFuture<BlobInfo> finalizeUpload() throws IOException;
@InternalExtensionOnly
interface AppendableUploadWriteableByteChannel extends WritableByteChannel {

/**
* Finalize the upload and close this instance to further {@link #write(ByteBuffer)}ing. This
* will close any underlying stream and release any releasable resources once out of scope.
*
* <p>Once this method is called, and returns no more writes to the object will be allowed by
* GCS.
*
* <p>This method and {@link #close()} are mutually exclusive. If one of the other methods are
* called before this method, this method will be a no-op.
*
* @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
* @see BlobAppendableUploadConfig.CloseAction#FINALIZE_WHEN_CLOSING
* @see BlobAppendableUploadConfig#getCloseAction()
* @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
void finalizeAndClose() throws IOException;

/**
* Close this instance to further {@link #write(ByteBuffer)}ing without finalizing the upload.
* This will close any underlying stream and release any releasable resources once out of scope.
*
* <p>This method, {@link AppendableUploadWriteableByteChannel#finalizeAndClose()} and {@link
* AppendableUploadWriteableByteChannel#close()} are mutually exclusive. If one of the other
* methods are called before this method, this method will be a no-op.
*
* @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
* @see BlobAppendableUploadConfig.CloseAction#CLOSE_WITHOUT_FINALIZING
* @see BlobAppendableUploadConfig#getCloseAction()
* @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
void closeWithoutFinalizing() throws IOException;

/**
* Close this instance to further {@link #write(ByteBuffer)}ing.
*
* <p>Whether the upload is finalized during this depends on the {@link
* BlobAppendableUploadConfig#getCloseAction()} provided to create the {@link
* BlobAppendableUpload}. If {@link BlobAppendableUploadConfig#getCloseAction()}{@code ==
* }{@link CloseAction#FINALIZE_WHEN_CLOSING}, {@link #finalizeAndClose()} will be called. If
* {@link BlobAppendableUploadConfig#getCloseAction()}{@code == }{@link
* CloseAction#CLOSE_WITHOUT_FINALIZING}, {@link #closeWithoutFinalizing()} will be called.
*
* @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
* @see BlobAppendableUploadConfig#getCloseAction()
* @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
void close() throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,21 @@
import static com.google.cloud.storage.ByteSizeConstants._256KiB;
import static java.util.Objects.requireNonNull;

import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.storage.v2.BidiWriteObjectRequest;
import com.google.storage.v2.BidiWriteObjectResponse;
import com.google.storage.v2.Object;
import javax.annotation.concurrent.Immutable;

/**
Expand All @@ -39,14 +50,20 @@
public final class BlobAppendableUploadConfig {

private static final BlobAppendableUploadConfig INSTANCE =
new BlobAppendableUploadConfig(FlushPolicy.minFlushSize(_256KiB), Hasher.enabled());
new BlobAppendableUploadConfig(
FlushPolicy.minFlushSize(_256KiB),
Hasher.enabled(),
CloseAction.CLOSE_WITHOUT_FINALIZING);

private final FlushPolicy flushPolicy;
private final Hasher hasher;
private final CloseAction closeAction;

private BlobAppendableUploadConfig(FlushPolicy flushPolicy, Hasher hasher) {
private BlobAppendableUploadConfig(
FlushPolicy flushPolicy, Hasher hasher, CloseAction closeAction) {
this.flushPolicy = flushPolicy;
this.hasher = hasher;
this.closeAction = closeAction;
}

/**
Expand Down Expand Up @@ -77,7 +94,37 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
if (this.flushPolicy.equals(flushPolicy)) {
return this;
}
return new BlobAppendableUploadConfig(flushPolicy, hasher);
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
}

/**
* The {@link CloseAction} which will dictate the behavior of {@link
* AppendableUploadWriteableByteChannel#close()}.
*
* <p><i>Default:</i> {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
*
* @see #withCloseAction(CloseAction)
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public CloseAction getCloseAction() {
return closeAction;
}

/**
* Return an instance with the {@code CloseAction} set to be the specified value. <i>Default:</i>
* {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
*
* @see #getCloseAction()
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) {
requireNonNull(closeAction, "closeAction must be non null");
if (this.closeAction == closeAction) {
return this;
}
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
}

/**
Expand Down Expand Up @@ -108,7 +155,8 @@ BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) {
} else if (!enabled && Hasher.noop().equals(hasher)) {
return this;
}
return new BlobAppendableUploadConfig(flushPolicy, enabled ? Hasher.enabled() : Hasher.noop());
return new BlobAppendableUploadConfig(
flushPolicy, enabled ? Hasher.enabled() : Hasher.noop(), closeAction);
}

/** Never to be made public until {@link Hasher} is public */
Expand All @@ -125,6 +173,7 @@ Hasher getHasher() {
* <pre>{@code
* BlobAppendableUploadConfig.of()
* .withFlushPolicy(FlushPolicy.minFlushSize(256 * 1024))
* .withCloseAction(CloseAction.CLOSE_WITHOUT_FINALIZING)
* }</pre>
*
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
Expand All @@ -134,4 +183,89 @@ Hasher getHasher() {
public static BlobAppendableUploadConfig of() {
return INSTANCE;
}

/**
* Enum providing the possible actions which can be taken during the {@link
* AppendableUploadWriteableByteChannel#close()} call.
*
* @see AppendableUploadWriteableByteChannel#close()
* @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
* @see BlobAppendableUploadConfig#getCloseAction()
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public enum CloseAction {
/**
* Designate that when {@link AppendableUploadWriteableByteChannel#close()} is called, the
* appendable upload should be finalized.
*
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
* @see AppendableUploadWriteableByteChannel#finalizeAndClose()
*/
@BetaApi
FINALIZE_WHEN_CLOSING,
/**
* Designate that when {@link AppendableUploadWriteableByteChannel#close()} is called, the
* appendable upload should NOT be finalized, allowing for takeover by another session or
* client.
*
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
* @see AppendableUploadWriteableByteChannel#closeWithoutFinalizing()
*/
@BetaApi
CLOSE_WITHOUT_FINALIZING
}

BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts<ObjectTargetOpt> opts) {
boolean takeOver = info.getGeneration() != null;
BidiWriteObjectRequest req =
takeOver
? storage.getBidiWriteObjectRequestForTakeover(info, opts)
: storage.getBidiWriteObjectRequest(info, opts);

BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver);

WritableByteChannelSession<AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse>
build =
ResumableMedia.gapic()
.write()
.bidiByteChannel(storage.storageClient.bidiWriteObjectCallable())
.setHasher(this.getHasher())
.setByteStringStrategy(ByteStringStrategy.copy())
.appendable()
.withRetryConfig(
storage.retrier.withAlg(
new BasicResultRetryAlgorithm<Object>() {
@Override
public boolean shouldRetry(
Throwable previousThrowable, Object previousResponse) {
// TODO: remove this later once the redirects are not handled by the
// retry loop
ApiException apiEx = null;
if (previousThrowable instanceof StorageException) {
StorageException se = (StorageException) previousThrowable;
Throwable cause = se.getCause();
if (cause instanceof ApiException) {
apiEx = (ApiException) cause;
}
}
if (apiEx instanceof AbortedException) {
return true;
}
return storage
.retryAlgorithmManager
.idempotent()
.shouldRetry(previousThrowable, null);
}
}))
.buffered(this.getFlushPolicy())
.setStartAsync(ApiFutures.immediateFuture(baw))
.setGetCallable(storage.storageClient.getObjectCallable())
.setFinalizeOnClose(this.closeAction == CloseAction.FINALIZE_WHEN_CLOSING)
.build();

return new BlobAppendableUploadImpl(
new DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<>(
build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER));
}
}
Loading