-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Subjects reimplemented. #605
Conversation
RxJava-pull-requests #542 SUCCESS |
Holding the lock while dispatching events (i.e. executing arbitrary code) seems pretty dangerous. In my experience I've always ended up regretting it when I did this. |
"... In Rx.NET when an event is received, the list of observers is retrieved while holding the lock, then outside the lock, the list is traversed and the events are propagated to the observers. Note however, if an observer unsubscribes right after the unlock and before the event propagation, it will still appear in the list and will receive the event. IMO, this is an undesired behavior ...". Unsubscribing does a "best effort", don't hold the lock. |
* @return false if a terminal condition was reached, i.e., | ||
* this is an isOnCompleted, isOnError or the observer.onNext threw | ||
*/ | ||
public boolean acceptSafe(Observer<? super T> observer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is repeating what SafeObserver
does. A Notification
doesn't need to do anything different and thus we shouldn't be replicating that logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay.
@akarnokd Thanks for this ... @headinthebox and I are reviewing and will end up with some merged/refactored form. So this specific PR won't be merged. |
No problem. |
}; | ||
Replayer rp = new Replayer(obs, s); | ||
replayers.put(s, rp); | ||
rp.replayTill(values.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is called from inside the lock being held which means that replaying all historical values to a new Observer will block all existing Observers and new values from proceeding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True. It would need something like the start-once processQueue in observeOn,
Based on this discussion I've tried my hand in pull request #651 I'd appreciate your review. |
Sure. |
Closing as we ended up doing this in #651. Thank you very much for the work on this and the significant performance gains you helped achieve! |
Reimplemented all 4 kinds of subjects with the following properties:
onNext
,onError
andonCompleted
are fully thread safe against subscription and unsubscription.AsyncSubject
,PublishSubject
andBehaviorSubject
will re-emit just the very first exception when an observer subscribes to them.Notification.acceptSafe
which will capture the exception of theonNext
and propagate it through theonError
. Its return value indicates if the observer can still be used after (i.e., no terminal event was delivered).AbstractSubject
, although none of the subjects use this class any more.UnsubscribeTester
class, which seems to be out-of-place. Can this be moved into the test directory?