Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BugFix: Handling of Terminal State for Behavior/Publish Subjects #525

Merged
merged 2 commits into from
Nov 26, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions rxjava-core/src/main/java/rx/subjects/AbstractSubject.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package rx.subjects;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import rx.Notification;
import rx.Observer;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action2;

public abstract class AbstractSubject<T> extends Subject<T, T> {

protected AbstractSubject(rx.Observable.OnSubscribeFunc<T> onSubscribe) {
super(onSubscribe);
}

protected static class SubjectState<T> {
protected final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
protected final AtomicReference<Notification<T>> currentValue = new AtomicReference<Notification<T>>();
protected final AtomicBoolean completed = new AtomicBoolean();
protected final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock();
}

protected static <T> OnSubscribeFunc<T> getOnSubscribeFunc(final SubjectState<T> state, final Action2<SubjectState<T>, Observer<? super T>> onEach) {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
/*
* Subscription needs to be synchronized with terminal states to ensure
* race conditions are handled. When subscribing we must make sure
* onComplete/onError is correctly emitted to all observers, even if it
* comes in while the onComplete/onError is being propagated.
*/
state.SUBSCRIPTION_LOCK.lock();
try {
if (state.completed.get()) {
emitNotification(state.currentValue.get(), observer);
if (onEach != null) {
onEach.call(state, observer);
}
return Subscriptions.empty();
} else {
// the subject is not completed so we subscribe
final SafeObservableSubscription subscription = new SafeObservableSubscription();

subscription.wrap(new Subscription() {
@Override
public void unsubscribe() {
// on unsubscribe remove it from the map of outbound observers to notify
state.observers.remove(subscription);
}
});

// on subscribe add it to the map of outbound observers to notify
state.observers.put(subscription, observer);

// invoke onSubscribe logic
if (onEach != null) {
onEach.call(state, observer);
}

return subscription;
}
} finally {
state.SUBSCRIPTION_LOCK.unlock();
}

}

};
}

protected static <T> void emitNotification(Notification<T> value, Observer<? super T> observer) {
// if null that means onNext was never invoked (no Notification set)
if (value != null) {
if (value.isOnNext()) {
observer.onNext(value.getValue());
} else if (value.isOnError()) {
observer.onError(value.getThrowable());
} else if (value.isOnCompleted()) {
observer.onCompleted();
}
}
}

/**
* Emit the current value.
*
* @param state
*/
protected static <T> void emitNotification(final SubjectState<T> state, final Action2<SubjectState<T>, Observer<? super T>> onEach) {
for (Subscription s : snapshotOfObservers(state)) {
Observer<? super T> o = state.observers.get(s);
// emit notifications to this observer
emitNotification(state.currentValue.get(), o);
// onEach action if applicable
if (onEach != null) {
onEach.call(state, o);
}
}
}

/**
* Emit the current value to all observers and remove their subscription.
*
* @param state
*/
protected void emitNotificationAndTerminate(final SubjectState<T> state, final Action2<SubjectState<T>, Observer<? super T>> onEach) {
/*
* We can not allow new subscribers to be added while we execute the terminal state.
*/
state.SUBSCRIPTION_LOCK.lock();
try {
if (state.completed.compareAndSet(false, true)) {
for (Subscription s : snapshotOfObservers(state)) {
Observer<? super T> o = state.observers.get(s);
// emit notifications to this observer
emitNotification(state.currentValue.get(), o);
// onEach action if applicable
if (onEach != null) {
onEach.call(state, o);
}

// remove the subscription as it is completed
state.observers.remove(s);
}
}
} finally {
state.SUBSCRIPTION_LOCK.unlock();
}
}

/**
* Current snapshot of 'state.observers.keySet()' so that concurrent modifications aren't included.
*
* This makes it behave deterministically in a single-threaded execution when nesting subscribes.
*
* In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
* of possibly being included in the current onNext iteration.
*
* @return List<Observer<T>>
*/
private static <T> Collection<Subscription> snapshotOfObservers(final SubjectState<T> state) {
return new ArrayList<Subscription>(state.observers.keySet());
}
}
124 changes: 31 additions & 93 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,9 @@
*/
package rx.subjects;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import rx.Notification;
import rx.Observer;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action2;

