diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableConcatWithCompletable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableConcatWithCompletable.java index f4192ee5f6..c26e60b250 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableConcatWithCompletable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableConcatWithCompletable.java @@ -19,7 +19,6 @@ import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.concurrent.internal.SignalOffloader; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.annotation.Nullable; import static java.util.Objects.requireNonNull; @@ -63,14 +62,11 @@ protected void handleSubscribe(Subscriber subscriber, SignalOffloader offloader, } private static final class ConcatWithSubscriber implements Subscriber { - private static final AtomicIntegerFieldUpdater subscribedToNextUpdater = - AtomicIntegerFieldUpdater.newUpdater(ConcatWithSubscriber.class, "subscribedToNext"); private final Subscriber target; private final Completable next; @Nullable private SequentialCancellable sequentialCancellable; - @SuppressWarnings("unused") - private volatile int subscribedToNext; + private boolean nextSubscribed; ConcatWithSubscriber(Subscriber target, Completable next) { this.target = target; @@ -89,7 +85,10 @@ public void onSubscribe(Cancellable cancellable) { @Override public void onComplete() { - if (subscribedToNextUpdater.compareAndSet(this, 0, 1)) { + if (nextSubscribed) { + target.onComplete(); + } else { + nextSubscribed = true; // Do not use the same SignalOffloader as used for original as that may cause deadlock. // Using a regular subscribe helps us to inherit the threading model for this next source. However, // since we always offload the original Subscriber (in handleSubscribe above) we are assured that this @@ -98,8 +97,6 @@ public void onComplete() { // This is an asynchronous boundary, and so we should recapture the AsyncContext instead of propagating // it. next.subscribeInternal(this); - } else { - target.onComplete(); } }