Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-4025803.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon S3",
"contributor": "",
"description": "Fix bug in S3 Multipart uploads with FileAsyncRequestBody - ensure that concurrency is limited correctly by bufferSizeInBytes"
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody {

private final FileAsyncRequestBody fileAsyncRequestBody;
private final SimplePublisher<AsyncRequestBody> simplePublisher;
private final AtomicBoolean hasCompleted = new AtomicBoolean(false);

FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody,
SimplePublisher<AsyncRequestBody> simplePublisher) {
Expand All @@ -175,16 +176,22 @@ private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody {

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher))
fileAsyncRequestBody.doAfterOnComplete(this::startNextIfNeeded)
// The reason we still need to call startNextRequestBody when the subscription is
// cancelled is that upstream could cancel the subscription even though the stream has
// finished successfully before onComplete. If this happens, doAfterOnComplete callback
// will never be invoked, and if the current buffer is full, the publisher will stop
// sending new FileAsyncRequestBody, leading to uncompleted future.
.doAfterOnCancel(() -> startNextRequestBody(simplePublisher))
.doAfterOnCancel(this::startNextIfNeeded)
.subscribe(s);
}

private void startNextIfNeeded() {
if (hasCompleted.compareAndSet(false, true)) {
startNextRequestBody(simplePublisher);
}
}

@Override
public Optional<Long> contentLength() {
return fileAsyncRequestBody.contentLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper
if (concurrency > maxConcurrency.get()) {
maxConcurrency.set(concurrency);
}
assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueLessThan(10);
assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueBetween(0,10);
};
}
}
Loading