From fe83c799dabe50cf68d53b6bcba9f9886fdfd4e4 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Fri, 11 Mar 2022 16:58:46 -0800 Subject: [PATCH 1/5] Part 1: Implement pause for downloadFile operation --- .../awssdk/transfer/s3/FileDownload.java | 12 +- .../transfer/s3/PersistableFileDownload.java | 139 ++++++++++++++++++ .../transfer/s3/PersistableTransfer.java | 25 ++++ .../awssdk/transfer/s3/S3TransferManager.java | 11 ++ .../s3/exception/TransferPauseException.java | 99 +++++++++++++ .../s3/internal/DefaultFileDownload.java | 50 ++++++- .../s3/internal/DefaultS3TransferManager.java | 15 +- .../progress/DownloadFileMonitor.java | 72 +++++++++ .../progress/TransferListenerContext.java | 16 +- .../TransferListenerFailedContext.java | 6 + .../progress/TransferProgressUpdater.java | 33 +++-- .../s3/progress/TransferListener.java | 3 + .../s3/internal/DefaultFileDownloadTest.java | 121 ++++++++++++++- ...nloadDirectoryHelperParameterizedTest.java | 8 +- .../internal/DownloadDirectoryHelperTest.java | 9 +- 15 files changed, 594 insertions(+), 25 deletions(-) create mode 100644 services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableFileDownload.java create mode 100644 services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableTransfer.java create mode 100644 services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/exception/TransferPauseException.java create mode 100644 services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DownloadFileMonitor.java diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java index 578124a55e3a..d06629993550 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java @@ -18,6 +18,7 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.transfer.s3.exception.TransferPauseException; /** * A download transfer of a single object from S3. @@ -25,7 +26,16 @@ @SdkPublicApi @SdkPreviewApi public interface FileDownload extends ObjectTransfer { - + + /** + * Pause the current download operation and returns the information that can + * be used to resume the download at a later time. + * + * @return {@link PersistableFileDownload} that can be used to resume the download + * @throws TransferPauseException if pause fails + */ + PersistableFileDownload pause(); + @Override CompletableFuture completionFuture(); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableFileDownload.java new file mode 100644 index 000000000000..f08139ef6b8e --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableFileDownload.java @@ -0,0 +1,139 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3; + +import java.time.Instant; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * An opaque token that holds the state and can be used to resume a + * paused download operation. + * + * @see S3TransferManager#downloadFile(DownloadFileRequest) + */ +@SdkPublicApi +public final class PersistableFileDownload implements PersistableTransfer, + ToCopyableBuilder { + private final DownloadFileRequest downloadFileRequest; + private final long bytesTransferred; + private final Instant lastModified; + + private PersistableFileDownload(DefaultBuilder builder) { + this.downloadFileRequest = builder.downloadFileRequest; + this.bytesTransferred = Validate.paramNotNull(builder.bytesTransferred, "bytesTransferred"); + this.lastModified = Validate.paramNotNull(builder.lastModified, "lastModified"); + } + + public static Builder builder() { + return new DefaultBuilder(); + } + + /** + * @return the {@link DownloadFileRequest} to resume + */ + public DownloadFileRequest downloadFileRequest() { + return downloadFileRequest; + } + + /** + * Retrieve the number of bytes that have been transferred. + * @return the number of bytes + */ + public long bytesTransferred() { + return bytesTransferred; + } + + /** + * Last modified time on Amazon S3 for this object. + */ + public Instant lastModified() { + return lastModified; + } + + @Override + public Builder toBuilder() { + return new DefaultBuilder(this); + } + + public interface Builder extends CopyableBuilder { + + /** + * Sets the download file request + * + * @param downloadFileRequest the download file request + * @return a reference to this object so that method calls can be chained together. + */ + Builder downloadFileRequest(DownloadFileRequest downloadFileRequest); + + /** + * Sets the number of bytes transferred + * + * @param bytesTransferred the number of bytes transferred + * @return a reference to this object so that method calls can be chained together. + */ + Builder bytesTransferred(Long bytesTransferred); + + /** + * Sets the last modified time of the object + * + * @param lastModified the last modified time of the object + * @return a reference to this object so that method calls can be chained together. + */ + Builder lastModified(Instant lastModified); + } + + private static final class DefaultBuilder implements Builder { + private DownloadFileRequest downloadFileRequest; + private Long bytesTransferred; + private Instant lastModified; + + private DefaultBuilder() { + + } + + private DefaultBuilder(PersistableFileDownload persistableFileDownload) { + this.downloadFileRequest = persistableFileDownload.downloadFileRequest; + this.bytesTransferred = persistableFileDownload.bytesTransferred; + this.lastModified = persistableFileDownload.lastModified; + } + + @Override + public Builder downloadFileRequest(DownloadFileRequest downloadFileRequest) { + this.downloadFileRequest = downloadFileRequest; + return this; + } + + @Override + public Builder bytesTransferred(Long bytesTransferred) { + this.bytesTransferred = bytesTransferred; + return this; + } + + @Override + public Builder lastModified(Instant lastModified) { + this.lastModified = lastModified; + return this; + } + + @Override + public PersistableFileDownload build() { + return new PersistableFileDownload(this); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableTransfer.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableTransfer.java new file mode 100644 index 000000000000..9fd299514913 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableTransfer.java @@ -0,0 +1,25 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3; + +/** + * Contains the information of a pausible upload or download; such + * information can be used to resume the upload or download later on + * + * @see FileDownload#pause() + */ +public interface PersistableTransfer { +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java index cb6e32e4d1e8..7962516a80b4 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java @@ -121,6 +121,17 @@ default FileDownload downloadFile(Consumer request) return downloadFile(DownloadFileRequest.builder().applyMutation(request).build()); } + /** + * Resumes a download operation. This download operation uses the same configuration as the original download. Any data + * already fetched will be skipped, and only the remaining data is retrieved from Amazon S3. + * + * @param persistableFileDownload the download to resume. + * @return A new {@code FileDownload} object to use to check the state of the download. + */ + default FileDownload resumeDownloadFile(PersistableFileDownload persistableFileDownload) { + throw new UnsupportedOperationException(); + } + /** * Download an object identified by the bucket and key from S3 through the given {@link AsyncResponseTransformer}. For * downloading to a file, you may use {@link #downloadFile(DownloadFileRequest)} instead. diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/exception/TransferPauseException.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/exception/TransferPauseException.java new file mode 100644 index 000000000000..9b7b87412615 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/exception/TransferPauseException.java @@ -0,0 +1,99 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.exception; + +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.core.exception.SdkClientException; + +@SdkPublicApi +public final class TransferPauseException extends SdkClientException { + private static final long serialVersionUID = 1L; + private final ErrorCode errorCode; + + private TransferPauseException(Builder builder) { + super(builder); + this.errorCode = builder.errorCode(); + } + + public ErrorCode errorCode() { + return errorCode; + } + + public static TransferPauseException create(ErrorCode error, String message) { + return builder().errorCode(error).message(message).build(); + } + + public static Builder builder() { + return new BuilderImpl(); + } + + public enum ErrorCode { + /** + * Pause is not yet applicable since transfer has not started + */ + NOT_STARTED, + + /** + * Pause is not possible since transfer has finished + */ + ALREADY_FINISHED, + + /** + * Pause is not possible because a previous pause was requested + */ + PAUSE_IN_PROGRESS + } + + public interface Builder extends SdkClientException.Builder { + + Builder errorCode(ErrorCode errorCode); + + @Override + Builder message(String message); + + ErrorCode errorCode(); + + @Override + TransferPauseException build(); + } + + protected static class BuilderImpl extends SdkClientException.BuilderImpl implements Builder { + private ErrorCode errorCode; + + @Override + public Builder errorCode(ErrorCode errorCode) { + this.errorCode = errorCode; + return this; + } + + @Override + public Builder message(String message) { + this.message = message; + return this; + } + + @Override + public ErrorCode errorCode() { + return errorCode; + } + + @Override + public TransferPauseException build() { + return new TransferPauseException(this); + } + + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java index 19b6519cd07b..10cc2be083ab 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java @@ -15,11 +15,19 @@ package software.amazon.awssdk.transfer.s3.internal; +import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.ALREADY_FINISHED; +import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.NOT_STARTED; +import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.PAUSE_IN_PROGRESS; + import java.util.Objects; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.FileDownload; +import software.amazon.awssdk.transfer.s3.PersistableFileDownload; +import software.amazon.awssdk.transfer.s3.exception.TransferPauseException; +import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; import software.amazon.awssdk.utils.ToString; @@ -28,11 +36,15 @@ public final class DefaultFileDownload implements FileDownload { private final CompletableFuture completionFuture; private final TransferProgress progress; + private final DownloadFileMonitor monitor; + private volatile boolean paused; DefaultFileDownload(CompletableFuture completionFuture, - TransferProgress progress) { + TransferProgress progress, + DownloadFileMonitor monitor) { this.completionFuture = completionFuture; this.progress = progress; + this.monitor = monitor; } @Override @@ -40,6 +52,35 @@ public TransferProgress progress() { return progress; } + @Override + public PersistableFileDownload pause() { + validatePauseStatus(); + paused = true; + completionFuture.cancel(false); + GetObjectResponse getObjectResponse = monitor.initialResponse().get(); + long bytesTransferred = progress.snapshot().bytesTransferred(); + return PersistableFileDownload.builder() + .downloadFileRequest(monitor.downloadFileRequest()) + .lastModified(getObjectResponse.lastModified()) + .bytesTransferred(bytesTransferred) + .build(); + } + + private void validatePauseStatus() { + if (paused) { + throw TransferPauseException.create(PAUSE_IN_PROGRESS, + "Pause failed because a previous pause was requested"); + } + + if (monitor.isFinished()) { + throw TransferPauseException.create(ALREADY_FINISHED, "Pause failed because the transfer has already finished"); + } + + if (!monitor.initialResponse().isPresent()) { + throw TransferPauseException.create(NOT_STARTED, "Pause failed because the transfer has not been initiated yet. " + + "Please try later."); + } + } @Override public CompletableFuture completionFuture() { @@ -60,12 +101,18 @@ public boolean equals(Object o) { if (!Objects.equals(completionFuture, that.completionFuture)) { return false; } + + if (!Objects.equals(monitor, that.monitor)) { + return false; + } + return Objects.equals(progress, that.progress); } @Override public int hashCode() { int result = completionFuture != null ? completionFuture.hashCode() : 0; + result = 31 * result + (monitor != null ? monitor.hashCode() : 0); result = 31 * result + (progress != null ? progress.hashCode() : 0); return result; } @@ -75,6 +122,7 @@ public String toString() { return ToString.builder("DefaultFileDownload") .add("completionFuture", completionFuture) .add("progress", progress) + .add("monitor", monitor) .build(); } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java index d38aa71ac711..2958cfb90be2 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java @@ -50,6 +50,7 @@ import software.amazon.awssdk.transfer.s3.UploadDirectoryRequest; import software.amazon.awssdk.transfer.s3.UploadFileRequest; import software.amazon.awssdk.transfer.s3.UploadRequest; +import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Validate; @@ -110,7 +111,7 @@ public Upload upload(UploadRequest uploadRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, requestBody); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, requestBody, null); progressUpdater.transferInitiated(); requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); @@ -143,7 +144,7 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody, null); progressUpdater.transferInitiated(); requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); @@ -190,7 +191,7 @@ public Download download(DownloadRequest downloadReq CompletableFuture> returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null, null); progressUpdater.transferInitiated(); responseTransformer = progressUpdater.wrapResponseTransformer(responseTransformer); progressUpdater.registerCompletion(returnFuture); @@ -224,8 +225,10 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) { FileTransformerConfiguration.defaultCreateOrReplaceExisting()); CompletableFuture returnFuture = new CompletableFuture<>(); + DownloadFileMonitor listener = new DownloadFileMonitor(downloadRequest); + CompletableFuture downloadFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null, listener); progressUpdater.transferInitiated(); responseTransformer = progressUpdater.wrapResponseTransformer(responseTransformer); progressUpdater.registerCompletion(returnFuture); @@ -247,7 +250,7 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) { returnFuture.completeExceptionally(throwable); } - return new DefaultFileDownload(returnFuture, progressUpdater.progress()); + return new DefaultFileDownload(downloadFuture, progressUpdater.progress(), listener); } @Override @@ -269,7 +272,7 @@ public Copy copy(CopyRequest copyRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(copyRequest, null); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(copyRequest, null, null); progressUpdater.transferInitiated(); progressUpdater.registerCompletion(returnFuture); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DownloadFileMonitor.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DownloadFileMonitor.java new file mode 100644 index 000000000000..9dd64c373d81 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DownloadFileMonitor.java @@ -0,0 +1,72 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import java.util.Optional; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.transfer.s3.CompletedFileDownload; +import software.amazon.awssdk.transfer.s3.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; + +@SdkInternalApi +public class DownloadFileMonitor implements TransferListener { + private final DownloadFileRequest downloadFileRequest; + private GetObjectResponse initialResponse; + private CompletedFileDownload finalResponse; + private boolean isFinished; + + public DownloadFileMonitor(DownloadFileRequest downloadFileRequest) { + this.downloadFileRequest = downloadFileRequest; + } + + @Override + public void transferInitiated(Context.TransferInitiated context) { + + if (context.initialResponse() instanceof GetObjectResponse) { + initialResponse = (GetObjectResponse) context.initialResponse(); + } + } + + /** + * @return whether the current transfer is finished or not + */ + public boolean isFinished() { + return isFinished; + } + + public Optional initialResponse() { + return Optional.of(initialResponse); + } + + @Override + public void transferComplete(Context.TransferComplete context) { + isFinished = true; + finalResponse = (CompletedFileDownload) context.completedTransfer(); + } + + @Override + public void transferFailed(Context.TransferFailed context) { + isFinished = true; + } + + /** + * @return the download file request + */ + public DownloadFileRequest downloadFileRequest() { + return downloadFileRequest; + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java index 19f409db07d5..981ecb40dee6 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java @@ -17,6 +17,7 @@ import software.amazon.awssdk.annotations.Immutable; import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.transfer.s3.CompletedObjectTransfer; import software.amazon.awssdk.transfer.s3.TransferObjectRequest; import software.amazon.awssdk.transfer.s3.progress.TransferListener; @@ -39,11 +40,13 @@ public final class TransferListenerContext private final TransferObjectRequest request; private final TransferProgressSnapshot progressSnapshot; private final CompletedObjectTransfer completedTransfer; + private final SdkResponse initialResponse; private TransferListenerContext(Builder builder) { this.request = builder.request; this.progressSnapshot = builder.progressSnapshot; this.completedTransfer = builder.completedTransfer; + this.initialResponse = builder.initialResponse; } public static Builder builder() { @@ -60,6 +63,11 @@ public TransferObjectRequest request() { return request; } + @Override + public SdkResponse initialResponse() { + return initialResponse; + } + @Override public TransferProgressSnapshot progressSnapshot() { return progressSnapshot; @@ -76,6 +84,7 @@ public String toString() { .add("request", request) .add("progressSnapshot", progressSnapshot) .add("completedTransfer", completedTransfer) + .add("initialResponse", initialResponse) .build(); } @@ -83,9 +92,9 @@ public static final class Builder implements CopyableBuilder endOfStreamFuture; - public TransferProgressUpdater(TransferObjectRequest request, AsyncRequestBody requestBody) { + public TransferProgressUpdater(TransferObjectRequest request, + AsyncRequestBody requestBody, + TransferListener listener) { DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder(); getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::transferSizeInBytes); TransferProgressSnapshot snapshot = snapshotBuilder.build(); @@ -53,9 +56,17 @@ public TransferProgressUpdater(TransferObjectRequest request, AsyncRequestBody r .request(request) .progressSnapshot(snapshot) .build(); - listeners = new TransferListenerInvoker(request.overrideConfiguration() - .map(TransferRequestOverrideConfiguration::listeners) - .orElseGet(Collections::emptyList)); + + List listeners = new ArrayList<>(); + if (listener != null) { + listeners.add(listener); + } + + request.overrideConfiguration() + .map(TransferRequestOverrideConfiguration::listeners) + .ifPresent(listeners::addAll); + + listenerInvoker = new TransferListenerInvoker(listeners); endOfStreamFuture = new CompletableFuture<>(); } @@ -64,7 +75,7 @@ public TransferProgress progress() { } public void transferInitiated() { - listeners.transferInitiated(context); + listenerInvoker.transferInitiated(context); } public AsyncRequestBody wrapRequestBody(AsyncRequestBody requestBody) { @@ -140,7 +151,7 @@ private void incrementBytesTransferred(int numBytes) { TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { b.bytesTransferred(b.getBytesTransferred() + numBytes); }); - listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); + listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); } public void registerCompletion(CompletableFuture future) { @@ -160,7 +171,7 @@ public void registerCompletion(CompletableFuture { + listenerInvoker.transferComplete(context.copy(b -> { TransferProgressSnapshot snapshot = progress.snapshot(); b.progressSnapshot(snapshot); b.completedTransfer(r); @@ -168,8 +179,8 @@ private void transferComplete(CompletedObjectTransfer r) { } private void transferFailed(Throwable t) { - listeners.transferFailed(TransferListenerFailedContext.builder() - .transferContext(context.copy(b -> { + listenerInvoker.transferFailed(TransferListenerFailedContext.builder() + .transferContext(context.copy(b -> { b.progressSnapshot(progress.snapshot()); })) .exception(t) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java index 925f77b1b8b8..afd65e62b041 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java @@ -21,6 +21,7 @@ import software.amazon.awssdk.annotations.SdkProtectedApi; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.annotations.ThreadSafe; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.CompletedFileUpload; @@ -204,6 +205,8 @@ public interface TransferInitiated { */ TransferObjectRequest request(); + SdkResponse initialResponse(); + /** * The immutable {@link TransferProgressSnapshot} for this specific update. */ diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java index be96e0b8bd0a..6908ddf51f2e 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java @@ -15,16 +15,133 @@ package software.amazon.awssdk.transfer.s3.internal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.ALREADY_FINISHED; +import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.PAUSE_IN_PROGRESS; + +import java.nio.file.Paths; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; import nl.jqno.equalsverifier.EqualsVerifier; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.transfer.s3.CompletedFileDownload; +import software.amazon.awssdk.transfer.s3.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.PersistableFileDownload; +import software.amazon.awssdk.transfer.s3.exception.TransferPauseException; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; +import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; +import software.amazon.awssdk.transfer.s3.internal.progress.TransferListenerContext; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; -public class DefaultFileDownloadTest { +class DefaultFileDownloadTest { @Test - public void equals_hashcode() { + void equals_hashcode() { EqualsVerifier.forClass(DefaultFileDownload.class) .withNonnullFields("completionFuture", "progress") + .withIgnoredFields("paused") .verify(); } + @Test + void pause_shouldReturnCorrectly() { + CompletableFuture future = + new CompletableFuture<>(); + TransferProgress transferProgress = Mockito.mock(TransferProgress.class); + + Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1000L) + .build()); + + DownloadFileMonitor downloadFileMonitor = monitorForInProgressTransfer(); + + DefaultFileDownload fileDownload = new DefaultFileDownload(future, + transferProgress, + downloadFileMonitor); + + PersistableFileDownload pause = fileDownload.pause(); + assertThat(pause.downloadFileRequest()).isEqualTo(downloadFileMonitor.downloadFileRequest()); + assertThat(pause.bytesTransferred()).isEqualTo(1000L); + assertThat(pause.lastModified()).isEqualTo(downloadFileMonitor.initialResponse().get().lastModified()); + } + + @Test + void pause_transferAlreadyFinished_shouldThrowException() { + CompletableFuture future = + CompletableFuture.completedFuture(CompletedFileDownload.builder() + .response(GetObjectResponse.builder().build()) + .build()); + TransferProgress transferProgress = Mockito.mock(TransferProgress.class); + + + DownloadFileMonitor downloadFileMonitor = monitorForCompletedTransfer(); + + DefaultFileDownload fileDownload = new DefaultFileDownload(future, + transferProgress, + downloadFileMonitor); + + assertThatThrownBy(fileDownload::pause).isInstanceOf(TransferPauseException.class) + .satisfies(e -> assertThat(((TransferPauseException) e).errorCode()) + .isEqualTo(ALREADY_FINISHED)); + + } + + + @Test + void pauseTwice_shouldThrowException() { + CompletableFuture future = + new CompletableFuture<>(); + TransferProgress transferProgress = Mockito.mock(TransferProgress.class); + Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1000L) + .build()); + DownloadFileMonitor downloadFileMonitor = monitorForInProgressTransfer(); + + + DefaultFileDownload fileDownload = new DefaultFileDownload(future, + transferProgress, + downloadFileMonitor); + + fileDownload.pause(); + + + assertThatThrownBy(fileDownload::pause).isInstanceOf(TransferPauseException.class) + .satisfies(e -> assertThat(((TransferPauseException) e).errorCode()) + .isEqualTo(PAUSE_IN_PROGRESS)); + + } + + private DownloadFileMonitor monitorForCompletedTransfer() { + DownloadFileRequest downloadFileRequest = getDownloadFileRequest(); + DownloadFileMonitor downloadFileMonitor = new DownloadFileMonitor(downloadFileRequest); + downloadFileMonitor.transferComplete(TransferListenerContext.builder().build()); + return downloadFileMonitor; + } + + private DownloadFileMonitor monitorForInProgressTransfer() { + DownloadFileRequest downloadFileRequest = getDownloadFileRequest(); + DownloadFileMonitor downloadFileMonitor = new DownloadFileMonitor(downloadFileRequest); + GetObjectResponse getObjectResponse = GetObjectResponse.builder() + .lastModified(Instant.now()) + .build(); + + downloadFileMonitor.transferInitiated(TransferListenerContext.builder() + .initialResponse(getObjectResponse) + .build()); + + return downloadFileMonitor; + } + + private DownloadFileRequest getDownloadFileRequest() { + return DownloadFileRequest.builder() + .destination(Paths.get(".")) + .getObjectRequest(GetObjectRequest.builder().key("KEY").bucket("BUCKET").build()) + + .build(); + } + } \ No newline at end of file diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java index c5c18e34c0dc..1a595e378bb1 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java @@ -28,6 +28,7 @@ import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -40,6 +41,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedDirectoryDownload; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; @@ -49,6 +51,7 @@ import software.amazon.awssdk.transfer.s3.FileDownload; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; +import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; /** * Testing {@link DownloadDirectoryHelper} with different file systems. @@ -142,7 +145,10 @@ private static DefaultFileDownload completedDownload() { return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder() .response(GetObjectResponse.builder().build()) .build()), - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build())); + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()), + new DownloadFileMonitor(DownloadFileRequest.builder().getObjectRequest(GetObjectRequest.builder().build()) + .destination(Paths.get(".")) + .build())); } private static void verifyDestinationPathForSingleDownload(FileSystem jimfs, String delimiter, String[] keys, diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java index cbd9df604ae1..e516fb2ebe6e 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.file.FileSystem; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedDirectoryDownload; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; @@ -46,6 +48,7 @@ import software.amazon.awssdk.transfer.s3.FileDownload; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; +import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; public class DownloadDirectoryHelperTest { private static FileSystem jimfs; @@ -174,7 +177,9 @@ private FileDownload newFailedDownload(SdkClientException exception) { private FileDownload newDownload(CompletableFuture future) { return new DefaultFileDownload(future, - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()) - ); + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()), + new DownloadFileMonitor(DownloadFileRequest.builder().getObjectRequest(GetObjectRequest.builder().build()) + .destination(Paths.get(".")) + .build())); } } From 8bfd9a856ed9cce33b132321f673e4e80ab10b27 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 16 Mar 2022 10:40:54 -0700 Subject: [PATCH 2/5] Address feedback --- .../awssdk/transfer/s3/FileDownload.java | 4 ++-- ...wnload.java => ResumableFileDownload.java} | 14 ++++++------ .../awssdk/transfer/s3/S3TransferManager.java | 2 +- .../s3/internal/DefaultFileDownload.java | 14 ++++++------ .../s3/internal/exception/PauseException.java | 22 +++++++++++++++++++ .../s3/internal/DefaultFileDownloadTest.java | 4 ++-- 6 files changed, 41 insertions(+), 19 deletions(-) rename services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/{PersistableFileDownload.java => ResumableFileDownload.java} (88%) create mode 100644 services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/exception/PauseException.java diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java index d06629993550..0463d5e5289b 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java @@ -31,10 +31,10 @@ public interface FileDownload extends ObjectTransfer { * Pause the current download operation and returns the information that can * be used to resume the download at a later time. * - * @return {@link PersistableFileDownload} that can be used to resume the download + * @return {@link ResumableFileDownload} that can be used to resume the download * @throws TransferPauseException if pause fails */ - PersistableFileDownload pause(); + ResumableFileDownload pause(); @Override CompletableFuture completionFuture(); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java similarity index 88% rename from services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableFileDownload.java rename to services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java index f08139ef6b8e..43637f6ff60a 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java @@ -28,13 +28,13 @@ * @see S3TransferManager#downloadFile(DownloadFileRequest) */ @SdkPublicApi -public final class PersistableFileDownload implements PersistableTransfer, - ToCopyableBuilder { +public final class ResumableFileDownload implements PersistableTransfer, + ToCopyableBuilder { private final DownloadFileRequest downloadFileRequest; private final long bytesTransferred; private final Instant lastModified; - private PersistableFileDownload(DefaultBuilder builder) { + private ResumableFileDownload(DefaultBuilder builder) { this.downloadFileRequest = builder.downloadFileRequest; this.bytesTransferred = Validate.paramNotNull(builder.bytesTransferred, "bytesTransferred"); this.lastModified = Validate.paramNotNull(builder.lastModified, "lastModified"); @@ -71,7 +71,7 @@ public Builder toBuilder() { return new DefaultBuilder(this); } - public interface Builder extends CopyableBuilder { + public interface Builder extends CopyableBuilder { /** * Sets the download file request @@ -107,7 +107,7 @@ private DefaultBuilder() { } - private DefaultBuilder(PersistableFileDownload persistableFileDownload) { + private DefaultBuilder(ResumableFileDownload persistableFileDownload) { this.downloadFileRequest = persistableFileDownload.downloadFileRequest; this.bytesTransferred = persistableFileDownload.bytesTransferred; this.lastModified = persistableFileDownload.lastModified; @@ -132,8 +132,8 @@ public Builder lastModified(Instant lastModified) { } @Override - public PersistableFileDownload build() { - return new PersistableFileDownload(this); + public ResumableFileDownload build() { + return new ResumableFileDownload(this); } } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java index 7962516a80b4..d8c9c42a08fc 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java @@ -128,7 +128,7 @@ default FileDownload downloadFile(Consumer request) * @param persistableFileDownload the download to resume. * @return A new {@code FileDownload} object to use to check the state of the download. */ - default FileDownload resumeDownloadFile(PersistableFileDownload persistableFileDownload) { + default FileDownload resumeDownloadFile(ResumableFileDownload persistableFileDownload) { throw new UnsupportedOperationException(); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java index 10cc2be083ab..df900d018dff 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java @@ -25,7 +25,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.FileDownload; -import software.amazon.awssdk.transfer.s3.PersistableFileDownload; +import software.amazon.awssdk.transfer.s3.ResumableFileDownload; import software.amazon.awssdk.transfer.s3.exception.TransferPauseException; import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; @@ -53,17 +53,17 @@ public TransferProgress progress() { } @Override - public PersistableFileDownload pause() { + public ResumableFileDownload pause() { validatePauseStatus(); paused = true; completionFuture.cancel(false); GetObjectResponse getObjectResponse = monitor.initialResponse().get(); long bytesTransferred = progress.snapshot().bytesTransferred(); - return PersistableFileDownload.builder() - .downloadFileRequest(monitor.downloadFileRequest()) - .lastModified(getObjectResponse.lastModified()) - .bytesTransferred(bytesTransferred) - .build(); + return ResumableFileDownload.builder() + .downloadFileRequest(monitor.downloadFileRequest()) + .lastModified(getObjectResponse.lastModified()) + .bytesTransferred(bytesTransferred) + .build(); } private void validatePauseStatus() { diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/exception/PauseException.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/exception/PauseException.java new file mode 100644 index 000000000000..8edde74a00fd --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/exception/PauseException.java @@ -0,0 +1,22 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.exception; + +/** + * Exception thrown when the response subscription is cancelled. + */ +public class PauseException { +} diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java index 6908ddf51f2e..3a4cb7264752 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java @@ -30,7 +30,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.DownloadFileRequest; -import software.amazon.awssdk.transfer.s3.PersistableFileDownload; +import software.amazon.awssdk.transfer.s3.ResumableFileDownload; import software.amazon.awssdk.transfer.s3.exception.TransferPauseException; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; @@ -63,7 +63,7 @@ void pause_shouldReturnCorrectly() { transferProgress, downloadFileMonitor); - PersistableFileDownload pause = fileDownload.pause(); + ResumableFileDownload pause = fileDownload.pause(); assertThat(pause.downloadFileRequest()).isEqualTo(downloadFileMonitor.downloadFileRequest()); assertThat(pause.bytesTransferred()).isEqualTo(1000L); assertThat(pause.lastModified()).isEqualTo(downloadFileMonitor.initialResponse().get().lastModified()); From 95b007f6928dd7bee62a5af23ac7e54d6a266ff1 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Mon, 21 Mar 2022 20:41:12 -0700 Subject: [PATCH 3/5] Refactor the logic --- ...gerDownloadPauseResumeIntegrationTest.java | 108 ++++++++++++++++++ .../awssdk/transfer/s3/FileDownload.java | 2 - .../transfer/s3/ResumableFileDownload.java | 81 ++++++++++++- ...leTransfer.java => ResumableTransfer.java} | 7 +- .../awssdk/transfer/s3/S3TransferManager.java | 32 ++++++ .../s3/exception/TransferPauseException.java | 99 ---------------- .../s3/internal/DefaultFileDownload.java | 75 ++++++------ .../s3/internal/DefaultS3TransferManager.java | 14 +-- .../s3/internal/exception/PauseException.java | 22 ---- .../DefaultTransferProgressSnapshot.java | 17 ++- .../progress/DownloadFileMonitor.java | 72 ------------ .../progress/TransferListenerContext.java | 15 --- .../TransferListenerFailedContext.java | 6 - .../progress/TransferProgressUpdater.java | 32 ++---- .../s3/progress/TransferListener.java | 3 - .../s3/progress/TransferProgressSnapshot.java | 13 +++ .../s3/internal/DefaultFileDownloadTest.java | 88 ++++++-------- ...nloadDirectoryHelperParameterizedTest.java | 5 +- .../internal/DownloadDirectoryHelperTest.java | 5 +- 19 files changed, 343 insertions(+), 353 deletions(-) create mode 100644 services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java rename services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/{PersistableTransfer.java => ResumableTransfer.java} (81%) delete mode 100644 services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/exception/TransferPauseException.java delete mode 100644 services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/exception/PauseException.java delete mode 100644 services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DownloadFileMonitor.java diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java new file mode 100644 index 000000000000..c0fc697c583e --- /dev/null +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java @@ -0,0 +1,108 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; + +public class S3TransferManagerDownloadPauseResumeIntegrationTest extends S3IntegrationTestBase { + private static final String BUCKET = temporaryBucketName(S3TransferManagerDownloadPauseResumeIntegrationTest.class); + private static final String KEY = "key"; + private static final int OBJ_SIZE = 16 * 1024 * 1024; + private static S3TransferManager tm; + private static File file; + + @BeforeAll + public static void setup() throws IOException { + createBucket(BUCKET); + file = new RandomTempFile(OBJ_SIZE); + s3.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .build(), file.toPath()); + tm = S3TransferManager.builder() + .s3ClientConfiguration(b -> b.region(DEFAULT_REGION) + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)) + .build(); + } + + @AfterAll + public static void cleanup() { + deleteBucketAndAllContents(BUCKET); + tm.close(); + S3IntegrationTestBase.cleanUp(); + } + + @Test + public void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + Path path = RandomTempFile.randomUncreatedFile().toPath(); + TestDownloadListener testDownloadListener = new TestDownloadListener(countDownLatch); + DownloadFileRequest request = DownloadFileRequest.builder() + .getObjectRequest(b -> b.bucket(BUCKET).key(KEY)) + .destination(path) + .overrideConfiguration(b -> b + .addListener(testDownloadListener)) + .build(); + FileDownload download = + tm.downloadFile(request); + boolean count = countDownLatch.await(10, TimeUnit.SECONDS); + if (!count) { + throw new AssertionError("No data has been transferred within 5 seconds"); + } + ResumableFileDownload pause = download.pause(); + assertThat(pause.downloadFileRequest()).isEqualTo(request); + assertThat(testDownloadListener.getObjectResponse).isNotNull(); + assertThat(pause.lastModified()).isEqualTo(testDownloadListener.getObjectResponse.lastModified()); + assertThat(pause.bytesTransferred()).isLessThanOrEqualTo(path.toFile().length()); + assertThat(pause.transferSizeInBytes()).hasValue(file.length()); + assertThat(download.completionFuture()).isCancelled(); + } + + private static final class TestDownloadListener implements TransferListener { + private final CountDownLatch countDownLatch; + private GetObjectResponse getObjectResponse; + + private TestDownloadListener(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void bytesTransferred(Context.BytesTransferred context) { + Optional sdkResponse = context.progressSnapshot().sdkResponse(); + if (sdkResponse.isPresent() && sdkResponse.get() instanceof GetObjectResponse) { + getObjectResponse = (GetObjectResponse) sdkResponse.get(); + } + countDownLatch.countDown(); + } + } + +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java index 0463d5e5289b..eddb1f359adc 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java @@ -18,7 +18,6 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; -import software.amazon.awssdk.transfer.s3.exception.TransferPauseException; /** * A download transfer of a single object from S3. @@ -32,7 +31,6 @@ public interface FileDownload extends ObjectTransfer { * be used to resume the download at a later time. * * @return {@link ResumableFileDownload} that can be used to resume the download - * @throws TransferPauseException if pause fails */ ResumableFileDownload pause(); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java index 43637f6ff60a..676348fccda9 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java @@ -16,7 +16,10 @@ package software.amazon.awssdk.transfer.s3; import java.time.Instant; +import java.util.Objects; +import java.util.Optional; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.utils.ToString; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.CopyableBuilder; import software.amazon.awssdk.utils.builder.ToCopyableBuilder; @@ -25,19 +28,66 @@ * An opaque token that holds the state and can be used to resume a * paused download operation. * + * TODO: should we just store GetObjectResponse? Do we actually need bytesTransferred since + * it can be inferred from file content length + * * @see S3TransferManager#downloadFile(DownloadFileRequest) */ @SdkPublicApi -public final class ResumableFileDownload implements PersistableTransfer, +public final class ResumableFileDownload implements ResumableTransfer, ToCopyableBuilder { private final DownloadFileRequest downloadFileRequest; private final long bytesTransferred; private final Instant lastModified; + private final Long transferSizeInBytes; private ResumableFileDownload(DefaultBuilder builder) { - this.downloadFileRequest = builder.downloadFileRequest; - this.bytesTransferred = Validate.paramNotNull(builder.bytesTransferred, "bytesTransferred"); - this.lastModified = Validate.paramNotNull(builder.lastModified, "lastModified"); + this.downloadFileRequest = Validate.paramNotNull(builder.downloadFileRequest, "downloadFileRequest"); + this.bytesTransferred = builder.bytesTransferred == null ? 0 : builder.bytesTransferred; + this.lastModified = builder.lastModified; + this.transferSizeInBytes = builder.transferSizeInBytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ResumableFileDownload that = (ResumableFileDownload) o; + + if (bytesTransferred != that.bytesTransferred) { + return false; + } + if (!downloadFileRequest.equals(that.downloadFileRequest)) { + return false; + } + if (!Objects.equals(lastModified, that.lastModified)) { + return false; + } + return Objects.equals(transferSizeInBytes, that.transferSizeInBytes); + } + + @Override + public int hashCode() { + int result = downloadFileRequest.hashCode(); + result = 31 * result + (int) (bytesTransferred ^ (bytesTransferred >>> 32)); + result = 31 * result + (lastModified != null ? lastModified.hashCode() : 0); + result = 31 * result + (transferSizeInBytes != null ? transferSizeInBytes.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return ToString.builder("ResumableFileDownload") + .add("downloadFileRequest", downloadFileRequest) + .add("bytesTransferred", bytesTransferred) + .add("lastModified", lastModified) + .add("transferSizeInBytes", transferSizeInBytes) + .build(); } public static Builder builder() { @@ -66,6 +116,15 @@ public Instant lastModified() { return lastModified; } + /** + * The total size of the transfer in bytes, or {@link Optional#empty()} if unknown + * + * @return the optional total size of the transfer. + */ + public Optional transferSizeInBytes() { + return Optional.ofNullable(transferSizeInBytes); + } + @Override public Builder toBuilder() { return new DefaultBuilder(this); @@ -89,6 +148,13 @@ public interface Builder extends CopyableBuilder */ Builder bytesTransferred(Long bytesTransferred); + /** + * Sets the total transfer size in bytes + * @param transferSizeInBytes the transfer size in bytes + * @return a reference to this object so that method calls can be chained together. + */ + Builder transferSizeInBytes(Long transferSizeInBytes); + /** * Sets the last modified time of the object * @@ -102,6 +168,7 @@ private static final class DefaultBuilder implements Builder { private DownloadFileRequest downloadFileRequest; private Long bytesTransferred; private Instant lastModified; + private Long transferSizeInBytes; private DefaultBuilder() { @@ -125,6 +192,12 @@ public Builder bytesTransferred(Long bytesTransferred) { return this; } + @Override + public Builder transferSizeInBytes(Long transferSizeInBytes) { + this.transferSizeInBytes = transferSizeInBytes; + return this; + } + @Override public Builder lastModified(Instant lastModified) { this.lastModified = lastModified; diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableTransfer.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableTransfer.java similarity index 81% rename from services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableTransfer.java rename to services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableTransfer.java index 9fd299514913..0ed948aabcf3 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/PersistableTransfer.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableTransfer.java @@ -15,11 +15,16 @@ package software.amazon.awssdk.transfer.s3; +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkPublicApi; + /** * Contains the information of a pausible upload or download; such * information can be used to resume the upload or download later on * * @see FileDownload#pause() */ -public interface PersistableTransfer { +@SdkPublicApi +@SdkPreviewApi +public interface ResumableTransfer { } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java index d8c9c42a08fc..03432bebf52d 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java @@ -125,13 +125,45 @@ default FileDownload downloadFile(Consumer request) * Resumes a download operation. This download operation uses the same configuration as the original download. Any data * already fetched will be skipped, and only the remaining data is retrieved from Amazon S3. * + *

