Skip to content

Commit

Permalink
chore: implement automatic cleanup on failure in ParallelCompositeUploadWritableByteChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
BenWhitehead committed Sep 29, 2023
1 parent 0ae060b commit 0a6919b
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,36 +534,42 @@ public boolean delete(String bucket, String blob, BlobSourceOption... options) {

// Deletes the object identified by `blob`; both bodies shown below report `false` when the delete
// raises NotFoundException and `true` otherwise.
// NOTE(review): this span is a rendered diff hunk with the +/- markers stripped, so two bodies
// appear back to back. The first body (through the `Retrying.run(...)` return) looks like the
// pre-change inline implementation; the second (starting at the second `opts` declaration) looks
// like its replacement, which delegates to internalObjectDelete — confirm against the real file.
@Override
public boolean delete(BlobId blob, BlobSourceOption... options) {
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
DeleteObjectRequest.Builder builder =
DeleteObjectRequest.newBuilder()
.setBucket(bucketNameCodec.encode(blob.getBucket()))
.setObject(blob.getName());
// only set a generation on the request when the id carries one
ifNonNull(blob.getGeneration(), builder::setGeneration);
DeleteObjectRequest req = opts.deleteObjectsRequest().apply(builder).build();
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return Boolean.TRUE.equals(
Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
try {
storageClient.deleteObjectCallable().call(req, merge);
return true;
} catch (NotFoundException e) {
return false;
}
},
Decoder.identity()));
// second body: options are only unwrapped here; internalObjectDelete resolves them against the
// id and prepends defaultOpts itself
Opts<ObjectSourceOpt> opts = Opts.unwrap(options);
try {
internalObjectDelete(blob, opts);
return true;
} catch (NotFoundException e) {
return false;
}
}

/**
 * Deletes the object identified by {@code blob} without any additional source options.
 *
 * @param blob id of the object to delete
 * @return the result of the options-accepting {@code delete} overload for an empty option set
 */
@Override
public boolean delete(BlobId blob) {
  // Pass an explicit (empty) array so the call binds to the varargs overload; a bare
  // delete(blob) would resolve back to this very method.
  BlobSourceOption[] noOptions = new BlobSourceOption[0];
  return delete(blob, noOptions);
}

/**
 * Issues a {@code DeleteObject} call for {@code id}, executed through {@code Retrying.run}.
 *
 * <p>This method does not catch anything itself, so whatever the retried call raises (for example
 * a {@code NotFoundException}) reaches the caller.
 *
 * @param id bucket, name and optional generation of the object to delete
 * @param opts caller supplied options; resolved against {@code id} and prefixed with
 *     {@code defaultOpts} before use
 * @return the result of the retried call, whose callable yields {@code null}
 */
@Override
public Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
  Opts<ObjectSourceOpt> resolved = opts.resolveFrom(id).prepend(defaultOpts);
  GrpcCallContext baseContext =
      resolved.grpcMetadataMapper().apply(GrpcCallContext.createDefault());

  DeleteObjectRequest.Builder requestBuilder = DeleteObjectRequest.newBuilder();
  requestBuilder.setBucket(bucketNameCodec.encode(id.getBucket()));
  requestBuilder.setObject(id.getName());
  // the generation is optional on a BlobId; only forward it when present
  ifNonNull(id.getGeneration(), requestBuilder::setGeneration);
  DeleteObjectRequest request = resolved.deleteObjectsRequest().apply(requestBuilder).build();

  GrpcCallContext callContext = Utils.merge(baseContext, Retrying.newCallContext());
  return Retrying.run(
      getOptions(),
      retryAlgorithmManager.getFor(request),
      () -> {
        storageClient.deleteObjectCallable().call(request, callContext);
        return null;
      },
      Decoder.identity());
}

