From f595f6313a18d36efdc6e34adbfe0a019990ee60 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 11 Jun 2022 01:35:30 +0700 Subject: [PATCH] Convert Akka.Streams.Tests to async - Actor.ActorPublisherSpec (#5991) --- .../Akka.Streams.TestKit/TestSubscriber.cs | 8 + src/core/Akka.Streams.TestKit/Utils.cs | 49 +- .../Actor/ActorPublisherSpec.cs | 453 ++++++++++-------- 3 files changed, 298 insertions(+), 212 deletions(-) diff --git a/src/core/Akka.Streams.TestKit/TestSubscriber.cs b/src/core/Akka.Streams.TestKit/TestSubscriber.cs index 1b41fe4e5d1..77594aa322d 100644 --- a/src/core/Akka.Streams.TestKit/TestSubscriber.cs +++ b/src/core/Akka.Streams.TestKit/TestSubscriber.cs @@ -272,6 +272,14 @@ public async Task ExpectSubscriptionAndErrorAsync(CancellationToken c => await ExpectSubscriptionAndErrorTask(this, signalDemand, cancellationToken) .ConfigureAwait(false); + public async Task ExpectSubscriptionAndCompleteAsync( + bool signalDemand = true, + CancellationToken cancellationToken = default) + { + await ExpectSubscriptionAndCompleteTask(this, signalDemand, cancellationToken) + .ConfigureAwait(false); + } + /// /// Expect given next element or error signal, returning whichever was signaled. /// diff --git a/src/core/Akka.Streams.TestKit/Utils.cs b/src/core/Akka.Streams.TestKit/Utils.cs index 5a2e457f34a..e05a2893c42 100644 --- a/src/core/Akka.Streams.TestKit/Utils.cs +++ b/src/core/Akka.Streams.TestKit/Utils.cs @@ -8,12 +8,15 @@ using System; using System.Collections.Immutable; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; using Akka.Streams.Implementation; using Akka.TestKit; +using Akka.TestKit.Extensions; using Akka.Util.Internal; +using FluentAssertions.Extensions; namespace Akka.Streams.TestKit { @@ -22,32 +25,52 @@ public static class Utils public static Config UnboundedMailboxConfig { get; } = ConfigurationFactory.ParseString(@"akka.actor.default-mailbox.mailbox-type = ""Akka.Dispatch.UnboundedMailbox, Akka"""); - public static void AssertAllStagesStopped(this AkkaSpec spec, Action block, IMaterializer materializer) + public static void AssertAllStagesStopped( + this AkkaSpec spec, + Action block, + IMaterializer materializer, + TimeSpan? timeout = null, + CancellationToken cancellationToken = default) => AssertAllStagesStoppedAsync(spec, () => { block(); return NotUsed.Instance; - }, materializer) + }, materializer, timeout, cancellationToken) .ConfigureAwait(false).GetAwaiter().GetResult(); - public static T AssertAllStagesStopped(this AkkaSpec spec, Func block, IMaterializer materializer) - => AssertAllStagesStoppedAsync(spec, () => Task.FromResult(block()), materializer) + public static T AssertAllStagesStopped( + this AkkaSpec spec, + Func block, + IMaterializer materializer, + TimeSpan? timeout = null, + CancellationToken cancellationToken = default) + => AssertAllStagesStoppedAsync(spec, async () => block(), materializer, timeout, cancellationToken) .ConfigureAwait(false).GetAwaiter().GetResult(); - public static async Task AssertAllStagesStoppedAsync(this AkkaSpec spec, Func block, - IMaterializer materializer) - => await AssertAllStagesStoppedAsync(spec, () => Task.FromResult(block()), materializer) + public static async Task AssertAllStagesStoppedAsync( + this AkkaSpec spec, + Func block, + IMaterializer materializer, + TimeSpan? timeout = null, + CancellationToken cancellationToken = default) + => await AssertAllStagesStoppedAsync(spec, () => Task.FromResult(block()), materializer, timeout, cancellationToken) .ConfigureAwait(false); - public static async Task AssertAllStagesStoppedAsync(this AkkaSpec spec, Func> block, IMaterializer materializer) + public static async Task AssertAllStagesStoppedAsync( + this AkkaSpec spec, + Func> block, + IMaterializer materializer, + TimeSpan? timeout = null, + CancellationToken cancellationToken = default) { - var result = await block(); + timeout ??= 20.Seconds(); + var result = await block().ShouldCompleteWithin(timeout.Value); if (!(materializer is ActorMaterializerImpl impl)) return result; var probe = spec.CreateTestProbe(impl.System); probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance); - await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync(cancellationToken: cancellationToken); await probe.WithinAsync(TimeSpan.FromSeconds(5), async () => { @@ -57,17 +80,17 @@ public static async Task AssertAllStagesStoppedAsync(this AkkaSpec spec, F await probe.AwaitAssertAsync(async () => { impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref); - children = (await probe.ExpectMsgAsync()).Refs; + children = (await probe.ExpectMsgAsync(cancellationToken: cancellationToken)).Refs; if (children.Count != 0) throw new Exception($"expected no StreamSupervisor children, but got {children.Aggregate("", (s, @ref) => s + @ref + ", ")}"); - }); + }, cancellationToken: cancellationToken); } catch { children.ForEach(c=>c.Tell(StreamSupervisor.PrintDebugDump.Instance)); throw; } - }); + }, cancellationToken: cancellationToken); return result; } diff --git a/src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs b/src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs index 774a37d36a0..8c4b000d088 100644 --- a/src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs +++ b/src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs @@ -10,6 +10,7 @@ using System.Collections.Immutable; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; using Akka.Pattern; @@ -57,7 +58,7 @@ public ActorPublisherSpec(ITestOutputHelper output = null) } [Fact] - public void ActorPublisher_should_accumulate_demand() + public async Task ActorPublisher_should_accumulate_demand() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); @@ -65,190 +66,206 @@ public void ActorPublisher_should_accumulate_demand() var s = this.CreateSubscriberProbe(); p.Subscribe(s); - s.Request(2); - probe.ExpectMsg().Elements.Should().Be(2); - s.Request(3); - probe.ExpectMsg().Elements.Should().Be(5); - s.Cancel(); + await s.RequestAsync(2); + (await probe.ExpectMsgAsync()).Elements.Should().Be(2); + await s.RequestAsync(3); + (await probe.ExpectMsgAsync()).Elements.Should().Be(5); + await s.CancelAsync(); } [Fact] - public void ActorPublisher_should_allow_onNext_up_to_requested_elements_but_not_more() + public async Task ActorPublisher_should_allow_onNext_up_to_requested_elements_but_not_more() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var p = ActorPublisher.Create(actorRef); var s = this.CreateSubscriberProbe(); p.Subscribe(s); - s.Request(2); + + await s.RequestAsync(2); actorRef.Tell(new Produce("elem-1")); actorRef.Tell(new Produce("elem-2")); actorRef.Tell(new Produce("elem-3")); - s.ExpectNext("elem-1"); - s.ExpectNext("elem-2"); - s.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); - s.Cancel(); + + await s.AsyncBuilder() + .ExpectNext("elem-1") + .ExpectNext("elem-2") + .ExpectNoMsg(TimeSpan.FromMilliseconds(300)) + .Cancel() + .ExecuteAsync(); } [Fact] - public void ActorPublisher_should_signal_error() + public async Task ActorPublisher_should_signal_error() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); actorRef.Tell(new Err("wrong")); - s.ExpectSubscription(); - s.ExpectError().Message.Should().Be("wrong"); + + await s.ExpectSubscriptionAsync(); + (await s.ExpectErrorAsync()).Message.Should().Be("wrong"); } [Fact] - public void ActorPublisher_should_not_terminate_after_signaling_onError() + public async Task ActorPublisher_should_not_terminate_after_signaling_onError() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - s.ExpectSubscription(); + + await s.ExpectSubscriptionAsync(); probe.Watch(actorRef); actorRef.Tell(new Err("wrong")); - s.ExpectError().Message.Should().Be("wrong"); - probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + (await s.ExpectErrorAsync()).Message.Should().Be("wrong"); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); } [Fact] - public void ActorPublisher_should_terminate_after_signalling_OnErrorThenStop() + public async Task ActorPublisher_should_terminate_after_signalling_OnErrorThenStop() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - s.ExpectSubscription(); + + await s.ExpectSubscriptionAsync(); probe.Watch(actorRef); actorRef.Tell(new ErrThenStop("wrong")); - s.ExpectError().Message.Should().Be("wrong"); - probe.ExpectTerminated(actorRef, TimeSpan.FromSeconds(3)); + (await s.ExpectErrorAsync()).Message.Should().Be("wrong"); + await probe.ExpectTerminatedAsync(actorRef, TimeSpan.FromSeconds(3)); } [Fact] - public void ActorPublisher_should_signal_error_before_subscribe() + public async Task ActorPublisher_should_signal_error_before_subscribe() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); actorRef.Tell(new Err("early err")); var s = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - s.ExpectSubscriptionAndError().Message.Should().Be("early err"); + + (await s.ExpectSubscriptionAndErrorAsync()).Message.Should().Be("early err"); } [Fact] - public void ActorPublisher_should_drop_onNext_elements_after_cancel() + public async Task ActorPublisher_should_drop_onNext_elements_after_cancel() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var p = ActorPublisher.Create(actorRef); var s = this.CreateSubscriberProbe(); p.Subscribe(s); - s.Request(2); + + await s.RequestAsync(2); actorRef.Tell(new Produce("elem-1")); - s.Cancel(); + await s.CancelAsync(); actorRef.Tell(new Produce("elem-2")); - s.ExpectNext("elem-1"); - s.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + await s.ExpectNextAsync("elem-1"); + await s.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300)); } [Fact] - public void ActorPublisher_should_remember_requested_after_restart() + public async Task ActorPublisher_should_remember_requested_after_restart() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var p = ActorPublisher.Create(actorRef); var s = this.CreateSubscriberProbe(); p.Subscribe(s); - s.Request(3); - probe.ExpectMsg().Elements.Should().Be(3); + + await s.RequestAsync(3); + (await probe.ExpectMsgAsync()).Elements.Should().Be(3); actorRef.Tell(new Produce("elem-1")); actorRef.Tell(Boom.Instance); actorRef.Tell(new Produce("elem-2")); - s.ExpectNext("elem-1"); - s.ExpectNext("elem-2"); - s.Request(5); - probe.ExpectMsg().Elements.Should().Be(6); - s.Cancel(); + await s.ExpectNextAsync("elem-1"); + await s.ExpectNextAsync("elem-2"); + await s.RequestAsync(5); + (await probe.ExpectMsgAsync()).Elements.Should().Be(6); + await s.CancelAsync(); } [Fact] - public void ActorPublisher_should_signal_onComplete() + public async Task ActorPublisher_should_signal_onComplete() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - s.Request(3); + + await s.RequestAsync(3); actorRef.Tell(new Produce("elem-1")); actorRef.Tell(Complete.Instance); - s.ExpectNext("elem-1"); - s.ExpectComplete(); + await s.ExpectNextAsync("elem-1"); + await s.ExpectCompleteAsync(); } [Fact] - public void ActorPublisher_should_not_terminate_after_signalling_onComplete() + public async Task ActorPublisher_should_not_terminate_after_signalling_onComplete() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - var sub = s.ExpectSubscription(); + + var sub = await s.ExpectSubscriptionAsync(); sub.Request(3); - probe.ExpectMsg().Elements.Should().Be(3); + (await probe.ExpectMsgAsync()).Elements.Should().Be(3); probe.Watch(actorRef); actorRef.Tell(new Produce("elem-1")); actorRef.Tell(Complete.Instance); - s.ExpectNext("elem-1"); - s.ExpectComplete(); - probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + await s.ExpectNextAsync("elem-1"); + await s.ExpectCompleteAsync(); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); } [Fact] - public void ActorPublisher_should_terminate_after_signalling_onCompleteThenStop() + public async Task ActorPublisher_should_terminate_after_signalling_onCompleteThenStop() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - var sub = s.ExpectSubscription(); + + var sub = await s.ExpectSubscriptionAsync(); sub.Request(3); - probe.ExpectMsg().Elements.Should().Be(3); + (await probe.ExpectMsgAsync()).Elements.Should().Be(3); probe.Watch(actorRef); actorRef.Tell(new Produce("elem-1")); actorRef.Tell(CompleteThenStop.Instance); - s.ExpectNext("elem-1"); - s.ExpectComplete(); - probe.ExpectTerminated(actorRef,TimeSpan.FromSeconds(3)); + await s.ExpectNextAsync("elem-1"); + await s.ExpectCompleteAsync(); + await probe.ExpectTerminatedAsync(actorRef,TimeSpan.FromSeconds(3)); } [Fact] - public void ActorPublisher_should_signal_immediate_onComplete() + public async Task ActorPublisher_should_signal_immediate_onComplete() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); actorRef.Tell(Complete.Instance); var s = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - s.ExpectSubscriptionAndComplete(); + + await s.ExpectSubscriptionAndCompleteAsync(); } [Fact] - public void ActorPublisher_should_only_allow_one_subscriber() + public async Task ActorPublisher_should_only_allow_one_subscriber() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - s.ExpectSubscription(); + + await s.ExpectSubscriptionAsync(); var s2 = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s2); - s2.ExpectSubscriptionAndError() + + (await s2.ExpectSubscriptionAndErrorAsync()) .Should() .BeOfType() .Which.Message.Should() @@ -256,162 +273,160 @@ public void ActorPublisher_should_only_allow_one_subscriber() } [Fact] - public void ActorPublisher_should_not_subscribe_the_same_subscriber_multiple_times() + public async Task ActorPublisher_should_not_subscribe_the_same_subscriber_multiple_times() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - s.ExpectSubscription(); + + await s.ExpectSubscriptionAsync(); ActorPublisher.Create(actorRef).Subscribe(s); - s.ExpectError().Message.Should().Be(ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes); + (await s.ExpectErrorAsync()).Message + .Should().Be(ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes); } [Fact] - public void ActorPublisher_should_signal_onComplete_when_actor_is_stopped() + public async Task ActorPublisher_should_signal_onComplete_when_actor_is_stopped() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref)); var s = this.CreateManualSubscriberProbe(); ActorPublisher.Create(actorRef).Subscribe(s); - s.ExpectSubscription(); + + await s.ExpectSubscriptionAsync(); actorRef.Tell(PoisonPill.Instance); - s.ExpectComplete(); + await s.ExpectCompleteAsync(); } [Fact] - public void ActorPublisher_should_work_together_with_Flow_and_ActorSubscriber_using_old_Collect_behaviour() + public async Task ActorPublisher_should_work_together_with_Flow_and_ActorSubscriber_using_old_Collect_behaviour() { var materializer = Sys.Materializer(); - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var probe = CreateTestProbe(); var source = Source.ActorPublisher(Sender.Props); var sink = Sink.ActorSubscriber(Receiver.Props(probe.Ref)); - var t = source.Collect(n => - { - if (n%2 == 0) - return "elem-" + n; - return null; - }).ToMaterialized(sink, Keep.Both).Run(materializer); - var snd = t.Item1; - var rcv = t.Item2; + var (snd, rcv) = source.Collect(n => n%2 == 0, n => "elem-" + n) + .ToMaterialized(sink, Keep.Both).Run(materializer); for (var i = 1; i <= 3; i++) snd.Tell(i); - probe.ExpectMsg("elem-2", TimeSpan.FromMinutes(10)); + await probe.ExpectMsgAsync("elem-2", TimeSpan.FromMinutes(10)); for (var n = 4; n <= 500; n++) { - if (n%19 == 0) - Thread.Sleep(50); // simulate bursts + if (n % 19 == 0) + await Task.Delay(50); // simulate bursts snd.Tell(n); } for (var n = 4; n <= 500; n += 2) - probe.ExpectMsg("elem-" + n); + await probe.ExpectMsgAsync("elem-" + n); Watch(snd); rcv.Tell(PoisonPill.Instance); - ExpectTerminated(snd); + await ExpectTerminatedAsync(snd); }, materializer); } [Fact] - public void ActorPublisher_should_work_together_with_Flow_and_ActorSubscriber() + public async Task ActorPublisher_should_work_together_with_Flow_and_ActorSubscriber() { var materializer = Sys.Materializer(); - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var probe = CreateTestProbe(); var source = Source.ActorPublisher(Sender.Props); var sink = Sink.ActorSubscriber(Receiver.Props(probe.Ref)); - var t = source.Collect( - n => n % 2 == 0, - n => "elem-" + n) + var (snd, rcv) = source.Collect( + n => n % 2 == 0, + n => "elem-" + n) .ToMaterialized(sink, Keep.Both).Run(materializer); - var snd = t.Item1; - var rcv = t.Item2; for (var i = 1; i <= 3; i++) snd.Tell(i); - probe.ExpectMsg("elem-2", TimeSpan.FromMinutes(10)); + await probe.ExpectMsgAsync("elem-2", TimeSpan.FromMinutes(10)); for (var n = 4; n <= 500; n++) { if (n % 19 == 0) - Thread.Sleep(50); // simulate bursts + await Task.Delay(50); // simulate bursts snd.Tell(n); } for (var n = 4; n <= 500; n += 2) - probe.ExpectMsg("elem-" + n); + await probe.ExpectMsgAsync("elem-" + n); Watch(snd); rcv.Tell(PoisonPill.Instance); - ExpectTerminated(snd); + await ExpectTerminatedAsync(snd); }, materializer); } [Fact] - public void ActorPublisher_should_work_in_a_GraphDsl() + public async Task ActorPublisher_should_work_in_a_GraphDsl() { var materializer = Sys.Materializer(); - var probe1 = CreateTestProbe(); - var probe2 = CreateTestProbe(); - - var senderRef1 = ActorOf(Sender.Props); - var source1 = Source.FromPublisher(ActorPublisher.Create(senderRef1)) - .MapMaterializedValue(_ => senderRef1); - - var sink1 = Sink.FromSubscriber(ActorSubscriber.Create(ActorOf(Receiver.Props(probe1.Ref)))) - .MapMaterializedValue(_ => probe1.Ref); - var sink2 = Sink.ActorSubscriber(Receiver.Props(probe2.Ref)) - .MapMaterializedValue(_ => probe2.Ref); - var senderRef2 = RunnableGraph.FromGraph(GraphDsl.Create( - Source.ActorPublisher(Sender.Props), - (builder, source2) => - { - var merge = builder.Add(new Merge(2)); - var bcast = builder.Add(new Broadcast(2)); - - builder.From(source1).To(merge.In(0)); - builder.From(source2.Outlet).To(merge.In(1)); - - builder.From(merge.Out).Via(Flow.Create().Select(i => i.ToString())).To(bcast.In); - - builder.From(bcast.Out(0)).Via(Flow.Create().Select(s => s + "mark")).To(sink1); - builder.From(bcast.Out(1)).To(sink2); - - return ClosedShape.Instance; - })).Run(materializer); - - // the scala test is wrong - const int noOfMessages = 10; - for (var i = 0; i < noOfMessages; i++) + await this.AssertAllStagesStoppedAsync(async () => { - senderRef1.Tell(i); - senderRef2.Tell(i+noOfMessages); - } + var probe1 = CreateTestProbe(); + var probe2 = CreateTestProbe(); + + var senderRef1 = ActorOf(Sender.Props); + var source1 = Source.FromPublisher(ActorPublisher.Create(senderRef1)) + .MapMaterializedValue(_ => senderRef1); + + var sink1 = Sink.FromSubscriber(ActorSubscriber.Create(ActorOf(Receiver.Props(probe1.Ref)))) + .MapMaterializedValue(_ => probe1.Ref); + var sink2 = Sink.ActorSubscriber(Receiver.Props(probe2.Ref)) + .MapMaterializedValue(_ => probe2.Ref); + var senderRef2 = RunnableGraph.FromGraph(GraphDsl.Create( + Source.ActorPublisher(Sender.Props), + (builder, source2) => + { + var merge = builder.Add(new Merge(2)); + var bcast = builder.Add(new Broadcast(2)); + + builder.From(source1).To(merge.In(0)); + builder.From(source2.Outlet).To(merge.In(1)); + + builder.From(merge.Out).Via(Flow.Create().Select(i => i.ToString())).To(bcast.In); + + builder.From(bcast.Out(0)).Via(Flow.Create().Select(s => s + "mark")).To(sink1); + builder.From(bcast.Out(1)).To(sink2); + + return ClosedShape.Instance; + })).Run(materializer); + + // the scala test is wrong + const int noOfMessages = 10; + for (var i = 0; i < noOfMessages; i++) + { + senderRef1.Tell(i); + senderRef2.Tell(i+noOfMessages); + } - var probe1Messages = new List(noOfMessages*2); - var probe2Messages = new List(noOfMessages*2); - for (var i = 0; i < noOfMessages * 2; i++) - { - probe1Messages.Add(probe1.ExpectMsg()); - probe2Messages.Add(probe2.ExpectMsg()); - } - probe1Messages.Should().BeEquivalentTo(Enumerable.Range(0, noOfMessages * 2).Select(i => i + "mark")); - probe2Messages.Should().BeEquivalentTo(Enumerable.Range(0, noOfMessages * 2).Select(i => i.ToString())); + var probe1Messages = new List(noOfMessages*2); + var probe2Messages = new List(noOfMessages*2); + for (var i = 0; i < noOfMessages * 2; i++) + { + probe1Messages.Add(await probe1.ExpectMsgAsync()); + probe2Messages.Add(await probe2.ExpectMsgAsync()); + } + probe1Messages.Should().BeEquivalentTo(Enumerable.Range(0, noOfMessages * 2).Select(i => i + "mark")); + probe2Messages.Should().BeEquivalentTo(Enumerable.Range(0, noOfMessages * 2).Select(i => i.ToString())); + }, materializer); } [Fact(Skip = "Racy")] - public void ActorPublisher_should_be_able_to_define_a_subscription_timeout_after_which_it_should_shut_down() + public async Task ActorPublisher_should_be_able_to_define_a_subscription_timeout_after_which_it_should_shut_down() { var materializer = Sys.Materializer(); - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var timeout = TimeSpan.FromMilliseconds(150); var a = ActorOf(TimeoutingPublisher.Props(TestActor, timeout)); @@ -423,17 +438,17 @@ public void ActorPublisher_should_be_able_to_define_a_subscription_timeout_after // now subscribers will already be rejected, while the actor could perform some clean-up var sub = this.CreateManualSubscriberProbe(); pub.Subscribe(sub); - sub.ExpectSubscriptionAndError(); + await sub.ExpectSubscriptionAndErrorAsync(); - ExpectMsg("cleaned-up"); + await ExpectMsgAsync("cleaned-up"); // termination is triggered by user code Watch(a); - ExpectTerminated(a); + await ExpectTerminatedAsync(a); }, materializer); } [Fact] - public void ActorPublisher_should_be_able_to_define_a_subscription_timeout_which_is_cancelled_by_the_first_incoming_Subscriber() + public async Task ActorPublisher_should_be_able_to_define_a_subscription_timeout_which_is_cancelled_by_the_first_incoming_Subscriber() { var timeout = TimeSpan.FromMilliseconds(500); var sub = this.CreateManualSubscriberProbe(); @@ -442,68 +457,78 @@ public void ActorPublisher_should_be_able_to_define_a_subscription_timeout_which // subscribe right away, should cancel subscription-timeout pub.Subscribe(sub); - sub.ExpectSubscription(); + await sub.ExpectSubscriptionAsync(); - ExpectNoMsg(TimeSpan.FromSeconds(1)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); } [Fact] - public void ActorPublisher_should_use_dispatcher_from_materializer_settings() + public async Task ActorPublisher_should_use_dispatcher_from_materializer_settings() { var materializer = ActorMaterializer.Create(Sys, Sys.Materializer().Settings.WithDispatcher("my-dispatcher1")); - var s = this.CreateManualSubscriberProbe(); - var actorRef = Source.ActorPublisher(TestPublisher.Props(TestActor, useTestDispatcher: false)) + await this.AssertAllStagesStoppedAsync(async () => + { + var s = this.CreateManualSubscriberProbe(); + var actorRef = Source.ActorPublisher(TestPublisher.Props(TestActor, useTestDispatcher: false)) .To(Sink.FromSubscriber(s)) .Run(materializer); - actorRef.Tell(ThreadName.Instance); - ExpectMsg().Should().Contain("my-dispatcher1"); + actorRef.Tell(ThreadName.Instance); + (await ExpectMsgAsync()).Should().Contain("my-dispatcher1"); + }, materializer); } [Fact] - public void ActorPublisher_should_use_dispatcher_from_operation_attributes() + public async Task ActorPublisher_should_use_dispatcher_from_operation_attributes() { var materializer = Sys.Materializer(); - var s = this.CreateManualSubscriberProbe(); - var actorRef = Source.ActorPublisher(TestPublisher.Props(TestActor, useTestDispatcher: false)) - .WithAttributes(ActorAttributes.CreateDispatcher("my-dispatcher1")) - .To(Sink.FromSubscriber(s)) - .Run(materializer); + await this.AssertAllStagesStoppedAsync(async () => + { + var s = this.CreateManualSubscriberProbe(); + var actorRef = Source.ActorPublisher(TestPublisher.Props(TestActor, useTestDispatcher: false)) + .WithAttributes(ActorAttributes.CreateDispatcher("my-dispatcher1")) + .To(Sink.FromSubscriber(s)) + .Run(materializer); - actorRef.Tell(ThreadName.Instance); - ExpectMsg().Should().Contain("my-dispatcher1"); + actorRef.Tell(ThreadName.Instance); + (await ExpectMsgAsync()).Should().Contain("my-dispatcher1"); + }, materializer); } [Fact] - public void ActorPublisher_should_use_dispatcher_from_props() + public async Task ActorPublisher_should_use_dispatcher_from_props() { var materializer = Sys.Materializer(); - var s = this.CreateManualSubscriberProbe(); - var actorRef = Source.ActorPublisher(TestPublisher.Props(TestActor, useTestDispatcher: false).WithDispatcher("my-dispatcher1")) - .WithAttributes(ActorAttributes.CreateDispatcher("my-dispatcher2")) - .To(Sink.FromSubscriber(s)) - .Run(materializer); + await this.AssertAllStagesStoppedAsync(async () => + { + var s = this.CreateManualSubscriberProbe(); + var actorRef = Source.ActorPublisher(TestPublisher.Props(TestActor, useTestDispatcher: false).WithDispatcher("my-dispatcher1")) + .WithAttributes(ActorAttributes.CreateDispatcher("my-dispatcher2")) + .To(Sink.FromSubscriber(s)) + .Run(materializer); - actorRef.Tell(ThreadName.Instance); - ExpectMsg().Should().Contain("my-dispatcher1"); + actorRef.Tell(ThreadName.Instance); + (await ExpectMsgAsync()).Should().Contain("my-dispatcher1"); + }, materializer); } [Fact] - public void ActorPublisher_should_handle_stash() + public async Task ActorPublisher_should_handle_stash() { var probe = CreateTestProbe(); var actorRef = Sys.ActorOf(TestPublisherWithStash.Props(probe.Ref)); var p = new ActorPublisherImpl(actorRef); var s = this.CreateSubscriberProbe(); p.Subscribe(s); - s.Request(2); - s.Request(3); + + await s.RequestAsync(2); + await s.RequestAsync(3); actorRef.Tell("unstash"); - probe.ExpectMsg(new TotalDemand(5)); - probe.ExpectMsg(new TotalDemand(5)); - s.Request(4); - probe.ExpectMsg(new TotalDemand(9)); - s.Cancel(); + await probe.ExpectMsgAsync(new TotalDemand(5)); + await probe.ExpectMsgAsync(new TotalDemand(5)); + await s.RequestAsync(4); + await probe.ExpectMsgAsync(new TotalDemand(9)); + await s.CancelAsync(); } } @@ -524,16 +549,34 @@ public TestPublisher(IActorRef probe) protected override bool Receive(object message) { - return message.Match() - .With(request => _probe.Tell(new TotalDemand(TotalDemand))) - .With(produce => OnNext(produce.Elem)) - .With(err => OnError(new Exception(err.Reason))) - .With(err => OnErrorThenStop(new Exception(err.Reason))) - .With(OnComplete) - .With(OnCompleteThenStop) - .With(() => { throw new Exception("boom"); }) - .With(()=>_probe.Tell(Context.Props.Dispatcher /*Thread.CurrentThread.Name*/)) // TODO fix me when thread name is set by dispatcher - .WasHandled; + switch (message) + { + case Request request: + _probe.Tell(new TotalDemand(TotalDemand)); + return true; + case Produce produce: + OnNext(produce.Elem); + return true; + case Err err: + OnError(new Exception(err.Reason)); + return true; + case ErrThenStop err: + OnErrorThenStop(new Exception(err.Reason)); + return true; + case Complete _: + OnComplete(); + return true; + case CompleteThenStop _: + OnCompleteThenStop(); + return true; + case Boom _: + throw new Exception("boom"); + case ThreadName _: + _probe.Tell(Context.Props.Dispatcher /*Thread.CurrentThread.Name*/); // TODO fix me when thread name is set by dispatcher + return true; + default: + return false; + } } } @@ -573,9 +616,9 @@ internal class Sender : Actors.ActorPublisher protected override bool Receive(object message) { - return message.Match() - .With(i => - { + switch (message) + { + case int i: if (_buffer.Count == 0 && TotalDemand > 0) OnNext(i); else @@ -583,10 +626,16 @@ protected override bool Receive(object message) _buffer = _buffer.Add(i); DeliverBuffer(); } - }) - .With(DeliverBuffer) - .With(() => Context.Stop(Self)) - .WasHandled; + return true; + case Request _: + DeliverBuffer(); + return true; + case Cancel _: + Context.Stop(Self); + return true; + default: + return false; + } } private void DeliverBuffer() @@ -628,15 +677,19 @@ public TimeoutingPublisher(IActorRef probe, TimeSpan timeout) protected override bool Receive(object message) { - return message.Match() - .With(() => OnNext(1)) - .With(() => - { + switch (message) + { + case Request _: + OnNext(1); + return true; + case SubscriptionTimeoutExceeded _: _probe.Tell("timed-out"); Context.System.Scheduler.ScheduleTellOnce(SubscriptionTimeout, _probe, "cleaned-up", Self); Context.System.Scheduler.ScheduleTellOnce(SubscriptionTimeout, Self, PoisonPill.Instance, Nobody.Instance); - }) - .WasHandled; + return true; + default: + return false; + } } } @@ -656,9 +709,11 @@ public Receiver(IActorRef probe) protected override bool Receive(object message) { - return message.Match() - .With(next => _probe.Tell(next.Element)) - .WasHandled; + if (!(message is OnNext next)) + return false; + + _probe.Tell(next.Element); + return true; } }