Skip to content

Commit

Permalink
Cherry-picked from 0c92aac (#6048)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Jul 22, 2022
1 parent 7293c6e commit fc777f3
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 55 deletions.
149 changes: 116 additions & 33 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Pattern;
using Akka.Routing;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Nito.AsyncEx.Synchronous;
using Xunit;
using Xunit.Abstractions;
using System.Collections.Generic;
Expand All @@ -24,6 +22,9 @@
using Akka.Streams.TestKit.Tests;
using Akka.Streams.Tests.Actor;
using Reactive.Streams;
using System.Runtime.CompilerServices;
using Akka.Util;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -33,16 +34,16 @@ public class AsyncEnumerableSpec : AkkaSpec
private ActorMaterializer Materializer { get; }
private ITestOutputHelper _helper;
public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
{
_helper = helper;
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
Materializer = ActorMaterializer.Create(Sys, settings);
}


[Fact]
[Fact]
public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
var input = Enumerable.Range(1, 6).ToList();
Expand All @@ -67,7 +68,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()

caught.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
{
Expand All @@ -79,7 +80,7 @@ public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_S
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");
output.Length.ShouldBe(0, "Did not receive all elements!");
}

[Fact]
Expand All @@ -93,15 +94,15 @@ public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");
output.Length.ShouldBe(0, "Did not receive all elements!");

output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!");
output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
}


Expand All @@ -112,7 +113,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);

var a = Task.Run( async () =>
var a = Task.Run(async () =>
{
await foreach (var notused in task)
{
Expand All @@ -123,19 +124,20 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
//we want to send messages so we aren't just waiting forever.
probe.SendNext(1);
probe.SendNext(2);
bool thrown = false;
var thrown = false;
try
{
await a;
}
catch (StreamDetachedException e)
{
thrown = true;
}
catch (StreamDetachedException e)
{
thrown = true;
}
catch (AbruptTerminationException e)
{
thrown = true;
}

thrown.ShouldBeTrue();
}

Expand All @@ -151,47 +153,128 @@ async Task ShouldThrow()
{
await foreach (var a in task)
{

}
}

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}

[Fact]
public void AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
[Fact]
public void
AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
{
Func<IAsyncEnumerable<int>> range = () =>
{
return RangeAsync(1, 100);
};
IAsyncEnumerable<int> Range() => RangeAsync(0, 0);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(range)
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(100);
for (int i = 1; i <= 20; i++)
subscriber.ExpectComplete();
}

[Fact]
public void AsyncEnumerableSource_Must_Process_All_Elements()
{
IAsyncEnumerable<int> Range() => RangeAsync(0, 100);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(101);

subscriber.ExpectNextN(Enumerable.Range(0, 100));

subscriber.ExpectComplete();
}

[Fact]
public void AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
{
IAsyncEnumerable<int> Range() => ThrowingRangeAsync(0, 100, 50);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(101);

subscriber.ExpectNextN(Enumerable.Range(0, 50));

var exception = subscriber.ExpectError();

// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
exception.Should().BeOfType<TestException>();
exception.Message.Should().Be("BOOM!");
}

[Fact]
public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream_Completes()
{
var latch = new AtomicBoolean();
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(50);
subscriber.ExpectNextN(Enumerable.Range(0, 50));
subscription.Cancel();

// The cancellation token inside the IAsyncEnumerable should be cancelled
await WithinAsync(3.Seconds(), async () => latch.Value);
}

private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
var next = subscriber.ExpectNext(i);
_helper.WriteLine(i.ToString());
await Task.Delay(10, token);
if(token.IsCancellationRequested)
yield break;
yield return i;
}
}

//subscriber.ExpectComplete();
private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int count, int throwAt,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
yield break;

if (i == throwAt)
throw new TestException("BOOM!");

yield return i;
}
}

static async IAsyncEnumerable<int> RangeAsync(int start, int count)
private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
[EnumeratorCancellation] CancellationToken token = default)
{
for (var i = 0; i < count; i++)
token.Register(() =>
{
latch.GetAndSet(true);
});
foreach (var i in Enumerable.Range(start, count))
{
await Task.Delay(i);
yield return start + i;
if(token.IsCancellationRequested)
yield break;

yield return i;
}
}

}
#else
#endif

}
}
Loading

0 comments on commit fc777f3

Please sign in to comment.