Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
zoewangg committed Mar 30, 2022
1 parent 95b007f commit 9a4a68e
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand All @@ -41,7 +40,8 @@ public class S3TransferManagerDownloadPauseResumeIntegrationTest extends S3Integ
private static File file;

@BeforeAll
public static void setup() throws IOException {
public static void setup() throws Exception {
S3IntegrationTestBase.setUp();
createBucket(BUCKET);
file = new RandomTempFile(OBJ_SIZE);
s3.putObject(PutObjectRequest.builder()
Expand All @@ -62,7 +62,7 @@ public static void cleanup() {
}

@Test
public void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedException {
void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Path path = RandomTempFile.randomUncreatedFile().toPath();
TestDownloadListener testDownloadListener = new TestDownloadListener(countDownLatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.ThreadSafe;

/**
* A download transfer of a single object from S3.
*/
@SdkPublicApi
@SdkPreviewApi
@ThreadSafe
public interface FileDownload extends ObjectTransfer {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
* An opaque token that holds the state and can be used to resume a
* paused download operation.
*
* TODO: should we just store GetObjectResponse? Do we actually need bytesTransferred since
* it can be inferred from file content length
* TODO: 1. should we just store GetObjectResponse?
* 2. consider providing a way to serialize and deserialize the token
* 3. Do we need to store file checksum?
*
* @see S3TransferManager#downloadFile(DownloadFileRequest)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.CompletedFileDownload;
Expand All @@ -36,14 +35,14 @@ public final class DefaultFileDownload implements FileDownload {
private final CompletableFuture<CompletedFileDownload> completionFuture;
private final TransferProgress progress;
private final DownloadFileRequest request;
private final AtomicReference<ResumableFileDownload> resumableFileDownload;
private volatile ResumableFileDownload resumableFileDownload;
private final Object lock = new Object();

DefaultFileDownload(CompletableFuture<CompletedFileDownload> completionFuture,
TransferProgress progress,
DownloadFileRequest request) {
this.completionFuture = completionFuture;
this.progress = progress;
this.resumableFileDownload = new AtomicReference<>();
this.request = request;
}

Expand All @@ -54,29 +53,33 @@ public TransferProgress progress() {

@Override
public ResumableFileDownload pause() {
log.trace(() -> "Start to pause " + request);
if (resumableFileDownload.get() == null) {
completionFuture.cancel(false);

Instant lastModified = null;
Long totalBytesTransferred = null;
TransferProgressSnapshot snapshot = progress.snapshot();
if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) {
GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get();
lastModified = getObjectResponse.lastModified();
totalBytesTransferred = getObjectResponse.contentLength();
}
log.debug(() -> "Start to pause " + request);
if (resumableFileDownload == null) {
synchronized (lock) {
if (resumableFileDownload == null) {
completionFuture.cancel(true);

Instant lastModified = null;
Long totalBytesTransferred = null;
TransferProgressSnapshot snapshot = progress.snapshot();
if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) {
GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get();
lastModified = getObjectResponse.lastModified();
totalBytesTransferred = getObjectResponse.contentLength();
}

long length = request.destination().toFile().length();
resumableFileDownload = ResumableFileDownload.builder()
.downloadFileRequest(request)
.lastModified(lastModified)
.bytesTransferred(length)
.transferSizeInBytes(totalBytesTransferred)
.build();
}

long bytesTransferred = snapshot.bytesTransferred();
ResumableFileDownload fileDownload = ResumableFileDownload.builder()
.downloadFileRequest(request)
.lastModified(lastModified)
.bytesTransferred(bytesTransferred)
.transferSizeInBytes(totalBytesTransferred)
.build();
resumableFileDownload.set(fileDownload);
}
}
return resumableFileDownload.get();
return resumableFileDownload;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.http.SdkHttpUtils;

/**
Expand All @@ -46,7 +47,7 @@
*/
@SdkInternalApi
public final class S3CrtAsyncHttpClient implements SdkAsyncHttpClient {

private static final Logger log = Logger.loggerFor(S3CrtAsyncHttpClient.class);
private final S3Client crtS3Client;
private final S3NativeClientConfiguration s3NativeClientConfiguration;

Expand Down Expand Up @@ -95,7 +96,7 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
.withEndpoint(s3NativeClientConfiguration.endpointOverride());

try (S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions)) {
closeResourcesWhenComplete(executeFuture, s3MetaRequest);
closeResourcesWhenComplete(executeFuture, s3MetaRequest, responseHandler);
}

return executeFuture;
Expand Down Expand Up @@ -124,9 +125,12 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ
}

private static void closeResourcesWhenComplete(CompletableFuture<Void> executeFuture,
S3MetaRequest s3MetaRequest) {
S3MetaRequest s3MetaRequest,
S3CrtResponseHandlerAdapter responseHandler) {
executeFuture.whenComplete((r, t) -> {
if (executeFuture.isCancelled()) {
log.debug(() -> "The request is cancelled, cancelling meta request");
responseHandler.cancelRequest();
s3MetaRequest.cancel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;

Expand Down Expand Up @@ -76,6 +77,12 @@ public void onFinished(int crtCode, int responseStatus, byte[] errorPayload) {
}
}

public void cancelRequest() {
SdkCancellationException sdkClientException =
new SdkCancellationException("request is cancelled");
notifyError(sdkClientException);
}

private void handleError(int crtCode, int responseStatus, byte[] errorPayload) {
if (isErrorResponse(responseStatus) && errorPayload != null) {
publisher.deliverData(ByteBuffer.wrap(errorPayload));
Expand All @@ -85,13 +92,16 @@ private void handleError(int crtCode, int responseStatus, byte[] errorPayload) {
SdkClientException sdkClientException =
SdkClientException.create(String.format("Failed to send the request. CRT error code: %s",
crtCode));
resultFuture.completeExceptionally(sdkClientException);

responseHandler.onError(sdkClientException);
publisher.notifyError(sdkClientException);
notifyError(sdkClientException);
}
}

private void notifyError(Exception exception) {
resultFuture.completeExceptionally(exception);
responseHandler.onError(exception);
publisher.notifyError(exception);
}

private static boolean isErrorResponse(int responseStatus) {
return responseStatus != 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Paths;
import com.google.common.jimfs.Jimfs;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -33,12 +42,27 @@

class DefaultFileDownloadTest {
private static final long OBJECT_CONTENT_LENGTH = 1024L;
private static FileSystem fileSystem;
private static File file;

@BeforeAll
public static void setUp() throws IOException {
fileSystem = Jimfs.newFileSystem();
file = File.createTempFile("test", UUID.randomUUID().toString());
Files.write(file.toPath(), RandomStringUtils.random(2000).getBytes(StandardCharsets.UTF_8));
}

@AfterAll
public static void tearDown() throws IOException {
file.delete();
}


@Test
void equals_hashcode() {
EqualsVerifier.forClass(DefaultFileDownload.class)
.withNonnullFields("completionFuture", "progress")
.withIgnoredFields("resumableFileDownload")
.withIgnoredFields("resumableFileDownload", "lock")
.verify();
}

Expand All @@ -62,7 +86,7 @@ void pause_shouldReturnCorrectly() {

ResumableFileDownload pause = fileDownload.pause();
assertThat(pause.downloadFileRequest()).isEqualTo(request);
assertThat(pause.bytesTransferred()).isEqualTo(1000L);
assertThat(pause.bytesTransferred()).isEqualTo(file.length());
assertThat(pause.lastModified()).isEqualTo(sdkResponse.lastModified());
assertThat(pause.transferSizeInBytes()).hasValue(sdkResponse.contentLength());
}
Expand All @@ -87,7 +111,8 @@ void pause_transferAlreadyFinished_shouldReturnNormally() {
transferProgress,
getDownloadFileRequest());
ResumableFileDownload resumableFileDownload = fileDownload.pause();
assertThat(resumableFileDownload.bytesTransferred()).isEqualTo(resumableFileDownload.transferSizeInBytes().get());
assertThat(resumableFileDownload.bytesTransferred()).isEqualTo(file.length());
assertThat(resumableFileDownload.transferSizeInBytes()).hasValue(OBJECT_CONTENT_LENGTH);
}

@Test
Expand All @@ -113,9 +138,8 @@ void pauseTwice_shouldReturnTheSame() {

private DownloadFileRequest getDownloadFileRequest() {
return DownloadFileRequest.builder()
.destination(Paths.get("."))
.destination(file)
.getObjectRequest(GetObjectRequest.builder().key("KEY").bucket("BUCKET").build())

.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ rootLogger.appenderRef.stdout.ref = ConsoleAppender
#logger.sdk.name = software.amazon.awssdk
#logger.sdk.level = debug
#
#logger.tm.name = software.amazon.awssdk.transfer.s3
#logger.tm.level = debug
#
#logger.request.name = software.amazon.awssdk.request
#logger.request.level = debug
#
Expand Down

0 comments on commit 9a4a68e

Please sign in to comment.