Skip to content

Commit

Permalink
rxcpp: Fix data race in composite_subscription
Browse files Browse the repository at this point in the history
composite_subscription_inner had missing checks which could lead to add/remove/clear
racing against unsubscribe.

(See the issue for more details).

Fixes: ReactiveX#475
  • Loading branch information
iam authored and kirkshoop committed Feb 13, 2019
1 parent aee39b9 commit aac2fc9
Showing 1 changed file with 102 additions and 7 deletions.
109 changes: 102 additions & 7 deletions Rx/v2/src/rxcpp/rx-subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ class subscription : public subscription_base
std::terminate();
}
}

explicit subscription(std::shared_ptr<base_subscription_state> s)
: state(std::move(s))
{
if (!state) {
std::terminate();
}
}
public:

subscription()
Expand Down Expand Up @@ -178,9 +186,23 @@ class subscription : public subscription_base
weak_state_type get_weak() {
return state;
}

// Atomically promote weak subscription to strong.
// Calls std::terminate if w has already expired.
static subscription lock(weak_state_type w) {
return subscription(w);
}

// Atomically try to promote weak subscription to strong.
// Returns an empty maybe<> if w has already expired.
static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
auto strong_subscription = w.lock();
if (!strong_subscription) {
return rxu::detail::maybe<subscription>{};
} else {
return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
}
}
};

inline bool operator<(const subscription& lhs, const subscription& rhs) {
Expand Down Expand Up @@ -223,8 +245,14 @@ class composite_subscription_inner
typedef subscription::weak_state_type weak_subscription;
struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
{
// invariant: cannot access this data without the lock held.
std::set<subscription> subscriptions;
// double checked locking:
// issubscribed must be loaded again after each lock acquisition.
// invariant:
// never call subscription::unsubscribe with lock held.
std::mutex lock;
// invariant: transitions from 'true' to 'false' exactly once, at any time.
std::atomic<bool> issubscribed;

~composite_subscription_state()
Expand All @@ -242,41 +270,108 @@ class composite_subscription_inner
{
}

// Atomically add 's' to the set of subscriptions.
//
// If unsubscribe() has already occurred, this immediately
// calls s.unsubscribe().
//
// cs.unsubscribe() [must] happens-before s.unsubscribe()
//
// Due to the un-atomic nature of calling 's.unsubscribe()',
// it is possible to observe the unintuitive
// add(s)=>s.unsubscribe() prior
// to any of the unsubscribe()=>sN.unsubscribe().
inline weak_subscription add(subscription s) {
if (!issubscribed) {
if (!issubscribed) { // load.acq [seq_cst]
s.unsubscribe();
} else if (s.is_subscribed()) {
std::unique_lock<decltype(lock)> guard(lock);
subscriptions.insert(s);
if (!issubscribed) { // load.acq [seq_cst]
// unsubscribe was called concurrently.
guard.unlock();
// invariant: do not call unsubscribe with lock held.
s.unsubscribe();
} else {
subscriptions.insert(s);
}
}
return s.get_weak();
}

// Atomically remove 'w' from the set of subscriptions.
//
// This does nothing if 'w' was already previously removed,
// or refers to an expired value.
inline void remove(weak_subscription w) {
if (issubscribed && !w.expired()) {
auto s = subscription::lock(w);
if (issubscribed) { // load.acq [seq_cst]
rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);

if (maybe_subscription.empty()) {
// Do nothing if the subscription has already expired.
return;
}

std::unique_lock<decltype(lock)> guard(lock);
subscriptions.erase(std::move(s));
// invariant: subscriptions must be accessed under the lock.

if (issubscribed) { // load.acq [seq_cst]
subscription& s = maybe_subscription.get();
subscriptions.erase(std::move(s));
} // else unsubscribe() was called concurrently; this becomes a no-op.
}
}

// Atomically clear all subscriptions that were observably added
// (and not subsequently observably removed).
//
// Un-atomically call unsubscribe on those subscriptions.
//
// forall subscriptions in {add(s1),add(s2),...}
// - {remove(s3), remove(s4), ...}:
// cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
//
// cs.unsubscribe() observed-before cs.clear ==> do nothing.
inline void clear() {
if (issubscribed) {
if (issubscribed) { // load.acq [seq_cst]
std::unique_lock<decltype(lock)> guard(lock);

if (!issubscribed) { // load.acq [seq_cst]
// unsubscribe was called concurrently.
return;
}

std::set<subscription> v(std::move(subscriptions));
// invariant: do not call unsubscribe with lock held.
guard.unlock();
std::for_each(v.begin(), v.end(),
[](const subscription& s) {
s.unsubscribe(); });
}
}

// Atomically clear all subscriptions that were observably added
// (and not subsequently observably removed).
//
// Un-atomically call unsubscribe on those subscriptions.
//
// Switches to an 'unsubscribed' state, all subsequent
// adds are immediately unsubscribed.
//
// cs.unsubscribe() [must] happens-before
// cs.add(s) ==> s.unsubscribe()
//
// forall subscriptions in {add(s1),add(s2),...}
// - {remove(s3), remove(s4), ...}:
// cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
inline void unsubscribe() {
if (issubscribed.exchange(false)) {
if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst]
std::unique_lock<decltype(lock)> guard(lock);

// is_subscribed can only transition to 'false' once,
// does not need an extra atomic access here.

std::set<subscription> v(std::move(subscriptions));
// invariant: do not call unsubscribe with lock held.
guard.unlock();
std::for_each(v.begin(), v.end(),
[](const subscription& s) {
Expand Down

0 comments on commit aac2fc9

Please sign in to comment.