From 8fbd281cd94e65d9420db89eb60e87e0bdc254a1 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Fri, 31 Mar 2023 14:39:16 +0100 Subject: [PATCH] [58-74] `ObservableSinkSpec` (#6606) * [58-74] `ObservableSinkSpec` * Changes to `async` TestKit --- .../Dsl/ObservableSinkSpec.cs | 73 +++++++++---------- 1 file changed, 33 insertions(+), 40 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs index 0b4005cf224..5ae008cb7fc 100644 --- a/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs @@ -6,14 +6,13 @@ //----------------------------------------------------------------------- using System; -using System.Collections.Generic; +using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Actors; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.TestKit; using Akka.TestKit.Xunit2.Attributes; -using FluentAssertions.Execution; using Xunit; using Xunit.Abstractions; @@ -37,10 +36,10 @@ public TestObserver(AkkaSpec spec) public void OnError(Exception error) => _probe.Ref.Tell(new OnError(error), ActorRefs.NoSender); public void OnCompleted() => _probe.Ref.Tell(OnComplete.Instance, ActorRefs.NoSender); - public T ExpectEvent(T expected) => (T)_probe.ExpectMsg(x => Equals(x.Element, expected)).Element; - public TError ExpectError(TError error) where TError : Exception => (TError)_probe.ExpectMsg(x => Equals(x.Cause, error)).Cause; - public void ExpectCompleted() => _probe.ExpectMsg(); - public void ExpectNoMsg() => _probe.ExpectNoMsg(); + public async Task ExpectEventAsync(T expected) => (T)(await _probe.ExpectMsgAsync(x => Equals(x.Element, expected))).Element; + public async Task ExpectErrorAsync(TError error) where TError : Exception => (TError)(await _probe.ExpectMsgAsync(x => Equals(x.Cause, error))).Cause; + public async Task ExpectCompletedAsync() => await _probe.ExpectMsgAsync(); + public async Task ExpectNoMsgAsync() => await _probe.ExpectNoMsgAsync(); } #endregion @@ -55,10 +54,9 @@ public ObservableSinkSpec(ITestOutputHelper helper) : base(SpecConfig, helper) } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once() + public async Task An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var probe = new TestObserver(this); var observable = Source.From(new[] { 1, 2, 3 }) .RunWith(Sink.AsObservable(), Materializer); @@ -68,20 +66,18 @@ public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only d1.ShouldBe(d2); - probe.ExpectEvent(1); - probe.ExpectEvent(2); - probe.ExpectEvent(3); - probe.ExpectCompleted(); - probe.ExpectNoMsg(); - + await probe.ExpectEventAsync(1); + await probe.ExpectEventAsync(2); + await probe.ExpectEventAsync(3); + await probe.ExpectCompletedAsync(); + await probe.ExpectNoMsgAsync(); }, Materializer); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void An_ObservableSink_must_propagate_events_to_all_observers() + public async Task An_ObservableSink_must_propagate_events_to_all_observers() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var probe1 = new TestObserver(this); var probe2 = new TestObserver(this); var observable = Source.From(new[] { 1, 2 }) @@ -90,24 +86,22 @@ public void An_ObservableSink_must_propagate_events_to_all_observers() var d1 = observable.Subscribe(probe1); var d2 = observable.Subscribe(probe2); - probe1.ExpectEvent(1); - probe1.ExpectEvent(2); - probe1.ExpectCompleted(); - probe1.ExpectNoMsg(); - - probe2.ExpectEvent(1); - probe2.ExpectEvent(2); - probe2.ExpectCompleted(); - probe2.ExpectNoMsg(); + await probe1.ExpectEventAsync(1); + await probe1.ExpectEventAsync(2); + await probe1.ExpectCompletedAsync(); + await probe1.ExpectNoMsgAsync(); + await probe2.ExpectEventAsync(1); + await probe2.ExpectEventAsync(2); + await probe2.ExpectCompletedAsync(); + await probe2.ExpectNoMsgAsync(); }, Materializer); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void An_ObservableSink_must_propagate_error_to_all_observers() + public async Task An_ObservableSink_must_propagate_error_to_all_observers() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var e = new Exception("boom"); var probe1 = new TestObserver(this); var probe2 = new TestObserver(this); @@ -117,17 +111,16 @@ public void An_ObservableSink_must_propagate_error_to_all_observers() var d1 = observable.Subscribe(probe1); var d2 = observable.Subscribe(probe2); - probe1.ExpectError(e); - probe1.ExpectNoMsg(); - - probe2.ExpectError(e); - probe2.ExpectNoMsg(); + await probe1.ExpectErrorAsync(e); + await probe1.ExpectNoMsgAsync(); + await probe2.ExpectErrorAsync(e); + await probe2.ExpectNoMsgAsync(); }, Materializer); } [Fact] - public void An_ObservableSink_subscriber_must_be_disposable() + public async Task An_ObservableSink_subscriber_must_be_disposable() { var probe = new TestObserver(this); var tuple = Source.Queue(1, OverflowStrategy.DropHead) @@ -142,21 +135,21 @@ public void An_ObservableSink_subscriber_must_be_disposable() t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue(); t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance); - probe.ExpectEvent(1); + await probe.ExpectEventAsync(1); t = queue.OfferAsync(2); t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue(); t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance); - probe.ExpectEvent(2); + await probe.ExpectEventAsync(2); d1.Dispose(); t = queue.OfferAsync(3); t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue(); - probe.ExpectCompleted(); - probe.ExpectNoMsg(); + await probe.ExpectCompletedAsync(); + await probe.ExpectNoMsgAsync(); } } }