-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Utils.cs
114 lines (102 loc) · 4.76 KB
/
Utils.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//-----------------------------------------------------------------------
// <copyright file="Utils.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams.Implementation;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util.Internal;
using FluentAssertions.Extensions;
namespace Akka.Streams.TestKit
{
public static class Utils
{
/// <summary>
/// Configuration parsed from a single HOCON line that sets
/// <c>akka.actor.default-mailbox.mailbox-type</c> to <c>Akka.Dispatch.UnboundedMailbox, Akka</c>.
/// </summary>
public static Config UnboundedMailboxConfig { get; } = ConfigurationFactory.ParseString(
    "akka.actor.default-mailbox.mailbox-type = \"Akka.Dispatch.UnboundedMailbox, Akka\"");
/// <summary>
/// Synchronous wrapper: runs <paramref name="block"/> through
/// <c>AssertAllStagesStoppedAsync</c> and blocks the calling thread until that task finishes.
/// </summary>
/// <param name="spec">The spec forwarded to the asynchronous overload.</param>
/// <param name="block">The test body to execute.</param>
/// <param name="materializer">The materializer forwarded to the asynchronous overload.</param>
/// <param name="timeout">Optional timeout forwarded to the asynchronous overload.</param>
/// <param name="cancellationToken">Token forwarded to the asynchronous overload.</param>
public static void AssertAllStagesStopped(
    this AkkaSpec spec,
    Action block,
    IMaterializer materializer,
    TimeSpan? timeout = null,
    CancellationToken cancellationToken = default)
{
    // Adapt the Action to a value-returning delegate so the generic overload can be reused.
    Func<NotUsed> runBlock = () =>
    {
        block();
        return NotUsed.Instance;
    };

    var pending = AssertAllStagesStoppedAsync(spec, runBlock, materializer, timeout, cancellationToken);
    pending.ConfigureAwait(false).GetAwaiter().GetResult();
}
/// <summary>
/// Synchronous wrapper: runs <paramref name="block"/> through the task-based
/// <c>AssertAllStagesStoppedAsync</c> overload, blocks until it finishes and returns its result.
/// </summary>
/// <typeparam name="T">Type of the value produced by <paramref name="block"/>.</typeparam>
/// <param name="spec">The spec forwarded to the asynchronous overload.</param>
/// <param name="block">The test body to execute.</param>
/// <param name="materializer">The materializer forwarded to the asynchronous overload.</param>
/// <param name="timeout">Optional timeout forwarded to the asynchronous overload.</param>
/// <param name="cancellationToken">Token forwarded to the asynchronous overload.</param>
/// <returns>The value returned by <paramref name="block"/>.</returns>
public static T AssertAllStagesStopped<T>(
    this AkkaSpec spec,
    Func<T> block,
    IMaterializer materializer,
    TimeSpan? timeout = null,
    CancellationToken cancellationToken = default)
    // Wrap the synchronous result with Task.FromResult instead of an async lambda that never
    // awaits (CS1998); the explicit type argument pins the Func<Task<T>> overload.
    => AssertAllStagesStoppedAsync<T>(spec, () => Task.FromResult(block()), materializer, timeout, cancellationToken)
        .ConfigureAwait(false).GetAwaiter().GetResult();
/// <summary>
/// Adapts a synchronous <paramref name="block"/> to the task-based
/// <c>AssertAllStagesStoppedAsync</c> overload and awaits its result.
/// </summary>
/// <typeparam name="T">Type of the value produced by <paramref name="block"/>.</typeparam>
/// <param name="spec">The spec forwarded to the task-based overload.</param>
/// <param name="block">The test body to execute.</param>
/// <param name="materializer">The materializer forwarded to the task-based overload.</param>
/// <param name="timeout">Optional timeout forwarded to the task-based overload.</param>
/// <param name="cancellationToken">Token forwarded to the task-based overload.</param>
/// <returns>The value returned by <paramref name="block"/>.</returns>
public static async Task<T> AssertAllStagesStoppedAsync<T>(
    this AkkaSpec spec,
    Func<T> block,
    IMaterializer materializer,
    TimeSpan? timeout = null,
    CancellationToken cancellationToken = default)
{
    // The wrapped delegate invokes block only when the task-based overload calls it.
    Func<Task<T>> asyncBlock = () => Task.FromResult(block());

    var outcome = await AssertAllStagesStoppedAsync<T>(spec, asyncBlock, materializer, timeout, cancellationToken)
        .ConfigureAwait(false);
    return outcome;
}
/// <summary>
/// Awaits <paramref name="block"/> and, when <paramref name="materializer"/> is an
/// <see cref="ActorMaterializerImpl"/>, sends <c>StopChildren</c> to its supervisor and asserts
/// that the supervisor then reports no children.
/// </summary>
/// <typeparam name="T">Type of the value produced by <paramref name="block"/>.</typeparam>
/// <param name="spec">The spec used to create the test probe that talks to the supervisor.</param>
/// <param name="block">The asynchronous test body to execute.</param>
/// <param name="materializer">
/// The materializer whose supervisor is inspected. Any other implementation skips the check and
/// the result of <paramref name="block"/> is returned directly.
/// </param>
/// <param name="timeout">Time allowed for <paramref name="block"/> to complete; 20 seconds when null.</param>
/// <param name="cancellationToken">Token passed to the probe's expect/assert calls.</param>
/// <returns>The value produced by <paramref name="block"/>.</returns>
/// <exception cref="Exception">
/// Thrown from the assertion when the supervisor still reports children; the message lists them.
/// </exception>
public static async Task<T> AssertAllStagesStoppedAsync<T>(
    this AkkaSpec spec,
    Func<Task<T>> block,
    IMaterializer materializer,
    TimeSpan? timeout = null,
    CancellationToken cancellationToken = default)
{
    timeout ??= 20.Seconds();
    var result = await block().ShouldCompleteWithin(timeout.Value);

    // Only ActorMaterializerImpl exposes the supervisor that is queried below.
    if (!(materializer is ActorMaterializerImpl impl))
        return result;

    var probe = spec.CreateTestProbe(impl.System);
    probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance);
    await probe.ExpectMsgAsync<StreamSupervisor.StoppedChildren>(cancellationToken: cancellationToken);

    await probe.WithinAsync(TimeSpan.FromSeconds(5), async () =>
    {
        // Holds the most recent reply so the catch block can address the remaining children.
        IImmutableSet<IActorRef> children = ImmutableHashSet<IActorRef>.Empty;
        try
        {
            await probe.AwaitAssertAsync(async () =>
            {
                impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref);
                children = (await probe.ExpectMsgAsync<StreamSupervisor.Children>(cancellationToken: cancellationToken)).Refs;
                if (children.Count != 0)
                {
                    // string.Join avoids the dangling ", " the manual concatenation left behind.
                    var remaining = string.Join(", ", children);
                    throw new Exception($"expected no StreamSupervisor children, but got {remaining}");
                }
            }, cancellationToken: cancellationToken);
        }
        catch
        {
            // Ask every child from the last reply to print a debug dump, then rethrow the failure.
            children.ForEach(c => c.Tell(StreamSupervisor.PrintDebugDump.Instance));
            throw;
        }
    }, cancellationToken: cancellationToken);

    return result;
}
/// <summary>
/// Checks that the <c>Props</c> of the cell behind <paramref name="ref"/> name
/// <paramref name="dispatcher"/> as their dispatcher.
/// </summary>
/// <param name="ref">The actor reference to inspect; must be an <see cref="ActorRefWithCell"/>.</param>
/// <param name="dispatcher">The expected dispatcher identifier.</param>
/// <exception cref="Exception">
/// Thrown when <paramref name="ref"/> is not an <see cref="ActorRefWithCell"/> or when its
/// dispatcher differs from <paramref name="dispatcher"/>.
/// </exception>
public static void AssertDispatcher(IActorRef @ref, string dispatcher)
{
    var cellRef = @ref as ActorRefWithCell;
    if (cellRef == null)
    {
        throw new Exception($"Unable to determine dispatcher of {@ref}");
    }

    var actual = cellRef.Underlying.Props.Dispatcher;
    if (actual == dispatcher)
    {
        return;
    }

    throw new Exception($"Expected {@ref} to use dispatcher [{dispatcher}], yet used : [{actual}]");
}
/// <summary>
/// Blocks on <paramref name="task"/> for at most <paramref name="timeout"/>, asserts that the
/// wait reported completion, and returns the task's result.
/// </summary>
/// <typeparam name="T">Type of the task's result.</typeparam>
/// <param name="task">The task to wait for.</param>
/// <param name="timeout">Maximum time to wait; 3 seconds when null.</param>
/// <returns>The result of <paramref name="task"/>.</returns>
[Obsolete("Use ShouldCompleteWithin instead")]
public static T AwaitResult<T>(this Task<T> task, TimeSpan? timeout = null)
{
    var limit = timeout ?? TimeSpan.FromSeconds(3);
    var finishedInTime = task.Wait(limit);
    finishedInTime.ShouldBeTrue();
    return task.Result;
}
}
}