Skip to content

Commit

Permalink
Convert Akka.Streams.Tests to async - Extra.FlowTimedSpec (#5993)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Jun 24, 2022
1 parent f3469d7 commit d16b934
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 63 deletions.
24 changes: 15 additions & 9 deletions src/core/Akka.Streams.TestKit/ScriptedTest.cs
Expand Up @@ -15,8 +15,10 @@
using Akka.Configuration;
using Akka.Streams.Dsl;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util;
using Akka.Util.Internal;
using FluentAssertions.Extensions;
using Reactive.Streams;
using Xunit.Abstractions;

Expand Down Expand Up @@ -317,34 +319,38 @@ protected ScriptedTest(ITestOutputHelper output = null) : base(output)
Func<Flow<TIn2, TIn2, NotUsed>, Flow<TIn2, TOut2, TMat2>> op,
int maximumOverrun = 3,
int maximumRequest = 3,
int maximumBuffer = 3,
AkkaSpec spec = null)
=> RunScriptAsync(spec, script, settings, op, maximumOverrun, maximumRequest, maximumBuffer)
int maximumBuffer = 3)
=> RunScriptAsync(script, settings, op, maximumOverrun, maximumRequest, maximumBuffer)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task RunScriptAsync<TIn2, TOut2, TMat2>(
AkkaSpec spec,
Script<TIn2, TOut2> script,
ActorMaterializerSettings settings,
Func<Flow<TIn2, TIn2, NotUsed>, Flow<TIn2, TOut2, TMat2>> op,
int maximumOverrun = 3,
int maximumRequest = 3,
int maximumBuffer = 3)
int maximumBuffer = 3,
bool assertStagesStopped = true)
{
var runner = new ScriptRunner<TIn2, TOut2, TMat2>(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer, this);
async Task Run()
{
await runner.InitializeAsync();
await runner.RunAsync();
}

if(spec != null)
await spec.AssertAllStagesStoppedAsync(async () =>

if (assertStagesStopped)
{
await this.AssertAllStagesStoppedAsync(async () =>
{
await Run();
}, runner.Materializer);
}
else
await Run();
{
// guard against deadlocks, assuming that a test would not take more than 30 seconds.
await Run().ShouldCompleteWithin(30.Seconds());
}
}

protected static IPublisher<TOut> ToPublisher<TOut>(Source<TOut, NotUsed> source, IMaterializer materializer)
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs
Expand Up @@ -42,7 +42,7 @@ public async Task An_old_behaviour_Collect_must_collect()

foreach (var _ in RandomTestRange(Sys))
{
await RunScriptAsync(this, script, Materializer.Settings,
await RunScriptAsync(script, Materializer.Settings,
// This is intentional, testing backward compatibility with old obsolete method
#pragma warning disable CS0618
flow => flow.Collect(x => x % 2 == 0 ? (x * x).ToString() : null));
Expand All @@ -63,7 +63,7 @@ public async Task A_Collect_must_collect()

foreach (var _ in RandomTestRange(Sys))
{
await RunScriptAsync(this, script, Materializer.Settings,
await RunScriptAsync(script, Materializer.Settings,
flow => flow.Collect(x => x % 2 == 0, x => (x * x).ToString()));
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs
Expand Up @@ -45,7 +45,7 @@ public async Task A_Grouped_must_group_evenly()
var script = Script.Create(RandomTestRange(Sys).Select(_ => RandomTest(testLength)).ToArray());
foreach (var _ in RandomTestRange(Sys))
{
await RunScriptAsync(this, script, Settings, flow => flow.Grouped(testLength));
await RunScriptAsync(script, Settings, flow => flow.Grouped(testLength));
}
}

Expand All @@ -57,7 +57,7 @@ public async Task A_Grouped_must_group_with_rest()
var script = Script.Create(RandomTestRange(Sys).Select(_ => RandomTest(testLength)).Concat(RandomTest(1)).ToArray());
foreach (var _ in RandomTestRange(Sys))
{
await RunScriptAsync(this, script, Settings, flow => flow.Grouped(testLength));
await RunScriptAsync(script, Settings, flow => flow.Grouped(testLength));
}
}
}
Expand Down
112 changes: 62 additions & 50 deletions src/core/Akka.Streams.Tests/Extra/FlowTimedSpec.cs
Expand Up @@ -8,14 +8,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Dsl;
using Akka.Streams.Extra;
using Akka.Streams.TestKit;
using Akka.Util.Internal;
using Xunit;
using Xunit.Abstractions;
// ReSharper disable InvokeAsExtensionMethod

namespace Akka.Streams.Tests.Extra
{
Expand All @@ -24,97 +23,109 @@ public class FlowTimedSpec : ScriptedTest
private readonly ITestOutputHelper _helper;
private ActorMaterializer Materializer { get; }

public FlowTimedSpec(ITestOutputHelper helper) : base(helper)
public FlowTimedSpec(ITestOutputHelper helper) : base("akka.loglevel = DEBUG", helper)
{
_helper = helper;
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact]
public void Timed_Source_must_measure_time_it_takes_between_elements_matching_a_predicate()
public async Task Timed_Source_must_measure_time_it_takes_between_elements_matching_a_predicate()
{
var testActor = CreateTestProbe();
const int measureBetweenEvery = 5;

Action<TimeSpan> printInfo = interval =>
void PrintInfo(TimeSpan interval)
{
testActor.Tell(interval);
_helper.WriteLine($"Measured interval between {measureBetweenEvery} elements was {interval}");
};
}

var n = 20;
const int n = 20;
var testRuns = new[] {1, 2};
Func<Script<int, int>> script =
() =>
Script.Create(
Enumerable.Range(1, n)
.Select(x => ((ICollection<int>)new[] { x }, (ICollection<int>)new[] { x })).ToArray());
testRuns.ForEach(
_ =>
RunScript(script(), Materializer.Settings,
flow =>flow.Select(x => x)
.TimedIntervalBetween(i => i%measureBetweenEvery == 0, printInfo)));

Script<int, int> Script() =>
ScriptedTest.Script.Create(Enumerable.Range(1, n)
.Select(x => ((ICollection<int>)new[] { x }, (ICollection<int>)new[] { x }))
.ToArray());

foreach (var _ in testRuns)
{
await RunScriptAsync(Script(), Materializer.Settings,
flow =>flow.Select(x => x)
.TimedIntervalBetween(i => i % measureBetweenEvery == 0, PrintInfo),
assertStagesStopped: false);
}

var expectedNrOfOnIntervalCalls = testRuns.Length*((n/measureBetweenEvery) - 1); // first time has no value to compare to, so skips calling onInterval
Enumerable.Range(1,expectedNrOfOnIntervalCalls).ForEach(_=> testActor.ExpectMsg<TimeSpan>());
foreach (var _ in Enumerable.Range(1,expectedNrOfOnIntervalCalls))
{
await testActor.ExpectMsgAsync<TimeSpan>();
}
}

[Fact]
public void Timed_Source_must_measure_time_it_takes_from_start_to_complete_by_wrapping_operations()
public async Task Timed_Source_must_measure_time_it_takes_from_start_to_complete_by_wrapping_operations()
{
var testActor = CreateTestProbe();
var n = 50;
const int n = 50;

Action<TimeSpan> printInfo = d =>
void PrintInfo(TimeSpan d)
{
testActor.Tell(d);
_helper.WriteLine($"Processing {n} elements took {d}");
};
}

var testRuns = new[] {1, 2, 3};
Func<Script<int, int>> script =
() =>
Script.Create(
Enumerable.Range(1, n)
.Select(x => ((ICollection<int>)new[] { x }, (ICollection<int>)new[] { x })).ToArray());

testRuns.ForEach(
_ => RunScript(script(), Materializer.Settings, flow => flow.Timed(f => f.Select(x => x), printInfo)));
testRuns.ForEach(_ => testActor.ExpectMsg<TimeSpan>());
testActor.ExpectNoMsg(TimeSpan.FromSeconds(1));

Script<int, int> Script() =>
ScriptedTest.Script.Create(Enumerable.Range(1, n)
.Select(x => ((ICollection<int>)new[] { x }, (ICollection<int>)new[] { x }))
.ToArray());

foreach (var _ in testRuns)
{
await RunScriptAsync(Script(), Materializer.Settings, flow => flow.Timed(f => f.Select(x => x), PrintInfo));
}

foreach (var _ in testRuns)
{
await testActor.ExpectMsgAsync<TimeSpan>();
}
await testActor.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
}


[Fact]
public void Timed_Flow_must_measure_time_it_takes_between_elements_matching_a_predicate()
public async Task Timed_Flow_must_measure_time_it_takes_between_elements_matching_a_predicate()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var probe = CreateTestProbe();
var flow =
Flow.Create<int>().Select(x => (long) x).TimedIntervalBetween(i => i%2 == 1, d => probe.Tell(d));
Flow.Create<int>().Select(x => (long) x).TimedIntervalBetween(i => i % 2 == 1, d => probe.Tell(d));
var c1 = this.CreateManualSubscriberProbe<long>();
Source.From(Enumerable.Range(1, 3)).Via(flow).RunWith(Sink.FromSubscriber(c1), Materializer);
var s = c1.ExpectSubscription();
var s = await c1.ExpectSubscriptionAsync();
s.Request(100);
c1.ExpectNext(1L);
c1.ExpectNext(2L);
c1.ExpectNext(3L);
c1.ExpectComplete();
await c1.ExpectNextAsync(1L);
await c1.ExpectNextAsync(2L);
await c1.ExpectNextAsync(3L);
await c1.ExpectCompleteAsync();
var duration = probe.ExpectMsg<TimeSpan>();
var duration = await probe.ExpectMsgAsync<TimeSpan>();
_helper.WriteLine($"Got duration (first): {duration}");
}, Materializer);
}

