Skip to content

Commit

Permalink
Switching to an interface-based model.
Browse files Browse the repository at this point in the history
  • Loading branch information
bartdesmet committed Apr 13, 2017
1 parent 91e26a6 commit 0837259
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 99 deletions.
55 changes: 0 additions & 55 deletions Rx.NET/Source/src/System.Reactive/Observable.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@

using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.ExceptionServices;
using System.Security;

namespace System.Runtime.CompilerServices
{
/// <summary>
/// Represents a builder for asynchronous methods that return a task-like <see cref="Observable{T}"/>.
/// Represents a builder for asynchronous methods that return a task-like <see cref="ITaskObservable{T}"/>.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
public struct ObservableMethodBuilder<T>
public struct TaskObservableMethodBuilder<T>
{
/// <summary>
/// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous
/// method whose return type is a task-like <see cref="Observable{T}"/>.
/// method whose return type is a task-like <see cref="ITaskObservable{T}"/>.
/// </summary>
private IAsyncStateMachine _stateMachine;

/// <summary>
/// The underlying observable sequence representing the result produced by the asynchronous method.
/// </summary>
private IObservable<T> _result;
private TaskObservable _inner;

/// <summary>
/// Creates an instance of the <see cref="ObservableMethodBuilder{T}"/> struct.
/// Creates an instance of the <see cref="TaskObservableMethodBuilder{T}"/> struct.
/// </summary>
/// <returns>A new instance of the struct.</returns>
public static ObservableMethodBuilder<T> Create() => default(ObservableMethodBuilder<T>);
public static TaskObservableMethodBuilder<T> Create() => default(TaskObservableMethodBuilder<T>);

/// <summary>
/// Begins running the builder with the associated state machine.
Expand Down Expand Up @@ -73,26 +73,13 @@ public void SetStateMachine(IAsyncStateMachine stateMachine)
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetResult(T result)
{
if (_result == null)
if (_inner == null)
{
_result = Observable.Return<T>(result);
_inner = new TaskObservable(result);
}
else
{
var subject = _result as AsyncSubject<T>;

// NB: The IsCompleted is not protected by the subject's lock, so we could get a dirty read.
//
// We can live with this limitation and merely put in this check to catch invalid
// manual usage for which behavior is undefined. In the compiler-generated code that
// interacts with the asynchronous method builder, no concurrent calls to the Set*
// methods should occur.

if (subject == null || subject.IsCompleted)
throw new InvalidOperationException();

subject.OnNext(result);
subject.OnCompleted();
_inner.SetResult(result);
}
}

Expand All @@ -107,32 +94,20 @@ public void SetException(Exception exception)
if (exception == null)
throw new ArgumentNullException(nameof(exception));

if (_result == null)
if (_inner == null)
{
_result = Observable.Throw<T>(exception);
_inner = new TaskObservable(exception);
}
else
{
var subject = _result as AsyncSubject<T>;

// NB: The IsCompleted is not protected by the subject's lock, so we could get a dirty read.
//
// We can live with this limitation and merely put in this check to catch invalid
// manual usage for which behavior is undefined. In the compiler-generated code that
// interacts with the asynchronous method builder, no concurrent calls to the Set*
// methods should occur.

if (subject == null || subject.IsCompleted)
throw new InvalidOperationException();

subject.OnError(exception);
_inner.SetException(exception);
}
}

/// <summary>
/// Gets the observable sequence for this builder.
/// </summary>
public Observable<T> Task => new Observable<T>(_result ?? (_result = new AsyncSubject<T>()));
public ITaskObservable<T> Task => _inner ?? (_inner = new TaskObservable());

/// <summary>
/// Schedules the state machine to proceed to the next action when the specified awaiter completes.
Expand Down Expand Up @@ -211,7 +186,164 @@ public void SetException(Exception exception)
/// <param name="exception">The exception to rethrow.</param>
private static void Rethrow(Exception exception)
{
Scheduler.Default.Schedule(ExceptionDispatchInfo.Capture(exception), (state, recurse) => state.Throw());
Scheduler.Default.Schedule(exception, (ex, recurse) => ex.Throw());
}

/// <summary>
/// Implementation of the IObservable&lt;T&gt; interface compatible with async method return types.
/// </summary>
/// <remarks>
/// This class implements a "task-like" type that can be used as the return type of an asynchronous
/// method in C# 7.0 and beyond. For example:
/// <code>
/// async Observable&lt;int&gt; RxAsync()
/// {
/// var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
/// return res * 2;
/// }
/// </code>
/// </remarks>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
internal sealed class TaskObservable : ITaskObservable<T>, ITaskObservableAwaiter<T>
{
/// <summary>
/// The underlying observable sequence to subscribe to in case the asynchronous method did not
/// finish synchronously.
/// </summary>
private readonly AsyncSubject<T> _subject;

/// <summary>
/// The result returned by the asynchronous method in case the method finished synchronously.
/// </summary>
private readonly T _result;

/// <summary>
/// The exception thrown by the asynchronous method in case the method finished synchronously.
/// </summary>
private readonly Exception _exception;

/// <summary>
/// Creates a new <see cref="TaskObservable"/> for an asynchronous method that has not finished yet.
/// </summary>
public TaskObservable()
{
_subject = new AsyncSubject<T>();
}

/// <summary>
/// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously returned
/// the specified <paramref name="result"/> value.
/// </summary>
/// <param name="result">The result returned by the asynchronous method.</param>
public TaskObservable(T result)
{
_result = result;
}

/// <summary>
/// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously threw
/// the specified <paramref name="exception"/>.
/// </summary>
/// <param name="exception">The exception thrown by the asynchronous method.</param>
public TaskObservable(Exception exception)
{
_exception = exception;
}

/// <summary>
/// Marks the observable as successfully completed.
/// </summary>
/// <param name="result">The result to use to complete the observable sequence.</param>
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetResult(T result)
{
if (IsCompleted)
throw new InvalidOperationException();

_subject.OnNext(result);
_subject.OnCompleted();
}

/// <summary>
/// Marks the observable as failed and binds the specified exception to the observable sequence.
/// </summary>
/// <param name="exception">The exception to bind to the observable sequence.</param>
/// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetException(Exception exception)
{
if (IsCompleted)
throw new InvalidOperationException();

_subject.OnError(exception);
}

/// <summary>
/// Subscribes the given observer to the observable sequence.
/// </summary>
/// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
/// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public IDisposable Subscribe(IObserver<T> observer)
{
if (_subject != null)
{
return _subject.Subscribe(observer);
}
else if (_exception != null)
{
observer.OnError(_exception);
return Disposable.Empty;
}
else
{
observer.OnNext(_result);
return Disposable.Empty;
}
}

/// <summary>
/// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
/// </summary>
/// <returns>An awaiter that can be used to await the eventual completion of the observable sequence.</returns>
public ITaskObservableAwaiter<T> GetAwaiter() => this;

/// <summary>
/// Gets a Boolean indicating whether the observable sequence has completed.
/// </summary>
public bool IsCompleted => _subject?.IsCompleted ?? true;

/// <summary>
/// Gets the result produced by the observable sequence.
/// </summary>
/// <returns>The result produced by the observable sequence.</returns>
public T GetResult()
{
if (_subject != null)
{
return _subject.GetResult();
}

_exception.ThrowIfNotNull();

return _result;
}

/// <summary>
/// Attaches the specified <paramref name="continuation"/> to the observable sequence.
/// </summary>
/// <param name="continuation">The continuation to attach.</param>
public void OnCompleted(Action continuation)
{
if (_subject != null)
{
_subject.OnCompleted(continuation);
}
else
{
continuation();
}
}
}
}
}
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private void OnCompleted(Action continuation, bool originalContext)
this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
}

class AwaitObserver : IObserver<T>
private sealed class AwaitObserver : IObserver<T>
{
private readonly SynchronizationContext _context;
private readonly Action _callback;
Expand Down
53 changes: 53 additions & 0 deletions Rx.NET/Source/src/System.Reactive/TaskObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Runtime.CompilerServices;

namespace System.Reactive
{
/// <summary>
/// Extension of the <see cref="IObservable{T}"/> interface compatible with async method return types.
/// </summary>
/// <remarks>
/// This class implements a "task-like" type that can be used as the return type of an asynchronous
/// method in C# 7.0 and beyond. For example:
/// <code>
/// async ITaskObservable&lt;int&gt; RxAsync()
/// {
/// var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
/// return res * 2;
/// }
/// </code>
/// </remarks>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
[AsyncMethodBuilder(typeof(TaskObservableMethodBuilder<>))]
public interface ITaskObservable<out T> : IObservable<T>
{
// NB: An interface type is preferred to enable the use of covariance.

/// <summary>
/// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
/// </summary>
/// <returns>An awaiter that can be used to await the eventual completion of the observable sequence.</returns>
ITaskObservableAwaiter<T> GetAwaiter();
}

/// <summary>
/// Interface representing an awaiter for an <see cref="ITaskObservable{T}"/>.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
public interface ITaskObservableAwaiter<out T> : INotifyCompletion
{
/// <summary>
/// Gets a Boolean indicating whether the observable sequence has completed.
/// </summary>
bool IsCompleted { get; }

/// <summary>
/// Gets the result produced by the observable sequence.
/// </summary>
/// <returns>The result produced by the observable sequence.</returns>
T GetResult();
}
}

0 comments on commit 0837259

Please sign in to comment.