Skip to content

Commit

Permalink
Merge pull request #293 from danielcweber/OptimizeObservableToTask
Browse files Browse the repository at this point in the history
Reduce number of allocations (closures, delegates) in TaskObservableExtensions.ToTask
  • Loading branch information
shiftkey committed Nov 14, 2016
2 parents 91b4800 + 7c02407 commit 41eba4f
Showing 1 changed file with 56 additions and 39 deletions.
Expand Up @@ -287,6 +287,60 @@ public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable
return observable.ToTask(cancellationToken, null);
}

private sealed class ToTaskObserver<TResult> : IObserver<TResult>
{
private readonly CancellationToken _ct;
private readonly IDisposable _disposable;
private readonly TaskCompletionSource<TResult> _tcs;
private readonly CancellationTokenRegistration _ctr = default(CancellationTokenRegistration);

private bool _hasValue;
private TResult _lastValue;

public ToTaskObserver(TaskCompletionSource<TResult> tcs, IDisposable disposable, CancellationToken ct)
{
this._ct = ct;
this._tcs = tcs;
this._disposable = disposable;

if (ct.CanBeCanceled)
{
this._ctr = ct.Register(this.Cancel);
}
}

public void OnNext(TResult value)
{
this._hasValue = true;
this._lastValue = value;
}

public void OnError(Exception error)
{
this._tcs.TrySetException(error);

this._ctr.Dispose(); // no null-check needed (struct)
this._disposable.Dispose();
}

public void OnCompleted()
{
if (this._hasValue)
this._tcs.TrySetResult(this._lastValue);
else
this._tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));

this._ctr.Dispose(); // no null-check needed (struct)
this._disposable.Dispose();
}

private void Cancel()
{
this._disposable.Dispose();
this._tcs.TrySetCanceled(this._ct);
}
}

/// <summary>
/// Returns a task that will receive the last value or the exception produced by the observable sequence.
/// </summary>
Expand All @@ -301,49 +355,12 @@ public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable
if (observable == null)
throw new ArgumentNullException(nameof(observable));

var hasValue = false;
var lastValue = default(TResult);

var tcs = new TaskCompletionSource<TResult>(state);

var disposable = new SingleAssignmentDisposable();

var ctr = default(CancellationTokenRegistration);

if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(() =>
{
disposable.Dispose();
tcs.TrySetCanceled(cancellationToken);
});
}

var taskCompletionObserver = new AnonymousObserver<TResult>(
value =>
{
hasValue = true;
lastValue = value;
},
ex =>
{
tcs.TrySetException(ex);
ctr.Dispose(); // no null-check needed (struct)
disposable.Dispose();
},
() =>
{
if (hasValue)
tcs.TrySetResult(lastValue);
else
tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
ctr.Dispose(); // no null-check needed (struct)
disposable.Dispose();
}
);

var taskCompletionObserver = new ToTaskObserver<TResult>(tcs, disposable, cancellationToken);

//
// Subtle race condition: if the source completes before we reach the line below, the SingleAssigmentDisposable
// will already have been disposed. Upon assignment, the disposable resource being set will be disposed on the
Expand Down

0 comments on commit 41eba4f

Please sign in to comment.