From adab09d8f1e7cb1fe16f1c48bbd0f0e1c32ee917 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 19 Jan 2024 09:30:24 -0600 Subject: [PATCH 01/37] attempting to fix racy Akka.Persistence.TestKit.Tests --- .../Actors/CounterActor.cs | 14 ++++++++++++-- .../Bug4762FixSpec.cs | 1 - 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index ca404a5a0a4..2b19e30b745 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -72,6 +72,16 @@ protected override void OnRecover(object message) return; } } + + protected override void PostStop() + { + _log.Info("Shutting down"); + } + + protected override void PreStart() + { + _log.Info("Starting up"); + } } public class CounterActorTests : PersistenceTestKit @@ -86,9 +96,9 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s var counterProps = Props.Create(() => new CounterActor("test")); var actor = ActorOf(counterProps, "counter"); - Watch(actor); + await WatchAsync(actor); actor.Tell("inc", TestActor); - await ExpectMsgAsync(TimeSpan.FromSeconds(3)); + await ExpectTerminatedAsync(actor); // need to restart actor actor = ActorOf(counterProps, "counter1"); diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index d9cb3099aa2..dee3bcd0f45 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -80,7 +80,6 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( return WithJournalWrite(write => write.Pass(), async () => { var actor = ActorOf(() => new TestActor2(probe)); - Watch(actor); var command = new WriteMessage(); actor.Tell(command, actor); From 185d5adcd7cbe0df0ecf8747505db8d63cce4833 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 19 Jan 2024 09:53:55 -0600 Subject: [PATCH 02/37] added debug log --- src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index 2b19e30b745..d4275c6f059 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -96,6 +96,7 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s var counterProps = Props.Create(() => new CounterActor("test")); var actor = ActorOf(counterProps, "counter"); + Sys.Log.Info("Messaging actor"); await WatchAsync(actor); actor.Tell("inc", TestActor); await ExpectTerminatedAsync(actor); From 272b962eebf3819e94002a090cbf55157990673f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 19 Jan 2024 10:27:14 -0600 Subject: [PATCH 03/37] looking into some suspicious `await` calls inside the `AsyncWriteJournal` --- .../Journal/JournalInterceptors.cs | 2 +- .../Journal/AsyncWriteJournal.cs | 27 ++++++++++--------- .../Akka.Persistence/Journal/MemoryJournal.cs | 4 +-- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs index 22f3eadf865..41ff1a3e3b0 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs @@ -16,7 +16,7 @@ internal class Noop : IJournalInterceptor { public static readonly IJournalInterceptor Instance = new Noop(); - public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true); + public Task InterceptAsync(IPersistentRepresentation message) => Task.CompletedTask; } internal class Failure : IJournalInterceptor diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index a30590147b5..d3f88339d93 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -259,20 +259,11 @@ private void HandleReplayMessages(ReplayMessages message) async Task ExecuteHighestSequenceNr() { - void CompleteHighSeqNo(long highSeqNo) - { - replyTo.Tell(new RecoverySuccess(highSeqNo)); - - if (CanPublish) - { - eventStream.Publish(message); - } - } - try { var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), state => - state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom)); + state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom)) + .ConfigureAwait(false); var toSequenceNr = Math.Min(message.ToSequenceNr, highSequenceNr); if (toSequenceNr <= 0L || message.FromSequenceNr > toSequenceNr) { @@ -293,7 +284,7 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr replyTo.Tell(new ReplayedMessage(adaptedRepresentation), ActorRefs.NoSender); } } - }); + }).ConfigureAwait(false); CompleteHighSeqNo(highSequenceNr); } @@ -309,6 +300,18 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr { replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex))); } + + return; + + void CompleteHighSeqNo(long highSeqNo) + { + replyTo.Tell(new RecoverySuccess(highSeqNo)); + + if (CanPublish) + { + eventStream.Publish(message); + } + } } // instead of ContinueWith diff --git a/src/core/Akka.Persistence/Journal/MemoryJournal.cs b/src/core/Akka.Persistence/Journal/MemoryJournal.cs index 25a861db3ac..445b8d38917 100644 --- a/src/core/Akka.Persistence/Journal/MemoryJournal.cs +++ b/src/core/Akka.Persistence/Journal/MemoryJournal.cs @@ -137,7 +137,7 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten var highest = HighestSequenceNr(persistenceId); if (highest != 0L && max != 0L) Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max).ForEach(recoveryCallback); - return Task.FromResult(new object()); + return Task.CompletedTask; } /// @@ -154,7 +154,7 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque _meta.AddOrUpdate(persistenceId, highestSeqNr, (_, _) => highestSeqNr); for (var snr = 1L; snr <= toSeqNr; snr++) Delete(persistenceId, snr); - return Task.FromResult(new object()); + return Task.CompletedTask; } protected override bool ReceivePluginInternal(object message) From 7b8b538a5b8a8b102198f93b4496c35070f86f6a Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 08:09:54 -0600 Subject: [PATCH 04/37] minor code clean up --- .../PersistenceTestKit.cs | 10 +++++----- .../SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs index 406dab3d5d6..e7b4d74b1dc 100644 --- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs @@ -157,7 +157,7 @@ public Task WithJournalRecovery(Func behaviorSele if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(new object()); + return Task.CompletedTask; }); /// @@ -175,7 +175,7 @@ public Task WithJournalWrite(Func behaviorSelector, if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(new object()); + return Task.CompletedTask; }); /// @@ -268,7 +268,7 @@ public Task WithSnapshotSave(Func behaviorSelec if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// @@ -286,7 +286,7 @@ public Task WithSnapshotLoad(Func behaviorSelec if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// @@ -304,7 +304,7 @@ public Task WithSnapshotDelete(Func behaviorS if (execution == null) throw new ArgumentNullException(nameof(execution)); execution(); - return Task.FromResult(true); + return Task.CompletedTask; }); /// diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs index 67ccccb0d49..84e7fc8ef77 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs @@ -18,7 +18,7 @@ internal class SnapshotStoreSaveBehaviorSetter : ISnapshotStoreBehaviorSetter { internal SnapshotStoreSaveBehaviorSetter(IActorRef snapshots) { - this._snapshots = snapshots; + _snapshots = snapshots; } private readonly IActorRef _snapshots; From ef8fd689462ec1bbb3fc28a5c4063cbeba927a94 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 08:20:59 -0600 Subject: [PATCH 05/37] `MemoryJournal` cleanup --- .../Akka.Persistence/Journal/MemoryJournal.cs | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/core/Akka.Persistence/Journal/MemoryJournal.cs b/src/core/Akka.Persistence/Journal/MemoryJournal.cs index 445b8d38917..7358359fc60 100644 --- a/src/core/Akka.Persistence/Journal/MemoryJournal.cs +++ b/src/core/Akka.Persistence/Journal/MemoryJournal.cs @@ -91,7 +91,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl var persistentRepresentation = p.WithTimestamp(DateTime.UtcNow.Ticks); Add(persistentRepresentation); _allMessages.AddLast(persistentRepresentation); - if (!(p.Payload is Tagged tagged)) continue; + if (p.Payload is not Tagged tagged) continue; foreach (var tag in tagged.Tags) { @@ -197,7 +197,7 @@ private Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) if (!_tagsToMessagesMapping.ContainsKey(replay.Tag)) return Task.FromResult(0); - int index = 0; + var index = 0; foreach (var persistence in _tagsToMessagesMapping[replay.Tag] .Skip(replay.FromOffset) .Take(replay.ToOffset)) @@ -212,7 +212,7 @@ private Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) private Task ReplayAllEventsAsync(ReplayAllEvents replay) { - int index = 0; + var index = 0; var replayed = _allMessages .Skip(replay.FromOffset) .Take(replay.ToOffset - replay.FromOffset) @@ -557,7 +557,7 @@ public Messages Add(IPersistentRepresentation persistent) /// TBD public Messages Update(string pid, long seqNr, Func updater) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { var node = persistents.First; while (node != null) @@ -580,7 +580,7 @@ public Messages Update(string pid, long seqNr, FuncTBD public Messages Delete(string pid, long seqNr) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { var node = persistents.First; while (node != null) @@ -605,7 +605,7 @@ public Messages Delete(string pid, long seqNr) /// TBD public IEnumerable Read(string pid, long fromSeqNr, long toSeqNr, long max) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) + if (Messages.TryGetValue(pid, out var persistents)) { return persistents .Where(x => x.SequenceNr >= fromSeqNr && x.SequenceNr <= toSeqNr) @@ -622,13 +622,10 @@ public IEnumerable Read(string pid, long fromSeqNr, l /// TBD public long HighestSequenceNr(string pid) { - if (Messages.TryGetValue(pid, out LinkedList persistents)) - { - var last = persistents.LastOrDefault(); - return last?.SequenceNr ?? 0L; - } + if (!Messages.TryGetValue(pid, out var persistents)) return 0L; + var last = persistents.LastOrDefault(); + return last?.SequenceNr ?? 0L; - return 0L; } #endregion From caf93d767df96d9226f787b54801e500f8173f96 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 08:35:27 -0600 Subject: [PATCH 06/37] added more robust external error logging --- .../PersistenceTestKit.cs | 26 +++++++++++++++++++ .../Akka.Persistence/Journal/MemoryJournal.cs | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs index e7b4d74b1dc..44a420a4e8d 100644 --- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs @@ -7,6 +7,7 @@ using Akka.Actor.Setup; using Akka.Configuration; +using Akka.Event; namespace Akka.Persistence.TestKit { @@ -111,6 +112,11 @@ public async Task WithJournalRecovery(Func behavi await behaviorSelector(Journal.OnRecovery); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithJournalRecovery"); + throw; + } finally { await Journal.OnRecovery.Pass(); @@ -136,6 +142,11 @@ public async Task WithJournalWrite(Func behaviorSele await behaviorSelector(Journal.OnWrite); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithJournalWrite"); + throw; + } finally { await Journal.OnWrite.Pass(); @@ -197,6 +208,11 @@ public async Task WithSnapshotSave(Func behavio await behaviorSelector(Snapshots.OnSave); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotSave"); + throw; + } finally { await Snapshots.OnSave.Pass(); @@ -222,6 +238,11 @@ public async Task WithSnapshotLoad(Func behavio await behaviorSelector(Snapshots.OnLoad); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotLoad"); + throw; + } finally { await Snapshots.OnLoad.Pass(); @@ -247,6 +268,11 @@ public async Task WithSnapshotDelete(Func beh await behaviorSelector(Snapshots.OnDelete); await execution(); } + catch (Exception ex) + { + Log.Error(ex, "Error during execution of WithSnapshotDelete"); + throw; + } finally { await Snapshots.OnDelete.Pass(); diff --git a/src/core/Akka.Persistence/Journal/MemoryJournal.cs b/src/core/Akka.Persistence/Journal/MemoryJournal.cs index 7358359fc60..89cc7756835 100644 --- a/src/core/Akka.Persistence/Journal/MemoryJournal.cs +++ b/src/core/Akka.Persistence/Journal/MemoryJournal.cs @@ -118,7 +118,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl /// TBD public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { - return Task.FromResult(Math.Max(HighestSequenceNr(persistenceId), _meta.TryGetValue(persistenceId, out long metaSeqNr) ? metaSeqNr : 0L)); + return Task.FromResult(Math.Max(HighestSequenceNr(persistenceId), _meta.TryGetValue(persistenceId, out var metaSeqNr) ? metaSeqNr : 0L)); } /// From 30897ff96abe769670aab69b0f3e81e4ea43048b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 09:02:16 -0600 Subject: [PATCH 07/37] disable sync thread dispatcher --- src/core/Akka.TestKit/TestKitBase.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index c639719304b..d13da2333b0 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -701,8 +701,9 @@ public IActorRef CreateTestActor(string name) private IActorRef CreateTestActor(ActorSystem system, string name) { - var testActorProps = Props.Create(() => new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))) - .WithDispatcher("akka.test.test-actor.dispatcher"); + var testActorProps = Props.Create(() => + new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))); + //.WithDispatcher("akka.test.test-actor.dispatcher"); var testActor = system.AsInstanceOf().SystemActorOf(testActorProps, name); return testActor; } From fbb0980640f3c12a8bbe4c400ba8c9e3bfd4c64e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 09:33:23 -0600 Subject: [PATCH 08/37] Revert "disable sync thread dispatcher" This reverts commit 30897ff96abe769670aab69b0f3e81e4ea43048b. --- src/core/Akka.TestKit/TestKitBase.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index d13da2333b0..c639719304b 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -701,9 +701,8 @@ public IActorRef CreateTestActor(string name) private IActorRef CreateTestActor(ActorSystem system, string name) { - var testActorProps = Props.Create(() => - new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))); - //.WithDispatcher("akka.test.test-actor.dispatcher"); + var testActorProps = Props.Create(() => new InternalTestActor(new BlockingCollectionTestActorQueue(_testState.Queue))) + .WithDispatcher("akka.test.test-actor.dispatcher"); var testActor = system.AsInstanceOf().SystemActorOf(testActorProps, name); return testActor; } From 0178cd3906af0b09c170387acbc5c07836c115eb Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 09:46:18 -0600 Subject: [PATCH 09/37] added debugging capabilities to `TestJournal` --- .../Journal/TestJournal.cs | 29 +++++++++++++++++++ src/core/Akka.Persistence.TestKit/config.conf | 5 +++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs index 56aa3f6b528..9f572ca34f1 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs @@ -5,6 +5,9 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; +using Akka.Event; + namespace Akka.Persistence.TestKit { using Akka.Actor; @@ -20,19 +23,31 @@ namespace Akka.Persistence.TestKit /// public sealed class TestJournal : MemoryJournal { + private readonly ILoggingAdapter _log = Context.GetLogger(); private IJournalInterceptor _writeInterceptor = JournalInterceptors.Noop.Instance; private IJournalInterceptor _recoveryInterceptor = JournalInterceptors.Noop.Instance; + public TestJournal(Config journalConfig) + { + DebugEnabled = journalConfig.GetBoolean("debug", false); + } + + private bool DebugEnabled { get; } + protected override bool ReceivePluginInternal(object message) { switch (message) { case UseWriteInterceptor use: + if(DebugEnabled) + _log.Info("Using write interceptor {0}", use.Interceptor.GetType().Name); _writeInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseRecoveryInterceptor use: + if(DebugEnabled) + _log.Info("Using recovery interceptor {0}", use.Interceptor.GetType().Name); _recoveryInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; @@ -51,7 +66,12 @@ protected override async Task> WriteMessagesAsync(IEnu { foreach (var p in (IEnumerable)w.Payload) { + if(DebugEnabled) + _log.Info("Beginning write intercept of message {0} with interceptor {1}", p, _writeInterceptor.GetType().Name); await _writeInterceptor.InterceptAsync(p); + + if(DebugEnabled) + _log.Info("Completed write intercept of message {0} with interceptor {1}", p, _writeInterceptor.GetType().Name); Add(p); } } @@ -75,6 +95,10 @@ protected override async Task> WriteMessagesAsync(IEnu public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action recoveryCallback) { var highest = HighestSequenceNr(persistenceId); + + if(DebugEnabled) + _log.Info("Replaying messages from {0} to {1} for persistenceId {2}", fromSequenceNr, toSequenceNr, persistenceId); + if (highest != 0L && max != 0L) { var messages = Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max); @@ -82,7 +106,12 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per { try { + if(DebugEnabled) + _log.Info("Beginning recovery intercept of message {0} with interceptor {1}", p, _recoveryInterceptor.GetType().Name); await _recoveryInterceptor.InterceptAsync(p); + + if(DebugEnabled) + _log.Info("Completed recovery intercept of message {0} with interceptor {1}", p, _recoveryInterceptor.GetType().Name); recoveryCallback(p); } catch (TestJournalFailureException) diff --git a/src/core/Akka.Persistence.TestKit/config.conf b/src/core/Akka.Persistence.TestKit/config.conf index dda3d635272..a416e0fb7ca 100644 --- a/src/core/Akka.Persistence.TestKit/config.conf +++ b/src/core/Akka.Persistence.TestKit/config.conf @@ -7,6 +7,9 @@ akka { test { class = "Akka.Persistence.TestKit.TestJournal, Akka.Persistence.TestKit" plugin-dispatcher = "akka.actor.default-dispatcher" + + # enables debug mode, which adds verbose logging to each of the TestJournal stages + debug = false } } @@ -16,7 +19,7 @@ akka { test { class = "Akka.Persistence.TestKit.TestSnapshotStore, Akka.Persistence.TestKit" - plugin-dispatcher = "akka.actor.default-dispatcher" + plugin-dispatcher = "akka.actor.default-dispatcher" } } } From 7206ccac995c54185684a023a31bc791678aa93e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 10:00:58 -0600 Subject: [PATCH 10/37] added debug logging to `CounterActor` specs --- .../Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs | 6 +++++- .../Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index d4275c6f059..51b7fd8c9c3 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; using Akka.Event; using Xunit.Abstractions; @@ -86,7 +87,10 @@ protected override void PreStart() public class CounterActorTests : PersistenceTestKit { - public CounterActorTests(ITestOutputHelper output) : base(output:output){} + // create a Config that enables debug mode on the TestJournal + private static readonly Config Config = ConfigurationFactory.ParseString("akka.persistence.journal.test.debug = on"); + + public CounterActorTests(ITestOutputHelper output) : base(Config, output:output){} [Fact] public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_store_is_not_available() diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs index 44a420a4e8d..f35070446b2 100644 --- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs @@ -374,7 +374,7 @@ private static Config GetConfig(Config customConfig) { var defaultConfig = ConfigurationFactory.FromResource("Akka.Persistence.TestKit.config.conf"); if (customConfig == Config.Empty) return defaultConfig; - else return defaultConfig.SafeWithFallback(customConfig); + else return customConfig.WithFallback(defaultConfig); } } } From daf2c8cb77988cf6706fed6d5c4ff4a64d073889 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 10:07:18 -0600 Subject: [PATCH 11/37] added debug logging to `Bug4762FixSpec` --- .../Bug4762FixSpec.cs | 25 +++++++++++-------- .../Journal/TestJournal.cs | 3 +++ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index dee3bcd0f45..3e78a44d13f 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -11,6 +11,7 @@ using System.Text; using System.Threading.Tasks; using Akka.Actor; +using Akka.Configuration; using Akka.Event; using Xunit; using Xunit.Abstractions; @@ -22,16 +23,21 @@ namespace Akka.Persistence.TestKit.Tests /// public class Bug4762FixSpec : PersistenceTestKit { - public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(output: outputHelper) + // create a Config that enables debug mode on the TestJournal + private static readonly Config Config = + ConfigurationFactory.ParseString("akka.persistence.journal.test.debug = on"); + + public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(Config, output: outputHelper) { - } - + private class WriteMessage - { } + { + } private class TestEvent - { } + { + } private class TestActor2 : UntypedPersistentActor { @@ -48,17 +54,14 @@ public TestActor2(IActorRef probe) protected override void OnCommand(object message) { _log.Info("Received command {0}", message); - + switch (message) { case WriteMessage _: var event1 = new TestEvent(); var event2 = new TestEvent(); var events = new List { event1, event2 }; - PersistAll(events, _ => - { - _probe.Tell(Done.Instance); - }); + PersistAll(events, _ => { _probe.Tell(Done.Instance); }); break; default: @@ -91,4 +94,4 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( }); } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs index 9f572ca34f1..bf9ea165013 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs @@ -121,6 +121,9 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per throw; } } + + if(DebugEnabled) + _log.Info("Completed replaying messages from {0} to {1} for persistenceId {2}", fromSequenceNr, toSequenceNr, persistenceId); } } From 26108a79dd081b7f2ba631055950a5e920a207a1 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 16:13:37 -0600 Subject: [PATCH 12/37] added debug logging to `TestSnapshotStore` --- .../Actors/CounterActor.cs | 6 ++- .../Bug4762FixSpec.cs | 5 ++- .../Journal/TestJournal.cs | 14 +++--- .../SnapshotStore/TestSnapshotStore.cs | 45 ++++++++++++++++++- src/core/Akka.Persistence.TestKit/config.conf | 3 ++ 5 files changed, 62 insertions(+), 11 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index 51b7fd8c9c3..e2d8e2e9445 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -88,7 +88,11 @@ protected override void PreStart() public class CounterActorTests : PersistenceTestKit { // create a Config that enables debug mode on the TestJournal - private static readonly Config Config = ConfigurationFactory.ParseString("akka.persistence.journal.test.debug = on"); + private static readonly Config Config = + ConfigurationFactory.ParseString(""" + akka.persistence.journal.test.debug = on + akka.persistence.snapshot-store.test.debug = on + """); public CounterActorTests(ITestOutputHelper output) : base(Config, output:output){} diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index 3e78a44d13f..a590aa2e3e7 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -25,7 +25,10 @@ public class Bug4762FixSpec : PersistenceTestKit { // create a Config that enables debug mode on the TestJournal private static readonly Config Config = - ConfigurationFactory.ParseString("akka.persistence.journal.test.debug = on"); + ConfigurationFactory.ParseString(""" + akka.persistence.journal.test.debug = on + akka.persistence.snapshot-store.test.debug = on + """); public Bug4762FixSpec(ITestOutputHelper outputHelper) : base(Config, output: outputHelper) { diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs index bf9ea165013..4333efeb7ad 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs @@ -7,17 +7,15 @@ using Akka.Configuration; using Akka.Event; +using Akka.Actor; +using Akka.Persistence.Journal; +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading.Tasks; namespace Akka.Persistence.TestKit { - using Akka.Actor; - using Akka.Persistence; - using Akka.Persistence.Journal; - using System; - using System.Collections.Generic; - using System.Collections.Immutable; - using System.Threading.Tasks; - /// /// In-memory persistence journal implementation which behavior could be controlled by interceptors. /// diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs index 838f06ce40e..2be0704d986 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs @@ -5,6 +5,9 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; +using Akka.Event; + namespace Akka.Persistence.TestKit { using System.Threading.Tasks; @@ -19,22 +22,36 @@ public class TestSnapshotStore : MemorySnapshotStore private ISnapshotStoreInterceptor _saveInterceptor = SnapshotStoreInterceptors.Noop.Instance; private ISnapshotStoreInterceptor _loadInterceptor = SnapshotStoreInterceptors.Noop.Instance; private ISnapshotStoreInterceptor _deleteInterceptor = SnapshotStoreInterceptors.Noop.Instance; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public TestSnapshotStore(Config snapshotStoreConfig) + { + DebugEnabled = snapshotStoreConfig.GetBoolean("debug", false); + } + + private bool DebugEnabled { get; } protected override bool ReceivePluginInternal(object message) { switch (message) { case UseSaveInterceptor use: + if(DebugEnabled) + _log.Info("Using save interceptor {0}", use.Interceptor.GetType().Name); _saveInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseLoadInterceptor use: + if(DebugEnabled) + _log.Info("Using load interceptor {0}", use.Interceptor.GetType().Name); _loadInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; case UseDeleteInterceptor use: + if(DebugEnabled) + _log.Info("Using delete interceptor {0}", use.Interceptor.GetType().Name); _deleteInterceptor = use.Interceptor; Sender.Tell(Ack.Instance); return true; @@ -46,29 +63,55 @@ protected override bool ReceivePluginInternal(object message) protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} saving using interceptor {1}", metadata, _saveInterceptor.GetType().Name); + await _saveInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata)); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} saving using interceptor {1}", metadata, _saveInterceptor.GetType().Name); await base.SaveAsync(metadata, snapshot); } protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} loading using interceptor {1}", persistenceId, _loadInterceptor.GetType().Name); await _loadInterceptor.InterceptAsync(persistenceId, criteria); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} loading using interceptor {1}", persistenceId, _loadInterceptor.GetType().Name); + return await base.LoadAsync(persistenceId, criteria); } protected override async Task DeleteAsync(SnapshotMetadata metadata) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} deletion using interceptor {1}", metadata, _deleteInterceptor.GetType().Name); + await _deleteInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata)); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} deletion using interceptor {1}", metadata, _deleteInterceptor.GetType().Name); + await base.DeleteAsync(metadata); } protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) { + if(DebugEnabled) + _log.Info("Starting to intercept snapshot {0} deletion using interceptor {1}", persistenceId, _deleteInterceptor.GetType().Name); + await _deleteInterceptor.InterceptAsync(persistenceId, criteria); + + if(DebugEnabled) + _log.Info("Completed intercept snapshot {0} deletion using interceptor {1}", persistenceId, _deleteInterceptor.GetType().Name); + await base.DeleteAsync(persistenceId, criteria); } - static SnapshotSelectionCriteria ToSelectionCriteria(SnapshotMetadata metadata) + private static SnapshotSelectionCriteria ToSelectionCriteria(SnapshotMetadata metadata) => new(metadata.SequenceNr, metadata.Timestamp, metadata.SequenceNr, metadata.Timestamp); /// diff --git a/src/core/Akka.Persistence.TestKit/config.conf b/src/core/Akka.Persistence.TestKit/config.conf index a416e0fb7ca..f03792b2baa 100644 --- a/src/core/Akka.Persistence.TestKit/config.conf +++ b/src/core/Akka.Persistence.TestKit/config.conf @@ -20,6 +20,9 @@ akka { test { class = "Akka.Persistence.TestKit.TestSnapshotStore, Akka.Persistence.TestKit" plugin-dispatcher = "akka.actor.default-dispatcher" + + # enables debug mode, which adds verbose logging to each of the TestSnapshotStore stages + debug = false } } } From 852375f3c1a79e00b630515ed1a7a712f3e70d44 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 19:11:57 -0600 Subject: [PATCH 13/37] attempting to fix some continuation mess inside the snapshot store --- .../Snapshot/SnapshotStore.cs | 127 +++++++++++------- 1 file changed, 77 insertions(+), 50 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index 8474c1ab4bb..bcb1c15d219 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -32,7 +32,8 @@ protected SnapshotStore() var extension = Persistence.Instance.Apply(Context.System); if (extension == null) { - throw new ArgumentException("Couldn't initialize SnapshotStore instance, because associated Persistence extension has not been used in current actor system context."); + throw new ArgumentException( + "Couldn't initialize SnapshotStore instance, because associated Persistence extension has not been used in current actor system context."); } _publish = extension.Settings.Internal.PublishPluginCommands; @@ -57,35 +58,11 @@ private bool ReceiveSnapshotStore(object message) if (message is LoadSnapshot loadSnapshot) { - if (loadSnapshot.Criteria == SnapshotSelectionCriteria.None) - { - senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr)); - } - else - { - _breaker.WithCircuitBreaker(() => LoadAsync(loadSnapshot.PersistenceId, loadSnapshot.Criteria.Limit(loadSnapshot.ToSequenceNr))) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new LoadSnapshotResult(t.Result, loadSnapshot.ToSequenceNr) as ISnapshotResponse - : new LoadSnapshotFailed(t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("LoadAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(senderPersistentActor); - } + LoadSnapshotAsync(loadSnapshot, senderPersistentActor); } else if (message is SaveSnapshot saveSnapshot) { - var metadata = new SnapshotMetadata(saveSnapshot.Metadata.PersistenceId, saveSnapshot.Metadata.SequenceNr, DateTime.UtcNow); - - _breaker.WithCircuitBreaker(() => SaveAsync(metadata, saveSnapshot.Snapshot)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new SaveSnapshotSuccess(metadata) as ISnapshotResponse - : new SaveSnapshotFailure(saveSnapshot.Metadata, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("SaveAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor); + SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); } else if (message is SaveSnapshotSuccess) { @@ -103,7 +80,7 @@ private bool ReceiveSnapshotStore(object message) try { ReceivePluginInternal(message); - _breaker.WithCircuitBreaker(() => DeleteAsync(saveSnapshotFailure.Metadata)); + _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), state => state.ss.DeleteAsync(state.msg.Metadata)); } finally { @@ -112,21 +89,7 @@ private bool ReceiveSnapshotStore(object message) } else if (message is DeleteSnapshot deleteSnapshot) { - var eventStream = Context.System.EventStream; - _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshot.Metadata)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new DeleteSnapshotSuccess(deleteSnapshot.Metadata) as ISnapshotResponse - : new DeleteSnapshotFailure(deleteSnapshot.Metadata, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor) - .ContinueWith(_ => - { - if (_publish) - eventStream.Publish(message); - }, _continuationOptions); + DeleteSnapshotAsync(deleteSnapshot, self, senderPersistentActor); } else if (message is DeleteSnapshotSuccess) { @@ -155,12 +118,13 @@ private bool ReceiveSnapshotStore(object message) var eventStream = Context.System.EventStream; _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshots.PersistenceId, deleteSnapshots.Criteria)) .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new DeleteSnapshotsSuccess(deleteSnapshots.Criteria) as ISnapshotResponse - : new DeleteSnapshotsFailure(deleteSnapshots.Criteria, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException("DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) + ? new DeleteSnapshotsSuccess(deleteSnapshots.Criteria) as ISnapshotResponse + : new DeleteSnapshotsFailure(deleteSnapshots.Criteria, + t.IsFaulted + ? TryUnwrapException(t.Exception) + : new OperationCanceledException( + "DeleteAsync canceled, possibly due to timing out.")), + _continuationOptions) .PipeTo(self, senderPersistentActor) .ContinueWith(_ => { @@ -191,9 +155,71 @@ private bool ReceiveSnapshotStore(object message) } } else return false; + return true; } + private async Task DeleteSnapshotAsync(DeleteSnapshot deleteSnapshot, IActorRef self, + IActorRef senderPersistentActor) + { + var eventStream = Context.System.EventStream; + + try + { + await _breaker.WithCircuitBreaker((msg: deleteSnapshot, ss: this), + state => state.ss.DeleteAsync(state.msg.Metadata)); + + self.Tell(new DeleteSnapshotSuccess(deleteSnapshot.Metadata), senderPersistentActor); + } + catch (Exception ex) + { + self.Tell(new DeleteSnapshotFailure(deleteSnapshot.Metadata, ex), senderPersistentActor); + } + + if (_publish) + eventStream.Publish(deleteSnapshot); + } + + private async Task SaveSnapshotAsync(SaveSnapshot saveSnapshot, IActorRef self, IActorRef senderPersistentActor) + { + var metadata = new SnapshotMetadata(saveSnapshot.Metadata.PersistenceId, + saveSnapshot.Metadata.SequenceNr, DateTime.UtcNow); + + try + { + await _breaker.WithCircuitBreaker((msg: metadata, save: saveSnapshot, ss: this), + state => state.ss.SaveAsync(state.msg, state.save)); + self.Tell(new SaveSnapshotSuccess(metadata), senderPersistentActor); + } + catch (Exception ex) + { + self.Tell(new SaveSnapshotFailure(metadata, ex), senderPersistentActor); + } + } + + private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef senderPersistentActor) + { + if (loadSnapshot.Criteria == SnapshotSelectionCriteria.None) + { + senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr)); + } + else + { + try + { + var result = await _breaker.WithCircuitBreaker((msg: loadSnapshot, ss: this), + state => state.ss.LoadAsync(state.msg.PersistenceId, + state.msg.Criteria.Limit(state.msg.ToSequenceNr))); + + senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr)); + } + catch (Exception ex) + { + senderPersistentActor.Tell(new LoadSnapshotFailed(ex)); + } + } + } + private Exception TryUnwrapException(Exception e) { var aggregateException = e as AggregateException; @@ -203,6 +229,7 @@ private Exception TryUnwrapException(Exception e) if (aggregateException.InnerExceptions.Count == 1) return aggregateException.InnerExceptions[0]; } + return e; } @@ -256,4 +283,4 @@ protected virtual bool ReceivePluginInternal(object message) return false; } } -} +} \ No newline at end of file From 0de3cc9ecf420fb9c7197ef352efe216f9c5e2f0 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 19:16:49 -0600 Subject: [PATCH 14/37] fixed final cases --- .../Snapshot/SnapshotStore.cs | 195 +++++++++--------- 1 file changed, 100 insertions(+), 95 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index bcb1c15d219..67a6e333925 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -55,108 +55,113 @@ private bool ReceiveSnapshotStore(object message) { var senderPersistentActor = Sender; // Sender is PersistentActor var self = Self; //Self MUST BE CLOSED OVER here, or the code below will be subject to race conditions - - if (message is LoadSnapshot loadSnapshot) - { - LoadSnapshotAsync(loadSnapshot, senderPersistentActor); - } - else if (message is SaveSnapshot saveSnapshot) - { - SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); - } - else if (message is SaveSnapshotSuccess) - { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } - } - else if (message is SaveSnapshotFailure saveSnapshotFailure) - { - try - { - ReceivePluginInternal(message); - _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), state => state.ss.DeleteAsync(state.msg.Metadata)); - } - finally - { - senderPersistentActor.Tell(message); - } - } - else if (message is DeleteSnapshot deleteSnapshot) +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + switch (message) { - DeleteSnapshotAsync(deleteSnapshot, self, senderPersistentActor); - } - else if (message is DeleteSnapshotSuccess) - { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } - } - else if (message is DeleteSnapshotFailure) - { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } - } - else if (message is DeleteSnapshots deleteSnapshots) - { - var eventStream = Context.System.EventStream; - _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshots.PersistenceId, deleteSnapshots.Criteria)) - .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) - ? new DeleteSnapshotsSuccess(deleteSnapshots.Criteria) as ISnapshotResponse - : new DeleteSnapshotsFailure(deleteSnapshots.Criteria, - t.IsFaulted - ? TryUnwrapException(t.Exception) - : new OperationCanceledException( - "DeleteAsync canceled, possibly due to timing out.")), - _continuationOptions) - .PipeTo(self, senderPersistentActor) - .ContinueWith(_ => + case LoadSnapshot loadSnapshot: + + LoadSnapshotAsync(loadSnapshot, senderPersistentActor); + break; + case SaveSnapshot saveSnapshot: + SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); + break; + case SaveSnapshotSuccess: + try { - if (_publish) - eventStream.Publish(message); - }, _continuationOptions); + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case SaveSnapshotFailure saveSnapshotFailure: + try + { + ReceivePluginInternal(message); + _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), state => state.ss.DeleteAsync(state.msg.Metadata)); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshot deleteSnapshot: + DeleteSnapshotAsync(deleteSnapshot, self, senderPersistentActor); + break; + case DeleteSnapshotSuccess: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshotFailure: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshots deleteSnapshots: + DeleteSnapshotsAsync(deleteSnapshots, self, senderPersistentActor); + break; + case DeleteSnapshotsSuccess: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + case DeleteSnapshotsFailure: + try + { + ReceivePluginInternal(message); + } + finally + { + senderPersistentActor.Tell(message); + } + + break; + default: + return false; } - else if (message is DeleteSnapshotsSuccess) +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + return true; + } + + private async Task DeleteSnapshotsAsync(DeleteSnapshots deleteSnapshots, IActorRef self, IActorRef senderPersistentActor) + { + var eventStream = Context.System.EventStream; + try { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } + await _breaker.WithCircuitBreaker((msg: deleteSnapshots, ss: this), + state => state.ss.DeleteAsync(state.msg.PersistenceId, state.msg.Criteria)); + + self.Tell(new DeleteSnapshotsSuccess(deleteSnapshots.Criteria), senderPersistentActor); } - else if (message is DeleteSnapshotsFailure) + catch (Exception ex) { - try - { - ReceivePluginInternal(message); - } - finally - { - senderPersistentActor.Tell(message); - } + self.Tell(new DeleteSnapshotsFailure(deleteSnapshots.Criteria, ex), senderPersistentActor); } - else return false; - - return true; + + if (_publish) + eventStream.Publish(deleteSnapshots); } private async Task DeleteSnapshotAsync(DeleteSnapshot deleteSnapshot, IActorRef self, From ef22f501cbd0d69fb65de5d46985c744fcfb4b7b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 19:17:05 -0600 Subject: [PATCH 15/37] formatting --- src/core/Akka.Persistence/Snapshot/SnapshotStore.cs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index 67a6e333925..bb0c7a6ef81 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -80,7 +80,8 @@ private bool ReceiveSnapshotStore(object message) try { ReceivePluginInternal(message); - _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), state => state.ss.DeleteAsync(state.msg.Metadata)); + _breaker.WithCircuitBreaker((msg: saveSnapshotFailure, ss: this), + state => state.ss.DeleteAsync(state.msg.Metadata)); } finally { @@ -145,7 +146,8 @@ private bool ReceiveSnapshotStore(object message) return true; } - private async Task DeleteSnapshotsAsync(DeleteSnapshots deleteSnapshots, IActorRef self, IActorRef senderPersistentActor) + private async Task DeleteSnapshotsAsync(DeleteSnapshots deleteSnapshots, IActorRef self, + IActorRef senderPersistentActor) { var eventStream = Context.System.EventStream; try @@ -159,7 +161,7 @@ await _breaker.WithCircuitBreaker((msg: deleteSnapshots, ss: this), { self.Tell(new DeleteSnapshotsFailure(deleteSnapshots.Criteria, ex), senderPersistentActor); } - + if (_publish) eventStream.Publish(deleteSnapshots); } @@ -168,7 +170,7 @@ private async Task DeleteSnapshotAsync(DeleteSnapshot deleteSnapshot, IActorRef IActorRef senderPersistentActor) { var eventStream = Context.System.EventStream; - + try { await _breaker.WithCircuitBreaker((msg: deleteSnapshot, ss: this), @@ -180,7 +182,7 @@ await _breaker.WithCircuitBreaker((msg: deleteSnapshot, ss: this), { self.Tell(new DeleteSnapshotFailure(deleteSnapshot.Metadata, ex), senderPersistentActor); } - + if (_publish) eventStream.Publish(deleteSnapshot); } From a33110a832e5792fd8527aa1b1da4398356ef77f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 23 Jan 2024 19:44:55 -0600 Subject: [PATCH 16/37] fixed snapshot saving errors --- .../LocalSnapshotStoreSpec.cs | 13 +++++++++---- src/core/Akka.Persistence/Snapshot/SnapshotStore.cs | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs b/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs index b728d354816..4c5866915f7 100644 --- a/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs +++ b/src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs @@ -46,10 +46,15 @@ public void LocalSnapshotStore_can_snapshot_actors_with_PersistenceId_containing ExpectMsg(); SnapshotStore.Tell(new LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, long.MaxValue), TestActor); - ExpectMsg(res => - res.Snapshot.Snapshot.Equals("sample data") - && res.Snapshot.Metadata.PersistenceId == pid - && res.Snapshot.Metadata.SequenceNr == 1); + ExpectMsg(IsMessage); + bool IsMessage(LoadSnapshotResult res) + { + var result = res.Snapshot.Snapshot.Equals("sample data") + && res.Snapshot.Metadata.PersistenceId == pid + && res.Snapshot.Metadata.SequenceNr == 1; + + return result; + } } } } diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index bb0c7a6ef81..158b94ca2b8 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -195,7 +195,7 @@ private async Task SaveSnapshotAsync(SaveSnapshot saveSnapshot, IActorRef self, try { await _breaker.WithCircuitBreaker((msg: metadata, save: saveSnapshot, ss: this), - state => state.ss.SaveAsync(state.msg, state.save)); + state => state.ss.SaveAsync(state.msg, state.save.Snapshot)); self.Tell(new SaveSnapshotSuccess(metadata), senderPersistentActor); } catch (Exception ex) From b8a7245a26106bc625586a1d70ef31456a9a4cb7 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 24 Jan 2024 07:54:20 -0600 Subject: [PATCH 17/37] enable DEBUG logging --- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index a590aa2e3e7..48840062f54 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -26,6 +26,7 @@ public class Bug4762FixSpec : PersistenceTestKit // create a Config that enables debug mode on the TestJournal private static readonly Config Config = ConfigurationFactory.ParseString(""" + akka.loglevel = DEBUG akka.persistence.journal.test.debug = on akka.persistence.snapshot-store.test.debug = on """); From 4af443afc4482d551d5b6dfabbf86e27b1a19cb0 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 24 Jan 2024 07:59:09 -0600 Subject: [PATCH 18/37] more debug logging --- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index 48840062f54..d1a2682b5bd 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -73,6 +73,12 @@ protected override void OnCommand(object message) } } + protected override void PreStart() + { + _log.Info("Starting up and beginning recovery"); + base.PreStart(); + } + protected override void OnRecover(object message) { _log.Info("Received recover {0}", message); From 663a88da56e2b00f6aa0b0cac53f7196c1469da8 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 25 Jan 2024 08:21:02 -0600 Subject: [PATCH 19/37] more debug logging --- .../SnapshotStore/TestSnapshotStore.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs index 2be0704d986..a576a61f2e1 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs @@ -33,6 +33,9 @@ public TestSnapshotStore(Config snapshotStoreConfig) protected override bool ReceivePluginInternal(object message) { + if(DebugEnabled) + _log.Info("Received plugin internal message {0}", message); + switch (message) { case UseSaveInterceptor use: From a221b7efe5cd1ff6d36e61fcb66cc917c62620b2 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 00:43:49 +0700 Subject: [PATCH 20/37] Clean-up and modernize ReplayFilter --- .../Akka.Persistence/Journal/ReplayFilter.cs | 228 ++++++++++-------- 1 file changed, 121 insertions(+), 107 deletions(-) diff --git a/src/core/Akka.Persistence/Journal/ReplayFilter.cs b/src/core/Akka.Persistence/Journal/ReplayFilter.cs index e6fda67e051..476a1521544 100644 --- a/src/core/Akka.Persistence/Journal/ReplayFilter.cs +++ b/src/core/Akka.Persistence/Journal/ReplayFilter.cs @@ -131,28 +131,63 @@ public static Props Props(IActorRef persistentActor, ReplayFilterMode mode, int /// TBD protected override bool Receive(object message) { - if (message is ReplayedMessage value) + switch (message) { - if (DebugEnabled && _log.IsDebugEnabled) - _log.Debug($"Replay: {value.Persistent}"); + case ReplayedMessage value: + if (DebugEnabled && _log.IsDebugEnabled) + _log.Debug($"Replay: {value.Persistent}"); - try - { - if (_buffer.Count == WindowSize) + try { - var msg = _buffer.First; - _buffer.RemoveFirst(); - PersistentActor.Tell(msg.Value, ActorRefs.NoSender); - } + if (_buffer.Count == WindowSize) + { + var msg = _buffer.First; + _buffer.RemoveFirst(); + PersistentActor.Tell(msg.Value, ActorRefs.NoSender); + } - if (value.Persistent.WriterGuid.Equals(_writerUuid)) - { - // from same writer - if (value.Persistent.SequenceNr < _sequenceNr) + if (value.Persistent.WriterGuid.Equals(_writerUuid)) { - var errMsg = $@"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}] as - the sequenceNr should be equal to or greater than already-processed event [sequenceNr={_sequenceNr}, writerUUID={_writerUuid}] from the same writer, for the same persistenceId [{value.Persistent.PersistenceId}]. - Perhaps, events were journaled out of sequence, or duplicate PersistentId for different entities?"; + // from same writer + if (value.Persistent.SequenceNr < _sequenceNr) + { + var errMsg = + $"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}] as " + + $"the sequenceNr should be equal to or greater than already-processed event [sequenceNr={_sequenceNr}, " + + $"writerUUID={_writerUuid}] from the same writer, for the same persistenceId [{value.Persistent.PersistenceId}]. " + + "Perhaps, events were journaled out of sequence, or duplicate PersistentId for different entities?"; + LogIssue(errMsg); + switch (Mode) + { + case ReplayFilterMode.RepairByDiscardOld: + //discard + break; + case ReplayFilterMode.Fail: + throw new IllegalStateException(errMsg); + case ReplayFilterMode.Warn: + _buffer.AddLast(value); + break; + case ReplayFilterMode.Disabled: + throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); + } + } + else + { + // note that it is alright with == _sequenceNr, since such may be emitted by EventSeq + _buffer.AddLast(value); + _sequenceNr = value.Persistent.SequenceNr; + } + } + else if (_oldWriters.Contains(value.Persistent.WriterGuid)) + { + // from old writer + var errMsg = + $"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}]. " + + $"There was already a newer writer whose last replayed event was [sequenceNr={_sequenceNr}, " + + $"writerUUID={_writerUuid}] for the same persistenceId [{value.Persistent.PersistenceId}]. " + + "Perhaps, the old writer kept journaling messages after the new writer created, or duplicate PersistentId for different entities?"; LogIssue(errMsg); switch (Mode) { @@ -166,98 +201,78 @@ protected override bool Receive(object message) break; case ReplayFilterMode.Disabled: throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); } } else { - // note that it is alright with == _sequenceNr, since such may be emitted by EventSeq - _buffer.AddLast(value); + // from new writer + if (!string.IsNullOrEmpty(_writerUuid)) + _oldWriters.AddLast(_writerUuid); + if (_oldWriters.Count > MaxOldWriters) + _oldWriters.RemoveFirst(); + _writerUuid = value.Persistent.WriterGuid; _sequenceNr = value.Persistent.SequenceNr; - } - } - else if (_oldWriters.Contains(value.Persistent.WriterGuid)) - { - // from old writer - var errMsg = $@"Invalid replayed event [sequenceNr={value.Persistent.SequenceNr}, writerUUID={value.Persistent.WriterGuid}]. - There was already a newer writer whose last replayed event was [sequenceNr={_sequenceNr}, writerUUID={_writerUuid}] for the same persistenceId [{value.Persistent.PersistenceId}]. - Perhaps, the old writer kept journaling messages after the new writer created, or duplicate PersistentId for different entities?"; - LogIssue(errMsg); - switch (Mode) - { - case ReplayFilterMode.RepairByDiscardOld: - //discard - break; - case ReplayFilterMode.Fail: - throw new IllegalStateException(errMsg); - case ReplayFilterMode.Warn: - _buffer.AddLast(value); - break; - case ReplayFilterMode.Disabled: - throw new ArgumentException("Mode must not be Disabled"); - } - } - else - { - // from new writer - if (!string.IsNullOrEmpty(_writerUuid)) - _oldWriters.AddLast(_writerUuid); - if (_oldWriters.Count > MaxOldWriters) - _oldWriters.RemoveFirst(); - _writerUuid = value.Persistent.WriterGuid; - _sequenceNr = value.Persistent.SequenceNr; - // clear the buffer from messages from other writers with higher SequenceNr - var node = _buffer.First; - while (node != null) - { - var next = node.Next; - var msg = node.Value; - if (msg.Persistent.SequenceNr >= _sequenceNr) + // clear the buffer from messages from other writers with higher SequenceNr + var node = _buffer.First; + while (node != null) { - var errMsg = $@"Invalid replayed event [sequenceNr=${value.Persistent.SequenceNr}, writerUUID=${value.Persistent.WriterGuid}] from a new writer. - An older writer already sent an event [sequenceNr=${msg.Persistent.SequenceNr}, writerUUID=${msg.Persistent.WriterGuid}] whose sequence number was equal or greater for the same persistenceId [${value.Persistent.PersistenceId}]. - Perhaps, the new writer journaled the event out of sequence, or duplicate PersistentId for different entities?"; - LogIssue(errMsg); - switch (Mode) + var next = node.Next; + var msg = node.Value; + if (msg.Persistent.SequenceNr >= _sequenceNr) { - case ReplayFilterMode.RepairByDiscardOld: - _buffer.Remove(node); - //discard - break; - case ReplayFilterMode.Fail: - throw new IllegalStateException(errMsg); - case ReplayFilterMode.Warn: - // keep - break; - case ReplayFilterMode.Disabled: - throw new ArgumentException("Mode must not be Disabled"); + var errMsg = + $"Invalid replayed event [sequenceNr=${value.Persistent.SequenceNr}, writerUUID=${value.Persistent.WriterGuid}] from a new writer. " + + $"An older writer already sent an event [sequenceNr=${msg.Persistent.SequenceNr}, " + + $"writerUUID=${msg.Persistent.WriterGuid}] whose sequence number was equal or greater " + + $"for the same persistenceId [${value.Persistent.PersistenceId}]. " + + "Perhaps, the new writer journaled the event out of sequence, or duplicate PersistentId for different entities?"; + LogIssue(errMsg); + switch (Mode) + { + case ReplayFilterMode.RepairByDiscardOld: + _buffer.Remove(node); + //discard + break; + case ReplayFilterMode.Fail: + throw new IllegalStateException(errMsg); + case ReplayFilterMode.Warn: + // keep + break; + case ReplayFilterMode.Disabled: + throw new ArgumentException("Mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)} value: {Mode}"); + } } + node = next; } - node = next; + _buffer.AddLast(value); } - _buffer.AddLast(value); } + catch (IllegalStateException ex) + { + if (Mode == ReplayFilterMode.Fail) + Fail(ex); + else + throw; + } + return true; + + case RecoverySuccess or ReplayMessagesFailure: + if (DebugEnabled) + _log.Debug($"Replay completed: {message}"); - } - catch (IllegalStateException ex) - { - if (Mode == ReplayFilterMode.Fail) - Fail(ex); - else - throw; - } - } - else if (message is RecoverySuccess or ReplayMessagesFailure) - { - if (DebugEnabled) - _log.Debug($"Replay completed: {message}"); - - SendBuffered(); - PersistentActor.Tell(message, ActorRefs.NoSender); - Context.Stop(Self); + SendBuffered(); + PersistentActor.Tell(message, ActorRefs.NoSender); + Context.Stop(Self); + return true; + + default: + return false; } - else return false; - return true; } private void SendBuffered() @@ -282,6 +297,8 @@ private void LogIssue(string errMsg) break; case ReplayFilterMode.Disabled: throw new ArgumentException("mode must not be Disabled"); + default: + throw new ArgumentException($"Unknown {nameof(ReplayFilterMode)}: {Mode} value. Message: {errMsg}"); } } @@ -291,20 +308,17 @@ private void Fail(IllegalStateException cause) PersistentActor.Tell(new ReplayMessagesFailure(cause), ActorRefs.NoSender); Context.Become(message => { - if (message is ReplayedMessage) - { - // discard - } - else if (message is RecoverySuccess or ReplayMessagesFailure) - { - Context.Stop(Self); - } - else + switch (message) { - return false; + case ReplayedMessage: + // discard + return true; + case RecoverySuccess or ReplayMessagesFailure: + Context.Stop(Self); + return true; + default: + return false; } - - return true; }); } } From f963565ca284a87d082899ef8ad776576ea2d8e6 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 00:44:34 +0700 Subject: [PATCH 21/37] Fix configuration, make sure that default values are sensible --- src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index d3f88339d93..b08088208a8 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -54,9 +54,9 @@ protected AsyncWriteJournal() var config = extension.ConfigFor(Self); _breaker = new CircuitBreaker( Context.System.Scheduler, - config.GetInt("circuit-breaker.max-failures", 0), - config.GetTimeSpan("circuit-breaker.call-timeout", null), - config.GetTimeSpan("circuit-breaker.reset-timeout", null)); + config.GetInt("circuit-breaker.max-failures", 10), + config.GetTimeSpan("circuit-breaker.call-timeout", TimeSpan.FromSeconds(10)), + config.GetTimeSpan("circuit-breaker.reset-timeout", TimeSpan.FromSeconds(30))); var replayFilterMode = config.GetString("replay-filter.mode", "").ToLowerInvariant(); switch (replayFilterMode) @@ -77,8 +77,8 @@ protected AsyncWriteJournal() throw new ConfigurationException($"Invalid replay-filter.mode [{replayFilterMode}], supported values [off, repair-by-discard-old, fail, warn]"); } _isReplayFilterEnabled = _replayFilterMode != ReplayFilterMode.Disabled; - _replayFilterWindowSize = config.GetInt("replay-filter.window-size", 0); - _replayFilterMaxOldWriters = config.GetInt("replay-filter.max-old-writers", 0); + _replayFilterWindowSize = config.GetInt("replay-filter.window-size", 100); + _replayFilterMaxOldWriters = config.GetInt("replay-filter.max-old-writers", 10); _replayDebugEnabled = config.GetBoolean("replay-filter.debug", false); _resequencer = Context.System.ActorOf(Props.Create(() => new Resequencer())); From 7788e726e9b383876e1da88b8a13b1ee68156d94 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 00:46:08 +0700 Subject: [PATCH 22/37] Fix journal interceptor, make sure that Ask operation are short enough so that it wouldn't be masked by other exceptions --- .../Journal/JournalRecoveryBehaviorSetter.cs | 2 +- .../Journal/JournalWriteBehaviorSetter.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs index b9cc55c534c..ffc76445643 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs @@ -26,7 +26,7 @@ internal JournalRecoveryBehaviorSetter(IActorRef journal) public Task SetInterceptorAsync(IJournalInterceptor interceptor) => _journal.Ask( new TestJournal.UseRecoveryInterceptor(interceptor), - TimeSpan.FromSeconds(3) + TimeSpan.FromSeconds(0.5) ); } } diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs index 7e554be403a..ed9f2e67a60 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs @@ -26,7 +26,7 @@ internal JournalWriteBehaviorSetter(IActorRef journal) public Task SetInterceptorAsync(IJournalInterceptor interceptor) => _journal.Ask( new TestJournal.UseWriteInterceptor(interceptor), - TimeSpan.FromSeconds(3) + TimeSpan.FromSeconds(0.5) ); } } From aeb68c85871a1c65b4bd55fa331f00968740e5e0 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 00:46:36 +0700 Subject: [PATCH 23/37] Turn on relpay filter debug mode --- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index d1a2682b5bd..f89bab5472c 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -28,6 +28,7 @@ public class Bug4762FixSpec : PersistenceTestKit ConfigurationFactory.ParseString(""" akka.loglevel = DEBUG akka.persistence.journal.test.debug = on + akka.persistence.journal.test.replay-filter.debug = on akka.persistence.snapshot-store.test.debug = on """); From afc24df297b0ab7420d80599b6291af15949f9b7 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 01:20:05 +0700 Subject: [PATCH 24/37] Bump timeout, testing if recovery is delayed or permanently stuck --- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index f89bab5472c..8d4b0f7890d 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -13,6 +13,7 @@ using Akka.Actor; using Akka.Configuration; using Akka.Event; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -98,7 +99,7 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( var command = new WriteMessage(); actor.Tell(command, actor); - await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync(10.Seconds()); await probe.ExpectMsgAsync(); await probe.ExpectMsgAsync(); await probe.ExpectNoMsgAsync(3000); From 8f84ff5079fef870b653fa3420e70229397021cf Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Feb 2024 23:48:36 +0700 Subject: [PATCH 25/37] Fix AsyncWriteJournal replay handler not closing over self --- src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index b08088208a8..d4103bfee61 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -251,6 +251,7 @@ private void HandleReplayMessages(ReplayMessages message) ? Context.ActorOf(ReplayFilter.Props(message.PersistentActor, _replayFilterMode, _replayFilterWindowSize, _replayFilterMaxOldWriters, _replayDebugEnabled)) : message.PersistentActor; + var self = Context.Self; var context = Context; var eventStream = Context.System.EventStream; @@ -294,18 +295,18 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr // operation failed because a CancellationToken was invoked // wrap the original exception and throw it, with some additional callsite context var newEx = new OperationCanceledException("ReplayMessagesAsync canceled, possibly due to timing out.", cx); - replyTo.Tell(new ReplayMessagesFailure(newEx)); + replyTo.Tell(new ReplayMessagesFailure(newEx), self); } catch (Exception ex) { - replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex))); + replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex)), self); } return; void CompleteHighSeqNo(long highSeqNo) { - replyTo.Tell(new RecoverySuccess(highSeqNo)); + replyTo.Tell(new RecoverySuccess(highSeqNo), self); if (CanPublish) { From 68e2192bd869721135c7bd975b650e320349f536 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 2 Feb 2024 00:01:17 +0700 Subject: [PATCH 26/37] Fix SnapshotStore snapshot load not closing over self --- src/core/Akka.Persistence/Snapshot/SnapshotStore.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index 158b94ca2b8..fbd0693e652 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -60,7 +60,7 @@ private bool ReceiveSnapshotStore(object message) { case LoadSnapshot loadSnapshot: - LoadSnapshotAsync(loadSnapshot, senderPersistentActor); + LoadSnapshotAsync(loadSnapshot, self, senderPersistentActor); break; case SaveSnapshot saveSnapshot: SaveSnapshotAsync(saveSnapshot, self, senderPersistentActor); @@ -204,11 +204,11 @@ await _breaker.WithCircuitBreaker((msg: metadata, save: saveSnapshot, ss: this), } } - private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef senderPersistentActor) + private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef self, IActorRef senderPersistentActor) { if (loadSnapshot.Criteria == SnapshotSelectionCriteria.None) { - senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr)); + senderPersistentActor.Tell(new LoadSnapshotResult(null, loadSnapshot.ToSequenceNr), self); } else { @@ -218,11 +218,11 @@ private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef sender state => state.ss.LoadAsync(state.msg.PersistenceId, state.msg.Criteria.Limit(state.msg.ToSequenceNr))); - senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr)); + senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr), self); } catch (Exception ex) { - senderPersistentActor.Tell(new LoadSnapshotFailed(ex)); + senderPersistentActor.Tell(new LoadSnapshotFailed(ex), self); } } } From 5157e05cfd9298df41fd3d7394515b9e2a10d546 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 2 Feb 2024 01:22:29 +0700 Subject: [PATCH 27/37] #7068 - Examining CircuitBreaker timings --- .../Snapshot/SnapshotStore.cs | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index fbd0693e652..242e6b073e1 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -8,6 +8,7 @@ using System; using System.Threading.Tasks; using Akka.Actor; +using Akka.Event; using Akka.Pattern; namespace Akka.Persistence.Snapshot @@ -20,6 +21,8 @@ public abstract class SnapshotStore : ActorBase private readonly TaskContinuationOptions _continuationOptions = TaskContinuationOptions.ExecuteSynchronously; private readonly bool _publish; private readonly CircuitBreaker _breaker; + private readonly ILoggingAdapter _log; + private readonly bool _debugEnabled; /// /// Initializes a new instance of the class. @@ -43,6 +46,9 @@ protected SnapshotStore() config.GetInt("circuit-breaker.max-failures", 10), config.GetTimeSpan("circuit-breaker.call-timeout", TimeSpan.FromSeconds(10)), config.GetTimeSpan("circuit-breaker.reset-timeout", TimeSpan.FromSeconds(30))); + + _debugEnabled = config.GetBoolean("debug"); + _log = Context.GetLogger(); } /// @@ -59,6 +65,8 @@ private bool ReceiveSnapshotStore(object message) switch (message) { case LoadSnapshot loadSnapshot: + if(_debugEnabled) + _log.Info($"{nameof(LoadSnapshot)} message received."); LoadSnapshotAsync(loadSnapshot, self, senderPersistentActor); break; @@ -214,9 +222,20 @@ private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef self, { try { + if(_debugEnabled) + _log.Info($"Starting {nameof(LoadSnapshotAsync)} circuit breaker."); + var result = await _breaker.WithCircuitBreaker((msg: loadSnapshot, ss: this), - state => state.ss.LoadAsync(state.msg.PersistenceId, - state.msg.Criteria.Limit(state.msg.ToSequenceNr))); + state => + { + if(_debugEnabled) + _log.Info($"Invoking {nameof(LoadAsync)} inside circuit breaker."); + + return state.ss.LoadAsync(state.msg.PersistenceId, state.msg.Criteria.Limit(state.msg.ToSequenceNr)); + }); + + if(_debugEnabled) + _log.Info($"{nameof(LoadSnapshotAsync)} circuit breaker completed."); senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr), self); } From 40bcb3576837e4d5cafd57202f2d713b81397e0d Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 2 Feb 2024 03:19:40 +0700 Subject: [PATCH 28/37] Measure RecoveryPermitter --- src/core/Akka.Persistence/Eventsourced.cs | 1 + .../Akka.Persistence/RecoveryPermitter.cs | 50 +++++++++++-------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs index a0befa7741b..d855747ffde 100644 --- a/src/core/Akka.Persistence/Eventsourced.cs +++ b/src/core/Akka.Persistence/Eventsourced.cs @@ -618,6 +618,7 @@ private void StashInternally(object currentMessage) { try { + Log.Info($"Stashing [{currentMessage}]"); _internalStash.Stash(); } catch (StashOverflowException e) diff --git a/src/core/Akka.Persistence/RecoveryPermitter.cs b/src/core/Akka.Persistence/RecoveryPermitter.cs index cc29f40913c..37b6e16fd27 100644 --- a/src/core/Akka.Persistence/RecoveryPermitter.cs +++ b/src/core/Akka.Persistence/RecoveryPermitter.cs @@ -42,6 +42,7 @@ internal class RecoveryPermitter : UntypedActor private readonly ILoggingAdapter Log = Context.GetLogger(); private int _usedPermits; private int _maxPendingStats; + private ILoggingAdapter _log; public static Props Props(int maxPermits) => Actor.Props.Create(() => new RecoveryPermitter(maxPermits)); @@ -51,33 +52,38 @@ public static Props Props(int maxPermits) => public RecoveryPermitter(int maxPermits) { MaxPermits = maxPermits; + _log = Context.GetLogger(); } protected override void OnReceive(object message) { - if (message is RequestRecoveryPermit) + switch (message) { - Context.Watch(Sender); - if (_usedPermits >= MaxPermits) - { - if (pending.Count == 0) - Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender); - pending.AddLast(Sender); - _maxPendingStats = Math.Max(_maxPendingStats, pending.Count); - } - else - { - RecoveryPermitGranted(Sender); - } - } - else if (message is ReturnRecoveryPermit) - { - ReturnRecoveryPermit(Sender); - } - else if (message is Terminated terminated && !pending.Remove(terminated.ActorRef)) - { - // pre-mature termination should be rare - ReturnRecoveryPermit(terminated.ActorRef); + case RequestRecoveryPermit: + _log.Info($"{nameof(RequestRecoveryPermit)} received: {Sender}"); + Context.Watch(Sender); + if (_usedPermits >= MaxPermits) + { + if (pending.Count == 0) + Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender); + pending.AddLast(Sender); + _maxPendingStats = Math.Max(_maxPendingStats, pending.Count); + } + else + { + RecoveryPermitGranted(Sender); + _log.Info($"Recovery granted: {Sender}"); + } + break; + + case Akka.Persistence.ReturnRecoveryPermit: + ReturnRecoveryPermit(Sender); + break; + + case Terminated terminated when !pending.Remove(terminated.ActorRef): + // pre-mature termination should be rare + ReturnRecoveryPermit(terminated.ActorRef); + break; } } From 94de668013e078951eb51946da73b66e15f1c365 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 2 Feb 2024 04:20:54 +0700 Subject: [PATCH 29/37] Investigate recovery timing --- src/core/Akka.Persistence/Eventsourced.Lifecycle.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs index fa1acd2c39a..41715601a35 100644 --- a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs +++ b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs @@ -21,7 +21,9 @@ public partial class Eventsourced private void StartRecovery(Recovery recovery) { + Log.Info("Recovery granted"); ChangeState(RecoveryStarted(recovery.ReplayMax)); + Log.Info("Telling SnapshotStore to load snapshot"); LoadSnapshot(SnapshotterId, recovery.FromSnapshot, recovery.ToSequenceNr); } From dbc416523d76b60d58d0fa81389d8824fbb8f152 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 5 Feb 2024 22:41:48 +0700 Subject: [PATCH 30/37] Try to pinpoint cycle stealing/starvation problem --- .../Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs | 3 ++- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index e2d8e2e9445..4e0552f6fa2 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -101,7 +101,8 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s { return WithJournalWrite(write => write.Fail(), async () => { - var counterProps = Props.Create(() => new CounterActor("test")); + var counterProps = Props.Create(() => new CounterActor("test")) + .WithDispatcher("internal-dispatcher"); var actor = ActorOf(counterProps, "counter"); Sys.Log.Info("Messaging actor"); diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index 8d4b0f7890d..597e313ce3b 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -94,7 +94,9 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( var probe = CreateTestProbe(); return WithJournalWrite(write => write.Pass(), async () => { - var actor = ActorOf(() => new TestActor2(probe)); + var actor = Sys.ActorOf( + Props.Create(() => new TestActor2(probe)) + .WithDispatcher("internal-dispatcher"), "test-actor"); var command = new WriteMessage(); actor.Tell(command, actor); From 3da6e15b53d226688b84b93cfbefb043e33f4588 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 5 Feb 2024 23:27:56 +0700 Subject: [PATCH 31/37] fix wrong dispatcher name --- src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs | 2 +- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index 4e0552f6fa2..f05ea09b2aa 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -102,7 +102,7 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s return WithJournalWrite(write => write.Fail(), async () => { var counterProps = Props.Create(() => new CounterActor("test")) - .WithDispatcher("internal-dispatcher"); + .WithDispatcher("akka.actor.internal-dispatcher"); var actor = ActorOf(counterProps, "counter"); Sys.Log.Info("Messaging actor"); diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index 597e313ce3b..4230b5cc3ab 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -96,7 +96,7 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( { var actor = Sys.ActorOf( Props.Create(() => new TestActor2(probe)) - .WithDispatcher("internal-dispatcher"), "test-actor"); + .WithDispatcher("akka.actor.internal-dispatcher"), "test-actor"); var command = new WriteMessage(); actor.Tell(command, actor); From 23992bb7522670c237cfafc1e5971251289dc1da Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 6 Feb 2024 01:03:39 +0700 Subject: [PATCH 32/37] fix AsyncQueue deadlock --- .../Internal/AsyncPeekableCollection.cs | 2 +- src/core/Akka.TestKit/Internal/AsyncQueue.cs | 180 ++++-------------- 2 files changed, 43 insertions(+), 139 deletions(-) diff --git a/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs b/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs index d4cd154b31b..4a04af02c26 100644 --- a/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs +++ b/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs @@ -157,7 +157,7 @@ public Task AddAsync(T item, CancellationToken cancellationToken) /// The item to add. /// A cancellation token that can be used to abort the add operation. public void Add(T item, CancellationToken cancellationToken) - => DoAddAsync(item, cancellationToken, sync: true).WaitAndUnwrapException(CancellationToken.None); + => DoAddAsync(item, cancellationToken, sync: true).WaitAndUnwrapException(cancellationToken); /// /// Adds an item to the producer/consumer collection. Throws diff --git a/src/core/Akka.TestKit/Internal/AsyncQueue.cs b/src/core/Akka.TestKit/Internal/AsyncQueue.cs index b63a8c94491..2ff5365380e 100644 --- a/src/core/Akka.TestKit/Internal/AsyncQueue.cs +++ b/src/core/Akka.TestKit/Internal/AsyncQueue.cs @@ -6,45 +6,51 @@ //----------------------------------------------------------------------- using System; -using System.Collections; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; -using Nito.AsyncEx.Synchronous; namespace Akka.TestKit.Internal { public class AsyncQueue: ITestQueue where T: class { - private readonly AsyncPeekableCollection _collection = new(new QueueCollection()); + private readonly ConcurrentQueue _collection = new(); public int Count => _collection.Count; - - public void Enqueue(T item) => EnqueueAsync(item).AsTask().WaitAndUnwrapException(); - public ValueTask EnqueueAsync(T item) => new(_collection.AddAsync(item)); + public void Enqueue(T item) => _collection.Enqueue(item); + + public ValueTask EnqueueAsync(T item) + { + _collection.Enqueue(item); + return new ValueTask(); + } public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken) { - var task = TryEnqueueAsync(item, millisecondsTimeout, cancellationToken); - task.AsTask().Wait(cancellationToken); - return task.Result; + try + { + _collection.Enqueue(item); + return true; + } + catch + { + return false; + } } - public async ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken) + public ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken) { - using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + try { - cts.CancelAfter(millisecondsTimeout); - try - { - await _collection.AddAsync(item, cts.Token); - return true; - } - catch - { - return false; - } + _collection.Enqueue(item); + return new ValueTask(true); + } + catch + { + return new ValueTask(false); } } @@ -59,7 +65,7 @@ public bool TryTake(out T item, CancellationToken cancellationToken = default) try { // TryRead returns immediately - return _collection.TryTake(out item); + return _collection.TryDequeue(out item); } catch { @@ -84,36 +90,33 @@ public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cance } } - public async ValueTask<(bool success, T item)> TryTakeAsync(CancellationToken cancellationToken) + public ValueTask<(bool success, T item)> TryTakeAsync(CancellationToken cancellationToken) { try { - var result = await _collection.TakeAsync(cancellationToken); - return (true, result); + _collection.TryDequeue(out var result); + return new ValueTask<(bool success, T item)>((true, result)); } catch { - return (false, default); + return new ValueTask<(bool success, T item)>((false, default)); } } public async ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken) { - using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) - { - cts.CancelAfter(millisecondsTimeout); - return await TryTakeAsync(cts.Token); - } + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + return await TryTakeAsync(cts.Token); } public T Take(CancellationToken cancellationToken) { - if(!_collection.TryTake(out var item)) + if(!_collection.TryDequeue(out var item)) throw new InvalidOperationException("Failed to dequeue item from the queue."); return item; } - public ValueTask TakeAsync(CancellationToken cancellationToken) => new(_collection.TakeAsync(cancellationToken)); + public ValueTask TakeAsync(CancellationToken cancellationToken) => new(Take(cancellationToken)); public bool TryPeek(out T item) => _collection.TryPeek(out item); @@ -133,16 +136,16 @@ public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cance } } - public async ValueTask<(bool success, T item)> TryPeekAsync(CancellationToken cancellationToken) + public ValueTask<(bool success, T item)> TryPeekAsync(CancellationToken cancellationToken) { try { - var result = await _collection.PeekAsync(cancellationToken); - return (true, result); + _collection.TryPeek(out var result); + return new ValueTask<(bool success, T item)>((true, result)); } catch { - return (false, default); + return new ValueTask<(bool success, T item)>((false, default)); } } @@ -165,110 +168,11 @@ public T Peek(CancellationToken cancellationToken) return item; } - public ValueTask PeekAsync(CancellationToken cancellationToken) => new(_collection.PeekAsync(cancellationToken)); + public ValueTask PeekAsync(CancellationToken cancellationToken) => new(Peek(cancellationToken)); public List ToList() { - throw new System.NotImplementedException(); + return _collection.ToList(); } - - private class QueueCollection : IPeekableProducerConsumerCollection - { - private readonly Queue _queue = new(); - - public int Count { - get - { - lock (SyncRoot) - { - return _queue.Count; - } - } - } - - public bool TryAdd(T item) - { - lock (SyncRoot) - { - _queue.Enqueue(item); - return true; - } - } - - public bool TryTake(out T item) - { - lock(SyncRoot) - { - if(_queue.Count == 0) - { - item = null; - return false; - } - - item = _queue.Dequeue(); - return true; - } - } - - public bool TryPeek(out T item) - { - lock(SyncRoot) - { - if(_queue.Count == 0) - { - item = null; - return false; - } - - item = _queue.Peek(); - return true; - } - } - - public void CopyTo(T[] array, int index) - { - lock(SyncRoot) - { - _queue.CopyTo(array, index); - } - } - - - public void CopyTo(Array array, int index) - { - lock(SyncRoot) - { - ((ICollection)_queue).CopyTo(array, index); - } - } - - public T[] ToArray() - { - lock(SyncRoot) - { - return _queue.ToArray(); - } - } - - - public IEnumerator GetEnumerator() - { - lock(SyncRoot) - { - //We must create a copy - return new List(_queue).GetEnumerator(); - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - - public object SyncRoot { get; } = new(); - - public bool IsSynchronized => true; - } } - } From 0ac095751716f436ede28fdc02dcaa5f0ec0ef20 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 6 Feb 2024 02:06:23 +0700 Subject: [PATCH 33/37] Revert "fix AsyncQueue deadlock" This reverts commit 23992bb7522670c237cfafc1e5971251289dc1da. --- .../Internal/AsyncPeekableCollection.cs | 2 +- src/core/Akka.TestKit/Internal/AsyncQueue.cs | 180 ++++++++++++++---- 2 files changed, 139 insertions(+), 43 deletions(-) diff --git a/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs b/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs index 4a04af02c26..d4cd154b31b 100644 --- a/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs +++ b/src/core/Akka.TestKit/Internal/AsyncPeekableCollection.cs @@ -157,7 +157,7 @@ public Task AddAsync(T item, CancellationToken cancellationToken) /// The item to add. /// A cancellation token that can be used to abort the add operation. public void Add(T item, CancellationToken cancellationToken) - => DoAddAsync(item, cancellationToken, sync: true).WaitAndUnwrapException(cancellationToken); + => DoAddAsync(item, cancellationToken, sync: true).WaitAndUnwrapException(CancellationToken.None); /// /// Adds an item to the producer/consumer collection. Throws diff --git a/src/core/Akka.TestKit/Internal/AsyncQueue.cs b/src/core/Akka.TestKit/Internal/AsyncQueue.cs index 2ff5365380e..b63a8c94491 100644 --- a/src/core/Akka.TestKit/Internal/AsyncQueue.cs +++ b/src/core/Akka.TestKit/Internal/AsyncQueue.cs @@ -6,51 +6,45 @@ //----------------------------------------------------------------------- using System; -using System.Collections.Concurrent; +using System.Collections; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; +using Nito.AsyncEx.Synchronous; namespace Akka.TestKit.Internal { public class AsyncQueue: ITestQueue where T: class { - private readonly ConcurrentQueue _collection = new(); + private readonly AsyncPeekableCollection _collection = new(new QueueCollection()); public int Count => _collection.Count; + + public void Enqueue(T item) => EnqueueAsync(item).AsTask().WaitAndUnwrapException(); - public void Enqueue(T item) => _collection.Enqueue(item); - - public ValueTask EnqueueAsync(T item) - { - _collection.Enqueue(item); - return new ValueTask(); - } + public ValueTask EnqueueAsync(T item) => new(_collection.AddAsync(item)); public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken) { - try - { - _collection.Enqueue(item); - return true; - } - catch - { - return false; - } + var task = TryEnqueueAsync(item, millisecondsTimeout, cancellationToken); + task.AsTask().Wait(cancellationToken); + return task.Result; } - public ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken) + public async ValueTask TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken) { - try - { - _collection.Enqueue(item); - return new ValueTask(true); - } - catch + using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) { - return new ValueTask(false); + cts.CancelAfter(millisecondsTimeout); + try + { + await _collection.AddAsync(item, cts.Token); + return true; + } + catch + { + return false; + } } } @@ -65,7 +59,7 @@ public bool TryTake(out T item, CancellationToken cancellationToken = default) try { // TryRead returns immediately - return _collection.TryDequeue(out item); + return _collection.TryTake(out item); } catch { @@ -90,33 +84,36 @@ public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cance } } - public ValueTask<(bool success, T item)> TryTakeAsync(CancellationToken cancellationToken) + public async ValueTask<(bool success, T item)> TryTakeAsync(CancellationToken cancellationToken) { try { - _collection.TryDequeue(out var result); - return new ValueTask<(bool success, T item)>((true, result)); + var result = await _collection.TakeAsync(cancellationToken); + return (true, result); } catch { - return new ValueTask<(bool success, T item)>((false, default)); + return (false, default); } } public async ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken) { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - return await TryTakeAsync(cts.Token); + using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + cts.CancelAfter(millisecondsTimeout); + return await TryTakeAsync(cts.Token); + } } public T Take(CancellationToken cancellationToken) { - if(!_collection.TryDequeue(out var item)) + if(!_collection.TryTake(out var item)) throw new InvalidOperationException("Failed to dequeue item from the queue."); return item; } - public ValueTask TakeAsync(CancellationToken cancellationToken) => new(Take(cancellationToken)); + public ValueTask TakeAsync(CancellationToken cancellationToken) => new(_collection.TakeAsync(cancellationToken)); public bool TryPeek(out T item) => _collection.TryPeek(out item); @@ -136,16 +133,16 @@ public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cance } } - public ValueTask<(bool success, T item)> TryPeekAsync(CancellationToken cancellationToken) + public async ValueTask<(bool success, T item)> TryPeekAsync(CancellationToken cancellationToken) { try { - _collection.TryPeek(out var result); - return new ValueTask<(bool success, T item)>((true, result)); + var result = await _collection.PeekAsync(cancellationToken); + return (true, result); } catch { - return new ValueTask<(bool success, T item)>((false, default)); + return (false, default); } } @@ -168,11 +165,110 @@ public T Peek(CancellationToken cancellationToken) return item; } - public ValueTask PeekAsync(CancellationToken cancellationToken) => new(Peek(cancellationToken)); + public ValueTask PeekAsync(CancellationToken cancellationToken) => new(_collection.PeekAsync(cancellationToken)); public List ToList() { - return _collection.ToList(); + throw new System.NotImplementedException(); } + + private class QueueCollection : IPeekableProducerConsumerCollection + { + private readonly Queue _queue = new(); + + public int Count { + get + { + lock (SyncRoot) + { + return _queue.Count; + } + } + } + + public bool TryAdd(T item) + { + lock (SyncRoot) + { + _queue.Enqueue(item); + return true; + } + } + + public bool TryTake(out T item) + { + lock(SyncRoot) + { + if(_queue.Count == 0) + { + item = null; + return false; + } + + item = _queue.Dequeue(); + return true; + } + } + + public bool TryPeek(out T item) + { + lock(SyncRoot) + { + if(_queue.Count == 0) + { + item = null; + return false; + } + + item = _queue.Peek(); + return true; + } + } + + public void CopyTo(T[] array, int index) + { + lock(SyncRoot) + { + _queue.CopyTo(array, index); + } + } + + + public void CopyTo(Array array, int index) + { + lock(SyncRoot) + { + ((ICollection)_queue).CopyTo(array, index); + } + } + + public T[] ToArray() + { + lock(SyncRoot) + { + return _queue.ToArray(); + } + } + + + public IEnumerator GetEnumerator() + { + lock(SyncRoot) + { + //We must create a copy + return new List(_queue).GetEnumerator(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public object SyncRoot { get; } = new(); + + public bool IsSynchronized => true; + } } + } From f65914d24622ac8872b0000de51920722bc222d9 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 6 Feb 2024 02:24:51 +0700 Subject: [PATCH 34/37] Disable AsyncQueue --- src/core/Akka.TestKit/TestKitBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index c639719304b..324dc3897ea 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -160,7 +160,7 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi system.RegisterExtension(new TestKitAssertionsExtension(_assertions)); _testState.TestKitSettings = TestKitExtension.For(_testState.System); - _testState.Queue = new AsyncQueue(); + _testState.Queue = new BlockingQueue();//new AsyncQueue(); _testState.Log = Logging.GetLogger(system, GetType()); _testState.EventFilterFactory = new EventFilterFactory(this); From a464bb38f64f0f262d1d42607e10a33ca061c44f Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 6 Feb 2024 04:24:41 +0700 Subject: [PATCH 35/37] Revert dispatcher changes --- .../Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs | 3 +-- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index f05ea09b2aa..e2d8e2e9445 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -101,8 +101,7 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s { return WithJournalWrite(write => write.Fail(), async () => { - var counterProps = Props.Create(() => new CounterActor("test")) - .WithDispatcher("akka.actor.internal-dispatcher"); + var counterProps = Props.Create(() => new CounterActor("test")); var actor = ActorOf(counterProps, "counter"); Sys.Log.Info("Messaging actor"); diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index 4230b5cc3ab..513b817c07b 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -94,9 +94,7 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( var probe = CreateTestProbe(); return WithJournalWrite(write => write.Pass(), async () => { - var actor = Sys.ActorOf( - Props.Create(() => new TestActor2(probe)) - .WithDispatcher("akka.actor.internal-dispatcher"), "test-actor"); + var actor = Sys.ActorOf(Props.Create(() => new TestActor2(probe)), "test-actor"); var command = new WriteMessage(); actor.Tell(command, actor); From 97d7dd6f3eb2632a17e0aab88320c233eee6c2ec Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 6 Feb 2024 10:17:41 +0700 Subject: [PATCH 36/37] Revert dispatcher changes revert --- .../Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs | 3 ++- src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs index e2d8e2e9445..f05ea09b2aa 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/CounterActor.cs @@ -101,7 +101,8 @@ public Task CounterActor_internal_state_will_be_lost_if_underlying_persistence_s { return WithJournalWrite(write => write.Fail(), async () => { - var counterProps = Props.Create(() => new CounterActor("test")); + var counterProps = Props.Create(() => new CounterActor("test")) + .WithDispatcher("akka.actor.internal-dispatcher"); var actor = ActorOf(counterProps, "counter"); Sys.Log.Info("Messaging actor"); diff --git a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs index 513b817c07b..4230b5cc3ab 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs @@ -94,7 +94,9 @@ public Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once( var probe = CreateTestProbe(); return WithJournalWrite(write => write.Pass(), async () => { - var actor = Sys.ActorOf(Props.Create(() => new TestActor2(probe)), "test-actor"); + var actor = Sys.ActorOf( + Props.Create(() => new TestActor2(probe)) + .WithDispatcher("akka.actor.internal-dispatcher"), "test-actor"); var command = new WriteMessage(); actor.Tell(command, actor); From 77a3c3d4e0ea35dcf937aa0d92da2dcbe1e060a5 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 6 Feb 2024 10:18:16 +0700 Subject: [PATCH 37/37] Revert AsyncQueue changes --- src/core/Akka.TestKit/TestKitBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index 324dc3897ea..c639719304b 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -160,7 +160,7 @@ protected virtual void InitializeTest(ActorSystem system, ActorSystemSetup confi system.RegisterExtension(new TestKitAssertionsExtension(_assertions)); _testState.TestKitSettings = TestKitExtension.For(_testState.System); - _testState.Queue = new BlockingQueue();//new AsyncQueue(); + _testState.Queue = new AsyncQueue(); _testState.Log = Logging.GetLogger(system, GetType()); _testState.EventFilterFactory = new EventFilterFactory(this);