Skip to content

Commit

Permalink
Minor refactoring in CopyObjectHelper (#3552)
Browse files Browse the repository at this point in the history
  • Loading branch information
zoewangg authored Nov 17, 2022
1 parent 64d5ee9 commit e7a3a12
Showing 1 changed file with 92 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
Expand All @@ -36,6 +38,7 @@
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;
Expand Down Expand Up @@ -74,23 +77,28 @@ public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyOb
if (throwable != null) {
handleException(returnFuture, () -> "Failed to retrieve metadata from the source object", throwable);
} else {
Long contentLength = headObjectResponse.contentLength();

long partSize = s3NativeClientConfiguration.partSizeBytes();

if (contentLength <= partSize) {
log.debug(() -> "Starting the copy as a single copy part request");
copyInOneChunk(copyObjectRequest, returnFuture);
} else {
log.debug(() -> "Starting the copy as multipart copy request");
copyInParts(copyObjectRequest, contentLength, returnFuture);
}
doCopyObject(copyObjectRequest, returnFuture, headObjectResponse);
}
});

return returnFuture;
}

private void doCopyObject(CopyObjectRequest copyObjectRequest, CompletableFuture<CopyObjectResponse> returnFuture,
HeadObjectResponse headObjectResponse) {
Long contentLength = headObjectResponse.contentLength();

long partSize = s3NativeClientConfiguration.partSizeBytes();

if (contentLength <= partSize) {
log.debug(() -> "Starting the copy as a single copy part request");
copyInOneChunk(copyObjectRequest, returnFuture);
} else {
log.debug(() -> "Starting the copy as multipart copy request");
copyInParts(copyObjectRequest, contentLength, returnFuture);
}
}

private void copyInParts(CopyObjectRequest copyObjectRequest,
Long contentLength,
CompletableFuture<CopyObjectResponse> returnFuture) {
Expand Down Expand Up @@ -136,40 +144,54 @@ private void doCopyInParts(CopyObjectRequest copyObjectRequest,
completedParts,
optimalPartSize);
CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(new CompletableFuture[0]))
.thenCompose(ignore -> {
log.debug(() -> String.format("Sending completeMultipartUploadRequest, uploadId: %s",
uploadId));
CompletedPart[] parts =
IntStream.range(0, completedParts.length())
.mapToObj(completedParts::get)
.toArray(CompletedPart[]::new);
CompleteMultipartUploadRequest completeMultipartUploadRequest =
CompleteMultipartUploadRequest.builder()
.bucket(copyObjectRequest.destinationBucket())
.key(copyObjectRequest.destinationKey())
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder()
.parts(parts)
.build())
.build();

return s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest);
}).handle((completeMultipartUploadResponse, throwable) -> {
if (throwable != null) {
cleanUpParts(copyObjectRequest, uploadId);
handleException(returnFuture, () -> "Failed to send multipart copy requests.",
throwable);
return null;
}

return returnFuture.complete(CopyRequestConversionUtils.toCopyObjectResponse(
completeMultipartUploadResponse));
}).exceptionally(throwable -> {
.thenCompose(ignore -> completeMultipartUpload(copyObjectRequest, uploadId, completedParts))
.handle(handleExceptionOrResponse(copyObjectRequest, returnFuture, uploadId))
.exceptionally(throwable -> {
handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
return null;
});
}

private BiFunction<CompleteMultipartUploadResponse, Throwable, Void> handleExceptionOrResponse(
CopyObjectRequest copyObjectRequest,
CompletableFuture<CopyObjectResponse> returnFuture,
String uploadId) {

return (completeMultipartUploadResponse, throwable) -> {
if (throwable != null) {
cleanUpParts(copyObjectRequest, uploadId);
handleException(returnFuture, () -> "Failed to send multipart copy requests.",
throwable);
} else {
returnFuture.complete(CopyRequestConversionUtils.toCopyObjectResponse(
completeMultipartUploadResponse));
}

return null;
};
}

private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(
CopyObjectRequest copyObjectRequest, String uploadId, AtomicReferenceArray<CompletedPart> completedParts) {
log.debug(() -> String.format("Sending completeMultipartUploadRequest, uploadId: %s",
uploadId));
CompletedPart[] parts =
IntStream.range(0, completedParts.length())
.mapToObj(completedParts::get)
.toArray(CompletedPart[]::new);
CompleteMultipartUploadRequest completeMultipartUploadRequest =
CompleteMultipartUploadRequest.builder()
.bucket(copyObjectRequest.destinationBucket())
.key(copyObjectRequest.destinationKey())
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder()
.parts(parts)
.build())
.build();

return s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest);
}

