Skip to content

Commit

Permalink
Convert Akka.Streams.Tests to async - Actor.ActorSubscriberSpec (#5992)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Jun 20, 2022
1 parent 64b8daa commit 5ca2166
Showing 1 changed file with 162 additions and 101 deletions.
263 changes: 162 additions & 101 deletions src/core/Akka.Streams.Tests/Actor/ActorSubscriberSpec.cs
Expand Up @@ -8,14 +8,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Routing;
using Akka.Streams.Actors;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using Reactive.Streams;
using Xunit;
Expand All @@ -34,114 +33,156 @@ public ActorSubscriberSpec(ITestOutputHelper helper)
}

[Fact]
public void ActorSubscriber_should_receive_requested_elements()
public async Task ActorSubscriber_should_receive_requested_elements()
{
var actorRef = Source.From(new[] { 1, 2, 3 })
.RunWith(Sink.ActorSubscriber<int>(ManualSubscriber.Props(TestActor)), Sys.Materializer());

ExpectNoMsg(200);
actorRef.Tell("ready"); //requesting 2
ExpectMsg<OnNext>().Element.Should().Be(1);
ExpectMsg<OnNext>().Element.Should().Be(2);
ExpectNoMsg(200);
actorRef.Tell("ready");
ExpectMsg<OnNext>().Element.Should().Be(3);
ExpectMsg<OnComplete>();
var materializer = Sys.Materializer();
await this.AssertAllStagesStoppedAsync(async () =>
{
var actorRef = Source.From(new[] { 1, 2, 3 })
.RunWith(Sink.ActorSubscriber<int>(ManualSubscriber.Props(TestActor)), materializer);
await ExpectNoMsgAsync(200);
actorRef.Tell("ready"); //requesting 2
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(1);
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(2);
await ExpectNoMsgAsync(200);
actorRef.Tell("ready");
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(3);
await ExpectMsgAsync<OnComplete>();
}, materializer);
}

[Fact]
public void ActorSubscriber_should_signal_error()
public async Task ActorSubscriber_should_signal_error()
{
var e = new Exception("simulated");
var actorRef = Source.FromEnumerator<int>(() => { throw e; })
.RunWith(Sink.ActorSubscriber<int>(ManualSubscriber.Props(TestActor)), Sys.Materializer());
actorRef.Tell("ready");
var materializer = Sys.Materializer();
await this.AssertAllStagesStoppedAsync(async () =>
{
var e = new Exception("simulated");
var actorRef = Source.FromEnumerator<int>(() => throw e)
.RunWith(Sink.ActorSubscriber<int>(ManualSubscriber.Props(TestActor)), materializer);
actorRef.Tell("ready");
ExpectMsg<OnError>().Cause.Should().Be(e);
(await ExpectMsgAsync<OnError>()).Cause.Should().Be(e);
}, materializer);
}

[Fact]
public void ActorSubscriberSpec_should_remember_requested_after_restart()
public async Task ActorSubscriberSpec_should_remember_requested_after_restart()
{
// creating actor with default supervision, because stream supervisor default strategy is to
var actorRef = Sys.ActorOf(ManualSubscriber.Props(TestActor));
Source.From(Enumerable.Range(1, 7))
.RunWith(Sink.FromSubscriber(new ActorSubscriberImpl<int>(actorRef)), Sys.Materializer());
actorRef.Tell("ready");
ExpectMsg<OnNext>().Element.Should().Be(1);
ExpectMsg<OnNext>().Element.Should().Be(2);
ExpectNoMsg(200);
actorRef.Tell("boom");
actorRef.Tell("ready");
actorRef.Tell("ready");
actorRef.Tell("boom");
Enumerable.Range(3, 4).ForEach(n => ExpectMsg<OnNext>().Element.Should().Be(n));
ExpectNoMsg(200);
actorRef.Tell("ready");
ExpectMsg<OnNext>().Element.Should().Be(7);
ExpectMsg<OnComplete>();
var materializer = Sys.Materializer();
await this.AssertAllStagesStoppedAsync(async () =>
{
// creating actor with default supervision, because stream supervisor default strategy is to
var actorRef = Sys.ActorOf(ManualSubscriber.Props(TestActor));
Source.From(Enumerable.Range(1, 7))
.RunWith(Sink.FromSubscriber(new ActorSubscriberImpl<int>(actorRef)), materializer);
actorRef.Tell("ready");
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(1);
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(2);
await ExpectNoMsgAsync(200);
actorRef.Tell("boom");
actorRef.Tell("ready");
actorRef.Tell("ready");
actorRef.Tell("boom");
foreach (var n in Enumerable.Range(3, 4))
{
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(n);
}
await ExpectNoMsgAsync(200);
actorRef.Tell("ready");
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(7);
await ExpectMsgAsync<OnComplete>();
}, materializer);
}

