Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use lock free strategy for several Subscription implementations #568

Merged
merged 9 commits into from
Dec 8, 2013
63 changes: 23 additions & 40 deletions rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
package rx.subscriptions;

import static rx.subscriptions.Subscriptions.empty;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;

/**
Expand All @@ -24,47 +29,25 @@
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
*/
public class SerialSubscription implements Subscription {
private boolean unsubscribed;
private Subscription subscription;
private final Object gate = new Object();
private final AtomicBoolean unsubscribed = new AtomicBoolean();
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());

@Override
public void unsubscribe() {
Subscription toUnsubscribe = null;
synchronized (gate) {
if (!unsubscribed) {
if (subscription != null) {
toUnsubscribe = subscription;
subscription = null;
}
unsubscribed = true;
}
}
if (toUnsubscribe != null) {
toUnsubscribe.unsubscribe();
}
}
@Override
public void unsubscribe() {
if (unsubscribed.compareAndSet(false, true)) {
reference.getAndSet(empty()).unsubscribe();
}
}

public Subscription getSubscription() {
synchronized (gate) {
return subscription;
}
}
public void setSubscription(final Subscription subscription) {
if (unsubscribed.get()) {
subscription.unsubscribe();
} else {
reference.getAndSet(subscription == null ? empty() : subscription).unsubscribe();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to check subscription == null? I think subscription should not be null.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume that we have two threads, one is calling unsubscribe, another is calling setSubscription. If the order is line 43 37 38 46, the subscription sent to setSubscription will not be unsubscribed.

}
}

public void setSubscription(Subscription subscription) {
Subscription toUnsubscribe = null;
synchronized (gate) {
if (!unsubscribed) {
if (this.subscription != null) {
toUnsubscribe = this.subscription;
}
this.subscription = subscription;
} else {
toUnsubscribe = subscription;
}
}
if (toUnsubscribe != null) {
toUnsubscribe.unsubscribe();
}
}
public Subscription getSubscription() {
return reference.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
*/
package rx.subscriptions;

import static org.mockito.Mockito.*;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -24,53 +30,94 @@
import rx.Subscription;

public class SerialSubscriptionTests {
private SerialSubscription serialSubscription;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);

serialSubscription = new SerialSubscription();
}

@Test
public void unsubscribingWithoutUnderlyingDoesNothing() {
serialSubscription.unsubscribe();
}

@Test
public void unsubscribingWithSingleUnderlyingUnsubscribes() {
Subscription underlying = mock(Subscription.class);
serialSubscription.setSubscription(underlying);
underlying.unsubscribe();
verify(underlying).unsubscribe();
}

@Test
public void replacingFirstUnderlyingCausesUnsubscription() {
Subscription first = mock(Subscription.class);
serialSubscription.setSubscription(first);
Subscription second = mock(Subscription.class);
serialSubscription.setSubscription(second);
verify(first).unsubscribe();
}

@Test
public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
Subscription first = mock(Subscription.class);
serialSubscription.setSubscription(first);
Subscription second = mock(Subscription.class);
serialSubscription.setSubscription(second);
serialSubscription.unsubscribe();
verify(second).unsubscribe();
}

@Test
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription()
{
serialSubscription.unsubscribe();
Subscription underlying = mock(Subscription.class);
serialSubscription.setSubscription(underlying);
verify(underlying).unsubscribe();
}
private SerialSubscription serialSubscription;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);

serialSubscription = new SerialSubscription();
}

@Test
public void unsubscribingWithoutUnderlyingDoesNothing() {
serialSubscription.unsubscribe();
}

@Test
public void unsubscribingWithSingleUnderlyingUnsubscribes() {
Subscription underlying = mock(Subscription.class);
serialSubscription.setSubscription(underlying);
underlying.unsubscribe();
verify(underlying).unsubscribe();
}

@Test
public void replacingFirstUnderlyingCausesUnsubscription() {
Subscription first = mock(Subscription.class);
serialSubscription.setSubscription(first);
Subscription second = mock(Subscription.class);
serialSubscription.setSubscription(second);
verify(first).unsubscribe();
}

@Test
public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
Subscription first = mock(Subscription.class);
serialSubscription.setSubscription(first);
Subscription second = mock(Subscription.class);
serialSubscription.setSubscription(second);
serialSubscription.unsubscribe();
verify(second).unsubscribe();
}

@Test
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() {
serialSubscription.unsubscribe();
Subscription underlying = mock(Subscription.class);
serialSubscription.setSubscription(underlying);
verify(underlying).unsubscribe();
}

@Test(timeout = 1000)
public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcurrently()
throws InterruptedException {
final Subscription firstSet = mock(Subscription.class);
serialSubscription.setSubscription(firstSet);

final CountDownLatch start = new CountDownLatch(1);

final int count = 10;
final CountDownLatch end = new CountDownLatch(count);

final List<Thread> threads = new ArrayList<Thread>();
for (int i = 0 ; i < count ; i++) {
final Thread t = new Thread() {
@Override
public void run() {
try {
start.await();
serialSubscription.unsubscribe();
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
}
};
t.start();
threads.add(t);
}

final Subscription underlying = mock(Subscription.class);
start.countDown();
serialSubscription.setSubscription(underlying);
end.await();
verify(firstSet).unsubscribe();
verify(underlying).unsubscribe();

for (final Thread t : threads) {
t.interrupt();
}
}
}