From 95e67d28c946e5ae037689d3ac773f7d65e348d2 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Fri, 18 May 2018 09:28:26 +0200 Subject: [PATCH 1/3] Add a new extension method for IScheduler that allows to schedule an Action and pass a corresponding state object. The ability to pass state will greatly help to reduce the allocations of closures. --- .../Concurrency/Scheduler.Simple.cs | 24 +++++++++++++++++++ .../Tests/Concurrency/SchedulerTest.cs | 18 ++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs index 4a4b92f934..9edd8fceb2 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs @@ -25,6 +25,30 @@ public static IDisposable Schedule(this IScheduler scheduler, Action action) return scheduler.Schedule(action, Invoke); } + /// + /// Schedules an action to be executed. + /// + /// Scheduler to execute the action on. + /// Action to execute. + /// A state object to be passed to . + /// The disposable object used to cancel the scheduled action (best effort). + /// or is null. + internal static IDisposable Schedule(this IScheduler scheduler, Action action, TState state) + { + if (scheduler == null) + throw new ArgumentNullException(nameof(scheduler)); + if (action == null) + throw new ArgumentNullException(nameof(action)); + + return scheduler.Schedule( + (action, state), + (_, tuple) => + { + tuple.action(tuple.state); + return Disposable.Empty; + }); + } + /// /// Schedules an action to be executed after the specified relative due time. /// diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SchedulerTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SchedulerTest.cs index f84b9eb02d..0d81f7970e 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SchedulerTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SchedulerTest.cs @@ -34,6 +34,7 @@ public void Scheduler_ArgumentChecks() var ms = new MyScheduler(); ReactiveAssert.Throws(() => Scheduler.Schedule(default(IScheduler), a => { })); ReactiveAssert.Throws(() => Scheduler.Schedule(default(IScheduler), () => { })); + ReactiveAssert.Throws(() => Scheduler.Schedule(default(IScheduler), state => { }, new object())); ReactiveAssert.Throws(() => Scheduler.Schedule(default(IScheduler), 1, (a, s) => { })); ReactiveAssert.Throws(() => Scheduler.Schedule(ms, default(Action))); ReactiveAssert.Throws(() => Scheduler.Schedule(ms, 1, default(Action>))); @@ -53,23 +54,29 @@ public void Scheduler_ArgumentChecks() public void Schedulers_ArgumentChecks() { ReactiveAssert.Throws(() => Scheduler.CurrentThread.Schedule(default(Action))); + ReactiveAssert.Throws(() => Scheduler.CurrentThread.Schedule(default(Action), new object())); ReactiveAssert.Throws(() => Scheduler.CurrentThread.Schedule(TimeSpan.Zero, default(Action))); ReactiveAssert.Throws(() => Scheduler.CurrentThread.Schedule(DateTimeOffset.MaxValue, default(Action))); #if DESKTOPCLR ReactiveAssert.Throws(() => DispatcherScheduler.Instance.Schedule(default(Action))); + ReactiveAssert.Throws(() => DispatcherScheduler.Instance.Schedule(default(Action), new object())); ReactiveAssert.Throws(() => DispatcherScheduler.Instance.Schedule(TimeSpan.Zero, default(Action))); ReactiveAssert.Throws(() => DispatcherScheduler.Instance.Schedule(DateTimeOffset.MaxValue, default(Action))); #endif ReactiveAssert.Throws(() => Scheduler.Immediate.Schedule(default(Action))); + ReactiveAssert.Throws(() => Scheduler.Immediate.Schedule(default(Action), new object())); ReactiveAssert.Throws(() => Scheduler.Immediate.Schedule(TimeSpan.Zero, default(Action))); ReactiveAssert.Throws(() => Scheduler.Immediate.Schedule(DateTimeOffset.MaxValue, default(Action))); ReactiveAssert.Throws(() => NewThreadScheduler.Default.Schedule(default(Action))); + ReactiveAssert.Throws(() => NewThreadScheduler.Default.Schedule(default(Action), new object())); ReactiveAssert.Throws(() => NewThreadScheduler.Default.Schedule(TimeSpan.Zero, default(Action))); ReactiveAssert.Throws(() => NewThreadScheduler.Default.Schedule(DateTimeOffset.MaxValue, default(Action))); ReactiveAssert.Throws(() => TaskPoolScheduler.Default.Schedule(default(Action))); + ReactiveAssert.Throws(() => TaskPoolScheduler.Default.Schedule(default(Action), new object())); ReactiveAssert.Throws(() => TaskPoolScheduler.Default.Schedule(TimeSpan.Zero, default(Action))); ReactiveAssert.Throws(() => TaskPoolScheduler.Default.Schedule(DateTimeOffset.MaxValue, default(Action))); ReactiveAssert.Throws(() => DefaultScheduler.Instance.Schedule(default(Action))); + ReactiveAssert.Throws(() => DefaultScheduler.Instance.Schedule(default(Action), new object())); ReactiveAssert.Throws(() => DefaultScheduler.Instance.Schedule(TimeSpan.Zero, default(Action))); ReactiveAssert.Throws(() => DefaultScheduler.Instance.Schedule(DateTimeOffset.MaxValue, default(Action))); ReactiveAssert.Throws(() => DefaultScheduler.Instance.SchedulePeriodic(42, TimeSpan.FromSeconds(1), default(Func))); @@ -77,11 +84,13 @@ public void Schedulers_ArgumentChecks() #if HAS_WINFORMS var lbl = new Label(); ReactiveAssert.Throws(() => new ControlScheduler(lbl).Schedule(default(Action))); + ReactiveAssert.Throws(() => new ControlScheduler(lbl).Schedule(default(Action), new object())); ReactiveAssert.Throws(() => new ControlScheduler(lbl).Schedule(TimeSpan.Zero, default(Action))); ReactiveAssert.Throws(() => new ControlScheduler(lbl).Schedule(DateTimeOffset.MaxValue, default(Action))); #endif var ctx = new SynchronizationContext(); ReactiveAssert.Throws(() => new SynchronizationContextScheduler(ctx).Schedule(default(Action))); + ReactiveAssert.Throws(() => new SynchronizationContextScheduler(ctx).Schedule(default(Action), new object())); ReactiveAssert.Throws(() => new SynchronizationContextScheduler(ctx).Schedule(TimeSpan.Zero, default(Action))); ReactiveAssert.Throws(() => new SynchronizationContextScheduler(ctx).Schedule(DateTimeOffset.MaxValue, default(Action))); } @@ -104,6 +113,15 @@ public void Scheduler_ScheduleRecursive() Assert.Equal(10, i); } + [Fact] + public void Scheduler_Schedule_With_State() + { + var ms = new MyScheduler(); + var res = false; + Scheduler.Schedule(ms, state => { Assert.Equal("state", state); res = true; }, "state"); + Assert.True(res); + } + [Fact] public void Scheduler_ScheduleWithTimeNonRecursive() { From 3eb12463aedb2ca3064446bfe10845bf4358b229 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 17 May 2018 15:28:50 +0200 Subject: [PATCH 2/3] Add a StatefulAnonymousObservable that allows to pass parameter to the subscription function. This will allow the avoidance of closure allocations and enable more delegate caching where AnonymousObservables are created. --- .../System.Reactive/AnonymousObservable.cs | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/Rx.NET/Source/src/System.Reactive/AnonymousObservable.cs b/Rx.NET/Source/src/System.Reactive/AnonymousObservable.cs index 93ed5e6c6a..54dda2eb96 100644 --- a/Rx.NET/Source/src/System.Reactive/AnonymousObservable.cs +++ b/Rx.NET/Source/src/System.Reactive/AnonymousObservable.cs @@ -36,5 +36,46 @@ protected override IDisposable SubscribeCore(IObserver observer) { return _subscribe(observer) ?? Disposable.Empty; } + + public static StatefulAnonymousObservable CreateStateful(Func, TState, IDisposable> subscribe, TState state) + { + return new StatefulAnonymousObservable(subscribe, state); + } + } + + /// + /// Class to create an instance from a delegate-based implementation of the method. + /// + /// The type of the elements in the sequence. + /// The type of the state that is passed to the subscription function. + public sealed class StatefulAnonymousObservable : ObservableBase + { + private readonly TState _state; + private readonly Func, TState, IDisposable> _subscribe; + + /// + /// Creates an observable sequence object from the specified subscription function. + /// + /// method implementation. + /// The state to pass to the subscription function. + /// is null. + public StatefulAnonymousObservable(Func, TState, IDisposable> subscribe, TState state) + { + if (subscribe == null) + throw new ArgumentNullException(nameof(subscribe)); + + _state = state; + _subscribe = subscribe; + } + + /// + /// Calls the subscription function that was supplied to the constructor. + /// + /// Observer to send notifications to. + /// Disposable object representing an observer's subscription to the observable sequence. + protected override IDisposable SubscribeCore(IObserver observer) + { + return _subscribe(observer, this._state) ?? Disposable.Empty; + } } } From 3415fb251b97317d9eb3090e85cd45db3d22ded7 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Fri, 18 May 2018 09:29:35 +0200 Subject: [PATCH 3/3] Use AnonymousObservable.CreateStateful to pass state to the subscribe function at various places, resulting in some good savings on closure allocations and delegate caching everywhere! --- .../Concurrency/Synchronization.cs | 49 +-- .../Linq/QueryLanguage.Async.cs | 36 +- .../Linq/QueryLanguage.Creation.cs | 102 +++--- .../Linq/QueryLanguage.Joins.cs | 82 ++--- .../System.Reactive/Linq/QueryLanguageEx.cs | 314 +++++++++--------- .../src/System.Reactive/Notification.cs | 22 +- 6 files changed, 319 insertions(+), 286 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs index f689e8705e..05af8da551 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs @@ -35,19 +35,24 @@ public static IObservable SubscribeOn(IObservable sou if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - return new AnonymousObservable(observer => - { - var m = new SingleAssignmentDisposable(); - var d = new SerialDisposable(); - d.Disposable = m; - - m.Disposable = scheduler.Schedule(() => + return AnonymousObservable.CreateStateful( + (observer, closureTuple1) => { - d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer)); - }); + var m = new SingleAssignmentDisposable(); + var d = new SerialDisposable(); + + d.Disposable = m; + + m.Disposable = closureTuple1.scheduler.Schedule( + closureTuple2 => + { + closureTuple2.d.Disposable = new ScheduledDisposable(closureTuple2.scheduler, closureTuple2.source.SubscribeSafe(closureTuple2.observer)); + }, + (closureTuple1.scheduler, closureTuple1.source, observer, d)); - return d; - }); + return d; + }, + (source, scheduler)); } /// @@ -69,18 +74,20 @@ public static IObservable SubscribeOn(IObservable sou if (context == null) throw new ArgumentNullException(nameof(context)); - return new AnonymousObservable(observer => - { - var subscription = new SingleAssignmentDisposable(); - context.PostWithStartComplete(() => + return AnonymousObservable.CreateStateful( + (observer, closureTuple) => { - if (!subscription.IsDisposed) + var subscription = new SingleAssignmentDisposable(); + closureTuple.context.PostWithStartComplete(() => { - subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer)); - } - }); - return subscription; - }); + if (!subscription.IsDisposed) + { + subscription.Disposable = new ContextDisposable(closureTuple.context, closureTuple.source.SubscribeSafe(observer)); + } + }); + return subscription; + }, + (source, context)); } #endregion diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs index a61810991f..3a3b0ee9a5 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs @@ -725,14 +725,16 @@ private IObservable StartAsyncImpl(Func(observer => - { - // - // [OK] Use of unsafe Subscribe: result is an AsyncSubject. - // - var subscription = result.Subscribe/*Unsafe*/(observer); - return StableCompositeDisposable.Create(cancellable, subscription); - }); + return AnonymousObservable.CreateStateful( + (observer, closureTuple) => + { + // + // [OK] Use of unsafe Subscribe: result is an AsyncSubject. + // + var subscription = closureTuple.result.Subscribe/*Unsafe*/(observer); + return StableCompositeDisposable.Create(closureTuple.cancellable, subscription); + }, + (result, cancellable)); } #endregion @@ -816,14 +818,16 @@ private IObservable StartAsyncImpl(Func actionAsy result = task.ToObservable(); } - return new AnonymousObservable(observer => - { - // - // [OK] Use of unsafe Subscribe: result is an AsyncSubject. - // - var subscription = result.Subscribe/*Unsafe*/(observer); - return StableCompositeDisposable.Create(cancellable, subscription); - }); + return AnonymousObservable.CreateStateful( + (observer, closureTuple) => + { + // + // [OK] Use of unsafe Subscribe: result is an AsyncSubject. + // + var subscription = closureTuple.result.Subscribe/*Unsafe*/(observer); + return StableCompositeDisposable.Create(closureTuple.cancellable, subscription); + }, + (cancellable, result)); } #endregion diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index bd2aa3eb66..18f7b20b68 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -25,11 +25,13 @@ public virtual IObservable Create(Func, IDi public virtual IObservable Create(Func, Action> subscribe) { - return new AnonymousObservable(o => - { - var a = subscribe(o); - return a != null ? Disposable.Create(a) : Disposable.Empty; - }); + return AnonymousObservable.CreateStateful( + (o, closureSubscribe) => + { + var a = closureSubscribe(o); + return a != null ? Disposable.Create(a) : Disposable.Empty; + }, + subscribe); } #endregion @@ -38,16 +40,18 @@ public virtual IObservable Create(Func, Act public virtual IObservable Create(Func, CancellationToken, Task> subscribeAsync) { - return new AnonymousObservable(observer => - { - var cancellable = new CancellationDisposable(); - - var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); - var taskCompletionObserver = new AnonymousObserver(Stubs.Ignore, observer.OnError, observer.OnCompleted); - var subscription = taskObservable.Subscribe(taskCompletionObserver); - - return StableCompositeDisposable.Create(cancellable, subscription); - }); + return AnonymousObservable.CreateStateful( + (observer, closureSubscribeAsync) => + { + var cancellable = new CancellationDisposable(); + + var taskObservable = closureSubscribeAsync(observer, cancellable.Token).ToObservable(); + var taskCompletionObserver = new AnonymousObserver(Stubs.Ignore, observer.OnError, observer.OnCompleted); + var subscription = taskObservable.Subscribe(taskCompletionObserver); + + return StableCompositeDisposable.Create(cancellable, subscription); + }, + subscribeAsync); } public virtual IObservable Create(Func, Task> subscribeAsync) @@ -57,22 +61,24 @@ public virtual IObservable Create(Func, Tas public virtual IObservable Create(Func, CancellationToken, Task> subscribeAsync) { - return new AnonymousObservable(observer => - { - var subscription = new SingleAssignmentDisposable(); - var cancellable = new CancellationDisposable(); - - var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); - var taskCompletionObserver = new AnonymousObserver(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop); - - // - // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually. - // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. - // - taskObservable.Subscribe(taskCompletionObserver); - - return StableCompositeDisposable.Create(cancellable, subscription); - }); + return AnonymousObservable.CreateStateful( + (observer, closureSubscribeAsync) => + { + var subscription = new SingleAssignmentDisposable(); + var cancellable = new CancellationDisposable(); + + var taskObservable = closureSubscribeAsync(observer, cancellable.Token).ToObservable(); + var taskCompletionObserver = new AnonymousObserver(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop); + + // + // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually. + // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. + // + taskObservable.Subscribe(taskCompletionObserver); + + return StableCompositeDisposable.Create(cancellable, subscription); + }, + subscribeAsync); } public virtual IObservable Create(Func, Task> subscribeAsync) @@ -82,22 +88,24 @@ public virtual IObservable Create(Func, Tas public virtual IObservable Create(Func, CancellationToken, Task> subscribeAsync) { - return new AnonymousObservable(observer => - { - var subscription = new SingleAssignmentDisposable(); - var cancellable = new CancellationDisposable(); - - var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); - var taskCompletionObserver = new AnonymousObserver(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop); - - // - // We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed. - // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. - // - taskObservable.Subscribe(taskCompletionObserver); - - return StableCompositeDisposable.Create(cancellable, subscription); - }); + return AnonymousObservable.CreateStateful( + (observer, closureSubscribeAsync) => + { + var subscription = new SingleAssignmentDisposable(); + var cancellable = new CancellationDisposable(); + + var taskObservable = closureSubscribeAsync(observer, cancellable.Token).ToObservable(); + var taskCompletionObserver = new AnonymousObserver(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop); + + // + // We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed. + // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. + // + taskObservable.Subscribe(taskCompletionObserver); + + return StableCompositeDisposable.Create(cancellable, subscription); + }, + subscribeAsync); } public virtual IObservable Create(Func, Task> subscribeAsync) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Joins.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Joins.cs index 4dc4402c4d..057894cc68 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Joins.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Joins.cs @@ -37,48 +37,50 @@ public virtual IObservable When(params Plan[] plans) public virtual IObservable When(IEnumerable> plans) { - return new AnonymousObservable(observer => - { - var externalSubscriptions = new Dictionary(); - var gate = new object(); - var activePlans = new List(); - var outObserver = Observer.Create(observer.OnNext, - exception => - { - foreach (var po in externalSubscriptions.Values) - { - po.Dispose(); - } - observer.OnError(exception); - }, - observer.OnCompleted); - try + return AnonymousObservable.CreateStateful( + (observer, closureTuple) => { - foreach (var plan in plans) - activePlans.Add(plan.Activate(externalSubscriptions, outObserver, - activePlan => - { - activePlans.Remove(activePlan); - if (activePlans.Count == 0) - outObserver.OnCompleted(); - })); - } - catch (Exception e) - { - // - // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation. - // - return Throw(e).Subscribe/*Unsafe*/(observer); - } + var externalSubscriptions = new Dictionary(); + var gate = new object(); + var activePlans = new List(); + var outObserver = Observer.Create(observer.OnNext, + exception => + { + foreach (var po in externalSubscriptions.Values) + { + po.Dispose(); + } + observer.OnError(exception); + }, + observer.OnCompleted); + try + { + foreach (var plan in closureTuple.plans) + activePlans.Add(plan.Activate(externalSubscriptions, outObserver, + activePlan => + { + activePlans.Remove(activePlan); + if (activePlans.Count == 0) + outObserver.OnCompleted(); + })); + } + catch (Exception e) + { + // + // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation. + // + return closureTuple.queryLanguage.Throw(e).Subscribe/*Unsafe*/(observer); + } - var group = new CompositeDisposable(externalSubscriptions.Values.Count); - foreach (var joinObserver in externalSubscriptions.Values) - { - joinObserver.Subscribe(gate); - group.Add(joinObserver); - } - return group; - }); + var group = new CompositeDisposable(externalSubscriptions.Values.Count); + foreach (var joinObserver in externalSubscriptions.Values) + { + joinObserver.Subscribe(gate); + group.Add(joinObserver); + } + return group; + }, + (queryLanguage: this, plans)); } #endregion diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguageEx.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguageEx.cs index 81d7f7f660..cf735549c8 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguageEx.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguageEx.cs @@ -16,14 +16,16 @@ internal class QueryLanguageEx : IQueryLanguageEx public virtual IObservable Create(Func, IEnumerable>> iteratorMethod) { - return new AnonymousObservable(observer => - iteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted)); + return AnonymousObservable.CreateStateful( + (observer, closureIteratorMethod) => closureIteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted), + iteratorMethod); } public virtual IObservable Create(Func>> iteratorMethod) { - return new AnonymousObservable(observer => - iteratorMethod().Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted)); + return AnonymousObservable.CreateStateful( + (observer, closureIteratorMethod) => closureIteratorMethod().Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted), + iteratorMethod); } #endregion @@ -32,108 +34,110 @@ public virtual IObservable Create(Func>> i public virtual IObservable Expand(IObservable source, Func> selector, IScheduler scheduler) { - return new AnonymousObservable(observer => - { - var outGate = new object(); - var q = new Queue>(); - var m = new SerialDisposable(); - var d = new CompositeDisposable { m }; - var activeCount = 0; - var isAcquired = false; - - var ensureActive = default(Action); - - ensureActive = () => + return AnonymousObservable.CreateStateful( + (observer, tuple) => { - var isOwner = false; + var outGate = new object(); + var q = new Queue>(); + var m = new SerialDisposable(); + var d = new CompositeDisposable { m }; + var activeCount = 0; + var isAcquired = false; - lock (q) + var ensureActive = default(Action); + + ensureActive = () => { - if (q.Count > 0) + var isOwner = false; + + lock (q) { - isOwner = !isAcquired; - isAcquired = true; + if (q.Count > 0) + { + isOwner = !isAcquired; + isAcquired = true; + } } - } - if (isOwner) - { - m.Disposable = scheduler.Schedule(self => + if (isOwner) { - var work = default(IObservable); - - lock (q) + m.Disposable = tuple.scheduler.Schedule(self => { - if (q.Count > 0) - work = q.Dequeue(); - else - { - isAcquired = false; - return; - } - } + var work = default(IObservable); - var m1 = new SingleAssignmentDisposable(); - d.Add(m1); - m1.Disposable = work.Subscribe( - x => + lock (q) { - lock (outGate) - observer.OnNext(x); - - var result = default(IObservable); - try + if (q.Count > 0) + work = q.Dequeue(); + else { - result = selector(x); + isAcquired = false; + return; } - catch (Exception exception) + } + + var m1 = new SingleAssignmentDisposable(); + d.Add(m1); + m1.Disposable = work.Subscribe( + x => { lock (outGate) - observer.OnError(exception); - } + observer.OnNext(x); - lock (q) - { - q.Enqueue(result); - activeCount++; - } + var result = default(IObservable); + try + { + result = tuple.selector(x); + } + catch (Exception exception) + { + lock (outGate) + observer.OnError(exception); + } - ensureActive(); - }, - exception => - { - lock (outGate) - observer.OnError(exception); - }, - () => - { - d.Remove(m1); + lock (q) + { + q.Enqueue(result); + activeCount++; + } - var done = false; - lock (q) + ensureActive(); + }, + exception => { - activeCount--; - if (activeCount == 0) - done = true; - } - if (done) lock (outGate) - observer.OnCompleted(); - }); - self(); - }); - } - }; + observer.OnError(exception); + }, + () => + { + d.Remove(m1); - lock (q) - { - q.Enqueue(source); - activeCount++; - } - ensureActive(); + var done = false; + lock (q) + { + activeCount--; + if (activeCount == 0) + done = true; + } + if (done) + lock (outGate) + observer.OnCompleted(); + }); + self(); + }); + } + }; - return d; - }); + lock (q) + { + q.Enqueue(tuple.source); + activeCount++; + } + ensureActive(); + + return d; + }, + (source, selector, scheduler)); } public virtual IObservable Expand(IObservable source, Func> selector) @@ -245,80 +249,82 @@ public virtual IObservable ForkJoin(params IObservable ForkJoin(IEnumerable> sources) { - return new AnonymousObservable(subscriber => - { - var allSources = sources.ToArray(); - var count = allSources.Length; - - if (count == 0) + return AnonymousObservable.CreateStateful( + (subscriber, closureSources) => { - subscriber.OnCompleted(); - return Disposable.Empty; - } + var allSources = closureSources.ToArray(); + var count = allSources.Length; - var group = new CompositeDisposable(allSources.Length); - var gate = new object(); + if (count == 0) + { + subscriber.OnCompleted(); + return Disposable.Empty; + } - var finished = false; - var hasResults = new bool[count]; - var hasCompleted = new bool[count]; - var results = new List(count); + var group = new CompositeDisposable(allSources.Length); + var gate = new object(); - lock (gate) - { - for (var index = 0; index < count; index++) + var finished = false; + var hasResults = new bool[count]; + var hasCompleted = new bool[count]; + var results = new List(count); + + lock (gate) { - var currentIndex = index; - var source = allSources[index]; - results.Add(default(TSource)); - group.Add(source.Subscribe( - value => - { - lock (gate) + for (var index = 0; index < count; index++) + { + var currentIndex = index; + var source = allSources[index]; + results.Add(default(TSource)); + group.Add(source.Subscribe( + value => { - if (!finished) + lock (gate) { - hasResults[currentIndex] = true; - results[currentIndex] = value; + if (!finished) + { + hasResults[currentIndex] = true; + results[currentIndex] = value; + } } - } - }, - error => - { - lock (gate) + }, + error => { - finished = true; - subscriber.OnError(error); - group.Dispose(); - } - }, - () => - { - lock (gate) + lock (gate) + { + finished = true; + subscriber.OnError(error); + group.Dispose(); + } + }, + () => { - if (!finished) + lock (gate) { - if (!hasResults[currentIndex]) - { - subscriber.OnCompleted(); - return; - } - hasCompleted[currentIndex] = true; - foreach (var completed in hasCompleted) + if (!finished) { - if (!completed) + if (!hasResults[currentIndex]) + { + subscriber.OnCompleted(); return; + } + hasCompleted[currentIndex] = true; + foreach (var completed in hasCompleted) + { + if (!completed) + return; + } + finished = true; + subscriber.OnNext(results.ToArray()); + subscriber.OnCompleted(); } - finished = true; - subscriber.OnNext(results.ToArray()); - subscriber.OnCompleted(); } - } - })); + })); + } } - } - return group; - }); + return group; + }, + sources); } #endregion @@ -427,19 +433,21 @@ public virtual ListObservable ToListObservable(IObservable Combine(IObservable leftSource, IObservable rightSource, Func, IDisposable, IDisposable, IObserver, Notification>>> combinerSelector) { - return new AnonymousObservable(observer => - { - var leftSubscription = new SingleAssignmentDisposable(); - var rightSubscription = new SingleAssignmentDisposable(); + return AnonymousObservable.CreateStateful( + (observer, tuple) => + { + var leftSubscription = new SingleAssignmentDisposable(); + var rightSubscription = new SingleAssignmentDisposable(); - var combiner = combinerSelector(observer, leftSubscription, rightSubscription); - var gate = new object(); + var combiner = tuple.combinerSelector(observer, leftSubscription, rightSubscription); + var gate = new object(); - leftSubscription.Disposable = leftSource.Materialize().Select(x => Either, Notification>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner); - rightSubscription.Disposable = rightSource.Materialize().Select(x => Either, Notification>.CreateRight(x)).Synchronize(gate).Subscribe(combiner); + leftSubscription.Disposable = tuple.leftSource.Materialize().Select(x => Either, Notification>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner); + rightSubscription.Disposable = tuple.rightSource.Materialize().Select(x => Either, Notification>.CreateRight(x)).Synchronize(gate).Subscribe(combiner); - return StableCompositeDisposable.Create(leftSubscription, rightSubscription); - }); + return StableCompositeDisposable.Create(leftSubscription, rightSubscription); + }, + (leftSource, rightSource, combinerSelector)); } #endregion diff --git a/Rx.NET/Source/src/System.Reactive/Notification.cs b/Rx.NET/Source/src/System.Reactive/Notification.cs index a4cb94106f..e6c0ab2177 100644 --- a/Rx.NET/Source/src/System.Reactive/Notification.cs +++ b/Rx.NET/Source/src/System.Reactive/Notification.cs @@ -555,15 +555,19 @@ public IObservable ToObservable(IScheduler scheduler) if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - return new AnonymousObservable(observer => scheduler.Schedule(() => - { - Accept(observer); - - if (Kind == NotificationKind.OnNext) - { - observer.OnCompleted(); - } - })); + return AnonymousObservable.CreateStateful( + (observer, closureTuple1) => closureTuple1.scheduler.Schedule( + closureTuple2 => + { + closureTuple2.@this.Accept(closureTuple2.observer); + + if (closureTuple2.@this.Kind == NotificationKind.OnNext) + { + closureTuple2.observer.OnCompleted(); + } + }, + (closureTuple1.@this, closureTuple1.scheduler, observer)), + (@this: this, scheduler)); } }