diff --git a/README.md b/README.md index 2a144dd..7e0f7b0 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,7 @@ finally - `Reduce` - combine elements with an accumulator and emit the last result - `Repeat` - repeatedly consume the entire source async sequence (up to a number of times and/or condition) - `Retry` - retry a failed async sequence (up to a number of times or based on condition) +- `Sample` - periodically take the latest item from the source sequence and emit it - `Scan` - perform rolling aggregation by emitting intermediate results - `Single` - signals the only item of the async sequence, fails if the sequence has more than one item - `Skip` - skip the first specified number of items of the source async sequence diff --git a/async-enumerable-dotnet-test/SampleTest.cs b/async-enumerable-dotnet-test/SampleTest.cs new file mode 100644 index 0000000..20aafa4 --- /dev/null +++ b/async-enumerable-dotnet-test/SampleTest.cs @@ -0,0 +1,44 @@ +using System; +using Xunit; +using async_enumerable_dotnet; +using System.Threading.Tasks; + +namespace async_enumerable_dotnet_test +{ + public class SampleTest + { + [Fact] + public async void Normal() + { + var t = 200; + if (Environment.GetEnvironmentVariable("CI") != null) + { + t = 2000; + } + + await AsyncEnumerable.Range(1, 5) + .FlatMap(v => + AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(t * v - t / 2)) + .Map(w => v) + ) + .Sample(TimeSpan.FromMilliseconds(t * 2)) + .AssertResult(2, 4, 5); + } + + [Fact] + public async void Last() + { + await AsyncEnumerable.Range(1, 5) + .Sample(TimeSpan.FromMilliseconds(500)) + .AssertResult(5); + } + + [Fact] + public async void Error() + { + await AsyncEnumerable.Error(new InvalidOperationException()) + .Sample(TimeSpan.FromMilliseconds(500)) + .AssertFailure(typeof(InvalidOperationException)); + } + } +} diff --git a/async-enumerable-dotnet/AsyncEnumerable.cs b/async-enumerable-dotnet/AsyncEnumerable.cs index 5d3072a..7333fc8 100644 --- a/async-enumerable-dotnet/AsyncEnumerable.cs +++ b/async-enumerable-dotnet/AsyncEnumerable.cs @@ -1121,5 +1121,17 @@ public static IAsyncEnumerable> ToList(this IAsyncEnumerable sour { return Collect>(source, () => new List(capacityHint), (a, b) => a.Add(b)); } + + /// + /// Periodically take the latest item from the source async sequence and relay it. + /// + /// The element type of the async sequence. + /// The source async sequence to sample. + /// The sampling period. + /// The new IAsyncEnumerable sequence. + public static IAsyncEnumerable Sample(this IAsyncEnumerable source, TimeSpan period) + { + return new Sample(source, period); + } } } diff --git a/async-enumerable-dotnet/impl/ExceptionHelper.cs b/async-enumerable-dotnet/impl/ExceptionHelper.cs index 5b62317..7dc9766 100644 --- a/async-enumerable-dotnet/impl/ExceptionHelper.cs +++ b/async-enumerable-dotnet/impl/ExceptionHelper.cs @@ -16,6 +16,14 @@ internal sealed class ExceptionHelper /// internal static readonly Exception Terminated = new TerminatedException(); + /// + /// Atomically aggregate the given exception into the target field + /// or return false if the field contains the terminated exception indicator. + /// + /// The target field + /// The exception to aggregate + /// True if successful, false if the field already has the terminated + /// indicator. internal static bool AddException(ref Exception field, Exception ex) { for (; ;) @@ -47,17 +55,41 @@ internal static bool AddException(ref Exception field, Exception ex) } } + /// + /// Atomically swap in the terminated indicator and return the + /// previous exception (may be null if none). + /// + /// The target field. + /// The last exception or null if no exceptions were in the field. internal static Exception Terminate(ref Exception field) { return Interlocked.Exchange(ref field, Terminated); } - internal sealed class TerminatedException : Exception + /// + /// An exception indicating a terminal state within an Exception field. + /// + sealed class TerminatedException : Exception { internal TerminatedException() : base("No further exceptions.") { } } + + /// + /// If the given exception is of an AggregateException with + /// only a single inner exception, extract it. + /// + /// The exception to un-aggregate + /// The inner solo exception or . + internal static Exception Unaggregate(Exception ex) + { + if (ex is AggregateException g && g.InnerExceptions.Count == 1) + { + return g.InnerExceptions[0]; + } + return ex; + } } } diff --git a/async-enumerable-dotnet/impl/ResumeHelper.cs b/async-enumerable-dotnet/impl/ResumeHelper.cs index 57a0122..97a855f 100644 --- a/async-enumerable-dotnet/impl/ResumeHelper.cs +++ b/async-enumerable-dotnet/impl/ResumeHelper.cs @@ -53,5 +53,84 @@ internal static void Clear(ref TaskCompletionSource resume) Interlocked.Exchange(ref resume, null); } + /// + /// Create an action that takes a Task and sets the given + /// TaskCompletionSource to the same state. + /// + /// The element type of the source task + /// The TaskCompletionSource to complete/fault based on the task. + /// The new action + internal static Action> ResumeWith(TaskCompletionSource tcs) + { + return t => + { + if (t.IsCanceled) + { + tcs.TrySetCanceled(); + } + else if (t.IsFaulted) + { + tcs.TrySetException(t.Exception); + } + else + { + tcs.TrySetResult(t.Result); + } + }; + } + + /// + /// Create an action that takes a Task and sets the given + /// TaskCompletionSource to the same state. + /// + /// The TaskCompletionSource to complete/fault based on the task. + /// The new action + internal static Action ResumeWith(TaskCompletionSource tcs) + { + return t => + { + if (t.IsCanceled) + { + tcs.TrySetCanceled(); + } + else if (t.IsFaulted) + { + tcs.TrySetException(t.Exception); + } + else + { + tcs.TrySetResult(true); // by convention + } + }; + } + + /// + /// Terminates the given TaskCompletionSource if the ValueTask completed + /// or adds a continuation to it which will set the completion state on + /// The TCS. + /// + /// The task that will be completed. + /// The task completion source to terminate. + internal static void ResumeWhen(ValueTask task, TaskCompletionSource tcs) + { + if (task.IsCanceled) + { + tcs.TrySetCanceled(); + } + else + if (task.IsFaulted) + { + tcs.TrySetException(task.AsTask().Exception); + } + else + if (task.IsCompleted) + { + tcs.TrySetResult(true); // by convention + } + else + { + task.AsTask().ContinueWith(ResumeWith(tcs)); + } + } } } diff --git a/async-enumerable-dotnet/impl/Sample.cs b/async-enumerable-dotnet/impl/Sample.cs new file mode 100644 index 0000000..634db1f --- /dev/null +++ b/async-enumerable-dotnet/impl/Sample.cs @@ -0,0 +1,172 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace async_enumerable_dotnet.impl +{ + internal sealed class Sample : IAsyncEnumerable + { + readonly IAsyncEnumerable source; + + readonly TimeSpan period; + + public Sample(IAsyncEnumerable source, TimeSpan period) + { + this.source = source; + this.period = period; + } + + public IAsyncEnumerator GetAsyncEnumerator() + { + var en = new SampleEnumerator(source.GetAsyncEnumerator(), period); + en.StartTimer(); + en.MoveNext(); + return en; + } + + internal sealed class SampleEnumerator : IAsyncEnumerator + { + readonly IAsyncEnumerator source; + + readonly TimeSpan period; + + readonly CancellationTokenSource cts; + + int consumerWip; + + TaskCompletionSource resume; + + object latest; + volatile bool done; + Exception error; + + long wip; + + int disposeWip; + + readonly TaskCompletionSource disposeTask; + + public T Current { get; private set; } + + static readonly object EmptyIndicator = new object(); + + public SampleEnumerator(IAsyncEnumerator source, TimeSpan period) + { + this.source = source; + this.period = period; + this.disposeTask = new TaskCompletionSource(); + this.cts = new CancellationTokenSource(); + Volatile.Write(ref latest, EmptyIndicator); + } + + public ValueTask DisposeAsync() + { + cts.Cancel(); + if (Interlocked.Increment(ref disposeWip) == 1) + { + return source.DisposeAsync(); + } + return new ValueTask(disposeTask.Task); + } + + public async ValueTask MoveNextAsync() + { + for (; ; ) + { + bool d = done; + var v = Interlocked.Exchange(ref latest, EmptyIndicator); + + if (d && v == EmptyIndicator) + { + if (error != null) + { + throw error; + } + return false; + } + else if (v != EmptyIndicator) + { + Current = (T)v; + return true; + } + + if (Volatile.Read(ref wip) == 0) + { + await ResumeHelper.Resume(ref resume).Task; + } + ResumeHelper.Clear(ref resume); + Interlocked.Exchange(ref wip, 0); + } + } + + internal void StartTimer() + { + Task.Delay(period, cts.Token) + .ContinueWith(t => HandleTimer(t), cts.Token); + } + + void HandleTimer(Task timer) + { + Signal(); + StartTimer(); + } + + void Signal() + { + if (Interlocked.Increment(ref wip) == 1) + { + ResumeHelper.Resume(ref resume).TrySetResult(true); + } + } + + internal void MoveNext() + { + if (Interlocked.Increment(ref consumerWip) == 1) + { + do + { + if (Interlocked.Increment(ref disposeWip) == 1) + { + source.MoveNextAsync() + .AsTask().ContinueWith(t => Handler(t)); + } + else + { + break; + } + } + while (Interlocked.Decrement(ref consumerWip) != 0); + } + } + + void Handler(Task t) + { + if (Interlocked.Decrement(ref disposeWip) != 0) + { + ResumeHelper.ResumeWhen(source.DisposeAsync(), disposeTask); + } + else if (t.IsFaulted) + { + error = ExceptionHelper.Unaggregate(t.Exception); + done = true; + cts.Cancel(); + Signal(); + } + else if (t.Result) + { + Interlocked.Exchange(ref latest, source.Current); + // resumption will be triggered by the timer + MoveNext(); + } + else + { + done = true; + cts.Cancel(); + Signal(); + } + } + } + } +}