Skip to content

Commit

Permalink
+ Sample, some tools
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Oct 25, 2018
1 parent c0a382c commit b259def
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ finally
- `Reduce` - combine elements with an accumulator and emit the last result
- `Repeat` - repeatedly consume the entire source async sequence (up to a number of times and/or condition)
- `Retry` - retry a failed async sequence (up to a number of times or based on condition)
- `Sample` - periodically take the latest item from the source sequence and emit it
- `Scan` - perform rolling aggregation by emitting intermediate results
- `Single` - signals the only item of the async sequence, fails if the sequence has more than one item
- `Skip` - skip the first specified number of items of the source async sequence
Expand Down
44 changes: 44 additions & 0 deletions async-enumerable-dotnet-test/SampleTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using Xunit;
using async_enumerable_dotnet;
using System.Threading.Tasks;

namespace async_enumerable_dotnet_test
{
public class SampleTest
{
[Fact]
public async void Normal()
{
var t = 200;
if (Environment.GetEnvironmentVariable("CI") != null)
{
t = 2000;
}

await AsyncEnumerable.Range(1, 5)
.FlatMap(v =>
AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(t * v - t / 2))
.Map(w => v)
)
.Sample(TimeSpan.FromMilliseconds(t * 2))
.AssertResult(2, 4, 5);
}

[Fact]
public async void Last()
{
await AsyncEnumerable.Range(1, 5)
.Sample(TimeSpan.FromMilliseconds(500))
.AssertResult(5);
}

[Fact]
public async void Error()
{
await AsyncEnumerable.Error<int>(new InvalidOperationException())
.Sample(TimeSpan.FromMilliseconds(500))
.AssertFailure(typeof(InvalidOperationException));
}
}
}
12 changes: 12 additions & 0 deletions async-enumerable-dotnet/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,5 +1121,17 @@ public static IAsyncEnumerable<IList<T>> ToList<T>(this IAsyncEnumerable<T> sour
{
return Collect<T, IList<T>>(source, () => new List<T>(capacityHint), (a, b) => a.Add(b));
}

/// <summary>
/// Periodically take the latest item from the source async sequence and relay it.
/// </summary>
/// <typeparam name="T">The element type of the async sequence.</typeparam>
/// <param name="source">The source async sequence to sample.</param>
/// <param name="period">The sampling period.</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<T> Sample<T>(this IAsyncEnumerable<T> source, TimeSpan period)
{
return new Sample<T>(source, period);
}
}
}
34 changes: 33 additions & 1 deletion async-enumerable-dotnet/impl/ExceptionHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ internal sealed class ExceptionHelper
/// </summary>
internal static readonly Exception Terminated = new TerminatedException();

/// <summary>
/// Atomically aggregate the given exception into the target field
/// or return false if the field contains the terminated exception indicator.
/// </summary>
/// <param name="field">The target field</param>
/// <param name="ex">The exception to aggregate</param>
/// <returns>True if successful, false if the field already has the terminated
/// indicator.</returns>
internal static bool AddException(ref Exception field, Exception ex)
{
for (; ;)
Expand Down Expand Up @@ -47,17 +55,41 @@ internal static bool AddException(ref Exception field, Exception ex)
}
}

/// <summary>
/// Atomically swap in the terminated indicator and return the
/// previous exception (may be null if none).
/// </summary>
/// <param name="field">The target field.</param>
/// <returns>The last exception or null if no exceptions were in the field.</returns>
internal static Exception Terminate(ref Exception field)
{
return Interlocked.Exchange(ref field, Terminated);
}

internal sealed class TerminatedException : Exception
/// <summary>
/// An exception indicating a terminal state within an Exception field.
/// </summary>
sealed class TerminatedException : Exception
{
internal TerminatedException() : base("No further exceptions.")
{

}
}

/// <summary>
/// If the given exception is of an AggregateException with
/// only a single inner exception, extract it.
/// </summary>
/// <param name="ex">The exception to un-aggregate</param>
/// <returns>The inner solo exception or <paramref name="ex"/>.</returns>
internal static Exception Unaggregate(Exception ex)
{
if (ex is AggregateException g && g.InnerExceptions.Count == 1)
{
return g.InnerExceptions[0];
}
return ex;
}
}
}
79 changes: 79 additions & 0 deletions async-enumerable-dotnet/impl/ResumeHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,84 @@ internal static void Clear<U>(ref TaskCompletionSource<U> resume)
Interlocked.Exchange(ref resume, null);
}

/// <summary>
/// Create an action that takes a Task and sets the given
/// TaskCompletionSource to the same state.
/// </summary>
/// <typeparam name="T">The element type of the source task</typeparam>
/// <param name="tcs">The TaskCompletionSource to complete/fault based on the task.</param>
/// <returns>The new action</returns>
internal static Action<Task<T>> ResumeWith<T>(TaskCompletionSource<T> tcs)
{
return t =>
{
if (t.IsCanceled)
{
tcs.TrySetCanceled();
}
else if (t.IsFaulted)
{
tcs.TrySetException(t.Exception);
}
else
{
tcs.TrySetResult(t.Result);
}
};
}

