Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes for issue #1136
OperatorMulticast
is straightforward from concurrency perspective. The only consideration is that if the current subscription gets unsubscribed before theconnect
reaches theunsafeSubscribe
, it really depends on the source what it will do with an unsubscribed client. It is possible to put extra effort to make sure a newly established connection won't get unsubscribed before it is actually connected or if it gets unsubscribed immediately, no subscription is attempted at all.OperatorSampleWithTime
: didn't want to push too many PRs so I just included it here. There was a missing unsubscribe in theonCompleted()
that makes sure the worker is stopped.Subscribers.empty()
was implemented by returning the same Subscriber to everyone, which Subscriber is stateful so if someone unsubscribes it, it will appear everywhere as unsubscribed and can have unwanted effects. There is no such problem withObservers.empty()
as it is stateless. The change just usesSubscribers.from()
to wrapObservers.empty()
and every caller gets its own independent instance.OperatorRefCount
was a bit more tricky. Since it has a connection counter, one has to serialize subscriptions with unsubscriptions. However, it is possible a subscription gets unsubscribed before code reaches the connect check which may disrupt the connection counter. The solution is to keep track of the unsubscriptions that happen before the connection attempts and not change the counter in case of out-of-order behavior. The final aim was to avoid leaking the connection statuses if theunsafeSubscribe
throws concurrently with a client unsubscribing by using weak tokens (integers wouldn't have worked as the first 0-127 are cached in the JVM and would never GC).