From a4b6c2b743e5698e0ac790d5b75e7e5670297445 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 5 Jul 2018 12:47:30 +0200 Subject: [PATCH 1/5] Simplify a continuation. --- .../System.Reactive/Concurrency/Scheduler.Async.cs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs index 4de1917d5e..390111f78c 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs @@ -23,17 +23,14 @@ public IDisposable Run(IScheduler self, TState s, Func { - if (!t.IsCanceled) - { - var @this = (AsyncInvocation)thisObject; + var @this = (AsyncInvocation)thisObject; - t.Exception?.Handle(e => e is OperationCanceledException); + t.Exception?.Handle(e => e is OperationCanceledException); - Disposable.SetSingle(ref @this._run, t.Result); - } + Disposable.SetSingle(ref @this._run, t.Result); }, this, - TaskContinuationOptions.ExecuteSynchronously); + TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.NotOnCanceled); return this; } From f850df1c98fe325560e81e8875d2a1b0bb55a6b0 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 5 Jul 2018 12:39:29 +0200 Subject: [PATCH 2/5] A Task-continuation should be cancelled when the Sink is disposed to avoid leaking the continuation on long-lived tasks. --- .../src/System.Reactive/Linq/Observable/Merge.cs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs index 05637284b8..c86f85ed5b 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs @@ -288,6 +288,7 @@ public _(IObserver observer) } private readonly object _gate = new object(); + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private volatile int _count = 1; public override void OnNext(Task value) @@ -299,7 +300,7 @@ public override void OnNext(Task value) } else { - value.ContinueWith((t, thisObject) => ((_)thisObject).OnCompletedTask(t), this); + value.ContinueWith((t, thisObject) => ((_)thisObject).OnCompletedTask(t), this, _cts.Token); } } @@ -354,6 +355,14 @@ public override void OnCompleted() } } } + + protected override void Dispose(bool disposing) + { + if (disposing) + _cts.Cancel(); + + base.Dispose(disposing); + } } } } From c4121798ffec393ef736e4a7e091bf3a410ee668 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 5 Jul 2018 12:43:50 +0200 Subject: [PATCH 3/5] SelectMany: Cancel continuations as soon as the Sink is disposed to avoid a leaking continuation if the task lives longer than the Sink. --- .../src/System.Reactive/Linq/Observable/SelectMany.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs index 20380cfc49..e5c07d7ee7 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs @@ -597,7 +597,7 @@ private void AttachContinuation(TSource value, Task task) // // Separate method to avoid closure in synchronous completion case. // - task.ContinueWith(t => OnCompletedTask(value, t)); + task.ContinueWith(t => OnCompletedTask(value, t), _cancel.Token); } private void OnCompletedTask(TSource value, Task task) @@ -758,7 +758,7 @@ private void AttachContinuation(TSource value, int index, Task task // // Separate method to avoid closure in synchronous completion case. // - task.ContinueWith(t => OnCompletedTask(value, index, t)); + task.ContinueWith(t => OnCompletedTask(value, index, t), _cancel.Token); } private void OnCompletedTask(TSource value, int index, Task task) @@ -1538,7 +1538,7 @@ public override void OnNext(TSource value) } else { - task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this); + task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this, _cts.Token); } } @@ -1670,7 +1670,7 @@ public override void OnNext(TSource value) } else { - task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this); + task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this, _cts.Token); } } From 26cbadf62c9340e7af2825e05aae36982146dacb Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 5 Jul 2018 13:41:16 +0200 Subject: [PATCH 4/5] Avoid continuation leaks in TaskObservableExtensions.ToObservable. This is a bit more involved. The continuation must not be registered before a subscription happens because it might leak on long lived tasks. So instead of registering a continuation on a task once and forwarding the results to an AsyncSubject, we construct an Observable that registers the continuation for each subscription. Memory performance might of course go down if a lot of subscriptions happen on a task, but it won't leak. --- .../Tasks/TaskObservableExtensions.cs | 73 ++++++++++++++++--- 1 file changed, 61 insertions(+), 12 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs b/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs index d657a34a6e..e7fe59bcb3 100644 --- a/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs +++ b/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs @@ -17,6 +17,65 @@ namespace System.Reactive.Threading.Tasks /// public static class TaskObservableExtensions { + private sealed class SlowTaskObservable : IObservable + { + private readonly Task _task; + private readonly IScheduler _scheduler; + + public SlowTaskObservable(Task task, IScheduler scheduler) + { + _task = task; + _scheduler = scheduler; + } + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var cts = new CancellationDisposable(); + var options = GetTaskContinuationOptions(_scheduler); + + if (_scheduler == null) + _task.ContinueWith((t, subjectObject) => t.EmitTaskResult((IObserver)subjectObject), observer, cts.Token, options, TaskScheduler.Current); + else + _task.ContinueWith((t, subjectObject) => _scheduler.ScheduleAction((t, subjectObject), tuple => tuple.t.EmitTaskResult((IObserver)tuple.subjectObject)), observer, cts.Token, options, TaskScheduler.Current); + + return cts; + } + } + + private sealed class SlowTaskObservable : IObservable + { + private readonly Task _task; + private readonly IScheduler _scheduler; + + public SlowTaskObservable(Task task, IScheduler scheduler) + { + _task = task; + _scheduler = scheduler; + } + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var cts = new CancellationDisposable(); + var options = GetTaskContinuationOptions(_scheduler); + + if (_scheduler == null) + _task.ContinueWith((t, subjectObject) => t.EmitTaskResult((IObserver)subjectObject), observer, cts.Token, options, TaskScheduler.Current); + else + _task.ContinueWith((t, subjectObject) => _scheduler.ScheduleAction((t, subjectObject), tuple => tuple.t.EmitTaskResult((IObserver)tuple.subjectObject)), observer, cts.Token, options, TaskScheduler.Current); + + return cts; + } + } /// /// Returns an observable sequence that signals when the task completes. /// @@ -74,12 +133,7 @@ private static IObservable ToObservableImpl(Task task, IScheduler schedule return new Return(Unit.Default, scheduler); } - var subject = new AsyncSubject(); - var options = GetTaskContinuationOptions(scheduler); - - task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject)subjectObject), subject, options); - - return subject.ToObservableResult(scheduler); + return new SlowTaskObservable(task, scheduler); } private static void EmitTaskResult(this Task task, IObserver subject) @@ -178,12 +232,7 @@ private static IObservable ToObservableImpl(Task task return new Return(task.Result, scheduler); } - var subject = new AsyncSubject(); - var options = GetTaskContinuationOptions(scheduler); - - task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject)subjectObject), subject, options); - - return subject.ToObservableResult(scheduler); + return new SlowTaskObservable(task, scheduler); } private static void EmitTaskResult(this Task task, IObserver subject) From bdcdf8ad59d91335acbbd35ab39d79f50569f37e Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 5 Jul 2018 15:07:57 +0200 Subject: [PATCH 5/5] Cleanup. --- .../Threading/Tasks/TaskObservableExtensions.cs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs b/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs index e7fe59bcb3..cdad6426e0 100644 --- a/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs +++ b/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs @@ -274,16 +274,6 @@ private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler sch return options; } - private static IObservable ToObservableResult(this AsyncSubject subject, IScheduler scheduler) - { - if (scheduler != null) - { - return subject.ObserveOn(scheduler); - } - - return subject.AsObservable(); - } - internal static IDisposable Subscribe(this Task task, IObserver observer) { if (task.IsCompleted)