@Override
public Blob compose(ComposeRequest composeRequest) {
Opts<ObjectTargetOpt> opts = composeRequest.getTargetOpts().prepend(defaultOpts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.ApiFutureUtils.OnFailureApiFutureCallback;
import com.google.cloud.storage.ApiFutureUtils.OnSuccessApiFutureCallback;
Expand Down Expand Up @@ -117,6 +118,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab

// immutable bootstrapped state
private final Opts<ObjectTargetOpt> partOpts;
private final Opts<ObjectSourceOpt> srcOpts;
private final AsyncAppendingQueue<BlobInfo> queue;
private final FailureForwarder failureForwarder;
// mutable running state
Expand Down Expand Up @@ -154,6 +156,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
this.totalObjectOffset = 0;

this.partOpts = getPartOpts(opts);
this.srcOpts = partOpts.transformTo(ObjectSourceOpt.class);
this.cumulativeHasher = Hashing.crc32c().newHasher();
this.failureForwarder = new FailureForwarder();
}
Expand Down Expand Up @@ -250,14 +253,7 @@ public synchronized void close() throws IOException {
if (partCleanupStrategy.isDeleteOnError()) {
ApiFuture<BlobInfo> cleaningFuture =
ApiFutures.catchingAsync(
validatingTransform,
Throwable.class,
t -> {
// todo:
return ApiFutures.immediateFailedFuture(t);
},
exec);
// todo: verify this gets the first failure and not one from cleaning
validatingTransform, Throwable.class, this::asyncCleanupAfterFailure, exec);
ApiFutures.addCallback(cleaningFuture, failureForwarder, exec);
} else {
ApiFutures.addCallback(validatingTransform, failureForwarder, exec);
Expand Down Expand Up @@ -288,7 +284,6 @@ private void internalFlush(ByteBuffer buf) {
// a precondition failure usually means the part was created, but we didn't get the
// response. And when we tried to retry the object already exists.
if (e.getCode() == 412) {
Opts<ObjectSourceOpt> srcOpts = partOpts.transformTo(ObjectSourceOpt.class);
return storage.internalObjectGet(info.getBlobId(), srcOpts);
} else {
throw e;
Expand Down Expand Up @@ -320,23 +315,46 @@ private void internalFlush(ByteBuffer buf) {
}

Throwable cause = e.getCause();
// create our exception containing information about the upload context
ParallelCompositeUploadException pcue =
buildParallelCompositeUploadException(cause, exec, pendingParts, successfulParts);
BaseServiceException storageException = StorageException.coalesce(pcue);

// asynchronously fail the finalObject future
CancellationException cancellationException =
new CancellationException(storageException.getMessage());
cancellationException.initCause(storageException);
ApiFutures.addCallback(
ApiFutures.immediateFailedFuture(cancellationException),
(OnFailureApiFutureCallback<BlobInfo>) failureForwarder::onFailure,
exec);
BaseServiceException storageException;
if (partCleanupStrategy.isDeleteOnError()) {
// TODO: cleanup
storageException = StorageException.coalesce(cause);
ApiFuture<Object> cleanupFutures = asyncCleanupAfterFailure(storageException);
// asynchronously fail the finalObject future
CancellationException cancellationException =
new CancellationException(storageException.getMessage());
cancellationException.initCause(storageException);
ApiFutures.addCallback(
cleanupFutures,
new ApiFutureCallback<Object>() {
@Override
public void onFailure(Throwable throwable) {
cancellationException.addSuppressed(throwable);
failureForwarder.onFailure(cancellationException);
}

@Override
public void onSuccess(Object o) {
failureForwarder.onFailure(cancellationException);
}
},
exec);
// this will throw out if anything fails
ApiFutureUtils.await(cleanupFutures);
} else {
// create our exception containing information about the upload context
ParallelCompositeUploadException pcue =
buildParallelCompositeUploadException(cause, exec, pendingParts, successfulParts);
storageException = StorageException.coalesce(pcue);
// asynchronously fail the finalObject future
CancellationException cancellationException =
new CancellationException(storageException.getMessage());
cancellationException.initCause(storageException);
ApiFutures.addCallback(
ApiFutures.immediateFailedFuture(cancellationException),
(OnFailureApiFutureCallback<BlobInfo>) failureForwarder::onFailure,
exec);
throw storageException;
}
throw storageException;
} finally {
current = null;
}
Expand Down Expand Up @@ -383,13 +401,16 @@ private ApiFuture<BlobInfo> cleanupParts(BlobInfo finalInfo) {
successfulParts.stream()
// make sure we don't delete the object we're wanting to create
.filter(id -> !id.equals(finalInfo.getBlobId()))
.map(ApiFutures::immediateFuture)
.map(f -> ApiFutures.transform(f, storage::delete, exec))
.map(this::deleteAsync)
.collect(Collectors.toList());

ApiFuture<List<Boolean>> deletes2 = ApiFutureUtils.quietAllAsList(deletes);

return ApiFutures.transform(deletes2, ignore -> finalInfo, exec);
return ApiFutures.catchingAsync(
ApiFutures.transform(deletes2, ignore -> finalInfo, exec),
Throwable.class,
cause -> ApiFutures.immediateFailedFuture(StorageException.coalesce(cause)),
exec);
}

private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long offset) {
Expand All @@ -409,6 +430,75 @@ private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long o
return b.build();
}

/**
 * Attempts to delete every pending and successful part object after the upload has failed.
 *
 * <p>The functions in this method only ever produce a failed future carrying {@code
 * originalFailure}. When one or more parts could not be confirmed deleted, a {@code
 * StorageException} listing those object ids is first attached to {@code originalFailure} as a
 * suppressed exception.
 *
 * @param originalFailure the failure that triggered cleanup; may be mutated via {@code
 *     addSuppressed}
 * @param <R> result type of the future being failed; no value of this type is produced here
 * @return a future that fails with {@code originalFailure} once all delete attempts have settled
 */
private <R> ApiFuture<R> asyncCleanupAfterFailure(Throwable originalFailure) {
  ApiFuture<ImmutableList<BlobId>> partIdsFuture =
      getPendingAndSuccessfulBlobIds(exec, pendingParts, successfulParts);
  return ApiFutures.transformAsync(
      partIdsFuture,
      partIds -> {
        ImmutableList.Builder<ApiFuture<Boolean>> deletes = ImmutableList.builder();
        for (BlobId partId : partIds) {
          deletes.add(deleteAsync(partId));
        }
        ImmutableList<ApiFuture<Boolean>> pendingDeletes = deletes.build();

        ApiFuture<List<Boolean>> settledDeletes = ApiFutures.successfulAsList(pendingDeletes);

        return ApiFutures.transformAsync(
            settledDeletes,
            outcomes -> {
              // outcomes is positionally aligned with partIds; walk both together
              List<BlobId> undeleted = new ArrayList<>();
              int position = 0;
              for (BlobId partId : partIds) {
                Boolean outcome = outcomes.get(position++);
                // null  -> the delete future itself failed
                // false -> the request completed but was unsuccessful
                if (outcome == null || !outcome) {
                  undeleted.add(partId);
                }
              }

              if (undeleted.isEmpty()) {
                return ApiFutures.immediateFailedFuture(originalFailure);
              }

              String undeletedGsUris =
                  undeleted.stream()
                      .map(BlobId::toGsUtilUriWithGeneration)
                      .collect(Collectors.joining(",\n", "[\n", "\n]"));
              String message =
                  String.format(
                      "Incomplete parallel composite upload cleanup after previous error. Unknown object ids: %s",
                      undeletedGsUris);
              // keep the original failure primary; the cleanup problem rides along as suppressed
              originalFailure.addSuppressed(new StorageException(0, message, null));
              return ApiFutures.immediateFailedFuture(originalFailure);
            },
            exec);
      },
      exec);
}

/**
 * Deletes a single part object through a transform run with {@code exec}, treating an object
 * that is already gone as a successful delete.
 *
 * @param id the part object to delete
 * @return a future yielding {@code true} when the delete call returns normally or reports
 *     not-found (a {@code NotFoundException}, or a {@code StorageException} whose code is 404);
 *     any other {@code StorageException} is rethrown from the transform function
 */
@NonNull
private ApiFuture<Boolean> deleteAsync(BlobId id) {
  ApiFuture<BlobId> idFuture = ApiFutures.immediateFuture(id);
  return ApiFutures.transform(
      idFuture,
      partId -> {
        try {
          storage.internalObjectDelete(partId, srcOpts);
        } catch (NotFoundException ignored) {
          // not found means the part doesn't exist, which is what we want
        } catch (StorageException e) {
          // same "already gone" signal, surfaced as a StorageException instead
          if (e.getCode() != 404) {
            throw e;
          }
        }
        return true;
      },
      exec);
}

@VisibleForTesting
@NonNull
static Opts<ObjectTargetOpt> getPartOpts(Opts<ObjectTargetOpt> opts) {
Expand Down Expand Up @@ -468,6 +558,15 @@ static ParallelCompositeUploadException buildParallelCompositeUploadException(
Executor exec,
List<ApiFuture<BlobInfo>> pendingParts,
List<BlobId> successfulParts) {
ApiFuture<ImmutableList<BlobId>> fCreatedObjects =
getPendingAndSuccessfulBlobIds(exec, pendingParts, successfulParts);

return ParallelCompositeUploadException.of(cause, fCreatedObjects);
}

@NonNull
private static ApiFuture<ImmutableList<BlobId>> getPendingAndSuccessfulBlobIds(
Executor exec, List<ApiFuture<BlobInfo>> pendingParts, List<BlobId> successfulParts) {
ApiFuture<List<BlobInfo>> successfulList = ApiFutures.successfulAsList(pendingParts);
// suppress any failure that might happen when waiting for any pending futures to resolve
ApiFuture<List<BlobInfo>> catching =
Expand All @@ -490,7 +589,6 @@ static ParallelCompositeUploadException buildParallelCompositeUploadException(
.distinct()
.collect(ImmutableList.toImmutableList()),
exec);

return ParallelCompositeUploadException.of(cause, fCreatedObjects);
return fCreatedObjects;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ default BlobInfo internalDirectUpload(BlobInfo info, Opts<ObjectTargetOpt> opts,
throw new UnsupportedOperationException("not implemented");
}

// Default implementation: always throws UnsupportedOperationException("not implemented").
// NOTE(review): rendered diff hunk with the +/- markers stripped — the `boolean delete(BlobId)`
// signature line directly below appears to be the pre-change line that the annotated
// `internalObjectDelete` signature replaces; confirm against the real file.
default boolean delete(BlobId id) {
// Void to allow easier mapping/use within streams and other mapping contexts
@SuppressWarnings("UnusedReturnValue")
default Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
throw new UnsupportedOperationException("not implemented");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public void badServerCrc32cResultsInException() throws Exception {
bufferHandlePool,
MoreExecutors.directExecutor(),
partNamingStrategy,
PartCleanupStrategy.never(),
PartCleanupStrategy.always(),
3,
finalObject,
new FakeStorageInternal() {
Expand Down Expand Up @@ -839,9 +839,13 @@ public BlobInfo compose(ComposeRequest composeRequest) {
}

// Test fake for object deletion: records every requested id in deleteRequests, then answers
// based on whether the id is a key of addedObjects.
// NOTE(review): rendered diff hunk with the +/- markers stripped — the `boolean delete(BlobId)`
// signature and the `return addedObjects.containsKey(id);` line appear to be the pre-change
// side; the `internalObjectDelete` signature and the NOT_FOUND throw appear to replace them.
// Confirm against the real file.
@Override
public boolean delete(BlobId id) {
public Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
deleteRequests.add(id);
return addedObjects.containsKey(id);
boolean containsKey = addedObjects.containsKey(id);
if (!containsKey) {
// unknown ids surface as a NOT_FOUND ApiException instead of a boolean result
throw ApiExceptionFactory.createException(null, GrpcStatusCode.of(Code.NOT_FOUND), false);
}
return null;
}

@Override
Expand Down

0 comments on commit 0a6919b

Please sign in to comment.