Skip to content

Commit

Permalink
Fix TRX NRE bug (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Apr 19, 2024
1 parent 78057df commit 682fb03
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,14 @@ protected override async Task<RunSummary> RunTestAsync()
SinkCoordinator = TestRunSystem.ActorOf(Props.Create(()
=> new SinkCoordinator(sinks)), "sinkCoordinator");

await SinkCoordinator.Ask<SinkCoordinator.Ready>(Sinks.SinkCoordinator.Ready.Instance);

var tcpLogger = TestRunSystem.ActorOf(Props.Create(() => new TcpLoggingServer(SinkCoordinator)), "TcpLogger");
var listenEndpoint = new IPEndPoint(IPAddress.Parse(Options.ListenAddress), Options.ListenPort);
TestRunSystem.Tcp().Tell(new Tcp.Bind(tcpLogger, listenEndpoint), sender: tcpLogger);

PublishRunnerMessage($"Starting test {TestCase.DisplayName}");
StartNewSpec();
PublishRunnerMessage($"Starting test {TestCase.DisplayName}");

var timelineCollector = TestRunSystem.ActorOf(Props.Create(() => new TimelineLogCollectorActor(Options.AppendLogOutput)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal interface IMessageSink
///
/// Typically called at the beginning of a test run.
/// </summary>
void Open(ActorSystem context);
Task Open(ActorSystem context);

/// <summary>
/// Flag that determines if <see cref="Open"/> has been successfully called or not.
Expand Down
5 changes: 3 additions & 2 deletions src/Akka.MultiNode.TestAdapter/Internal/Sinks/MessageSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected MessageSink(Props messageSinkActorProps)

#region Flow Control

public void Open(ActorSystem context)
public async Task Open(ActorSystem context)
{
//Do nothing
if(IsClosed || IsOpen) return;
Expand All @@ -42,6 +42,7 @@ public void Open(ActorSystem context)

//Start the TestCoordinatorActor
MessageSinkActorRef = context.ActorOf(MessageSinkActorProps);
await MessageSinkActorRef.Ask<SinkCoordinator.Ready>(SinkCoordinator.Ready.Instance);
}

public bool IsOpen { get; private set; }
Expand All @@ -55,7 +56,7 @@ internal void RequestExitCode(IActorRef sender)
public async Task<bool> Close(ActorSystem context)
{
//Test run has already been closed or hasn't started
if (!IsOpen || IsClosed) return await Task.FromResult(false);
if (!IsOpen || IsClosed) return false;

IsOpen = false;
IsClosed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected MessageSinkActor()
/// </summary>
private void SetReceive()
{
Receive<SinkCoordinator.Ready>(HandleReady);
Receive<BeginNewSpec>(HandleNewSpec);
Receive<EndSpec>(HandleEndSpec);
Receive<LogMessageFragmentForNode>(HandleNodeMessageFragment);
Expand All @@ -73,6 +74,8 @@ private void SetReceive()
/// </summary>
protected abstract void AdditionalReceives();

private void HandleReady(SinkCoordinator.Ready ready) => Sender.Tell(ready);

protected abstract void HandleNewSpec(BeginNewSpec newSpec);

protected abstract void HandleEndSpec(EndSpec endSpec);
Expand Down
41 changes: 29 additions & 12 deletions src/Akka.MultiNode.TestAdapter/Internal/Sinks/SinkCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ internal class SinkCoordinator : ReceiveActor
{
#region Message classes

public sealed class Ready
{
public static readonly Ready Instance = new();
private Ready() { }
}

/// <summary>
/// Used to signal that we need to enable a given <see cref="MessageSink"/> instance
/// </summary>
internal class EnableSink
internal class EnableSinks
{
public EnableSink(MessageSink sink)
{
Sink = sink;
}

public MessageSink Sink { get; private set; }
public static readonly EnableSinks Instance = new();
private EnableSinks() { }
}

/// <summary>
Expand Down Expand Up @@ -80,6 +82,8 @@ public RecommendedExitCode(int code)

#endregion

private bool _ready;

protected readonly List<MessageSink> DefaultSinks;
protected readonly List<MessageSink> Sinks = new List<MessageSink>();

Expand Down Expand Up @@ -107,8 +111,7 @@ public SinkCoordinator(IEnumerable<MessageSink> defaultSinks)

protected override void PreStart()
{
foreach(var sink in DefaultSinks)
Self.Tell(new EnableSink(sink));
Self.Tell(EnableSinks.Instance);
}

#endregion
Expand All @@ -117,10 +120,24 @@ protected override void PreStart()

private void InitializeReceives()
{
Receive<EnableSink>(sink =>
ReceiveAsync<EnableSinks>(async _ =>
{
foreach (var sink in DefaultSinks)
{
Sinks.Add(sink);
await sink.Open(Context.System);
}
_ready = true;
});

Receive<Ready>(r =>
{
Sinks.Add(sink.Sink);
sink.Sink.Open(Context.System);
if (!_ready)
// If we're not yet ready, requeue the message
Self.Tell(r, Sender);
else
Sender.Tell(r);
});

Receive<SinkClosed>(closed =>
Expand Down

0 comments on commit 682fb03

Please sign in to comment.