Skip to content

Commit 0c92aac

Browse files
cleaned up IAsyncEnumerable Source to use local functions (#6045)
* cleaned up `IAsyncEnumerable` Source to use local functions Better to use `await` around `ValueTask` rather than converting to `Task` and doing `ContinueWith` * formatted + fixed unit tests * adding happy path spec * Change logic to accept IAsyncEnumerable instead of IAsyncEnumerator, add second fast path * Clean up unit test * Move IAsyncEnumerator initialization to PreStart * Add IAsyncEnumerator DisposeAsync Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
1 parent af513b0 commit 0c92aac

File tree

2 files changed

+195
-68
lines changed

2 files changed

+195
-68
lines changed

src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs

Lines changed: 128 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,16 @@
1010
using System.Threading;
1111
using System.Threading.Tasks;
1212
using Akka.Pattern;
13-
using Akka.Routing;
1413
using Akka.Streams.Dsl;
1514
using Akka.Streams.TestKit;
1615
using Akka.TestKit;
1716
using FluentAssertions;
18-
using Nito.AsyncEx.Synchronous;
1917
using Xunit;
2018
using Xunit.Abstractions;
2119
using System.Collections.Generic;
22-
using Akka.Actor;
23-
using Akka.Streams.Actors;
24-
using Akka.Streams.Tests.Actor;
25-
using Reactive.Streams;
20+
using System.Runtime.CompilerServices;
21+
using Akka.Util;
22+
using FluentAssertions.Extensions;
2623

2724
namespace Akka.Streams.Tests.Dsl
2825
{
@@ -31,24 +28,25 @@ public class AsyncEnumerableSpec : AkkaSpec
3128
{
3229
private ActorMaterializer Materializer { get; }
3330
private ITestOutputHelper _helper;
31+
3432
public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
35-
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
36-
helper)
33+
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
34+
helper)
3735
{
3836
_helper = helper;
3937
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
4038
Materializer = ActorMaterializer.Create(Sys, settings);
4139
}
4240

4341

44-
[Fact]
42+
[Fact]
4543
public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
4644
{
4745
var input = Enumerable.Range(1, 6).ToList();
4846

4947
var cts = new CancellationTokenSource();
5048
var token = cts.Token;
51-
49+
5250
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
5351
var output = input.ToArray();
5452
bool caught = false;
@@ -63,10 +61,10 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
6361
{
6462
caught = true;
6563
}
66-
64+
6765
caught.ShouldBeTrue();
6866
}
69-
67+
7068
[Fact]
7169
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
7270
{
@@ -78,7 +76,8 @@ public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_S
7876
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
7977
output = output.Skip(1).ToArray();
8078
}
81-
output.Length.ShouldBe(0,"Did not receive all elements!");
79+
80+
output.Length.ShouldBe(0, "Did not receive all elements!");
8281
}
8382

8483
[Fact]
@@ -92,15 +91,17 @@ public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
9291
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
9392
output = output.Skip(1).ToArray();
9493
}
95-
output.Length.ShouldBe(0,"Did not receive all elements!");
96-
94+
95+
output.Length.ShouldBe(0, "Did not receive all elements!");
96+
9797
output = input.ToArray();
9898
await foreach (var a in asyncEnumerable)
9999
{
100100
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
101101
output = output.Skip(1).ToArray();
102102
}
103-
output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!");
103+
104+
output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
104105
}
105106

106107

