Skip to content

Commit

Permalink
cleaned up IAsyncEnumerable Source to use local functions (#6045)
Browse files Browse the repository at this point in the history
* cleaned up `IAsyncEnumerable` Source to use local functions

Better to use `await` around `ValueTask` rather than converting to `Task` and doing `ContinueWith`

* formatted + fixed unit tests

* adding happy path spec

* Change logic to accept IAsyncEnumerable instead of IAsyncEnumerator, add second fast path

* Clean up unit test

* Move IAsyncEnumerator initialization to PreStart

* Add IAsyncEnumerator DisposeAsync

Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
Aaronontheweb and Arkatufus committed Jul 19, 2022
1 parent af513b0 commit 0c92aac
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 68 deletions.
173 changes: 128 additions & 45 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Expand Up @@ -10,19 +10,16 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Pattern;
using Akka.Routing;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Nito.AsyncEx.Synchronous;
using Xunit;
using Xunit.Abstractions;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Streams.Actors;
using Akka.Streams.Tests.Actor;
using Reactive.Streams;
using System.Runtime.CompilerServices;
using Akka.Util;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -31,24 +28,25 @@ public class AsyncEnumerableSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }
private ITestOutputHelper _helper;

public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
{
_helper = helper;
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
Materializer = ActorMaterializer.Create(Sys, settings);
}


[Fact]
[Fact]
public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
var input = Enumerable.Range(1, 6).ToList();

var cts = new CancellationTokenSource();
var token = cts.Token;

var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
bool caught = false;
Expand All @@ -63,10 +61,10 @@ await foreach (var a in asyncEnumerable.WithCancellation(token))
{
caught = true;
}

caught.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
{
Expand All @@ -78,7 +76,8 @@ await foreach (var a in asyncEnumerable)
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");

output.Length.ShouldBe(0, "Did not receive all elements!");
}

[Fact]
Expand All @@ -92,15 +91,17 @@ await foreach (var a in asyncEnumerable)
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");


output.Length.ShouldBe(0, "Did not receive all elements!");

output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!");

output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
}


Expand All @@ -110,8 +111,8 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
var materializer = ActorMaterializer.Create(Sys);
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);
var a = Task.Run( async () =>

var a = Task.Run(async () =>
{
await foreach (var notused in task)
{
Expand All @@ -122,22 +123,23 @@ await foreach (var notused in task)
//we want to send messages so we aren't just waiting forever.
probe.SendNext(1);
probe.SendNext(2);
bool thrown = false;
var thrown = false;
try
{
await a;
}
catch (StreamDetachedException e)
{
thrown = true;
}
catch (StreamDetachedException e)
{
thrown = true;
}
catch (AbruptTerminationException e)
{
thrown = true;
}

thrown.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration()
{
Expand All @@ -150,47 +152,128 @@ async Task ShouldThrow()
{
await foreach (var a in task)
{

}
}

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}

[Fact]
public void AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
[Fact]
public async Task
AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
{
Func<IAsyncEnumerable<int>> range = () =>
{
return RangeAsync(1, 100);
};
IAsyncEnumerable<int> Range() => RangeAsync(0, 0);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(range)
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(100);
for (int i = 1; i <= 20; i++)
await subscriber.ExpectCompleteAsync();
}

[Fact]
public async Task AsyncEnumerableSource_Must_Process_All_Elements()
{
IAsyncEnumerable<int> Range() => RangeAsync(0, 100);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(101);

await subscriber.ExpectNextNAsync(Enumerable.Range(0, 100));

await subscriber.ExpectCompleteAsync();
}

[Fact]
public async Task AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
{
IAsyncEnumerable<int> Range() => ThrowingRangeAsync(0, 100, 50);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(101);

await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));

var exception = await subscriber.ExpectErrorAsync();

// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
exception.Should().BeOfType<TestException>();
exception.Message.Should().Be("BOOM!");
}

[Fact]
public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream_Completes()
{
var latch = new AtomicBoolean();
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(50);
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));
subscription.Cancel();

// The cancellation token inside the IAsyncEnumerable should be cancelled
await WithinAsync(3.Seconds(), async () => latch.Value);
}

private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
var next = subscriber.ExpectNext(i);
_helper.WriteLine(i.ToString());
await Task.Delay(10, token);
if(token.IsCancellationRequested)
yield break;
yield return i;
}

//subscriber.ExpectComplete();
}

private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int count, int throwAt,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
yield break;

static async IAsyncEnumerable<int> RangeAsync(int start, int count)
if (i == throwAt)
throw new TestException("BOOM!");

yield return i;
}
}

private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
[EnumeratorCancellation] CancellationToken token = default)
{
for (var i = 0; i < count; i++)
token.Register(() =>
{
await Task.Delay(i);
yield return start + i;
latch.GetAndSet(true);
});
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
yield break;

yield return i;
}
}

}
#else
#endif

}
}

0 comments on commit 0c92aac

Please sign in to comment.