Skip to content

Commit

Permalink
FlowAskSpec: configured test dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath authored and Danthar committed Aug 13, 2018
1 parent adf4d41 commit 4264c9a
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions src/core/Akka.Streams.Tests/Dsl/FlowAskSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
Expand Down Expand Up @@ -45,6 +46,8 @@ public bool Equals(Reply other)

public override bool Equals(object obj) => obj is Reply reply && Equals(reply);
public override int GetHashCode() => Payload;

public override string ToString() => $"Reply({Payload})";
}

sealed class Replier : UntypedActor
Expand Down Expand Up @@ -116,10 +119,27 @@ public FailOnAllExcept(int n)

#endregion

private static readonly Config SpecConfig = ConfigurationFactory.ParseString(@"
akka.test.stream-dispatcher {
type = Dispatcher
executor = ""fork-join-executor""
fork-join-executor {
parallelism-min = 8
parallelism-max = 8
}
mailbox-requirement = ""Akka.Dispatch.IUnboundedMessageQueueSemantics""
}
akka.stream {
materializer {
dispatcher = ""akka.test.stream-dispatcher""
}
}");

private readonly IMaterializer _materializer;
private readonly TimeSpan _timeout = 10.Seconds();

public FlowAskSpec(ITestOutputHelper output) : base(output)
public FlowAskSpec(ITestOutputHelper output) : base(SpecConfig, output)
{
_materializer = Sys.Materializer();
}
Expand All @@ -128,7 +148,7 @@ public FlowAskSpec(ITestOutputHelper output) : base(output)
public void Flow_with_ask_must_produce_asked_elements() => this.AssertAllStagesStopped(() =>
{
var replyOnInts =
Sys.ActorOf(Props.Create(() => new Replier()),
Sys.ActorOf(Props.Create(() => new Replier()).WithDispatcher("akka.test.stream-dispatcher"),
"replyOnInts");
var c = this.CreateManualSubscriberProbe<Reply>();
Expand All @@ -149,7 +169,7 @@ public FlowAskSpec(ITestOutputHelper output) : base(output)
[Fact]
public void Flow_with_ask_must_produce_asked_elements_for_simple_ask() => this.AssertAllStagesStopped(() =>
{
var replyOnInts = Sys.ActorOf(Props.Create(() => new Replier()), "replyOnInts");
var replyOnInts = Sys.ActorOf(Props.Create(() => new Replier()).WithDispatcher("akka.test.stream-dispatcher"), "replyOnInts");
var c = this.CreateManualSubscriberProbe<Reply>();
var p = Source.From(Enumerable.Range(1, 3))
Expand All @@ -169,7 +189,7 @@ public FlowAskSpec(ITestOutputHelper output) : base(output)
[Fact]
public void Flow_with_ask_must_produce_asked_elements_when_response_is_Status_Success() => this.AssertAllStagesStopped(() =>
{
var statusReplier = Sys.ActorOf(Props.Create(() => new StatusReplier()), "statusReplier");
var statusReplier = Sys.ActorOf(Props.Create(() => new StatusReplier()).WithDispatcher("akka.test.stream-dispatcher"), "statusReplier");
var c = this.CreateManualSubscriberProbe<Reply>();
var p = Source.From(Enumerable.Range(1, 3))
Expand All @@ -189,7 +209,7 @@ public FlowAskSpec(ITestOutputHelper output) : base(output)
[Fact]
public void Flow_with_ask_must_produce_future_elements_in_order()
{
var replyRandomDelays = Sys.ActorOf(Props.Create(() => new RandomDelaysReplier()), "replyRandomDelays");
var replyRandomDelays = Sys.ActorOf(Props.Create(() => new RandomDelaysReplier()).WithDispatcher("akka.test.stream-dispatcher"), "replyRandomDelays");
var c = this.CreateManualSubscriberProbe<Reply>();

var p = Source.From(Enumerable.Range(1, 50))
Expand All @@ -207,7 +227,7 @@ public void Flow_with_ask_must_produce_future_elements_in_order()
[Fact]
public void Flow_with_ask_must_signal_ask_timeout_failure() => this.AssertAllStagesStopped(() =>
{
var dontReply = Sys.ActorOf(BlackHoleActor.Props, "dontReply");
var dontReply = Sys.ActorOf(BlackHoleActor.Props.WithDispatcher("akka.test.stream-dispatcher"), "dontReply");
var c = this.CreateManualSubscriberProbe<Reply>();
var p = Source.From(Enumerable.Range(1, 50))
Expand Down Expand Up @@ -239,7 +259,7 @@ public void Flow_with_ask_must_produce_future_elements_in_order()
[Fact]
public void Flow_with_ask_signal_failure_when_target_actor_is_terminated() => this.AssertAllStagesStopped(() =>
{
var r = Sys.ActorOf(Props.Create(() => new Replier()), "replyRandomDelays");
var r = Sys.ActorOf(Props.Create(() => new Replier()).WithDispatcher("akka.test.stream-dispatcher"), "replyRandomDelays");
var done = Source.Maybe<int>()
.Ask<Reply>(r, _timeout, 4)
.RunWith(Sink.Ignore<Reply>(), _materializer);
Expand Down Expand Up @@ -326,7 +346,7 @@ public void Flow_with_ask_must_produce_future_elements_in_order()
[Fact]
public void Flow_with_ask_should_handle_cancel_properly() => this.AssertAllStagesStopped(() =>
{
var dontReply = Sys.ActorOf(BlackHoleActor.Props, "dontReply");
var dontReply = Sys.ActorOf(BlackHoleActor.Props.WithDispatcher("akka.test.stream-dispatcher"), "dontReply");
var pub = this.CreateManualPublisherProbe<int>();
var sub = this.CreateManualSubscriberProbe<Reply>();
Expand All @@ -340,8 +360,12 @@ public void Flow_with_ask_must_produce_future_elements_in_order()
upstream.ExpectCancellation();
}, _materializer);

private IActorRef ReplierFailOn(int n) => Sys.ActorOf(Props.Create(() => new FailOn(n)), "failureReplier-" + n);
private IActorRef ReplierFailOn(int n) =>
Sys.ActorOf(Props.Create(() => new FailOn(n)).WithDispatcher("akka.test.stream-dispatcher"),
"failureReplier-" + n);

private IActorRef ReplierFailAllExceptOn(int n) => Sys.ActorOf(Props.Create(() => new FailOnAllExcept(n)), "failureReplier-" + n);
private IActorRef ReplierFailAllExceptOn(int n) =>
Sys.ActorOf(Props.Create(() => new FailOnAllExcept(n)).WithDispatcher("akka.test.stream-dispatcher"),
"failureReplier-" + n);
}
}

0 comments on commit 4264c9a

Please sign in to comment.