Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BugFix: AsyncSubject #486

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 89 additions & 32 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import rx.Notification;
import rx.Observer;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.Subscriptions;

/**
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
Expand Down Expand Up @@ -60,61 +63,115 @@ public class AsyncSubject<T> extends Subject<T, T> {
* @return a new AsyncSubject
*/
public static <T> AsyncSubject<T> create() {
final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
final AsyncSubjectState<T> state = new AsyncSubjectState<T>();

OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();

subscription.wrap(new Subscription() {
@Override
public void unsubscribe() {
// on unsubscribe remove it from the map of outbound observers to notify
observers.remove(subscription);
/*
* Subscription needs to be synchronized with terminal states to ensure
* race conditions are handled. When subscribing we must make sure
* onComplete/onError is correctly emitted to all observers, even if it
* comes in while the onComplete/onError is being propagated.
*/
state.SUBSCRIPTION_LOCK.lock();
try {
if (state.completed.get()) {
emitNotificationToObserver(state, observer);
return Subscriptions.empty();
} else {
// the subject is not completed so we subscribe
final SafeObservableSubscription subscription = new SafeObservableSubscription();

subscription.wrap(new Subscription() {
@Override
public void unsubscribe() {
// on unsubscribe remove it from the map of outbound observers to notify
state.observers.remove(subscription);
}
});

// on subscribe add it to the map of outbound observers to notify
state.observers.put(subscription, observer);

return subscription;
}
});
} finally {
state.SUBSCRIPTION_LOCK.unlock();
}

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, observer);
return subscription;
}

};

return new AsyncSubject<T>(onSubscribe, observers);
return new AsyncSubject<T>(onSubscribe, state);
}

private final ConcurrentHashMap<Subscription, Observer<? super T>> observers;
private final AtomicReference<T> currentValue;
private final AtomicBoolean hasValue = new AtomicBoolean();
private static <T> void emitNotificationToObserver(final AsyncSubjectState<T> state, Observer<? super T> observer) {
Notification<T> finalValue = state.currentValue.get();

// if null that means onNext was never invoked (no Notification set)
if (finalValue != null) {
if (finalValue.isOnNext()) {
observer.onNext(finalValue.getValue());
} else if (finalValue.isOnError()) {
observer.onError(finalValue.getThrowable());
}
}
observer.onCompleted();
}

protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers) {
/**
* State externally constructed and passed in so the onSubscribe function has access to it.
*
* @param <T>
*/
private static class AsyncSubjectState<T> {
private final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
private final AtomicReference<Notification<T>> currentValue = new AtomicReference<Notification<T>>();
private final AtomicBoolean completed = new AtomicBoolean();
private final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock();
}

private final AsyncSubjectState<T> state;

protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, AsyncSubjectState<T> state) {
super(onSubscribe);
this.observers = observers;
this.currentValue = new AtomicReference<T>();
this.state = state;
}

@Override
public void onCompleted() {
T finalValue = currentValue.get();
for (Observer<? super T> observer : observers.values()) {
if (hasValue.get()) {
observer.onNext(finalValue);
}
observer.onCompleted();
}
terminalState();
}

@Override
public void onError(Throwable e) {
for (Observer<? super T> observer : observers.values()) {
observer.onError(e);
}
state.currentValue.set(new Notification<T>(e));
terminalState();
}

@Override
public void onNext(T args) {
hasValue.set(true);
currentValue.set(args);
public void onNext(T v) {
state.currentValue.set(new Notification<T>(v));
}

private void terminalState() {
/*
* We can not allow new subscribers to be added while we execute the terminal state.
*/
state.SUBSCRIPTION_LOCK.lock();
try {
if (state.completed.compareAndSet(false, true)) {
for (Subscription s : state.observers.keySet()) {
// emit notifications to this observer
emitNotificationToObserver(state, state.observers.get(s));
// remove the subscription as it is completed
state.observers.remove(s);
}
}
} finally {
state.SUBSCRIPTION_LOCK.unlock();
}
}
}
150 changes: 150 additions & 0 deletions rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
*/
package rx.subjects;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
Expand Down Expand Up @@ -66,6 +70,62 @@ public void testCompleted() {
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testNull() {
AsyncSubject<String> subject = AsyncSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
subject.subscribe(aObserver);

subject.onNext(null);
subject.onCompleted();

verify(aObserver, times(1)).onNext(null);
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testSubscribeAfterCompleted() {
AsyncSubject<String> subject = AsyncSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);

subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();

subject.subscribe(aObserver);

verify(aObserver, times(1)).onNext("three");
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testSubscribeAfterError() {
AsyncSubject<String> subject = AsyncSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);

subject.onNext("one");
subject.onNext("two");
subject.onNext("three");

RuntimeException re = new RuntimeException("failed");
subject.onError(re);

subject.subscribe(aObserver);

verify(aObserver, times(1)).onError(re);
verify(aObserver, Mockito.never()).onNext(any(String.class));
verify(aObserver, Mockito.never()).onCompleted();
}

@Test
public void testError() {
AsyncSubject<String> subject = AsyncSubject.create();
Expand Down Expand Up @@ -151,4 +211,94 @@ public void testEmptySubjectCompleted() {
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

/**
* Can receive timeout if subscribe never receives an onError/onCompleted ... which reveals a race condition.
*/
@Test
public void testSubscribeCompletionRaceCondition() {
/*
* With non-threadsafe code this fails most of the time on my dev laptop and is non-deterministic enough
* to act as a unit test to the race conditions.
*
* With the synchronization code in place I can not get this to fail on my laptop.
*/
for (int i = 0; i < 50; i++) {
final AsyncSubject<String> subject = AsyncSubject.create();
final AtomicReference<String> value1 = new AtomicReference<String>();

subject.subscribe(new Action1<String>() {

@Override
public void call(String t1) {
try {
// simulate a slow observer
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
value1.set(t1);
}

});

Thread t1 = new Thread(new Runnable() {

@Override
public void run() {
subject.onNext("value");
subject.onCompleted();
}
});

SubjectObserverThread t2 = new SubjectObserverThread(subject);
SubjectObserverThread t3 = new SubjectObserverThread(subject);
SubjectObserverThread t4 = new SubjectObserverThread(subject);
SubjectObserverThread t5 = new SubjectObserverThread(subject);

t2.start();
t3.start();
t1.start();
t4.start();
t5.start();
try {
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

assertEquals("value", value1.get());
assertEquals("value", t2.value.get());
assertEquals("value", t3.value.get());
assertEquals("value", t4.value.get());
assertEquals("value", t5.value.get());
}

}

private static class SubjectObserverThread extends Thread {

private final AsyncSubject<String> subject;
private final AtomicReference<String> value = new AtomicReference<String>();

public SubjectObserverThread(AsyncSubject<String> subject) {
this.subject = subject;
}

@Override
public void run() {
try {
// a timeout exception will happen if we don't get a terminal state
String v = subject.timeout(2000, TimeUnit.MILLISECONDS).toBlockingObservable().single();
value.set(v);
} catch (Exception e) {
e.printStackTrace();
}
}
}

}