Skip to content

Commit

Permalink
+ Replay, fix style & headers
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Oct 30, 2018
1 parent 379b6e2 commit 731b7b7
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 190 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ finally
- `FromObservable` - convert an `IObservable` into an `IAsyncEnumerable`
- `Interval` - periodically signal an ever increasing number
- `Just` - emit a single constant value
- `Merge` - run multiple sources at once and merge their items into a single async sequence
- `Never` - the async sequence never produces any items and never terminates
- `Range` - emit a range of numbers
- `Timer` - emit zero after some time delay
Expand All @@ -72,10 +73,13 @@ finally
- `Last` - signals the last item of the async sequence
- `Latest` - runs the source async sequence as fast as it can and samples it with the frequency of the consumer
- `Map` - transform one source value into some other value
- `MergeWith` - run two async sources at once and merge their items into a single async sequence
- `OnErrorResumeNext` - if the main source fails, switch to an alternative source
- `Prefetch` - run the source async sequence to prefetch items for a slow consumer
- `Publish` - consume an async sequence once while multicasting its items to intermediate consumers for the duration of a function.
- `Reduce` - combine elements with an accumulator and emit the last result
- `Repeat` - repeatedly consume the entire source async sequence (up to a number of times and/or condition)
- `Replay` - consume an async sequence once, caching some or all of its items and multicasting them to intermediate consumers for the duration of a function.
- `Retry` - retry a failed async sequence (up to a number of times or based on condition)
- `Sample` - periodically take the latest item from the source sequence and emit it
- `Scan` - perform rolling aggregation by emitting intermediate results
Expand Down
5 changes: 2 additions & 3 deletions async-enumerable-dotnet-benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

using async_enumerable_dotnet;
using System;
using System.Threading.Tasks;

