Skip to content

Commit

Permalink
Akka.Persistence.Query Throttling implementation - "QueryPermitter" (#…
Browse files Browse the repository at this point in the history
…6436)

* refactor all Akka.Persistence.Query implementations take an `IActorRef` param

refactor all Akka.Persistence.Query implementations take an `IActorRef` param so we can swap the journal reference for a "throttler" actor in the middle

* implemented journal

* bump buffer size to 1m queries

* fixed query design to factor in throttling

* Revert "fixed query design to factor in throttling"

This reverts commit e9d7639.

* implementing query permitter design

* implemented persistent id queries

* completed query permitter implementation

* make max queries configurable

* fixed backpressure impl such that tokens are only returned after queries are completed

* bump up event count

* Revert "bump up event count"

This reverts commit 6ae4f41.

* fixed reply to QueryPermitter
  • Loading branch information
Aaronontheweb committed Feb 24, 2023
1 parent f0ff08a commit 0bc2464
Show file tree
Hide file tree
Showing 9 changed files with 512 additions and 222 deletions.
42 changes: 11 additions & 31 deletions src/common.props
Expand Up @@ -40,39 +40,19 @@
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>
<PropertyGroup>
<PackageReleaseNotes>Version 1.5.0-beta1 contains **breaking API changes** and new API changes for Akka.NET.
Breaking Changes: Logging**
In [https://github.com/akkadotnet/akka.net/pull/6408](https://github.com/akkadotnet/akka.net/pull/6408) the entire `ILoggingAdapter` interface was rewritten in order to improve extensibility and performance (logging is now 30-40% faster in all cases and allocates ~50% fewer objects for large format strings).
All of the changes made here are _source compatible_, but not _binary compatible_ - meaning that users and package authors will need to do the following:
Add `using Akka.Event` in all files that used the `ILoggingAdapter` and
Recompile.
&gt; NOTE: you can use a [`global using Akka.Event` directive](https://devblogs.microsoft.com/dotnet/welcome-to-csharp-10/#global-using-directives) to do this solution / project-wide if your project supports C# 10 and / or .NET 6.
In addition to improving the performance of the `ILoggingAdapter` system, we've also made it more extensible - for instance, you can now globally configure the `ILogMessageFormatter` via the following HOCON:
```
akka {
loglevel=INFO,
loggers=["Akka.Logger.Serilog.SerilogLogger, Akka.Logger.Serilog"]
logger-formatter="Akka.Logger.Serilog.SerilogLogMessageFormatter, Akka.Logger.Serilog"
}
```
That will allow users to use the `SerilogLogMessageFormatter` globally throughout their applications - no more annoying calls like this inside individual actors that want to use semantic logging:
```csharp
private readonly ILoggingAdapter _logger = Context.GetLogger&lt;SerilogLoggingAdapter&gt;();
```
Breaking Changes: Akka.Persistence.Sql.Common**
This is a breaking change that should effect almost no users, but [we deleted some old, bad ideas from the API surface](https://github.com/akkadotnet/akka.net/pull/6412) and it might require all Akka.Persistence.Sql* plugins to be recompiled.
For what it's worth, [Akka.Persistence.Sql.Common's performance has been improved significantly](https://github.com/akkadotnet/akka.net/pull/6384) and we'll continue working on that with some additional API changes this week.
Other Changes and Additions**
[Akka.Actor: New API - `IActorRef.WatchAsync`](https://github.com/akkadotnet/akka.net/pull/6102) - adds a new extension method to `IActorRef` which allows users to subscribe to actor lifecycle notifications outside of the `ActorSystem`.
[Akka.Actor: Suppress `System.Object` warning for serializer configuration changes](https://github.com/akkadotnet/akka.net/issues/6377)
If you want to see the [full set of changes made in Akka.NET v1.5.0 so far, click here](https://github.com/akkadotnet/akka.net/milestone/7?closed=1).
<PackageReleaseNotes>Version 1.5.0-beta2 contains **breaking API changes** and new API changes for Akka.NET.
[Akka.Event: Add K to the DateTime format string to include TZ information](https://github.com/akkadotnet/akka.net/pull/6419)
[Akka.TestKit: Reintroduce old code and mark them obsolete](https://github.com/akkadotnet/akka.net/pull/6420) - fixes major regression in Akka.TestKit.Xunit2 where we removed `IDipsoable` before. This PR reintroduces it for backwards compat.
[Akka.Cluster.Sharding: clean its internal cache if region/proxy died](https://github.com/akkadotnet/akka.net/pull/6424)
[Akka.Util: Harden `Option&lt;T&gt;` by disallowing null value](https://github.com/akkadotnet/akka.net/pull/6426)
[Akka.Util: move `DateTime` / `TimeSpan` extension APIs out of Akka.Util and into Akka.Cluster.Metrics](https://github.com/akkadotnet/akka.net/pull/6427)
[Akka.Util: Remove unsafe `implicit` conversion operators in `AtomicBoolean` and `AtomicReference&lt;T&gt;`](https://github.com/akkadotnet/akka.net/pull/6429)
[Akka: Standardize on C# 11.0](https://github.com/akkadotnet/akka.net/pull/6431)
[Akka.Persistence: improve `AsyncWriteJournal` and `PersistentActor` performance](https://github.com/akkadotnet/akka.net/pull/6432)
| COMMITS | LOC+ | LOC- | AUTHOR |
| --- | --- | --- | --- |
| 12 | 15 | 15 | dependabot[bot] |
| 11 | 1930 | 1278 | Aaron Stannard |
| 2 | 143 | 73 | Sergey Popov |
| 1 | 26 | 4 | Thomas Stegemann |
| 1 | 1 | 1 | Michel van Os |</PackageReleaseNotes>
| 8 | 260 | 942 | Aaron Stannard |
| 5 | 169 | 60 | Gregorius Soedharmo |</PackageReleaseNotes>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
Expand Down
@@ -1,5 +1,5 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\..\common.props"/>
<Import Project="..\..\..\common.props" />

<PropertyGroup>
<AssemblyTitle>Akka.Persistence.Query.Sql</AssemblyTitle>
Expand All @@ -10,10 +10,10 @@
</PropertyGroup>

<ItemGroup>
<EmbeddedResource Include="reference.conf"/>
<ProjectReference Include="..\..\..\core\Akka.Persistence\Akka.Persistence.csproj"/>
<ProjectReference Include="..\..\..\core\Akka.Persistence.Query\Akka.Persistence.Query.csproj"/>
<ProjectReference Include="..\Akka.Persistence.Sql.Common\Akka.Persistence.Sql.Common.csproj"/>
<EmbeddedResource Include="reference.conf" />
<ProjectReference Include="..\..\..\core\Akka.Persistence\Akka.Persistence.csproj" />
<ProjectReference Include="..\..\..\core\Akka.Persistence.Query\Akka.Persistence.Query.csproj" />
<ProjectReference Include="..\Akka.Persistence.Sql.Common\Akka.Persistence.Sql.Common.csproj" />
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
Expand Down
Expand Up @@ -6,8 +6,6 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Text;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Sql.Common.Journal;
Expand All @@ -20,16 +18,16 @@ internal static class AllEventsPublisher
[Serializable]
public sealed class Continue
{
public static readonly Continue Instance = new Continue();
public static readonly Continue Instance = new();

private Continue() { }
}

public static Props Props(long fromOffset, TimeSpan? refreshInterval, int maxBufferSize, string writeJournalPluginId)
public static Props Props(long fromOffset, TimeSpan? refreshInterval, int maxBufferSize, IActorRef writeJournal, IActorRef queryPermitter)
{
return refreshInterval.HasValue ?
Actor.Props.Create(() => new LiveAllEventsPublisher(fromOffset, refreshInterval.Value, maxBufferSize, writeJournalPluginId)) :
Actor.Props.Create(() => new CurrentAllEventsPublisher(fromOffset, maxBufferSize, writeJournalPluginId));
Actor.Props.Create(() => new LiveAllEventsPublisher(fromOffset, refreshInterval.Value, maxBufferSize, writeJournal, queryPermitter)) :
Actor.Props.Create(() => new CurrentAllEventsPublisher(fromOffset, maxBufferSize, writeJournal, queryPermitter));
}
}

Expand All @@ -39,16 +37,18 @@ internal abstract class AbstractAllEventsPublisher : ActorPublisher<EventEnvelop
private ILoggingAdapter _log;
protected long CurrentOffset;

protected AbstractAllEventsPublisher(long fromOffset, int maxBufferSize, string writeJournalPluginId)
protected AbstractAllEventsPublisher(long fromOffset, int maxBufferSize, IActorRef journalRef, IActorRef queryPermitter)
{
CurrentOffset = FromOffset = fromOffset;
MaxBufferSize = maxBufferSize;
JournalRef = journalRef;
QueryPermitter = queryPermitter;
Buffer = new DeliveryBuffer<EventEnvelope>(OnNext);
JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

protected ILoggingAdapter Log => _log ??= Context.GetLogger();
protected IActorRef JournalRef { get; }
protected IActorRef QueryPermitter { get; }
protected DeliveryBuffer<EventEnvelope> Buffer { get; }
protected long FromOffset { get; }
protected abstract long ToOffset { get; }
Expand Down Expand Up @@ -81,7 +81,7 @@ protected bool Idle(object message)
switch (message)
{
case AllEventsPublisher.Continue _:
if (IsTimeForReplay) Replay();
if (IsTimeForReplay) RequestQueryPermit();
return true;
case Request _:
ReceiveIdleRequest();
Expand All @@ -94,6 +94,34 @@ protected bool Idle(object message)
}
}

protected void RequestQueryPermit()
{
Log.Debug("Requesting query permit");
QueryPermitter.Tell(RequestQueryStart.Instance);
Become(WaitingForQueryPermit);
}

protected bool WaitingForQueryPermit(object message)
{
switch (message)
{
case QueryStartGranted _:
Replay();
return true;
case AllEventsPublisher.Continue _:
// ignore
return true;
case Request _:
ReceiveIdleRequest(); // can still handle idle requests while waiting for permit
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

protected void Replay()
{
var limit = MaxBufferSize - Buffer.Length;
Expand Down Expand Up @@ -129,6 +157,7 @@ protected bool Replaying( object message )
Log.Error(failure.Cause, "event replay failed, due to [{0}]", failure.Cause.Message);
Buffer.DeliverBuffer(TotalDemand);
OnErrorThenStop(failure.Cause);
QueryPermitter.Tell(ReturnQueryStart.Instance); // return token to permitter
return true;
case Request _:
Buffer.DeliverBuffer(TotalDemand);
Expand All @@ -148,8 +177,8 @@ protected bool Replaying( object message )
internal sealed class LiveAllEventsPublisher : AbstractAllEventsPublisher
{
private readonly ICancelable _tickCancelable;
public LiveAllEventsPublisher(long fromOffset, TimeSpan refreshInterval, int maxBufferSize, string writeJournalPluginId)
: base(fromOffset, maxBufferSize, writeJournalPluginId)
public LiveAllEventsPublisher(long fromOffset, TimeSpan refreshInterval, int maxBufferSize, IActorRef writeJournal, IActorRef queryPermitter)
: base(fromOffset, maxBufferSize, writeJournal, queryPermitter)
{
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, AllEventsPublisher.Continue.Instance, Self);
}
Expand All @@ -164,7 +193,7 @@ protected override void PostStop()

protected override void ReceiveInitialRequest()
{
Replay();
RequestQueryPermit();
}

protected override void ReceiveIdleRequest()
Expand All @@ -176,6 +205,7 @@ protected override void ReceiveIdleRequest()

protected override void ReceiveRecoverySuccess(long highestOrderingNr)
{
QueryPermitter.Tell(ReturnQueryStart.Instance); // return token
Buffer.DeliverBuffer(TotalDemand);
if (Buffer.IsEmpty && CurrentOffset > ToOffset)
OnCompleteThenStop();
Expand All @@ -186,16 +216,16 @@ protected override void ReceiveRecoverySuccess(long highestOrderingNr)

internal sealed class CurrentAllEventsPublisher : AbstractAllEventsPublisher
{
public CurrentAllEventsPublisher(long fromOffset, int maxBufferSize, string writeJournalPluginId)
: base(fromOffset, maxBufferSize, writeJournalPluginId)
public CurrentAllEventsPublisher(long fromOffset, int maxBufferSize, IActorRef writeJournal, IActorRef queryPermitter)
: base(fromOffset, maxBufferSize, writeJournal, queryPermitter)
{ }

private long _toOffset = long.MaxValue;
protected override long ToOffset => _toOffset;

protected override void ReceiveInitialRequest()
{
Replay();
RequestQueryPermit();
}

protected override void ReceiveIdleRequest()
Expand All @@ -209,6 +239,7 @@ protected override void ReceiveIdleRequest()

protected override void ReceiveRecoverySuccess(long highestOrderingNr)
{
QueryPermitter.Tell(ReturnQueryStart.Instance); // return token to permitter
Buffer.DeliverBuffer(TotalDemand);

if (highestOrderingNr < ToOffset)
Expand Down

0 comments on commit 0bc2464

Please sign in to comment.