Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 41 additions & 33 deletions src/main/java/rx/observables/SyncOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package rx.observables;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.OnSubscribe;
Expand Down Expand Up @@ -321,14 +320,9 @@ private static class SubscriptionProducer<S, T>
private final SyncOnSubscribe<S, T> parent;
private boolean onNextCalled;
private boolean hasTerminated;

private S state;

volatile int isUnsubscribed;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<SubscriptionProducer> IS_UNSUBSCRIBED =
AtomicIntegerFieldUpdater.newUpdater(SubscriptionProducer.class, "isUnsubscribed");

private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubscribe<S, T> parent, S state) {
this.actualSubscriber = subscriber;
this.parent = parent;
Expand All @@ -337,14 +331,39 @@ private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubsc

@Override
public boolean isUnsubscribed() {
return isUnsubscribed != 0;
return get() < 0L;
}

@Override
public void unsubscribe() {
IS_UNSUBSCRIBED.compareAndSet(this, 0, 1);
if (get() == 0L)
parent.onUnsubscribe(state);
while(true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a much simpler way of fixing this: undo all these latest changes and request 1!

if (BackpressureUtils.getAndAddRequest(this, 1) == 0) {
    parent.onUnsubscribe(state);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's interesting. If there is currently a thread processing over the request count loop (i.e. processing nextIteration()) then won't this have to block the unsubscribing thread? The unsubscribing thread would have to poll over this until it got access. I'm not seeing how this would force a thread currently iterating to terminate. If you read through the concurrency I think you will see that it's not really all that complicated and I believe it sufficiently covers the possible race conditions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no blocking. My suggestion follows the queue-drain approach where you queue the terminal flag and drain it in the emission loop. Since the loop first checks for the terminal flag there won't be any wrong emissions and the loop quits. If unsubscribe increments from zero, there isn't and won't be any emission loop after that and the resource can be freed.

There is no need for your complicated solution.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing where you are suggesting setting a terminal flag. You are more than welcome to open your own pull request to demonstrate your fix. My pull request does set a flag for the draining thread to signal when it needs to unsubscribe.

long requestCount = get();
if (compareAndSet(0L, -1L)) {
doUnsubscribe();
return;
}
else if (compareAndSet(requestCount, -2L))
// the loop is iterating concurrently
// need to check if requestCount == -1
// and unsub if so after loop iteration
return;
}
}

private boolean tryUnsubscribe() {
// only one thread at a time can iterate over request count
// therefore the requestCount atomic cannot be decrement concurrently here
// safe to set to -1 atomically (since this check can only be done by 1 thread)
if (hasTerminated || get() < -1) {
set(-1);
doUnsubscribe();
return true;
}
return false;
}

private void doUnsubscribe() {
parent.onUnsubscribe(state);
}

@Override
Expand All @@ -358,71 +377,60 @@ public void request(long n) {
}
}

void fastpath() {
private void fastpath() {
final SyncOnSubscribe<S, T> p = parent;
Subscriber<? super T> a = actualSubscriber;

if (isUnsubscribed()) {
p.onUnsubscribe(state);
return;
}

for (;;) {
try {
onNextCalled = false;
nextIteration(p);
} catch (Throwable ex) {
handleThrownError(p, a, state, ex);
handleThrownError(a, ex);
return;
}
if (hasTerminated || isUnsubscribed()) {
p.onUnsubscribe(state);
if (tryUnsubscribe()) {
return;
}
}
}

private void handleThrownError(final SyncOnSubscribe<S, T> p, Subscriber<? super T> a, S st, Throwable ex) {
private void handleThrownError(Subscriber<? super T> a, Throwable ex) {
if (hasTerminated) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
} else {
hasTerminated = true;
a.onError(ex);
p.onUnsubscribe(st);
unsubscribe();
}
}

void slowPath(long n) {
private void slowPath(long n) {
final SyncOnSubscribe<S, T> p = parent;
Subscriber<? super T> a = actualSubscriber;
long numRequested = n;
for (;;) {
if (isUnsubscribed()) {
p.onUnsubscribe(state);
return;
}
long numRemaining = numRequested;
do {
try {
onNextCalled = false;
nextIteration(p);
} catch (Throwable ex) {
handleThrownError(p, a, state, ex);
handleThrownError(a, ex);
return;
}
if (hasTerminated || isUnsubscribed()) {
p.onUnsubscribe(state);
if (tryUnsubscribe()) {
return;
}
if (onNextCalled)
numRemaining--;
} while (numRemaining != 0L);

numRequested = addAndGet(-numRequested);
if (numRequested == 0L) {
if (numRequested <= 0L)
break;
}
}
// catches cases where unsubscribe is called before decrementing atomic request count
tryUnsubscribe();
}

private void nextIteration(final SyncOnSubscribe<S, T> parent) {
Expand Down
Loading