Skip to content

Commit

Permalink
Merge pull request #568 from jloisel/master
Browse files Browse the repository at this point in the history
Use lock free strategy for several Subscription implementations
  • Loading branch information
benjchristensen committed Dec 8, 2013
2 parents 01aab57 + 078b687 commit 2d91c99
Show file tree
Hide file tree
Showing 4 changed files with 455 additions and 218 deletions.
160 changes: 90 additions & 70 deletions rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,103 +15,123 @@
*/
package rx.subscriptions;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;
import rx.util.CompositeException;

/**
* Subscription that represents a group of Subscriptions that are unsubscribed together.
* Subscription that represents a group of Subscriptions that are unsubscribed
* together.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
* @see <a
* href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
* equivalent CompositeDisposable</a>
*/
public class CompositeSubscription implements Subscription {
private static final Set<Subscription> MUTATE_STATE = unmodifiableSet(new HashSet<Subscription>());
private static final Set<Subscription> UNSUBSCRIBED_STATE = unmodifiableSet(new HashSet<Subscription>());

private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();

/*
* The reason 'synchronized' is used on 'add' and 'unsubscribe' is because AtomicBoolean/ConcurrentLinkedQueue are both being modified so it needs to be done atomically.
*
* TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach
*/
private AtomicBoolean unsubscribed = new AtomicBoolean(false);
private final ConcurrentHashMap<Subscription, Boolean> subscriptions = new ConcurrentHashMap<Subscription, Boolean>();

public CompositeSubscription(List<Subscription> subscriptions) {
for (Subscription s : subscriptions) {
this.subscriptions.put(s, Boolean.TRUE);
}
public CompositeSubscription(final Subscription... subscriptions) {
reference.set(new HashSet<Subscription>(asList(subscriptions)));
}

public CompositeSubscription(Subscription... subscriptions) {
for (Subscription s : subscriptions) {
this.subscriptions.put(s, Boolean.TRUE);
}
public boolean isUnsubscribed() {
return reference.get() == UNSUBSCRIBED_STATE;
}

/**
* Remove and unsubscribe all subscriptions but do not unsubscribe the outer CompositeSubscription.
*/
public void clear() {
Collection<Throwable> es = null;
for (Subscription s : subscriptions.keySet()) {
try {
public void add(final Subscription s) {
do {
final Set<Subscription> existing = reference.get();
if (existing == UNSUBSCRIBED_STATE) {
s.unsubscribe();
this.subscriptions.remove(s);
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
break;
}
}
if (es != null) {
throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
}
}

/**
* Remove the {@link Subscription} and unsubscribe it.
*
* @param s
*/
public void remove(Subscription s) {
this.subscriptions.remove(s);
// also unsubscribe from it: http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
s.unsubscribe();
if (reference.compareAndSet(existing, MUTATE_STATE)) {
existing.add(s);
reference.set(existing);
break;
}
} while (true);
}

public boolean isUnsubscribed() {
return unsubscribed.get();
public void remove(final Subscription s) {
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_STATE) {
s.unsubscribe();
break;
}

if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
// also unsubscribe from it:
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
subscriptions.remove(s);
reference.set(subscriptions);
s.unsubscribe();
break;
}
} while (true);
}

public synchronized void add(Subscription s) {
if (unsubscribed.get()) {
s.unsubscribe();
} else {
subscriptions.put(s, Boolean.TRUE);
}
public void clear() {
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_STATE) {
break;
}

if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
final Set<Subscription> copy = new HashSet<Subscription>(
subscriptions);
subscriptions.clear();
reference.set(subscriptions);

for (final Subscription subscription : copy) {
subscription.unsubscribe();
}
break;
}
} while (true);
}

@Override
public synchronized void unsubscribe() {
if (unsubscribed.compareAndSet(false, true)) {
Collection<Throwable> es = null;
for (Subscription s : subscriptions.keySet()) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
public void unsubscribe() {
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_STATE) {
break;
}

if (subscriptions == MUTATE_STATE) {
continue;
}

if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_STATE)) {
final Collection<Throwable> es = new ArrayList<Throwable>();
for (final Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (final Throwable e) {
es.add(e);
}
es.add(e);
}
if (es.isEmpty()) {
break;
}
throw new CompositeException(
"Failed to unsubscribe to 1 or more subscriptions.", es);
}
if (es != null) {
throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
}
}
} while (true);
}
}
132 changes: 62 additions & 70 deletions rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
Original file line number Diff line number Diff line change
@@ -1,70 +1,62 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import rx.Subscription;

/**
* Represents a subscription whose underlying subscription can be swapped for another subscription
* which causes the previous underlying subscription to be unsubscribed.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
*/
public class SerialSubscription implements Subscription {
private boolean unsubscribed;
private Subscription subscription;
private final Object gate = new Object();

@Override
public void unsubscribe() {
Subscription toUnsubscribe = null;
synchronized (gate) {
if (!unsubscribed) {
if (subscription != null) {
toUnsubscribe = subscription;
subscription = null;
}
unsubscribed = true;
}
}
if (toUnsubscribe != null) {
toUnsubscribe.unsubscribe();
}
}

public Subscription getSubscription() {
synchronized (gate) {
return subscription;
}
}

public void setSubscription(Subscription subscription) {
Subscription toUnsubscribe = null;
synchronized (gate) {
if (!unsubscribed) {
if (this.subscription != null) {
toUnsubscribe = this.subscription;
}
this.subscription = subscription;
} else {
toUnsubscribe = subscription;
}
}
if (toUnsubscribe != null) {
toUnsubscribe.unsubscribe();
}
}
}
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import static rx.subscriptions.Subscriptions.empty;

import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;

/**
* Represents a subscription whose underlying subscription can be swapped for another subscription
* which causes the previous underlying subscription to be unsubscribed.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
*/
public class SerialSubscription implements Subscription {
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());

private static final Subscription UNSUBSCRIBED = new Subscription() {
@Override
public void unsubscribe() {
}
};

@Override
public void unsubscribe() {
setSubscription(UNSUBSCRIBED);
}

public void setSubscription(final Subscription subscription) {
do {
final Subscription current = reference.get();
if (current == UNSUBSCRIBED) {
subscription.unsubscribe();
break;
}
if (reference.compareAndSet(current, subscription)) {
current.unsubscribe();
break;
}
} while (true);
}

public Subscription getSubscription() {
final Subscription subscription = reference.get();
return subscription == UNSUBSCRIBED ? null : subscription;
}
}
Loading

0 comments on commit 2d91c99

Please sign in to comment.