Skip to content

Commit

Permalink
Introduce ChannelExecutor (#4882)
Browse files Browse the repository at this point in the history
* added `ChannelExecutor` dispatcher - uses `FixedConcurrencyTaskScheduler` internally - have `ActorSystem.Create` call `ThreadPool.SetMinThreads(0,0)` to improve performance across the board.

* fixed documentation errors
  • Loading branch information
Aaronontheweb committed Apr 28, 2021
1 parent e58d36a commit bdfc893
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 20 deletions.
58 changes: 58 additions & 0 deletions docs/articles/actors/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Some dispatcher configurations are available out-of-the-box for convenience. You
* **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher).
* **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher).
* **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher).
* **channel-executor** - new as of v1.4.19, the [`ChannelExecutor`](#channelexecutor) is used to run on top of the .NET `ThreadPool` and allow Akka.NET to dynamically scale thread usage up and down with demand in exchange for better CPU and throughput performance.

## Built-in Dispatchers

Expand Down Expand Up @@ -165,6 +166,63 @@ private void Form1_Load(object sender, System.EventArgs e)
}
```

### `ChannelExecutor`
In Akka.NET v1.4.19 we will be introducing an opt-in feature, the `ChannelExecutor` - a new dispatcher type that re-uses the same configuration as a `ForkJoinDispatcher` but runs entirely on top of the .NET `ThreadPool` and is able to take advantage of dynamic thread pool scaling to size / resize workloads on the fly.

During its initial development and benchmarks, we observed the following:

1. The `ChannelExecutor` tremendously reduced idle CPU and max busy CPU even during peak message throughput, primarily as a result of dynamically shrinking the total `ThreadPool` to only the necessary size. This resolves one of the largest complaints large users of Akka.NET have today. However, **in order for this setting to be effective `ThreadPool.SetMin(0,0)` must also be set**. We are considering doing this inside the `ActorSystem.Create` method, those settings don't work for you you can easily override them by simply calling `ThreadPool.SetMin(yourValue, yourValue)` again after `ActorSystem.Create` has exited.
2. The `ChannelExecutor` actually beat the `ForkJoinDispatcher` and others on performance even in environments like Docker and bare metal on Windows.

> [!NOTE]
> We are in the process of gathering data from users on how well `ChannelExecutor` performs in the real world. If you are interested in trying out the `ChannelExecutor`, please read the directions in this document and then comment on [the "Akka.NET v1.4.19: ChannelExecutor performance data" discussion thread](https://github.com/akkadotnet/akka.net/discussions/4983).
The `ChannelExectuor` re-uses the same threading settings as the `ForkJoinExecutor` to determine its effective upper and lower parallelism limits, and you can configure the `ChannelExecutor` to run inside your `ActorSystem` via the following HOCON configuration:

```
akka.actor.default-dispatcher = {
executor = channel-executor
fork-join-executor { #channelexecutor will re-use these settings
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 64
}
}
akka.actor.internal-dispatcher = {
executor = channel-executor
throughput = 5
fork-join-executor {
parallelism-min = 4
parallelism-factor = 1.0
parallelism-max = 64
}
}
akka.remote.default-remote-dispatcher {
type = Dispatcher
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
}
akka.remote.backoff-remote-dispatcher {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
}
```

This will enable the `ChannelExecutor` to run everywhere and all Akka.NET loads, with the exception of anything you manually allocate onto a `ForkJoinDispatcher` or `PinnedDispatcher`, will be managed by the `ThreadPool`.

> [!IMPORTANT]
> As of Akka.NET v1.4.19, we call `ThreadPool.SetMinThreads(0,0)` inside the `ActorSystem.Create` method as we've found that the default `ThreadPool` minimum values have a negative impact on performance. However, if this causes undesireable side effects for you inside your application you can always override those settings by calling `ThreadPool.SetMinThreads(yourValue, yourValue)` again after you've created your `ActorSystem`.
#### Common Dispatcher Configuration

The following configuration keys are available for any dispatcher configuration:
Expand Down
1 change: 1 addition & 0 deletions src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Configuration;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Engines;

Expand Down
3 changes: 2 additions & 1 deletion src/benchmark/RemotePingPong/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static uint CpuSpeed()
public static Config CreateActorSystemConfig(string actorSystemName, string ipOrHostname, int port)
{
var baseConfig = ConfigurationFactory.ParseString(@"
akka {
akka {
actor.provider = remote
loglevel = ERROR
suppress-json-serializer-warning = on
Expand All @@ -57,6 +57,7 @@ public static Config CreateActorSystemConfig(string actorSystemName, string ipOr
port = 0
hostname = ""localhost""
}
}
}");

Expand Down
26 changes: 23 additions & 3 deletions src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public StressSpecConfig()
convergence-within-factor = 1.0
}
akka.actor.provider = cluster
akka.cluster {
failure-detector.acceptable-heartbeat-pause = 3s
downing-provider-class = ""Akka.Cluster.SplitBrainResolver, Akka.Cluster""
Expand All @@ -86,10 +87,29 @@ public StressSpecConfig()
akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""]
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off
akka.actor.default-dispatcher.fork-join-executor {
parallelism - min = 8
parallelism - max = 8
akka.actor.default-dispatcher = {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 64
}
}
akka.actor.internal-dispatcher = {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 64
}
}
akka.remote.default-remote-dispatcher {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
");

TestTransport = true;
Expand Down
21 changes: 9 additions & 12 deletions src/core/Akka.Remote/Configuration/Remote.conf
Original file line number Diff line number Diff line change
Expand Up @@ -578,23 +578,20 @@ akka {

### Default dispatcher for the remoting subsystem

### Default dispatcher for the remoting subsystem

default-remote-dispatcher {
type = ForkJoinDispatcher
executor = fork-join-executor
dedicated-thread-pool {
# Fixed number of threads to have in this threadpool
thread-count = 4
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
}

backoff-remote-dispatcher {
type = ForkJoinDispatcher
executor = fork-join-executor
dedicated-thread-pool {
# Fixed number of threads to have in this threadpool
thread-count = 4
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,17 @@ protected override MessageDispatcherConfigurator Configurator()
return new DispatcherConfigurator(DispatcherConfiguration, Prereqs);
}
}

public class ChannelDispatcherExecutorThroughputSpec : WarmDispatcherThroughputSpecBase
{
public static Config DispatcherConfiguration => ConfigurationFactory.ParseString(@"
id = PerfTest
executor = channel-executor
");

protected override MessageDispatcherConfigurator Configurator()
{
return new DispatcherConfigurator(DispatcherConfiguration, Prereqs);
}
}
}
4 changes: 4 additions & 0 deletions src/core/Akka/Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ public static ActorSystem Create(string name)

private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup)
{
// allows the ThreadPool to scale up / down dynamically
// by removing minimum thread count, which in our benchmarks
// appears to negatively impact performance
ThreadPool.SetMinThreads(0, 0);
var system = new ActorSystemImpl(name, withFallback, setup, Option<Props>.None);
system.Start();
return system;
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Akka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonVersion)" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == '$(NetStandardLibVersion)'">
Expand Down
22 changes: 22 additions & 0 deletions src/core/Akka/Dispatch/AbstractDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ protected ExecutorServiceConfigurator(Config config, IDispatcherPrerequisites pr
public IDispatcherPrerequisites Prerequisites { get; private set; }
}

internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator
{
public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
var fje = config.GetConfig("fork-join-executor");
MaxParallelism = ThreadPoolConfig.ScaledPoolSize(
fje.GetInt("parallelism-min"),
fje.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to
fje.GetInt("parallelism-max"));
}

public int MaxParallelism {get;}

public override ExecutorService Produce(string id)
{
Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[id]", typeof(FixedConcurrencyTaskScheduler), $"Launched Dispatcher [{id}] with MaxParallelism=[{MaxParallelism}]"));
return new TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism));
}
}

/// <summary>
/// INTERNAL API
///
Expand Down Expand Up @@ -306,6 +326,8 @@ protected ExecutorServiceConfigurator ConfigureExecutor()
return new CurrentSynchronizationContextExecutorServiceFactory(Config, Prerequisites);
case "task-executor":
return new DefaultTaskSchedulerExecutorConfigurator(Config, Prerequisites);
case "channel-executor":
return new ChannelExecutorConfigurator(Config, Prerequisites);
default:
Type executorConfiguratorType = Type.GetType(executor);
if (executorConfiguratorType == null)
Expand Down
87 changes: 84 additions & 3 deletions src/core/Akka/Dispatch/Dispatchers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
Expand Down Expand Up @@ -91,6 +92,86 @@ public PartialTrustThreadPoolExecutorService(string id) : base(id)
}
}

/// <summary>
/// INTERNAL API
///
/// Used to power <see cref="ChannelExecutorConfigurator"/>
/// </summary>
internal sealed class FixedConcurrencyTaskScheduler : TaskScheduler
{

[ThreadStatic]
private static bool _threadRunning = false;
private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();

private int _readers = 0;

public FixedConcurrencyTaskScheduler(int degreeOfParallelism)
{
MaximumConcurrencyLevel = degreeOfParallelism;
}


public override int MaximumConcurrencyLevel { get; }

/// <summary>
/// ONLY USED IN DEBUGGER - NO PERF IMPACT.
/// </summary>
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}

protected override bool TryDequeue(Task task)
{
return false;
}

protected override void QueueTask(Task task)
{
_tasks.Enqueue(task);
if (_readers < MaximumConcurrencyLevel)
{
var initial = _readers;
var newVale = _readers + 1;
if (initial == Interlocked.CompareExchange(ref _readers, newVale, initial))
{
// try to start a new worker
ThreadPool.UnsafeQueueUserWorkItem(_ => ReadChannel(), null);
}
}
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_threadRunning) return false;
return TryExecuteTask(task);
}

public void ReadChannel()
{
_threadRunning = true;
try
{
while (_tasks.TryDequeue(out var runnable))
{
base.TryExecuteTask(runnable);
}
}
catch
{
// suppress exceptions
}
finally
{
Interlocked.Decrement(ref _readers);

_threadRunning = false;
}
}
}


/// <summary>
/// INTERNAL API
Expand Down Expand Up @@ -273,7 +354,7 @@ public MessageDispatcher DefaultGlobalDispatcher
internal MessageDispatcher InternalDispatcher { get; }

/// <summary>
/// The <see cref="Hocon.Config"/> for the default dispatcher.
/// The <see cref="Configuration.Config"/> for the default dispatcher.
/// </summary>
public Config DefaultDispatcherConfig
{
Expand Down Expand Up @@ -336,7 +417,7 @@ public bool HasDispatcher(string id)
private MessageDispatcherConfigurator LookupConfigurator(string id)
{
var depth = 0;
while(depth < MaxDispatcherAliasDepth)
while (depth < MaxDispatcherAliasDepth)
{
if (_dispatcherConfigurators.TryGetValue(id, out var configurator))
return configurator;
Expand Down Expand Up @@ -374,7 +455,7 @@ private MessageDispatcherConfigurator LookupConfigurator(string id)
/// <summary>
/// INTERNAL API
///
/// Creates a dispatcher from a <see cref="Hocon.Config"/>. Internal test purpose only.
/// Creates a dispatcher from a <see cref="Configuration.Config"/>. Internal test purpose only.
/// <code>
/// From(Config.GetConfig(id));
/// </code>
Expand Down

0 comments on commit bdfc893

Please sign in to comment.