Skip to content

Commit

Permalink
Allow Streams to be consumed as IAsyncEnumerable (#4742)
Browse files Browse the repository at this point in the history
* Add `.RunAsAsyncEnumerable` and `.RunAsAsyncEnumerableBuffer` to Akka Streams Source DSL

* Add tests for CancellationToken support, use Cancellationtoken in rest of stream flow

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
to11mtm and Aaronontheweb committed Dec 20, 2021
1 parent a0f4705 commit b77ed9a
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,15 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Sum<TIn>(System.Func<TIn, TIn, TIn> reduce) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> Wrap<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
}
public sealed class SinkQueueAsyncEnumerator<T> : System.Collections.Generic.IAsyncEnumerator<T>, System.IAsyncDisposable
{
public SinkQueueAsyncEnumerator([System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] {
"killSwitch",
"sinkQueue"})] System.ValueTuple<Akka.Streams.UniqueKillSwitch, Akka.Streams.ISinkQueue<T>> queueAndSwitch, System.Threading.CancellationToken token) { }
public T Current { get; }
public System.Threading.Tasks.ValueTask DisposeAsync() { }
public System.Threading.Tasks.ValueTask<bool> MoveNextAsync() { }
}
public sealed class Sink<TIn, TMat> : Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>>, Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat>
{
public Sink(Akka.Streams.Implementation.IModule module) { }
Expand Down Expand Up @@ -2108,6 +2117,8 @@ namespace Akka.Streams.Dsl
public System.ValueTuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerable(Akka.Streams.IMaterializer materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(Akka.Streams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut> RunSum(System.Func<TOut, TOut, TOut> reduce, Akka.Streams.IMaterializer materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, Akka.Streams.IMaterializer materializer) { }
Expand Down Expand Up @@ -2150,6 +2161,12 @@ namespace Akka.Streams.Dsl
[Akka.Annotations.ApiMayChangeAttribute()]
public static Akka.Streams.Dsl.Sink<T, System.Threading.Tasks.Task<Akka.Streams.ISourceRef<T>>> SourceRef<T>() { }
}
public sealed class StreamsAsyncEnumerableRerunnable<T, TMat> : System.Collections.Generic.IAsyncEnumerable<T>
{
public StreamsAsyncEnumerableRerunnable(Akka.Streams.Dsl.Source<T, TMat> source, Akka.Streams.IMaterializer materializer) { }
public StreamsAsyncEnumerableRerunnable(Akka.Streams.Dsl.Source<T, TMat> source, Akka.Streams.IMaterializer materializer, int minBuf, int maxBuf) { }
public System.Collections.Generic.IAsyncEnumerator<T> GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken) { }
}
public class static SubFlowOperations
{
public static Akka.Streams.Dsl.SubFlow<TOut2, TMat, TClosed> Aggregate<TOut1, TOut2, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut1, TMat, TClosed> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> fold) { }
Expand Down
151 changes: 151 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//-----------------------------------------------------------------------
// <copyright file="AsyncEnumerableSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Pattern;
using Akka.Routing;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Nito.AsyncEx.Synchronous;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl
{
#if NETCOREAPP
public class AsyncEnumerableSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }

public AsyncEnumerableSpec(ITestOutputHelper helper) : base(helper)
{
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact] public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
var input = Enumerable.Range(1, 6).ToList();

var cts = new CancellationTokenSource();
var token = cts.Token;

var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
bool caught = false;
try
{
await foreach (var a in asyncEnumerable.WithCancellation(token))
{
cts.Cancel();
}
}
catch (OperationCanceledException e)
{
caught = true;
}

caught.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
{
var input = Enumerable.Range(1, 6).ToList();
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");
}

[Fact]
public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
{
var input = Enumerable.Range(1, 6).ToList();
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");

output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!");
}


[Fact]
public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
{
var materializer = ActorMaterializer.Create(Sys);
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);

var a = Task.Run( async () =>
{
await foreach (var notused in task)
{
materializer.Shutdown();
}
});
//since we are collapsing the stream inside the read
//we want to send messages so we aren't just waiting forever.
probe.SendNext(1);
probe.SendNext(2);
bool thrown = false;
try
{
await a;
}
catch (AbruptTerminationException e)
{
thrown = true;
}
thrown.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration()
{
var materializer = ActorMaterializer.Create(Sys);
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);
materializer.Shutdown();

async Task ShouldThrow()
{
await foreach (var a in task)
{

}
}

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}


}

