diff --git a/src/core/Akka.Streams.TestKit/ScriptedTest.cs b/src/core/Akka.Streams.TestKit/ScriptedTest.cs index 0bf70a76ca3..663610add68 100644 --- a/src/core/Akka.Streams.TestKit/ScriptedTest.cs +++ b/src/core/Akka.Streams.TestKit/ScriptedTest.cs @@ -15,8 +15,10 @@ using Akka.Configuration; using Akka.Streams.Dsl; using Akka.TestKit; +using Akka.TestKit.Extensions; using Akka.Util; using Akka.Util.Internal; +using FluentAssertions.Extensions; using Reactive.Streams; using Xunit.Abstractions; @@ -317,19 +319,18 @@ protected ScriptedTest(ITestOutputHelper output = null) : base(output) Func, Flow> op, int maximumOverrun = 3, int maximumRequest = 3, - int maximumBuffer = 3, - AkkaSpec spec = null) - => RunScriptAsync(spec, script, settings, op, maximumOverrun, maximumRequest, maximumBuffer) + int maximumBuffer = 3) + => RunScriptAsync(script, settings, op, maximumOverrun, maximumRequest, maximumBuffer) .ConfigureAwait(false).GetAwaiter().GetResult(); protected async Task RunScriptAsync( - AkkaSpec spec, Script script, ActorMaterializerSettings settings, Func, Flow> op, int maximumOverrun = 3, int maximumRequest = 3, - int maximumBuffer = 3) + int maximumBuffer = 3, + bool assertStagesStopped = true) { var runner = new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer, this); async Task Run() @@ -337,14 +338,19 @@ async Task Run() await runner.InitializeAsync(); await runner.RunAsync(); } - - if(spec != null) - await spec.AssertAllStagesStoppedAsync(async () => + + if (assertStagesStopped) + { + await this.AssertAllStagesStoppedAsync(async () => { await Run(); }, runner.Materializer); + } else - await Run(); + { + // guard against deadlocks, assuming that a test would not take more than 30 seconds. + await Run().ShouldCompleteWithin(30.Seconds()); + } } protected static IPublisher ToPublisher(Source source, IMaterializer materializer) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs index f2783b169ba..b6cb56bf292 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs @@ -42,7 +42,7 @@ public async Task An_old_behaviour_Collect_must_collect() foreach (var _ in RandomTestRange(Sys)) { - await RunScriptAsync(this, script, Materializer.Settings, + await RunScriptAsync(script, Materializer.Settings, // This is intentional, testing backward compatibility with old obsolete method #pragma warning disable CS0618 flow => flow.Collect(x => x % 2 == 0 ? (x * x).ToString() : null)); @@ -63,7 +63,7 @@ public async Task A_Collect_must_collect() foreach (var _ in RandomTestRange(Sys)) { - await RunScriptAsync(this, script, Materializer.Settings, + await RunScriptAsync(script, Materializer.Settings, flow => flow.Collect(x => x % 2 == 0, x => (x * x).ToString())); } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs index 894e9e15ea1..c44140d86e6 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs @@ -45,7 +45,7 @@ public async Task A_Grouped_must_group_evenly() var script = Script.Create(RandomTestRange(Sys).Select(_ => RandomTest(testLength)).ToArray()); foreach (var _ in RandomTestRange(Sys)) { - await RunScriptAsync(this, script, Settings, flow => flow.Grouped(testLength)); + await RunScriptAsync(script, Settings, flow => flow.Grouped(testLength)); } } @@ -57,7 +57,7 @@ public async Task A_Grouped_must_group_with_rest() var script = Script.Create(RandomTestRange(Sys).Select(_ => RandomTest(testLength)).Concat(RandomTest(1)).ToArray()); foreach (var _ in RandomTestRange(Sys)) { - await RunScriptAsync(this, script, Settings, flow => flow.Grouped(testLength)); + await RunScriptAsync(script, Settings, flow => flow.Grouped(testLength)); } } } diff --git a/src/core/Akka.Streams.Tests/Extra/FlowTimedSpec.cs b/src/core/Akka.Streams.Tests/Extra/FlowTimedSpec.cs index ccf86eae574..f5cc4dc306d 100644 --- a/src/core/Akka.Streams.Tests/Extra/FlowTimedSpec.cs +++ b/src/core/Akka.Streams.Tests/Extra/FlowTimedSpec.cs @@ -8,14 +8,13 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Dsl; using Akka.Streams.Extra; using Akka.Streams.TestKit; -using Akka.Util.Internal; using Xunit; using Xunit.Abstractions; -// ReSharper disable InvokeAsExtensionMethod namespace Akka.Streams.Tests.Extra { @@ -24,7 +23,7 @@ public class FlowTimedSpec : ScriptedTest private readonly ITestOutputHelper _helper; private ActorMaterializer Materializer { get; } - public FlowTimedSpec(ITestOutputHelper helper) : base(helper) + public FlowTimedSpec(ITestOutputHelper helper) : base("akka.loglevel = DEBUG", helper) { _helper = helper; var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16); @@ -32,89 +31,101 @@ public FlowTimedSpec(ITestOutputHelper helper) : base(helper) } [Fact] - public void Timed_Source_must_measure_time_it_takes_between_elements_matching_a_predicate() + public async Task Timed_Source_must_measure_time_it_takes_between_elements_matching_a_predicate() { var testActor = CreateTestProbe(); const int measureBetweenEvery = 5; - Action printInfo = interval => + void PrintInfo(TimeSpan interval) { testActor.Tell(interval); _helper.WriteLine($"Measured interval between {measureBetweenEvery} elements was {interval}"); - }; + } - var n = 20; + const int n = 20; var testRuns = new[] {1, 2}; - Func> script = - () => - Script.Create( - Enumerable.Range(1, n) - .Select(x => ((ICollection)new[] { x }, (ICollection)new[] { x })).ToArray()); - testRuns.ForEach( - _ => - RunScript(script(), Materializer.Settings, - flow =>flow.Select(x => x) - .TimedIntervalBetween(i => i%measureBetweenEvery == 0, printInfo))); + + Script Script() => + ScriptedTest.Script.Create(Enumerable.Range(1, n) + .Select(x => ((ICollection)new[] { x }, (ICollection)new[] { x })) + .ToArray()); + + foreach (var _ in testRuns) + { + await RunScriptAsync(Script(), Materializer.Settings, + flow =>flow.Select(x => x) + .TimedIntervalBetween(i => i % measureBetweenEvery == 0, PrintInfo), + assertStagesStopped: false); + } var expectedNrOfOnIntervalCalls = testRuns.Length*((n/measureBetweenEvery) - 1); // first time has no value to compare to, so skips calling onInterval - Enumerable.Range(1,expectedNrOfOnIntervalCalls).ForEach(_=> testActor.ExpectMsg()); + foreach (var _ in Enumerable.Range(1,expectedNrOfOnIntervalCalls)) + { + await testActor.ExpectMsgAsync(); + } } [Fact] - public void Timed_Source_must_measure_time_it_takes_from_start_to_complete_by_wrapping_operations() + public async Task Timed_Source_must_measure_time_it_takes_from_start_to_complete_by_wrapping_operations() { var testActor = CreateTestProbe(); - var n = 50; + const int n = 50; - Action printInfo = d => + void PrintInfo(TimeSpan d) { testActor.Tell(d); _helper.WriteLine($"Processing {n} elements took {d}"); - }; + } var testRuns = new[] {1, 2, 3}; - Func> script = - () => - Script.Create( - Enumerable.Range(1, n) - .Select(x => ((ICollection)new[] { x }, (ICollection)new[] { x })).ToArray()); - - testRuns.ForEach( - _ => RunScript(script(), Materializer.Settings, flow => flow.Timed(f => f.Select(x => x), printInfo))); - testRuns.ForEach(_ => testActor.ExpectMsg()); - testActor.ExpectNoMsg(TimeSpan.FromSeconds(1)); + + Script Script() => + ScriptedTest.Script.Create(Enumerable.Range(1, n) + .Select(x => ((ICollection)new[] { x }, (ICollection)new[] { x })) + .ToArray()); + + foreach (var _ in testRuns) + { + await RunScriptAsync(Script(), Materializer.Settings, flow => flow.Timed(f => f.Select(x => x), PrintInfo)); + } + + foreach (var _ in testRuns) + { + await testActor.ExpectMsgAsync(); + } + await testActor.ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); } [Fact] - public void Timed_Flow_must_measure_time_it_takes_between_elements_matching_a_predicate() + public async Task Timed_Flow_must_measure_time_it_takes_between_elements_matching_a_predicate() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var probe = CreateTestProbe(); var flow = - Flow.Create().Select(x => (long) x).TimedIntervalBetween(i => i%2 == 1, d => probe.Tell(d)); + Flow.Create().Select(x => (long) x).TimedIntervalBetween(i => i % 2 == 1, d => probe.Tell(d)); var c1 = this.CreateManualSubscriberProbe(); Source.From(Enumerable.Range(1, 3)).Via(flow).RunWith(Sink.FromSubscriber(c1), Materializer); - var s = c1.ExpectSubscription(); + var s = await c1.ExpectSubscriptionAsync(); s.Request(100); - c1.ExpectNext(1L); - c1.ExpectNext(2L); - c1.ExpectNext(3L); - c1.ExpectComplete(); + await c1.ExpectNextAsync(1L); + await c1.ExpectNextAsync(2L); + await c1.ExpectNextAsync(3L); + await c1.ExpectCompleteAsync(); - var duration = probe.ExpectMsg(); + var duration = await probe.ExpectMsgAsync(); _helper.WriteLine($"Got duration (first): {duration}"); }, Materializer); } [Fact] - public void Timed_Flow_must_measure_time_it_takes_from_start_to_complete_by_wrapping_operations() + public async Task Timed_Flow_must_measure_time_it_takes_from_start_to_complete_by_wrapping_operations() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var probe = CreateTestProbe(); @@ -124,9 +135,7 @@ public void Timed_Flow_must_measure_time_it_takes_from_start_to_complete_by_wrap d => probe.Tell(d)) .Select(s => s + "!"); - var t = flow.RunWith(Source.AsSubscriber(), Sink.AsPublisher(false), Materializer); - var flowIn = t.Item1; - var flowOut = t.Item2; + var (flowIn, flowOut) = flow.RunWith(Source.AsSubscriber(), Sink.AsPublisher(false), Materializer); var c1 = this.CreateManualSubscriberProbe(); flowOut.Subscribe(c1); @@ -134,12 +143,15 @@ public void Timed_Flow_must_measure_time_it_takes_from_start_to_complete_by_wrap var p = Source.From(Enumerable.Range(0, 101)).RunWith(Sink.AsPublisher(false), Materializer); p.Subscribe(flowIn); - var sub = c1.ExpectSubscription(); + var sub = await c1.ExpectSubscriptionAsync(); sub.Request(200); - Enumerable.Range(0, 101).ForEach(i => c1.ExpectNext(i + "!")); - c1.ExpectComplete(); + foreach (var i in Enumerable.Range(0, 101)) + { + await c1.ExpectNextAsync(i + "!"); + } + await c1.ExpectCompleteAsync(); - var duration = probe.ExpectMsg(); + var duration = await probe.ExpectMsgAsync(); _helper.WriteLine($"Took: {duration}"); }, Materializer); }