Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use lock free strategy for several Subscription implementations #568

Merged
merged 9 commits into from
Dec 8, 2013

Conversation

jloisel
Copy link
Contributor

@jloisel jloisel commented Dec 5, 2013

Reduce contention by using CAS (Compare And Swap) operations to replace
subscription in several subscription implementations.

Reduce contention by using CAS (Compare And Swap) operations to replace
subscription
@cloudbees-pull-request-builder

RxJava-pull-requests #497 SUCCESS
This pull request looks good

@zsxwing
Copy link
Member

zsxwing commented Dec 5, 2013

Can you use 4-space indent to make this modification more clear? It's better that keeping consistent.

if (unsubscribed.get()) {
subscription.unsubscribe();
} else {
reference.getAndSet(subscription == null ? empty() : subscription).unsubscribe();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to check subscription == null? I think subscription should not be null.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume that we have two threads, one is calling unsubscribe, another is calling setSubscription. If the order is line 43 37 38 46, the subscription sent to setSubscription will not be unsubscribed.

@jloisel
Copy link
Contributor Author

jloisel commented Dec 5, 2013

Right, there is a potential race condition. I'm going to try to reproduce it via a unit test and find an alternative.

@jloisel
Copy link
Contributor Author

jloisel commented Dec 5, 2013

I checked nullity because it was checked by the previous code. As no unit test was covering nullity cases, i chose to keep it compatible with null subscription. Do you suggest not to check for nullity ?

@akarnokd
Copy link
Member

akarnokd commented Dec 5, 2013

You need something like this:

public void setSubscription(final Subscription subscription) {
    Subscription q = null;
    do {
        Subscription r = reference.get();
        if (r == SENTINEL) {
            q = newReg;
            break;
        }
        if (reference.compareAndSet(r, subscription)) {
            q = r;
            break;
        }
    } while (true);
    if (q != null) {
        q.unsubscribe();
    }
}

Similar to this.

@jloisel
Copy link
Contributor Author

jloisel commented Dec 5, 2013

What about this version using a read/write lock to minimize contention between concurrent setSubscription() calls ?

@cloudbees-pull-request-builder

RxJava-pull-requests #498 SUCCESS
This pull request looks good

@jloisel
Copy link
Contributor Author

jloisel commented Dec 5, 2013

Thanks to akarnokd, i could get ride of the AtomicBoolean and Read/Write lock using a sentinel.

@cloudbees-pull-request-builder

RxJava-pull-requests #499 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #501 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #502 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #503 SUCCESS
This pull request looks good

@headinthebox
Copy link
Contributor

We should add isUnsubscribed to SerialSubscription

@headinthebox
Copy link
Contributor

Not sure if I understand the role of unsubscribe. It is set to current when you actually unsubscribe (assuming single threaded execution).

s.unsubscribe()
~~>
setSubscription(UNSUBSCRIBED)
~~>
current = reference.get()

current.unsubscribe();
~~~> [subscription == UNSUBSCRIBED]
unsubscribe = current;

Now we do 

```
s.setSubscription(X)
~~~>
current = reference.get();
~~~> [ current == UNSUBSCRIBED, because we unsubscribed above ]
subscription.unsubscribe();
```

and then we do

```
s.getSubscription()
~~~> []
subscription = reference.get()
~~> [ subscription == UNSUBSCRIBED]
return unsubscribe // which was set above to current
```

So we are returning an arbitrary old subscription when you do getSubscription. Seems you can do without it.

```
@Override
public void unsubscribe() {
 setSubscription(UNSUBSCRIBED);
}

public void setSubscription(final Subscription subscription) {
 do {
     final Subscription current = reference.get();
     if (current == UNSUBSCRIBED) {
         subscription.unsubscribe();
         break;
     }
     if (reference.compareAndSet(current, subscription)) {
         current.unsubscribe();
         if(subscription == UNSUBSCRIBED) {
                 unsubscribe = current;
         }
         break;
     }
 } while (true);
}

public Subscription getSubscription() {
     Subscription subscription = reference.get();
         return subscription == UNSUBSCRIBED ? unsubscribe : subscription;
}
```

@akarnokd
Copy link
Member

akarnokd commented Dec 6, 2013

Because we need a way to indicate completion of the subscription and empty is public so clients might swap it in or out, reactivating the subscription. My example matches the Rx.Net way.

@headinthebox
Copy link
Contributor

Don't get what you say. If you are unsubscribed anyway, why not return UNSUBSCRIBED?

@akarnokd
Copy link
Member

akarnokd commented Dec 6, 2013

Sorry, I couldn't follow jloisel's commits due to the heavy rewriting.

The field private volatile Subscription unsubscribe; seems to be useless, one should return Subscriptions.empty() when unsubscribed.

The setSubscription(UNSUBSCRIBED) is an overkill. We could just use reference.getAndSet(UNSUBSCRIBED) and unsubscribe the returned value if not null.

The correct class should look like this:

public class SerialSubscription implements Subscription {
    private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>();

    private static final Subscription UNSUBSCRIBED = new Subscription() {
        @Override
        public void unsubscribe() {
        }
    };

    @Override
    public void unsubscribe() {
        Subscription q = reference.getAndSet(UNSUBSCRIBED);
        if (q != null) {
            q.unsubscribe();
        }
    }

    public void setSubscription(final Subscription subscription) {
        Subscription q = null;
        do {
            final Subscription current = reference.get();
            if (current == UNSUBSCRIBED) {
                q = subscription;
                break;
            }
            if (reference.compareAndSet(current, subscription)) {
                q = current;
                break;
            }
        } while (true);
        if (q != null) {
            q.unsubscribe();
        }
    }

    public Subscription getSubscription() {
            Subscription subscription = reference.get();
            return subscription == UNSUBSCRIBED ? Subscriptions.empty() : subscription;
    }
    public boolean isUnsubscribed() {
        return reference.get() == UNSUBSCRIBED;
    }
}

@jloisel
Copy link
Contributor Author

jloisel commented Dec 6, 2013

headinthebox > UNSUBSCRIBED is the internal sentinel. It should not escape from SerialSubscription internal implementation, since it could lead to unexpected behavior.

We could another complete different approach: since managing a thread-safe mutable state is difficult, shouldn't we make the serial subscription immutable ?

I mean:

  • getSubscription() method violates encapsulation: it let's the internal state escape. One could unsubscribe the internal subscription from outside the serial subscription, which could lead to double unscribe() if then unscribed from serial,
  • Shouldn't SerialSubscription be package protected, or have at least a static factory method with a private constructor like:
public static Subscription serial(final Subscription delegate) {
    return new SerialSubscription(delegate);
}

And remove getSubscription() since it's never used in the api. Of course, this tends to no backward compatibility on this class, since it's public.

@jloisel
Copy link
Contributor Author

jloisel commented Dec 6, 2013

If we tend to have the same behavior as before, getSubscription() should return null when previously unscribed.

@cloudbees-pull-request-builder

RxJava-pull-requests #505 SUCCESS
This pull request looks good

@jloisel
Copy link
Contributor Author

jloisel commented Dec 6, 2013

I think we cannot use Subscriptions.empty() as unsubscribed sentinel, since it would behave unexpectedly it one sets empty() from outside via setSubscription().

I tend also to avoid null references in implementation to avoid unnecessary burden which reduces comprehension.

rx-core code base analysis shows that SerialSubscription could be easily immutable. But it diverges with Rx contract:
http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx

Rather than try to have a complex contract for SerialSubscription, i would give a try to make it immutable. Immutable is thread-safe by nature as well as contention free.

@jloisel
Copy link
Contributor Author

jloisel commented Dec 6, 2013

Oh, it seems like due to cyclic dependencies (like on ResultSink), it's not possible to make it immutable.

@cloudbees-pull-request-builder

RxJava-pull-requests #525 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

Looks like an improvement on current implementation and don't see problems. Using a state machine here is far preferable to the previous lock based implementation. This should help or fix #577

benjchristensen added a commit that referenced this pull request Dec 8, 2013
Use lock free strategy for several Subscription implementations
@benjchristensen benjchristensen merged commit 2d91c99 into ReactiveX:master Dec 8, 2013
rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
Use lock free strategy for several Subscription implementations
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…e exception so that it is also possible to count exceptions as a success. (ReactiveX#573)

The list of ignored exceptions has always precedence. If an exception is ignored it neither counts as a success nor failure.
If the list of recorded exceptions only contains some exceptions, all others count as a success, unless they are not part of the list of ignored exceptions.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants