Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions Rx.NET/Source/src/System.Reactive/AnonymousObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,46 @@ protected override IDisposable SubscribeCore(IObserver<T> observer)
{
return _subscribe(observer) ?? Disposable.Empty;
}

public static StatefulAnonymousObservable<T, TState> CreateStateful<TState>(Func<IObserver<T>, TState, IDisposable> subscribe, TState state)
{
return new StatefulAnonymousObservable<T, TState>(subscribe, state);
}
}

/// <summary>
/// Class to create an <see cref="IObservable{T}"/> instance from a delegate-based implementation of the <see cref="IObservable{T}.Subscribe(IObserver{T})"/> method.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
/// <typeparam name="TState">The type of the state that is passed to the subscription function.</typeparam>
public sealed class StatefulAnonymousObservable<T, TState> : ObservableBase<T>
{
private readonly TState _state;
private readonly Func<IObserver<T>, TState, IDisposable> _subscribe;

/// <summary>
/// Creates an observable sequence object from the specified subscription function.
/// </summary>
/// <param name="subscribe"><see cref="IObservable{T}.Subscribe(IObserver{T})"/> method implementation.</param>
/// <param name="state">The state to pass to the subscription function.</param>
/// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is <c>null</c>.</exception>
public StatefulAnonymousObservable(Func<IObserver<T>, TState, IDisposable> subscribe, TState state)
{
if (subscribe == null)
throw new ArgumentNullException(nameof(subscribe));

_state = state;
_subscribe = subscribe;
}

/// <summary>
/// Calls the subscription function that was supplied to the constructor.
/// </summary>
/// <param name="observer">Observer to send notifications to.</param>
/// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
protected override IDisposable SubscribeCore(IObserver<T> observer)
{
return _subscribe(observer, this._state) ?? Disposable.Empty;
}
}
}
24 changes: 24 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,30 @@ public static IDisposable Schedule(this IScheduler scheduler, Action action)
return scheduler.Schedule(action, Invoke);
}

/// <summary>
/// Schedules an action to be executed.
/// </summary>
/// <param name="scheduler">Scheduler to execute the action on.</param>
/// <param name="action">Action to execute.</param>
/// <param name="state">A state object to be passed to <paramref name="action"/>.</param>
/// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
internal static IDisposable Schedule<TState>(this IScheduler scheduler, Action<TState> action, TState state)
{
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
if (action == null)
throw new ArgumentNullException(nameof(action));

return scheduler.Schedule(
(action, state),
(_, tuple) =>
{
tuple.action(tuple.state);
return Disposable.Empty;
});
}

/// <summary>
/// Schedules an action to be executed after the specified relative due time.
/// </summary>
Expand Down
49 changes: 28 additions & 21 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,24 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return new AnonymousObservable<TSource>(observer =>
{
var m = new SingleAssignmentDisposable();
var d = new SerialDisposable();
d.Disposable = m;

m.Disposable = scheduler.Schedule(() =>
return AnonymousObservable<TSource>.CreateStateful(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about not using a delegate either but extending some base class?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this can be considered, but it would make review and merging at this point even more complicated. Instead I tried to keep replacements straightforward and easy to review.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess in the end, there will be no justification to use AnonymousObservable from inside Rx any more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that this or some other class will be re-optimized in the future, depending on how the project leads decide upon critical components, which would lead to wasted "small step changes" on our parts. Just say'n.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnonymousX in Rx were a workaround because C# still doesn't support anonymous inner classes like Java does and the devs wasn't apparently very keen on just defining new dedicated classes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just say'n.

Don't really know what that's supposed to mean again. Try to encourage people for a change.

(observer, closureTuple1) =>
{
d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
});
var m = new SingleAssignmentDisposable();
var d = new SerialDisposable();

d.Disposable = m;

m.Disposable = closureTuple1.scheduler.Schedule(
closureTuple2 =>
{
closureTuple2.d.Disposable = new ScheduledDisposable(closureTuple2.scheduler, closureTuple2.source.SubscribeSafe(closureTuple2.observer));
},
(closureTuple1.scheduler, closureTuple1.source, observer, d));

return d;
});
return d;
},
(source, scheduler));
}

/// <summary>
Expand All @@ -69,18 +74,20 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
if (context == null)
throw new ArgumentNullException(nameof(context));

return new AnonymousObservable<TSource>(observer =>
{
var subscription = new SingleAssignmentDisposable();
context.PostWithStartComplete(() =>
return AnonymousObservable<TSource>.CreateStateful(
(observer, closureTuple) =>
{
if (!subscription.IsDisposed)
var subscription = new SingleAssignmentDisposable();
closureTuple.context.PostWithStartComplete(() =>
{
subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
}
});
return subscription;
});
if (!subscription.IsDisposed)
{
subscription.Disposable = new ContextDisposable(closureTuple.context, closureTuple.source.SubscribeSafe(observer));
}
});
return subscription;
},
(source, context));
}

#endregion
Expand Down
36 changes: 20 additions & 16 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -725,14 +725,16 @@ private IObservable<TSource> StartAsyncImpl<TSource>(Func<CancellationToken, Tas
result = task.ToObservable();
}

return new AnonymousObservable<TSource>(observer =>
{
//
// [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
//
var subscription = result.Subscribe/*Unsafe*/(observer);
return StableCompositeDisposable.Create(cancellable, subscription);
});
return AnonymousObservable<TSource>.CreateStateful(
(observer, closureTuple) =>
{
//
// [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
//
var subscription = closureTuple.result.Subscribe/*Unsafe*/(observer);
return StableCompositeDisposable.Create(closureTuple.cancellable, subscription);
},
(result, cancellable));
}

#endregion
Expand Down Expand Up @@ -816,14 +818,16 @@ private IObservable<Unit> StartAsyncImpl(Func<CancellationToken, Task> actionAsy
result = task.ToObservable();
}

return new AnonymousObservable<Unit>(observer =>
{
//
// [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
//
var subscription = result.Subscribe/*Unsafe*/(observer);
return StableCompositeDisposable.Create(cancellable, subscription);
});
return AnonymousObservable<Unit>.CreateStateful(
(observer, closureTuple) =>
{
//
// [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
//
var subscription = closureTuple.result.Subscribe/*Unsafe*/(observer);
return StableCompositeDisposable.Create(closureTuple.cancellable, subscription);
},
(cancellable, result));
}

#endregion
Expand Down
102 changes: 55 additions & 47 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDi

public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
{
return new AnonymousObservable<TSource>(o =>
{
var a = subscribe(o);
return a != null ? Disposable.Create(a) : Disposable.Empty;
});
return AnonymousObservable<TSource>.CreateStateful(
(o, closureSubscribe) =>
{
var a = closureSubscribe(o);
return a != null ? Disposable.Create(a) : Disposable.Empty;
},
subscribe);
}

#endregion
Expand All @@ -38,16 +40,18 @@ public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Act

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
{
return new AnonymousObservable<TResult>(observer =>
{
var cancellable = new CancellationDisposable();

var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);
var subscription = taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
});
return AnonymousObservable<TResult>.CreateStateful(
(observer, closureSubscribeAsync) =>
{
var cancellable = new CancellationDisposable();

var taskObservable = closureSubscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);
var subscription = taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
},
subscribeAsync);
}

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
Expand All @@ -57,22 +61,24 @@ public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Tas

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
{
return new AnonymousObservable<TResult>(observer =>
{
var subscription = new SingleAssignmentDisposable();
var cancellable = new CancellationDisposable();

var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop);

//
// We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
});
return AnonymousObservable<TResult>.CreateStateful(
(observer, closureSubscribeAsync) =>
{
var subscription = new SingleAssignmentDisposable();
var cancellable = new CancellationDisposable();

var taskObservable = closureSubscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop);

//
// We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
},
subscribeAsync);
}

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
Expand All @@ -82,22 +88,24 @@ public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Tas

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
{
return new AnonymousObservable<TResult>(observer =>
{
var subscription = new SingleAssignmentDisposable();
var cancellable = new CancellationDisposable();

var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop);

//
// We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
});
return AnonymousObservable<TResult>.CreateStateful(
(observer, closureSubscribeAsync) =>
{
var subscription = new SingleAssignmentDisposable();
var cancellable = new CancellationDisposable();

var taskObservable = closureSubscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop);

//
// We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
},
subscribeAsync);
}

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)
Expand Down
Loading