[Fact]
public void ActorSubscriberSpec_should_not_deliver_more_after_cancel()
public async Task ActorSubscriberSpec_should_not_deliver_more_after_cancel()
{
var actorRef = Source.From(Enumerable.Range(1, 5))
.RunWith(Sink.ActorSubscriber<int>(ManualSubscriber.Props(TestActor)), Sys.Materializer());
actorRef.Tell("ready");
ExpectMsg<OnNext>().Element.Should().Be(1);
ExpectMsg<OnNext>().Element.Should().Be(2);
actorRef.Tell("requestAndCancel");
ExpectNoMsg(200);
var materializer = Sys.Materializer();
await this.AssertAllStagesStoppedAsync(async () =>
{
var actorRef = Source.From(Enumerable.Range(1, 5))
.RunWith(Sink.ActorSubscriber<int>(ManualSubscriber.Props(TestActor)), materializer);
actorRef.Tell("ready");
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(1);
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(2);
actorRef.Tell("requestAndCancel");
await ExpectNoMsgAsync(200);
}, materializer);
}

[Fact]
public void ActorSubscriberSpec_should_terminate_after_cancel()
public async Task ActorSubscriberSpec_should_terminate_after_cancel()
{
var actorRef = Source.From(Enumerable.Range(1, 5))
.RunWith(Sink.ActorSubscriber<int>(ManualSubscriber.Props(TestActor)), Sys.Materializer());
Watch(actorRef);
actorRef.Tell("requestAndCancel");
ExpectTerminated(actorRef);
var materializer = Sys.Materializer();
await this.AssertAllStagesStoppedAsync(async () =>
{
var actorRef = Source.From(Enumerable.Range(1, 5))
.RunWith(Sink.ActorSubscriber<int>(ManualSubscriber.Props(TestActor)), materializer);
Watch(actorRef);
actorRef.Tell("requestAndCancel");
await ExpectTerminatedAsync(actorRef);
}, materializer);
}

[Fact]
public void ActorSubscriberSpec_should_cancel_incoming_subscription_when_cancel_was_called_before_it_arrived()
public async Task ActorSubscriberSpec_should_cancel_incoming_subscription_when_cancel_was_called_before_it_arrived()
{
var actorRef = Sys.ActorOf(ImmediatelyCancelledSubscriber.Props(TestActor));
var sub = new ActorSubscriberImpl<object>(actorRef);
Watch(actorRef);
ExpectNoMsg(200);
await ExpectNoMsgAsync(200);
sub.OnSubscribe(new CancelSubscription(TestActor));
ExpectMsg("cancel");
ExpectTerminated(actorRef, TimeSpan.FromMilliseconds(200));
await ExpectMsgAsync("cancel");
await ExpectTerminatedAsync(actorRef, TimeSpan.FromMilliseconds(200));
}

[Fact]
public void ActorSubscriberSpec_should_work_with_OneByOneRequestStrategy()
public async Task ActorSubscriberSpec_should_work_with_OneByOneRequestStrategy()
{
Source.From(Enumerable.Range(1, 17))
.RunWith(Sink.ActorSubscriber<int>(RequestStrategySubscriber.Props(TestActor, OneByOneRequestStrategy.Instance)), Sys.Materializer());
Enumerable.Range(1, 17).ForEach(n => ExpectMsg<OnNext>().Element.Should().Be(n));
ExpectMsg<OnComplete>();
var materializer = Sys.Materializer();
await this.AssertAllStagesStoppedAsync(async () =>
{
Source.From(Enumerable.Range(1, 17))
.RunWith(Sink.ActorSubscriber<int>(RequestStrategySubscriber.Props(TestActor, OneByOneRequestStrategy.Instance)), materializer);
foreach (var n in Enumerable.Range(1, 17))
{
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(n);
}
await ExpectMsgAsync<OnComplete>();
}, materializer);
}