[Fact]
public void Timed_Flow_must_measure_time_it_takes_from_start_to_complete_by_wrapping_operations()
public async Task Timed_Flow_must_measure_time_it_takes_from_start_to_complete_by_wrapping_operations()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var probe = CreateTestProbe();
Expand All @@ -124,22 +135,23 @@ public void Timed_Flow_must_measure_time_it_takes_from_start_to_complete_by_wrap
d => probe.Tell(d))
.Select(s => s + "!");
var t = flow.RunWith(Source.AsSubscriber<int>(), Sink.AsPublisher<string>(false), Materializer);
var flowIn = t.Item1;
var flowOut = t.Item2;
var (flowIn, flowOut) = flow.RunWith(Source.AsSubscriber<int>(), Sink.AsPublisher<string>(false), Materializer);
var c1 = this.CreateManualSubscriberProbe<string>();
flowOut.Subscribe(c1);
var p = Source.From(Enumerable.Range(0, 101)).RunWith(Sink.AsPublisher<int>(false), Materializer);
p.Subscribe(flowIn);
var sub = c1.ExpectSubscription();
var sub = await c1.ExpectSubscriptionAsync();
sub.Request(200);
Enumerable.Range(0, 101).ForEach(i => c1.ExpectNext(i + "!"));
c1.ExpectComplete();
foreach (var i in Enumerable.Range(0, 101))
{
await c1.ExpectNextAsync(i + "!");
}
await c1.ExpectCompleteAsync();
var duration = probe.ExpectMsg<TimeSpan>();
var duration = await probe.ExpectMsgAsync<TimeSpan>();
_helper.WriteLine($"Took: {duration}");
}, Materializer);
}
Expand Down

0 comments on commit d16b934

Please sign in to comment.