Skip to content

Commit

Permalink
Allow pausing a resumed download even when the download hasn't alread…
Browse files Browse the repository at this point in the history
…y started.
  • Loading branch information
millems committed Jul 14, 2022
1 parent 170afec commit b3a003e
Show file tree
Hide file tree
Showing 16 changed files with 279 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS S3 Transfer Manager",
"contributor": "",
"description": "Require setting the bytes transferred on transfer progress snapshots. This prevents programming bugs where the caller forgets to set the value and it gets defaulted to 0."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS S3 Transfer Manager",
"contributor": "",
"description": "Allow pausing a resumed download, even if the resumed download hasn't started."
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.arns.Arn;
Expand All @@ -33,6 +34,7 @@
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.config.S3TransferManagerOverrideConfiguration;
Expand All @@ -43,6 +45,7 @@
import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileDownload;
import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileUpload;
import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload;
import software.amazon.awssdk.transfer.s3.internal.progress.ResumeTransferProgress;
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
import software.amazon.awssdk.transfer.s3.model.CompletedDownload;
Expand Down Expand Up @@ -243,8 +246,7 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) {
CompletableFuture<CompletedFileDownload> returnFuture = new CompletableFuture<>();
TransferProgressUpdater progressUpdater = doDownloadFile(downloadRequest, responseTransformer, returnFuture);

return new DefaultFileDownload(returnFuture, CompletableFuture.completedFuture(progressUpdater.progress()),
CompletableFuture.completedFuture(downloadRequest));
return new DefaultFileDownload(returnFuture, progressUpdater.progress(), () -> downloadRequest, null);
}

