Skip to content

Commit

Permalink
Add ReceiveAsync feature to Akka.TestKit TestActorRef (#6281)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Dec 1, 2022
1 parent de55b33 commit 5605d83
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 10 deletions.
61 changes: 61 additions & 0 deletions src/core/Akka.TestKit.Tests/TestActorRefTests/ExceptionHandling.cs
@@ -0,0 +1,61 @@
// -----------------------------------------------------------------------
// <copyright file="ExceptionHandling.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;

namespace Akka.TestKit.Tests.TestActorRefTests
{
public class ExceptionHandling: TestKit.Xunit2.TestKit
{
private class GiveError
{ }

private class GiveErrorAsync
{ }

private class ExceptionActor : ReceiveActor
{
public ExceptionActor()
{
Receive<GiveError>((b) => throw new Exception("WAT"));

ReceiveAsync<GiveErrorAsync>(async (b) =>
{
await Task.Delay(TimeSpan.FromSeconds(0.1));
throw new Exception("WATASYNC");
});
}
}

public ExceptionHandling(ITestOutputHelper helper) : base("akka.loglevel = debug", helper)
{
}

[Fact]
public void GetException()
{
var props = Props.Create<ExceptionActor>();
var subject = new TestActorRef<ExceptionActor>(Sys, props, null, "testA");
Invoking(() => subject.Receive(new GiveError()))
.Should().Throw<Exception>().WithMessage("WAT");
}

[Fact]
public async Task GetExceptionAsync()
{
var props = Props.Create<ExceptionActor>();
var subject = new TestActorRef<ExceptionActor>(Sys, props, null, "testB");
await Awaiting(() => subject.ReceiveAsync(new GiveErrorAsync()))
.Should().ThrowAsync<Exception>().WithMessage("WATASYNC");
}
}
}
74 changes: 68 additions & 6 deletions src/core/Akka.TestKit/Internal/InternalTestActorRef.cs
Expand Up @@ -6,11 +6,12 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
using Akka.Pattern;
using Akka.Util;
using Akka.Util.Internal;
Expand Down Expand Up @@ -89,6 +90,14 @@ public void Receive(object message, IActorRef sender = null)
cell.UseThreadContext(() => cell.ReceiveMessageForTest(envelope));
}

public Task ReceiveAsync(object message, IActorRef sender = null)
{
var cell = (TestActorCell)Cell;
sender = sender.IsNobody() ? cell.System.DeadLetters : sender;
var envelope = new Envelope(message, sender);
return cell.UseThreadContextAsync(() => cell.ReceiveMessageForTestAsync(envelope));
}

/// <summary>
/// TBD
/// </summary>
Expand Down Expand Up @@ -245,25 +254,73 @@ public override ActorTaskScheduler TaskScheduler
if (taskScheduler != null)
return taskScheduler;

taskScheduler = new TestActorTaskScheduler(this);
taskScheduler = new TestActorTaskScheduler(this, TaskFailureHook);
return Interlocked.CompareExchange(ref _taskScheduler, taskScheduler, null) ?? taskScheduler;
}
}


private readonly Dictionary<object, TaskCompletionSource<Done>> _testActorTasks =
new Dictionary<object, TaskCompletionSource<Done>>();

/// <summary>
/// This is only intended to be called from TestKit's TestActorRef
/// </summary>
/// <param name="envelope">TBD</param>
public Task ReceiveMessageForTestAsync(Envelope envelope)
{
var tcs = new TaskCompletionSource<Done>();
_testActorTasks[envelope.Message] = tcs;
ReceiveMessageForTest(envelope);
return tcs.Task;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="actionAsync">TBD</param>
public Task UseThreadContextAsync(Func<Task> actionAsync)
{
var tmp = InternalCurrentActorCellKeeper.Current;
InternalCurrentActorCellKeeper.Current = this;
try
{
return actionAsync();
}
finally
{
//ensure we set back the old context
InternalCurrentActorCellKeeper.Current = tmp;
}
}

private void TaskFailureHook(object message, Exception exception)
{
if (!_testActorTasks.TryGetValue(message, out var tcs))
return;
if (exception is { })
tcs.TrySetException(exception);
else
tcs.TrySetResult(Done.Instance);
_testActorTasks.Remove(message);
}

/// <summary>
/// TBD
/// </summary>
public new object Actor { get { return base.Actor; } }
}