private void cleanUpParts(CopyObjectRequest copyObjectRequest, String uploadId) {
AbortMultipartUploadRequest abortMultipartUploadRequest =
CopyRequestConversionUtils.toAbortMultipartUploadRequest(copyObjectRequest, uploadId);
Expand Down Expand Up @@ -211,29 +233,41 @@ private List<CompletableFuture<CompletedPart>> sendUploadPartCopyRequests(CopyOb
copyObjectRequest,
contentLength);

uploadPartCopyRequests.stream().forEach(uploadPartCopyRequest -> {
Integer partNumber = uploadPartCopyRequest.partNumber();
log.debug(() -> "Sending uploadPartCopyRequest with range: " + uploadPartCopyRequest.copySourceRange() + " uploadId: "
+ uploadId);
uploadPartCopyRequests.forEach(uploadPartCopyRequest ->
sendIndividualUploadPartCopy(uploadId, completedParts, futures,
uploadPartCopyRequest));

return futures;
}

CompletableFuture<UploadPartCopyResponse> uploadPartCopyFuture = s3AsyncClient.uploadPartCopy(uploadPartCopyRequest);
private void sendIndividualUploadPartCopy(String uploadId,
AtomicReferenceArray<CompletedPart> completedParts,
List<CompletableFuture<CompletedPart>> futures,
UploadPartCopyRequest uploadPartCopyRequest) {
Integer partNumber = uploadPartCopyRequest.partNumber();
log.debug(() -> "Sending uploadPartCopyRequest with range: " + uploadPartCopyRequest.copySourceRange() + " uploadId: "
+ uploadId);

CompletableFuture<CompletedPart> convertFuture =
uploadPartCopyFuture.thenApply(uploadPartCopyResponse -> {
CopyPartResult copyPartResult = uploadPartCopyResponse.copyPartResult();
CompletedPart completedPart =
CopyRequestConversionUtils.toCompletedPart(copyPartResult,
partNumber);
CompletableFuture<UploadPartCopyResponse> uploadPartCopyFuture = s3AsyncClient.uploadPartCopy(uploadPartCopyRequest);

completedParts.set(partNumber - 1, completedPart);
return completedPart;
});
futures.add(convertFuture);
CompletableFuture<CompletedPart> convertFuture =
uploadPartCopyFuture.thenApply(uploadPartCopyResponse ->
convertUploadPartCopyResponse(completedParts, partNumber, uploadPartCopyResponse));
futures.add(convertFuture);

CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartCopyFuture);
});
CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartCopyFuture);
}

return futures;
private static CompletedPart convertUploadPartCopyResponse(AtomicReferenceArray<CompletedPart> completedParts,
Integer partNumber,
UploadPartCopyResponse uploadPartCopyResponse) {
CopyPartResult copyPartResult = uploadPartCopyResponse.copyPartResult();
CompletedPart completedPart =
CopyRequestConversionUtils.toCompletedPart(copyPartResult,
partNumber);

completedParts.set(partNumber - 1, completedPart);
return completedPart;
}

/**
Expand Down

0 comments on commit e7a3a12

Please sign in to comment.