Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guard against re-subscription in SplittingPublisher #4253

Merged
merged 3 commits into from
Aug 3, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.SimplePublisher;
Expand All @@ -48,8 +50,8 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
private final long bufferSizeInBytes;

private SplittingPublisher(Builder builder) {
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes;
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));

Expand Down Expand Up @@ -234,13 +236,14 @@ private Long totalDataRemaining() {
private final class DownstreamBody implements AsyncRequestBody {

/**
* The maximum length of the content this AsyncRequestBody can hold.
* If the upstream content length is known, this is the same as totalLength
* The maximum length of the content this AsyncRequestBody can hold. If the upstream content length is known, this is
* the same as totalLength
*/
private final long maxLength;
private final Long totalLength;
private final SimplePublisher<ByteBuffer> delegate = new SimplePublisher<>();
private final int chunkNumber;
private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
private volatile long transferredLength = 0;

private DownstreamBody(boolean contentLengthKnown, long maxLength, int chunkNumber) {
Expand Down Expand Up @@ -282,7 +285,15 @@ public void error(Throwable error) {

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
delegate.subscribe(s);
if (subscribeCalled.compareAndSet(false, true)) {
delegate.subscribe(s);
} else {
s.onSubscribe(new NoopSubscription(s));
s.onError(NonRetryableException.create(
"A retry was attempted, but AsyncRequestBody.split does not "
+ "support retries. Consider using AsyncRequestBody.fromInputStream with an "
+ "input stream that supports mark/reset to get retry support."));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using AsyncRequestBody.fromInputStream with an input stream that supports mark/reset to get retry support."

Maybe delete the last sentence since it's not applicable here?

}
}

private void addDataBuffered(int length) {
Expand All @@ -293,7 +304,7 @@ private void addDataBuffered(int length) {
}
}
}

public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private Long chunkSizeInBytes;
Expand Down
Loading