Skip to content

Commit

Permalink
[Async TestKit] Convert Akka.Streams.Tests to async - FlowFlattenMerg…
Browse files Browse the repository at this point in the history
…eSpec FlowGroupBySpec TimeoutsSpec (#5963)

* Convert Akka.Streams.Tests to async - FlowFlattenMergeSpec FlowGroupBySpec TimeoutsSpec

* Fix FlowGroupBySpec

* Fix FlowFlattenMergeSpec

* Fix AssertAllStagesStopped

* Add ShouldThrowWithin Task extension

* Fix FutureFlattenSourceSpec

* Revert HubSpec to its original code

* Add ForEachAsync sink

* Remove extraneous async test functions, causes ambiguous function fingerprints

* Fix specs

* Update API verify list

* Fix XML-Doc

* Fix RestartSpec

* fix stringify

* Fix FlowDelaySpec tests

* Rewrite FlowDelaySpec

* Fix HubSpec

* Fix HubSpec

* Merge dev

* Fix FlowDelaySpec, add epsilon

* Fix FlowDelaySpec timing problem.

* Add blamw flag to dotnet test

* Fix OutputStreamSourceSpec deadlock bug

* Fix LastSinkSpec

* Fix deadlock caused by WithinAsync and AwaitConditionAsync

* Skip persistence performance test for SqLite for now.

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Jun 20, 2022
1 parent 5ca2166 commit 6c07c56
Show file tree
Hide file tree
Showing 61 changed files with 1,338 additions and 1,689 deletions.
12 changes: 6 additions & 6 deletions build.fsx
Expand Up @@ -250,8 +250,8 @@ Target "RunTests" (fun _ ->
let runSingleProject project =
let arguments =
match (hasTeamCity) with
| true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetFrameworkVersion outputTests)
| false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetFrameworkVersion outputTests)
| true -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetFrameworkVersion outputTests)
| false -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetFrameworkVersion outputTests)

let result = ExecProcess(fun info ->
info.FileName <- "dotnet"
Expand Down Expand Up @@ -280,8 +280,8 @@ Target "RunTestsNetCore" (fun _ ->
let runSingleProject project =
let arguments =
match (hasTeamCity) with
| true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetCoreVersion outputTests)
| false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetCoreVersion outputTests)
| true -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetCoreVersion outputTests)
| false -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetCoreVersion outputTests)