[Fact]
public void ActorSubscriberSpec_should_should_work_with_WatermarkRequestStrategy()
public async Task ActorSubscriberSpec_should_should_work_with_WatermarkRequestStrategy()
{
Source.From(Enumerable.Range(1, 17))
.RunWith(Sink.ActorSubscriber<int>(RequestStrategySubscriber.Props(TestActor, new WatermarkRequestStrategy(highWatermark: 10))), Sys.Materializer());
Enumerable.Range(1, 17).ForEach(n => ExpectMsg<OnNext>().Element.Should().Be(n));
ExpectMsg<OnComplete>();
var materializer = Sys.Materializer();
await this.AssertAllStagesStoppedAsync(async () =>
{
Source.From(Enumerable.Range(1, 17))
.RunWith(Sink.ActorSubscriber<int>(RequestStrategySubscriber.Props(TestActor, new WatermarkRequestStrategy(highWatermark: 10))), materializer);
foreach (var n in Enumerable.Range(1, 17))
{
(await ExpectMsgAsync<OnNext>()).Element.Should().Be(n);
}
await ExpectMsgAsync<OnComplete>();
}, materializer);
}

[Fact]
public void ActorSubscriberSpec_should_support_custom_max_in_flight_request_strategy_with_child_workers()
public async Task ActorSubscriberSpec_should_support_custom_max_in_flight_request_strategy_with_child_workers()
{
var n = 117;
Source.From(Enumerable.Range(1, n))
.Select(i => new Msg(i, TestActor))
.RunWith(Sink.ActorSubscriber<Msg>(Streamer.Props), Sys.Materializer());
ReceiveN(n).Should().BeEquivalentTo(Enumerable.Range(1, n).Select(i => new Done(i)));
var materializer = Sys.Materializer();
await this.AssertAllStagesStoppedAsync(async () =>
{
const int n = 117;
Source.From(Enumerable.Range(1, n))
.Select(i => new Msg(i, TestActor))
.RunWith(Sink.ActorSubscriber<Msg>(Streamer.Props), Sys.Materializer());
(await ReceiveNAsync(n).ToListAsync())
.Should().BeEquivalentTo(Enumerable.Range(1, n).Select(i => new Done(i)));
}, materializer);
}

}
Expand All @@ -163,25 +204,37 @@ public ManualSubscriber(IActorRef probe)

protected override bool Receive(object message)
{
return message.Match()
.With<OnNext>(_probe.Tell)
.With<OnComplete>(_probe.Tell)
.With<OnError>(_probe.Tell)
.With<string>(s =>
{
if (s.Equals("ready"))
Request(2);
else if (s.Equals("boom"))
throw new Exception(s);
else if (s.Equals("requestAndCancel"))
switch (message)
{
case OnNext msg:
_probe.Tell(msg);
return true;
case OnComplete msg:
_probe.Tell(msg);
return true;
case OnError msg:
_probe.Tell(msg);
return true;
case string s:
switch (s)
{
Request(1);
Cancel();
case "ready":
Request(2);
break;
case "boom":
throw new Exception(s);
case "requestAndCancel":
Request(1);
Cancel();
break;
case "cancel":
Cancel();
break;
}
else if (s.Equals("cancel"))
Cancel();
})
.WasHandled;
return true;
default:
return false;
}
}
}

Expand Down Expand Up @@ -216,10 +269,17 @@ public RequestStrategySubscriber(IActorRef probe, IRequestStrategy strat)

protected override bool Receive(object message)
{
return message.Match()
.With<OnNext>(_probe.Tell)
.With<OnComplete>(_probe.Tell)
.WasHandled;
switch (message)
{
case OnNext msg:
_probe.Tell(msg);
return true;
case OnComplete msg:
_probe.Tell(msg);
return true;
default:
return false;
}
}

public override IRequestStrategy RequestStrategy { get; }
Expand Down Expand Up @@ -322,21 +382,22 @@ public Streamer()

protected override bool Receive(object message)
{
return message.Match()
.With<OnNext>(next =>
{
switch (message)
{
case OnNext next:
var msg = (Msg)next.Element;
_queue.Add(msg.Id, msg.ReplyTo);
if (_queue.Count > 10)
throw new InvalidOperationException($"queued too many: {_queue.Count}");
_router.Route(new Work(msg.Id), Self);
})
.With<Reply>(reply =>
{
return true;
case Reply reply:
_queue[reply.Id].Tell(new Done(reply.Id));
_queue.Remove(reply.Id);
})
.WasHandled;
return true;
default:
return false;
}
}
}

Expand Down

0 comments on commit 5ca2166

Please sign in to comment.