Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
zoewangg committed Jul 26, 2023
1 parent 9b661d8 commit 45ced32
Show file tree
Hide file tree
Showing 13 changed files with 435 additions and 409 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -420,24 +419,20 @@ static AsyncRequestBody empty() {
* @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content
* @return SplitAsyncRequestBodyResult
*/
default SplitAsyncRequestBodyResponse split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
default SdkPublisher<AsyncRequestBody> split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes");
Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes");

if (!this.contentLength().isPresent()) {
if (!contentLength().isPresent()) {
Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes,
"maxMemoryUsageInBytes must be larger than or equal to " +
"chunkSizeInBytes if the content length is unknown");
}

CompletableFuture<Void> future = new CompletableFuture<>();
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.asyncRequestBody(this)
.chunkSizeInBytes(chunkSizeInBytes)
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
.resultFuture(future)
.build();

return SplitAsyncRequestBodyResponse.create(splittingPublisher, future);
return SplittingPublisher.builder()
.asyncRequestBody(this)
.chunkSizeInBytes(chunkSizeInBytes)
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
.build();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -45,24 +44,12 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
private final long chunkSizeInBytes;
private final long maxMemoryUsageInBytes;
private final CompletableFuture<Void> future;

private SplittingPublisher(Builder builder) {
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes");
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
this.future = builder.future;

// We need to cancel upstream subscription if the future gets cancelled.
future.whenComplete((r, t) -> {
if (t != null) {
if (splittingSubscriber.upstreamSubscription != null) {
log.trace(() -> "Cancelling subscription because return future completed exceptionally ", t);
splittingSubscriber.upstreamSubscription.cancel();
}
}
});
}

public static Builder builder() {
Expand Down Expand Up @@ -117,16 +104,20 @@ public void onNext(ByteBuffer byteBuffer) {
byteBufferSizeHint = byteBuffer.remaining();

while (true) {

if (!byteBuffer.hasRemaining()) {
break;
}

int amountRemainingInChunk = amountRemainingInChunk();

// If we have fulfilled this chunk,
// complete the current body
if (amountRemainingInChunk == 0) {
completeCurrentBodyAndCreateNewIfNeeded(byteBuffer);
amountRemainingInChunk = amountRemainingInChunk();
}

amountRemainingInChunk = amountRemainingInChunk();

// If the current ByteBuffer < this chunk, send it as-is
if (amountRemainingInChunk > byteBuffer.remaining()) {
currentBody.send(byteBuffer.duplicate());
Expand Down Expand Up @@ -154,29 +145,28 @@ public void onNext(ByteBuffer byteBuffer) {

private void completeCurrentBodyAndCreateNewIfNeeded(ByteBuffer byteBuffer) {
completeCurrentBody();
int currentChunk = chunkNumber.incrementAndGet();
boolean shouldCreateNewDownstreamRequestBody;
Long dataRemaining = totalDataRemaining();

if (shouldCreateNewDownstreamRequestBody(byteBuffer)) {
int currentChunk = chunkNumber.incrementAndGet();
long chunkSize = calculateChunkSize(totalDataRemaining());
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
if (upstreamSize == null) {
shouldCreateNewDownstreamRequestBody = !upstreamComplete || byteBuffer.hasRemaining();
} else {
shouldCreateNewDownstreamRequestBody = dataRemaining != null && dataRemaining > 0;
}
}


/**
* If content length is known, we should create new DownstreamRequestBody if there's remaining data.
* If content length is unknown, we should create new DownstreamRequestBody if upstream is not completed yet.
*/
private boolean shouldCreateNewDownstreamRequestBody(ByteBuffer byteBuffer) {
return !upstreamComplete || byteBuffer.remaining() > 0;
if (shouldCreateNewDownstreamRequestBody) {
long chunkSize = calculateChunkSize(dataRemaining);
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
}
}

private int amountRemainingInChunk() {
return Math.toIntExact(currentBody.maxLength - currentBody.transferredLength);
}

private void completeCurrentBody() {
log.debug(() -> "completeCurrentBody");
log.debug(() -> "completeCurrentBody for chunk " + chunkNumber.get());
currentBody.complete();
if (upstreamSize == null) {
sendCurrentBody(currentBody);
Expand All @@ -188,16 +178,16 @@ public void onComplete() {
upstreamComplete = true;
log.trace(() -> "Received onComplete()");
completeCurrentBody();
downstreamPublisher.complete().thenRun(() -> future.complete(null));
downstreamPublisher.complete();
}

@Override
public void onError(Throwable t) {
currentBody.error(t);
log.trace(() -> "Received onError()", t);
downstreamPublisher.error(t);
}

private void sendCurrentBody(AsyncRequestBody body) {
log.debug(() -> "sendCurrentBody");
downstreamPublisher.send(body).exceptionally(t -> {
downstreamPublisher.error(t);
return null;
Expand Down Expand Up @@ -300,7 +290,6 @@ public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private Long chunkSizeInBytes;
private Long maxMemoryUsageInBytes;
private CompletableFuture<Void> future;

/**
* Configures the asyncRequestBody to split
Expand Down Expand Up @@ -339,18 +328,6 @@ public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
return this;
}

/**
* Sets the result future. The future will be completed when all request bodies
* have been sent.
*
* @param future The new future value.
* @return This object for method chaining.
*/
public Builder resultFuture(CompletableFuture<Void> future) {
this.future = future;
return this;
}

public SplittingPublisher build() {
return new SplittingPublisher(this);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -45,7 +42,6 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.utils.BinaryUtils;

public class SplittingPublisherTest {
Expand Down Expand Up @@ -87,26 +83,6 @@ void differentChunkSize_byteArrayShouldSplitAsyncRequestBodyCorrectly(int chunkS
verifySplitContent(AsyncRequestBody.fromBytes(CONTENT), chunkSize);
}


@Test
void cancelFuture_shouldCancelUpstream() throws IOException {
CompletableFuture<Void> future = new CompletableFuture<>();
TestAsyncRequestBody asyncRequestBody = new TestAsyncRequestBody();
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.resultFuture(future)
.asyncRequestBody(asyncRequestBody)
.chunkSizeInBytes(CHUNK_SIZE)
.maxMemoryUsageInBytes(10L)
.build();

OnlyRequestOnceSubscriber downstreamSubscriber = new OnlyRequestOnceSubscriber();
splittingPublisher.subscribe(downstreamSubscriber);

future.completeExceptionally(new RuntimeException("test"));
assertThat(asyncRequestBody.cancelled).isTrue();
assertThat(downstreamSubscriber.asyncRequestBodies.size()).isEqualTo(1);
}

@Test
void contentLengthNotPresent_shouldHandle() throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand All @@ -117,7 +93,6 @@ public Optional<Long> contentLength() {
}
};
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.resultFuture(future)
.asyncRequestBody(asyncRequestBody)
.chunkSizeInBytes(CHUNK_SIZE)
.maxMemoryUsageInBytes(10L)
Expand Down Expand Up @@ -159,11 +134,8 @@ public Optional<Long> contentLength() {


private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.resultFuture(future)
.asyncRequestBody(asyncRequestBody)
.resultFuture(future)
.chunkSizeInBytes(chunkSize)
.maxMemoryUsageInBytes((long) chunkSize * 4)
.build();
Expand Down Expand Up @@ -194,7 +166,6 @@ private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int ch
assertThat(actualBytes).isEqualTo(expected);
};
}
assertThat(future).isCompleted();
}

private static class TestAsyncRequestBody implements AsyncRequestBody {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTest

private static final String TEST_BUCKET = temporaryBucketName(S3MultipartClientPutObjectIntegrationTest.class);
private static final String TEST_KEY = "testfile.dat";
private static final int OBJ_SIZE = 31 * 1024 * 1024;
private static final int OBJ_SIZE = 19 * 1024 * 1024;

private static File testFile;
private static S3AsyncClient mpuS3Client;
Expand Down Expand Up @@ -90,7 +90,6 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
}

@Test
@Timeout(value = 30, unit = SECONDS)
void putObject_byteAsyncRequestBody_objectSentCorrectly() throws Exception {
byte[] bytes = RandomStringUtils.randomAscii(OBJ_SIZE).getBytes(Charset.defaultCharset());
AsyncRequestBody body = AsyncRequestBody.fromBytes(bytes);
Expand All @@ -105,11 +104,9 @@ void putObject_byteAsyncRequestBody_objectSentCorrectly() throws Exception {
}

@Test
@Timeout(value = 30, unit = SECONDS)
void putObject_unknownContentLength_objectSentCorrectly() throws Exception {
AsyncRequestBody body = FileAsyncRequestBody.builder()
.path(testFile.toPath())
//.chunkSizeInBytes(2 * 1024 * 1024)
.build();
mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), new AsyncRequestBody() {
@Override
Expand Down
Loading

0 comments on commit 45ced32

Please sign in to comment.