Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@ public IDisposable Run(IScheduler self, TState s, Func<IScheduler, TState, Cance
action(new CancelableScheduler(self, _cts.Token), s, _cts.Token).ContinueWith(
(t, thisObject) =>
{
if (!t.IsCanceled)
{
var @this = (AsyncInvocation<TState>)thisObject;
var @this = (AsyncInvocation<TState>)thisObject;

t.Exception?.Handle(e => e is OperationCanceledException);
t.Exception?.Handle(e => e is OperationCanceledException);

Disposable.SetSingle(ref @this._run, t.Result);
}
Disposable.SetSingle(ref @this._run, t.Result);
},
this,
TaskContinuationOptions.ExecuteSynchronously);
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.NotOnCanceled);

return this;
}
Expand Down
11 changes: 10 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public _(IObserver<TSource> observer)
}

private readonly object _gate = new object();
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private volatile int _count = 1;

public override void OnNext(Task<TSource> value)
Expand All @@ -299,7 +300,7 @@ public override void OnNext(Task<TSource> value)
}
else
{
value.ContinueWith((t, thisObject) => ((_)thisObject).OnCompletedTask(t), this);
value.ContinueWith((t, thisObject) => ((_)thisObject).OnCompletedTask(t), this, _cts.Token);
}
}

Expand Down Expand Up @@ -354,6 +355,14 @@ public override void OnCompleted()
}
}
}

protected override void Dispose(bool disposing)
{
if (disposing)
_cts.Cancel();

base.Dispose(disposing);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ private void AttachContinuation(TSource value, Task<TCollection> task)
//
// Separate method to avoid closure in synchronous completion case.
//
task.ContinueWith(t => OnCompletedTask(value, t));
task.ContinueWith(t => OnCompletedTask(value, t), _cancel.Token);
}

private void OnCompletedTask(TSource value, Task<TCollection> task)
Expand Down Expand Up @@ -758,7 +758,7 @@ private void AttachContinuation(TSource value, int index, Task<TCollection> task
//
// Separate method to avoid closure in synchronous completion case.
//
task.ContinueWith(t => OnCompletedTask(value, index, t));
task.ContinueWith(t => OnCompletedTask(value, index, t), _cancel.Token);
}

private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
Expand Down Expand Up @@ -1538,7 +1538,7 @@ public override void OnNext(TSource value)
}
else
{
task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this);
task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this, _cts.Token);
}
}

Expand Down Expand Up @@ -1670,7 +1670,7 @@ public override void OnNext(TSource value)
}
else
{
task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this);
task.ContinueWith((closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask), this, _cts.Token);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,65 @@ namespace System.Reactive.Threading.Tasks
/// </summary>
public static class TaskObservableExtensions
{
private sealed class SlowTaskObservable : IObservable<Unit>
{
private readonly Task _task;
private readonly IScheduler _scheduler;

public SlowTaskObservable(Task task, IScheduler scheduler)
{
_task = task;
_scheduler = scheduler;
}

public IDisposable Subscribe(IObserver<Unit> observer)
{
if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}

var cts = new CancellationDisposable();
var options = GetTaskContinuationOptions(_scheduler);

if (_scheduler == null)
_task.ContinueWith((t, subjectObject) => t.EmitTaskResult((IObserver<Unit>)subjectObject), observer, cts.Token, options, TaskScheduler.Current);
else
_task.ContinueWith((t, subjectObject) => _scheduler.ScheduleAction((t, subjectObject), tuple => tuple.t.EmitTaskResult((IObserver<Unit>)tuple.subjectObject)), observer, cts.Token, options, TaskScheduler.Current);

return cts;
}
}

private sealed class SlowTaskObservable<TResult> : IObservable<TResult>
{
private readonly Task<TResult> _task;
private readonly IScheduler _scheduler;

public SlowTaskObservable(Task<TResult> task, IScheduler scheduler)
{
_task = task;
_scheduler = scheduler;
}

public IDisposable Subscribe(IObserver<TResult> observer)
{
if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}

var cts = new CancellationDisposable();
var options = GetTaskContinuationOptions(_scheduler);

if (_scheduler == null)
_task.ContinueWith((t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject), observer, cts.Token, options, TaskScheduler.Current);
else
_task.ContinueWith((t, subjectObject) => _scheduler.ScheduleAction((t, subjectObject), tuple => tuple.t.EmitTaskResult((IObserver<TResult>)tuple.subjectObject)), observer, cts.Token, options, TaskScheduler.Current);

return cts;
}
}
/// <summary>
/// Returns an observable sequence that signals when the task completes.
/// </summary>
Expand Down Expand Up @@ -74,12 +133,7 @@ private static IObservable<Unit> ToObservableImpl(Task task, IScheduler schedule
return new Return<Unit>(Unit.Default, scheduler);
}

var subject = new AsyncSubject<Unit>();
var options = GetTaskContinuationOptions(scheduler);

task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);

return subject.ToObservableResult(scheduler);
return new SlowTaskObservable(task, scheduler);
}

private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
Expand Down Expand Up @@ -178,12 +232,7 @@ private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task
return new Return<TResult>(task.Result, scheduler);
}

var subject = new AsyncSubject<TResult>();
var options = GetTaskContinuationOptions(scheduler);

task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);

return subject.ToObservableResult(scheduler);
return new SlowTaskObservable<TResult>(task, scheduler);
}

private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
Expand Down Expand Up @@ -225,16 +274,6 @@ private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler sch
return options;
}

private static IObservable<TResult> ToObservableResult<TResult>(this AsyncSubject<TResult> subject, IScheduler scheduler)
{
if (scheduler != null)
{
return subject.ObserveOn(scheduler);
}

return subject.AsObservable();
}

internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer)
{
if (task.IsCompleted)
Expand Down