Skip to content

Commit

Permalink
+Buffer with boundary
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Nov 2, 2018
1 parent 2ae0748 commit 66fce5a
Show file tree
Hide file tree
Showing 14 changed files with 658 additions and 91 deletions.
122 changes: 122 additions & 0 deletions async-enumerable-dotnet-test/BufferBoundaryTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) David Karnok & Contributors.
// Licensed under the Apache 2.0 License.
// See LICENSE file in the project root for full license information.

using Xunit;
using async_enumerable_dotnet;
using System;
using System.Collections.Generic;

namespace async_enumerable_dotnet_test
{
public class BufferBoundaryTest
{
[Fact]
public async void Size()
{
await AsyncEnumerable.Range(1, 5)
.Buffer(AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)), 2)
.AssertResult(
new List<int>(new [] { 1, 2 }),
new List<int>(new[] { 3, 4 }),
new List<int>(new[] { 5 })
);
}

[Fact]
public async void Size_Collection()
{
await AsyncEnumerable.Range(1, 5)
.Buffer(AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)), () => new HashSet<int>(), 2)
.AssertResult(
new HashSet<int>(new [] { 1, 2 }),
new HashSet<int>(new[] { 3, 4 }),
new HashSet<int>(new[] { 5 })
);
}

[Fact]
public async void Time()
{
var t = 100L;
if (Environment.GetEnvironmentVariable("CI") != null)
{
t = 1000L;
}

await
TestHelper.TimeSequence(
t, t,
3 * t, 3 * t, 3 * t,
7 * t, 7 * t
)
.Buffer(AsyncEnumerable.Interval(TimeSpan.FromMilliseconds(2 * t)))
.AssertResult(
new List<long>(new[] { t, t }),
new List<long>(new[] { 3 * t, 3 * t, 3 * t }),
new List<long>(),
new List<long>(new[] { 7 * t, 7 * t })
);

}

[Fact]
public async void Error_Main()
{
await AsyncEnumerable.Error<int>(new InvalidOperationException())
.Buffer(AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)), 2)
.AssertFailure(typeof(InvalidOperationException));
}

[Fact]
public async void Error_Other()
{
await AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200))
.Buffer(AsyncEnumerable.Error<int>(new InvalidOperationException()), 2)
.AssertFailure(typeof(InvalidOperationException));
}

[Fact]
public async void Empty_Main()
{
await AsyncEnumerable.Empty<int>()
.Buffer(AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200)), 2)
.AssertResult();
}

[Fact]
public async void Empty_Other()
{
await AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(200))
.Buffer(AsyncEnumerable.Empty<int>(), 2)
.AssertResult();
}

