diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs index 8666518a80..dfe60773e0 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs @@ -25,7 +25,7 @@ public Count(IObservable source, int count, IScheduler loopScheduler) protected override _ CreateSink(IObserver observer) => new _(this, observer); - protected override void Run(_ sink) => sink.Run(); + protected override void Run(_ sink) => sink.Run(_source); internal sealed class _ : IdentitySink { @@ -41,21 +41,15 @@ public _(Count parent, IObserver observer) _queue = new Queue(); } - private IDisposable _sourceDisposable; private IDisposable _loopDisposable; - public void Run() - { - Disposable.SetSingle(ref _sourceDisposable, _parent._source.SubscribeSafe(this)); - } - protected override void Dispose(bool disposing) { if (disposing) { Disposable.TryDispose(ref _loopDisposable); - Disposable.TryDispose(ref _sourceDisposable); } + base.Dispose(disposing); } @@ -68,7 +62,7 @@ public override void OnNext(TSource value) public override void OnCompleted() { - Disposable.TryDispose(ref _sourceDisposable); + DisposeUpstream(); var longRunning = _parent._loopScheduler.AsLongRunning(); if (longRunning != null)