diff --git a/Rx.NET/Source/src/System.Reactive/Internal/Sink.cs b/Rx.NET/Source/src/System.Reactive/Internal/Sink.cs index a4c21fb01c..1b07c57d78 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/Sink.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/Sink.cs @@ -30,6 +30,11 @@ public void Dispose() Dispose(true); } + /// + /// Override this method to dispose additional resources. + /// The method is guaranteed to be called at most once. + /// + /// If true, the method was called from . protected virtual void Dispose(bool disposing) { //Calling base.Dispose(true) is not a proper disposal, so we can omit the assignment here. diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs index e55b8c4323..ebbf60e6ec 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs @@ -16,54 +16,114 @@ internal sealed class Eager : Producer private readonly IConnectableObservable _source; private readonly object _gate; - private int _count; - private IDisposable _connectableSubscription; + /// + /// Contains the current active connection's state or null + /// if no connection is active at the moment. + /// Should be manipulated while holding the lock. + /// + private RefConnection _connection; public Eager(IConnectableObservable source) { _source = source; _gate = new object(); - _count = 0; - _connectableSubscription = default(IDisposable); } - protected override _ CreateSink(IObserver observer) => new _(observer); + protected override _ CreateSink(IObserver observer) => new _(observer, this); - protected override void Run(_ sink) => sink.Run(this); + protected override void Run(_ sink) => sink.Run(); internal sealed class _ : IdentitySink { - public _(IObserver observer) + private readonly Eager _parent; + /// + /// Contains the connection reference the downstream observer + /// has subscribed to. Its purpose is to + /// avoid subscribing, connecting and disconnecting + /// while holding a lock. + /// + private RefConnection _targetConnection; + + public _(IObserver observer, Eager parent) : base(observer) { + _parent = parent; } - public void Run(Eager parent) + public void Run() { - var subscription = parent._source.SubscribeSafe(this); + var doConnect = false; + var conn = default(RefConnection); - lock (parent._gate) + lock (_parent._gate) { - if (++parent._count == 1) + // get the active connection state + conn = _parent._connection; + // if null, a new connection should be established + if (conn == null) { - parent._connectableSubscription = parent._source.Connect(); + conn = new RefConnection(); + // make it the active one + _parent._connection = conn; } + + // this is the first observer, then connect + doConnect = conn._count++ == 0; + // save the current connection for this observer + _targetConnection = conn; } - SetUpstream(Disposable.Create(() => + // subscribe to the source first + Run(_parent._source); + // then connect the source if necessary + if (doConnect && !Disposable.GetIsDisposed(ref conn._disposable)) { - subscription.Dispose(); + // this makes sure if the connection ends synchronously + // only the currently known connection is affected + // and a connection from a concurrent reconnection won't + // interfere + Disposable.SetSingle(ref conn._disposable, _parent._source.Connect()); + } + } - lock (parent._gate) + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + if (disposing) + { + // get and forget the saved connection + var targetConnection = _targetConnection; + _targetConnection = null; + + lock (_parent._gate) { - if (--parent._count == 0) + // if the current connection is no longer the saved connection + // or the counter hasn't reached zero yet + if (targetConnection != _parent._connection + || --targetConnection._count != 0) { - parent._connectableSubscription.Dispose(); + // nothing to do. + return; } + // forget the current connection + _parent._connection = null; } - })); + + // disconnect + Disposable.TryDispose(ref targetConnection._disposable); + } } } + + /// + /// Holds an individual connection state: the observer count and + /// the connection's IDisposable. + /// + private sealed class RefConnection + { + internal int _count; + internal IDisposable _disposable; + } } internal sealed class Lazy : Producer diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs index e9d749b1ee..577a7d821a 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs @@ -411,5 +411,30 @@ public void LazyRefCount_Publish() Subscribe(285, 300) ); } + + [Fact] + public void RefCount_source_already_completed_synchronously() + { + var subscribed = 0; + var unsubscribed = 0; + + var o1 = Observable.Create(observer => + { + subscribed++; + observer.OnCompleted(); + + return Disposable.Create(() => unsubscribed++); + }); + + var o2 = o1.Publish().RefCount(); + + var s1 = o2.Subscribe(); + Assert.Equal(1, subscribed); + Assert.Equal(1, unsubscribed); + + var s2 = o2.Subscribe(); + Assert.Equal(1, subscribed); + Assert.Equal(1, unsubscribed); + } } }