diff --git a/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs b/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs index b728d354816..4c5866915f7 100644 --- a/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs +++ b/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs @@ -46,10 +46,15 @@ public void LocalSnapshotStore_can_snapshot_actors_with_PersistenceId_containing ExpectMsg(); SnapshotStore.Tell(new LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, long.MaxValue), TestActor); - ExpectMsg(res => - res.Snapshot.Snapshot.Equals("sample data") - && res.Snapshot.Metadata.PersistenceId == pid - && res.Snapshot.Metadata.SequenceNr == 1); + ExpectMsg(IsMessage); + bool IsMessage(LoadSnapshotResult res) + { + var result = res.Snapshot.Snapshot.Equals("sample data") + && res.Snapshot.Metadata.PersistenceId == pid + && res.Snapshot.Metadata.SequenceNr == 1; + + return result; + } } } } diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index ca404a5a0a4..f05ea09b2aa 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; using Akka.Event; using Xunit.Abstractions; @@ -72,23 +73,42 @@ protected override void OnRecover(object message) return; } } + + protected override void PostStop() + { + _log.Info("Shutting down"); + } + + protected override void PreStart() + { + _log.Info("Starting up"); + } } public class CounterActorTests : PersistenceTestKit { - public CounterActorTests(ITestOutputHelper output) : base(output:output){} + // create a Config that enables debug mode on the TestJournal + private static readonly Config Config = + ConfigurationFactory.ParseString(""" + akka.persistence.journal.test.debug = on + akka.persistence.snapshot-store.test.debug = on + """); + + public CounterActorTests(ITestOutputHelper output) : base(Config, output:output){} [Fact] public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_store_is_not_available() { return WithJournalWrite(write => write.Fail(), async () => { - var counterProps = Props.Create(() => new CounterActor("test")); + var counterProps = Props.Create(() => new CounterActor("test")) + .WithDispatcher("akka.actor.internal-dispatcher"); var actor = ActorOf(counterProps, "counter"); - Watch(actor); + Sys.Log.Info("Messaging actor"); + await WatchAsync(actor); actor.Tell("inc", TestActor); - await ExpectMsgAsync(TimeSpan.FromSeconds(3)); + await ExpectTerminatedAsync(actor); // need to restart actor actor = ActorOf(counterProps, "counter1"); diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index d9cb3099aa2..4230b5cc3ab 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -11,7 +11,9 @@ using System.Text; using System.Threading.Tasks; using Akka.Actor; +using Akka.Configuration; using Akka.Event; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -22,16 +24,26 @@ namespace Akka.Persistence.TestKit.Tests /// public class Bug4762FixSpec : PersistenceTestKit { - public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(output: outputHelper) + // create a Config that enables debug mode on the TestJournal + private static readonly Config Config = + ConfigurationFactory.ParseString(""" + akka.loglevel = DEBUG + akka.persistence.journal.test.debug = on + akka.persistence.journal.test.replay-filter.debug = on + akka.persistence.snapshot-store.test.debug = on + """); + + public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(Config, output: outputHelper) { - } - + private class WriteMessage - { } + { + } private class TestEvent - { } + { + } private class TestActor2 : UntypedPersistentActor { @@ -48,17 +60,14 @@ public TestActor2(IActorRef probe) protected override void OnCommand(object message) { _log.Info("Received command {0}", message); - + switch (message) { case WriteMessage _: var event1 = new TestEvent(); var event2 = new TestEvent(); var events = new List { event1, event2 }; - PersistAll(events, _ => - { - _probe.Tell(Done.Instance); - }); + PersistAll(events, _ => { _probe.Tell(Done.Instance); }); break; default: @@ -66,6 +75,12 @@ protected override void OnCommand(object message) } } + protected override void PreStart() + { + _log.Info("Starting up and beginning recovery"); + base.PreStart(); + } + protected override void OnRecover(object message) { _log.Info("Received recover {0}", message); @@ -79,17 +94,18 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( var probe = CreateTestProbe(); return WithJournalWrite(write => write.Pass(), async () => { - var actor = ActorOf(() => new TestActor2(probe)); - Watch(actor); + var actor = Sys.ActorOf( + Props.Create(() => new TestActor2(probe)) + .WithDispatcher("akka.actor.internal-dispatcher"), "test-actor"); var command = new WriteMessage(); actor.Tell(command, actor); - await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync(10.Seconds()); await probe.ExpectMsgAsync(); await probe.ExpectMsgAsync(); await probe.ExpectNoMsgAsync(3000); }); } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs index 406dab3d5d6..f35070446b2 100644 --- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs @@ -7,6 +7,7 @@ using Akka.Actor.Setup; using Akka.Configuration; +using Akka.Event; namespace Akka.Persistence.TestKit { @@ -111,6 +112,11 @@ public async Task WithJournalRecovery(Func behavi await behaviorSelector(Journal.OnRecovery); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithJournalRecovery"); + throw; + } finally { await Journal.OnRecovery.Pass(); @@ -136,6 +142,11 @@ public async Task WithJournalWrite(Func behaviorSele await behaviorSelector(Journal.OnWrite); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithJournalWrite"); + throw; + } finally { await Journal.OnWrite.Pass(); @@ -157,7 +168,7 @@ public Task WithJournalRecovery(Func behaviorSele if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(new object()); + return Task.CompletedTask; }); /// @@ -175,7 +186,7 @@ public Task WithJournalWrite(Func behaviorSelector, if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(new object()); + return Task.CompletedTask; }); /// @@ -197,6 +208,11 @@ public async Task WithSnapshotSave(Func behavio await behaviorSelector(Snapshots.OnSave); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotSave"); + throw; + } finally { await Snapshots.OnSave.Pass(); @@ -222,6 +238,11 @@ public async Task WithSnapshotLoad(Func behavio await behaviorSelector(Snapshots.OnLoad); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotLoad"); + throw; + } finally { await Snapshots.OnLoad.Pass(); @@ -247,6 +268,11 @@ public async Task WithSnapshotDelete(Func beh await behaviorSelector(Snapshots.OnDelete); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotDelete"); + throw; + } finally { await Snapshots.OnDelete.Pass(); @@ -268,7 +294,7 @@ public Task WithSnapshotSave(Func behaviorSelec if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// @@ -286,7 +312,7 @@ public Task WithSnapshotLoad(Func behaviorSelec if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// @@ -304,7 +330,7 @@ public Task WithSnapshotDelete(Func behaviorS if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// @@ -348,7 +374,7 @@ private static Config GetConfig(Config customConfig) { var defaultConfig = ConfigurationFactory.FromResource("Akka.Persistence.TestKit.config.conf"); if (customConfig == Config.Empty) return defaultConfig; - else return defaultConfig.SafeWithFallback(customConfig); + else return customConfig.WithFallback(defaultConfig); } } } diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs index 22f3eadf865..41ff1a3e3b0 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs @@ -16,7 +16,7 @@ internal class Noop : IJournalInterceptor { public static readonly IJournalInterceptor Instance = new Noop(); - public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true); + public Task InterceptAsync(IPersistentRepresentation message) => Task.CompletedTask; } internal class Failure : IJournalInterceptor diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs index b9cc55c534c..ffc76445643 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs @@ -26,7 +26,7 @@ internal JournalRecoveryBehaviorSetter(IActorRef journal) public Task SetInterceptorAsync(IJournalInterceptor interceptor) => _journal.Ask( new TestJournal.UseRecoveryInterceptor(interceptor), - TimeSpan.FromSeconds(3) + TimeSpan.FromSeconds(0.5) ); } } diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs index 7e554be403a..ed9f2e67a60 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs @@ -26,7 +26,7 @@ internal JournalWriteBehaviorSetter(IActorRef journal) public Task SetInterceptorAsync(IJournalInterceptor interceptor) => _journal.Ask( new TestJournal.UseWriteInterceptor(interceptor), - TimeSpan.FromSeconds(3) + TimeSpan.FromSeconds(0.5) ); } } diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs index 56aa3f6b528..4333efeb7ad 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs @@ -5,34 +5,47 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; +using Akka.Event; +using Akka.Actor; +using Akka.Persistence.Journal; +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading.Tasks; + namespace Akka.Persistence.TestKit { - using Akka.Actor; - using Akka.Persistence; - using Akka.Persistence.Journal; - using System; - using System.Collections.Generic; - using System.Collections.Immutable; - using System.Threading.Tasks; - /// /// In-memory persistence journal implementation which behavior could be controlled by interceptors. /// public sealed class TestJournal : MemoryJournal { + private readonly ILoggingAdapter _log = Context.GetLogger(); private IJournalInterceptor _writeInterceptor = JournalInterceptors.Noop.Instance; private IJournalInterceptor _recoveryInterceptor = JournalInterceptors.Noop.Instance; + public TestJournal(Config journalConfig) + { + DebugEnabled = journalConfig.GetBoolean("debug", false); + } + + private bool DebugEnabled { get; } + protected override bool ReceivePluginInternal(object message) { switch (message) { case UseWriteInterceptor use: + if(DebugEnabled) + _log.Info("Using write interceptor {0}", use.Interceptor.GetType().Name); _writeInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseRecoveryInterceptor use: + if(DebugEnabled) + _log.Info("Using recovery interceptor {0}", use.Interceptor.GetType().Name); _recoveryInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; @@ -51,7 +64,12 @@ protected override async Task> WriteMessagesAsync(IEnu { foreach (var p in (IEnumerable)w.Payload) { + if(DebugEnabled) + _log.Info("Beginning write intercept of message {0} with interceptor {1}", p, _writeInterceptor.GetType().Name); await _writeInterceptor.InterceptAsync(p); + + if(DebugEnabled) + _log.Info("Completed write intercept of message {0} with interceptor {1}", p, _writeInterceptor.GetType().Name); Add(p); } } @@ -75,6 +93,10 @@ protected override async Task> WriteMessagesAsync(IEnu public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action recoveryCallback) { var highest = HighestSequenceNr(persistenceId); + + if(DebugEnabled) + _log.Info("Replaying messages from {0} to {1} for persistenceId {2}", fromSequenceNr, toSequenceNr, persistenceId); + if (highest != 0L && max != 0L) { var messages = Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max); @@ -82,7 +104,12 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per { try { + if(DebugEnabled) + _log.Info("Beginning recovery intercept of message {0} with interceptor {1}", p, _recoveryInterceptor.GetType().Name); await _recoveryInterceptor.InterceptAsync(p); + + if(DebugEnabled) + _log.Info("Completed recovery intercept of message {0} with interceptor {1}", p, _recoveryInterceptor.GetType().Name); recoveryCallback(p); } catch (TestJournalFailureException) @@ -92,6 +119,9 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per throw; } } + + if(DebugEnabled) + _log.Info("Completed replaying messages from {0} to {1} for persistenceId {2}", fromSequenceNr, toSequenceNr, persistenceId); } } diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs index 67ccccb0d49..84e7fc8ef77 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs @@ -18,7 +18,7 @@ internal class SnapshotStoreSaveBehaviorSetter : ISnapshotStoreBehaviorSetter { internal SnapshotStoreSaveBehaviorSetter(IActorRef snapshots) { - this._snapshots = snapshots; + _snapshots = snapshots; } private readonly IActorRef _snapshots; diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs index 838f06ce40e..a576a61f2e1 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs @@ -5,6 +5,9 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; +using Akka.Event; + namespace Akka.Persistence.TestKit { using System.Threading.Tasks; @@ -19,22 +22,39 @@ public class TestSnapshotStore : MemorySnapshotStore private ISnapshotStoreInterceptor _saveInterceptor = SnapshotStoreInterceptors.Noop.Instance; private ISnapshotStoreInterceptor _loadInterceptor = SnapshotStoreInterceptors.Noop.Instance; private ISnapshotStoreInterceptor _deleteInterceptor = SnapshotStoreInterceptors.Noop.Instance; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public TestSnapshotStore(Config snapshotStoreConfig) + { + DebugEnabled = snapshotStoreConfig.GetBoolean("debug", false); + } + + private bool DebugEnabled { get; } protected override bool ReceivePluginInternal(object message) { + if(DebugEnabled) + _log.Info("Received plugin internal message {0}", message); + switch (message) { case UseSaveInterceptor use: + if(DebugEnabled) + _log.Info("Using save interceptor {0}", use.Interceptor.GetType().Name); _saveInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseLoadInterceptor use: + if(DebugEnabled) + _log.Info("Using load interceptor {0}", use.Interceptor.GetType().Name); _loadInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseDeleteInterceptor use: + if(DebugEnabled) + _log.Info("Using delete interceptor {0}", use.Interceptor.GetType().Name); _deleteInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; @@ -46,29 +66,55 @@ protected override bool ReceivePluginInternal(object message) protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} saving using interceptor {1}", metadata, _saveInterceptor.GetType().Name); + await _saveInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata)); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} saving using interceptor {1}", metadata, _saveInterceptor.GetType().Name); await base.SaveAsync(metadata, snapshot); } protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} loading using interceptor {1}", persistenceId, _loadInterceptor.GetType().Name); await _loadInterceptor.InterceptAsync(persistenceId, criteria); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} loading using interceptor {1}", persistenceId, _loadInterceptor.GetType().Name); + return await base.LoadAsync(persistenceId, criteria); } protected override async Task DeleteAsync(SnapshotMetadata metadata) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} deletion using interceptor {1}", metadata, _deleteInterceptor.GetType().Name); + await _deleteInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata)); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} deletion using interceptor {1}", metadata, _deleteInterceptor.GetType().Name); + await base.DeleteAsync(metadata); } protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} deletion using interceptor {1}", persistenceId, _deleteInterceptor.GetType().Name); + await _deleteInterceptor.InterceptAsync(persistenceId, criteria); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} deletion using interceptor {1}", persistenceId, _deleteInterceptor.GetType().Name); + await base.DeleteAsync(persistenceId, criteria); } - static SnapshotSelectionCriteria ToSelectionCriteria(SnapshotMetadata metadata) + private static SnapshotSelectionCriteria ToSelectionCriteria(SnapshotMetadata metadata) => new(metadata.SequenceNr, metadata.Timestamp, metadata.SequenceNr, metadata.Timestamp); /// diff --git a/src/core/Akka.Persistence.TestKit/config.conf b/src/core/Akka.Persistence.TestKit/config.conf index dda3d635272..f03792b2baa 100644 --- a/src/core/Akka.Persistence.TestKit/config.conf +++ b/src/core/Akka.Persistence.TestKit/config.conf @@ -7,6 +7,9 @@ akka { test { class = "Akka.Persistence.TestKit.TestJournal, Akka.Persistence.TestKit" plugin-dispatcher = "akka.actor.default-dispatcher" + + # enables debug mode, which adds verbose logging to each of the TestJournal stages + debug = false } } @@ -16,7 +19,10 @@ akka { test { class = "Akka.Persistence.TestKit.TestSnapshotStore, Akka.Persistence.TestKit" - plugin-dispatcher = "akka.actor.default-dispatcher" + plugin-dispatcher = "akka.actor.default-dispatcher" + + # enables debug mode, which adds verbose logging to each of the TestSnapshotStore stages + debug = false } } } diff --git a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs index fa1acd2c39a..41715601a35 100644 --- a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs +++ b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs @@ -21,7 +21,9 @@ public partial class Eventsourced private void StartRecovery(Recovery recovery) { + Log.Info("Recovery granted"); ChangeState(RecoveryStarted(recovery.ReplayMax)); + Log.Info("Telling SnapshotStore to load snapshot"); LoadSnapshot(SnapshotterId, recovery.FromSnapshot, recovery.ToSequenceNr); } diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs index a0befa7741b..d855747ffde 100644 --- a/src/core/Akka.Persistence/Eventsourced.cs +++ b/src/core/Akka.Persistence/Eventsourced.cs @@ -618,6 +618,7 @@ private void StashInternally(object currentMessage) { try { + Log.Info($"Stashing [{currentMessage}]"); _internalStash.Stash(); } catch (StashOverflowException e) diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index a30590147b5..d4103bfee61 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -54,9 +54,9 @@ protected AsyncWriteJournal() var config = extension.ConfigFor(Self); _breaker = new CircuitBreaker( Context.System.Scheduler, - config.GetInt("circuit-breaker.max-failures", 0), - config.GetTimeSpan("circuit-breaker.call-timeout", null), - config.GetTimeSpan("circuit-breaker.reset-timeout", null)); + config.GetInt("circuit-breaker.max-failures", 10), + config.GetTimeSpan("circuit-breaker.call-timeout", TimeSpan.FromSeconds(10)), + config.GetTimeSpan("circuit-breaker.reset-timeout", TimeSpan.FromSeconds(30))); var replayFilterMode = config.GetString("replay-filter.mode", "").ToLowerInvariant(); switch (replayFilterMode) @@ -77,8 +77,8 @@ protected AsyncWriteJournal() throw new ConfigurationException($"Invalid replay-filter.mode [{replayFilterMode}], supported values [off, repair-by-discard-old, fail, warn]"); } _isReplayFilterEnabled = _replayFilterMode != ReplayFilterMode.Disabled; - _replayFilterWindowSize = config.GetInt("replay-filter.window-size", 0); - _replayFilterMaxOldWriters = config.GetInt("replay-filter.max-old-writers", 0); + _replayFilterWindowSize = config.GetInt("replay-filter.window-size", 100); + _replayFilterMaxOldWriters = config.GetInt("replay-filter.max-old-writers", 10); _replayDebugEnabled = config.GetBoolean("replay-filter.debug", false); _resequencer = Context.System.ActorOf(Props.Create(() => new Resequencer())); @@ -251,6 +251,7 @@ private void HandleReplayMessages(ReplayMessages message) ? Context.ActorOf(ReplayFilter.Props(message.PersistentActor, _replayFilterMode, _replayFilterWindowSize, _replayFilterMaxOldWriters, _replayDebugEnabled)) : message.PersistentActor; + var self = Context.Self; var context = Context; var eventStream = Context.System.EventStream; @@ -259,20 +260,11 @@ private void HandleReplayMessages(ReplayMessages message) async Task ExecuteHighestSequenceNr() { - void CompleteHighSeqNo(long highSeqNo) - { - replyTo.Tell(new RecoverySuccess(highSeqNo)); - - if (CanPublish) - { - eventStream.Publish(message); - } - } - try { var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), state => - state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom)); + state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom)) + .ConfigureAwait(false); var toSequenceNr = Math.Min(message.ToSequenceNr, highSequenceNr); if (toSequenceNr <= 0L || message.FromSequenceNr > toSequenceNr) { @@ -293,7 +285,7 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr replyTo.Tell(new ReplayedMessage(adaptedRepresentation), ActorRefs.NoSender); } } - }); + }).ConfigureAwait(false); CompleteHighSeqNo(highSequenceNr); } @@ -303,11 +295,23 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr // operation failed because a CancellationToken was invoked // wrap the original exception and throw it, with some additional callsite context var newEx = new OperationCanceledException("ReplayMessagesAsync canceled, possibly due to timing out.", cx); - replyTo.Tell(new ReplayMessagesFailure(newEx)); + replyTo.Tell(new ReplayMessagesFailure(newEx), self); } catch (Exception ex) { - replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex))); + replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex)), self); + } + + return; + + void CompleteHighSeqNo(long highSeqNo) + { + replyTo.Tell(new RecoverySuccess(highSeqNo), self); + + if (CanPublish) + { + eventStream.Publish(message); + } } } diff --git a/src/core/Akka.Persistence/Journal/MemoryJournal.cs b/src/core/Akka.Persistence/Journal/MemoryJournal.cs index 25a861db3ac..89cc7756835 100644 --- a/src/core/Akka.Persistence/Journal/MemoryJournal.cs +++ b/src/core/Akka.Persistence/Journal/MemoryJournal.cs @@ -91,7 +91,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl var persistentRepresentation = p.WithTimestamp(DateTime.UtcNow.Ticks); Add(persistentRepresentation); _allMessages.AddLast(persistentRepresentation); - if (!(p.Payload is Tagged tagged)) continue; + if (p.Payload is not Tagged tagged) continue; foreach (var tag in tagged.Tags) { @@ -118,7 +118,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl /// TBD public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { - return Task.FromResult(Math.Max(HighestSequenceNr(persistenceId), _meta.TryGetValue(persistenceId, out long metaSeqNr) ? metaSeqNr : 0L)); + return Task.FromResult(Math.Max(HighestSequenceNr(persistenceId), _meta.TryGetValue(persistenceId, out var metaSeqNr) ? metaSeqNr : 0L)); } /// @@ -137,7 +137,7 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten var highest = HighestSequenceNr(persistenceId); if (highest != 0L && max != 0L) Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max).ForEach(recoveryCallback); - return Task.FromResult(new object()); + return Task.CompletedTask; } /// @@ -154,7 +154,7 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque _meta.AddOrUpdate(persistenceId, highestSeqNr, (_, _) => highestSeqNr); for (var snr = 1L; snr <= toSeqNr; snr++) Delete(persistenceId, snr); - return Task.FromResult(new object()); + return Task.CompletedTask; } protected override bool ReceivePluginInternal(object message) @@ -197,7 +197,7 @@ private Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) if (!_tagsToMessagesMapping.ContainsKey(replay.Tag)) return Task.FromResult(0); - int index = 0; + var index = 0; foreach (var persistence in _tagsToMessagesMapping[replay.Tag] .Skip(replay.FromOffset) .Take(replay.ToOffset)) @@ -212,7 +212,7 @@ private Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) private Task ReplayAllEventsAsync(ReplayAllEvents replay) { - int index = 0; + var index = 0; var replayed = _allMessages .Skip(replay.FromOffset) .Take(replay.ToOffset - replay.FromOffset) @@ -557,7 +557,7 @@ public Messages Add(IPersistentRepresentation persistent) /// TBD public Messages Update(string pid, long seqNr, Func updater) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { var node = persistents.First; while (node != null) @@ -580,7 +580,7 @@ public Messages Update(string pid, long seqNr, FuncTBD public Messages Delete(string pid, long seqNr) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { var node = persistents.First; while (node != null) @@ -605,7 +605,7 @@ public Messages Delete(string pid, long seqNr) /// TBD public IEnumerable Read(string pid, long fromSeqNr, long toSeqNr, long max) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { return persistents .Where(x => x.SequenceNr >= fromSeqNr && x.SequenceNr <= toSeqNr) @@ -622,13 +622,10 @@ public IEnumerable Read(string pid, long fromSeqNr, l /// TBD public long HighestSequenceNr(string pid) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) - { - var last = persistents.LastOrDefault(); - return last?.SequenceNr ?? 0L; - } + if (!Messages.TryGetValue(pid, out var persistents)) return 0L; + var last = persistents.LastOrDefault(); + return last?.SequenceNr ?? 0L; - return 0L; } #endregion diff --git a/src/core/Akka.Persistence/Journal/ReplayFilter.cs b/src/core/Akka.Persistence/Journal/ReplayFilter.cs index e6fda67e051..476a1521544 100644 --- a/src/core/Akka.Persistence/Journal/ReplayFilter.cs +++ b/src/core/Akka.Persistence/Journal/ReplayFilter.cs @@ -131,28 +131,63 @@ public static Props Props(IActorRef persistentActor, ReplayFilterMode mode, int /// TBD protected override bool Receive(object message) { - if (message is ReplayedMessage value) + switch (message) { - if (DebugEnabled && _log.IsDebugEnabled) - _log.Debug($"Replay: {value.Persistent}"); + case ReplayedMessage value: + if (DebugEnabled && _log.IsDebugEnabled) + _log.Debug($"Replay: {value.Persistent}"); - try - { - if (_buffer.Count == WindowSize) + try { - var msg = _buffer.First; - _buffer.RemoveFirst(); - PersistentActor.Tell(msg.Value, ActorRefs.NoSender); - } + if (_buffer.Count == WindowSize) + { + var msg = _buffer.First; + _buffer.RemoveFirst(); + PersistentActor.Tell(msg.Value, ActorRefs.NoSender); + } - if (value.Persistent.WriterGuid.Equals(_writerUuid)) - { - // from same writer - if (value.Persistent.SequenceNr < _sequenceNr) + if (value.Persistent.WriterGuid.Equals(_writerUuid)) { - var errMsg = $@"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}] as - the sequenceNr should be equal to or greater than already-processed event [sequenceNr={_sequenceNr}, writerUUID={_writerUuid}] from the same writer, for the same persistenceId [{value.Persistent.PersistenceId}]. - Perhaps, events were journaled out of sequence, or duplicate PersistentId for different entities?"; + // from same writer + if (value.Persistent.SequenceNr < _sequenceNr) + { + var errMsg = + $"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}] as " + + $"the sequenceNr should be equal to or greater than already-processed event [sequenceNr={_sequenceNr}, " + + $"writerUUID={_writerUuid}] from the same writer, for the same persistenceId [{value.Persistent.PersistenceId}]. " + + "Perhaps, events were journaled out of sequence, or duplicate PersistentId for different entities?"; + LogIssue(errMsg); + switch (Mode) + { + case ReplayFilterMode.RepairByDiscardOld: + //discard + break; + case ReplayFilterMode.Fail: + throw new IllegalStateException(errMsg); + case ReplayFilterMode.Warn: + _buffer.AddLast(value); + break; + case ReplayFilterMode.Disabled: + throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); + } + } + else + { + // note that it is alright with == _sequenceNr, since such may be emitted by EventSeq + _buffer.AddLast(value); + _sequenceNr = value.Persistent.SequenceNr; + } + } + else if (_oldWriters.Contains(value.Persistent.WriterGuid)) + { + // from old writer + var errMsg = + $"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}]. " + + $"There was already a newer writer whose last replayed event was [sequenceNr={_sequenceNr}, " + + $"writerUUID={_writerUuid}] for the same persistenceId [{value.Persistent.PersistenceId}]. " + + "Perhaps, the old writer kept journaling messages after the new writer created, or duplicate PersistentId for different entities?"; LogIssue(errMsg); switch (Mode) { @@ -166,98 +201,78 @@ protected override bool Receive(object message) break; case ReplayFilterMode.Disabled: throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); } } else { - // note that it is alright with == _sequenceNr, since such may be emitted by EventSeq - _buffer.AddLast(value); + // from new writer + if (!string.IsNullOrEmpty(_writerUuid)) + _oldWriters.AddLast(_writerUuid); + if (_oldWriters.Count > MaxOldWriters) + _oldWriters.RemoveFirst(); + _writerUuid = value.Persistent.WriterGuid; _sequenceNr = value.Persistent.SequenceNr; - } - } - else if (_oldWriters.Contains(value.Persistent.WriterGuid)) - { - // from old writer - var errMsg = $@"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}]. - There was already a newer writer whose last replayed event was [sequenceNr={_sequenceNr}, writerUUID={_writerUuid}] for the same persistenceId [{value.Persistent.PersistenceId}]. - Perhaps, the old writer kept journaling messages after the new writer created, or duplicate PersistentId for different entities?"; - LogIssue(errMsg); - switch (Mode) - { - case ReplayFilterMode.RepairByDiscardOld: - //discard - break; - case ReplayFilterMode.Fail: - throw new IllegalStateException(errMsg); - case ReplayFilterMode.Warn: - _buffer.AddLast(value); - break; - case ReplayFilterMode.Disabled: - throw new ArgumentException("Mode must not be Disabled"); - } - } - else - { - // from new writer - if (!string.IsNullOrEmpty(_writerUuid)) - _oldWriters.AddLast(_writerUuid); - if (_oldWriters.Count > MaxOldWriters) - _oldWriters.RemoveFirst(); - _writerUuid = value.Persistent.WriterGuid; - _sequenceNr = value.Persistent.SequenceNr; - // clear the buffer from messages from other writers with higher SequenceNr - var node = _buffer.First; - while (node != null) - { - var next = node.Next; - var msg = node.Value; - if (msg.Persistent.SequenceNr >= _sequenceNr) + // clear the buffer from messages from other writers with higher SequenceNr + var node = _buffer.First; + while (node != null) { - var errMsg = $@"Invalid replayed event [sequenceNr=${value.Persistent.SequenceNr}, writerUUID=${value.Persistent.WriterGuid}] from a new writer. - An older writer already sent an event [sequenceNr=${msg.Persistent.SequenceNr}, writerUUID=${msg.Persistent.WriterGuid}] whose sequence number was equal or greater for the same persistenceId [${value.Persistent.PersistenceId}]. - Perhaps, the new writer journaled the event out of sequence, or duplicate PersistentId for different entities?"; - LogIssue(errMsg); - switch (Mode) + var next = node.Next; + var msg = node.Value; + if (msg.Persistent.SequenceNr >= _sequenceNr) { - case ReplayFilterMode.RepairByDiscardOld: - _buffer.Remove(node); - //discard - break; - case ReplayFilterMode.Fail: - throw new IllegalStateException(errMsg); - case ReplayFilterMode.Warn: - // keep - break; - case ReplayFilterMode.Disabled: - throw new ArgumentException("Mode must not be Disabled"); + var errMsg = + $"Invalid replayed event [sequenceNr=${value.Persistent.SequenceNr}, writerUUID=${value.Persistent.WriterGuid}] from a new writer. " + + $"An older writer already sent an event [sequenceNr=${msg.Persistent.SequenceNr}, " + + $"writerUUID=${msg.Persistent.WriterGuid}] whose sequence number was equal or greater " + + $"for the same persistenceId [${value.Persistent.PersistenceId}]. " + + "Perhaps, the new writer journaled the event out of sequence, or duplicate PersistentId for different entities?"; + LogIssue(errMsg); + switch (Mode) + { + case ReplayFilterMode.RepairByDiscardOld: + _buffer.Remove(node); + //discard + break; + case ReplayFilterMode.Fail: + throw new IllegalStateException(errMsg); + case ReplayFilterMode.Warn: + // keep + break; + case ReplayFilterMode.Disabled: + throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); + } } + node = next; } - node = next; + _buffer.AddLast(value); } - _buffer.AddLast(value); } + catch (IllegalStateException ex) + { + if (Mode == ReplayFilterMode.Fail) + Fail(ex); + else + throw; + } + return true; + + case RecoverySuccess or ReplayMessagesFailure: + if (DebugEnabled) + _log.Debug($"Replay completed: {message}"); - } - catch (IllegalStateException ex) - { - if (Mode == ReplayFilterMode.Fail) - Fail(ex); - else - throw; - } - } - else if (message is RecoverySuccess or ReplayMessagesFailure) - { - if (DebugEnabled) - _log.Debug($"Replay completed: {message}"); - - SendBuffered(); - PersistentActor.Tell(message, ActorRefs.NoSender); - Context.Stop(Self); + SendBuffered(); + PersistentActor.Tell(message, ActorRefs.NoSender); + Context.Stop(Self); + return true; + + default: + return false; } - else return false; - return true; } private void SendBuffered() @@ -282,6 +297,8 @@ private void LogIssue(string errMsg) break; case ReplayFilterMode.Disabled: throw new ArgumentException("mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)}: {Mode} value. Message: {errMsg}"); } } @@ -291,20 +308,17 @@ private void Fail(IllegalStateException cause) PersistentActor.Tell(new ReplayMessagesFailure(cause), ActorRefs.NoSender); Context.Become(message => { - if (message is ReplayedMessage) - { - // discard - } - else if (message is RecoverySuccess or ReplayMessagesFailure) - { - Context.Stop(Self); - } - else + switch (message) { - return false; + case ReplayedMessage: + // discard + return true; + case RecoverySuccess or ReplayMessagesFailure: + Context.Stop(Self); + return true; + default: + return false; } - - return true; }); } } diff --git a/src/core/Akka.Persistence/RecoveryPermitter.cs b/src/core/Akka.Persistence/RecoveryPermitter.cs index cc29f40913c..37b6e16fd27 100644 --- a/src/core/Akka.Persistence/RecoveryPermitter.cs +++ b/src/core/Akka.Persistence/RecoveryPermitter.cs @@ -42,6 +42,7 @@ internal class RecoveryPermitter : UntypedActor private readonly ILoggingAdapter Log = Context.GetLogger(); private int _usedPermits; private int _maxPendingStats; + private ILoggingAdapter _log; public static Props Props(int maxPermits) => Actor.Props.Create(() => new RecoveryPermitter(maxPermits)); @@ -51,33 +52,38 @@ public static Props Props(int maxPermits) => public RecoveryPermitter(int maxPermits) { MaxPermits = maxPermits; + _log = Context.GetLogger(); } protected override void OnReceive(object message) { - if (message is RequestRecoveryPermit) + switch (message) { - Context.Watch(Sender); - if (_usedPermits >= MaxPermits) - { - if (pending.Count == 0) - Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender); - pending.AddLast(Sender); - _maxPendingStats = Math.Max(_maxPendingStats, pending.Count); - } - else - { - RecoveryPermitGranted(Sender); - } - } - else if (message is ReturnRecoveryPermit) - { - ReturnRecoveryPermit(Sender); - } - else if (message is Terminated terminated && !pending.Remove(terminated.ActorRef)) - { - // pre-mature termination should be rare - ReturnRecoveryPermit(terminated.ActorRef); + case RequestRecoveryPermit: + _log.Info($"{nameof(RequestRecoveryPermit)} received: {Sender}"); + Context.Watch(Sender); + if (_usedPermits >= MaxPermits) + { + if (pending.Count == 0) + Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender); + pending.AddLast(Sender); + _maxPendingStats = Math.Max(_maxPendingStats, pending.Count); + } + else + { + RecoveryPermitGranted(Sender); + _log.Info($"Recovery granted: {Sender}"); + } + break; + + case Akka.Persistence.ReturnRecoveryPermit: + ReturnRecoveryPermit(Sender); + break; + + case Terminated terminated when !pending.Remove(terminated.ActorRef): + // pre-mature termination should be rare + ReturnRecoveryPermit(terminated.ActorRef); + break; } } diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index 8474c1ab4bb..242e6b073e1 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -8,6 +8,7 @@ using System; using System.Threading.Tasks; using Akka.Actor; +using Akka.Event; using Akka.Pattern; namespace Akka.Persistence.Snapshot @@ -20,6 +21,8 @@ public abstract class SnapshotStore : ActorBase private readonly TaskContinuationOptions _continuationOptions = TaskContinuationOptions.ExecuteSynchronously; private readonly bool _publish; private readonly CircuitBreaker _breaker; + private readonly ILoggingAdapter _log; + private readonly bool _debugEnabled; /// /// Initializes a new instance of the class. @@ -32,7 +35,8 @@ protected SnapshotStore() var extension = Persistence.Instance.Apply(Context.System); if (extension == null) { - throw new ArgumentException("Couldn't initialize SnapshotStore instance, because associated Persistence extension has not been used in current actor system context."); + throw new ArgumentException( + "Couldn't initialize SnapshotStore instance, because associated Persistence extension has not been used in current actor system context."); } _publish = extension.Settings.Internal.PublishPluginCommands; @@ -42,6 +46,9 @@ protected SnapshotStore() config.GetInt("circuit-breaker.max-failures", 10), config.GetTimeSpan("circuit-breaker.call-timeout", TimeSpan.FromSeconds(10)), config.GetTimeSpan("circuit-breaker.reset-timeout", TimeSpan.FromSeconds(30))); + + _debugEnabled = config.GetBoolean("debug"); + _log = Context.GetLogger(); } /// @@ -54,144 +61,189 @@ private bool ReceiveSnapshotStore(object message) { var senderPersistentActor = Sender; // Sender is PersistentActor var self = Self; //Self MUST BE CLOSED OVER here, or the code below will be subject to race conditions - - if (message is LoadSnapshot loadSnapshot) - { - if (loadSnapshot.Criteria == SnapshotSelectionCriteria.None) - { - senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr)); - } - else - { - _breaker.WithCircuitBreaker(() => LoadAsync(loadSnapshot.PersistenceId, loadSnapshot.Criteria.Limit(loadSnapshot.ToSequenceNr))) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new LoadSnapshotResult(t.Result, loadSnapshot.ToSequenceNr) as ISnapshotResponse - : new LoadSnapshotFailed(t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("LoadAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(senderPersistentActor); - } - } - else if (message is SaveSnapshot saveSnapshot) +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + switch (message) { - var metadata = new SnapshotMetadata(saveSnapshot.Metadata.PersistenceId, saveSnapshot.Metadata.SequenceNr, DateTime.UtcNow); - - _breaker.WithCircuitBreaker(() => SaveAsync(metadata, saveSnapshot.Snapshot)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new SaveSnapshotSuccess(metadata) as ISnapshotResponse - : new SaveSnapshotFailure(saveSnapshot.Metadata, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("SaveAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor); + case LoadSnapshot loadSnapshot: + if(_debugEnabled) + _log.Info($"{nameof(LoadSnapshot)} message received."); + + LoadSnapshotAsync(loadSnapshot, self, senderPersistentActor); + break; + case SaveSnapshot saveSnapshot: + SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); + break; + case SaveSnapshotSuccess: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case SaveSnapshotFailure saveSnapshotFailure: + try + { + ReceivePluginInternal(message); + _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), + state => state.ss.DeleteAsync(state.msg.Metadata)); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshot deleteSnapshot: + DeleteSnapshotAsync(deleteSnapshot, self, senderPersistentActor); + break; + case DeleteSnapshotSuccess: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshotFailure: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshots deleteSnapshots: + DeleteSnapshotsAsync(deleteSnapshots, self, senderPersistentActor); + break; + case DeleteSnapshotsSuccess: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshotsFailure: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + default: + return false; } - else if (message is SaveSnapshotSuccess) +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + return true; + } + + private async Task DeleteSnapshotsAsync(DeleteSnapshots deleteSnapshots, IActorRef self, + IActorRef senderPersistentActor) + { + var eventStream = Context.System.EventStream; + try { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } + await _breaker.WithCircuitBreaker((msg: deleteSnapshots, ss: this), + state => state.ss.DeleteAsync(state.msg.PersistenceId, state.msg.Criteria)); + + self.Tell(new DeleteSnapshotsSuccess(deleteSnapshots.Criteria), senderPersistentActor); } - else if (message is SaveSnapshotFailure saveSnapshotFailure) + catch (Exception ex) { - try - { - ReceivePluginInternal(message); - _breaker.WithCircuitBreaker(() => DeleteAsync(saveSnapshotFailure.Metadata)); - } - finally - { - senderPersistentActor.Tell(message); - } + self.Tell(new DeleteSnapshotsFailure(deleteSnapshots.Criteria, ex), senderPersistentActor); } - else if (message is DeleteSnapshot deleteSnapshot) + + if (_publish) + eventStream.Publish(deleteSnapshots); + } + + private async Task DeleteSnapshotAsync(DeleteSnapshot deleteSnapshot, IActorRef self, + IActorRef senderPersistentActor) + { + var eventStream = Context.System.EventStream; + + try { - var eventStream = Context.System.EventStream; - _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshot.Metadata)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new DeleteSnapshotSuccess(deleteSnapshot.Metadata) as ISnapshotResponse - : new DeleteSnapshotFailure(deleteSnapshot.Metadata, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor) - .ContinueWith(_ => - { - if (_publish) - eventStream.Publish(message); - }, _continuationOptions); + await _breaker.WithCircuitBreaker((msg: deleteSnapshot, ss: this), + state => state.ss.DeleteAsync(state.msg.Metadata)); + + self.Tell(new DeleteSnapshotSuccess(deleteSnapshot.Metadata), senderPersistentActor); } - else if (message is DeleteSnapshotSuccess) + catch (Exception ex) { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } + self.Tell(new DeleteSnapshotFailure(deleteSnapshot.Metadata, ex), senderPersistentActor); } - else if (message is DeleteSnapshotFailure) + + if (_publish) + eventStream.Publish(deleteSnapshot); + } + + private async Task SaveSnapshotAsync(SaveSnapshot saveSnapshot, IActorRef self, IActorRef senderPersistentActor) + { + var metadata = new SnapshotMetadata(saveSnapshot.Metadata.PersistenceId, + saveSnapshot.Metadata.SequenceNr, DateTime.UtcNow); + + try { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } + await _breaker.WithCircuitBreaker((msg: metadata, save: saveSnapshot, ss: this), + state => state.ss.SaveAsync(state.msg, state.save.Snapshot)); + self.Tell(new SaveSnapshotSuccess(metadata), senderPersistentActor); } - else if (message is DeleteSnapshots deleteSnapshots) + catch (Exception ex) { - var eventStream = Context.System.EventStream; - _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshots.PersistenceId, deleteSnapshots.Criteria)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new DeleteSnapshotsSuccess(deleteSnapshots.Criteria) as ISnapshotResponse - : new DeleteSnapshotsFailure(deleteSnapshots.Criteria, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor) - .ContinueWith(_ => - { - if (_publish) - eventStream.Publish(message); - }, _continuationOptions); + self.Tell(new SaveSnapshotFailure(metadata, ex), senderPersistentActor); } - else if (message is DeleteSnapshotsSuccess) + } + + private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef self, IActorRef senderPersistentActor) + { + if (loadSnapshot.Criteria == SnapshotSelectionCriteria.None) { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } + senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr), self); } - else if (message is DeleteSnapshotsFailure) + else { try { - ReceivePluginInternal(message); + if(_debugEnabled) + _log.Info($"Starting {nameof(LoadSnapshotAsync)} circuit breaker."); + + var result = await _breaker.WithCircuitBreaker((msg: loadSnapshot, ss: this), + state => + { + if(_debugEnabled) + _log.Info($"Invoking {nameof(LoadAsync)} inside circuit breaker."); + + return state.ss.LoadAsync(state.msg.PersistenceId, state.msg.Criteria.Limit(state.msg.ToSequenceNr)); + }); + + if(_debugEnabled) + _log.Info($"{nameof(LoadSnapshotAsync)} circuit breaker completed."); + + senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr), self); } - finally + catch (Exception ex) { - senderPersistentActor.Tell(message); + senderPersistentActor.Tell(new LoadSnapshotFailed(ex), self); } } - else return false; - return true; } private Exception TryUnwrapException(Exception e) @@ -203,6 +255,7 @@ private Exception TryUnwrapException(Exception e) if (aggregateException.InnerExceptions.Count == 1) return aggregateException.InnerExceptions[0]; } + return e; } @@ -256,4 +309,4 @@ protected virtual bool ReceivePluginInternal(object message) return false; } } -} +} \ No newline at end of file