Skip to content

Commit

Permalink
Initial implementation of C# 7.0 task-like support for observable seq…
Browse files Browse the repository at this point in the history
…uences.
  • Loading branch information
bartdesmet committed Apr 12, 2017
1 parent 48f434a commit 91e26a6
Show file tree
Hide file tree
Showing 5 changed files with 369 additions and 2 deletions.
55 changes: 55 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Observable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Runtime.CompilerServices;

namespace System.Reactive
{
/// <summary>
/// Implementation of the IObservable&lt;T&gt; interface compatible with async method return types.
/// </summary>
/// <remarks>
/// This class implements a "task-like" type that can be used as the return type of an asynchronous
/// method in C# 7.0 and beyond. For example:
/// <code>
/// async Observable&lt;int&gt; RxAsync()
/// {
/// var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
/// return res * 2;
/// }
/// </code>
/// </remarks>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
[AsyncMethodBuilder(typeof(ObservableMethodBuilder<>))]
public sealed class Observable<T> : IObservable<T>
{
/// <summary>
/// The underlying observable sequence to subscribe to.
/// </summary>
private readonly IObservable<T> _inner;

/// <summary>
/// Creates a new task-like observable instance using the specified <paramref name="inner"/> observable sequence.
/// </summary>
/// <param name="inner">The underlying observable sequence to subscribe to.</param>
internal Observable(IObservable<T> inner)
{
_inner = inner;
}

/// <summary>
/// Subscribes the given observer to the observable sequence.
/// </summary>
/// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
/// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));

return _inner.Subscribe(observer);
}
}
}
4 changes: 2 additions & 2 deletions Rx.NET/Source/src/System.Reactive/ObservableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace System.Reactive
{
/// <summary>
/// Abstract base class for implementations of the IObservable&lt;T&gt; interface.
/// Abstract base class for implementations of the <see cref="IObservable{T}"/> interface.
/// </summary>
/// <remarks>
/// If you don't need a named type to create an observable sequence (i.e. you rather need
Expand All @@ -30,7 +30,7 @@ public IDisposable Subscribe(IObserver<T> observer)
throw new ArgumentNullException(nameof(observer));

var autoDetachObserver = new AutoDetachObserver<T>(observer);

if (CurrentThreadScheduler.IsScheduleRequired)
{
//
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

namespace System.Runtime.CompilerServices
{
/// <summary>
/// Attribute to decorate a task-like type to specify a compatible asynchronous method builder.
/// </summary>
internal sealed class AsyncMethodBuilderAttribute : Attribute
{
/// <summary>
/// Creates a new instance of the attribute using the specified <paramref name="type"/>.
/// </summary>
/// <param name="type">The type implementing the asynchronous method builder.</param>
public AsyncMethodBuilderAttribute(Type type)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.ExceptionServices;
using System.Security;

namespace System.Runtime.CompilerServices
{
/// <summary>
/// Represents a builder for asynchronous methods that return a task-like <see cref="Observable{T}"/>.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
public struct ObservableMethodBuilder<T>
{
/// <summary>
/// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous
/// method whose return type is a task-like <see cref="Observable{T}"/>.
/// </summary>
private IAsyncStateMachine _stateMachine;

/// <summary>
/// The underlying observable sequence representing the result produced by the asynchronous method.
/// </summary>
private IObservable<T> _result;

/// <summary>
/// Creates an instance of the <see cref="ObservableMethodBuilder{T}"/> struct.
/// </summary>
/// <returns>A new instance of the struct.</returns>
public static ObservableMethodBuilder<T> Create() => default(ObservableMethodBuilder<T>);

/// <summary>
/// Begins running the builder with the associated state machine.
/// </summary>
/// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
/// <param name="stateMachine">The state machine instance, passed by reference.</param>
/// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
public void Start<TStateMachine>(ref TStateMachine stateMachine)
where TStateMachine : IAsyncStateMachine
{
if (stateMachine == null)
throw new ArgumentNullException(nameof(stateMachine));

stateMachine.MoveNext();
}

/// <summary>
/// Associates the builder with the specified state machine.
/// </summary>
/// <param name="stateMachine">The state machine instance to associate with the builder.</param>
/// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">The state machine was previously set.</exception>
public void SetStateMachine(IAsyncStateMachine stateMachine)
{
if (stateMachine == null)
throw new ArgumentNullException(nameof(stateMachine));

if (_stateMachine != null)
throw new InvalidOperationException();

_stateMachine = stateMachine;
}

/// <summary>
/// Marks the observable as successfully completed.
/// </summary>
/// <param name="result">The result to use to complete the observable sequence.</param>
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetResult(T result)
{
if (_result == null)
{
_result = Observable.Return<T>(result);
}
else
{
var subject = _result as AsyncSubject<T>;

// NB: The IsCompleted is not protected by the subject's lock, so we could get a dirty read.
//
// We can live with this limitation and merely put in this check to catch invalid
// manual usage for which behavior is undefined. In the compiler-generated code that
// interacts with the asynchronous method builder, no concurrent calls to the Set*
// methods should occur.

if (subject == null || subject.IsCompleted)
throw new InvalidOperationException();

subject.OnNext(result);
subject.OnCompleted();
}
}

/// <summary>
/// Marks the observable as failed and binds the specified exception to the observable sequence.
/// </summary>
/// <param name="exception">The exception to bind to the observable sequence.</param>
/// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetException(Exception exception)
{
if (exception == null)
throw new ArgumentNullException(nameof(exception));

if (_result == null)
{
_result = Observable.Throw<T>(exception);
}
else
{
var subject = _result as AsyncSubject<T>;

// NB: The IsCompleted is not protected by the subject's lock, so we could get a dirty read.
//
// We can live with this limitation and merely put in this check to catch invalid
// manual usage for which behavior is undefined. In the compiler-generated code that
// interacts with the asynchronous method builder, no concurrent calls to the Set*
// methods should occur.

if (subject == null || subject.IsCompleted)
throw new InvalidOperationException();

subject.OnError(exception);
}
}

/// <summary>
/// Gets the observable sequence for this builder.
/// </summary>
public Observable<T> Task => new Observable<T>(_result ?? (_result = new AsyncSubject<T>()));

/// <summary>
/// Schedules the state machine to proceed to the next action when the specified awaiter completes.
/// </summary>
/// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
/// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
/// <param name="awaiter">The awaiter.</param>
/// <param name="stateMachine">The state machine.</param>
public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
where TAwaiter : INotifyCompletion
where TStateMachine : IAsyncStateMachine
{
try
{
if (_stateMachine == null)
{
var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.

_stateMachine = stateMachine;
_stateMachine.SetStateMachine(_stateMachine);
}

// NB: Rx has historically not bothered with execution contexts, so we don't do it here either.

awaiter.OnCompleted(_stateMachine.MoveNext);
}
catch (Exception ex)
{
// NB: Prevent reentrancy into the async state machine when an exception would be observed
// by the caller. This could cause concurrent execution of the async method. Instead,
// rethrow the exception elsewhere.

Rethrow(ex);
}
}

/// <summary>
/// Schedules the state machine to proceed to the next action when the specified awaiter completes.
/// </summary>
/// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
/// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
/// <param name="awaiter">The awaiter.</param>
/// <param name="stateMachine">The state machine.</param>
[SecuritySafeCritical]
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
where TAwaiter : ICriticalNotifyCompletion
where TStateMachine : IAsyncStateMachine
{
try
{
if (_stateMachine == null)
{
var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.

_stateMachine = stateMachine;
_stateMachine.SetStateMachine(_stateMachine);
}

// NB: Rx has historically not bothered with execution contexts, so we don't do it here either.

awaiter.UnsafeOnCompleted(_stateMachine.MoveNext);
}
catch (Exception ex)
{
// NB: Prevent reentrancy into the async state machine when an exception would be observed
// by the caller. This could cause concurrent execution of the async method. Instead,
// rethrow the exception elsewhere.

Rethrow(ex);
}
}

/// <summary>
/// Rethrows an exception that was thrown from an awaiter's OnCompleted methods.
/// </summary>
/// <param name="exception">The exception to rethrow.</param>
private static void Rethrow(Exception exception)
{
Scheduler.Default.Schedule(ExceptionDispatchInfo.Capture(exception), (state, recurse) => state.Throw());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Xunit;

namespace Tests.System.Reactive.Tests
{
public class TaskLikeSupportTest
{
[Fact]
public async Task Return()
{
Assert.Equal(42, await ManOrBoy_Return());
}

#pragma warning disable 1998
private async Observable<int> ManOrBoy_Return()
{
return 42;
}
#pragma warning restore 1998

[Fact]
public async Task Throw()
{
await Assert.ThrowsAsync<DivideByZeroException>(async () => await ManOrBoy_Throw(42, 0));
}

#pragma warning disable 1998
private async Observable<int> ManOrBoy_Throw(int n, int d)
{
return n / d;
}
#pragma warning restore 1998

[Fact]
public async Task Basics()
{
Assert.Equal(45, await ManOrBoy_Basics());
}

#pragma warning disable 1998
private async Observable<int> ManOrBoy_Basics()
{
var res = 0;

for (var i = 0; i < 10; i++)
{
switch (i % 4)
{
case 0:
res += await Observable.Return(i);
break;
case 1:
res += await Observable.Return(i).Delay(TimeSpan.FromMilliseconds(50));
break;
case 2:
res += await Task.FromResult(i);
break;
case 3:
res += await Task.Run(() => { Task.Delay(50).Wait(); return i; });
break;
}
}

return res;
}
#pragma warning restore 1998
}
}

0 comments on commit 91e26a6

Please sign in to comment.