Skip to content

Commit

Permalink
+CombineLatest
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Nov 2, 2018
1 parent e2d525e commit 2ae0748
Show file tree
Hide file tree
Showing 4 changed files with 400 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ finally

- `Amb` - Relay items of the source that responds first, disposing the others
- `Create` - generate values via async push
- `CombineLatest` - combines the latest items of the source async sequences via a function into results
- `Concat` - concatenate multiple async sequences
- `Defer` - defer the creation of the actual `IAsyncEnumerable`
- `Error` - signal an error
Expand Down
92 changes: 92 additions & 0 deletions async-enumerable-dotnet-test/CombineLatestTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) David Karnok & Contributors.
// Licensed under the Apache 2.0 License.
// See LICENSE file in the project root for full license information.

using Xunit;
using async_enumerable_dotnet;
using System.Linq;
using System;

namespace async_enumerable_dotnet_test
{
public class CombineLatestTest
{
[Fact]
public async void Empty()
{
await AsyncEnumerable.CombineLatest<int, int>(v => v.Sum())
.AssertResult();
}

[Fact]
public async void Single()
{
await AsyncEnumerable.CombineLatest(v => v.Sum() + 1,
AsyncEnumerable.Just(1))
.AssertResult(2);
}

[Fact]
public async void One_Item_Each()
{
await AsyncEnumerable.CombineLatest(v => v.Sum(), AsyncEnumerable.Just(1), AsyncEnumerable.Just(2))
.AssertResult(3);
}

[Fact]
public async void One_Is_Empty()
{
await AsyncEnumerable.CombineLatest(v => v.Sum(), AsyncEnumerable.Empty<int>(), AsyncEnumerable.Just(2))
.AssertResult();
}

[Fact]
public async void Two_Is_Empty()
{
await AsyncEnumerable.CombineLatest(v => v.Sum(), AsyncEnumerable.Just(1), AsyncEnumerable.Empty<int>())
.AssertResult();
}

[Fact]
public async void ZigZag()
{
var t = 200;
if (Environment.GetEnvironmentVariable("CI") != null)
{
t = 2000;
}
await AsyncEnumerable.CombineLatest(v => v.Sum(),
AsyncEnumerable.Interval(1, 5, TimeSpan.FromMilliseconds(t)),
AsyncEnumerable.Interval(1, 5, TimeSpan.FromMilliseconds(t + t / 2), TimeSpan.FromMilliseconds(t)).Map(v => v * 10)
)
.AssertResult(11, 12, 22, 23, 33, 34, 44, 45, 55);
}

[Fact]
public async void Second_Many()
{
var t = 200;
if (Environment.GetEnvironmentVariable("CI") != null)
{
t = 2000;
}
await AsyncEnumerable.CombineLatest(v => v.Sum(),
AsyncEnumerable.Just(10L),
AsyncEnumerable.Interval(1, 5, TimeSpan.FromMilliseconds(t + t / 2), TimeSpan.FromMilliseconds(t))
)
.AssertResult(11, 12, 13, 14, 15);
}

[Fact]
public async void Error()
{
await AsyncEnumerable.CombineLatest(v => v.Sum(),
AsyncEnumerable.Just<int>(1),
AsyncEnumerable.Just(2).ConcatWith(
AsyncEnumerable.Error<int>(new InvalidOperationException())
)
)
.AssertFailure(typeof(InvalidOperationException), 3);
}
}
}
26 changes: 26 additions & 0 deletions async-enumerable-dotnet/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1551,5 +1551,31 @@ public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAs
RequireNonNull(func, nameof(func));
return new WithLatestFrom<TSource, TOther, TResult>(source, other, func);
}

/// <summary>
/// Combines the latest items from each async source into a single
/// sequence of results via a combiner function.
/// </summary>
/// <typeparam name="TSource">The element type of the sources.</typeparam>
/// <typeparam name="TResult">The result type.</typeparam>
/// <param name="combiner">The function that receives the latest elements
/// of all sources (if they all produced an item at least) and should
/// a value to be emitted to the consumer.</param>
/// <param name="sources">The params array of the async sequences to combine.</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<TResult> CombineLatest<TSource, TResult>(Func<TSource[], TResult> combiner, params IAsyncEnumerable<TSource>[] sources)
{
RequireNonNull(sources, nameof(sources));
RequireNonNull(combiner, nameof(combiner));
if (sources.Length == 0)
{
return Empty<TResult>();
}
if (sources.Length == 1)
{
return sources[0].Map(v => combiner(new[] { v }));
}
return new CombineLatest<TSource, TResult>(sources, combiner);
}
}
}
Loading

0 comments on commit 2ae0748

Please sign in to comment.