diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java new file mode 100644 index 000000000000..25062976d5e8 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadPauseResumeIntegrationTest.java @@ -0,0 +1,108 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; + +import java.io.File; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; + +public class S3TransferManagerDownloadPauseResumeIntegrationTest extends S3IntegrationTestBase { + private static final String BUCKET = temporaryBucketName(S3TransferManagerDownloadPauseResumeIntegrationTest.class); + private static final String KEY = "key"; + private static final int OBJ_SIZE = 16 * 1024 * 1024; + private static S3TransferManager tm; + private static File file; + + @BeforeAll + public static void setup() throws Exception { + S3IntegrationTestBase.setUp(); + createBucket(BUCKET); + file = new RandomTempFile(OBJ_SIZE); + s3.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .build(), file.toPath()); + tm = S3TransferManager.builder() + .s3ClientConfiguration(b -> b.region(DEFAULT_REGION) + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)) + .build(); + } + + @AfterAll + public static void cleanup() { + deleteBucketAndAllContents(BUCKET); + tm.close(); + S3IntegrationTestBase.cleanUp(); + } + + @Test + void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + Path path = RandomTempFile.randomUncreatedFile().toPath(); + TestDownloadListener testDownloadListener = new TestDownloadListener(countDownLatch); + DownloadFileRequest request = DownloadFileRequest.builder() + .getObjectRequest(b -> b.bucket(BUCKET).key(KEY)) + .destination(path) + .overrideConfiguration(b -> b + .addListener(testDownloadListener)) + .build(); + FileDownload download = + tm.downloadFile(request); + boolean count = countDownLatch.await(10, TimeUnit.SECONDS); + if (!count) { + throw new AssertionError("No data has been transferred within 5 seconds"); + } + ResumableFileDownload pause = download.pause(); + assertThat(pause.downloadFileRequest()).isEqualTo(request); + assertThat(testDownloadListener.getObjectResponse).isNotNull(); + assertThat(pause.lastModified()).isEqualTo(testDownloadListener.getObjectResponse.lastModified()); + assertThat(pause.bytesTransferred()).isEqualTo(path.toFile().length()); + assertThat(pause.transferSizeInBytes()).hasValue(file.length()); + assertThat(download.completionFuture()).isCancelled(); + } + + private static final class TestDownloadListener implements TransferListener { + private final CountDownLatch countDownLatch; + private GetObjectResponse getObjectResponse; + + private TestDownloadListener(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void bytesTransferred(Context.BytesTransferred context) { + Optional sdkResponse = context.progressSnapshot().sdkResponse(); + if (sdkResponse.isPresent() && sdkResponse.get() instanceof GetObjectResponse) { + getObjectResponse = (GetObjectResponse) sdkResponse.get(); + } + countDownLatch.countDown(); + } + } + +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java index 578124a55e3a..20a50548a8b1 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java @@ -18,14 +18,24 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.annotations.ThreadSafe; /** * A download transfer of a single object from S3. */ @SdkPublicApi @SdkPreviewApi +@ThreadSafe public interface FileDownload extends ObjectTransfer { - + + /** + * Pause the current download operation and returns the information that can + * be used to resume the download at a later time. + * + * @return {@link ResumableFileDownload} that can be used to resume the download + */ + ResumableFileDownload pause(); + @Override CompletableFuture completionFuture(); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java new file mode 100644 index 000000000000..39b4008503b6 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableFileDownload.java @@ -0,0 +1,213 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.utils.ToString; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * An opaque token that holds the state and can be used to resume a + * paused download operation. + * + * TODO: 1. should we just store GetObjectResponse? + * 2. consider providing a way to serialize and deserialize the token + * 3. Do we need to store file checksum? + * + * @see S3TransferManager#downloadFile(DownloadFileRequest) + */ +@SdkPublicApi +public final class ResumableFileDownload implements ResumableTransfer, + ToCopyableBuilder { + private final DownloadFileRequest downloadFileRequest; + private final long bytesTransferred; + private final Instant lastModified; + private final Long transferSizeInBytes; + + private ResumableFileDownload(DefaultBuilder builder) { + this.downloadFileRequest = Validate.paramNotNull(builder.downloadFileRequest, "downloadFileRequest"); + this.bytesTransferred = builder.bytesTransferred == null ? 0 : builder.bytesTransferred; + this.lastModified = builder.lastModified; + this.transferSizeInBytes = builder.transferSizeInBytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ResumableFileDownload that = (ResumableFileDownload) o; + + if (bytesTransferred != that.bytesTransferred) { + return false; + } + if (!downloadFileRequest.equals(that.downloadFileRequest)) { + return false; + } + if (!Objects.equals(lastModified, that.lastModified)) { + return false; + } + return Objects.equals(transferSizeInBytes, that.transferSizeInBytes); + } + + @Override + public int hashCode() { + int result = downloadFileRequest.hashCode(); + result = 31 * result + (int) (bytesTransferred ^ (bytesTransferred >>> 32)); + result = 31 * result + (lastModified != null ? lastModified.hashCode() : 0); + result = 31 * result + (transferSizeInBytes != null ? transferSizeInBytes.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return ToString.builder("ResumableFileDownload") + .add("downloadFileRequest", downloadFileRequest) + .add("bytesTransferred", bytesTransferred) + .add("lastModified", lastModified) + .add("transferSizeInBytes", transferSizeInBytes) + .build(); + } + + public static Builder builder() { + return new DefaultBuilder(); + } + + /** + * @return the {@link DownloadFileRequest} to resume + */ + public DownloadFileRequest downloadFileRequest() { + return downloadFileRequest; + } + + /** + * Retrieve the number of bytes that have been transferred. + * @return the number of bytes + */ + public long bytesTransferred() { + return bytesTransferred; + } + + /** + * Last modified time on Amazon S3 for this object. + */ + public Instant lastModified() { + return lastModified; + } + + /** + * The total size of the transfer in bytes, or {@link Optional#empty()} if unknown + * + * @return the optional total size of the transfer. + */ + public Optional transferSizeInBytes() { + return Optional.ofNullable(transferSizeInBytes); + } + + @Override + public Builder toBuilder() { + return new DefaultBuilder(this); + } + + public interface Builder extends CopyableBuilder { + + /** + * Sets the download file request + * + * @param downloadFileRequest the download file request + * @return a reference to this object so that method calls can be chained together. + */ + Builder downloadFileRequest(DownloadFileRequest downloadFileRequest); + + /** + * Sets the number of bytes transferred + * + * @param bytesTransferred the number of bytes transferred + * @return a reference to this object so that method calls can be chained together. + */ + Builder bytesTransferred(Long bytesTransferred); + + /** + * Sets the total transfer size in bytes + * @param transferSizeInBytes the transfer size in bytes + * @return a reference to this object so that method calls can be chained together. + */ + Builder transferSizeInBytes(Long transferSizeInBytes); + + /** + * Sets the last modified time of the object + * + * @param lastModified the last modified time of the object + * @return a reference to this object so that method calls can be chained together. + */ + Builder lastModified(Instant lastModified); + } + + private static final class DefaultBuilder implements Builder { + private DownloadFileRequest downloadFileRequest; + private Long bytesTransferred; + private Instant lastModified; + private Long transferSizeInBytes; + + private DefaultBuilder() { + + } + + private DefaultBuilder(ResumableFileDownload persistableFileDownload) { + this.downloadFileRequest = persistableFileDownload.downloadFileRequest; + this.bytesTransferred = persistableFileDownload.bytesTransferred; + this.lastModified = persistableFileDownload.lastModified; + } + + @Override + public Builder downloadFileRequest(DownloadFileRequest downloadFileRequest) { + this.downloadFileRequest = downloadFileRequest; + return this; + } + + @Override + public Builder bytesTransferred(Long bytesTransferred) { + this.bytesTransferred = bytesTransferred; + return this; + } + + @Override + public Builder transferSizeInBytes(Long transferSizeInBytes) { + this.transferSizeInBytes = transferSizeInBytes; + return this; + } + + @Override + public Builder lastModified(Instant lastModified) { + this.lastModified = lastModified; + return this; + } + + @Override + public ResumableFileDownload build() { + return new ResumableFileDownload(this); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableTransfer.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableTransfer.java new file mode 100644 index 000000000000..0ed948aabcf3 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/ResumableTransfer.java @@ -0,0 +1,30 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3; + +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkPublicApi; + +/** + * Contains the information of a pausible upload or download; such + * information can be used to resume the upload or download later on + * + * @see FileDownload#pause() + */ +@SdkPublicApi +@SdkPreviewApi +public interface ResumableTransfer { +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java index cb6e32e4d1e8..03432bebf52d 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java @@ -121,6 +121,49 @@ default FileDownload downloadFile(Consumer request) return downloadFile(DownloadFileRequest.builder().applyMutation(request).build()); } + /** + * Resumes a download operation. This download operation uses the same configuration as the original download. Any data + * already fetched will be skipped, and only the remaining data is retrieved from Amazon S3. + * + *

+ * Usage Example: + *

+     * {@code
+     * // Initiate the transfer
+     * FileDownload download =
+     *     tm.downloadFile(d -> d.getObjectRequest(g -> g.bucket("bucket").key("key"))
+     *                           .destination(Paths.get("myFile.txt")));
+     *
+     * // Pause the download
+     * ResumableFileDownload resumableFileDownload = download.pause();
+     *
+     * // Resume the download
+     * FileDownload resumedDownload = tm.resumeDownloadFile(resumableFileDownload);
+     *
+     * // Wait for the transfer to complete
+     * resumedDownload.completionFuture().join();
+     * }
+     * 
+ * + * @param persistableFileDownload the download to resume. + * @return A new {@code FileDownload} object to use to check the state of the download. + * @see #downloadFile(DownloadFileRequest) + * @see FileDownload#pause() + */ + default FileDownload resumeDownloadFile(ResumableFileDownload persistableFileDownload) { + throw new UnsupportedOperationException(); + } + + /** + * This is a convenience method that creates an instance of the {@link ResumableFileDownload} builder, avoiding the need to + * create one manually via {@link ResumableFileDownload#builder()}. + * + * @see #resumeDownloadFile(ResumableFileDownload) + */ + default FileDownload resumeDownloadFile(Consumer persistableFileDownload) { + return resumeDownloadFile(ResumableFileDownload.builder().applyMutation(persistableFileDownload).build()); + } + /** * Download an object identified by the bucket and key from S3 through the given {@link AsyncResponseTransformer}. For * downloading to a file, you may use {@link #downloadFile(DownloadFileRequest)} instead. diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java index 19b6519cd07b..7a97194109a3 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownload.java @@ -15,24 +15,35 @@ package software.amazon.awssdk.transfer.s3.internal; +import java.time.Instant; import java.util.Objects; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; +import software.amazon.awssdk.transfer.s3.DownloadFileRequest; import software.amazon.awssdk.transfer.s3.FileDownload; +import software.amazon.awssdk.transfer.s3.ResumableFileDownload; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; +import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.ToString; @SdkInternalApi public final class DefaultFileDownload implements FileDownload { - + private static final Logger log = Logger.loggerFor(FileDownload.class); private final CompletableFuture completionFuture; private final TransferProgress progress; + private final DownloadFileRequest request; + private volatile ResumableFileDownload resumableFileDownload; + private final Object lock = new Object(); DefaultFileDownload(CompletableFuture completionFuture, - TransferProgress progress) { + TransferProgress progress, + DownloadFileRequest request) { this.completionFuture = completionFuture; this.progress = progress; + this.request = request; } @Override @@ -40,6 +51,36 @@ public TransferProgress progress() { return progress; } + @Override + public ResumableFileDownload pause() { + log.debug(() -> "Start to pause " + request); + if (resumableFileDownload == null) { + synchronized (lock) { + if (resumableFileDownload == null) { + completionFuture.cancel(true); + + Instant lastModified = null; + Long totalBytesTransferred = null; + TransferProgressSnapshot snapshot = progress.snapshot(); + if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) { + GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get(); + lastModified = getObjectResponse.lastModified(); + totalBytesTransferred = getObjectResponse.contentLength(); + } + + long length = request.destination().toFile().length(); + resumableFileDownload = ResumableFileDownload.builder() + .downloadFileRequest(request) + .lastModified(lastModified) + .bytesTransferred(length) + .transferSizeInBytes(totalBytesTransferred) + .build(); + } + + } + } + return resumableFileDownload; + } @Override public CompletableFuture completionFuture() { @@ -60,12 +101,18 @@ public boolean equals(Object o) { if (!Objects.equals(completionFuture, that.completionFuture)) { return false; } + + if (!Objects.equals(request, that.request)) { + return false; + } + return Objects.equals(progress, that.progress); } @Override public int hashCode() { int result = completionFuture != null ? completionFuture.hashCode() : 0; + result = 31 * result + (request != null ? request.hashCode() : 0); result = 31 * result + (progress != null ? progress.hashCode() : 0); return result; } @@ -75,6 +122,7 @@ public String toString() { return ToString.builder("DefaultFileDownload") .add("completionFuture", completionFuture) .add("progress", progress) + .add("request", request) .build(); } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java index d38aa71ac711..cf5b484067bd 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java @@ -247,7 +247,7 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) { returnFuture.completeExceptionally(throwable); } - return new DefaultFileDownload(returnFuture, progressUpdater.progress()); + return new DefaultFileDownload(returnFuture, progressUpdater.progress(), downloadRequest); } @Override diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtAsyncHttpClient.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtAsyncHttpClient.java index 96624d7303cc..502af9ac98a2 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtAsyncHttpClient.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtAsyncHttpClient.java @@ -38,6 +38,7 @@ import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.utils.AttributeMap; +import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.http.SdkHttpUtils; /** @@ -46,7 +47,7 @@ */ @SdkInternalApi public final class S3CrtAsyncHttpClient implements SdkAsyncHttpClient { - + private static final Logger log = Logger.loggerFor(S3CrtAsyncHttpClient.class); private final S3Client crtS3Client; private final S3NativeClientConfiguration s3NativeClientConfiguration; @@ -95,7 +96,7 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) { .withEndpoint(s3NativeClientConfiguration.endpointOverride()); try (S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions)) { - closeResourcesWhenComplete(executeFuture, s3MetaRequest); + closeResourcesWhenComplete(executeFuture, s3MetaRequest, responseHandler); } return executeFuture; @@ -124,9 +125,12 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ } private static void closeResourcesWhenComplete(CompletableFuture executeFuture, - S3MetaRequest s3MetaRequest) { + S3MetaRequest s3MetaRequest, + S3CrtResponseHandlerAdapter responseHandler) { executeFuture.whenComplete((r, t) -> { if (executeFuture.isCancelled()) { + log.debug(() -> "The request is cancelled, cancelling meta request"); + responseHandler.cancelRequest(); s3MetaRequest.cancel(); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtResponseHandlerAdapter.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtResponseHandlerAdapter.java index 5aaf3c92cc0d..ac0395424b3e 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtResponseHandlerAdapter.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtResponseHandlerAdapter.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler; +import software.amazon.awssdk.http.SdkCancellationException; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; @@ -76,6 +77,12 @@ public void onFinished(int crtCode, int responseStatus, byte[] errorPayload) { } } + public void cancelRequest() { + SdkCancellationException sdkClientException = + new SdkCancellationException("request is cancelled"); + notifyError(sdkClientException); + } + private void handleError(int crtCode, int responseStatus, byte[] errorPayload) { if (isErrorResponse(responseStatus) && errorPayload != null) { publisher.deliverData(ByteBuffer.wrap(errorPayload)); @@ -85,13 +92,16 @@ private void handleError(int crtCode, int responseStatus, byte[] errorPayload) { SdkClientException sdkClientException = SdkClientException.create(String.format("Failed to send the request. CRT error code: %s", crtCode)); - resultFuture.completeExceptionally(sdkClientException); - - responseHandler.onError(sdkClientException); - publisher.notifyError(sdkClientException); + notifyError(sdkClientException); } } + private void notifyError(Exception exception) { + resultFuture.completeExceptionally(exception); + responseHandler.onError(exception); + publisher.notifyError(exception); + } + private static boolean isErrorResponse(int responseStatus) { return responseStatus != 0; } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java index 0a713376a3df..1b63bfbdcab6 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java @@ -17,6 +17,7 @@ import java.util.Optional; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; import software.amazon.awssdk.utils.ToString; import software.amazon.awssdk.utils.Validate; @@ -33,6 +34,7 @@ public final class DefaultTransferProgressSnapshot private final long bytesTransferred; private final Long transferSizeInBytes; + private final SdkResponse sdkResponse; private DefaultTransferProgressSnapshot(Builder builder) { if (builder.transferSizeInBytes != null) { @@ -43,6 +45,7 @@ private DefaultTransferProgressSnapshot(Builder builder) { } this.bytesTransferred = Validate.isNotNegative(builder.bytesTransferred, "bytesTransferred"); this.transferSizeInBytes = builder.transferSizeInBytes; + this.sdkResponse = builder.sdkResponse; } public static Builder builder() { @@ -64,6 +67,11 @@ public Optional transferSizeInBytes() { return Optional.ofNullable(transferSizeInBytes); } + @Override + public Optional sdkResponse() { + return Optional.ofNullable(sdkResponse); + } + @Override public Optional ratioTransferred() { return transferSizeInBytes() @@ -81,20 +89,22 @@ public String toString() { return ToString.builder("TransferProgressSnapshot") .add("bytesTransferred", bytesTransferred) .add("transferSizeInBytes", transferSizeInBytes) + .add("sdkResponse", sdkResponse) .build(); } public static final class Builder implements CopyableBuilder { private long bytesTransferred = 0L; private Long transferSizeInBytes; + private SdkResponse sdkResponse; private Builder() { - super(); } private Builder(DefaultTransferProgressSnapshot snapshot) { this.bytesTransferred = snapshot.bytesTransferred; this.transferSizeInBytes = snapshot.transferSizeInBytes; + this.sdkResponse = snapshot.sdkResponse; } public Builder bytesTransferred(long bytesTransferred) { @@ -115,6 +125,11 @@ public Long getTransferSizeInBytes() { return transferSizeInBytes; } + public Builder sdkResponse(SdkResponse sdkResponse) { + this.sdkResponse = sdkResponse; + return this; + } + @Override public DefaultTransferProgressSnapshot build() { return new DefaultTransferProgressSnapshot(this); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java index 19f409db07d5..d717af97af0f 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java @@ -85,7 +85,6 @@ public static final class Builder implements CopyableBuilder endOfStreamFuture; - public TransferProgressUpdater(TransferObjectRequest request, AsyncRequestBody requestBody) { + public TransferProgressUpdater(TransferObjectRequest request, + AsyncRequestBody requestBody) { DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder(); getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::transferSizeInBytes); TransferProgressSnapshot snapshot = snapshotBuilder.build(); @@ -53,9 +53,10 @@ public TransferProgressUpdater(TransferObjectRequest request, AsyncRequestBody r .request(request) .progressSnapshot(snapshot) .build(); - listeners = new TransferListenerInvoker(request.overrideConfiguration() - .map(TransferRequestOverrideConfiguration::listeners) - .orElseGet(Collections::emptyList)); + + listenerInvoker = new TransferListenerInvoker(request.overrideConfiguration() + .map(TransferRequestOverrideConfiguration::listeners) + .orElseGet(Collections::emptyList)); endOfStreamFuture = new CompletableFuture<>(); } @@ -64,7 +65,7 @@ public TransferProgress progress() { } public void transferInitiated() { - listeners.transferInitiated(context); + listenerInvoker.transferInitiated(context); } public AsyncRequestBody wrapRequestBody(AsyncRequestBody requestBody) { @@ -101,7 +102,7 @@ public AsyncResponseTransformer wrapRespon @Override public void transformerOnResponse(GetObjectResponse response) { if (response.contentLength() != null) { - progress.updateAndGet(b -> b.transferSizeInBytes(response.contentLength())); + progress.updateAndGet(b -> b.transferSizeInBytes(response.contentLength()).sdkResponse(response)); } } @@ -140,7 +141,7 @@ private void incrementBytesTransferred(int numBytes) { TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { b.bytesTransferred(b.getBytesTransferred() + numBytes); }); - listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); + listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); } public void registerCompletion(CompletableFuture future) { @@ -160,7 +161,7 @@ public void registerCompletion(CompletableFuture { + listenerInvoker.transferComplete(context.copy(b -> { TransferProgressSnapshot snapshot = progress.snapshot(); b.progressSnapshot(snapshot); b.completedTransfer(r); @@ -168,12 +169,12 @@ private void transferComplete(CompletedObjectTransfer r) { } private void transferFailed(Throwable t) { - listeners.transferFailed(TransferListenerFailedContext.builder() - .transferContext(context.copy(b -> { - b.progressSnapshot(progress.snapshot()); - })) - .exception(t) - .build()); + listenerInvoker.transferFailed(TransferListenerFailedContext.builder() + .transferContext( + context.copy( + b -> b.progressSnapshot(progress.snapshot()))) + .exception(t) + .build()); } private static Optional getContentLengthSafe(AsyncRequestBody requestBody) { diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java index 03eeb66750a2..52117a4fcc6d 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java @@ -20,11 +20,14 @@ import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.annotations.ThreadSafe; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.transfer.s3.Download; import software.amazon.awssdk.transfer.s3.FileUpload; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.TransferRequest; +import software.amazon.awssdk.transfer.s3.Upload; /** * {@link TransferProgressSnapshot} is an immutable, point-in-time representation of the progress of a given transfer @@ -65,6 +68,16 @@ public interface TransferProgressSnapshot { */ Optional transferSizeInBytes(); + /** + * The SDK response, or {@link Optional#empty()} if unknown. + * + *

+ * In the case of {@link Download}, the response is {@link GetObjectResponse}, and the response is known before + * it starts streaming. In the case of {@link Upload}, the response is {@link PutObjectResponse}, and the response is not + * known until streaming finishes. + */ + Optional sdkResponse(); + /** * The ratio of the {@link #transferSizeInBytes()} that has been transferred so far, or {@link Optional#empty()} if unknown. * This method depends on the {@link #transferSizeInBytes()} being known in order to return non-empty. diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java index be96e0b8bd0a..8b6a8e4c13ea 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java @@ -15,16 +15,139 @@ package software.amazon.awssdk.transfer.s3.internal; +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.jimfs.Jimfs; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.time.Instant; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.transfer.s3.CompletedFileDownload; +import software.amazon.awssdk.transfer.s3.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.ResumableFileDownload; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; + +class DefaultFileDownloadTest { + private static final long OBJECT_CONTENT_LENGTH = 1024L; + private static FileSystem fileSystem; + private static File file; + + @BeforeAll + public static void setUp() throws IOException { + fileSystem = Jimfs.newFileSystem(); + file = File.createTempFile("test", UUID.randomUUID().toString()); + Files.write(file.toPath(), RandomStringUtils.random(2000).getBytes(StandardCharsets.UTF_8)); + } + + @AfterAll + public static void tearDown() throws IOException { + file.delete(); + } -public class DefaultFileDownloadTest { @Test - public void equals_hashcode() { + void equals_hashcode() { EqualsVerifier.forClass(DefaultFileDownload.class) .withNonnullFields("completionFuture", "progress") + .withIgnoredFields("resumableFileDownload", "lock") .verify(); } + @Test + void pause_shouldReturnCorrectly() { + CompletableFuture future = + new CompletableFuture<>(); + TransferProgress transferProgress = Mockito.mock(TransferProgress.class); + GetObjectResponse sdkResponse = getObjectResponse(); + + Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1000L) + .sdkResponse(sdkResponse) + .build()); + + DownloadFileRequest request = getDownloadFileRequest(); + + DefaultFileDownload fileDownload = new DefaultFileDownload(future, + transferProgress, + request); + + ResumableFileDownload pause = fileDownload.pause(); + assertThat(pause.downloadFileRequest()).isEqualTo(request); + assertThat(pause.bytesTransferred()).isEqualTo(file.length()); + assertThat(pause.lastModified()).isEqualTo(sdkResponse.lastModified()); + assertThat(pause.transferSizeInBytes()).hasValue(sdkResponse.contentLength()); + } + + @Test + void pause_transferAlreadyFinished_shouldReturnNormally() { + GetObjectResponse getObjectResponse = GetObjectResponse.builder() + .contentLength(OBJECT_CONTENT_LENGTH) + .build(); + CompletableFuture future = + CompletableFuture.completedFuture(CompletedFileDownload.builder() + .response(getObjectResponse) + .build()); + TransferProgress transferProgress = Mockito.mock(TransferProgress.class); + Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(OBJECT_CONTENT_LENGTH) + .transferSizeInBytes(OBJECT_CONTENT_LENGTH) + .sdkResponse(getObjectResponse) + .build()); + + DefaultFileDownload fileDownload = new DefaultFileDownload(future, + transferProgress, + getDownloadFileRequest()); + ResumableFileDownload resumableFileDownload = fileDownload.pause(); + assertThat(resumableFileDownload.bytesTransferred()).isEqualTo(file.length()); + assertThat(resumableFileDownload.transferSizeInBytes()).hasValue(OBJECT_CONTENT_LENGTH); + } + + @Test + void pauseTwice_shouldReturnTheSame() { + CompletableFuture future = + new CompletableFuture<>(); + TransferProgress transferProgress = Mockito.mock(TransferProgress.class); + Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1000L) + .build()); + DownloadFileRequest request = getDownloadFileRequest(); + + DefaultFileDownload fileDownload = new DefaultFileDownload(future, + transferProgress, + request); + + ResumableFileDownload resumableFileDownload = fileDownload.pause(); + ResumableFileDownload resumableFileDownload2 = fileDownload.pause(); + + assertThat(resumableFileDownload).isEqualTo(resumableFileDownload2); + + } + + private DownloadFileRequest getDownloadFileRequest() { + return DownloadFileRequest.builder() + .destination(file) + .getObjectRequest(GetObjectRequest.builder().key("KEY").bucket("BUCKET").build()) + .build(); + } + + private GetObjectResponse getObjectResponse() { + return GetObjectResponse.builder() + .lastModified(Instant.now()) + .contentLength(OBJECT_CONTENT_LENGTH) + .build(); + } + } \ No newline at end of file diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java index c5c18e34c0dc..8c9ffa76cb44 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java @@ -28,6 +28,7 @@ import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -40,6 +41,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedDirectoryDownload; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; @@ -142,7 +144,10 @@ private static DefaultFileDownload completedDownload() { return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder() .response(GetObjectResponse.builder().build()) .build()), - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build())); + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()), + DownloadFileRequest.builder().getObjectRequest(GetObjectRequest.builder().build()) + .destination(Paths.get(".")) + .build()); } private static void verifyDestinationPathForSingleDownload(FileSystem jimfs, String delimiter, String[] keys, diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java index cbd9df604ae1..b847efcbb772 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.file.FileSystem; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedDirectoryDownload; import software.amazon.awssdk.transfer.s3.CompletedFileDownload; @@ -174,7 +176,7 @@ private FileDownload newFailedDownload(SdkClientException exception) { private FileDownload newDownload(CompletableFuture future) { return new DefaultFileDownload(future, - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()) - ); + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()), + DownloadFileRequest.builder().destination(Paths.get(".")).getObjectRequest(GetObjectRequest.builder().build()).build()); } } diff --git a/services-custom/s3-transfer-manager/src/test/resources/log4j2.properties b/services-custom/s3-transfer-manager/src/test/resources/log4j2.properties index e5e68dd2faa4..a15e5bbf99ab 100644 --- a/services-custom/s3-transfer-manager/src/test/resources/log4j2.properties +++ b/services-custom/s3-transfer-manager/src/test/resources/log4j2.properties @@ -28,6 +28,9 @@ rootLogger.appenderRef.stdout.ref = ConsoleAppender #logger.sdk.name = software.amazon.awssdk #logger.sdk.level = debug # +#logger.tm.name = software.amazon.awssdk.transfer.s3 +#logger.tm.level = debug +# #logger.request.name = software.amazon.awssdk.request #logger.request.level = debug #