-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow persisting events when recovery has completed #3366
Conversation
@@ -17,17 +17,15 @@ namespace Akka.Persistence | |||
|
|||
internal class EventsourcedState | |||
{ | |||
public EventsourcedState(string name, bool isRecoveryRunning, StateReceive stateReceive) | |||
public EventsourcedState(string name, Func<bool> isRecoveryRunning, StateReceive stateReceive) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the best option?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: how often is the EventsourcedState
object allocated? From a functional standpoint it should be fine, but if we allocate a new EventsourcedState
instance regularly then anonymous method allocations could add up. I'm not familiar enough with Akka.Persistence's internals to know off-hand. Do you or @Horusiath have some idea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Aaronontheweb not sure about this one, honestly. Maybe @Horusiath wants to weigh in.
|
||
return new EventsourcedState("replay started", true, (receive, message) => | ||
return new EventsourcedState("replay started", () => recoveryRunning, (receive, message) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implicitly capture closure
01f0463
to
6f0e47d
Compare
9e35b77
to
22962d9
Compare
22962d9
to
db12c06
Compare
@ismaelhamed still waiting on more changes first? |
db12c06
to
97dcd79
Compare
@Aaronontheweb code is complete, but I'd appreciate comments on those remarks. Until then, this is still WIP |
Thanks, I'll take a look |
|
||
public bool IsRecoveryRunning { get; } | ||
|
||
public Func<bool> IsRecoveryRunning { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This breaks the API?!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nah, this is an internal
API. None of the third party Akka.Persistence plugins have access to this so we're able to preserve binary compatibility here. It's only the Akka.Persistence engine itself that consumes this directly so I think you're fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have some questions about the specs and performance impacts that I'd like to have answered before pushing this through into a patch.
ExpectMsgInOrder("a-1", "a-2", "rc-1", "rc-2", "rc-3"); | ||
persistentActor.Tell(new Cmd("invalid")); | ||
persistentActor.Tell(GetState.Instance); | ||
ExpectMsgInOrder("a-1", "a-2", "rc-1", "rc-2", "rc-3", "invalid"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ismaelhamed shouldn't there be an invalid-recovery
event recovered here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Aaronontheweb No, the point of this PR is twofold: First, prevent commands from interrupting the events being persisted in RecoveryCompleted. Second, make sure an actor blows up if Persist() is invoked when replaying events (due to "invalid-recovery" event in this case).
So basically, persist is allowed in RecoveryCompleted, but it will fail-fast during recovery. You'll get a nice exception along with the intrusive ("invalid") message in OnRecoveryFailure though.
|
||
public bool IsRecoveryRunning { get; } | ||
|
||
public Func<bool> IsRecoveryRunning { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nah, this is an internal
API. None of the third party Akka.Persistence plugins have access to this so we're able to preserve binary compatibility here. It's only the Akka.Persistence engine itself that consumes this directly so I think you're fine.
{ | ||
try | ||
{ | ||
if (message is ReplayedMessage) | ||
switch (message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this change
@@ -17,17 +17,15 @@ namespace Akka.Persistence | |||
|
|||
internal class EventsourcedState | |||
{ | |||
public EventsourcedState(string name, bool isRecoveryRunning, StateReceive stateReceive) | |||
public EventsourcedState(string name, Func<bool> isRecoveryRunning, StateReceive stateReceive) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: how often is the EventsourcedState
object allocated? From a functional standpoint it should be fine, but if we allocate a new EventsourcedState
instance regularly then anonymous method allocations could add up. I'm not familiar enough with Akka.Persistence's internals to know off-hand. Do you or @Horusiath have some idea?
Left some review notes for you @ismaelhamed |
persistentActor.Tell(GetState.Instance); | ||
ExpectMsgInOrder("a-1", "a-2", "rc-1", "rc-2"); | ||
persistentActor.Tell(GetState.Instance); | ||
ExpectMsgInOrder("a-1", "a-2", "rc-1", "rc-2", "rc-3"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's some race condition (at least in the JVM) between the GetState and the PersistAsync, so they've simplified the test like this:
expectMsgAnyOf(List("a-1", "a-2", "rc-1", "rc-2"), List("a-1", "a-2", "rc-1", "rc-2", "rc-3"))
But I'm not sure this is possible with the current ExpectMsgAnyOf in Akka.NET
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Aaronontheweb about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw the comment - the test passes currently, right? If it's going to be racy, do we need to implement some new features in the TestKit
in order to do this correctly? What's the issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test runs fine, at least every time I've run it. But I noticed they (JVM) changed it since the original PR for the reason I mentioned, just so you know.
Bleh, I forgot to come back and revisit this - will do |
68c50a3
to
5980c05
Compare
5980c05
to
79484cd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. Just rebased on latest dev
changes. Ready for merge when those specs come back.
334d899
to
f574061
Compare
Thanks for your contribution @ismaelhamed |
DO NOT MERGE before #3372Original issues: #21736 and #22609
Original PR: #21893
core implementationunit tests