private TransferProgressUpdater doDownloadFile(
Expand Down Expand Up @@ -285,26 +287,43 @@ public FileDownload resumeDownloadFile(ResumableFileDownload resumableFileDownlo
CompletableFuture<TransferProgress> progressFuture = new CompletableFuture<>();
CompletableFuture<DownloadFileRequest> newDownloadFileRequestFuture = new CompletableFuture<>();

s3AsyncClient.headObject(b -> b.bucket(getObjectRequest.bucket()).key(getObjectRequest.key()))
.thenAccept(headObjectResponse -> {
Pair<DownloadFileRequest, AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>>
requestPair = toDownloadFileRequestAndTransformer(resumableFileDownload, headObjectResponse,
originalDownloadRequest);

DownloadFileRequest newDownloadFileRequest = requestPair.left();
newDownloadFileRequestFuture.complete(newDownloadFileRequest);
log.debug(() -> "Sending downloadFileRequest " + newDownloadFileRequest);

TransferProgressUpdater progressUpdater = doDownloadFile(newDownloadFileRequest,
requestPair.right(),
returnFuture);
progressFuture.complete(progressUpdater.progress());
}).exceptionally(throwable -> {
handleException(returnFuture, progressFuture, newDownloadFileRequestFuture, throwable);
return null;
});

return new DefaultFileDownload(returnFuture, progressFuture, newDownloadFileRequestFuture);
CompletableFuture<HeadObjectResponse> headFuture =
s3AsyncClient.headObject(b -> b.bucket(getObjectRequest.bucket()).key(getObjectRequest.key()));

// Ensure cancellations are forwarded to the head future
CompletableFutureUtils.forwardExceptionTo(returnFuture, headFuture);

headFuture.thenAccept(headObjectResponse -> {
Pair<DownloadFileRequest, AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>>
requestPair = toDownloadFileRequestAndTransformer(resumableFileDownload, headObjectResponse,
originalDownloadRequest);

DownloadFileRequest newDownloadFileRequest = requestPair.left();
newDownloadFileRequestFuture.complete(newDownloadFileRequest);
log.debug(() -> "Sending downloadFileRequest " + newDownloadFileRequest);

TransferProgressUpdater progressUpdater = doDownloadFile(newDownloadFileRequest,
requestPair.right(),
returnFuture);
progressFuture.complete(progressUpdater.progress());
}).exceptionally(throwable -> {
handleException(returnFuture, progressFuture, newDownloadFileRequestFuture, throwable);
return null;
});

return new DefaultFileDownload(returnFuture,
new ResumeTransferProgress(progressFuture),
() -> newOrOriginalRequestForPause(newDownloadFileRequestFuture, originalDownloadRequest),
resumableFileDownload);
}

private DownloadFileRequest newOrOriginalRequestForPause(CompletableFuture<DownloadFileRequest> newDownloadFuture,
DownloadFileRequest originalDownloadRequest) {
try {
return newDownloadFuture.getNow(originalDownloadRequest);
} catch (CompletionException e) {
return originalDownloadRequest;
}
}

private static void handleException(CompletableFuture<CompletedFileDownload> returnFuture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,128 +17,89 @@

import java.io.File;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress;
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot;
import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import software.amazon.awssdk.transfer.s3.model.FileDownload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload;
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Lazy;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;

@SdkInternalApi
public final class DefaultFileDownload implements FileDownload {
private static final Logger log = Logger.loggerFor(FileDownload.class);
private final CompletableFuture<CompletedFileDownload> completionFuture;
private final CompletableFuture<TransferProgress> progressFuture;
private final CompletableFuture<DownloadFileRequest> requestFuture;
private volatile ResumableFileDownload resumableFileDownload;
private final Object lock = new Object();
private final Lazy<ResumableFileDownload> resumableFileDownload;
private final TransferProgress progress;
private final Supplier<DownloadFileRequest> requestSupplier;
private final ResumableFileDownload resumedDownload;

public DefaultFileDownload(CompletableFuture<CompletedFileDownload> completedFileDownloadFuture,
CompletableFuture<TransferProgress> progressFuture,
CompletableFuture<DownloadFileRequest> requestFuture) {
TransferProgress progress,
Supplier<DownloadFileRequest> requestSupplier,
ResumableFileDownload resumedDownload) {
this.completionFuture = Validate.paramNotNull(completedFileDownloadFuture, "completedFileDownloadFuture");
this.progressFuture = Validate.paramNotNull(progressFuture, "progressFuture");
this.requestFuture = Validate.paramNotNull(requestFuture, "requestFuture");
this.progress = Validate.paramNotNull(progress, "progress");
this.requestSupplier = Validate.paramNotNull(requestSupplier, "requestSupplier");
this.resumableFileDownload = new Lazy<>(this::doPause);
this.resumedDownload = resumedDownload;
}

@Override
public TransferProgress progress() {
return progressFuture.isDone() ? progressFuture.join() :
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build());
return progress;
}

@Override
public ResumableFileDownload pause() {
log.debug(() -> "Start to pause ");
if (resumableFileDownload == null) {
synchronized (lock) {
if (resumableFileDownload == null) {
completionFuture.cancel(true);

if (!requestFuture.isDone() || !progressFuture.isDone()) {
throw SdkClientException.create("DownloadFileRequest is unknown, not able to pause. This is likely "
+ "because you are trying to pause a resumed download request that "
+ "hasn't started yet. Please try later");
}
DownloadFileRequest request = requestFuture.join();
TransferProgress progress = progressFuture.join();

Instant s3objectLastModified = null;
Long totalBytesTransferred = null;
TransferProgressSnapshot snapshot = progress.snapshot();
if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) {
GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get();
s3objectLastModified = getObjectResponse.lastModified();
totalBytesTransferred = getObjectResponse.contentLength();
}
File destination = request.destination().toFile();
long length = destination.length();
Instant fileLastModified = Instant.ofEpochMilli(destination.lastModified());
resumableFileDownload = ResumableFileDownload.builder()
.downloadFileRequest(request)
.s3ObjectLastModified(s3objectLastModified)
.fileLastModified(fileLastModified)
.bytesTransferred(length)
.totalSizeInBytes(totalBytesTransferred)
.build();
}

}
}
return resumableFileDownload;
}

@Override
public CompletableFuture<CompletedFileDownload> completionFuture() {
return completionFuture;
return resumableFileDownload.getValue();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

DefaultFileDownload that = (DefaultFileDownload) o;
private ResumableFileDownload doPause() {
completionFuture.cancel(true);

if (!Objects.equals(completionFuture, that.completionFuture)) {
return false;
}
Instant s3objectLastModified = null;
Long totalSizeInBytes = null;
TransferProgressSnapshot snapshot = progress.snapshot();

if (!Objects.equals(requestFuture, that.requestFuture)) {
return false;
if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) {
GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get();
s3objectLastModified = getObjectResponse.lastModified();
totalSizeInBytes = getObjectResponse.contentLength();
} else if (resumedDownload != null) {
s3objectLastModified = resumedDownload.s3ObjectLastModified().orElse(null);
totalSizeInBytes = resumedDownload.totalSizeInBytes().orElse(null);
}

return Objects.equals(progressFuture, that.progressFuture);
DownloadFileRequest request = requestSupplier.get();
File destination = request.destination().toFile();
long length = destination.length();
Instant fileLastModified = Instant.ofEpochMilli(destination.lastModified());
return ResumableFileDownload.builder()
.downloadFileRequest(request)
.s3ObjectLastModified(s3objectLastModified)
.fileLastModified(fileLastModified)
.bytesTransferred(length)
.totalSizeInBytes(totalSizeInBytes)
.build();
}

@Override
public int hashCode() {
int result = completionFuture != null ? completionFuture.hashCode() : 0;
result = 31 * result + (requestFuture != null ? requestFuture.hashCode() : 0);
result = 31 * result + (progressFuture != null ? progressFuture.hashCode() : 0);
return result;
public CompletableFuture<CompletedFileDownload> completionFuture() {
return completionFuture;
}

@Override
public String toString() {
return ToString.builder("DefaultFileDownload")
.add("completionFuture", completionFuture)
.add("progress", progressFuture)
.add("request", requestFuture)
.add("progress", progress)
.add("request", requestSupplier.get())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ private DefaultTransferProgressSnapshot(Builder builder) {
"bytesTransferred (%s) must not be greater than transferSizeInBytes (%s)",
builder.bytesTransferred, builder.transferSizeInBytes);
}
Validate.paramNotNull(builder.bytesTransferred, "byteTransferred");
this.bytesTransferred = Validate.isNotNegative(builder.bytesTransferred, "bytesTransferred");
this.transferSizeInBytes = builder.transferSizeInBytes;
this.sdkResponse = builder.sdkResponse;
Expand Down Expand Up @@ -125,7 +126,7 @@ public String toString() {


public static final class Builder implements CopyableBuilder<Builder, DefaultTransferProgressSnapshot> {
private long bytesTransferred = 0L;
private Long bytesTransferred;
private Long transferSizeInBytes;
private SdkResponse sdkResponse;

Expand All @@ -138,7 +139,7 @@ private Builder(DefaultTransferProgressSnapshot snapshot) {
this.sdkResponse = snapshot.sdkResponse;
}

public Builder bytesTransferred(long bytesTransferred) {
public Builder bytesTransferred(Long bytesTransferred) {
this.bytesTransferred = bytesTransferred;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.transfer.s3.internal.progress;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;
import software.amazon.awssdk.utils.Validate;

/**
* An implementation of {@link TransferProgress} used when resuming a transfer. This uses a bytes-transferred of 0 until the real
* progress is available (when the transfer starts).
*/
@SdkInternalApi
public class ResumeTransferProgress implements TransferProgress {
private CompletableFuture<TransferProgress> progressFuture;

public ResumeTransferProgress(CompletableFuture<TransferProgress> progressFuture) {
this.progressFuture = Validate.paramNotNull(progressFuture, "progressFuture");
}

@Override
public TransferProgressSnapshot snapshot() {
if (progressFuture.isDone() && !progressFuture.isCompletedExceptionally()) {
return progressFuture.join().snapshot();
}
return DefaultTransferProgressSnapshot.builder().bytesTransferred(0L).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class TransferProgressUpdater {
public TransferProgressUpdater(TransferObjectRequest request,
AsyncRequestBody requestBody) {
DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder();
snapshotBuilder.bytesTransferred(0L);
getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::transferSizeInBytes);
TransferProgressSnapshot snapshot = snapshotBuilder.build();
progress = new DefaultTransferProgress(snapshot);
Expand Down Expand Up @@ -134,7 +135,7 @@ public void subscriberOnComplete() {
}

private void resetBytesTransferred() {
progress.updateAndGet(b -> b.bytesTransferred(0));
progress.updateAndGet(b -> b.bytesTransferred(0L));
}

private void incrementBytesTransferred(int numBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface FileDownload extends ObjectTransfer {
* The information object is serializable for persistent storage until it should be resumed.
* See {@link ResumableFileDownload} for supported formats.
*
* @return {@link ResumableFileDownload} that can be used to resume the download
* @return A {@link ResumableFileDownload} that can be used to resume the download.
*/
ResumableFileDownload pause();

Expand Down
Loading

0 comments on commit b3a003e

Please sign in to comment.