/// <summary>
/// Create an action that takes a Task and sets the given
/// TaskCompletionSource to the same state.
/// </summary>
/// <param name="tcs">The TaskCompletionSource to complete/fault based on the task.</param>
/// <returns>The new action</returns>
internal static Action<Task> ResumeWith(TaskCompletionSource<bool> tcs)
{
return t =>
{
if (t.IsCanceled)
{
tcs.TrySetCanceled();
}
else if (t.IsFaulted)
{
tcs.TrySetException(t.Exception);
}
else
{
tcs.TrySetResult(true); // by convention
}
};
}

/// <summary>
/// Terminates the given TaskCompletionSource if the ValueTask completed
/// or adds a continuation to it which will set the completion state on
/// The TCS.
/// </summary>
/// <param name="task">The task that will be completed.</param>
/// <param name="tcs">The task completion source to terminate.</param>
internal static void ResumeWhen(ValueTask task, TaskCompletionSource<bool> tcs)
{
if (task.IsCanceled)
{
tcs.TrySetCanceled();
}
else
if (task.IsFaulted)
{
tcs.TrySetException(task.AsTask().Exception);
}
else
if (task.IsCompleted)
{
tcs.TrySetResult(true); // by convention
}
else
{
task.AsTask().ContinueWith(ResumeWith(tcs));
}
}
}
}
172 changes: 172 additions & 0 deletions async-enumerable-dotnet/impl/Sample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace async_enumerable_dotnet.impl
{
internal sealed class Sample<T> : IAsyncEnumerable<T>
{
readonly IAsyncEnumerable<T> source;

readonly TimeSpan period;

public Sample(IAsyncEnumerable<T> source, TimeSpan period)
{
this.source = source;
this.period = period;
}

public IAsyncEnumerator<T> GetAsyncEnumerator()
{
var en = new SampleEnumerator(source.GetAsyncEnumerator(), period);
en.StartTimer();
en.MoveNext();
return en;
}

internal sealed class SampleEnumerator : IAsyncEnumerator<T>
{
readonly IAsyncEnumerator<T> source;

readonly TimeSpan period;

readonly CancellationTokenSource cts;

int consumerWip;

TaskCompletionSource<bool> resume;

object latest;
volatile bool done;
Exception error;

long wip;

int disposeWip;

readonly TaskCompletionSource<bool> disposeTask;

public T Current { get; private set; }

static readonly object EmptyIndicator = new object();

public SampleEnumerator(IAsyncEnumerator<T> source, TimeSpan period)
{
this.source = source;
this.period = period;
this.disposeTask = new TaskCompletionSource<bool>();
this.cts = new CancellationTokenSource();
Volatile.Write(ref latest, EmptyIndicator);
}

public ValueTask DisposeAsync()
{
cts.Cancel();
if (Interlocked.Increment(ref disposeWip) == 1)
{
return source.DisposeAsync();
}
return new ValueTask(disposeTask.Task);
}

public async ValueTask<bool> MoveNextAsync()
{
for (; ; )
{
bool d = done;
var v = Interlocked.Exchange(ref latest, EmptyIndicator);

if (d && v == EmptyIndicator)
{
if (error != null)
{
throw error;
}
return false;
}
else if (v != EmptyIndicator)
{
Current = (T)v;
return true;
}

if (Volatile.Read(ref wip) == 0)
{
await ResumeHelper.Resume(ref resume).Task;
}
ResumeHelper.Clear(ref resume);
Interlocked.Exchange(ref wip, 0);
}
}

internal void StartTimer()
{
Task.Delay(period, cts.Token)
.ContinueWith(t => HandleTimer(t), cts.Token);
}

void HandleTimer(Task timer)
{
Signal();
StartTimer();
}

void Signal()
{
if (Interlocked.Increment(ref wip) == 1)
{
ResumeHelper.Resume(ref resume).TrySetResult(true);
}
}

internal void MoveNext()
{
if (Interlocked.Increment(ref consumerWip) == 1)
{
do
{
if (Interlocked.Increment(ref disposeWip) == 1)
{
source.MoveNextAsync()
.AsTask().ContinueWith(t => Handler(t));
}
else
{
break;
}
}
while (Interlocked.Decrement(ref consumerWip) != 0);
}
}

void Handler(Task<bool> t)
{
if (Interlocked.Decrement(ref disposeWip) != 0)
{
ResumeHelper.ResumeWhen(source.DisposeAsync(), disposeTask);
}
else if (t.IsFaulted)
{
error = ExceptionHelper.Unaggregate(t.Exception);
done = true;
cts.Cancel();
Signal();
}
else if (t.Result)
{
Interlocked.Exchange(ref latest, source.Current);
// resumption will be triggered by the timer
MoveNext();
}
else
{
done = true;
cts.Cancel();
Signal();
}
}
}
}
}

0 comments on commit b259def

Please sign in to comment.