From bb3f8a6bedd3eb1aca43b7579f6debd7d519deec Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 13 Nov 2025 09:48:16 -0800 Subject: [PATCH 1/6] Fix bug in FileAsyncRequestBody where inflight parts were negative --- .../next-release/bugfix-AmazonS3-4025803.json | 6 ++++ .../FileAsyncRequestBodySplitHelper.java | 35 +++++++++++-------- .../FileAsyncRequestBodySplitHelperTest.java | 5 +-- .../async/SplittingPublisherTestUtils.java | 2 +- 4 files changed, 31 insertions(+), 17 deletions(-) create mode 100644 .changes/next-release/bugfix-AmazonS3-4025803.json diff --git a/.changes/next-release/bugfix-AmazonS3-4025803.json b/.changes/next-release/bugfix-AmazonS3-4025803.json new file mode 100644 index 000000000000..cb068c21f8a1 --- /dev/null +++ b/.changes/next-release/bugfix-AmazonS3-4025803.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Amazon S3", + "contributor": "", + "description": "Fix bug in S3 Multipart uploads with FileAsyncRequestBody - ensure that concurrency is limited correctly by bufferSizeInBytes" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index 6a9831ed7261..cb1a3340f9a7 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -18,7 +18,10 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.attribute.FileTime; +import java.util.Collections; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -52,7 +55,7 @@ public final class FileAsyncRequestBodySplitHelper { private volatile boolean isDone = false; - private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0); + private Set requestBodyStartPositionsInFlight = Collections.synchronizedSet(new HashSet<>()); private AtomicInteger chunkIndex = new AtomicInteger(0); private final FileTime modifiedTimeAtStart; private final long sizeAtStart; @@ -106,9 +109,10 @@ private void sendAsyncRequestBody(SimplePublisher simplePublis private void doSendAsyncRequestBody(SimplePublisher simplePublisher) { while (shouldSendMore()) { - AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(simplePublisher); + long position = chunkSize * chunkIndex.getAndIncrement(); + AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(position, simplePublisher); simplePublisher.send(currentAsyncRequestBody); - numAsyncRequestBodiesInFlight.incrementAndGet(); + requestBodyStartPositionsInFlight.add(position); checkCompletion(simplePublisher, currentAsyncRequestBody); } } @@ -126,13 +130,12 @@ private void checkCompletion(SimplePublisher simplePublisher, } } - private void startNextRequestBody(SimplePublisher simplePublisher) { - numAsyncRequestBodiesInFlight.decrementAndGet(); + private void startNextRequestBody(SimplePublisher simplePublisher, long completedPosition) { + requestBodyStartPositionsInFlight.remove(completedPosition); sendAsyncRequestBody(simplePublisher); } - private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher simplePublisher) { - long position = chunkSize * chunkIndex.getAndIncrement(); + private AsyncRequestBody newFileAsyncRequestBody(long position, SimplePublisher simplePublisher) { long numBytesToReadForThisChunk = Math.min(totalContentLength - position, chunkSize); FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder() .path(path) @@ -142,7 +145,7 @@ private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher simplePublisher; + private final long position; FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody, - SimplePublisher simplePublisher) { + SimplePublisher simplePublisher, long position) { this.fileAsyncRequestBody = fileAsyncRequestBody; this.simplePublisher = simplePublisher; + this.position = position; } @Override public void subscribe(Subscriber s) { - fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher)) + fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher, position)) // The reason we still need to call startNextRequestBody when the subscription is // cancelled is that upstream could cancel the subscription even though the stream has // finished successfully before onComplete. If this happens, doAfterOnComplete callback // will never be invoked, and if the current buffer is full, the publisher will stop // sending new FileAsyncRequestBody, leading to uncompleted future. - .doAfterOnCancel(() -> startNextRequestBody(simplePublisher)) + .doAfterOnCancel(() -> { + startNextRequestBody(simplePublisher, position); + }) .subscribe(s); } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java index 1edea1a58b1b..26aa8001f220 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java @@ -87,12 +87,13 @@ public void split_differentChunkSize_shouldSplitCorrectly(int chunkSize) throws private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper helper, AtomicInteger maxConcurrency) { return () -> { - int concurrency = helper.numAsyncRequestBodiesInFlight().get(); + int concurrency = helper.numAsyncRequestBodiesInFlight(); if (concurrency > maxConcurrency.get()) { maxConcurrency.set(concurrency); } - assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueLessThan(10); + assertThat(concurrency).isLessThan(10); + assertThat(concurrency).isGreaterThanOrEqualTo(0); }; } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java index 64a9dc296525..328b86fb6ec4 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java @@ -33,6 +33,7 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher> futures = new ArrayList<>(); publisher.subscribe(requestBody -> { CompletableFuture baosFuture = new CompletableFuture<>(); @@ -47,7 +48,6 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher Date: Thu, 13 Nov 2025 09:55:17 -0800 Subject: [PATCH 2/6] cleanups --- .../core/internal/async/FileAsyncRequestBodySplitHelper.java | 4 +--- .../core/internal/async/SplittingPublisherTestUtils.java | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index cb1a3340f9a7..6d967038fcaa 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -186,9 +186,7 @@ public void subscribe(Subscriber s) { // finished successfully before onComplete. If this happens, doAfterOnComplete callback // will never be invoked, and if the current buffer is full, the publisher will stop // sending new FileAsyncRequestBody, leading to uncompleted future. - .doAfterOnCancel(() -> { - startNextRequestBody(simplePublisher, position); - }) + .doAfterOnCancel(() -> startNextRequestBody(simplePublisher, position)) .subscribe(s); } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java index 328b86fb6ec4..64a9dc296525 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java @@ -33,7 +33,6 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher> futures = new ArrayList<>(); publisher.subscribe(requestBody -> { CompletableFuture baosFuture = new CompletableFuture<>(); @@ -48,6 +47,7 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher Date: Thu, 13 Nov 2025 10:34:26 -0800 Subject: [PATCH 3/6] Update test --- .../internal/async/FileAsyncRequestBodySplitHelperTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java index 26aa8001f220..fde8844c82c4 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java @@ -92,8 +92,8 @@ private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper if (concurrency > maxConcurrency.get()) { maxConcurrency.set(concurrency); } - assertThat(concurrency).isLessThan(10); - assertThat(concurrency).isGreaterThanOrEqualTo(0); + assertThat(helper.numAsyncRequestBodiesInFlight()).isLessThan(10); + assertThat(helper.numAsyncRequestBodiesInFlight()).isGreaterThanOrEqualTo(0); }; } } From f220dea4936f91d5cb7a0e26b5956276eddb6448 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 13 Nov 2025 11:24:16 -0800 Subject: [PATCH 4/6] Alternative approach --- .../FileAsyncRequestBodySplitHelper.java | 43 +++++++++++-------- .../FileAsyncRequestBodySplitHelperTest.java | 5 +-- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index 6d967038fcaa..32de02d4722e 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -18,10 +18,7 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.attribute.FileTime; -import java.util.Collections; -import java.util.HashSet; import java.util.Optional; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -55,7 +52,7 @@ public final class FileAsyncRequestBodySplitHelper { private volatile boolean isDone = false; - private Set requestBodyStartPositionsInFlight = Collections.synchronizedSet(new HashSet<>()); + private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0); private AtomicInteger chunkIndex = new AtomicInteger(0); private final FileTime modifiedTimeAtStart; private final long sizeAtStart; @@ -109,10 +106,9 @@ private void sendAsyncRequestBody(SimplePublisher simplePublis private void doSendAsyncRequestBody(SimplePublisher simplePublisher) { while (shouldSendMore()) { - long position = chunkSize * chunkIndex.getAndIncrement(); - AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(position, simplePublisher); + AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(simplePublisher); simplePublisher.send(currentAsyncRequestBody); - requestBodyStartPositionsInFlight.add(position); + numAsyncRequestBodiesInFlight.incrementAndGet(); checkCompletion(simplePublisher, currentAsyncRequestBody); } } @@ -130,12 +126,16 @@ private void checkCompletion(SimplePublisher simplePublisher, } } - private void startNextRequestBody(SimplePublisher simplePublisher, long completedPosition) { - requestBodyStartPositionsInFlight.remove(completedPosition); + private void startNextRequestBody(SimplePublisher simplePublisher) { + int d = numAsyncRequestBodiesInFlight.decrementAndGet(); + if (d < 0) { + throw new RuntimeException("Unexpected error occurred. numAsyncRequestBodiesInFlight is negative: " + d); + } sendAsyncRequestBody(simplePublisher); } - private AsyncRequestBody newFileAsyncRequestBody(long position, SimplePublisher simplePublisher) { + private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher simplePublisher) { + long position = chunkSize * chunkIndex.getAndIncrement(); long numBytesToReadForThisChunk = Math.min(totalContentLength - position, chunkSize); FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder() .path(path) @@ -145,7 +145,7 @@ private AsyncRequestBody newFileAsyncRequestBody(long position, SimplePublisher< .modifiedTimeAtStart(modifiedTimeAtStart) .sizeAtStart(sizeAtStart) .build(); - return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher, position); + return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher); } /** @@ -156,40 +156,45 @@ private boolean shouldSendMore() { return false; } - long currentUsedBuffer = (long) requestBodyStartPositionsInFlight.size() * bufferPerAsyncRequestBody; + long currentUsedBuffer = (long) numAsyncRequestBodiesInFlight.get() * bufferPerAsyncRequestBody; return currentUsedBuffer + bufferPerAsyncRequestBody <= totalBufferSize; } @SdkTestInternalApi - int numAsyncRequestBodiesInFlight() { - return requestBodyStartPositionsInFlight.size(); + AtomicInteger numAsyncRequestBodiesInFlight() { + return numAsyncRequestBodiesInFlight; } private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody { private final FileAsyncRequestBody fileAsyncRequestBody; private final SimplePublisher simplePublisher; - private final long position; + private final AtomicBoolean isDone = new AtomicBoolean(false); FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody, - SimplePublisher simplePublisher, long position) { + SimplePublisher simplePublisher) { this.fileAsyncRequestBody = fileAsyncRequestBody; this.simplePublisher = simplePublisher; - this.position = position; } @Override public void subscribe(Subscriber s) { - fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher, position)) + fileAsyncRequestBody.doAfterOnComplete(this::startNextIfNeeded) // The reason we still need to call startNextRequestBody when the subscription is // cancelled is that upstream could cancel the subscription even though the stream has // finished successfully before onComplete. If this happens, doAfterOnComplete callback // will never be invoked, and if the current buffer is full, the publisher will stop // sending new FileAsyncRequestBody, leading to uncompleted future. - .doAfterOnCancel(() -> startNextRequestBody(simplePublisher, position)) + .doAfterOnCancel(this::startNextIfNeeded) .subscribe(s); } + private void startNextIfNeeded() { + if (isDone.compareAndSet(false, true)) { + startNextRequestBody(simplePublisher); + } + } + @Override public Optional contentLength() { return fileAsyncRequestBody.contentLength(); diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java index fde8844c82c4..514c549c3594 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java @@ -87,13 +87,12 @@ public void split_differentChunkSize_shouldSplitCorrectly(int chunkSize) throws private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper helper, AtomicInteger maxConcurrency) { return () -> { - int concurrency = helper.numAsyncRequestBodiesInFlight(); + int concurrency = helper.numAsyncRequestBodiesInFlight().get(); if (concurrency > maxConcurrency.get()) { maxConcurrency.set(concurrency); } - assertThat(helper.numAsyncRequestBodiesInFlight()).isLessThan(10); - assertThat(helper.numAsyncRequestBodiesInFlight()).isGreaterThanOrEqualTo(0); + assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueBetween(0,10); }; } } From 353fb0d0fa4b83aa5e3bd1e605b2425f0bf12e9c Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 13 Nov 2025 12:17:13 -0800 Subject: [PATCH 5/6] Cleanup/rename --- .../internal/async/FileAsyncRequestBodySplitHelper.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index 32de02d4722e..dec0d2da6085 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -128,9 +128,6 @@ private void checkCompletion(SimplePublisher simplePublisher, private void startNextRequestBody(SimplePublisher simplePublisher) { int d = numAsyncRequestBodiesInFlight.decrementAndGet(); - if (d < 0) { - throw new RuntimeException("Unexpected error occurred. numAsyncRequestBodiesInFlight is negative: " + d); - } sendAsyncRequestBody(simplePublisher); } @@ -169,7 +166,7 @@ private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody { private final FileAsyncRequestBody fileAsyncRequestBody; private final SimplePublisher simplePublisher; - private final AtomicBoolean isDone = new AtomicBoolean(false); + private final AtomicBoolean hasCompleted = new AtomicBoolean(false); FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody, SimplePublisher simplePublisher) { @@ -190,7 +187,7 @@ public void subscribe(Subscriber s) { } private void startNextIfNeeded() { - if (isDone.compareAndSet(false, true)) { + if (hasCompleted.compareAndSet(false, true)) { startNextRequestBody(simplePublisher); } } From 68feb19759054e11d5e8ccc62e5f390f481ec468 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 13 Nov 2025 12:36:43 -0800 Subject: [PATCH 6/6] Fix spotbugs --- .../core/internal/async/FileAsyncRequestBodySplitHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index dec0d2da6085..7de0add5613c 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -127,7 +127,7 @@ private void checkCompletion(SimplePublisher simplePublisher, } private void startNextRequestBody(SimplePublisher simplePublisher) { - int d = numAsyncRequestBodiesInFlight.decrementAndGet(); + numAsyncRequestBodiesInFlight.decrementAndGet(); sendAsyncRequestBody(simplePublisher); }