let result = ExecProcess(fun info ->
info.FileName <- "dotnet"
Expand Down Expand Up @@ -310,8 +310,8 @@ Target "RunTestsNet" (fun _ ->
let runSingleProject project =
let arguments =
match (hasTeamCity) with
| true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetVersion outputTests)
| false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetVersion outputTests)
| true -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetVersion outputTests)
| false -> (sprintf "test -c Release --blame-crash --blame-hang-timeout 30s --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetVersion outputTests)

let result = ExecProcess(fun info ->
info.FileName <- "dotnet"
Expand Down
Expand Up @@ -143,10 +143,10 @@ public async Task Persistent_Shard_must_recover_from_failing_entity(Props entity
err.Cause.Should().BeOfType<ActorInitializationException>();

// Need to wait for the internal state to reset, else everything we sent will go to dead letter
await AwaitConditionAsync(() =>
await AwaitConditionAsync(async () =>
{
persistentShard.Tell(Shard.GetCurrentShardState.Instance);
var failedState = ExpectMsg<Shard.CurrentShardState>();
var failedState = await ExpectMsgAsync<Shard.CurrentShardState>();
return failedState.EntityIds.Count == 0;
});

Expand Down
Expand Up @@ -80,7 +80,7 @@ public async Task DistributedPubSubMediator_should_send_messages_to_dead_letter(

// assert
await EventFilter.DeadLetter<object>().ExpectAsync(1,
() => { mediator.Tell(new Publish("pub-sub", $"hit")); });
async () => { mediator.Tell(new Publish("pub-sub", "hit")); });
}
}

Expand Down
Expand Up @@ -53,7 +53,7 @@ public async Task ClusterSingletonProxy_with_zero_buffering_should_work()

// have to wait for cluster singleton to be ready, otherwise message will be rejected
await AwaitConditionAsync(
() => Cluster.Get(testSystem.Sys).State.Members.Count(m => m.Status == MemberStatus.Up) == 2,
async () => Cluster.Get(testSystem.Sys).State.Members.Count(m => m.Status == MemberStatus.Up) == 2,
TimeSpan.FromSeconds(30));

try
Expand Down
Expand Up @@ -12,6 +12,8 @@

namespace Akka.Persistence.Sqlite.Tests.Performance
{
// Skip performance test. Commented out for now, we'll add a specific environment variable controlled skipable fact later
/*
public class SqliteJournalPerfSpec : JournalPerfSpec
{
private static AtomicCounter counter = new AtomicCounter(0);
Expand Down Expand Up @@ -40,4 +42,5 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
}");
}
}
*/
}
Expand Up @@ -1935,6 +1935,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Expand Down
Expand Up @@ -1935,6 +1935,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Expand Down
Expand Up @@ -1935,6 +1935,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Expand Down
39 changes: 7 additions & 32 deletions src/core/Akka.Cluster.Tests/ClusterLogSpec.cs
Expand Up @@ -11,7 +11,9 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util.Internal;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;
Expand Down Expand Up @@ -50,26 +52,13 @@ protected async Task AwaitUpAsync()
{
await WithinAsync(TimeSpan.FromSeconds(10), async() =>
{
await AwaitConditionAsync(() => ClusterView.IsSingletonCluster);
await AwaitConditionAsync(async () => ClusterView.IsSingletonCluster);
ClusterView.Self.Address.ShouldBe(_selfAddress);
ClusterView.Members.Select(m => m.Address).ShouldBe(new Address[] { _selfAddress });
await AwaitAssertAsync(() => ClusterView.Status.ShouldBe(MemberStatus.Up));
});
}


/// <summary>
/// The expected log info pattern to intercept after a <see cref="Cluster.Join(Address)"/>.
/// </summary>
protected void Join(string expected)
{
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Join(_selfAddress));
});
}
/// <summary>
/// The expected log info pattern to intercept after a <see cref="Cluster.Join(Address)"/>.
/// </summary>
Expand All @@ -79,21 +68,7 @@ protected async Task JoinAsync(string expected)
{
await EventFilter
.Info(contains: expected)
.ExpectOneAsync(TimeSpan.FromMinutes(1), () => _cluster.Join(_selfAddress));
});
}

/// <summary>
/// The expected log info pattern to intercept after a <see cref="Cluster.Down(Address)"/>.
/// </summary>
/// <param name="expected"></param>
protected void Down(string expected)
{
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Down(_selfAddress));
.ExpectOneAsync(async () => _cluster.Join(_selfAddress));
});
}