namespace async_enumerable_dotnet_benchmark
{
Expand All @@ -15,14 +14,14 @@ class Program
/// <summary>
/// Don't worry about this program yet. I'm using it to
/// diagnose await hangs and internal state that is otherwise
/// hard (or I don't know how) to debug as an Xunit test.
/// hard (or I don't know how) to debug as an XUnit test.
/// </summary>
/// <param name="args"></param>
// ReSharper disable once UnusedParameter.Local
// ReSharper disable once ArrangeTypeMemberModifiers
static void Main(string[] args)
{
for (int i = 0; i < 100000; i++)
for (var i = 0; i < 100000; i++)
{
if (i % 100 == 0)
{
Expand Down
17 changes: 11 additions & 6 deletions async-enumerable-dotnet-test/LicenseHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ private static void VisitSources(string path)
var found = false;

var ci = Environment.GetEnvironmentVariable("CI") != null;


var sb = new StringBuilder();

foreach (var entry in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
{
if (entry.Contains("AssemblyInfo")
|| entry.Contains("Temporary")
|| entry.Contains("/obj/"))
var entryForward = entry.Replace("\\", "/");
if (entryForward.Contains("AssemblyInfo")
|| entryForward.Contains("Temporary")
|| entryForward.Contains("/obj/")
|| entryForward.Contains("/Debug/")
|| entryForward.Contains("/Release/"))
{
continue;
}
Expand All @@ -69,7 +74,7 @@ private static void VisitSources(string path)
}
if (!text.StartsWith(HeaderLines))
{
Console.WriteLine("Missing header: " + entry);
sb.Append(entry).Append("\r\n");
found = true;
if (!ci)
{
Expand All @@ -80,7 +85,7 @@ private static void VisitSources(string path)

if (found)
{
throw new InvalidOperationException("Missing header found and added. Please rebuild the project of " + path);
throw new InvalidOperationException("Missing header found and added. Please rebuild the project of " + path + "\r\n" + sb);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions async-enumerable-dotnet-test/MergeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async void Error()
[Fact]
public async void Push()
{
for (int i = 0; i < 10; i++)
for (var i = 0; i < 10; i++)
{
var push = new MulticastAsyncEnumerable<int>();

Expand All @@ -84,7 +84,7 @@ public async void Push()

var t = Task.Run(async () =>
{
for (int j = 0; j < 100_000; j++)
for (var j = 0; j < 100_000; j++)
{
await push.Next(j);
}
Expand All @@ -104,7 +104,7 @@ public async void Push()
[Fact]
public async void Multicast_Merge()
{
for (int i = 0; i < 100000; i++)
for (var i = 0; i < 100000; i++)
{
await AsyncEnumerable.Range(1, 5)
.Publish(a => a.Take(3).MergeWith(a.Skip(3)))
Expand Down
1 change: 0 additions & 1 deletion async-enumerable-dotnet-test/MergeWithTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See LICENSE file in the project root for full license information.

using Xunit;
using System.Threading.Tasks;
using async_enumerable_dotnet;

namespace async_enumerable_dotnet_test
Expand Down
12 changes: 11 additions & 1 deletion async-enumerable-dotnet-test/PublishTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// See LICENSE file in the project root for full license information.

using Xunit;
using System.Threading.Tasks;
using async_enumerable_dotnet;
using System;

namespace async_enumerable_dotnet_test
{
Expand Down Expand Up @@ -81,5 +81,15 @@ await AsyncEnumerable.Range(1, 5)
)
.AssertResult(1, 2, 3, 4, 5);
}


[Fact]
public async void Handler_Crash()
{
await AsyncEnumerable.Range(1, 5)
.Publish<int, int>(v => throw new InvalidOperationException())
.AssertFailure(typeof(InvalidOperationException));
}

}
}
61 changes: 61 additions & 0 deletions async-enumerable-dotnet-test/ReplayTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) David Karnok & Contributors.
// Licensed under the Apache 2.0 License.
// See LICENSE file in the project root for full license information.

using Xunit;
using async_enumerable_dotnet;
using System;

namespace async_enumerable_dotnet_test
{
public class ReplayTest
{
[Fact]
public async void All_Direct()
{
await AsyncEnumerable.Range(1, 5)
.Replay(v => v)
.AssertResult(1, 2, 3, 4, 5);
}

[Fact]
public async void All_Simple()
{
await AsyncEnumerable.Range(1, 5)
.Replay(v => v.Map(w => w + 1))
.AssertResult(2, 3, 4, 5, 6);
}

[Fact]
public async void All_Take()
{
await AsyncEnumerable.Range(1, 5)
.Replay(v => v.Take(3))
.AssertResult(1, 2, 3);
}

[Fact]
public async void All_Recombine()
{
await AsyncEnumerable.Range(1, 5)
.Replay(v => v.Take(3).ConcatWith(v.Skip(3)))
.AssertResult(1, 2, 3, 4, 5);
}

[Fact]
public async void All_Twice()
{
await AsyncEnumerable.Range(1, 5)
.Replay(v => v.ConcatWith(v))
.AssertResult(1, 2, 3, 4, 5, 1, 2, 3, 4, 5);
}

[Fact]
public async void All_Handler_Crash()
{
await AsyncEnumerable.Range(1, 5)
.Replay<int, int>(v => throw new InvalidOperationException())
.AssertFailure(typeof(InvalidOperationException));
}
}
}
1 change: 0 additions & 1 deletion async-enumerable-dotnet-test/UnitTest1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See LICENSE file in the project root for full license information.

using Xunit;
using System.Threading.Tasks;
using async_enumerable_dotnet;

namespace async_enumerable_dotnet_test
Expand Down
32 changes: 25 additions & 7 deletions async-enumerable-dotnet/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1414,15 +1414,33 @@ public static IAsyncEnumerable<TSource> MergeWith<TSource>(this IAsyncEnumerable
public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
{
RequireNonNull(sources, nameof(sources));
if (sources.Length == 0)
switch (sources.Length)
{
return Empty<TSource>();
case 0:
return Empty<TSource>();
case 1:
return sources[0];
default:
return new Merge<TSource>(sources);
}
if (sources.Length == 1)
{
return sources[0];
}
return new Merge<TSource>(sources);
}

/// <summary>
/// Shares and multicasts the source async sequence, caching some or
/// all of its items, for the duration
/// of a function and relays items from the returned async sequence.
/// </summary>
/// <typeparam name="TSource">The element type of the source.</typeparam>
/// <typeparam name="TResult">The result type.</typeparam>
/// <param name="source">The source async sequence to multicast.</param>
/// <param name="func">The function to transform the sequence without
/// consuming it multiple times.</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<TResult> Replay<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<IAsyncEnumerable<TSource>, IAsyncEnumerable<TResult>> func)
{
RequireNonNull(source, nameof(source));
RequireNonNull(func, nameof(func));
return new Replay<TSource, TResult>(source, func);
}
}
}
17 changes: 11 additions & 6 deletions async-enumerable-dotnet/async-enumerable-dotnet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<LangVersion>latest</LangVersion>
<PackageId>akarnokd.async-enumerable-dotnet</PackageId>
<Version>0.0.1.5</Version>
<Version>0.0.1.6</Version>
<Authors>David Karnok</Authors>
<Company />
<AssemblyVersion>0.0.1.5</AssemblyVersion>
<FileVersion>0.0.1.5</FileVersion>
<AssemblyVersion>0.0.1.6</AssemblyVersion>
<FileVersion>0.0.1.6</FileVersion>
<PackageTags>async, concurrency, async-enumerable, operators, async-sequence</PackageTags>
<RepositoryUrl>https://github.com/akarnokd/async-enumerable-dotnet</RepositoryUrl>
<PackageProjectUrl>https://github.com/akarnokd/async-enumerable-dotnet#getting-started</PackageProjectUrl>
<Description>Experimental operators for the upcoming C# 8 IAsyncEnumerables.</Description>
<Copyright>(C) David Karnok</Copyright>
<PackageLicenseUrl>https://www.apache.org/licenses/LICENSE-2.0</PackageLicenseUrl>
<PackageReleaseNotes>- Internal notification mechanism reworked.
- Fixed a few Current/DisposeAsync races
- Renamed certain type arguments to conform the C# naming conventions.</PackageReleaseNotes>
<PackageReleaseNotes>Bugfixes
- FlatMap dispose now awaits the dispose of the inner sources.

New operators:
- Merge
- MergeWith
- Publish
- Replay</PackageReleaseNotes>
<RepositoryType>Github</RepositoryType>
<Product>Async Enumerable operators for .NET</Product>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion async-enumerable-dotnet/impl/FlatMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private void Handle(Task<bool> task)
IAsyncEnumerator<TResult> innerSource;
try
{
innerSource = _mapper(_source.Current)
innerSource = _mapper(v)
.GetAsyncEnumerator();
}
catch (Exception ex)
Expand Down
15 changes: 8 additions & 7 deletions async-enumerable-dotnet/impl/Merge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ private sealed class MergeEnumerator : IAsyncEnumerator<TSource>

private bool _once;

// ReSharper disable once SuggestBaseTypeForParameter
public MergeEnumerator(IAsyncEnumerable<TSource>[] sources)
{
var n = sources.Length;
Expand Down Expand Up @@ -111,7 +112,7 @@ private void Dispose(IAsyncDisposable disposable)

private static readonly Action<Task, object> DisposeHandlerAction = (t, state) => ((MergeEnumerator)state).DisposeHandler(t);

void DisposeHandler(Task t)
private void DisposeHandler(Task t)
{
if (t.IsFaulted)
{
Expand All @@ -137,20 +138,20 @@ private void Signal()
ResumeHelper.Resume(ref _resume);
}

internal void InnerNext(InnerHandler sender, TSource item)
private void InnerNext(InnerHandler sender, TSource item)
{
_queue.Enqueue(new Entry { Sender = sender, Value = item });
Signal();
}

internal void InnerError(InnerHandler sender, Exception ex)
private void InnerError(Exception ex)
{
ExceptionHelper.AddException(ref _error, ex);
Interlocked.Decrement(ref _done);
Signal();
}

internal void InnerComplete(InnerHandler sender)
private void InnerComplete()
{
Interlocked.Decrement(ref _done);
Signal();
Expand All @@ -162,7 +163,7 @@ private struct Entry
internal InnerHandler Sender;
}

internal sealed class InnerHandler
private sealed class InnerHandler
{
private readonly IAsyncEnumerator<TSource> _source;

Expand Down Expand Up @@ -232,7 +233,7 @@ private void Next(Task<bool> t)
_done = true;
if (TryDispose())
{
_parent.InnerError(this, ExceptionHelper.Extract(t.Exception));
_parent.InnerError(ExceptionHelper.Extract(t.Exception));
}
}
else if (t.Result)
Expand All @@ -248,7 +249,7 @@ private void Next(Task<bool> t)
_done = true;
if (TryDispose())
{
_parent.InnerComplete(this);
_parent.InnerComplete();
}
}

Expand Down
Loading

0 comments on commit 731b7b7

Please sign in to comment.