[Fact]
public async void Time_And_Size()
{
var t = 100L;
if (Environment.GetEnvironmentVariable("CI") != null)
{
t = 1000L;
}

await
TestHelper.TimeSequence(
t, t,
3 * t, 3 * t, 3 * t,
7 * t, 7 * t, 7 * t, 7 * t
)
.Buffer(AsyncEnumerable.Interval(TimeSpan.FromMilliseconds(2 * t)), 3)
.AssertResult(
new List<long>(new[] { t, t }),
new List<long>(new[] { 3 * t, 3 * t, 3 * t }),
new List<long>(),
new List<long>(),
new List<long>(new[] { 7 * t, 7 * t, 7 * t }),
new List<long>(new[] { 7 * t })
);

}
}
}
2 changes: 1 addition & 1 deletion async-enumerable-dotnet-test/CombineLatestTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async void Second_Many()
public async void Error()
{
await AsyncEnumerable.CombineLatest(v => v.Sum(),
AsyncEnumerable.Just<int>(1),
AsyncEnumerable.Just(1),
AsyncEnumerable.Just(2).ConcatWith(
AsyncEnumerable.Error<int>(new InvalidOperationException())
)
Expand Down
18 changes: 2 additions & 16 deletions async-enumerable-dotnet-test/ConcatMapTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,7 @@ await AsyncEnumerable.Range(1, 5)
public async void Async_Filter()
{
await AsyncEnumerable.Range(1, 10)
.ConcatMap(v =>
{
if (v % 2 == 0)
{
return AsyncEnumerable.Just(v);
}
return AsyncEnumerable.Empty<int>();
})
.ConcatMap(v => v % 2 == 0 ? AsyncEnumerable.Just(v) : AsyncEnumerable.Empty<int>())
.AssertResult(
2, 4, 6, 8, 10
);
Expand Down Expand Up @@ -71,14 +64,7 @@ await AsyncEnumerable.Range(1, 5)
public async void Enumerable_Filter()
{
await AsyncEnumerable.Range(1, 10)
.ConcatMap(v =>
{
if (v % 2 == 0)
{
return new[] { v };
}
return Enumerable.Empty<int>();
})
.ConcatMap(v => v % 2 == 0 ? new[] { v } : Enumerable.Empty<int>())
.AssertResult(
2, 4, 6, 8, 10
);
Expand Down
4 changes: 2 additions & 2 deletions async-enumerable-dotnet-test/CreateTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public async ValueTask Range()
[Fact]
public async void Range_Loop()
{
for (int j = 0; j < 1000; j++)
for (var j = 0; j < 1000; j++)
{
await Range();
}
Expand Down Expand Up @@ -77,7 +77,7 @@ public async ValueTask Take()
[Fact]
public async void Take_Loop()
{
for (int j = 0; j < 1000; j++)
for (var j = 0; j < 1000; j++)
{
await Take();
}
Expand Down
80 changes: 80 additions & 0 deletions async-enumerable-dotnet-test/SlimResumeTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) David Karnok & Contributors.
// Licensed under the Apache 2.0 License.
// See LICENSE file in the project root for full license information.

using Xunit;
using async_enumerable_dotnet.impl;
using System.Threading.Tasks;
using System.Threading;

namespace async_enumerable_dotnet_test
{
public class SlimResumeTest
{
[Fact]
public async void ReadyUpfront()
{
var rsm = new SlimResume();
rsm.Signal();

await rsm;
}

[Fact]
public async void Completed()
{
var rsm = SlimResume.Completed;

await rsm;
}

[Fact]
public async void SignalLater()
{
var rsm = new SlimResume();

var t = Task.Delay(100)
.ContinueWith(t0 => rsm.Signal());

await rsm;

await t;
}


[Fact]
public async void Race()
{
for (var i = 0; i < 10_000; i++)
{
var rsm = new SlimResume();

var wip = 2;

var t1 = Task.Factory.StartNew(() =>
{
if (Interlocked.Decrement(ref wip) != 0)
{
while (Volatile.Read(ref wip) != 0) { }
}
rsm.Signal();
}, TaskCreationOptions.LongRunning);

var t2 = Task.Factory.StartNew(async () =>
{
if (Interlocked.Decrement(ref wip) != 0)
{
while (Volatile.Read(ref wip) != 0) { }
}
await rsm;
}, TaskCreationOptions.LongRunning);

await t1;

await t2;
}
}
}
}
8 changes: 8 additions & 0 deletions async-enumerable-dotnet-test/SwitchMapTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,13 @@ await AsyncEnumerable.Just(2)
.SwitchMap<int, int>(v => throw new InvalidOperationException())
.AssertFailure(typeof(InvalidOperationException));
}

[Fact]
public async void Nested()
{
await AsyncEnumerable.Just(AsyncEnumerable.Range(2, 5))
.Switch()
.AssertResult(2, 3, 4, 5, 6);
}
}
}
12 changes: 12 additions & 0 deletions async-enumerable-dotnet-test/TestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,17 @@ public static async ValueTask AssertFailure<T>(this IAsyncEnumerator<T> source,
await source.DisposeAsync();
}
}

/// <summary>
/// Emits the given numbers after the same milliseconds from
/// the start.
/// </summary>
/// <param name="timestamps">The params array of timestamps.</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
internal static IAsyncEnumerable<long> TimeSequence(params long[] timestamps)
{
return AsyncEnumerable.FromArray(timestamps)
.FlatMap(v => AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(v)).Map(w => v));
}
}
}
53 changes: 46 additions & 7 deletions async-enumerable-dotnet/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1567,15 +1567,54 @@ public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAs
{
RequireNonNull(sources, nameof(sources));
RequireNonNull(combiner, nameof(combiner));
if (sources.Length == 0)
{
return Empty<TResult>();
}
if (sources.Length == 1)
switch (sources.Length)
{
return sources[0].Map(v => combiner(new[] { v }));
case 0:
return Empty<TResult>();
case 1:
return sources[0].Map(v => combiner(new[] { v }));
default:
return new CombineLatest<TSource, TResult>(sources, combiner);
}
return new CombineLatest<TSource, TResult>(sources, combiner);
}

/// <summary>
/// Buffers items from the source into Lists until the boundary
/// sequence signals an item (or the maxSize is reached) upon which
/// the current buffer is emitted and a new buffer is started.
/// </summary>
/// <typeparam name="TSource">The element type of the source.</typeparam>
/// <typeparam name="TOther">The element type of the boundary sequence.</typeparam>
/// <param name="source">The source to buffer.</param>
/// <param name="boundary">The sequence indicating the boundaries of the buffers.</param>
/// <param name="maxSize">The maximum number of items to buffer.</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource, TOther>(this IAsyncEnumerable<TSource> source, IAsyncEnumerable<TOther> boundary, int maxSize = int.MaxValue)
{
return Buffer<TSource, TOther, IList<TSource>>(source, boundary, () => new List<TSource>(), maxSize);
}

/// <summary>
/// Buffers items from the source into custom collections until the boundary
/// sequence signals an item (or the maxSize is reached) upon which
/// the current collections is emitted and a new collection is started.
/// </summary>
/// <typeparam name="TSource">The element type of the source.</typeparam>
/// <typeparam name="TOther">The element type of the boundary sequence.</typeparam>
/// <typeparam name="TCollection">The resulting collection type.</typeparam>
/// <param name="source">The source to buffer.</param>
/// <param name="boundary">The sequence indicating the boundaries of the buffers.</param>
/// <param name="collectionSupplier">The custom collection supplier.</param>
/// <param name="maxSize">The maximum number of items to buffer.</param>
/// <returns>The new IAsyncEnumerable sequence.</returns>
public static IAsyncEnumerable<TCollection> Buffer<TSource, TOther, TCollection>(this IAsyncEnumerable<TSource> source, IAsyncEnumerable<TOther> boundary, Func<TCollection> collectionSupplier, int maxSize = int.MaxValue) where TCollection : ICollection<TSource>
{
RequireNonNull(source, nameof(source));
RequireNonNull(boundary, nameof(boundary));
RequireNonNull(collectionSupplier, nameof(collectionSupplier));
RequirePositive(maxSize, nameof(maxSize));
return new BufferBoundaryExact<TSource, TOther, TCollection>(source, boundary, collectionSupplier, maxSize);
}

}
}
Loading

0 comments on commit 66fce5a

Please sign in to comment.