Skip to content

Commit

Permalink
Allow persisting events when recovery has completed (#3366)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored and Aaronontheweb committed Jun 25, 2018
1 parent 68dc1b5 commit fdc1765
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 83 deletions.
39 changes: 39 additions & 0 deletions src/core/Akka.Persistence.Tests/PersistentActorSpec.Actors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,45 @@ protected override bool ReceiveCommand(object message)
return false;
}
}

internal class PersistInRecovery : ExamplePersistentActor
{
public PersistInRecovery(string name)
: base(name)
{ }

protected override bool ReceiveRecover(object message)
{
switch (message)
{
case Evt evt when evt.Data?.ToString() == "invalid":
Persist(new Evt("invalid-recovery"), UpdateStateHandler);
return true;
case Evt evt:
return UpdateState(evt);
case RecoveryCompleted _:
PersistAsync(new Evt("rc-1"), UpdateStateHandler);
Persist(new Evt("rc-2"), UpdateStateHandler);
PersistAsync(new Evt("rc-3"), UpdateStateHandler);
return true;
}

return false;
}

protected override bool ReceiveCommand(object message)
{
if (CommonBehavior(message)) return true;

if (message is Cmd cmd)
{
Persist(new Evt(cmd.Data), UpdateStateHandler);
return true;
}

return false;
}
}
}
}

16 changes: 16 additions & 0 deletions src/core/Akka.Persistence.Tests/PersistentActorSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -611,5 +611,21 @@ public void PersistentActor_should_brecover_the_message_which_caused_the_restart
persistentActor.Tell("boom");
ExpectMsg("failed with TestException while processing boom");
}

[Fact]
public void PersistentActor_should_be_able_to_persist_events_that_happen_during_recovery()
{
var persistentActor = ActorOf(Props.Create(() => new PersistInRecovery(Name)));
persistentActor.Tell(GetState.Instance);
ExpectMsgInOrder("a-1", "a-2", "rc-1", "rc-2");
persistentActor.Tell(GetState.Instance);
ExpectMsgInOrder("a-1", "a-2", "rc-1", "rc-2", "rc-3");
persistentActor.Tell(new Cmd("invalid"));
persistentActor.Tell(GetState.Instance);
ExpectMsgInOrder("a-1", "a-2", "rc-1", "rc-2", "rc-3", "invalid");
Watch(persistentActor);
persistentActor.Tell("boom");
ExpectTerminated(persistentActor);
}
}
}
159 changes: 87 additions & 72 deletions src/core/Akka.Persistence/Eventsourced.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ namespace Akka.Persistence

internal class EventsourcedState
{
public EventsourcedState(string name, bool isRecoveryRunning, StateReceive stateReceive)
public EventsourcedState(string name, Func<bool> isRecoveryRunning, StateReceive stateReceive)
{
Name = name;
IsRecoveryRunning = isRecoveryRunning;
StateReceive = stateReceive;
}

public string Name { get; }

public bool IsRecoveryRunning { get; }

public Func<bool> IsRecoveryRunning { get; }
public StateReceive StateReceive { get; }

public override string ToString() => Name;
Expand All @@ -47,7 +45,7 @@ public abstract partial class Eventsourced
/// </summary>
private EventsourcedState WaitingRecoveryPermit(Recovery recovery)
{
return new EventsourcedState("waiting for recovery permit", true, (receive, message) =>
return new EventsourcedState("waiting for recovery permit", () => true, (receive, message) =>
{
if (message is RecoveryPermitGranted)
StartRecovery(recovery);
Expand All @@ -59,7 +57,7 @@ private EventsourcedState WaitingRecoveryPermit(Recovery recovery)
/// <summary>
/// Processes a loaded snapshot, if any. A loaded snapshot is offered with a <see cref="SnapshotOffer"/>
/// message to the actor's <see cref="ReceiveRecover"/>. Then initiates a message replay, either starting
/// from the loaded snapshot or from scratch, and switches to <see cref="ReplayStarted"/> state.
/// from the loaded snapshot or from scratch, and switches to <see cref="RecoveryStarted"/> state.
/// All incoming messages are stashed.
/// </summary>
/// <param name="maxReplays">Maximum number of messages to replay</param>
Expand All @@ -81,7 +79,7 @@ private EventsourcedState RecoveryStarted(long maxReplays)
else return false;
};

return new EventsourcedState("recovery started - replay max: " + maxReplays, true, (receive, message) =>
return new EventsourcedState("recovery started - replay max: " + maxReplays, () => true, (receive, message) =>
{
try
{
Expand Down Expand Up @@ -139,11 +137,6 @@ private EventsourcedState RecoveryStarted(long maxReplays)
});
}

private void ReturnRecoveryPermit()
{
Extension.RecoveryPermitter().Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);
}

/// <summary>
/// Processes replayed messages, if any. The actor's <see cref="ReceiveRecover"/> is invoked with the replayed events.
///
Expand All @@ -159,85 +152,89 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
// protect against event replay stalling forever because of journal overloaded and such
var timeoutCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(timeout, timeout, Self, new RecoveryTick(false), Self);
var eventSeenInInterval = false;
var recoveryRunning = true;

return new EventsourcedState("replay started", true, (receive, message) =>
return new EventsourcedState("replay started", () => recoveryRunning, (receive, message) =>
{
try
{
if (message is ReplayedMessage)
switch (message)
{
var m = (ReplayedMessage)message;
try
{
eventSeenInInterval = true;
UpdateLastSequenceNr(m.Persistent);
base.AroundReceive(recoveryBehavior, m.Persistent);
}
catch (Exception cause)
{
case ReplayedMessage replayed:
try
{
eventSeenInInterval = true;
UpdateLastSequenceNr(replayed.Persistent);
base.AroundReceive(recoveryBehavior, replayed.Persistent);
}
catch (Exception cause)
{
timeoutCancelable.Cancel();
try
{
OnRecoveryFailure(cause, replayed.Persistent.Payload);
}
finally
{
Context.Stop(Self);
}
ReturnRecoveryPermit();
}
break;
case RecoverySuccess success:
timeoutCancelable.Cancel();
OnReplaySuccess();
_sequenceNr = success.HighestSequenceNr;
LastSequenceNr = success.HighestSequenceNr;
recoveryRunning = false;
try
{
OnRecoveryFailure(cause, m.Persistent.Payload);
base.AroundReceive(recoveryBehavior, RecoveryCompleted.Instance);
}
finally
{
Context.Stop(Self);
// in finally in case exception and resume strategy
TransitToProcessingState();
}
ReturnRecoveryPermit();
}
}
else if (message is RecoverySuccess)
{
var m = (RecoverySuccess)message;
timeoutCancelable.Cancel();
OnReplaySuccess();
ChangeState(ProcessingCommands());
_sequenceNr = m.HighestSequenceNr;
LastSequenceNr = m.HighestSequenceNr;
_internalStash.UnstashAll();
base.AroundReceive(recoveryBehavior, RecoveryCompleted.Instance);
ReturnRecoveryPermit();
}
else if (message is ReplayMessagesFailure)
{
var failure = (ReplayMessagesFailure)message;
timeoutCancelable.Cancel();
try
{
OnRecoveryFailure(failure.Cause, message: null);
}
finally
{
Context.Stop(Self);
}
ReturnRecoveryPermit();
}
else if (message is RecoveryTick tick && !tick.Snapshot)
{
if (!eventSeenInInterval)
{
break;
case ReplayMessagesFailure failure:
timeoutCancelable.Cancel();
try
{
OnRecoveryFailure(
new RecoveryTimedOutException(
$"Recovery timed out, didn't get event within {timeout.TotalSeconds}s, highest sequence number seen {_sequenceNr}."));
OnRecoveryFailure(failure.Cause);
}
finally
{
Context.Stop(Self);
}
ReturnRecoveryPermit();
}
else
{
eventSeenInInterval = false;
}
break;
case RecoveryTick tick when !tick.Snapshot:
if (!eventSeenInInterval)
{
timeoutCancelable.Cancel();
try
{
OnRecoveryFailure(
new RecoveryTimedOutException(
$"Recovery timed out, didn't get event within {timeout.TotalSeconds}s, highest sequence number seen {LastSequenceNr}."));
}
finally
{
Context.Stop(Self);
}
ReturnRecoveryPermit();
}
else
{
eventSeenInInterval = false;
}
break;
default:
StashInternally(message);
break;
}
else
StashInternally(message);
}
catch (Exception)
{
Expand All @@ -247,13 +244,31 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
});
}

private void ReturnRecoveryPermit() =>
Extension.RecoveryPermitter().Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);

private void TransitToProcessingState()
{
if (_eventBatch.Count > 0) FlushBatch();

if (_pendingStashingPersistInvocations > 0)
{
ChangeState(PersistingEvents());
}
else
{
ChangeState(ProcessingCommands());
_internalStash.UnstashAll();
}
}

/// <summary>
/// If event persistence is pending after processing a command, event persistence
/// Command processing state. If event persistence is pending after processing a command, event persistence
/// is triggered and the state changes to <see cref="PersistingEvents"/>.
/// </summary>
private EventsourcedState ProcessingCommands()
{
return new EventsourcedState("processing commands", false, (receive, message) =>
return new EventsourcedState("processing commands", () => false, (receive, message) =>
{
var handled = CommonProcessingStateBehavior(message, err =>
{
Expand Down Expand Up @@ -305,12 +320,12 @@ private void FlushBatch()
}

/// <summary>
/// Remains until pending events are persisted and then changes state to <see cref="ProcessingCommands"/>.
/// Event persisting state. Remains until pending events are persisted and then changes state to <see cref="ProcessingCommands"/>.
/// Only events to be persisted are processed. All other messages are stashed internally.
/// </summary>
private EventsourcedState PersistingEvents()
{
return new EventsourcedState("persisting events", false, (receive, message) =>
return new EventsourcedState("persisting events", () => false, (receive, message) =>
{
var handled = CommonProcessingStateBehavior(message, err =>
{
Expand Down
Loading

0 comments on commit fdc1765

Please sign in to comment.