#else
#endif
}
1 change: 1 addition & 0 deletions src/core/Akka.Streams/Akka.Streams.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="$(ProtobufVersion)" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
<PackageReference Include="Reactive.Streams" Version="1.0.2" />
</ItemGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
Expand Down
29 changes: 29 additions & 0 deletions src/core/Akka.Streams/Dsl/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,35 @@ public Task<TOut> RunSum(Func<TOut, TOut, TOut> reduce, IMaterializer materializ
public Task RunForeach(Action<TOut> action, IMaterializer materializer)
=> RunWith(Sink.ForEach(action), materializer);

/// <summary>
/// Shortcut for running this <see cref="Source{TOut,TMat}"/> as an <see cref="IAsyncEnumerable{TOut}"/>.
/// The given enumerable is re-runnable but will cause a re-materialization of the stream each time.
/// This is implemented using a SourceQueue and will buffer elements based on configured stream defaults.
/// For custom buffers Please use <see cref="RunAsAsyncEnumerableBuffer"/>
/// </summary>
/// <param name="materializer">The materializer to use for each enumeration</param>
/// <returns>A lazy <see cref="IAsyncEnumerable{T}"/> that will run each time it is enumerated.</returns>
public IAsyncEnumerable<TOut> RunAsAsyncEnumerable(
IMaterializer materializer) =>
new StreamsAsyncEnumerableRerunnable<TOut,TMat>(this, materializer);

/// <summary>
/// Shortcut for running this <see cref="Source{TOut,TMat}"/> as an <see cref="IAsyncEnumerable{TOut}"/>.
/// The given enumerable is re-runnable but will cause a re-materialization of the stream each time.
/// This is implemented using a SourceQueue and will buffer elements and/or backpressure,
/// based on the buffer values provided.
/// </summary>
/// <param name="materializer">The materializer to use for each enumeration</param>
/// <param name="minBuffer">The minimum input buffer size</param>
/// <param name="maxBuffer">The Max input buffer size.</param>
/// <returns>A lazy <see cref="IAsyncEnumerable{T}"/> that will run each time it is enumerated.</returns>
public IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(
IMaterializer materializer, int minBuffer = 4,
int maxBuffer = 16) =>
new StreamsAsyncEnumerableRerunnable<TOut,TMat>(
this, materializer,minBuffer,maxBuffer);


/// <summary>
/// Combines several sources with fun-in strategy like <see cref="Merge{TIn,TOut}"/> or <see cref="Concat{TIn,TOut}"/> and returns <see cref="Source{TOut,TMat}"/>.
/// </summary>
Expand Down
97 changes: 97 additions & 0 deletions src/core/Akka.Streams/Implementation/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// //-----------------------------------------------------------------------
// // <copyright file="AsyncEnumerable.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration.Hocon;

namespace Akka.Streams.Dsl
{
/// <summary>
/// Used to treat an <see cref="IRunnableGraph{TMat}"/> of <see cref="ISinkQueue{T}"/>
/// as an <see cref="IAsyncEnumerable{T}"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public sealed class StreamsAsyncEnumerableRerunnable<T,TMat> : IAsyncEnumerable<T>
{
private static readonly Sink<T, ISinkQueue<T>> defaultSinkqueue =
Sink.Queue<T>();
private readonly Source<T, TMat> _source;
private readonly IMaterializer _materializer;

private readonly Sink<T, ISinkQueue<T>> thisSinkQueue;
//private readonly IRunnableGraph<(UniqueKillSwitch, ISinkQueue<T>)> _graph;
public StreamsAsyncEnumerableRerunnable(Source<T,TMat> source, IMaterializer materializer)
{
_source = source;
_materializer = materializer;
thisSinkQueue = defaultSinkqueue;
}

public StreamsAsyncEnumerableRerunnable(Source<T, TMat> source,
IMaterializer materializer, int minBuf, int maxBuf):this(source, materializer)
{
thisSinkQueue =
defaultSinkqueue.WithAttributes(
Attributes.CreateInputBuffer(minBuf, maxBuf));
}

public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

return new SinkQueueAsyncEnumerator<T>(_source
.Via(cancellationToken.AsFlow<T>(cancelGracefully: true))
.ViaMaterialized(KillSwitches.Single<T>(), Keep.Right)
.ToMaterialized(thisSinkQueue, Keep.Both)
.Run(_materializer),
cancellationToken);
}
}
/// <summary>
/// Wraps a Sink Queue and Killswitch around <see cref="IAsyncEnumerator{T}"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public sealed class SinkQueueAsyncEnumerator<T> : IAsyncEnumerator<T>
{
private ISinkQueue<T> _sinkQueue;
private IKillSwitch _killSwitch;
private CancellationToken _token;
public SinkQueueAsyncEnumerator((UniqueKillSwitch killSwitch,ISinkQueue<T> sinkQueue) queueAndSwitch, CancellationToken token)
{
_sinkQueue = queueAndSwitch.sinkQueue;
_killSwitch = queueAndSwitch.killSwitch;
_token = token;
}
public async ValueTask DisposeAsync()
{
//If we are disposing, let's shut down the stream
//so that we don't have data hanging around.
_killSwitch.Shutdown();
_killSwitch = null;
_sinkQueue = null;
}

public async ValueTask<bool> MoveNextAsync()
{
_token.ThrowIfCancellationRequested();
var opt = await _sinkQueue.PullAsync();
if (opt.HasValue)
{
Current = opt.Value;
return true;
}
else
{
return false;
}
}

public T Current { get; private set; }
}
}

0 comments on commit b77ed9a

Please sign in to comment.