Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Edge Case] Actor Context might not be available inside ReceiveAsync #7122

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace Akka.Tests.Dispatch
{
Expand Down Expand Up @@ -276,8 +283,130 @@ public RestartMessage(object message)
}
}

internal sealed class CommandMessage
{
public static readonly CommandMessage Instance = new();
private CommandMessage() { }
}
internal sealed record ReplyMessage(IActorRef Sender);
internal sealed record ResultMessage(IActorRef Self, IActorRef Sender);
internal sealed record ExceptionMessage(IActorRef Self, Exception Cause);

internal sealed class SelfAccessingReceiveAsyncActor: ReceiveActor, IWithTimers
{
private readonly ThirdPartyCode _worker = new();
public ITimerScheduler Timers { get; set; }

public SelfAccessingReceiveAsyncActor(IActorRef probe)
{
ReceiveAsync<CommandMessage>(async _ =>
{
var self = Self;
try
{
await _worker.Execute(() => { Self.Tell(new ReplyMessage(Self)); });
Self.Tell(new ReplyMessage(Self));
}
catch (Exception e)
{
probe.Tell(new ExceptionMessage(self, e));
throw;
}
});

Receive<ReplyMessage>(msg =>
{
probe.Tell(new ResultMessage(Sender, msg.Sender));
});
}

// Emulating a third party async code that breaks async context flow
private sealed class ThirdPartyCode
{
public async Task Execute(Action action)
{
await Task.Delay(TimeSpan.FromMilliseconds(50))
// ConfigureAwait(false) breaks async context flow, making Self inaccessible inside Action
.ConfigureAwait(false);
Comment on lines +329 to +330
Copy link
Contributor Author

@Arkatufus Arkatufus Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this unit test will run just fine if we remove .ConfigureAwait(false), but of course users would likely be stuck with code that uses .ConfigureAwait(false) if they were using a third party package.

action();
}
}
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
private readonly ITestOutputHelper _output;

public ActorAsyncAwaitSpec(ITestOutputHelper output)
{
_output = output;
}

[Fact(DisplayName = "ReceiveActor.Self should be preserved inside ReceiveAsync async callback")]
public async Task ReceiveActorSelfInsideAsyncCallback()
{
const int totalActors = 10;
var probe = CreateTestProbe();
var actors = new IActorRef[totalActors];

foreach (var i in Enumerable.Range(0, totalActors))
{
actors[i] = Sys.ActorOf(Props.Create(() => new SelfAccessingReceiveAsyncActor(probe)), $"worker_{i}");
}

foreach (var i in Enumerable.Range(0, totalActors))
{
actors[i].Tell(CommandMessage.Instance, ActorRefs.Nobody);
}

var messages = new List<object>();
var stepTime = 50.Milliseconds();
var timeout = 10.Seconds();
var stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed < timeout)
{
var (success, envelope) = await probe.TryReceiveOneAsync(TimeSpan.Zero);
if (success)
{
messages.Add(envelope.Message);
}

await Task.Delay(stepTime);
}

while (probe.HasMessages)
{
var (success, envelope) = await probe.TryReceiveOneAsync(TimeSpan.Zero);
if (success)
{
messages.Add(envelope.Message);
}
}
stopwatch.Stop();

var exceptions = new List<ExceptionMessage>();
_output.WriteLine($"Received {messages.Count} messages within {stopwatch.Elapsed.TotalSeconds} seconds. Expected message counts: {totalActors * 2}");
foreach (var message in messages)
{
switch (message)
{
case ResultMessage m:
m.Self.Should().Be(m.Sender);
break;
case ExceptionMessage m:
exceptions.Add(m);
break;
default:
throw new XunitException($"Unexpected message [{message.GetType()}]: {message}");
}
}

if (exceptions.Count > 0)
throw new XunitException(
$"{exceptions.Count} Exception(s) was thrown while actors were processing messages." +
$"{string.Join("", exceptions.Select(m => $"\n actor: {m.Self.Path.Name}, exception: {m.Cause.Message}"))}");
}

[Fact]
public async Task UntypedActors_should_be_able_to_async_await_ask_message_loop()
{
Expand Down
Loading