Skip to content

Commit

Permalink
Akka.Persistence: improve AsyncWriteJournal and PersistentActor p…
Browse files Browse the repository at this point in the history
…erformance (#6432)

* park

* update syntax - eliminate additional array allocation

* eliminated delegate allocations using ValueDelegate pattern

* added comment

* eliminate unnecessary array allocations

* made 1 API member static

* fix AsyncWriteProxy
  • Loading branch information
Aaronontheweb committed Feb 22, 2023
1 parent e58b041 commit 32b832b
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 69 deletions.
Expand Up @@ -128,25 +128,31 @@ protected override bool AroundReceive(Receive receive, object message)
{
if (_isInitialized)
{
if (!(message is InitTimeout))
if (message is not InitTimeout)
return base.AroundReceive(receive, message);
}
else if (message is SetStore msg)
else switch (message)
{
_store = msg.Store;
Stash.UnstashAll();
_isInitialized = true;
}
else if (message is InitTimeout)
{
_isInitTimedOut = true;
Stash.UnstashAll(); // will trigger appropriate failures
}
else if (_isInitTimedOut)
{
return base.AroundReceive(receive, message);
case SetStore msg:
_store = msg.Store;
Stash.UnstashAll();
_isInitialized = true;
break;
case InitTimeout:
_isInitTimedOut = true;
Stash.UnstashAll(); // will trigger appropriate failures
break;
default:
{
if (_isInitTimedOut)
{
return base.AroundReceive(receive, message);
}
else Stash.Stash();

break;
}
}
else Stash.Stash();
return true;
}

Expand All @@ -162,26 +168,25 @@ protected override bool AroundReceive(Receive receive, object message)
/// <returns>TBD</returns>
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
{
var trueMsgs = messages.ToArray();

if (_store == null)
return StoreNotInitialized<IImmutableList<Exception>>();

return _store.Ask<object>(sender => new WriteMessages(messages, sender, 1), Timeout, CancellationToken.None)
return _store.Ask<object>(sender => new WriteMessages(trueMsgs, sender, 1), Timeout, CancellationToken.None)
.ContinueWith(r =>
{
if (r.IsCanceled)
return (IImmutableList<Exception>)messages.Select(i => (Exception)new TimeoutException()).ToImmutableList();
return (IImmutableList<Exception>)trueMsgs.Select(i => (Exception)new TimeoutException()).ToImmutableList();
if (r.IsFaulted)
return messages.Select(i => (Exception)r.Exception).ToImmutableList();
return trueMsgs.Select(i => (Exception)r.Exception).ToImmutableList();
if (r.Result is WriteMessageSuccess wms)
return r.Result switch
{
return messages.Select(i => (Exception)null).ToImmutableList();
}
if (r.Result is WriteMessageFailure wmf)
{
return messages.Select(i => wmf.Cause).ToImmutableList();
}
return null;
WriteMessageSuccess wms => trueMsgs.Select(i => (Exception)null).ToImmutableList(),
WriteMessageFailure wmf => trueMsgs.Select(i => wmf.Cause).ToImmutableList(),
_ => null
};
}, TaskContinuationOptions.ExecuteSynchronously);
}

Expand Down
Expand Up @@ -841,7 +841,7 @@ namespace Akka.Persistence.Journal
protected virtual bool ReceivePluginInternal(object message) { }
protected bool ReceiveWriteJournal(object message) { }
public abstract System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action<Akka.Persistence.IPersistentRepresentation> recoveryCallback);
protected System.Exception TryUnwrapException(System.Exception e) { }
protected static System.Exception TryUnwrapException(System.Exception e) { }
protected abstract System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages);
}
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
Expand Down
Expand Up @@ -841,7 +841,7 @@ namespace Akka.Persistence.Journal
protected virtual bool ReceivePluginInternal(object message) { }
protected bool ReceiveWriteJournal(object message) { }
public abstract System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action<Akka.Persistence.IPersistentRepresentation> recoveryCallback);
protected System.Exception TryUnwrapException(System.Exception e) { }
protected static System.Exception TryUnwrapException(System.Exception e) { }
protected abstract System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages);
}
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
Expand Down
Expand Up @@ -841,7 +841,7 @@ namespace Akka.Persistence.Journal
protected virtual bool ReceivePluginInternal(object message) { }
protected bool ReceiveWriteJournal(object message) { }
public abstract System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action<Akka.Persistence.IPersistentRepresentation> recoveryCallback);
protected System.Exception TryUnwrapException(System.Exception e) { }
protected static System.Exception TryUnwrapException(System.Exception e) { }
protected abstract System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages);
}
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence.Tests/Journal/ChaosJournal.cs
Expand Up @@ -95,7 +95,7 @@ public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long

protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
{
TaskCompletionSource<IImmutableList<Exception>> promise =
var promise =
new TaskCompletionSource<IImmutableList<Exception>>();
if (ChaosSupportExtensions.ShouldFail(_writeFailureRate))
promise.SetException(new WriteFailedException(messages));
Expand Down
7 changes: 2 additions & 5 deletions src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
Expand Up @@ -11,16 +11,13 @@

namespace Akka.Persistence
{
/// <summary>
/// TBD
/// </summary>
public partial class Eventsourced
{
/// <summary>
/// TBD
/// Function used to filter out messages that should not be unstashed during recovery.
/// </summary>
public static readonly Func<Envelope, bool> UnstashFilterPredicate =
envelope => !(envelope.Message is WriteMessageSuccess || envelope.Message is ReplayedMessage);
envelope => envelope.Message is not (WriteMessageSuccess or ReplayedMessage);

private void StartRecovery(Recovery recovery)
{
Expand Down
11 changes: 4 additions & 7 deletions src/core/Akka.Persistence/Eventsourced.Recovery.cs
Expand Up @@ -30,10 +30,7 @@ public EventsourcedState(string name, Func<bool> isRecoveryRunning, StateReceive

public override string ToString() => Name;
}

/// <summary>
/// TBD
/// </summary>

public abstract partial class Eventsourced
{
/// <summary>
Expand Down Expand Up @@ -135,7 +132,7 @@ bool RecoveryBehavior(object message)
}
ReturnRecoveryPermit();
break;
case RecoveryTick tick when tick.Snapshot:
case RecoveryTick { Snapshot: true }:
try
{
OnRecoveryFailure(
Expand Down Expand Up @@ -235,7 +232,7 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
}
ReturnRecoveryPermit();
break;
case RecoveryTick tick when !tick.Snapshot:
case RecoveryTick { Snapshot: false }:
if (!eventSeenInInterval)
{
timeoutCancelable.Cancel();
Expand All @@ -256,7 +253,7 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
eventSeenInInterval = false;
}
break;
case RecoveryTick tick when tick.Snapshot:
case RecoveryTick { Snapshot: true }:
// snapshot tick, ignore
break;
default:
Expand Down
14 changes: 7 additions & 7 deletions src/core/Akka.Persistence/Eventsourced.cs
Expand Up @@ -70,7 +70,7 @@ public RecoveryTick(bool snapshot)
}

/// <summary>
/// TBD
/// The base class for all persistent actors.
/// </summary>
public abstract partial class Eventsourced : ActorBase, IPersistentIdentity, IPersistenceStash, IPersistenceRecovery
{
Expand All @@ -81,18 +81,18 @@ public abstract partial class Eventsourced : ActorBase, IPersistentIdentity, IPe
private readonly IStash _internalStash;
private IActorRef _snapshotStore;
private IActorRef _journal;
private ICollection<IPersistentEnvelope> _journalBatch = new List<IPersistentEnvelope>();
private List<IPersistentEnvelope> _journalBatch = new();
private bool _isWriteInProgress;
private long _sequenceNr;
private EventsourcedState _currentState;
private LinkedList<IPersistentEnvelope> _eventBatch = new LinkedList<IPersistentEnvelope>();
private LinkedList<IPersistentEnvelope> _eventBatch = new();
private bool _asyncTaskRunning = false;

/// Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands
private long _pendingStashingPersistInvocations = 0L;

/// Holds user-supplied callbacks for persist/persistAsync calls
private readonly LinkedList<IPendingHandlerInvocation> _pendingInvocations = new LinkedList<IPendingHandlerInvocation>();
private readonly LinkedList<IPendingHandlerInvocation> _pendingInvocations = new();

/// <summary>
/// TBD
Expand Down Expand Up @@ -164,12 +164,12 @@ public IStash Stash
/// <summary>
/// TBD
/// </summary>
public IActorRef Journal => _journal ?? (_journal = Extension.JournalFor(JournalPluginId));
public IActorRef Journal => _journal ??= Extension.JournalFor(JournalPluginId);

/// <summary>
/// TBD
/// </summary>
public IActorRef SnapshotStore => _snapshotStore ?? (_snapshotStore = Extension.SnapshotStoreFor(SnapshotPluginId));
public IActorRef SnapshotStore => _snapshotStore ??= Extension.SnapshotStoreFor(SnapshotPluginId);

/// <summary>
/// Returns <see cref="PersistenceId"/>.
Expand Down Expand Up @@ -603,7 +603,7 @@ private void FlushJournalBatch()
{
if (!_isWriteInProgress && _journalBatch.Count > 0)
{
Journal.Tell(new WriteMessages(_journalBatch.ToArray(), Self, _instanceId));
Journal.Tell(new WriteMessages(_journalBatch, Self, _instanceId));
_journalBatch = new List<IPersistentEnvelope>(0);
_isWriteInProgress = true;
}
Expand Down
16 changes: 6 additions & 10 deletions src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Expand Up @@ -323,15 +323,11 @@ void CompleteHighSeqNo(long highSeqNo)
/// </summary>
/// <param name="e">TBD</param>
/// <returns>TBD</returns>
protected Exception TryUnwrapException(Exception e)
protected static Exception TryUnwrapException(Exception e)
{
if (e is AggregateException aggregateException)
{
aggregateException = aggregateException.Flatten();
if (aggregateException.InnerExceptions.Count == 1)
return aggregateException.InnerExceptions[0];
}
return e;
if (e is not AggregateException aggregateException) return e;
aggregateException = aggregateException.Flatten();
return aggregateException.InnerExceptions.Count == 1 ? aggregateException.InnerExceptions[0] : e;
}

private void HandleWriteMessages(WriteMessages message)
Expand All @@ -357,7 +353,7 @@ private async Task ExecuteBatch(WriteMessages message, int atomicWriteCount, IAc
{
try
{
var prepared = PreparePersistentBatch(message.Messages).ToArray();
var prepared = PreparePersistentBatch(message.Messages);
// try in case AsyncWriteMessages throws
try
{
Expand Down Expand Up @@ -389,7 +385,7 @@ private async Task ExecuteBatch(WriteMessages message, int atomicWriteCount, IAc

resequencer.Tell(new Desequenced(WriteMessagesSuccessful.Instance, resequencerCounter, writeMessage.PersistentActor, writeJournal), writeJournal);
Resequence((x, exception) => exception == null
? (object)new WriteMessageSuccess(x, writeMessage.ActorInstanceId)
? new WriteMessageSuccess(x, writeMessage.ActorInstanceId)
: new WriteMessageRejected(x, exception, writeMessage.ActorInstanceId), results, resequencerCounter, writeMessage, resequencer, writeJournal);
}

Expand Down
12 changes: 3 additions & 9 deletions src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs
Expand Up @@ -63,10 +63,7 @@ public sealed class SetStore
/// </exception>
public SetStore(IActorRef store)
{
if (store == null)
throw new ArgumentNullException(nameof(store), "SetStore requires non-null reference to store actor");

Store = store;
Store = store ?? throw new ArgumentNullException(nameof(store), "SetStore requires non-null reference to store actor");
}

/// <summary>
Expand Down Expand Up @@ -263,10 +260,7 @@ public abstract class AsyncWriteProxy : AsyncWriteJournal, IWithUnboundedStash
private bool _isInitialized;
private bool _isInitTimedOut;
private IActorRef _store;

/// <summary>
/// TBD
/// </summary>

protected AsyncWriteProxy()
{
_isInitialized = false;
Expand Down Expand Up @@ -298,7 +292,7 @@ protected internal override bool AroundReceive(Receive receive, object message)
{
if (_isInitialized)
{
if (!(message is InitTimeout))
if (message is not InitTimeout)
return base.AroundReceive(receive, message);
}
else switch (message)
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence/Journal/WriteJournal.cs
Expand Up @@ -38,7 +38,7 @@ protected IEnumerable<AtomicWrite> PreparePersistentBatch(IEnumerable<IPersisten
{
foreach (var resequenceable in resequenceables)
{
if (!(resequenceable is AtomicWrite)) continue;
if (resequenceable is not AtomicWrite) continue;

var result = ImmutableList.CreateBuilder<IPersistentRepresentation>();

Expand Down

0 comments on commit 32b832b

Please sign in to comment.