Skip to content

Commit

Permalink
Guard against re-subscription in SplittingPublisher (#4253)
Browse files Browse the repository at this point in the history
* guard against re-subscription in SplittingPublisher

* fix checkstyle

* Error msg
  • Loading branch information
L-Applin committed Aug 3, 2023
1 parent 28c126d commit 85a1fd7
Showing 1 changed file with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.SimplePublisher;
Expand All @@ -48,8 +50,8 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
private final long bufferSizeInBytes;

private SplittingPublisher(Builder builder) {
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes;
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));

Expand Down Expand Up @@ -234,13 +236,14 @@ private Long totalDataRemaining() {
private final class DownstreamBody implements AsyncRequestBody {

/**
* The maximum length of the content this AsyncRequestBody can hold.
* If the upstream content length is known, this is the same as totalLength
* The maximum length of the content this AsyncRequestBody can hold. If the upstream content length is known, this is
* the same as totalLength
*/
private final long maxLength;
private final Long totalLength;
private final SimplePublisher<ByteBuffer> delegate = new SimplePublisher<>();
private final int chunkNumber;
private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
private volatile long transferredLength = 0;

private DownstreamBody(boolean contentLengthKnown, long maxLength, int chunkNumber) {
Expand Down Expand Up @@ -282,7 +285,14 @@ public void error(Throwable error) {

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
delegate.subscribe(s);
if (subscribeCalled.compareAndSet(false, true)) {
delegate.subscribe(s);
} else {
s.onSubscribe(new NoopSubscription(s));
s.onError(NonRetryableException.create(
"A retry was attempted, but AsyncRequestBody.split does not "
+ "support retries."));
}
}

private void addDataBuffered(int length) {
Expand All @@ -293,7 +303,7 @@ private void addDataBuffered(int length) {
}
}
}

public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private Long chunkSizeInBytes;
Expand Down

0 comments on commit 85a1fd7

Please sign in to comment.