Expand All @@ -107,7 +82,7 @@ protected async Task DownAsync(string expected)
{
await EventFilter
.Info(contains: expected)
.ExpectOneAsync(() => _cluster.Down(_selfAddress));
.ExpectOneAsync(async () => _cluster.Down(_selfAddress));
});
}
}
Expand Down Expand Up @@ -139,9 +114,9 @@ public ClusterLogVerboseDefaultSpec(ITestOutputHelper output)
public async Task A_cluster_must_not_log_verbose_cluster_events_by_default()
{
_cluster.Settings.LogInfoVerbose.ShouldBeFalse();
Intercept<TrueException>(() => Join(upLogMessage));
await JoinAsync(upLogMessage).ShouldThrowWithin<TrueException>(10.Seconds());
await AwaitUpAsync();
Intercept<TrueException>(() => Down(downLogMessage));
await DownAsync(downLogMessage).ShouldThrowWithin<TrueException>(10.Seconds());
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Expand Up @@ -79,7 +79,7 @@ public async Task A_cluster_must_initially_become_singleton_cluster_when_joining
ClusterView.Members.Count.Should().Be(0);
_cluster.Join(_selfAddress);
LeaderActions(); // Joining -> Up
await AwaitConditionAsync(() => ClusterView.IsSingletonCluster);
await AwaitConditionAsync(async () => ClusterView.IsSingletonCluster);
ClusterView.Self.Address.Should().Be(_selfAddress);
ClusterView.Members.Select(m => m.Address).ToImmutableHashSet()
.Should().BeEquivalentTo(ImmutableHashSet.Create(_selfAddress));
Expand Down Expand Up @@ -258,7 +258,7 @@ public async Task A_cluster_must_cancel_LeaveAsync_task_if_CancellationToken_fir

// Cancelling the first task
cts.Cancel();
await AwaitConditionAsync(() => task1.IsCanceled, null, "Task should be cancelled");
await AwaitConditionAsync(async () => task1.IsCanceled, null, "Task should be cancelled");

await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
Expand All @@ -273,12 +273,12 @@ public async Task A_cluster_must_cancel_LeaveAsync_task_if_CancellationToken_fir
ExpectMsg<ClusterEvent.MemberRemoved>().Member.Address.Should().Be(_selfAddress);
// Second task should complete (not cancelled)
await AwaitConditionAsync(() => task2.IsCompleted && !task2.IsCanceled, null, "Task should be completed, but not cancelled.");
await AwaitConditionAsync(async () => task2.IsCompleted && !task2.IsCanceled, null, "Task should be completed, but not cancelled.");
}, cancellationToken: cts.Token);

// Subsequent LeaveAsync() tasks expected to complete immediately (not cancelled)
var task3 = _cluster.LeaveAsync();
await AwaitConditionAsync(() => task3.IsCompleted && !task3.IsCanceled, null, "Task should be completed, but not cancelled.");
await AwaitConditionAsync(async () => task3.IsCompleted && !task3.IsCanceled, null, "Task should be completed, but not cancelled.");
}

[Fact]
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote.Tests/ActorsLeakSpec.cs
Expand Up @@ -234,7 +234,7 @@ public async Task Remoting_must_not_leak_actors()
* Wait for the ReliableDeliverySupervisor to receive its "TooLongIdle" message,
* which will throw a HopelessAssociation wrapped around a TimeoutException.
*/
await EventFilter.Exception<TimeoutException>().ExpectOneAsync(() => { });
await EventFilter.Exception<TimeoutException>().ExpectOneAsync(async () => { });

await AwaitAssertAsync(() =>
{
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Expand Up @@ -521,7 +521,7 @@ public async Task Stash_inbound_connections_until_UID_is_known_for_pending_outbo
(new ActorAssociationEventListener(remoteTransportProbe)));

// Hijack associations through the test transport
await AwaitConditionAsync(() => registry.TransportsReady(rawLocalAddress, rawRemoteAddress));
await AwaitConditionAsync(async () => registry.TransportsReady(rawLocalAddress, rawRemoteAddress));
var testTransport = registry.TransportFor(rawLocalAddress).Value.Item1;
testTransport.WriteBehavior.PushConstant(true);

Expand Down Expand Up @@ -601,7 +601,7 @@ public async Task Properly_quarantine_stashed_inbound_connections()
(new ActorAssociationEventListener(remoteTransportProbe)));

// Hijack associations through the test transport
await AwaitConditionAsync(() => registry.TransportsReady(rawLocalAddress, rawRemoteAddress));
await AwaitConditionAsync(async () => registry.TransportsReady(rawLocalAddress, rawRemoteAddress));
var testTransport = registry.TransportFor(rawLocalAddress).Value.Item1;
testTransport.WriteBehavior.PushConstant(true);

Expand Down

0 comments on commit 6c07c56

Please sign in to comment.