+ * Usage Example: + *

+     * {@code
+     * // Initiate the transfer
+     * FileDownload download =
+     *     tm.downloadFile(d -> d.getObjectRequest(g -> g.bucket("bucket").key("key"))
+     *                           .destination(Paths.get("myFile.txt")));
+     *
+     * // Pause the download
+     * ResumableFileDownload resumableFileDownload = download.pause();
+     *
+     * // Resume the download
+     * FileDownload resumedDownload = tm.resumeDownloadFile(resumableFileDownload);
+     *
+     * // Wait for the transfer to complete
+     * resumedDownload.completionFuture().join();
+     * }
+     * 
+ * * @param persistableFileDownload the download to resume. * @return A new {@code FileDownload} object to use to check the state of the download. + * @see #downloadFile(DownloadFileRequest) + * @see FileDownload#pause() */ default FileDownload resumeDownloadFile(ResumableFileDownload persistableFileDownload) { throw new UnsupportedOperationException(); } + /** + * This is a convenience method that creates an instance of the {@link ResumableFileDownload} builder, avoiding the need to + * create one manually via {@link ResumableFileDownload#builder()}. + * + * @see #resumeDownloadFile(ResumableFileDownload) + */ + default FileDownload resumeDownloadFile(Consumer persistableFileDownload) { + return resumeDownloadFile(ResumableFileDownload.builder().applyMutation(persistableFileDownload).build()); + } + /** * Download an object identified by the bucket and key from S3 through the given {@link AsyncResponseTransformer}. For * downloading to a file, you may use {@link #downloadFile(DownloadFileRequest)} instead. diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/exception/TransferPauseException.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/exception/TransferPauseException.java deleted file mode 100644 index 9b7b87412615..000000000000 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/exception/TransferPauseException.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.transfer.s3.exception; - -import software.amazon.awssdk.annotations.SdkPublicApi; -import software.amazon.awssdk.core.exception.SdkClientException; - -@SdkPublicApi -public final class TransferPauseException extends SdkClientException { - private static final long serialVersionUID = 1L; - private final ErrorCode errorCode; - - private TransferPauseException(Builder builder) { - super(builder); - this.errorCode = builder.errorCode(); - } - - public ErrorCode errorCode() { - return errorCode; - } - - public static TransferPauseException create(ErrorCode error, String message) { - return builder().errorCode(error).message(message).build(); - } - - public static Builder builder() { - return new BuilderImpl(); - } - - public enum ErrorCode { - /** - * Pause is not yet applicable since transfer has not started - */ - NOT_STARTED, - - /** - * Pause is not possible since transfer has finished - */ - ALREADY_FINISHED, - - /** - * Pause is not possible because a previous pause was requested - */ - PAUSE_IN_PROGRESS - } - - public interface Builder extends SdkClientException.Builder { - - Builder errorCode(ErrorCode errorCode); - - @Override - Builder message(String message); - - ErrorCode errorCode(); - - @Override - TransferPauseException build(); - } - - protected static class BuilderImpl extends SdkClientException.BuilderImpl implements Builder { - private ErrorCode errorCode; - - @Override - public Builder errorCode(ErrorCode errorCode) { - this.errorCode = errorCode; - return this; - } - - @Override - public Builder message(String message) { - this.message = message; - return this; - } - - @Override - public ErrorCode errorCode() { - return errorCode; - } - - @Override - public TransferPauseException build() { - return new TransferPauseException(this); - } - - } -} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java index df900d018dff..d005ad0a4eab 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java @@ -15,36 +15,36 @@ package software.amazon.awssdk.transfer.s3.internal; -import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.ALREADY_FINISHED; -import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.NOT_STARTED; -import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.PAUSE_IN_PROGRESS; - +import java.time.Instant; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; +import software.amazon.awssdk.transfer.s3.DownloadFileRequest; import software.amazon.awssdk.transfer.s3.FileDownload; import software.amazon.awssdk.transfer.s3.ResumableFileDownload; -import software.amazon.awssdk.transfer.s3.exception.TransferPauseException; -import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; +import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.ToString; @SdkInternalApi public final class DefaultFileDownload implements FileDownload { - + private static final Logger log = Logger.loggerFor(FileDownload.class); private final CompletableFuture completionFuture; private final TransferProgress progress; - private final DownloadFileMonitor monitor; - private volatile boolean paused; + private final DownloadFileRequest request; + private final AtomicReference resumableFileDownload; DefaultFileDownload(CompletableFuture completionFuture, TransferProgress progress, - DownloadFileMonitor monitor) { + DownloadFileRequest request) { this.completionFuture = completionFuture; this.progress = progress; - this.monitor = monitor; + this.resumableFileDownload = new AtomicReference<>(); + this.request = request; } @Override @@ -54,32 +54,29 @@ public TransferProgress progress() { @Override public ResumableFileDownload pause() { - validatePauseStatus(); - paused = true; - completionFuture.cancel(false); - GetObjectResponse getObjectResponse = monitor.initialResponse().get(); - long bytesTransferred = progress.snapshot().bytesTransferred(); - return ResumableFileDownload.builder() - .downloadFileRequest(monitor.downloadFileRequest()) - .lastModified(getObjectResponse.lastModified()) - .bytesTransferred(bytesTransferred) - .build(); - } - - private void validatePauseStatus() { - if (paused) { - throw TransferPauseException.create(PAUSE_IN_PROGRESS, - "Pause failed because a previous pause was requested"); - } - - if (monitor.isFinished()) { - throw TransferPauseException.create(ALREADY_FINISHED, "Pause failed because the transfer has already finished"); - } - - if (!monitor.initialResponse().isPresent()) { - throw TransferPauseException.create(NOT_STARTED, "Pause failed because the transfer has not been initiated yet. " - + "Please try later."); + log.trace(() -> "Start to pause " + request); + if (resumableFileDownload.get() == null) { + completionFuture.cancel(false); + + Instant lastModified = null; + Long totalBytesTransferred = null; + TransferProgressSnapshot snapshot = progress.snapshot(); + if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) { + GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get(); + lastModified = getObjectResponse.lastModified(); + totalBytesTransferred = getObjectResponse.contentLength(); + } + + long bytesTransferred = snapshot.bytesTransferred(); + ResumableFileDownload fileDownload = ResumableFileDownload.builder() + .downloadFileRequest(request) + .lastModified(lastModified) + .bytesTransferred(bytesTransferred) + .transferSizeInBytes(totalBytesTransferred) + .build(); + resumableFileDownload.set(fileDownload); } + return resumableFileDownload.get(); } @Override @@ -102,7 +99,7 @@ public boolean equals(Object o) { return false; } - if (!Objects.equals(monitor, that.monitor)) { + if (!Objects.equals(request, that.request)) { return false; } @@ -112,7 +109,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = completionFuture != null ? completionFuture.hashCode() : 0; - result = 31 * result + (monitor != null ? monitor.hashCode() : 0); + result = 31 * result + (request != null ? request.hashCode() : 0); result = 31 * result + (progress != null ? progress.hashCode() : 0); return result; } @@ -122,7 +119,7 @@ public String toString() { return ToString.builder("DefaultFileDownload") .add("completionFuture", completionFuture) .add("progress", progress) - .add("monitor", monitor) + .add("request", request) .build(); } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java index 2958cfb90be2..fc86ace30155 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java @@ -50,7 +50,6 @@ import software.amazon.awssdk.transfer.s3.UploadDirectoryRequest; import software.amazon.awssdk.transfer.s3.UploadFileRequest; import software.amazon.awssdk.transfer.s3.UploadRequest; -import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Validate; @@ -111,7 +110,7 @@ public Upload upload(UploadRequest uploadRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, requestBody, null); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, requestBody); progressUpdater.transferInitiated(); requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); @@ -144,7 +143,7 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody, null); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody); progressUpdater.transferInitiated(); requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); @@ -191,7 +190,7 @@ public Download download(DownloadRequest downloadReq CompletableFuture> returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null, null); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null); progressUpdater.transferInitiated(); responseTransformer = progressUpdater.wrapResponseTransformer(responseTransformer); progressUpdater.registerCompletion(returnFuture); @@ -225,10 +224,9 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) { FileTransformerConfiguration.defaultCreateOrReplaceExisting()); CompletableFuture returnFuture = new CompletableFuture<>(); - DownloadFileMonitor listener = new DownloadFileMonitor(downloadRequest); CompletableFuture downloadFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null, listener); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null); progressUpdater.transferInitiated(); responseTransformer = progressUpdater.wrapResponseTransformer(responseTransformer); progressUpdater.registerCompletion(returnFuture); @@ -250,7 +248,7 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) { returnFuture.completeExceptionally(throwable); } - return new DefaultFileDownload(downloadFuture, progressUpdater.progress(), listener); + return new DefaultFileDownload(downloadFuture, progressUpdater.progress(), downloadRequest); } @Override @@ -272,7 +270,7 @@ public Copy copy(CopyRequest copyRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(copyRequest, null, null); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(copyRequest, null); progressUpdater.transferInitiated(); progressUpdater.registerCompletion(returnFuture); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/exception/PauseException.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/exception/PauseException.java deleted file mode 100644 index 8edde74a00fd..000000000000 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/exception/PauseException.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.transfer.s3.internal.exception; - -/** - * Exception thrown when the response subscription is cancelled. - */ -public class PauseException { -} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java index 0a713376a3df..1b63bfbdcab6 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java @@ -17,6 +17,7 @@ import java.util.Optional; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; import software.amazon.awssdk.utils.ToString; import software.amazon.awssdk.utils.Validate; @@ -33,6 +34,7 @@ public final class DefaultTransferProgressSnapshot private final long bytesTransferred; private final Long transferSizeInBytes; + private final SdkResponse sdkResponse; private DefaultTransferProgressSnapshot(Builder builder) { if (builder.transferSizeInBytes != null) { @@ -43,6 +45,7 @@ private DefaultTransferProgressSnapshot(Builder builder) { } this.bytesTransferred = Validate.isNotNegative(builder.bytesTransferred, "bytesTransferred"); this.transferSizeInBytes = builder.transferSizeInBytes; + this.sdkResponse = builder.sdkResponse; } public static Builder builder() { @@ -64,6 +67,11 @@ public Optional transferSizeInBytes() { return Optional.ofNullable(transferSizeInBytes); } + @Override + public Optional sdkResponse() { + return Optional.ofNullable(sdkResponse); + } + @Override public Optional ratioTransferred() { return transferSizeInBytes() @@ -81,20 +89,22 @@ public String toString() { return ToString.builder("TransferProgressSnapshot") .add("bytesTransferred", bytesTransferred) .add("transferSizeInBytes", transferSizeInBytes) + .add("sdkResponse", sdkResponse) .build(); } public static final class Builder implements CopyableBuilder { private long bytesTransferred = 0L; private Long transferSizeInBytes; + private SdkResponse sdkResponse; private Builder() { - super(); } private Builder(DefaultTransferProgressSnapshot snapshot) { this.bytesTransferred = snapshot.bytesTransferred; this.transferSizeInBytes = snapshot.transferSizeInBytes; + this.sdkResponse = snapshot.sdkResponse; } public Builder bytesTransferred(long bytesTransferred) { @@ -115,6 +125,11 @@ public Long getTransferSizeInBytes() { return transferSizeInBytes; } + public Builder sdkResponse(SdkResponse sdkResponse) { + this.sdkResponse = sdkResponse; + return this; + } + @Override public DefaultTransferProgressSnapshot build() { return new DefaultTransferProgressSnapshot(this); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DownloadFileMonitor.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DownloadFileMonitor.java deleted file mode 100644 index 9dd64c373d81..000000000000 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DownloadFileMonitor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.transfer.s3.internal.progress; - -import java.util.Optional; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.transfer.s3.CompletedFileDownload; -import software.amazon.awssdk.transfer.s3.DownloadFileRequest; -import software.amazon.awssdk.transfer.s3.progress.TransferListener; - -@SdkInternalApi -public class DownloadFileMonitor implements TransferListener { - private final DownloadFileRequest downloadFileRequest; - private GetObjectResponse initialResponse; - private CompletedFileDownload finalResponse; - private boolean isFinished; - - public DownloadFileMonitor(DownloadFileRequest downloadFileRequest) { - this.downloadFileRequest = downloadFileRequest; - } - - @Override - public void transferInitiated(Context.TransferInitiated context) { - - if (context.initialResponse() instanceof GetObjectResponse) { - initialResponse = (GetObjectResponse) context.initialResponse(); - } - } - - /** - * @return whether the current transfer is finished or not - */ - public boolean isFinished() { - return isFinished; - } - - public Optional initialResponse() { - return Optional.of(initialResponse); - } - - @Override - public void transferComplete(Context.TransferComplete context) { - isFinished = true; - finalResponse = (CompletedFileDownload) context.completedTransfer(); - } - - @Override - public void transferFailed(Context.TransferFailed context) { - isFinished = true; - } - - /** - * @return the download file request - */ - public DownloadFileRequest downloadFileRequest() { - return downloadFileRequest; - } -} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java index 981ecb40dee6..d717af97af0f 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java @@ -17,7 +17,6 @@ import software.amazon.awssdk.annotations.Immutable; import software.amazon.awssdk.annotations.SdkProtectedApi; -import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.transfer.s3.CompletedObjectTransfer; import software.amazon.awssdk.transfer.s3.TransferObjectRequest; import software.amazon.awssdk.transfer.s3.progress.TransferListener; @@ -40,13 +39,11 @@ public final class TransferListenerContext private final TransferObjectRequest request; private final TransferProgressSnapshot progressSnapshot; private final CompletedObjectTransfer completedTransfer; - private final SdkResponse initialResponse; private TransferListenerContext(Builder builder) { this.request = builder.request; this.progressSnapshot = builder.progressSnapshot; this.completedTransfer = builder.completedTransfer; - this.initialResponse = builder.initialResponse; } public static Builder builder() { @@ -63,11 +60,6 @@ public TransferObjectRequest request() { return request; } - @Override - public SdkResponse initialResponse() { - return initialResponse; - } - @Override public TransferProgressSnapshot progressSnapshot() { return progressSnapshot; @@ -84,7 +76,6 @@ public String toString() { .add("request", request) .add("progressSnapshot", progressSnapshot) .add("completedTransfer", completedTransfer) - .add("initialResponse", initialResponse) .build(); } @@ -92,7 +83,6 @@ public static final class Builder implements CopyableBuilder endOfStreamFuture; public TransferProgressUpdater(TransferObjectRequest request, - AsyncRequestBody requestBody, - TransferListener listener) { + AsyncRequestBody requestBody) { DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder(); getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::transferSizeInBytes); TransferProgressSnapshot snapshot = snapshotBuilder.build(); @@ -57,16 +54,9 @@ public TransferProgressUpdater(TransferObjectRequest request, .progressSnapshot(snapshot) .build(); - List listeners = new ArrayList<>(); - if (listener != null) { - listeners.add(listener); - } - - request.overrideConfiguration() - .map(TransferRequestOverrideConfiguration::listeners) - .ifPresent(listeners::addAll); - - listenerInvoker = new TransferListenerInvoker(listeners); + listenerInvoker = new TransferListenerInvoker(request.overrideConfiguration() + .map(TransferRequestOverrideConfiguration::listeners) + .orElseGet(Collections::emptyList)); endOfStreamFuture = new CompletableFuture<>(); } @@ -112,7 +102,7 @@ public AsyncResponseTransformer wrapRespon @Override public void transformerOnResponse(GetObjectResponse response) { if (response.contentLength() != null) { - progress.updateAndGet(b -> b.transferSizeInBytes(response.contentLength())); + progress.updateAndGet(b -> b.transferSizeInBytes(response.contentLength()).sdkResponse(response)); } } @@ -180,11 +170,11 @@ private void transferComplete(CompletedObjectTransfer r) { private void transferFailed(Throwable t) { listenerInvoker.transferFailed(TransferListenerFailedContext.builder() - .transferContext(context.copy(b -> { - b.progressSnapshot(progress.snapshot()); - })) - .exception(t) - .build()); + .transferContext( + context.copy( + b -> b.progressSnapshot(progress.snapshot()))) + .exception(t) + .build()); } private static Optional getContentLengthSafe(AsyncRequestBody requestBody) { diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java index afd65e62b041..925f77b1b8b8 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java @@ -21,7 +21,6 @@ import software.amazon.awssdk.annotations.SdkProtectedApi; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.annotations.ThreadSafe; -import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.CompletedFileUpload; @@ -205,8 +204,6 @@ public interface TransferInitiated { */ TransferObjectRequest request(); - SdkResponse initialResponse(); - /** * The immutable {@link TransferProgressSnapshot} for this specific update. */ diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java index 03eeb66750a2..52117a4fcc6d 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java @@ -20,11 +20,14 @@ import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.annotations.ThreadSafe; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.transfer.s3.Download; import software.amazon.awssdk.transfer.s3.FileUpload; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.TransferRequest; +import software.amazon.awssdk.transfer.s3.Upload; /** * {@link TransferProgressSnapshot} is an immutable, point-in-time representation of the progress of a given transfer @@ -65,6 +68,16 @@ public interface TransferProgressSnapshot { */ Optional transferSizeInBytes(); + /** + * The SDK response, or {@link Optional#empty()} if unknown. + * + *

