diff --git a/async-enumerable-dotnet-test/BufferBoundaryTest.cs b/async-enumerable-dotnet-test/BufferBoundaryTest.cs new file mode 100644 index 0000000..0671767 --- /dev/null +++ b/async-enumerable-dotnet-test/BufferBoundaryTest.cs @@ -0,0 +1,122 @@ +// Copyright (c) David Karnok & Contributors. +// Licensed under the Apache 2.0 License. +// See LICENSE file in the project root for full license information. + +using Xunit; +using async_enumerable_dotnet; +using System; +using System.Collections.Generic; + +namespace async_enumerable_dotnet_test +{ + public class BufferBoundaryTest + { + [Fact] + public async void Size() + { + await AsyncEnumerable.Range(1, 5) + .Buffer(AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)), 2) + .AssertResult( + new List(new [] { 1, 2 }), + new List(new[] { 3, 4 }), + new List(new[] { 5 }) + ); + } + + [Fact] + public async void Size_Collection() + { + await AsyncEnumerable.Range(1, 5) + .Buffer(AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)), () => new HashSet(), 2) + .AssertResult( + new HashSet(new [] { 1, 2 }), + new HashSet(new[] { 3, 4 }), + new HashSet(new[] { 5 }) + ); + } + + [Fact] + public async void Time() + { + var t = 100L; + if (Environment.GetEnvironmentVariable("CI") != null) + { + t = 1000L; + } + + await + TestHelper.TimeSequence( + t, t, + 3 * t, 3 * t, 3 * t, + 7 * t, 7 * t + ) + .Buffer(AsyncEnumerable.Interval(TimeSpan.FromMilliseconds(2 * t))) + .AssertResult( + new List(new[] { t, t }), + new List(new[] { 3 * t, 3 * t, 3 * t }), + new List(), + new List(new[] { 7 * t, 7 * t }) + ); + + } + + [Fact] + public async void Error_Main() + { + await AsyncEnumerable.Error(new InvalidOperationException()) + .Buffer(AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)), 2) + .AssertFailure(typeof(InvalidOperationException)); + } + + [Fact] + public async void Error_Other() + { + await AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)) + .Buffer(AsyncEnumerable.Error(new InvalidOperationException()), 2) + .AssertFailure(typeof(InvalidOperationException)); + } + + [Fact] + public async void Empty_Main() + { + await AsyncEnumerable.Empty() + .Buffer(AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)), 2) + .AssertResult(); + } + + [Fact] + public async void Empty_Other() + { + await AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)) + .Buffer(AsyncEnumerable.Empty(), 2) + .AssertResult(); + } + + [Fact] + public async void Time_And_Size() + { + var t = 100L; + if (Environment.GetEnvironmentVariable("CI") != null) + { + t = 1000L; + } + + await + TestHelper.TimeSequence( + t, t, + 3 * t, 3 * t, 3 * t, + 7 * t, 7 * t, 7 * t, 7 * t + ) + .Buffer(AsyncEnumerable.Interval(TimeSpan.FromMilliseconds(2 * t)), 3) + .AssertResult( + new List(new[] { t, t }), + new List(new[] { 3 * t, 3 * t, 3 * t }), + new List(), + new List(), + new List(new[] { 7 * t, 7 * t, 7 * t }), + new List(new[] { 7 * t }) + ); + + } + } +} diff --git a/async-enumerable-dotnet-test/CombineLatestTest.cs b/async-enumerable-dotnet-test/CombineLatestTest.cs index 7d80681..563140c 100644 --- a/async-enumerable-dotnet-test/CombineLatestTest.cs +++ b/async-enumerable-dotnet-test/CombineLatestTest.cs @@ -81,7 +81,7 @@ public async void Second_Many() public async void Error() { await AsyncEnumerable.CombineLatest(v => v.Sum(), - AsyncEnumerable.Just(1), + AsyncEnumerable.Just(1), AsyncEnumerable.Just(2).ConcatWith( AsyncEnumerable.Error(new InvalidOperationException()) ) diff --git a/async-enumerable-dotnet-test/ConcatMapTest.cs b/async-enumerable-dotnet-test/ConcatMapTest.cs index ef29acb..6f4c9ad 100644 --- a/async-enumerable-dotnet-test/ConcatMapTest.cs +++ b/async-enumerable-dotnet-test/ConcatMapTest.cs @@ -28,14 +28,7 @@ await AsyncEnumerable.Range(1, 5) public async void Async_Filter() { await AsyncEnumerable.Range(1, 10) - .ConcatMap(v => - { - if (v % 2 == 0) - { - return AsyncEnumerable.Just(v); - } - return AsyncEnumerable.Empty(); - }) + .ConcatMap(v => v % 2 == 0 ? AsyncEnumerable.Just(v) : AsyncEnumerable.Empty()) .AssertResult( 2, 4, 6, 8, 10 ); @@ -71,14 +64,7 @@ await AsyncEnumerable.Range(1, 5) public async void Enumerable_Filter() { await AsyncEnumerable.Range(1, 10) - .ConcatMap(v => - { - if (v % 2 == 0) - { - return new[] { v }; - } - return Enumerable.Empty(); - }) + .ConcatMap(v => v % 2 == 0 ? new[] { v } : Enumerable.Empty()) .AssertResult( 2, 4, 6, 8, 10 ); diff --git a/async-enumerable-dotnet-test/CreateTest.cs b/async-enumerable-dotnet-test/CreateTest.cs index d0ee040..81a2742 100644 --- a/async-enumerable-dotnet-test/CreateTest.cs +++ b/async-enumerable-dotnet-test/CreateTest.cs @@ -39,7 +39,7 @@ public async ValueTask Range() [Fact] public async void Range_Loop() { - for (int j = 0; j < 1000; j++) + for (var j = 0; j < 1000; j++) { await Range(); } @@ -77,7 +77,7 @@ public async ValueTask Take() [Fact] public async void Take_Loop() { - for (int j = 0; j < 1000; j++) + for (var j = 0; j < 1000; j++) { await Take(); } diff --git a/async-enumerable-dotnet-test/SlimResumeTest.cs b/async-enumerable-dotnet-test/SlimResumeTest.cs new file mode 100644 index 0000000..33a5291 --- /dev/null +++ b/async-enumerable-dotnet-test/SlimResumeTest.cs @@ -0,0 +1,80 @@ +// Copyright (c) David Karnok & Contributors. +// Licensed under the Apache 2.0 License. +// See LICENSE file in the project root for full license information. + +using Xunit; +using async_enumerable_dotnet.impl; +using System.Threading.Tasks; +using System.Threading; + +namespace async_enumerable_dotnet_test +{ + public class SlimResumeTest + { + [Fact] + public async void ReadyUpfront() + { + var rsm = new SlimResume(); + rsm.Signal(); + + await rsm; + } + + [Fact] + public async void Completed() + { + var rsm = SlimResume.Completed; + + await rsm; + } + + [Fact] + public async void SignalLater() + { + var rsm = new SlimResume(); + + var t = Task.Delay(100) + .ContinueWith(t0 => rsm.Signal()); + + await rsm; + + await t; + } + + + [Fact] + public async void Race() + { + for (var i = 0; i < 10_000; i++) + { + var rsm = new SlimResume(); + + var wip = 2; + + var t1 = Task.Factory.StartNew(() => + { + if (Interlocked.Decrement(ref wip) != 0) + { + while (Volatile.Read(ref wip) != 0) { } + } + + rsm.Signal(); + }, TaskCreationOptions.LongRunning); + + var t2 = Task.Factory.StartNew(async () => + { + if (Interlocked.Decrement(ref wip) != 0) + { + while (Volatile.Read(ref wip) != 0) { } + } + + await rsm; + }, TaskCreationOptions.LongRunning); + + await t1; + + await t2; + } + } + } +} diff --git a/async-enumerable-dotnet-test/SwitchMapTest.cs b/async-enumerable-dotnet-test/SwitchMapTest.cs index 246630d..6a3603c 100644 --- a/async-enumerable-dotnet-test/SwitchMapTest.cs +++ b/async-enumerable-dotnet-test/SwitchMapTest.cs @@ -79,5 +79,13 @@ await AsyncEnumerable.Just(2) .SwitchMap(v => throw new InvalidOperationException()) .AssertFailure(typeof(InvalidOperationException)); } + + [Fact] + public async void Nested() + { + await AsyncEnumerable.Just(AsyncEnumerable.Range(2, 5)) + .Switch() + .AssertResult(2, 3, 4, 5, 6); + } } } diff --git a/async-enumerable-dotnet-test/TestHelper.cs b/async-enumerable-dotnet-test/TestHelper.cs index bbcc2d2..f0df53f 100644 --- a/async-enumerable-dotnet-test/TestHelper.cs +++ b/async-enumerable-dotnet-test/TestHelper.cs @@ -125,5 +125,17 @@ public static async ValueTask AssertFailure(this IAsyncEnumerator source, await source.DisposeAsync(); } } + + /// + /// Emits the given numbers after the same milliseconds from + /// the start. + /// + /// The params array of timestamps. + /// The new IAsyncEnumerable sequence. + internal static IAsyncEnumerable TimeSequence(params long[] timestamps) + { + return AsyncEnumerable.FromArray(timestamps) + .FlatMap(v => AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(v)).Map(w => v)); + } } } diff --git a/async-enumerable-dotnet/AsyncEnumerable.cs b/async-enumerable-dotnet/AsyncEnumerable.cs index fac7fca..c061a0b 100644 --- a/async-enumerable-dotnet/AsyncEnumerable.cs +++ b/async-enumerable-dotnet/AsyncEnumerable.cs @@ -1567,15 +1567,54 @@ public static IAsyncEnumerable Merge(this IAsyncEnumerable(); - } - if (sources.Length == 1) + switch (sources.Length) { - return sources[0].Map(v => combiner(new[] { v })); + case 0: + return Empty(); + case 1: + return sources[0].Map(v => combiner(new[] { v })); + default: + return new CombineLatest(sources, combiner); } - return new CombineLatest(sources, combiner); } + + /// + /// Buffers items from the source into Lists until the boundary + /// sequence signals an item (or the maxSize is reached) upon which + /// the current buffer is emitted and a new buffer is started. + /// + /// The element type of the source. + /// The element type of the boundary sequence. + /// The source to buffer. + /// The sequence indicating the boundaries of the buffers. + /// The maximum number of items to buffer. + /// The new IAsyncEnumerable sequence. + public static IAsyncEnumerable> Buffer(this IAsyncEnumerable source, IAsyncEnumerable boundary, int maxSize = int.MaxValue) + { + return Buffer>(source, boundary, () => new List(), maxSize); + } + + /// + /// Buffers items from the source into custom collections until the boundary + /// sequence signals an item (or the maxSize is reached) upon which + /// the current collections is emitted and a new collection is started. + /// + /// The element type of the source. + /// The element type of the boundary sequence. + /// The resulting collection type. + /// The source to buffer. + /// The sequence indicating the boundaries of the buffers. + /// The custom collection supplier. + /// The maximum number of items to buffer. + /// The new IAsyncEnumerable sequence. + public static IAsyncEnumerable Buffer(this IAsyncEnumerable source, IAsyncEnumerable boundary, Func collectionSupplier, int maxSize = int.MaxValue) where TCollection : ICollection + { + RequireNonNull(source, nameof(source)); + RequireNonNull(boundary, nameof(boundary)); + RequireNonNull(collectionSupplier, nameof(collectionSupplier)); + RequirePositive(maxSize, nameof(maxSize)); + return new BufferBoundaryExact(source, boundary, collectionSupplier, maxSize); + } + } } diff --git a/async-enumerable-dotnet/impl/BufferBoundary.cs b/async-enumerable-dotnet/impl/BufferBoundary.cs new file mode 100644 index 0000000..87289c0 --- /dev/null +++ b/async-enumerable-dotnet/impl/BufferBoundary.cs @@ -0,0 +1,294 @@ +// Copyright (c) David Karnok & Contributors. +// Licensed under the Apache 2.0 License. +// See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace async_enumerable_dotnet.impl +{ + internal sealed class BufferBoundaryExact : IAsyncEnumerable where TCollection : ICollection + { + + private readonly IAsyncEnumerable _source; + + private readonly Func _collectionSupplier; + + private readonly int _maxSize; + + private readonly IAsyncEnumerable _boundary; + + public BufferBoundaryExact(IAsyncEnumerable source, IAsyncEnumerable boundary, + Func collectionSupplier, int maxSize) + { + _source = source; + _boundary = boundary; + _collectionSupplier = collectionSupplier; + _maxSize = maxSize; + } + + public IAsyncEnumerator GetAsyncEnumerator() + { + var en = new BufferBoundaryExactEnumerator(_source.GetAsyncEnumerator(), _boundary.GetAsyncEnumerator(), _collectionSupplier, _maxSize); + + en.MoveNextOther(); + en.MoveNextSource(); + return en; + } + + private sealed class BufferBoundaryExactEnumerator : IAsyncEnumerator + { + + private readonly IAsyncEnumerator _source; + + private readonly IAsyncEnumerator _other; + + private readonly Func _collectionSupplier; + + private readonly int _maxSize; + + private int _sourceWip; + private int _sourceDisposeWip; + + private int _otherWip; + private int _otherDisposeWip; + + private bool _done; + private Exception _error; + + private readonly ConcurrentQueue _queue; + private TaskCompletionSource _resume; + + private int _disposeWip; + private Exception _disposeError; + private readonly TaskCompletionSource _disposeTask; + + private TCollection _buffer; + private int _size; + + public TCollection Current { get; private set; } + + public BufferBoundaryExactEnumerator(IAsyncEnumerator source, IAsyncEnumerator other, Func collectionSupplier, int maxSize) + { + _source = source; + _other = other; + _collectionSupplier = collectionSupplier; + _maxSize = maxSize; + _queue = new ConcurrentQueue(); + _disposeTask = new TaskCompletionSource(); + Volatile.Write(ref _disposeWip, 2); + } + + public async ValueTask MoveNextAsync() + { + if (_done) + { + var ex = ExceptionHelper.Terminate(ref _error); + if (ex != null) + { + throw ex; + } + return false; + } + for (; ;) + { + var success = _queue.TryDequeue(out var entry); + + if (success) + { + var b = _buffer; + + if (entry.Done) + { + _done = true; + + if (b != null) + { + Current = b; + _buffer = default; + return true; + } + + var ex = ExceptionHelper.Terminate(ref _error); + if (ex != null) + { + throw ex; + } + return false; + } + if (entry.Boundary) + { + if (b == null) + { + Current = _collectionSupplier(); + } + else + { + Current = b; + _buffer = default; + } + _size = 0; + MoveNextOther(); + return true; + } + + if (b == null) + { + b = _collectionSupplier(); + _buffer = b; + } + b.Add(entry.Value); + if (++_size == _maxSize) + { + Current = b; + _buffer = default; + _size = 0; + MoveNextSource(); + return true; + } + + MoveNextSource(); + continue; + } + + await ResumeHelper.Await(ref _resume); + ResumeHelper.Clear(ref _resume); + } + } + + public async ValueTask DisposeAsync() + { + if (Interlocked.Increment(ref _sourceDisposeWip) == 1) + { + Dispose(_source); + } + if (Interlocked.Increment(ref _otherDisposeWip) == 1) + { + Dispose(_other); + } + await _disposeTask.Task; + + Current = default; + while (_queue.TryDequeue(out _)) { } + } + + internal void MoveNextSource() + { + QueueDrainHelper.MoveNext(_source, ref _sourceWip, ref _sourceDisposeWip, HandleNextSourceAction, this); + } + + private static readonly Action, object> HandleNextSourceAction = (t, state) => ((BufferBoundaryExactEnumerator)state).HandleNextSource(t); + + private bool TryDisposeSource() + { + if (Interlocked.Decrement(ref _sourceDisposeWip) != 0) + { + Dispose(_source); + return false; + } + return true; + } + + private void HandleNextSource(Task t) + { + if (t.IsFaulted) + { + ExceptionHelper.AddException(ref _error, ExceptionHelper.Extract(t.Exception)); + _queue.Enqueue(new Entry + { + Done = true + }); + } + else if (t.Result) + { + _queue.Enqueue(new Entry + { + Value = _source.Current + }); + } + else + { + _queue.Enqueue(new Entry + { + Done = true + }); + } + if (TryDisposeSource()) + { + ResumeHelper.Resume(ref _resume); + } + } + + internal void MoveNextOther() + { + QueueDrainHelper.MoveNext(_other, ref _otherWip, ref _otherDisposeWip, HandleNextOtherAction, this); + } + + private static readonly Action, object> HandleNextOtherAction = (t, state) => ((BufferBoundaryExactEnumerator)state).HandleNextOther(t); + + private bool TryDisposeOther() + { + if (Interlocked.Decrement(ref _otherDisposeWip) != 0) + { + Dispose(_other); + return false; + } + return true; + } + + private void HandleNextOther(Task t) + { + if (t.IsFaulted) + { + ExceptionHelper.AddException(ref _error, ExceptionHelper.Extract(t.Exception)); + _queue.Enqueue(new Entry + { + Done = true + }); + } + else if (t.Result) + { + _queue.Enqueue(new Entry + { + Boundary = true + }); + } + else + { + _queue.Enqueue(new Entry + { + Done = true + }); + } + if (TryDisposeOther()) + { + ResumeHelper.Resume(ref _resume); + } + } + + private void Dispose(IAsyncDisposable disposable) + { + disposable.DisposeAsync() + .AsTask() + .ContinueWith(DisposeHandlerAction, this); + } + + private static readonly Action DisposeHandlerAction = (t, state) => ((BufferBoundaryExactEnumerator)state).DisposeHandler(t); + + private void DisposeHandler(Task t) + { + QueueDrainHelper.DisposeHandler(t, ref _disposeWip, ref _disposeError, _disposeTask); + } + + private struct Entry + { + internal bool Boundary; + internal bool Done; + internal TSource Value; + } + } + } +} diff --git a/async-enumerable-dotnet/impl/CombineLatest.cs b/async-enumerable-dotnet/impl/CombineLatest.cs index 8d82d34..33ccdcb 100644 --- a/async-enumerable-dotnet/impl/CombineLatest.cs +++ b/async-enumerable-dotnet/impl/CombineLatest.cs @@ -4,8 +4,6 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -36,7 +34,7 @@ private sealed class CombineLatestEnumerator : IAsyncEnumerator public TResult Current { get; private set; } - private bool once; + private bool _once; private TaskCompletionSource _resume; @@ -68,7 +66,7 @@ public CombineLatestEnumerator(IAsyncEnumerable[] sources, Func MoveNextAsync() { - if (!once) + if (!_once) { - once = true; + _once = true; MoveNextAll(); } @@ -120,7 +118,7 @@ public async ValueTask MoveNextAsync() if (entry.Done) { - if (inner._hasLatest) + if (inner.HasLatest) { _done--; } @@ -131,9 +129,9 @@ public async ValueTask MoveNextAsync() continue; } - if (!inner._hasLatest) + if (!inner.HasLatest) { - inner._hasLatest = true; + inner.HasLatest = true; _latestRemaining--; } @@ -159,7 +157,7 @@ public async ValueTask MoveNextAsync() } } - internal void Dispose(IAsyncDisposable disposable) + private void Dispose(IAsyncDisposable disposable) { disposable.DisposeAsync() .AsTask() @@ -173,7 +171,7 @@ private void DisposeHandler(Task t) QueueDrainHelper.DisposeHandler(t, ref _disposeWip, ref _disposeError, _disposeTask); } - internal void InnerNext(int index, TSource value) + private void InnerNext(int index, TSource value) { _queue.Enqueue(new Entry { @@ -183,7 +181,7 @@ internal void InnerNext(int index, TSource value) }); } - internal void InnerError(int index, Exception ex) + private void InnerError(int index, Exception ex) { ExceptionHelper.AddException(ref _error, ex); _queue.Enqueue(new Entry { @@ -191,44 +189,42 @@ internal void InnerError(int index, Exception ex) }); } - internal void InnerComplete(int index) + private void InnerComplete(int index) { _queue.Enqueue(new Entry { Index = index, Done = true, Value = default }); } - internal void Signal() + private void Signal() { ResumeHelper.Resume(ref _resume); } - struct Entry + private struct Entry { internal int Index; internal TSource Value; internal bool Done; } - internal sealed class InnerHandler + private sealed class InnerHandler { private readonly IAsyncEnumerator _source; private readonly CombineLatestEnumerator _parent; - internal readonly int Index; + private readonly int _index; private int _disposeWip; private int _sourceWip; - internal bool _hasLatest; - - public TSource Current => _source.Current; + internal bool HasLatest; public InnerHandler(IAsyncEnumerator source, CombineLatestEnumerator parent, int index) { _source = source; _parent = parent; - Index = index; + _index = index; } internal void MoveNext() @@ -252,15 +248,15 @@ private void NextHandler(Task t) { if (t.IsFaulted) { - _parent.InnerError(Index, ExceptionHelper.Extract(t.Exception)); + _parent.InnerError(_index, ExceptionHelper.Extract(t.Exception)); } else if (t.Result) { - _parent.InnerNext(Index, _source.Current); + _parent.InnerNext(_index, _source.Current); } else { - _parent.InnerComplete(Index); + _parent.InnerComplete(_index); } if (TryDispose()) { diff --git a/async-enumerable-dotnet/impl/QueueDrainHelper.cs b/async-enumerable-dotnet/impl/QueueDrainHelper.cs index 6882fa3..f7162e8 100644 --- a/async-enumerable-dotnet/impl/QueueDrainHelper.cs +++ b/async-enumerable-dotnet/impl/QueueDrainHelper.cs @@ -3,9 +3,7 @@ // See LICENSE file in the project root for full license information. using System; -using System.Collections.Generic; using System.Runtime.CompilerServices; -using System.Text; using System.Threading; using System.Threading.Tasks; diff --git a/async-enumerable-dotnet/impl/ResumeHelper.cs b/async-enumerable-dotnet/impl/ResumeHelper.cs index 3166eb4..82fc1e3 100644 --- a/async-enumerable-dotnet/impl/ResumeHelper.cs +++ b/async-enumerable-dotnet/impl/ResumeHelper.cs @@ -125,38 +125,6 @@ private static TaskCompletionSource GetOrCreate(ref TaskCompletionSource - /// Complete the task completion source based on the ValueTask's outcome. - /// - /// The target field. - /// The value task - internal static void Complete(TaskCompletionSource tcs, ValueTask task) - { - if (task.IsCanceled) - { - tcs.TrySetCanceled(); - } - else - if (task.IsFaulted) - { - tcs.TrySetException(task.AsTask().Exception); - } - else - if (task.IsCompleted) - { - tcs.TrySetResult(true); - } - else - { - task.AsTask() - .ContinueWith(Completer, - tcs, - TaskContinuationOptions.ExecuteSynchronously - ); - } - } - /// /// Complete the source in the field based on the ValueTask's outcome. /// diff --git a/async-enumerable-dotnet/impl/SlimResume.cs b/async-enumerable-dotnet/impl/SlimResume.cs new file mode 100644 index 0000000..247e3e3 --- /dev/null +++ b/async-enumerable-dotnet/impl/SlimResume.cs @@ -0,0 +1,66 @@ +// Copyright (c) David Karnok & Contributors. +// Licensed under the Apache 2.0 License. +// See LICENSE file in the project root for full license information. + +using System; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace async_enumerable_dotnet.impl +{ + /// + /// A minimal awaitable construct that supports only one awaiter + /// and does not convey any value or exception. + /// + internal sealed class SlimResume : INotifyCompletion + { + private Action _continuation; + + private static readonly Action CompletedAction = () => { }; + + /// + /// The singleton instance of a completed SlimResume. + /// + internal static readonly SlimResume Completed; + + static SlimResume() + { + Completed = new SlimResume(); + Volatile.Write(ref Completed._continuation, CompletedAction); + } + + // ReSharper disable once UnusedMethodReturnValue.Global + public SlimResume GetAwaiter() + { + return this; + } + + public bool IsCompleted => Volatile.Read(ref _continuation) == CompletedAction; + + public void OnCompleted(Action continuation) + { + var old = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (old == CompletedAction) + { + continuation.Invoke(); + } + else + if (old != null) + { + throw new InvalidOperationException("Only one continuation allowed"); + } + } + + // ReSharper disable once MemberCanBeMadeStatic.Global + public void GetResult() + { + // no actual outcome, only resumption + } + + internal void Signal() + { + var prev = Interlocked.Exchange(ref _continuation, CompletedAction); + prev?.Invoke(); + } + } +} diff --git a/async-enumerable-dotnet/impl/SwitchMap.cs b/async-enumerable-dotnet/impl/SwitchMap.cs index e5b1165..6311992 100644 --- a/async-enumerable-dotnet/impl/SwitchMap.cs +++ b/async-enumerable-dotnet/impl/SwitchMap.cs @@ -3,8 +3,6 @@ // See LICENSE file in the project root for full license information. using System; -using System.Collections.Generic; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -211,19 +209,19 @@ private void DisposeHandler(Task t) QueueDrainHelper.DisposeHandler(t, ref _allDisposeWip, ref _allDisposeError, _disposeTask); } - internal void InnerError(InnerHandler sender, Exception ex) + private void InnerError(InnerHandler sender, Exception ex) { ExceptionHelper.AddException(ref _error, ex); sender.Done = true; Signal(); } - internal void Signal() + private void Signal() { ResumeHelper.Resume(ref _resume); } - internal sealed class InnerHandler + private sealed class InnerHandler { private readonly IAsyncEnumerator _source; @@ -245,10 +243,10 @@ public InnerHandler(IAsyncEnumerator source, SwitchMapEnumerator parent internal void MoveNext() { - QueueDrainHelper.MoveNext(_source, ref _sourceWip, ref _disposeWip, NextHandlerAction, this); + QueueDrainHelper.MoveNext(_source, ref _sourceWip, ref _disposeWip, InnerNextHandlerAction, this); } - private static readonly Action, object> NextHandlerAction = (t, state) => ((InnerHandler)state).Next(t); + private static readonly Action, object> InnerNextHandlerAction = (t, state) => ((InnerHandler)state).Next(t); internal void Dispose() {