Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReceivePersistentActor.CommandAsync missing active ActorContext after await #3894

Open
balcko opened this issue Aug 28, 2019 · 0 comments
Open

Comments

@balcko
Copy link

balcko commented Aug 28, 2019

Hi, we are using latest Akka 1.3.14 version with target framework both net462 and netcoreapp2.2
There is a problem in asynchronous command handlers when using certain operations (Persist, PersistAsync, Become, BecomeStacked, SetReceiveTimeout) in the continuation after await. Exception thrown states

System.NotSupportedException: There is no active ActorContext, this is most likely due to use of async operations from within this actor.

The problem is that static property ActorBase.Context (called by all affected operations) uses internally ThreadStatic field not resistant to thread switches on continuation.

I managed to simulate the issue reliably with using await task.ConfigureAwait(false). I think that theoretically issue may arise also without using ConfigureAwait(false), since pure await does not guarantee that continuation will run on the same thread, thus preserving ThreadStatic value.

So far we have solved the problem by creating custom base actor class inheriting from ReceivePersistentActor, which captures internally its Context and overwrites implementation of affected operations by wrapping them in ActorCell.UseThreadContext(action).

I have attached a code snippet from our integration tests project showing the issue.

[Fact]
public async Task SaveOk()
{
	// ARRANGE
	var actorRef = ActorSystem.ActorOf(Props.Create(() => new AsyncHandlingActor("SaveActor")), "SaveActor");

	// ACT
	var result = await actorRef.Ask<bool>(new Save("data"), TimeSpan.FromSeconds(5));

	// ASSERT
	Assert.True(result);
	// failed on timeout since Persist crashed
}

// actor
public sealed class AsyncHandlingActor : ReceivePersistentActor
{
	public override string PersistenceId { get; }
	private HashSet<string> State { get; }

	public AsyncHandlingActor(string persistenceId)
	{
		PersistenceId = persistenceId;
		State = new HashSet<string>();

		Recover<Saved>(saved => { State.Add(saved.Item); });
		Recover<SnapshotOffer>(snapshot =>
		{
			// snapshotting not used
		});

		CommandAsync<Save>(async save =>
		{
			var sender = Sender;
			await Task.Delay(10).ConfigureAwait(false);
			Persist(new Saved(save.Item), saved =>
			{
				State.Add(saved.Item);
				sender.Tell(true);
			});
		});
	}
}

// events
[MessagePackObject]
[Manifest("CommandAsyncIssues_Saved")]
public sealed class Saved
{
	[SerializationConstructor]
	public Saved(string item)
	{
		Item = item;
	}

	[Key(0)]
	public string Item { get; }
}

// commands
public sealed class Save
{
	public Save(string item)
	{
		Item = item;
	}

	public string Item { get; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant