Skip to content

Commit

Permalink
[TM DownloadFile Pause and Resume] Part 3: Implement resumeDownloadFile (#3157)
Browse files Browse the repository at this point in the history

* Implement resumeDownloadFile

* Move test code around

* Address feedback

* Fix flaky test

* fix flaky integ test
  • Loading branch information
zoewangg committed Apr 14, 2022
1 parent c1d55ac commit d05a8e3
Show file tree
Hide file tree
Showing 19 changed files with 827 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ public static FileTransformerConfiguration defaultCreateNew() {
* Returns the default {@link FileTransformerConfiguration} for {@link FileWriteOption#CREATE_OR_REPLACE_EXISTING}
* <p>
* Create a new file if it doesn't exist, otherwise replace the existing file.
* In the event of an error, the SDK will attempt to delete the file (whatever has been written to it so far).
* In the event of an error, the SDK will NOT attempt to delete the file, leaving it as-is
*/
public static FileTransformerConfiguration defaultCreateOrReplaceExisting() {
return builder().fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING)
.failureBehavior(FailureBehavior.DELETE)
.failureBehavior(FailureBehavior.LEAVE)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,49 @@

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.waiters.Waiter;
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.transfer.s3.progress.TransferListener;
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;
import software.amazon.awssdk.utils.Logger;

public class S3TransferManagerDownloadPauseResumeIntegrationTest extends S3IntegrationTestBase {
private static final Logger log = Logger.loggerFor(S3TransferManagerDownloadPauseResumeIntegrationTest.class);
private static final String BUCKET = temporaryBucketName(S3TransferManagerDownloadPauseResumeIntegrationTest.class);
private static final String KEY = "key";
private static final int OBJ_SIZE = 16 * 1024 * 1024;
// 24 * MB is chosen to make sure we have data written in the file already upon pausing.
private static final long OBJ_SIZE = 24 * MB;
private static S3TransferManager tm;
private static File file;
private static File sourceFile;

@BeforeAll
public static void setup() throws Exception {
S3IntegrationTestBase.setUp();
createBucket(BUCKET);
file = new RandomTempFile(OBJ_SIZE);
sourceFile = new RandomTempFile(OBJ_SIZE);
s3.putObject(PutObjectRequest.builder()
.bucket(BUCKET)
.key(KEY)
.build(), file.toPath());
.build(), sourceFile.toPath());
tm = S3TransferManager.builder()
.s3ClientConfiguration(b -> b.region(DEFAULT_REGION)
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN))
Expand All @@ -58,50 +70,111 @@ public static void setup() throws Exception {
public static void cleanup() {
    // Tears down, in order: the temporary bucket, the transfer manager, the local source file and the
    // base-class resources. Each step is chained through finally so that a failure in an earlier step
    // (for example the bucket deletion) does not prevent the remaining resources from being released.
    try {
        deleteBucketAndAllContents(BUCKET);
    } finally {
        try {
            tm.close();
        } finally {
            try {
                sourceFile.delete();
            } finally {
                S3IntegrationTestBase.cleanUp();
            }
        }
    }
}

@Test
void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
void pauseAndResume_ObjectNotChanged_shouldResumeDownload() {
Path path = RandomTempFile.randomUncreatedFile().toPath();
TestDownloadListener testDownloadListener = new TestDownloadListener(countDownLatch);
TestDownloadListener testDownloadListener = new TestDownloadListener();
DownloadFileRequest request = DownloadFileRequest.builder()
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.destination(path)
.overrideConfiguration(b -> b
.addListener(testDownloadListener))
.build();
FileDownload download =
tm.downloadFile(request);
boolean count = countDownLatch.await(10, TimeUnit.SECONDS);
if (!count) {
throw new AssertionError("No data has been transferred within 5 seconds");
}
ResumableFileDownload pause = download.pause();
assertThat(pause.downloadFileRequest()).isEqualTo(request);
FileDownload download = tm.downloadFile(request);
waitUntilFirstByteBufferDelivered(download);

ResumableFileDownload resumableFileDownload = download.pause();
long bytesTransferred = resumableFileDownload.bytesTransferred();
log.debug(() -> "Paused: " + resumableFileDownload);
assertThat(resumableFileDownload.downloadFileRequest()).isEqualTo(request);
assertThat(testDownloadListener.getObjectResponse).isNotNull();
assertThat(pause.lastModified()).isEqualTo(testDownloadListener.getObjectResponse.lastModified());
assertThat(pause.bytesTransferred()).isEqualTo(path.toFile().length());
assertThat(pause.transferSizeInBytes()).hasValue(file.length());
assertThat(resumableFileDownload.s3ObjectLastModified()).hasValue(testDownloadListener.getObjectResponse.lastModified());
assertThat(bytesTransferred).isEqualTo(path.toFile().length());
assertThat(resumableFileDownload.totalSizeInBytes()).hasValue(sourceFile.length());

assertThat(bytesTransferred).isLessThan(sourceFile.length());
assertThat(download.completionFuture()).isCancelled();

log.debug(() -> "Resuming download ");
verifyFileDownload(path, resumableFileDownload, OBJ_SIZE - bytesTransferred);
}

/**
 * Pauses an in-flight download, overwrites the S3 object with different content and then resumes.
 * The destination file must end up holding the replacement content, and the final progress snapshot
 * must report the replacement's size.
 */
@Test
void pauseAndResume_objectChanged_shouldStartFromBeginning() {
    Path destination = RandomTempFile.randomUncreatedFile().toPath();
    FileDownload initialDownload =
        tm.downloadFile(DownloadFileRequest.builder()
                                           .getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
                                           .destination(destination)
                                           .build());
    waitUntilFirstByteBufferDelivered(initialDownload);

    ResumableFileDownload pausedState = initialDownload.pause();
    log.debug(() -> "Paused: " + pausedState);

    // Replace the object under the same key while the download is paused.
    String replacementContent = RandomStringUtils.randomAlphanumeric(1000);
    PutObjectRequest overwriteRequest = PutObjectRequest.builder()
                                                        .bucket(BUCKET)
                                                        .key(KEY)
                                                        .build();
    s3.putObject(overwriteRequest, RequestBody.fromString(replacementContent));

    log.debug(() -> "Resuming download ");
    FileDownload resumed = tm.resumeDownloadFile(pausedState);
    resumed.progress().snapshot();
    resumed.completionFuture().join();

    assertThat(destination.toFile()).hasContent(replacementContent);
    long expectedSize = replacementContent.getBytes(StandardCharsets.UTF_8).length;
    assertThat(resumed.progress().snapshot().transferSizeInBytes()).hasValue(expectedSize);
}

/**
 * Pauses an in-flight download, overwrites the partially written destination file locally and then
 * resumes. The resumed transfer is expected to cover the whole object ({@code OBJ_SIZE} bytes) and the
 * destination must match the uploaded source file.
 */
@Test
void pauseAndResume_fileChanged_shouldStartFromBeginning() throws IOException {
    Path destination = RandomTempFile.randomUncreatedFile().toPath();
    FileDownload initialDownload =
        tm.downloadFile(DownloadFileRequest.builder()
                                           .getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
                                           .destination(destination)
                                           .build());
    waitUntilFirstByteBufferDelivered(initialDownload);
    ResumableFileDownload pausedState = initialDownload.pause();

    // Tamper with the destination file after pausing.
    byte[] tamperedBytes = "helloworld".getBytes(StandardCharsets.UTF_8);
    Files.write(destination, tamperedBytes);

    verifyFileDownload(destination, pausedState, OBJ_SIZE);
}

/**
 * Resumes the given paused download, blocks until it completes and verifies the result.
 *
 * @param path destination file; must end up byte-identical to the uploaded source file
 * @param resumableFileDownload paused state handed to the transfer manager for resumption
 * @param expectedBytesTransferred value the final progress snapshot must report as its transfer size
 */
private static void verifyFileDownload(Path path, ResumableFileDownload resumableFileDownload, long expectedBytesTransferred) {
    FileDownload resumed = tm.resumeDownloadFile(resumableFileDownload);
    resumed.progress().snapshot();
    resumed.completionFuture().join();

    File downloadedFile = path.toFile();
    assertThat(downloadedFile).hasSameBinaryContentAs(sourceFile);
    assertThat(resumed.progress().snapshot().transferSizeInBytes())
        .hasValue(expectedBytesTransferred);
}

/**
 * Blocks until the download's progress snapshot reports at least one transferred byte.
 * <p>
 * The waiter is configured with a one-minute wait timeout, a fixed 100 ms backoff between attempts
 * and no practical cap on the number of attempts.
 *
 * @param download the in-flight download whose progress is polled
 */
private static void waitUntilFirstByteBufferDelivered(FileDownload download) {
    Duration waitTimeout = Duration.ofMinutes(1);
    Duration pollInterval = Duration.ofMillis(100);

    Waiter<TransferProgressSnapshot> progressWaiter =
        Waiter.builder(TransferProgressSnapshot.class)
              // Succeed as soon as any bytes have been transferred; otherwise keep retrying.
              .addAcceptor(WaiterAcceptor.successOnResponseAcceptor(snapshot -> snapshot.bytesTransferred() > 0))
              .addAcceptor(WaiterAcceptor.retryOnResponseAcceptor(snapshot -> true))
              .overrideConfiguration(config -> config.waitTimeout(waitTimeout)
                                                     .maxAttempts(Integer.MAX_VALUE)
                                                     .backoffStrategy(FixedDelayBackoffStrategy.create(pollInterval)))
              .build();

    progressWaiter.run(() -> download.progress().snapshot());
}

private static final class TestDownloadListener implements TransferListener {
private final CountDownLatch countDownLatch;
private GetObjectResponse getObjectResponse;

private TestDownloadListener(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void bytesTransferred(Context.BytesTransferred context) {
Optional<SdkResponse> sdkResponse = context.progressSnapshot().sdkResponse();
if (sdkResponse.isPresent() && sdkResponse.get() instanceof GetObjectResponse) {
getObjectResponse = (GetObjectResponse) sdkResponse.get();
}
countDownLatch.countDown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Objects;
import java.util.Optional;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
Expand All @@ -28,25 +27,24 @@
* An opaque token that holds the state and can be used to resume a
* paused download operation.
*
* TODO: 1. should we just store GetObjectResponse?
* 2. consider providing a way to serialize and deserialize the token
* 3. Do we need to store file checksum?
*
* @see S3TransferManager#downloadFile(DownloadFileRequest)
*/
@SdkPublicApi
public final class ResumableFileDownload implements ResumableTransfer,
ToCopyableBuilder<ResumableFileDownload.Builder, ResumableFileDownload> {
private final DownloadFileRequest downloadFileRequest;
private final long bytesTransferred;
private final Instant lastModified;
private final Long transferSizeInBytes;
private final Instant s3ObjectLastModified;
private final Long totalSizeInBytes;
private final Instant fileLastModified;

private ResumableFileDownload(DefaultBuilder builder) {
this.downloadFileRequest = Validate.paramNotNull(builder.downloadFileRequest, "downloadFileRequest");
Validate.isPositiveOrNull(builder.bytesTransferred, "bytesTransferred");
this.bytesTransferred = builder.bytesTransferred == null ? 0 : builder.bytesTransferred;
this.lastModified = builder.lastModified;
this.transferSizeInBytes = builder.transferSizeInBytes;
this.s3ObjectLastModified = builder.s3ObjectLastModified;
this.totalSizeInBytes = Validate.isPositiveOrNull(builder.totalSizeInBytes, "totalSizeInBytes");
this.fileLastModified = builder.fileLastModified;
}

@Override
Expand All @@ -66,31 +64,25 @@ public boolean equals(Object o) {
if (!downloadFileRequest.equals(that.downloadFileRequest)) {
return false;
}
if (!Objects.equals(lastModified, that.lastModified)) {
if (!Objects.equals(s3ObjectLastModified, that.s3ObjectLastModified)) {
return false;
}
if (!Objects.equals(fileLastModified, that.fileLastModified)) {
return false;
}
return Objects.equals(transferSizeInBytes, that.transferSizeInBytes);
return Objects.equals(totalSizeInBytes, that.totalSizeInBytes);
}

@Override
public int hashCode() {
int result = downloadFileRequest.hashCode();
result = 31 * result + (int) (bytesTransferred ^ (bytesTransferred >>> 32));
result = 31 * result + (lastModified != null ? lastModified.hashCode() : 0);
result = 31 * result + (transferSizeInBytes != null ? transferSizeInBytes.hashCode() : 0);
result = 31 * result + (s3ObjectLastModified != null ? s3ObjectLastModified.hashCode() : 0);
result = 31 * result + (fileLastModified != null ? fileLastModified.hashCode() : 0);
result = 31 * result + (totalSizeInBytes != null ? totalSizeInBytes.hashCode() : 0);
return result;
}

@Override
public String toString() {
return ToString.builder("ResumableFileDownload")
.add("downloadFileRequest", downloadFileRequest)
.add("bytesTransferred", bytesTransferred)
.add("lastModified", lastModified)
.add("transferSizeInBytes", transferSizeInBytes)
.build();
}

/**
* Creates a new, empty builder for {@link ResumableFileDownload}.
*
* @return a fresh {@link Builder} backed by {@code DefaultBuilder}
*/
public static Builder builder() {
return new DefaultBuilder();
}
Expand All @@ -111,19 +103,26 @@ public long bytesTransferred() {
}

/**
* Last modified time on Amazon S3 for this object.
* Last modified time of the S3 object since last pause, or {@link Optional#empty()} if unknown
*/
public Instant lastModified() {
return lastModified;
public Optional<Instant> s3ObjectLastModified() {
return Optional.ofNullable(s3ObjectLastModified);
}

/**
* Last modified time of the file as of the last pause.
* <p>
* The value is stored exactly as supplied to the builder, without a null check, so it may be
* {@code null} when it was never set.
* NOTE(review): "the file" is presumably the download destination file; confirm against the code
* that populates this token.
*
* @return the recorded last modified time of the file, possibly {@code null}
*/
public Instant fileLastModified() {
return fileLastModified;
}

/**
* The total size of the transfer in bytes, or {@link Optional#empty()} if unknown
*
* @return the optional total size of the transfer.
*/
public Optional<Long> transferSizeInBytes() {
return Optional.ofNullable(transferSizeInBytes);
public Optional<Long> totalSizeInBytes() {
return Optional.ofNullable(totalSizeInBytes);
}

@Override
Expand Down Expand Up @@ -151,34 +150,43 @@ public interface Builder extends CopyableBuilder<Builder, ResumableFileDownload>

/**
* Sets the total transfer size in bytes
* @param transferSizeInBytes the transfer size in bytes
* @param totalSizeInBytes the transfer size in bytes
* @return a reference to this object so that method calls can be chained together.
*/
Builder totalSizeInBytes(Long totalSizeInBytes);

/**
* Sets the last modified time of the object
*
* @param s3ObjectLastModified the last modified time of the object
* @return a reference to this object so that method calls can be chained together.
*/
Builder transferSizeInBytes(Long transferSizeInBytes);
Builder s3ObjectLastModified(Instant s3ObjectLastModified);

/**
* Sets the last modified time of the object
*
* @param lastModified the last modified time of the object
* @return a reference to this object so that method calls can be chained together.
*/
Builder lastModified(Instant lastModified);
Builder fileLastModified(Instant lastModified);
}

private static final class DefaultBuilder implements Builder {

private DownloadFileRequest downloadFileRequest;
private Long bytesTransferred;
private Instant lastModified;
private Long transferSizeInBytes;
private Instant s3ObjectLastModified;
private Long totalSizeInBytes;
private Instant fileLastModified;

/**
* Creates an empty builder; every field starts out unset ({@code null}) until its setter is called.
*/
private DefaultBuilder() {

}

/**
 * Creates a builder pre-populated with the state of an existing {@link ResumableFileDownload}.
 * <p>
 * The total size and the file's last modified time are carried over alongside the other fields so
 * that a builder derived from an instance does not silently drop them.
 *
 * @param persistableFileDownload the instance whose state is copied; must not be {@code null}
 */
private DefaultBuilder(ResumableFileDownload persistableFileDownload) {
    this.downloadFileRequest = persistableFileDownload.downloadFileRequest;
    this.bytesTransferred = persistableFileDownload.bytesTransferred;
    this.lastModified = persistableFileDownload.lastModified;
    this.s3ObjectLastModified = persistableFileDownload.s3ObjectLastModified;
    this.totalSizeInBytes = persistableFileDownload.totalSizeInBytes;
    this.fileLastModified = persistableFileDownload.fileLastModified;
}

@Override
Expand All @@ -194,14 +202,20 @@ public Builder bytesTransferred(Long bytesTransferred) {
}

@Override
public Builder transferSizeInBytes(Long transferSizeInBytes) {
this.transferSizeInBytes = transferSizeInBytes;
public Builder totalSizeInBytes(Long totalSizeInBytes) {
this.totalSizeInBytes = totalSizeInBytes;
return this;
}

// Records the S3 object's last modified time as given (no validation here) and returns this builder
// so that calls can be chained.
@Override
public Builder s3ObjectLastModified(Instant s3ObjectLastModified) {
this.s3ObjectLastModified = s3ObjectLastModified;
return this;
}

@Override
public Builder lastModified(Instant lastModified) {
this.lastModified = lastModified;
public Builder fileLastModified(Instant fileLastModified) {
this.fileLastModified = fileLastModified;
return this;
}

Expand Down
Loading

0 comments on commit d05a8e3

Please sign in to comment.