diff --git a/.changes/next-release/bugfix-AmazonS3-4025803.json b/.changes/next-release/bugfix-AmazonS3-4025803.json new file mode 100644 index 00000000000..cb068c21f8a --- /dev/null +++ b/.changes/next-release/bugfix-AmazonS3-4025803.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Amazon S3", + "contributor": "", + "description": "Fix bug in S3 Multipart uploads with FileAsyncRequestBody - ensure that concurrency is limited correctly by bufferSizeInBytes" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index 6a9831ed726..7de0add5613 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -166,6 +166,7 @@ private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody { private final FileAsyncRequestBody fileAsyncRequestBody; private final SimplePublisher simplePublisher; + private final AtomicBoolean hasCompleted = new AtomicBoolean(false); FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody, SimplePublisher simplePublisher) { @@ -175,16 +176,22 @@ private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody { @Override public void subscribe(Subscriber s) { - fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher)) + fileAsyncRequestBody.doAfterOnComplete(this::startNextIfNeeded) // The reason we still need to call startNextRequestBody when the subscription is // cancelled is that upstream could cancel the subscription even though the stream has // finished successfully before onComplete. If this happens, doAfterOnComplete callback // will never be invoked, and if the current buffer is full, the publisher will stop // sending new FileAsyncRequestBody, leading to uncompleted future. - .doAfterOnCancel(() -> startNextRequestBody(simplePublisher)) + .doAfterOnCancel(this::startNextIfNeeded) .subscribe(s); } + private void startNextIfNeeded() { + if (hasCompleted.compareAndSet(false, true)) { + startNextRequestBody(simplePublisher); + } + } + @Override public Optional contentLength() { return fileAsyncRequestBody.contentLength(); diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java index 1edea1a58b1..514c549c359 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java @@ -92,7 +92,7 @@ private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper if (concurrency > maxConcurrency.get()) { maxConcurrency.set(concurrency); } - assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueLessThan(10); + assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueBetween(0,10); }; } }