internal class TestActorTaskScheduler : ActorTaskScheduler
internal class TestActorTaskScheduler : ActorTaskScheduler, IAsyncResultInterceptor
{
private readonly ActorCell _testActorCell;
private readonly TestActorCell _testActorCell;
private readonly Action<object, Exception> _taskCallback;

/// <inheritdoc />
internal TestActorTaskScheduler(ActorCell testActorCell) : base(testActorCell)
internal TestActorTaskScheduler(ActorCell testActorCell, Action<object, Exception> taskCallback) : base(testActorCell)
{
_testActorCell = testActorCell;
_taskCallback = taskCallback;
_testActorCell = (TestActorCell) testActorCell;
}

/// <inheritdoc />
Expand All @@ -277,6 +334,11 @@ protected override void OnAfterTaskCompleted()
{
ActorCellKeepingSynchronizationContext.AsyncCache = null;
}

public void OnTaskCompleted(object message, Exception exception)
{
_taskCallback(message, exception);
}
}

/// <summary>
Expand Down
17 changes: 17 additions & 0 deletions src/core/Akka.TestKit/TestActorRefBase.cs
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
Expand Down Expand Up @@ -51,6 +52,22 @@ public void Receive(object message, IActorRef sender = null)
_internalRef.Receive(message, sender);
}

/// <summary>
/// Directly inject messages into actor ReceiveAsync behavior. Any exceptions
/// thrown will be available to you, while still being able to use
/// become/unbecome.
/// Note: This method violates the actor model and could cause unpredictable
/// behavior. For example, a Receive call to an actor could run simultaneously
/// (2 simultaneous threads running inside the actor) with the actor's handling
/// of a previous Tell call.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="sender">The sender.</param>
public Task ReceiveAsync(object message, IActorRef sender = null)
{
return _internalRef.ReceiveAsync(message, sender);
}

/// <summary>
/// TBD
/// </summary>
Expand Down
15 changes: 11 additions & 4 deletions src/core/Akka/Dispatch/ActorTaskScheduler.cs
Expand Up @@ -149,7 +149,7 @@ public static void RunTask(Func<Task> asyncAction)
//suspend the mailbox
dispatcher.Suspend(context);

ActorTaskScheduler actorScheduler = context.TaskScheduler;
var actorScheduler = context.TaskScheduler;
actorScheduler.CurrentMessage = context.CurrentMessage;

actorScheduler.OnBeforeTaskStarted();
Expand All @@ -158,18 +158,21 @@ public static void RunTask(Func<Task> asyncAction)
.Unwrap()
.ContinueWith(parent =>
{
Exception exception = GetTaskException(parent);
var exception = GetTaskException(parent);
if (exception == null)
{
dispatcher.Resume(context);
context.CheckReceiveTimeout();
}
else
{
context.Self.AsInstanceOf<IInternalActorRef>().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage));
}
// Used by TestActorRef to intercept async execution result
if(actorScheduler is IAsyncResultInterceptor interceptor)
interceptor.OnTaskCompleted(actorScheduler.CurrentMessage, exception);
//clear the current message field of the scheduler
actorScheduler.CurrentMessage = null;
actorScheduler.OnAfterTaskCompleted();
Expand Down Expand Up @@ -203,3 +206,7 @@ private static Exception TryUnwrapAggregateException(AggregateException aggregat
}
}

internal interface IAsyncResultInterceptor
{
void OnTaskCompleted(object message, Exception exception);
}

0 comments on commit 5605d83

Please sign in to comment.