Skip to content

Commit

Permalink
WIP queue benchmarks (#6127)
Browse files Browse the repository at this point in the history
* WIP queue benchmarks

* completed MailboxThroughputBenchmarks

* disable `CallingThreadDispatcher`
  • Loading branch information
Aaronontheweb committed Oct 8, 2022
1 parent bf543ff commit 70c57df
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/benchmark/Akka.Benchmarks/Dispatch/CallingThreadExecutor.cs
@@ -0,0 +1,42 @@
//-----------------------------------------------------------------------
// <copyright file="CallingThreadExecutor.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Dispatch;

namespace Akka.Benchmarks.Dispatch
{
public class CallingThreadExecutor : ExecutorService
{
public CallingThreadExecutor(string id) : base(id)
{
}

public override void Execute(IRunnable run)
{
run.Run();
}

public override void Shutdown()
{

}
}

public class CallingThreadExecutorConfigurator : ExecutorServiceConfigurator
{
public CallingThreadExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
}

public override ExecutorService Produce(string id)
{
return new CallingThreadExecutor(id);
}
}
}

121 changes: 121 additions & 0 deletions src/benchmark/Akka.Benchmarks/Dispatch/MailboxThroughputBenchmarks.cs
@@ -0,0 +1,121 @@
//-----------------------------------------------------------------------
// <copyright file="MailboxThroughput.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Benchmarks.Configurations;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Dispatch.MessageQueues;
using Akka.Util.Internal;
using BenchmarkDotNet.Attributes;

namespace Akka.Benchmarks.Dispatch
{
[Config(typeof(MicroBenchmarkConfig))]
public class MailboxThroughputBenchmarks
{
private ActorSystem _system;
private Mailbox _enqueueMailbox;
private Mailbox _runMailbox;
private IActorRef _testActor;
private TaskCompletionSource<int> _completionSource;
private Envelope _msg;

private class CompletionActor : ReceiveActor
{
private readonly TaskCompletionSource<int> _tcs;
private readonly int _target;
private int _count = 0;

public CompletionActor(int target, TaskCompletionSource<int> tcs)
{
_target = target;
_tcs = tcs;

ReceiveAny(_ =>
{
if (++_count == _target)
{
_tcs.TrySetResult(0);
}
});
}
}

public static readonly Config Config = ConfigurationFactory.ParseString(@"
calling-thread-dispatcher{
executor=""" + typeof(CallingThreadExecutorConfigurator).AssemblyQualifiedName + @"""
#throughput = 100
}
");

[Params(10_000, 100_000, 1_000_000, 10_000_000)] // higher values will cause the CallingThreadDispatcher to stack overflow
public int MsgCount { get; set; }

//[Params(true, false)]
public bool UseCallingThreadDispatcher { get; set; } = false;

[GlobalSetup]
public void Setup()
{
_system = ActorSystem.Create("Bench", Config);
_msg = new Envelope("hit", ActorRefs.NoSender);
}

[IterationSetup]
public void IterationSetup()
{
_completionSource = new TaskCompletionSource<int>();
var props = Props.Create(() => new CompletionActor(MsgCount, _completionSource));
var finalProps = UseCallingThreadDispatcher ? props.WithDispatcher("calling-thread-dispatcher") : props;
_testActor = _system.ActorOf(finalProps);

var repointableActorRef = _testActor.AsInstanceOf<RepointableActorRef>();
if (UseCallingThreadDispatcher) {
// have to perform the work of the supervisor ourselves

repointableActorRef.Point();
}


// have to force actor to start before we acquire cell
var id = _testActor.Ask<ActorIdentity>(new Identify(null), TimeSpan.FromSeconds(3)).Result;

_enqueueMailbox = new Mailbox(new UnboundedMessageQueue());
_enqueueMailbox.SetActor(repointableActorRef.Underlying.AsInstanceOf<ActorCell>());

_runMailbox = new Mailbox(new UnboundedMessageQueue());
_runMailbox.SetActor(repointableActorRef.Underlying.AsInstanceOf<ActorCell>());

for(var i = 0; i < MsgCount; i++)
_runMailbox.MessageQueue.Enqueue(_testActor, _msg);
}

[IterationCleanup]
public void Cleanup()
{
_testActor.GracefulStop(TimeSpan.FromSeconds(3)).Wait();
}

[Benchmark]
public void EnqueuePerformance()
{
for(var i = 0; i < MsgCount; i++)
_enqueueMailbox.MessageQueue.Enqueue(_testActor, _msg);
}

[Benchmark]
public async Task RunPerformance()
{
_runMailbox.Run();
await _completionSource.Task;
}
}
}

0 comments on commit 70c57df

Please sign in to comment.