/
InternalTestActor.cs
93 lines (87 loc) · 3.4 KB
/
InternalTestActor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
//-----------------------------------------------------------------------
// <copyright file="InternalTestActor.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using Akka.Actor;
using Akka.Event;
namespace Akka.TestKit.Internal
{
/// <summary>
/// An actor that enqueues received messages to a <see cref="BlockingCollection{T}"/>.
/// <remarks>Note! Part of internal API. Breaking changes may occur without notice. Use at own risk.</remarks>
/// </summary>
public class InternalTestActor : ActorBase
{
private readonly ITestActorQueue<MessageEnvelope> _queue;
private TestKit.TestActor.Ignore _ignore;
private AutoPilot _autoPilot;
private DelegatingSupervisorStrategy _supervisorStrategy = new();
/// <summary>
/// TBD
/// </summary>
/// <param name="queue">TBD</param>
public InternalTestActor(ITestActorQueue<MessageEnvelope> queue)
{
_queue = queue;
}
/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
protected override bool Receive(object message)
{
try
{
global::System.Diagnostics.Debug.WriteLine("TestActor received " + message);
}
catch (FormatException)
{
if (message is LogEvent { Message: LogMessage msg })
global::System.Diagnostics.Debug.WriteLine(
$"TestActor received a malformed formatted message. Template:[{msg.Format}], args:[{string.Join(",", msg.Unformatted())}]");
else
throw;
}
switch (message)
{
case TestActor.SetIgnore setIgnore:
_ignore = setIgnore.Ignore;
return true;
case TestActor.Watch watch:
Context.Watch(watch.Actor);
return true;
case TestActor.Unwatch unwatch:
Context.Unwatch(unwatch.Actor);
return true;
case TestActor.SetAutoPilot setAutoPilot:
_autoPilot = setAutoPilot.AutoPilot;
return true;
case TestActor.Spawn spawn:
{
var actor = spawn.Apply(Context);
if (spawn._supervisorStrategy.HasValue)
{
_supervisorStrategy.Update(actor, spawn._supervisorStrategy.Value);
}
_queue.Enqueue(new RealMessageEnvelope(actor, Self));
return true;
}
}
var actorRef = Sender;
if(_autoPilot != null)
{
var newAutoPilot = _autoPilot.Run(actorRef, message);
if(!(newAutoPilot is KeepRunning))
_autoPilot = newAutoPilot;
}
if(_ignore == null || !_ignore(message))
_queue.Enqueue(new RealMessageEnvelope(message, actorRef));
return true;
}
}
}