@@ -110,8 +111,8 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
110111
var materializer = ActorMaterializer.Create(Sys);
111112
var probe = this.CreatePublisherProbe<int>();
112113
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);
113-
114-
var a = Task.Run( async () =>
114+
115+
var a = Task.Run(async () =>
115116
{
116117
await foreach (var notused in task)
117118
{
@@ -122,22 +123,23 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
122123
//we want to send messages so we aren't just waiting forever.
123124
probe.SendNext(1);
124125
probe.SendNext(2);
125-
bool thrown = false;
126+
var thrown = false;
126127
try
127128
{
128129
await a;
129130
}
130-
catch (StreamDetachedException e)
131-
{
132-
thrown = true;
133-
}
131+
catch (StreamDetachedException e)
132+
{
133+
thrown = true;
134+
}
134135
catch (AbruptTerminationException e)
135136
{
136137
thrown = true;
137138
}
139+
138140
thrown.ShouldBeTrue();
139141
}
140-
142+
141143
[Fact]
142144
public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration()
143145
{
@@ -150,47 +152,128 @@ async Task ShouldThrow()
150152
{
151153
await foreach (var a in task)
152154
{
153-
154155
}
155156
}
156-
157+
157158
await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
158159
}
159160

160-
[Fact]
161-
public void AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
161+
[Fact]
162+
public async Task
163+
AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
162164
{
163-
Func<IAsyncEnumerable<int>> range = () =>
164-
{
165-
return RangeAsync(1, 100);
166-
};
165+
IAsyncEnumerable<int> Range() => RangeAsync(0, 0);
167166
var subscriber = this.CreateManualSubscriberProbe<int>();
168167

169-
Source.From(range)
168+
Source.From(Range)
170169
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
171170

172-
var subscription = subscriber.ExpectSubscription();
171+
var subscription = await subscriber.ExpectSubscriptionAsync();
173172
subscription.Request(100);
174-
for (int i = 1; i <= 20; i++)
173+
await subscriber.ExpectCompleteAsync();
174+
}
175+
176+
[Fact]
177+
public async Task AsyncEnumerableSource_Must_Process_All_Elements()
178+
{
179+
IAsyncEnumerable<int> Range() => RangeAsync(0, 100);
180+
var subscriber = this.CreateManualSubscriberProbe<int>();
181+
182+
Source.From(Range)
183+
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
184+
185+
var subscription = await subscriber.ExpectSubscriptionAsync();
186+
subscription.Request(101);
187+
188+
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 100));
189+
190+
await subscriber.ExpectCompleteAsync();
191+
}
192+
193+
[Fact]
194+
public async Task AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
195+
{
196+
IAsyncEnumerable<int> Range() => ThrowingRangeAsync(0, 100, 50);
197+
var subscriber = this.CreateManualSubscriberProbe<int>();
198+
199+
Source.From(Range)
200+
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
201+
202+
var subscription = await subscriber.ExpectSubscriptionAsync();
203+
subscription.Request(101);
204+
205+
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));
206+
207+
var exception = await subscriber.ExpectErrorAsync();
208+
209+
// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
210+
exception.Should().BeOfType<TestException>();
211+
exception.Message.Should().Be("BOOM!");
212+
}
213+
214+
[Fact]
215+
public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream_Completes()
216+
{
217+
var latch = new AtomicBoolean();
218+
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
219+
var subscriber = this.CreateManualSubscriberProbe<int>();
220+
221+
Source.From(Range)
222+
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
223+
224+
var subscription = await subscriber.ExpectSubscriptionAsync();
225+
subscription.Request(50);
226+
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));
227+
subscription.Cancel();
228+
229+
// The cancellation token inside the IAsyncEnumerable should be cancelled
230+
await WithinAsync(3.Seconds(), async () => latch.Value);
231+
}
232+
233+
private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
234+
[EnumeratorCancellation] CancellationToken token = default)
235+
{
236+
foreach (var i in Enumerable.Range(start, count))
175237
{
176-
var next = subscriber.ExpectNext(i);
177-
_helper.WriteLine(i.ToString());
238+
await Task.Delay(10, token);
239+
if(token.IsCancellationRequested)
240+
yield break;
241+
yield return i;
178242
}
179-
180-
//subscriber.ExpectComplete();
181243
}
244+
245+
private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int count, int throwAt,
246+
[EnumeratorCancellation] CancellationToken token = default)
247+
{
248+
foreach (var i in Enumerable.Range(start, count))
249+
{
250+
if(token.IsCancellationRequested)
251+
yield break;
182252

183-
static async IAsyncEnumerable<int> RangeAsync(int start, int count)
253+
if (i == throwAt)
254+
throw new TestException("BOOM!");
255+
256+
yield return i;
257+
}
258+
}
259+
260+
private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
261+
[EnumeratorCancellation] CancellationToken token = default)
184262
{
185-
for (var i = 0; i < count; i++)
263+
token.Register(() =>
186264
{
187-
await Task.Delay(i);
188-
yield return start + i;
265+
latch.GetAndSet(true);
266+
});
267+
foreach (var i in Enumerable.Range(start, count))
268+
{
269+
if(token.IsCancellationRequested)
270+
yield break;
271+
272+
yield return i;
189273
}
190274
}
191275

192276
}
193277
#else
194278
#endif
195-
196-
}
279+
}

0 commit comments

Comments
 (0)