+ * In the case of {@link Download}, the response is {@link GetObjectResponse}, and the response is known before + * it starts streaming. In the case of {@link Upload}, the response is {@link PutObjectResponse}, and the response is not + * known until streaming finishes. + */ + Optional sdkResponse(); + /** * The ratio of the {@link #transferSizeInBytes()} that has been transferred so far, or {@link Optional#empty()} if unknown. * This method depends on the {@link #transferSizeInBytes()} being known in order to return non-empty. diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java index 3a4cb7264752..df86cd546388 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java @@ -16,9 +16,6 @@ package software.amazon.awssdk.transfer.s3.internal; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.ALREADY_FINISHED; -import static software.amazon.awssdk.transfer.s3.exception.TransferPauseException.ErrorCode.PAUSE_IN_PROGRESS; import java.nio.file.Paths; import java.time.Instant; @@ -31,19 +28,17 @@ import software.amazon.awssdk.transfer.s3.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.DownloadFileRequest; import software.amazon.awssdk.transfer.s3.ResumableFileDownload; -import software.amazon.awssdk.transfer.s3.exception.TransferPauseException; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; -import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; -import software.amazon.awssdk.transfer.s3.internal.progress.TransferListenerContext; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; class DefaultFileDownloadTest { + private static final long OBJECT_CONTENT_LENGTH = 1024L; @Test void equals_hashcode() { EqualsVerifier.forClass(DefaultFileDownload.class) .withNonnullFields("completionFuture", "progress") - .withIgnoredFields("paused") + .withIgnoredFields("resumableFileDownload") .verify(); } @@ -52,90 +47,70 @@ void pause_shouldReturnCorrectly() { CompletableFuture future = new CompletableFuture<>(); TransferProgress transferProgress = Mockito.mock(TransferProgress.class); + GetObjectResponse sdkResponse = getObjectResponse(); Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() .bytesTransferred(1000L) + .sdkResponse(sdkResponse) .build()); - DownloadFileMonitor downloadFileMonitor = monitorForInProgressTransfer(); + DownloadFileRequest request = getDownloadFileRequest(); DefaultFileDownload fileDownload = new DefaultFileDownload(future, transferProgress, - downloadFileMonitor); + request); ResumableFileDownload pause = fileDownload.pause(); - assertThat(pause.downloadFileRequest()).isEqualTo(downloadFileMonitor.downloadFileRequest()); + assertThat(pause.downloadFileRequest()).isEqualTo(request); assertThat(pause.bytesTransferred()).isEqualTo(1000L); - assertThat(pause.lastModified()).isEqualTo(downloadFileMonitor.initialResponse().get().lastModified()); + assertThat(pause.lastModified()).isEqualTo(sdkResponse.lastModified()); + assertThat(pause.transferSizeInBytes()).hasValue(sdkResponse.contentLength()); } @Test - void pause_transferAlreadyFinished_shouldThrowException() { + void pause_transferAlreadyFinished_shouldReturnNormally() { + GetObjectResponse getObjectResponse = GetObjectResponse.builder() + .contentLength(OBJECT_CONTENT_LENGTH) + .build(); CompletableFuture future = CompletableFuture.completedFuture(CompletedFileDownload.builder() - .response(GetObjectResponse.builder().build()) + .response(getObjectResponse) .build()); TransferProgress transferProgress = Mockito.mock(TransferProgress.class); - - - DownloadFileMonitor downloadFileMonitor = monitorForCompletedTransfer(); + Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(OBJECT_CONTENT_LENGTH) + .transferSizeInBytes(OBJECT_CONTENT_LENGTH) + .sdkResponse(getObjectResponse) + .build()); DefaultFileDownload fileDownload = new DefaultFileDownload(future, transferProgress, - downloadFileMonitor); - - assertThatThrownBy(fileDownload::pause).isInstanceOf(TransferPauseException.class) - .satisfies(e -> assertThat(((TransferPauseException) e).errorCode()) - .isEqualTo(ALREADY_FINISHED)); - + getDownloadFileRequest()); + ResumableFileDownload resumableFileDownload = fileDownload.pause(); + assertThat(resumableFileDownload.bytesTransferred()).isEqualTo(resumableFileDownload.transferSizeInBytes().get()); } - @Test - void pauseTwice_shouldThrowException() { + void pauseTwice_shouldReturnTheSame() { CompletableFuture future = new CompletableFuture<>(); TransferProgress transferProgress = Mockito.mock(TransferProgress.class); Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() .bytesTransferred(1000L) .build()); - DownloadFileMonitor downloadFileMonitor = monitorForInProgressTransfer(); - + DownloadFileRequest request = getDownloadFileRequest(); DefaultFileDownload fileDownload = new DefaultFileDownload(future, transferProgress, - downloadFileMonitor); - - fileDownload.pause(); + request); + ResumableFileDownload resumableFileDownload = fileDownload.pause(); + ResumableFileDownload resumableFileDownload2 = fileDownload.pause(); - assertThatThrownBy(fileDownload::pause).isInstanceOf(TransferPauseException.class) - .satisfies(e -> assertThat(((TransferPauseException) e).errorCode()) - .isEqualTo(PAUSE_IN_PROGRESS)); + assertThat(resumableFileDownload).isEqualTo(resumableFileDownload2); } - private DownloadFileMonitor monitorForCompletedTransfer() { - DownloadFileRequest downloadFileRequest = getDownloadFileRequest(); - DownloadFileMonitor downloadFileMonitor = new DownloadFileMonitor(downloadFileRequest); - downloadFileMonitor.transferComplete(TransferListenerContext.builder().build()); - return downloadFileMonitor; - } - - private DownloadFileMonitor monitorForInProgressTransfer() { - DownloadFileRequest downloadFileRequest = getDownloadFileRequest(); - DownloadFileMonitor downloadFileMonitor = new DownloadFileMonitor(downloadFileRequest); - GetObjectResponse getObjectResponse = GetObjectResponse.builder() - .lastModified(Instant.now()) - .build(); - - downloadFileMonitor.transferInitiated(TransferListenerContext.builder() - .initialResponse(getObjectResponse) - .build()); - - return downloadFileMonitor; - } - private DownloadFileRequest getDownloadFileRequest() { return DownloadFileRequest.builder() .destination(Paths.get(".")) @@ -144,4 +119,11 @@ private DownloadFileRequest getDownloadFileRequest() { .build(); } + private GetObjectResponse getObjectResponse() { + return GetObjectResponse.builder() + .lastModified(Instant.now()) + .contentLength(OBJECT_CONTENT_LENGTH) + .build(); + } + } \ No newline at end of file diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java index 1a595e378bb1..8c9ffa76cb44 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java @@ -51,7 +51,6 @@ import software.amazon.awssdk.transfer.s3.FileDownload; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; -import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; /** * Testing {@link DownloadDirectoryHelper} with different file systems. @@ -146,9 +145,9 @@ private static DefaultFileDownload completedDownload() { .response(GetObjectResponse.builder().build()) .build()), new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()), - new DownloadFileMonitor(DownloadFileRequest.builder().getObjectRequest(GetObjectRequest.builder().build()) + DownloadFileRequest.builder().getObjectRequest(GetObjectRequest.builder().build()) .destination(Paths.get(".")) - .build())); + .build()); } private static void verifyDestinationPathForSingleDownload(FileSystem jimfs, String delimiter, String[] keys, diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java index e516fb2ebe6e..b847efcbb772 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java @@ -48,7 +48,6 @@ import software.amazon.awssdk.transfer.s3.FileDownload; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress; import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; -import software.amazon.awssdk.transfer.s3.internal.progress.DownloadFileMonitor; public class DownloadDirectoryHelperTest { private static FileSystem jimfs; @@ -178,8 +177,6 @@ private FileDownload newFailedDownload(SdkClientException exception) { private FileDownload newDownload(CompletableFuture future) { return new DefaultFileDownload(future, new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()), - new DownloadFileMonitor(DownloadFileRequest.builder().getObjectRequest(GetObjectRequest.builder().build()) - .destination(Paths.get(".")) - .build())); + DownloadFileRequest.builder().destination(Paths.get(".")).getObjectRequest(GetObjectRequest.builder().build()).build()); } } From 9a4a68ee1079b096068124f9c5a57670b074a7b3 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Tue, 29 Mar 2022 17:37:53 -0700 Subject: [PATCH 4/5] Address feedback --- ...gerDownloadPauseResumeIntegrationTest.java | 6 +-- .../awssdk/transfer/s3/FileDownload.java | 2 + .../transfer/s3/ResumableFileDownload.java | 5 +- .../s3/internal/DefaultFileDownload.java | 51 ++++++++++--------- .../s3/internal/S3CrtAsyncHttpClient.java | 10 ++-- .../internal/S3CrtResponseHandlerAdapter.java | 18 +++++-- .../s3/internal/DefaultFileDownloadTest.java | 36 ++++++++++--- .../src/test/resources/log4j2.properties | 3 ++ 8 files changed, 89 insertions(+), 42 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java index c0fc697c583e..5350da9e4a01 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java @@ -19,7 +19,6 @@ import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.File; -import java.io.IOException; import java.nio.file.Path; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -41,7 +40,8 @@ public class S3TransferManagerDownloadPauseResumeIntegrationTest extends S3Integ private static File file; @BeforeAll - public static void setup() throws IOException { + public static void setup() throws Exception { + S3IntegrationTestBase.setUp(); createBucket(BUCKET); file = new RandomTempFile(OBJ_SIZE); s3.putObject(PutObjectRequest.builder() @@ -62,7 +62,7 @@ public static void cleanup() { } @Test - public void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedException { + void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Path path = RandomTempFile.randomUncreatedFile().toPath(); TestDownloadListener testDownloadListener = new TestDownloadListener(countDownLatch); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java index eddb1f359adc..20a50548a8b1 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java @@ -18,12 +18,14 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.annotations.ThreadSafe; /** * A download transfer of a single object from S3. */ @SdkPublicApi @SdkPreviewApi +@ThreadSafe public interface FileDownload extends ObjectTransfer { /** diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java index 676348fccda9..39b4008503b6 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java @@ -28,8 +28,9 @@ * An opaque token that holds the state and can be used to resume a * paused download operation. * - * TODO: should we just store GetObjectResponse? Do we actually need bytesTransferred since - * it can be inferred from file content length + * TODO: 1. should we just store GetObjectResponse? + * 2. consider providing a way to serialize and deserialize the token + * 3. Do we need to store file checksum? * * @see S3TransferManager#downloadFile(DownloadFileRequest) */ diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java index d005ad0a4eab..7a97194109a3 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java @@ -18,7 +18,6 @@ import java.time.Instant; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; @@ -36,14 +35,14 @@ public final class DefaultFileDownload implements FileDownload { private final CompletableFuture completionFuture; private final TransferProgress progress; private final DownloadFileRequest request; - private final AtomicReference resumableFileDownload; + private volatile ResumableFileDownload resumableFileDownload; + private final Object lock = new Object(); DefaultFileDownload(CompletableFuture completionFuture, TransferProgress progress, DownloadFileRequest request) { this.completionFuture = completionFuture; this.progress = progress; - this.resumableFileDownload = new AtomicReference<>(); this.request = request; } @@ -54,29 +53,33 @@ public TransferProgress progress() { @Override public ResumableFileDownload pause() { - log.trace(() -> "Start to pause " + request); - if (resumableFileDownload.get() == null) { - completionFuture.cancel(false); - - Instant lastModified = null; - Long totalBytesTransferred = null; - TransferProgressSnapshot snapshot = progress.snapshot(); - if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) { - GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get(); - lastModified = getObjectResponse.lastModified(); - totalBytesTransferred = getObjectResponse.contentLength(); - } + log.debug(() -> "Start to pause " + request); + if (resumableFileDownload == null) { + synchronized (lock) { + if (resumableFileDownload == null) { + completionFuture.cancel(true); + + Instant lastModified = null; + Long totalBytesTransferred = null; + TransferProgressSnapshot snapshot = progress.snapshot(); + if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) { + GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get(); + lastModified = getObjectResponse.lastModified(); + totalBytesTransferred = getObjectResponse.contentLength(); + } + + long length = request.destination().toFile().length(); + resumableFileDownload = ResumableFileDownload.builder() + .downloadFileRequest(request) + .lastModified(lastModified) + .bytesTransferred(length) + .transferSizeInBytes(totalBytesTransferred) + .build(); + } - long bytesTransferred = snapshot.bytesTransferred(); - ResumableFileDownload fileDownload = ResumableFileDownload.builder() - .downloadFileRequest(request) - .lastModified(lastModified) - .bytesTransferred(bytesTransferred) - .transferSizeInBytes(totalBytesTransferred) - .build(); - resumableFileDownload.set(fileDownload); + } } - return resumableFileDownload.get(); + return resumableFileDownload; } @Override diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtAsyncHttpClient.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtAsyncHttpClient.java index 96624d7303cc..502af9ac98a2 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtAsyncHttpClient.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtAsyncHttpClient.java @@ -38,6 +38,7 @@ import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.utils.AttributeMap; +import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.http.SdkHttpUtils; /** @@ -46,7 +47,7 @@ */ @SdkInternalApi public final class S3CrtAsyncHttpClient implements SdkAsyncHttpClient { - + private static final Logger log = Logger.loggerFor(S3CrtAsyncHttpClient.class); private final S3Client crtS3Client; private final S3NativeClientConfiguration s3NativeClientConfiguration; @@ -95,7 +96,7 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) { .withEndpoint(s3NativeClientConfiguration.endpointOverride()); try (S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions)) { - closeResourcesWhenComplete(executeFuture, s3MetaRequest); + closeResourcesWhenComplete(executeFuture, s3MetaRequest, responseHandler); } return executeFuture; @@ -124,9 +125,12 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ } private static void closeResourcesWhenComplete(CompletableFuture executeFuture, - S3MetaRequest s3MetaRequest) { + S3MetaRequest s3MetaRequest, + S3CrtResponseHandlerAdapter responseHandler) { executeFuture.whenComplete((r, t) -> { if (executeFuture.isCancelled()) { + log.debug(() -> "The request is cancelled, cancelling meta request"); + responseHandler.cancelRequest(); s3MetaRequest.cancel(); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtResponseHandlerAdapter.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtResponseHandlerAdapter.java index 5aaf3c92cc0d..ac0395424b3e 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtResponseHandlerAdapter.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtResponseHandlerAdapter.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler; +import software.amazon.awssdk.http.SdkCancellationException; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; @@ -76,6 +77,12 @@ public void onFinished(int crtCode, int responseStatus, byte[] errorPayload) { } } + public void cancelRequest() { + SdkCancellationException sdkClientException = + new SdkCancellationException("request is cancelled"); + notifyError(sdkClientException); + } + private void handleError(int crtCode, int responseStatus, byte[] errorPayload) { if (isErrorResponse(responseStatus) && errorPayload != null) { publisher.deliverData(ByteBuffer.wrap(errorPayload)); @@ -85,13 +92,16 @@ private void handleError(int crtCode, int responseStatus, byte[] errorPayload) { SdkClientException sdkClientException = SdkClientException.create(String.format("Failed to send the request. CRT error code: %s", crtCode)); - resultFuture.completeExceptionally(sdkClientException); - - responseHandler.onError(sdkClientException); - publisher.notifyError(sdkClientException); + notifyError(sdkClientException); } } + private void notifyError(Exception exception) { + resultFuture.completeExceptionally(exception); + responseHandler.onError(exception); + publisher.notifyError(exception); + } + private static boolean isErrorResponse(int responseStatus) { return responseStatus != 0; } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java index df86cd546388..8b6a8e4c13ea 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java @@ -17,10 +17,19 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.nio.file.Paths; +import com.google.common.jimfs.Jimfs; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystem; +import java.nio.file.Files; import java.time.Instant; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -33,12 +42,27 @@ class DefaultFileDownloadTest { private static final long OBJECT_CONTENT_LENGTH = 1024L; + private static FileSystem fileSystem; + private static File file; + + @BeforeAll + public static void setUp() throws IOException { + fileSystem = Jimfs.newFileSystem(); + file = File.createTempFile("test", UUID.randomUUID().toString()); + Files.write(file.toPath(), RandomStringUtils.random(2000).getBytes(StandardCharsets.UTF_8)); + } + + @AfterAll + public static void tearDown() throws IOException { + file.delete(); + } + @Test void equals_hashcode() { EqualsVerifier.forClass(DefaultFileDownload.class) .withNonnullFields("completionFuture", "progress") - .withIgnoredFields("resumableFileDownload") + .withIgnoredFields("resumableFileDownload", "lock") .verify(); } @@ -62,7 +86,7 @@ void pause_shouldReturnCorrectly() { ResumableFileDownload pause = fileDownload.pause(); assertThat(pause.downloadFileRequest()).isEqualTo(request); - assertThat(pause.bytesTransferred()).isEqualTo(1000L); + assertThat(pause.bytesTransferred()).isEqualTo(file.length()); assertThat(pause.lastModified()).isEqualTo(sdkResponse.lastModified()); assertThat(pause.transferSizeInBytes()).hasValue(sdkResponse.contentLength()); } @@ -87,7 +111,8 @@ void pause_transferAlreadyFinished_shouldReturnNormally() { transferProgress, getDownloadFileRequest()); ResumableFileDownload resumableFileDownload = fileDownload.pause(); - assertThat(resumableFileDownload.bytesTransferred()).isEqualTo(resumableFileDownload.transferSizeInBytes().get()); + assertThat(resumableFileDownload.bytesTransferred()).isEqualTo(file.length()); + assertThat(resumableFileDownload.transferSizeInBytes()).hasValue(OBJECT_CONTENT_LENGTH); } @Test @@ -113,9 +138,8 @@ void pauseTwice_shouldReturnTheSame() { private DownloadFileRequest getDownloadFileRequest() { return DownloadFileRequest.builder() - .destination(Paths.get(".")) + .destination(file) .getObjectRequest(GetObjectRequest.builder().key("KEY").bucket("BUCKET").build()) - .build(); } diff --git a/services-custom/s3-transfer-manager/src/test/resources/log4j2.properties b/services-custom/s3-transfer-manager/src/test/resources/log4j2.properties index e5e68dd2faa4..a15e5bbf99ab 100644 --- a/services-custom/s3-transfer-manager/src/test/resources/log4j2.properties +++ b/services-custom/s3-transfer-manager/src/test/resources/log4j2.properties @@ -28,6 +28,9 @@ rootLogger.appenderRef.stdout.ref = ConsoleAppender #logger.sdk.name = software.amazon.awssdk #logger.sdk.level = debug # +#logger.tm.name = software.amazon.awssdk.transfer.s3 +#logger.tm.level = debug +# #logger.request.name = software.amazon.awssdk.request #logger.request.level = debug # From acf9dca117c432a2bf5a240ed7112ef13c814eeb Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 30 Mar 2022 13:52:51 -0700 Subject: [PATCH 5/5] Fix merging issue --- .../S3TransferManagerDownloadPauseResumeIntegrationTest.java | 2 +- .../awssdk/transfer/s3/internal/DefaultS3TransferManager.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java index 5350da9e4a01..25062976d5e8 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java @@ -82,7 +82,7 @@ void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedExce assertThat(pause.downloadFileRequest()).isEqualTo(request); assertThat(testDownloadListener.getObjectResponse).isNotNull(); assertThat(pause.lastModified()).isEqualTo(testDownloadListener.getObjectResponse.lastModified()); - assertThat(pause.bytesTransferred()).isLessThanOrEqualTo(path.toFile().length()); + assertThat(pause.bytesTransferred()).isEqualTo(path.toFile().length()); assertThat(pause.transferSizeInBytes()).hasValue(file.length()); assertThat(download.completionFuture()).isCancelled(); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java index fc86ace30155..cf5b484067bd 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java @@ -224,7 +224,6 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) { FileTransformerConfiguration.defaultCreateOrReplaceExisting()); CompletableFuture returnFuture = new CompletableFuture<>(); - CompletableFuture downloadFuture = new CompletableFuture<>(); TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null); progressUpdater.transferInitiated(); @@ -248,7 +247,7 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) { returnFuture.completeExceptionally(throwable); } - return new DefaultFileDownload(downloadFuture, progressUpdater.progress(), downloadRequest); + return new DefaultFileDownload(returnFuture, progressUpdater.progress(), downloadRequest); } @Override