diff --git a/.changes/next-release/bugfix-AWSS3TransferManager-6d575a5.json b/.changes/next-release/bugfix-AWSS3TransferManager-6d575a5.json new file mode 100644 index 000000000000..2dc05dbc2158 --- /dev/null +++ b/.changes/next-release/bugfix-AWSS3TransferManager-6d575a5.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS S3 Transfer Manager", + "contributor": "", + "description": "Require setting the bytes transferred on transfer progress snapshots. This prevents programming bugs where the caller forgets to set the value and it gets defaulted to 0." +} diff --git a/.changes/next-release/bugfix-AWSS3TransferManager-e849d27.json b/.changes/next-release/bugfix-AWSS3TransferManager-e849d27.json new file mode 100644 index 000000000000..b60ecae321a0 --- /dev/null +++ b/.changes/next-release/bugfix-AWSS3TransferManager-e849d27.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS S3 Transfer Manager", + "contributor": "", + "description": "Allow pausing a resumed download, even if the resumed download hasn't started." +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java index f30596e748ae..129395ced011 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java @@ -18,6 +18,7 @@ import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.annotations.SdkTestInternalApi; import software.amazon.awssdk.arns.Arn; @@ -33,6 +34,7 @@ import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.config.S3TransferManagerOverrideConfiguration; @@ -43,6 +45,7 @@ import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileDownload; import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileUpload; import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload; +import software.amazon.awssdk.transfer.s3.internal.progress.ResumeTransferProgress; import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater; import software.amazon.awssdk.transfer.s3.model.CompletedCopy; import software.amazon.awssdk.transfer.s3.model.CompletedDownload; @@ -243,8 +246,7 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); TransferProgressUpdater progressUpdater = doDownloadFile(downloadRequest, responseTransformer, returnFuture); - return new DefaultFileDownload(returnFuture, CompletableFuture.completedFuture(progressUpdater.progress()), - CompletableFuture.completedFuture(downloadRequest)); + return new DefaultFileDownload(returnFuture, progressUpdater.progress(), () -> downloadRequest, null); } private TransferProgressUpdater doDownloadFile( @@ -285,26 +287,43 @@ public FileDownload resumeDownloadFile(ResumableFileDownload resumableFileDownlo CompletableFuture progressFuture = new CompletableFuture<>(); CompletableFuture newDownloadFileRequestFuture = new CompletableFuture<>(); - s3AsyncClient.headObject(b -> b.bucket(getObjectRequest.bucket()).key(getObjectRequest.key())) - .thenAccept(headObjectResponse -> { - Pair> - requestPair = toDownloadFileRequestAndTransformer(resumableFileDownload, headObjectResponse, - originalDownloadRequest); - - DownloadFileRequest newDownloadFileRequest = requestPair.left(); - newDownloadFileRequestFuture.complete(newDownloadFileRequest); - log.debug(() -> "Sending downloadFileRequest " + newDownloadFileRequest); - - TransferProgressUpdater progressUpdater = doDownloadFile(newDownloadFileRequest, - requestPair.right(), - returnFuture); - progressFuture.complete(progressUpdater.progress()); - }).exceptionally(throwable -> { - handleException(returnFuture, progressFuture, newDownloadFileRequestFuture, throwable); - return null; - }); - - return new DefaultFileDownload(returnFuture, progressFuture, newDownloadFileRequestFuture); + CompletableFuture headFuture = + s3AsyncClient.headObject(b -> b.bucket(getObjectRequest.bucket()).key(getObjectRequest.key())); + + // Ensure cancellations are forwarded to the head future + CompletableFutureUtils.forwardExceptionTo(returnFuture, headFuture); + + headFuture.thenAccept(headObjectResponse -> { + Pair> + requestPair = toDownloadFileRequestAndTransformer(resumableFileDownload, headObjectResponse, + originalDownloadRequest); + + DownloadFileRequest newDownloadFileRequest = requestPair.left(); + newDownloadFileRequestFuture.complete(newDownloadFileRequest); + log.debug(() -> "Sending downloadFileRequest " + newDownloadFileRequest); + + TransferProgressUpdater progressUpdater = doDownloadFile(newDownloadFileRequest, + requestPair.right(), + returnFuture); + progressFuture.complete(progressUpdater.progress()); + }).exceptionally(throwable -> { + handleException(returnFuture, progressFuture, newDownloadFileRequestFuture, throwable); + return null; + }); + + return new DefaultFileDownload(returnFuture, + new ResumeTransferProgress(progressFuture), + () -> newOrOriginalRequestForPause(newDownloadFileRequestFuture, originalDownloadRequest), + resumableFileDownload); + } + + private DownloadFileRequest newOrOriginalRequestForPause(CompletableFuture newDownloadFuture, + DownloadFileRequest originalDownloadRequest) { + try { + return newDownloadFuture.getNow(originalDownloadRequest); + } catch (CompletionException e) { + return originalDownloadRequest; + } } private static void handleException(CompletableFuture returnFuture, diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileDownload.java index bdcd2d76b03d..25979a9ff89b 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileDownload.java @@ -17,128 +17,89 @@ import java.io.File; import java.time.Instant; -import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress; -import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; import software.amazon.awssdk.transfer.s3.model.FileDownload; import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; -import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Lazy; import software.amazon.awssdk.utils.ToString; import software.amazon.awssdk.utils.Validate; @SdkInternalApi public final class DefaultFileDownload implements FileDownload { - private static final Logger log = Logger.loggerFor(FileDownload.class); private final CompletableFuture completionFuture; - private final CompletableFuture progressFuture; - private final CompletableFuture requestFuture; - private volatile ResumableFileDownload resumableFileDownload; - private final Object lock = new Object(); + private final Lazy resumableFileDownload; + private final TransferProgress progress; + private final Supplier requestSupplier; + private final ResumableFileDownload resumedDownload; public DefaultFileDownload(CompletableFuture completedFileDownloadFuture, - CompletableFuture progressFuture, - CompletableFuture requestFuture) { + TransferProgress progress, + Supplier requestSupplier, + ResumableFileDownload resumedDownload) { this.completionFuture = Validate.paramNotNull(completedFileDownloadFuture, "completedFileDownloadFuture"); - this.progressFuture = Validate.paramNotNull(progressFuture, "progressFuture"); - this.requestFuture = Validate.paramNotNull(requestFuture, "requestFuture"); + this.progress = Validate.paramNotNull(progress, "progress"); + this.requestSupplier = Validate.paramNotNull(requestSupplier, "requestSupplier"); + this.resumableFileDownload = new Lazy<>(this::doPause); + this.resumedDownload = resumedDownload; } @Override public TransferProgress progress() { - return progressFuture.isDone() ? progressFuture.join() : - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()); + return progress; } @Override public ResumableFileDownload pause() { - log.debug(() -> "Start to pause "); - if (resumableFileDownload == null) { - synchronized (lock) { - if (resumableFileDownload == null) { - completionFuture.cancel(true); - - if (!requestFuture.isDone() || !progressFuture.isDone()) { - throw SdkClientException.create("DownloadFileRequest is unknown, not able to pause. This is likely " - + "because you are trying to pause a resumed download request that " - + "hasn't started yet. Please try later"); - } - DownloadFileRequest request = requestFuture.join(); - TransferProgress progress = progressFuture.join(); - - Instant s3objectLastModified = null; - Long totalBytesTransferred = null; - TransferProgressSnapshot snapshot = progress.snapshot(); - if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) { - GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get(); - s3objectLastModified = getObjectResponse.lastModified(); - totalBytesTransferred = getObjectResponse.contentLength(); - } - File destination = request.destination().toFile(); - long length = destination.length(); - Instant fileLastModified = Instant.ofEpochMilli(destination.lastModified()); - resumableFileDownload = ResumableFileDownload.builder() - .downloadFileRequest(request) - .s3ObjectLastModified(s3objectLastModified) - .fileLastModified(fileLastModified) - .bytesTransferred(length) - .totalSizeInBytes(totalBytesTransferred) - .build(); - } - - } - } - return resumableFileDownload; - } - - @Override - public CompletableFuture completionFuture() { - return completionFuture; + return resumableFileDownload.getValue(); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - DefaultFileDownload that = (DefaultFileDownload) o; + private ResumableFileDownload doPause() { + completionFuture.cancel(true); - if (!Objects.equals(completionFuture, that.completionFuture)) { - return false; - } + Instant s3objectLastModified = null; + Long totalSizeInBytes = null; + TransferProgressSnapshot snapshot = progress.snapshot(); - if (!Objects.equals(requestFuture, that.requestFuture)) { - return false; + if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) { + GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get(); + s3objectLastModified = getObjectResponse.lastModified(); + totalSizeInBytes = getObjectResponse.contentLength(); + } else if (resumedDownload != null) { + s3objectLastModified = resumedDownload.s3ObjectLastModified().orElse(null); + totalSizeInBytes = resumedDownload.totalSizeInBytes().orElse(null); } - return Objects.equals(progressFuture, that.progressFuture); + DownloadFileRequest request = requestSupplier.get(); + File destination = request.destination().toFile(); + long length = destination.length(); + Instant fileLastModified = Instant.ofEpochMilli(destination.lastModified()); + return ResumableFileDownload.builder() + .downloadFileRequest(request) + .s3ObjectLastModified(s3objectLastModified) + .fileLastModified(fileLastModified) + .bytesTransferred(length) + .totalSizeInBytes(totalSizeInBytes) + .build(); } @Override - public int hashCode() { - int result = completionFuture != null ? completionFuture.hashCode() : 0; - result = 31 * result + (requestFuture != null ? requestFuture.hashCode() : 0); - result = 31 * result + (progressFuture != null ? progressFuture.hashCode() : 0); - return result; + public CompletableFuture completionFuture() { + return completionFuture; } @Override public String toString() { return ToString.builder("DefaultFileDownload") .add("completionFuture", completionFuture) - .add("progress", progressFuture) - .add("request", requestFuture) + .add("progress", progress) + .add("request", requestSupplier.get()) .build(); } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java index 64409a336636..fac156351c79 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java @@ -44,6 +44,7 @@ private DefaultTransferProgressSnapshot(Builder builder) { "bytesTransferred (%s) must not be greater than transferSizeInBytes (%s)", builder.bytesTransferred, builder.transferSizeInBytes); } + Validate.paramNotNull(builder.bytesTransferred, "byteTransferred"); this.bytesTransferred = Validate.isNotNegative(builder.bytesTransferred, "bytesTransferred"); this.transferSizeInBytes = builder.transferSizeInBytes; this.sdkResponse = builder.sdkResponse; @@ -125,7 +126,7 @@ public String toString() { public static final class Builder implements CopyableBuilder { - private long bytesTransferred = 0L; + private Long bytesTransferred; private Long transferSizeInBytes; private SdkResponse sdkResponse; @@ -138,7 +139,7 @@ private Builder(DefaultTransferProgressSnapshot snapshot) { this.sdkResponse = snapshot.sdkResponse; } - public Builder bytesTransferred(long bytesTransferred) { + public Builder bytesTransferred(Long bytesTransferred) { this.bytesTransferred = bytesTransferred; return this; } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/ResumeTransferProgress.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/ResumeTransferProgress.java new file mode 100644 index 000000000000..36edf31552ea --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/ResumeTransferProgress.java @@ -0,0 +1,43 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; +import software.amazon.awssdk.utils.Validate; + +/** + * An implementation of {@link TransferProgress} used when resuming a transfer. This uses a bytes-transferred of 0 until the real + * progress is available (when the transfer starts). + */ +@SdkInternalApi +public class ResumeTransferProgress implements TransferProgress { + private CompletableFuture progressFuture; + + public ResumeTransferProgress(CompletableFuture progressFuture) { + this.progressFuture = Validate.paramNotNull(progressFuture, "progressFuture"); + } + + @Override + public TransferProgressSnapshot snapshot() { + if (progressFuture.isDone() && !progressFuture.isCompletedExceptionally()) { + return progressFuture.join().snapshot(); + } + return DefaultTransferProgressSnapshot.builder().bytesTransferred(0L).build(); + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java index d9492a468067..56f34833ac4c 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -46,6 +46,7 @@ public class TransferProgressUpdater { public TransferProgressUpdater(TransferObjectRequest request, AsyncRequestBody requestBody) { DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder(); + snapshotBuilder.bytesTransferred(0L); getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::transferSizeInBytes); TransferProgressSnapshot snapshot = snapshotBuilder.build(); progress = new DefaultTransferProgress(snapshot); @@ -134,7 +135,7 @@ public void subscriberOnComplete() { } private void resetBytesTransferred() { - progress.updateAndGet(b -> b.bytesTransferred(0)); + progress.updateAndGet(b -> b.bytesTransferred(0L)); } private void incrementBytesTransferred(int numBytes) { diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/model/FileDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/model/FileDownload.java index 31da7591c9a5..48c3c8005cf0 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/model/FileDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/model/FileDownload.java @@ -35,7 +35,7 @@ public interface FileDownload extends ObjectTransfer { * The information object is serializable for persistent storage until it should be resumed. * See {@link ResumableFileDownload} for supported formats. * - * @return {@link ResumableFileDownload} that can be used to resume the download + * @return A {@link ResumableFileDownload} that can be used to resume the download. */ ResumableFileDownload pause(); diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java index 64b365a0b1bd..e2b09dc7d183 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileDownloadTest.java @@ -35,10 +35,11 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileDownload; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; +import software.amazon.awssdk.transfer.s3.internal.progress.ResumeTransferProgress; import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload; -import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; class DefaultFileDownloadTest { @@ -58,17 +59,34 @@ public static void tearDown() throws IOException { file.delete(); } - @Test - void equals_hashcode() { - EqualsVerifier.forClass(DefaultFileDownload.class) - .withNonnullFields("completionFuture", "progressFuture", "requestFuture") - .withIgnoredFields("resumableFileDownload", "lock") - .verify(); + void pause_shouldReturnCorrectly() { + CompletableFuture future = + new CompletableFuture<>(); + TransferProgress transferProgress = Mockito.mock(TransferProgress.class); + GetObjectResponse sdkResponse = getObjectResponse(); + + Mockito.when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1000L) + .sdkResponse(sdkResponse) + .build()); + + DownloadFileRequest request = getDownloadFileRequest(); + + DefaultFileDownload fileDownload = new DefaultFileDownload(future, + transferProgress, + () -> request, + null); + + ResumableFileDownload pause = fileDownload.pause(); + assertThat(pause.downloadFileRequest()).isEqualTo(request); + assertThat(pause.bytesTransferred()).isEqualTo(file.length()); + assertThat(pause.s3ObjectLastModified()).hasValue(sdkResponse.lastModified()); + assertThat(pause.totalSizeInBytes()).hasValue(sdkResponse.contentLength()); } @Test - void pause_shouldReturnCorrectly() { + void immediatePauseAfterResumeReturnsOriginalRequest() { CompletableFuture future = new CompletableFuture<>(); TransferProgress transferProgress = Mockito.mock(TransferProgress.class); @@ -82,8 +100,9 @@ void pause_shouldReturnCorrectly() { DownloadFileRequest request = getDownloadFileRequest(); DefaultFileDownload fileDownload = new DefaultFileDownload(future, - CompletableFuture.completedFuture(transferProgress), - CompletableFuture.completedFuture(request)); + transferProgress, + () -> request, + null); ResumableFileDownload pause = fileDownload.pause(); assertThat(pause.downloadFileRequest()).isEqualTo(request); @@ -109,8 +128,9 @@ void pause_transferAlreadyFinished_shouldReturnNormally() { .build()); DefaultFileDownload fileDownload = new DefaultFileDownload(future, - CompletableFuture.completedFuture(transferProgress), - CompletableFuture.completedFuture(getDownloadFileRequest())); + transferProgress, + this::getDownloadFileRequest, + null); ResumableFileDownload resumableFileDownload = fileDownload.pause(); assertThat(resumableFileDownload.bytesTransferred()).isEqualTo(file.length()); assertThat(resumableFileDownload.totalSizeInBytes()).hasValue(OBJECT_CONTENT_LENGTH); @@ -127,8 +147,9 @@ void pauseTwice_shouldReturnTheSame() { DownloadFileRequest request = getDownloadFileRequest(); DefaultFileDownload fileDownload = new DefaultFileDownload(future, - CompletableFuture.completedFuture(transferProgress), - CompletableFuture.completedFuture(request)); + transferProgress, + () -> request, + null); ResumableFileDownload resumableFileDownload = fileDownload.pause(); ResumableFileDownload resumableFileDownload2 = fileDownload.pause(); @@ -152,11 +173,13 @@ void progress_progressNotFinished_shouldReturnDefaultProgress() { TransferProgress transferProgress = Mockito.mock(TransferProgress.class); Mockito.when(transferProgress.snapshot()).thenReturn(snapshot); DefaultFileDownload fileDownload = new DefaultFileDownload(completedFileDownloadFuture, - progressFuture, - requestFuture); + new ResumeTransferProgress(progressFuture), + () -> requestFuture.getNow(null), + null); - TransferProgress progress = fileDownload.progress(); - assertThat(fileDownload.progress().snapshot()).isEqualTo(DefaultTransferProgressSnapshot.builder().build()); + assertThat(fileDownload.progress().snapshot()).isEqualTo(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(0L) + .build()); progressFuture.complete(transferProgress); assertThat(fileDownload.progress().snapshot()).isEqualTo(snapshot); diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultTransferProgressSnapshotTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultTransferProgressSnapshotTest.java index 1d455d4e32c2..594825533027 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultTransferProgressSnapshotTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultTransferProgressSnapshotTest.java @@ -26,7 +26,7 @@ public class DefaultTransferProgressSnapshotTest { @Test public void bytesTransferred_greaterThan_transferSize_shouldThrow() { DefaultTransferProgressSnapshot.Builder builder = DefaultTransferProgressSnapshot.builder() - .bytesTransferred(2) + .bytesTransferred(2L) .transferSizeInBytes(1L); assertThatThrownBy(builder::build) .isInstanceOf(IllegalArgumentException.class) @@ -36,7 +36,7 @@ public void bytesTransferred_greaterThan_transferSize_shouldThrow() { @Test public void ratioTransferred_withoutTransferSize_isEmpty() { TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() - .bytesTransferred(1) + .bytesTransferred(1L) .build(); assertThat(snapshot.ratioTransferred()).isNotPresent(); } @@ -44,7 +44,7 @@ public void ratioTransferred_withoutTransferSize_isEmpty() { @Test public void ratioTransferred_withTransferSize_isCorrect() { TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() - .bytesTransferred(1) + .bytesTransferred(1L) .transferSizeInBytes(2L) .build(); assertThat(snapshot.ratioTransferred()).hasValue(0.5); @@ -53,7 +53,7 @@ public void ratioTransferred_withTransferSize_isCorrect() { @Test public void bytesRemainingTransferred_withoutTransferSize_isEmpty() { TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() - .bytesTransferred(1) + .bytesTransferred(1L) .build(); assertThat(snapshot.bytesRemaining()).isNotPresent(); } @@ -61,7 +61,7 @@ public void bytesRemainingTransferred_withoutTransferSize_isEmpty() { @Test public void bytesRemainingTransferred_withTransferSize_isCorrect() { TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() - .bytesTransferred(1) + .bytesTransferred(1L) .transferSizeInBytes(3L) .build(); assertThat(snapshot.bytesRemaining()).hasValue(2L); diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java index c2099da513b8..52b20b533474 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperParameterizedTest.java @@ -145,10 +145,13 @@ private static DefaultFileDownload completedDownload() { return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder() .response(GetObjectResponse.builder().build()) .build()), - CompletableFuture.completedFuture(new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build())), - CompletableFuture.completedFuture(DownloadFileRequest.builder().getObjectRequest(GetObjectRequest.builder().build()) + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(0L) + .build()), + () -> DownloadFileRequest.builder().getObjectRequest(GetObjectRequest.builder().build()) .destination(Paths.get(".")) - .build())); + .build(), + null); } private static void verifyDestinationPathForSingleDownload(FileSystem jimfs, String delimiter, String[] keys, diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java index bb5a9894cdcc..f5fcb261225f 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java @@ -275,8 +275,11 @@ private FileDownload newFailedDownload(SdkClientException exception) { private FileDownload newDownload(CompletableFuture future) { return new DefaultFileDownload(future, - CompletableFuture.completedFuture(new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build())), - CompletableFuture.completedFuture(DownloadFileRequest.builder().destination(Paths.get( - ".")).getObjectRequest(GetObjectRequest.builder().build()).build())); + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(0L) + .build()), + () -> DownloadFileRequest.builder().destination(Paths.get( + ".")).getObjectRequest(GetObjectRequest.builder().build()).build(), + null); } } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerPauseAndResumeTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerPauseAndResumeTest.java index 19cc205f05c1..51d0bf9429b3 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerPauseAndResumeTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerPauseAndResumeTest.java @@ -42,9 +42,11 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload; import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; -import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.FileDownload; +import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload; import software.amazon.awssdk.utils.CompletableFutureUtils; class S3TransferManagerPauseAndResumeTest { @@ -121,6 +123,72 @@ void resumeDownloadFile_headObjectFailed_shouldFail() { .join()).hasRootCause(sdkClientException); } + @Test + public void pauseAfterResumeBeforeHeadSucceeds() throws InterruptedException { + DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder() + .getObjectRequest(getObjectRequest()) + .destination(file) + .build(); + + CompletableFuture headFuture = new CompletableFuture<>(); + when(mockS3Crt.headObject(any(Consumer.class))).thenReturn(headFuture); + + ResumableFileDownload originalResumable = + ResumableFileDownload.builder() + .bytesTransferred(file.length()) + .downloadFileRequest(downloadFileRequest) + .fileLastModified(Instant.ofEpochMilli(file.lastModified())) + .s3ObjectLastModified(Instant.now()) + .totalSizeInBytes(2000L) + .build(); + + FileDownload fileDownload = tm.resumeDownloadFile(originalResumable); + ResumableFileDownload newResumable = fileDownload.pause(); + + assertThat(newResumable).isEqualTo(originalResumable); + assertThat(fileDownload.completionFuture()).isCancelled(); + assertThat(headFuture).isCancelled(); + } + + @Test + public void pauseAfterResumeAfterHeadBeforeGetSucceeds() throws InterruptedException { + DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder() + .getObjectRequest(getObjectRequest()) + .destination(file) + .build(); + + CompletableFuture getFuture = new CompletableFuture<>(); + when(mockS3Crt.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))).thenReturn(getFuture); + + Instant s3LastModified = Instant.now(); + when(mockS3Crt.headObject(any(Consumer.class))) + .thenReturn(CompletableFuture.completedFuture(headObjectResponse(s3LastModified))); + + ResumableFileDownload originalResumable = + ResumableFileDownload.builder() + .bytesTransferred(file.length()) + .downloadFileRequest(downloadFileRequest) + .fileLastModified(Instant.ofEpochMilli(file.lastModified())) + .s3ObjectLastModified(s3LastModified) + .totalSizeInBytes(2000L) + .build(); + + FileDownload fileDownload = tm.resumeDownloadFile(originalResumable); + ResumableFileDownload newResumable = fileDownload.pause(); + + assertThat(newResumable.s3ObjectLastModified()).isEqualTo(originalResumable.s3ObjectLastModified()); + assertThat(newResumable.bytesTransferred()).isEqualTo(originalResumable.bytesTransferred()); + assertThat(newResumable.totalSizeInBytes()).isEqualTo(originalResumable.totalSizeInBytes()); + assertThat(newResumable.fileLastModified()).isEqualTo(originalResumable.fileLastModified()); + + // Download will be modified now that we finished the head request + assertThat(newResumable.downloadFileRequest()).isNotEqualTo(originalResumable.downloadFileRequest()); + + assertThat(fileDownload.completionFuture()).isCancelled(); + assertThat(getFuture).isCancelled(); + } + + private void verifyActualGetObjectRequest(GetObjectRequest getObjectRequest, String range) { ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperParameterizedTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperParameterizedTest.java index 4c154600aed0..3a4184dd477c 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperParameterizedTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperParameterizedTest.java @@ -315,7 +315,9 @@ private DefaultFileUpload completedUpload() { return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder() .response(PutObjectResponse.builder().build()) .build()), - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build())); + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(0L) + .build())); } private Path createTestDirectory() throws IOException { diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java index 3ab043025725..ebd76d9636cf 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java @@ -215,7 +215,9 @@ public void uploadDirectory_withRequestTransformer_usesRequestTransformer() thro private FileUpload newUpload(CompletableFuture future) { return new DefaultFileUpload(future, - new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()) + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder() + .bytesTransferred(0L) + .build()) ); } } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListenerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListenerTest.java index 1f45f10c9677..1dbf3519e0e0 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListenerTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListenerTest.java @@ -41,6 +41,7 @@ public class LoggingTransferListenerTest { @BeforeEach public void setUp() throws Exception { TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() + .bytesTransferred(0L) .transferSizeInBytes(TRANSFER_SIZE_IN_BYTES) .build(); progress = new DefaultTransferProgress(snapshot); @@ -107,7 +108,7 @@ private void invokeSuccessfulLifecycle() { for (int i = 0; i <= TRANSFER_SIZE_IN_BYTES; i++) { int bytes = i; listener.bytesTransferred(context.copy(c -> c.progressSnapshot( - progress.updateAndGet(p -> p.bytesTransferred(bytes))))); + progress.updateAndGet(p -> p.bytesTransferred((long) bytes))))); } listener.transferComplete(context.copy(b -> b.progressSnapshot(progress.snapshot())