/**
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
Expand Down Expand Up @@ -55,123 +48,68 @@
*
* @param <T>
*/
public class AsyncSubject<T> extends Subject<T, T> {
public class AsyncSubject<T> extends AbstractSubject<T> {

/**
* Create a new AsyncSubject
*
* @return a new AsyncSubject
*/
public static <T> AsyncSubject<T> create() {
final AsyncSubjectState<T> state = new AsyncSubjectState<T>();
final SubjectState<T> state = new SubjectState<T>();
OnSubscribeFunc<T> onSubscribe = getOnSubscribeFunc(state, new Action2<SubjectState<T>, Observer<? super T>>() {

OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
/*
* Subscription needs to be synchronized with terminal states to ensure
* race conditions are handled. When subscribing we must make sure
* onComplete/onError is correctly emitted to all observers, even if it
* comes in while the onComplete/onError is being propagated.
*/
state.SUBSCRIPTION_LOCK.lock();
try {
if (state.completed.get()) {
emitNotificationToObserver(state, observer);
return Subscriptions.empty();
} else {
// the subject is not completed so we subscribe
final SafeObservableSubscription subscription = new SafeObservableSubscription();

subscription.wrap(new Subscription() {
@Override
public void unsubscribe() {
// on unsubscribe remove it from the map of outbound observers to notify
state.observers.remove(subscription);
}
});

// on subscribe add it to the map of outbound observers to notify
state.observers.put(subscription, observer);

return subscription;
public void call(SubjectState<T> state, Observer<? super T> o) {
// we want the last value + completed so add this extra logic
// to send onCompleted if the last value is an onNext
if (state.completed.get()) {
Notification<T> value = state.currentValue.get();
if (value != null && value.isOnNext()) {
o.onCompleted();
}
} finally {
state.SUBSCRIPTION_LOCK.unlock();
}

}

};

});
return new AsyncSubject<T>(onSubscribe, state);
}

private static <T> void emitNotificationToObserver(final AsyncSubjectState<T> state, Observer<? super T> observer) {
Notification<T> finalValue = state.currentValue.get();

// if null that means onNext was never invoked (no Notification set)
if (finalValue != null) {
if (finalValue.isOnNext()) {
observer.onNext(finalValue.getValue());
} else if (finalValue.isOnError()) {
observer.onError(finalValue.getThrowable());
}
}
observer.onCompleted();
}

/**
* State externally constructed and passed in so the onSubscribe function has access to it.
*
* @param <T>
*/
private static class AsyncSubjectState<T> {
private final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
private final AtomicReference<Notification<T>> currentValue = new AtomicReference<Notification<T>>();
private final AtomicBoolean completed = new AtomicBoolean();
private final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock();
}

private final AsyncSubjectState<T> state;
private final SubjectState<T> state;

protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, AsyncSubjectState<T> state) {
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectState<T> state) {
super(onSubscribe);
this.state = state;
}

@Override
public void onCompleted() {
terminalState();
/**
* Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
*/
emitNotificationAndTerminate(state, new Action2<SubjectState<T>, Observer<? super T>>() {

@Override
public void call(SubjectState<T> state, Observer<? super T> o) {
o.onCompleted();
}
});
}

@Override
public void onError(Throwable e) {
/**
* Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
*/
state.currentValue.set(new Notification<T>(e));
terminalState();
emitNotificationAndTerminate(state, null);
}

@Override
public void onNext(T v) {
/**
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
*/
state.currentValue.set(new Notification<T>(v));
}

private void terminalState() {
/*
* We can not allow new subscribers to be added while we execute the terminal state.
*/
state.SUBSCRIPTION_LOCK.lock();
try {
if (state.completed.compareAndSet(false, true)) {
for (Subscription s : state.observers.keySet()) {
// emit notifications to this observer
emitNotificationToObserver(state, state.observers.get(s));
// remove the subscription as it is completed
state.observers.remove(s);
}
}
} finally {
state.SUBSCRIPTION_LOCK.unlock();
}
}
}
Loading