Skip to content

Commit

Permalink
Use ActorSystem for Materializer (#6453)
Browse files Browse the repository at this point in the history
* Akka.Streams syntax cleanup

* added `DefaultMaterializer` `ActorSystemExtension`

Makes it possible to pass in an `ActorSystem` directly into most `RunnableGraph.Run` methods and their equivalents exposed inside the Dsl.

* fixed unit tests

* added test to validate caching of singleton `DefaultMaterializer` instance

---------

Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
Aaronontheweb and Arkatufus committed Feb 27, 2023
1 parent 21a3d31 commit 35cb6df
Show file tree
Hide file tree
Showing 12 changed files with 439 additions and 260 deletions.
Expand Up @@ -1619,6 +1619,7 @@ namespace Akka.Streams.Dsl
Akka.Streams.Dsl.IRunnableGraph<TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> func);
Akka.Streams.Dsl.IRunnableGraph<TMat> Named(string name);
TMat Run(Akka.Streams.IMaterializer materializer);
TMat Run(Akka.Actor.ActorSystem actorSystem);
Akka.Streams.Dsl.IRunnableGraph<TMat> WithAttributes(Akka.Streams.Attributes attributes);
}
public interface IUnzipWithCreator<out TIn, in TOut, out T>
Expand Down Expand Up @@ -1924,6 +1925,7 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Dsl.IRunnableGraph<TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> func) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> Named(string name) { }
public TMat Run(Akka.Streams.IMaterializer materializer) { }
public TMat Run(Akka.Actor.ActorSystem actorSystem) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> WithAttributes(Akka.Streams.Attributes attributes) { }
}
public class Sample<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, T>>
Expand Down Expand Up @@ -2190,13 +2192,21 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Dsl.Source<TOut, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> mapFunc) { }
public Akka.Streams.Dsl.Source<TOut, TMat> Named(string name) { }
public System.ValueTuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Streams.IMaterializer materializer) { }
public System.ValueTuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Actor.ActorSystem actorSystem) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Actor.ActorSystem materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Actor.ActorSystem materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerable(Akka.Streams.IMaterializer materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerable(Akka.Actor.ActorSystem materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(Akka.Streams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(Akka.Actor.ActorSystem materializer, int minBuffer = 4, int maxBuffer = 16) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Actor.ActorSystem materializer) { }
public System.Threading.Tasks.Task<TOut> RunSum(System.Func<TOut, TOut, TOut> reduce, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut> RunSum(System.Func<TOut, TOut, TOut> reduce, Akka.Actor.ActorSystem materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, Akka.Streams.IMaterializer materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, Akka.Actor.ActorSystem materializer) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> To<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat3> ToMaterialized<TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, System.Func<TMat, TMat2, TMat3> combine) { }
public override string ToString() { }
Expand Down Expand Up @@ -3637,7 +3647,7 @@ namespace Akka.Streams.Implementation
}
public class static StreamLayout
{
public static readonly bool IsDebug;
public const bool IsDebug = false;
public static void Validate(Akka.Streams.Implementation.IModule module, int level = 0, bool shouldPrint = False, System.Collections.Generic.IDictionary<object, int> idMap = null) { }
public sealed class Atomic : Akka.Streams.Implementation.StreamLayout.IMaterializedValueNode
{
Expand Down
Expand Up @@ -1619,6 +1619,7 @@ namespace Akka.Streams.Dsl
Akka.Streams.Dsl.IRunnableGraph<TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> func);
Akka.Streams.Dsl.IRunnableGraph<TMat> Named(string name);
TMat Run(Akka.Streams.IMaterializer materializer);
TMat Run(Akka.Actor.ActorSystem actorSystem);
Akka.Streams.Dsl.IRunnableGraph<TMat> WithAttributes(Akka.Streams.Attributes attributes);
}
public interface IUnzipWithCreator<out TIn, in TOut, out T>
Expand Down Expand Up @@ -1924,6 +1925,7 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Dsl.IRunnableGraph<TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> func) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> Named(string name) { }
public TMat Run(Akka.Streams.IMaterializer materializer) { }
public TMat Run(Akka.Actor.ActorSystem actorSystem) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> WithAttributes(Akka.Streams.Attributes attributes) { }
}
public class Sample<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, T>>
Expand Down Expand Up @@ -2190,13 +2192,21 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Dsl.Source<TOut, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> mapFunc) { }
public Akka.Streams.Dsl.Source<TOut, TMat> Named(string name) { }
public System.ValueTuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Streams.IMaterializer materializer) { }
public System.ValueTuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Actor.ActorSystem actorSystem) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Actor.ActorSystem materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Actor.ActorSystem materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerable(Akka.Streams.IMaterializer materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerable(Akka.Actor.ActorSystem materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(Akka.Streams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(Akka.Actor.ActorSystem materializer, int minBuffer = 4, int maxBuffer = 16) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Actor.ActorSystem materializer) { }
public System.Threading.Tasks.Task<TOut> RunSum(System.Func<TOut, TOut, TOut> reduce, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut> RunSum(System.Func<TOut, TOut, TOut> reduce, Akka.Actor.ActorSystem materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, Akka.Streams.IMaterializer materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, Akka.Actor.ActorSystem materializer) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> To<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat3> ToMaterialized<TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, System.Func<TMat, TMat2, TMat3> combine) { }
public override string ToString() { }
Expand Down Expand Up @@ -3637,7 +3647,7 @@ namespace Akka.Streams.Implementation
}
public class static StreamLayout
{
public static readonly bool IsDebug;
public const bool IsDebug = false;
public static void Validate(Akka.Streams.Implementation.IModule module, int level = 0, bool shouldPrint = False, System.Collections.Generic.IDictionary<object, int> idMap = null) { }
public sealed class Atomic : Akka.Streams.Implementation.StreamLayout.IMaterializedValueNode
{
Expand Down
Expand Up @@ -1619,6 +1619,7 @@ namespace Akka.Streams.Dsl
Akka.Streams.Dsl.IRunnableGraph<TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> func);
Akka.Streams.Dsl.IRunnableGraph<TMat> Named(string name);
TMat Run(Akka.Streams.IMaterializer materializer);
TMat Run(Akka.Actor.ActorSystem actorSystem);
Akka.Streams.Dsl.IRunnableGraph<TMat> WithAttributes(Akka.Streams.Attributes attributes);
}
public interface IUnzipWithCreator<out TIn, in TOut, out T>
Expand Down Expand Up @@ -1924,6 +1925,7 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Dsl.IRunnableGraph<TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> func) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> Named(string name) { }
public TMat Run(Akka.Streams.IMaterializer materializer) { }
public TMat Run(Akka.Actor.ActorSystem actorSystem) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> WithAttributes(Akka.Streams.Attributes attributes) { }
}
public class Sample<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, T>>
Expand Down Expand Up @@ -2190,13 +2192,21 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Dsl.Source<TOut, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> mapFunc) { }
public Akka.Streams.Dsl.Source<TOut, TMat> Named(string name) { }
public System.ValueTuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Streams.IMaterializer materializer) { }
public System.ValueTuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Actor.ActorSystem actorSystem) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Actor.ActorSystem materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Actor.ActorSystem materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerable(Akka.Streams.IMaterializer materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerable(Akka.Actor.ActorSystem materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(Akka.Streams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(Akka.Actor.ActorSystem materializer, int minBuffer = 4, int maxBuffer = 16) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Actor.ActorSystem materializer) { }
public System.Threading.Tasks.Task<TOut> RunSum(System.Func<TOut, TOut, TOut> reduce, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut> RunSum(System.Func<TOut, TOut, TOut> reduce, Akka.Actor.ActorSystem materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, Akka.Streams.IMaterializer materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, Akka.Actor.ActorSystem materializer) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> To<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat3> ToMaterialized<TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, System.Func<TMat, TMat2, TMat3> combine) { }
public override string ToString() { }
Expand Down Expand Up @@ -3637,7 +3647,7 @@ namespace Akka.Streams.Implementation
}
public class static StreamLayout
{
public static readonly bool IsDebug;
public const bool IsDebug = false;
public static void Validate(Akka.Streams.Implementation.IModule module, int level = 0, bool shouldPrint = False, System.Collections.Generic.IDictionary<object, int> idMap = null) { }
public sealed class Atomic : Akka.Streams.Implementation.StreamLayout.IMaterializedValueNode
{
Expand Down
18 changes: 18 additions & 0 deletions src/core/Akka.Streams.Tests/ActorMaterializerSpec.cs
Expand Up @@ -89,5 +89,23 @@ public void ActorMaterializer_should_report_correctly_if_it_has_been_shut_down_f
sys.Terminate().Wait();
m.IsShutdown.Should().BeTrue();
}

[Fact]
public async Task CanMaterializeStreamsUsingActorSystem()
{
Func<Task> task = () => Source.Single(1).RunForeach(i => { }, Sys);
await task.Should().NotThrowAsync();
}

[Fact]
public void ShouldReturnSameMaterializerForActorSystem()
{
var mat1 = Sys.Materializer();
var mat2 = Sys.Materializer();
var mat3 = Sys.Materializer(namePrefix: "different");

mat1.Should().Be(mat2);
mat1.Should().NotBe(mat3);
}
}
}
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs
Expand Up @@ -265,7 +265,7 @@ public void QueueSource_should_complete_watching_future_with_failure_if_material
{
this.AssertAllStagesStopped(() =>
{
var tempMap = ActorMaterializer.Create(Sys);
var tempMap = ActorMaterializer.Create(Sys, ActorMaterializerSettings.Create(Sys)); // need to create a new materializer to be able to shutdown it
var s = this.CreateManualSubscriberProbe<int>();
var queue = Source.Queue<int>(1, OverflowStrategy.Fail)
.To(Sink.FromSubscriber(s))
Expand Down
Expand Up @@ -266,7 +266,8 @@ public void ActorGraphInterpreter_should_trigger_PostStop_in_all_stages_when_abr
{
this.AssertAllStagesStopped(() =>
{
var materializer = ActorMaterializer.Create(Sys);
// force the system to create a new materializer
var materializer = ActorMaterializer.Create(Sys, ActorMaterializerSettings.Create(Sys));
var gotStop = new TestLatch(1);
var downstream = this.CreateSubscriberProbe<string>();
Expand Down

0 comments on commit 35cb6df

Please sign in to comment.