Skip to content

Commit

Permalink
Convert Akka.Streams.Tests to async - Actor.ActorPublisherSpec (#5991)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Jun 10, 2022
1 parent d8e62f6 commit f595f63
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 212 deletions.
8 changes: 8 additions & 0 deletions src/core/Akka.Streams.TestKit/TestSubscriber.cs
Expand Up @@ -272,6 +272,14 @@ public async Task<Exception> ExpectSubscriptionAndErrorAsync(CancellationToken c
=> await ExpectSubscriptionAndErrorTask(this, signalDemand, cancellationToken)
.ConfigureAwait(false);

public async Task ExpectSubscriptionAndCompleteAsync(
bool signalDemand = true,
CancellationToken cancellationToken = default)
{
await ExpectSubscriptionAndCompleteTask(this, signalDemand, cancellationToken)
.ConfigureAwait(false);
}

/// <summary>
/// Expect given next element or error signal, returning whichever was signaled.
/// </summary>
Expand Down
49 changes: 36 additions & 13 deletions src/core/Akka.Streams.TestKit/Utils.cs
Expand Up @@ -8,12 +8,15 @@
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams.Implementation;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util.Internal;
using FluentAssertions.Extensions;

namespace Akka.Streams.TestKit
{
Expand All @@ -22,32 +25,52 @@ public static class Utils
public static Config UnboundedMailboxConfig { get; } =
ConfigurationFactory.ParseString(@"akka.actor.default-mailbox.mailbox-type = ""Akka.Dispatch.UnboundedMailbox, Akka""");

public static void AssertAllStagesStopped(this AkkaSpec spec, Action block, IMaterializer materializer)
public static void AssertAllStagesStopped(
this AkkaSpec spec,
Action block,
IMaterializer materializer,
TimeSpan? timeout = null,
CancellationToken cancellationToken = default)
=> AssertAllStagesStoppedAsync(spec, () =>
{
block();
return NotUsed.Instance;
}, materializer)
}, materializer, timeout, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

public static T AssertAllStagesStopped<T>(this AkkaSpec spec, Func<T> block, IMaterializer materializer)
=> AssertAllStagesStoppedAsync(spec, () => Task.FromResult(block()), materializer)
public static T AssertAllStagesStopped<T>(
this AkkaSpec spec,
Func<T> block,
IMaterializer materializer,
TimeSpan? timeout = null,
CancellationToken cancellationToken = default)
=> AssertAllStagesStoppedAsync(spec, async () => block(), materializer, timeout, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

public static async Task<T> AssertAllStagesStoppedAsync<T>(this AkkaSpec spec, Func<T> block,
IMaterializer materializer)
=> await AssertAllStagesStoppedAsync(spec, () => Task.FromResult(block()), materializer)
public static async Task<T> AssertAllStagesStoppedAsync<T>(
this AkkaSpec spec,
Func<T> block,
IMaterializer materializer,
TimeSpan? timeout = null,
CancellationToken cancellationToken = default)
=> await AssertAllStagesStoppedAsync(spec, () => Task.FromResult(block()), materializer, timeout, cancellationToken)
.ConfigureAwait(false);

public static async Task<T> AssertAllStagesStoppedAsync<T>(this AkkaSpec spec, Func<Task<T>> block, IMaterializer materializer)
public static async Task<T> AssertAllStagesStoppedAsync<T>(
this AkkaSpec spec,
Func<Task<T>> block,
IMaterializer materializer,
TimeSpan? timeout = null,
CancellationToken cancellationToken = default)
{
var result = await block();
timeout ??= 20.Seconds();
var result = await block().ShouldCompleteWithin(timeout.Value);
if (!(materializer is ActorMaterializerImpl impl))
return result;

var probe = spec.CreateTestProbe(impl.System);
probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance);
await probe.ExpectMsgAsync<StreamSupervisor.StoppedChildren>();
await probe.ExpectMsgAsync<StreamSupervisor.StoppedChildren>(cancellationToken: cancellationToken);

await probe.WithinAsync(TimeSpan.FromSeconds(5), async () =>
{
Expand All @@ -57,17 +80,17 @@ public static async Task<T> AssertAllStagesStoppedAsync<T>(this AkkaSpec spec, F
await probe.AwaitAssertAsync(async () =>
{
impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref);
children = (await probe.ExpectMsgAsync<StreamSupervisor.Children>()).Refs;
children = (await probe.ExpectMsgAsync<StreamSupervisor.Children>(cancellationToken: cancellationToken)).Refs;
if (children.Count != 0)
throw new Exception($"expected no StreamSupervisor children, but got {children.Aggregate("", (s, @ref) => s + @ref + ", ")}");
});
}, cancellationToken: cancellationToken);
}
catch
{
children.ForEach(c=>c.Tell(StreamSupervisor.PrintDebugDump.Instance));
throw;
}
});
}, cancellationToken: cancellationToken);

return result;
}
Expand Down

0 comments on commit f595f63

Please sign in to comment.