Description
If we use the `RefCount` overload that takes a `disconnectDelay` argument, and a `minObservers` of 2 or more, it does not work with sources that complete immediately on subscription.
For example, this:
var o2 = Observable.Empty<int>().Publish().RefCount(2, TimeSpan.FromSeconds(10));
var s1 = o2.Subscribe();
var s2 = o2.Subscribe();
s1.Dispose();
s2.Dispose();
var s3 = o2.Subscribe();
will throw an `InvalidOperationException` from the final line, reporting that a Disposable has already been assigned.
Here's the sequence of events:
- Initial subscription does not connect because we've not reached the `minObservers` threshold
- Second subscription hits the threshold, so `RefCount` calls `Connect` on the connectable observable returned by `Publish`, which immediately completes both subscriptions (before returning from `Connect`)
- Because both subscriptions have now completed, the `RefCount`'s internal count drops to 0, so it schedules the work item that will eventually disconnect after the specified delay
- The code calls `Dispose` to shut down the first two subscriptions, but this doesn't do anything because they have both already terminated as a result of the underlying source delivering an `OnCompleted`
- The third subscription to `RefCount` happens before the `disconnectDelay` has elapsed. (Note that the `RefCount` is still connected to the connectable observable returned by `Publish`.) `RefCount` calls `Subscribe` on its source (the observable returned by `Publish`) and since that happens to be a `Subject<int>` that has already completed, that source immediately calls `OnCompleted` before returning from `Subscribe`, meaning that this newly-created subscription is already completed before that call to `Subscribe` returns.
- The `RefCount`'s internal count is 1 at this point, but it then sets up the callback to run on completion, which executes immediately because this new subscription is already complete, and this causes the internal count to drop back to 0 a second time. `RefCount` tries to schedule a second work item to disconnect after the specified delay, but the one it set up earlier, the first time the count dropped to 0, remains in place. It's using a `SingleAssignmentDisposable` to hold that, which notices the attempt to assign a second disposable, and throws this exception.
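
The last step can be seen in isolation. This is a minimal sketch, assuming the System.Reactive package is referenced. It does not touch `RefCount` at all and only shows the `SingleAssignmentDisposable` behaviour that the sequence above runs into; the `slot` and `threw` names are just for illustration.

```csharp
using System;
using System.Reactive.Disposables;

// Stands in for the field that holds the pending delayed-disconnect work item.
var slot = new SingleAssignmentDisposable();
var threw = false;

// First assignment: corresponds to the first time the count drops to 0.
slot.Disposable = Disposable.Empty;

try
{
    // Second assignment: corresponds to the count dropping to 0 again
    // while the first work item is still in place.
    slot.Disposable = Disposable.Empty;
}
catch (InvalidOperationException ex)
{
    threw = true;
    Console.WriteLine(ex.Message);
}

Console.WriteLine($"Second assignment threw: {threw}");
```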
The basic problem seems to be that `RefCount.Lazy` (which is used when a `disconnectDelay` is specified) does not correctly handle the transition from the "0 subscribers, but still connected" state to the "at least 1 subscriber" state. It should cancel the work item that would perform the delayed disconnect, but that still seems to be in place, which seems to be why we get this exception.
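
To illustrate the kind of state handling described here, the following is a rough sketch and not the actual `RefCount.Lazy` code. It assumes the System.Reactive package, and `DelayedDisconnectSketch`, `OnSubscriberAdded` and `OnSubscriberRemoved` are hypothetical names invented for this example. It holds the pending work item in a `SerialDisposable`, so a new subscriber cancels the delayed disconnect, and a later drop to 0 can schedule a fresh one without hitting the single-assignment check.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

// Virtual-time scheduler so the delay can be advanced manually.
var scheduler = new HistoricalScheduler();
var disconnects = 0;
var tracker = new DelayedDisconnectSketch(
    scheduler, TimeSpan.FromSeconds(10), () => disconnects++);

tracker.OnSubscriberAdded();
tracker.OnSubscriberRemoved();  // count reaches 0: delayed disconnect scheduled
tracker.OnSubscriberAdded();    // pending disconnect is cancelled here
tracker.OnSubscriberRemoved();  // count reaches 0 again: a new one is scheduled

scheduler.AdvanceBy(TimeSpan.FromSeconds(11));
Console.WriteLine($"Disconnect calls: {disconnects}");

// Hypothetical helper; only models the counting and the delayed disconnect.
sealed class DelayedDisconnectSketch
{
    private readonly object _gate = new object();
    private readonly SerialDisposable _pendingDisconnect = new SerialDisposable();
    private readonly IScheduler _scheduler;
    private readonly TimeSpan _delay;
    private readonly Action _disconnect;
    private int _count;

    public DelayedDisconnectSketch(IScheduler scheduler, TimeSpan delay, Action disconnect)
    {
        _scheduler = scheduler;
        _delay = delay;
        _disconnect = disconnect;
    }

    public void OnSubscriberAdded()
    {
        lock (_gate)
        {
            _count++;
            // Replacing the held disposable disposes the previous one,
            // which cancels any delayed disconnect that is still pending.
            _pendingDisconnect.Disposable = Disposable.Empty;
        }
    }

    public void OnSubscriberRemoved()
    {
        lock (_gate)
        {
            if (--_count == 0)
            {
                _pendingDisconnect.Disposable = _scheduler.Schedule(_delay, () =>
                {
                    lock (_gate)
                    {
                        // Only disconnect if nobody subscribed in the meantime.
                        if (_count == 0) _disconnect();
                    }
                });
            }
        }
    }
}
```

Whether a `SerialDisposable` or an explicit dispose-and-reset of the existing field fits the real implementation better is a design decision for the library; the sketch only shows that the transition back to "at least 1 subscriber" needs to cancel the pending work item.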