Skip to content

Commit

Permalink
[58-74] ObservableSinkSpec (#6606)
Browse files Browse the repository at this point in the history
* [58-74] `ObservableSinkSpec`

* Changes to `async` TestKit
  • Loading branch information
eaba committed Mar 31, 2023
1 parent 5dab14b commit 8fbd281
Showing 1 changed file with 33 additions and 40 deletions.
73 changes: 33 additions & 40 deletions src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs
Expand Up @@ -6,14 +6,13 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Actors;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Xunit2.Attributes;
using FluentAssertions.Execution;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -37,10 +36,10 @@ public TestObserver(AkkaSpec spec)
public void OnError(Exception error) => _probe.Ref.Tell(new OnError(error), ActorRefs.NoSender);
public void OnCompleted() => _probe.Ref.Tell(OnComplete.Instance, ActorRefs.NoSender);

public T ExpectEvent(T expected) => (T)_probe.ExpectMsg<OnNext>(x => Equals(x.Element, expected)).Element;
public TError ExpectError<TError>(TError error) where TError : Exception => (TError)_probe.ExpectMsg<OnError>(x => Equals(x.Cause, error)).Cause;
public void ExpectCompleted() => _probe.ExpectMsg<OnComplete>();
public void ExpectNoMsg() => _probe.ExpectNoMsg();
public async Task<T> ExpectEventAsync(T expected) => (T)(await _probe.ExpectMsgAsync<OnNext>(x => Equals(x.Element, expected))).Element;
public async Task<TError> ExpectErrorAsync<TError>(TError error) where TError : Exception => (TError)(await _probe.ExpectMsgAsync<OnError>(x => Equals(x.Cause, error))).Cause;
public async Task ExpectCompletedAsync() => await _probe.ExpectMsgAsync<OnComplete>();
public async Task ExpectNoMsgAsync() => await _probe.ExpectNoMsgAsync();
}

#endregion
Expand All @@ -55,10 +54,9 @@ public ObservableSinkSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once()
public async Task An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe = new TestObserver<int>(this);
var observable = Source.From(new[] { 1, 2, 3 })
.RunWith(Sink.AsObservable<int>(), Materializer);
Expand All @@ -68,20 +66,18 @@ public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only
d1.ShouldBe(d2);
probe.ExpectEvent(1);
probe.ExpectEvent(2);
probe.ExpectEvent(3);
probe.ExpectCompleted();
probe.ExpectNoMsg();
await probe.ExpectEventAsync(1);
await probe.ExpectEventAsync(2);
await probe.ExpectEventAsync(3);
await probe.ExpectCompletedAsync();
await probe.ExpectNoMsgAsync();
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void An_ObservableSink_must_propagate_events_to_all_observers()
public async Task An_ObservableSink_must_propagate_events_to_all_observers()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe1 = new TestObserver<int>(this);
var probe2 = new TestObserver<int>(this);
var observable = Source.From(new[] { 1, 2 })
Expand All @@ -90,24 +86,22 @@ public void An_ObservableSink_must_propagate_events_to_all_observers()
var d1 = observable.Subscribe(probe1);
var d2 = observable.Subscribe(probe2);
probe1.ExpectEvent(1);
probe1.ExpectEvent(2);
probe1.ExpectCompleted();
probe1.ExpectNoMsg();
probe2.ExpectEvent(1);
probe2.ExpectEvent(2);
probe2.ExpectCompleted();
probe2.ExpectNoMsg();
await probe1.ExpectEventAsync(1);
await probe1.ExpectEventAsync(2);
await probe1.ExpectCompletedAsync();
await probe1.ExpectNoMsgAsync();
await probe2.ExpectEventAsync(1);
await probe2.ExpectEventAsync(2);
await probe2.ExpectCompletedAsync();
await probe2.ExpectNoMsgAsync();
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void An_ObservableSink_must_propagate_error_to_all_observers()
public async Task An_ObservableSink_must_propagate_error_to_all_observers()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var e = new Exception("boom");
var probe1 = new TestObserver<int>(this);
var probe2 = new TestObserver<int>(this);
Expand All @@ -117,17 +111,16 @@ public void An_ObservableSink_must_propagate_error_to_all_observers()
var d1 = observable.Subscribe(probe1);
var d2 = observable.Subscribe(probe2);
probe1.ExpectError(e);
probe1.ExpectNoMsg();
probe2.ExpectError(e);
probe2.ExpectNoMsg();
await probe1.ExpectErrorAsync(e);
await probe1.ExpectNoMsgAsync();
await probe2.ExpectErrorAsync(e);
await probe2.ExpectNoMsgAsync();
}, Materializer);
}

[Fact]
public void An_ObservableSink_subscriber_must_be_disposable()
public async Task An_ObservableSink_subscriber_must_be_disposable()
{
var probe = new TestObserver<int>(this);
var tuple = Source.Queue<int>(1, OverflowStrategy.DropHead)
Expand All @@ -142,21 +135,21 @@ public void An_ObservableSink_subscriber_must_be_disposable()
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();
t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance);

probe.ExpectEvent(1);
await probe.ExpectEventAsync(1);

t = queue.OfferAsync(2);
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();
t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance);

probe.ExpectEvent(2);
await probe.ExpectEventAsync(2);

d1.Dispose();

t = queue.OfferAsync(3);
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();

probe.ExpectCompleted();
probe.ExpectNoMsg();
await probe.ExpectCompletedAsync();
await probe.ExpectNoMsgAsync();
}
}
}

0 comments on commit 8fbd281

Please sign in to comment.