Skip to content

Commit

Permalink
[WIP] Allow persisting events when recovery has completed
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Mar 17, 2018
1 parent 55388c6 commit 01f0463
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 94 deletions.
155 changes: 84 additions & 71 deletions src/core/Akka.Persistence/Eventsourced.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ namespace Akka.Persistence

internal class EventsourcedState
{
public EventsourcedState(string name, bool isRecoveryRunning, StateReceive stateReceive)
public EventsourcedState(string name, Func<bool> isRecoveryRunning, StateReceive stateReceive)
{
Name = name;
IsRecoveryRunning = isRecoveryRunning;
StateReceive = stateReceive;
}

public string Name { get; }

public bool IsRecoveryRunning { get; }

public Func<bool> IsRecoveryRunning { get; }
public StateReceive StateReceive { get; }

public override string ToString() => Name;
Expand All @@ -47,7 +45,7 @@ public abstract partial class Eventsourced
/// </summary>
private EventsourcedState WaitingRecoveryPermit(Recovery recovery)
{
return new EventsourcedState("waiting for recovery permit", true, (receive, message) =>
return new EventsourcedState("waiting for recovery permit", () => true, (receive, message) =>
{
if (message is RecoveryPermitGranted)
StartRecovery(recovery);
Expand All @@ -59,7 +57,7 @@ private EventsourcedState WaitingRecoveryPermit(Recovery recovery)
/// <summary>
/// Processes a loaded snapshot, if any. A loaded snapshot is offered with a <see cref="SnapshotOffer"/>
/// message to the actor's <see cref="ReceiveRecover"/>. Then initiates a message replay, either starting
/// from the loaded snapshot or from scratch, and switches to <see cref="ReplayStarted"/> state.
/// from the loaded snapshot or from scratch, and switches to <see cref="RecoveryStarted"/> state.
/// All incoming messages are stashed.
/// </summary>
/// <param name="maxReplays">Maximum number of messages to replay</param>
Expand All @@ -81,7 +79,7 @@ private EventsourcedState RecoveryStarted(long maxReplays)
else return false;
};

return new EventsourcedState("recovery started - replay max: " + maxReplays, true, (receive, message) =>
return new EventsourcedState("recovery started - replay max: " + maxReplays, () => true, (receive, message) =>
{
try
{
Expand Down Expand Up @@ -137,11 +135,6 @@ private EventsourcedState RecoveryStarted(long maxReplays)
});
}

/// <summary>
/// Sends <c>ReturnRecoveryPermit.Instance</c> to the actor obtained from
/// <c>Extension.RecoveryPermitter()</c>, with <c>Self</c> as the sender.
/// </summary>
private void ReturnRecoveryPermit()
{
Extension.RecoveryPermitter().Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);
}

/// <summary>
/// Processes replayed messages, if any. The actor's <see cref="ReceiveRecover"/> is invoked with the replayed events.
///
Expand All @@ -157,83 +150,85 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
// protect against event replay stalling forever because of journal overloaded and such
var timeoutCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(timeout, timeout, Self, new RecoveryTick(false), Self);
var eventSeenInInterval = false;
var recoveryRunning = true;

return new EventsourcedState("replay started", true, (receive, message) =>
return new EventsourcedState("replay started", () => recoveryRunning, (receive, message) =>
{
try
{
if (message is ReplayedMessage)
switch (message)
{
var m = (ReplayedMessage)message;
try
{
eventSeenInInterval = true;
UpdateLastSequenceNr(m.Persistent);
base.AroundReceive(recoveryBehavior, m.Persistent);
}
catch (Exception cause)
{
case ReplayedMessage replayed:
try
{
eventSeenInInterval = true;
UpdateLastSequenceNr(replayed.Persistent);
base.AroundReceive(recoveryBehavior, replayed.Persistent);
}
catch (Exception cause)
{
timeoutCancelable.Cancel();
try { OnRecoveryFailure(cause, replayed.Persistent.Payload); }
finally { Context.Stop(Self); }
ReturnRecoveryPermit();
}
break;
case RecoverySuccess success:
timeoutCancelable.Cancel();
OnReplaySuccess();
_sequenceNr = success.HighestSequenceNr;
LastSequenceNr = success.HighestSequenceNr;
recoveryRunning = false;
try
{
OnRecoveryFailure(cause, m.Persistent.Payload);
base.AroundReceive(recoveryBehavior, RecoveryCompleted.Instance);
}
finally
{
Context.Stop(Self);
// in finally in case exception and resume strategy
TransitToProcessingState();
}
ReturnRecoveryPermit();
}
}
else if (message is RecoverySuccess)
{
var m = (RecoverySuccess)message;
timeoutCancelable.Cancel();
OnReplaySuccess();
ChangeState(ProcessingCommands());
_sequenceNr = m.HighestSequenceNr;
LastSequenceNr = m.HighestSequenceNr;
_internalStash.UnstashAll();
base.AroundReceive(recoveryBehavior, RecoveryCompleted.Instance);
ReturnRecoveryPermit();
}
else if (message is ReplayMessagesFailure)
{
var failure = (ReplayMessagesFailure)message;
timeoutCancelable.Cancel();
try
{
OnRecoveryFailure(failure.Cause, message: null);
}
finally
{
Context.Stop(Self);
}
}
else if (message is RecoveryTick tick && !tick.Snapshot)
{
if (!eventSeenInInterval)
{
break;
case ReplayMessagesFailure failure:
timeoutCancelable.Cancel();
try
{
OnRecoveryFailure(
new RecoveryTimedOutException(
$"Recovery timed out, didn't get event within {timeout.TotalSeconds}s, highest sequence number seen {_sequenceNr}."));
OnRecoveryFailure(failure.Cause);
}
finally
{
Context.Stop(Self);
}
}
else
{
eventSeenInInterval = false;
}
ReturnRecoveryPermit();
break;
case RecoveryTick tick when !tick.Snapshot:
if (!eventSeenInInterval)
{
timeoutCancelable.Cancel();
try
{
OnRecoveryFailure(
new RecoveryTimedOutException(
$"Recovery timed out, didn't get event within {timeout.TotalSeconds}s, highest sequence number seen {LastSequenceNr}."));
}
finally
{
Context.Stop(Self);
}
ReturnRecoveryPermit();
}
else
{
eventSeenInInterval = false;
}
break;
default:
StashInternally(message);
break;
}
else
StashInternally(message);
}
catch (Exception)
{
Expand All @@ -243,13 +238,31 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
});
}

/// <summary>
/// Sends <c>ReturnRecoveryPermit.Instance</c> to the recovery permitter exposed by the
/// persistence extension, with <c>Self</c> as the sender.
/// </summary>
private void ReturnRecoveryPermit()
{
    var permitter = Extension.RecoveryPermitter();
    permitter.Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);
}

/// <summary>
/// Switches the actor to one of its processing states. Events already collected in the
/// batch are flushed first; the actor then enters <c>PersistingEvents</c> when stashing
/// persist invocations are still pending, and <c>ProcessingCommands</c> otherwise. Only in
/// the latter case are internally stashed messages unstashed.
/// </summary>
private void TransitToProcessingState()
{
    var hasBatchedEvents = _eventBatch.Count > 0;
    if (hasBatchedEvents)
    {
        FlushBatch();
    }

    // The pending counter is read after the flush, exactly as the branch order requires.
    var persistsOutstanding = _pendingStashingPersistInvocations > 0;
    if (persistsOutstanding)
    {
        ChangeState(PersistingEvents());
        return;
    }

    ChangeState(ProcessingCommands());
    // NOTE(review): the `true` argument presumably means "unstash everything" - confirm
    // against UnstashInternally.
    UnstashInternally(true);
}

/// <summary>
/// If event persistence is pending after processing a command, event persistence
/// Command processing state. If event persistence is pending after processing a command, event persistence
/// is triggered and the state changes to <see cref="PersistingEvents"/>.
/// </summary>
private EventsourcedState ProcessingCommands()
{
return new EventsourcedState("processing commands", false, (receive, message) =>
return new EventsourcedState("processing commands", () => false, (receive, message) =>
{
var handled = CommonProcessingStateBehavior(message, err =>
{
Expand Down Expand Up @@ -301,12 +314,12 @@ private void FlushBatch()
}

/// <summary>
/// Remains until pending events are persisted and then changes state to <see cref="ProcessingCommands"/>.
/// Event persisting state. Remains until pending events are persisted and then changes state to <see cref="ProcessingCommands"/>.
/// Only events to be persisted are processed. All other messages are stashed internally.
/// </summary>
private EventsourcedState PersistingEvents()
{
return new EventsourcedState("persisting events", false, (receive, message) =>
return new EventsourcedState("persisting events", () => false, (receive, message) =>
{
var handled = CommonProcessingStateBehavior(message, err =>
{
Expand Down
73 changes: 50 additions & 23 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace Akka.Persistence
public interface IPendingHandlerInvocation
{
object Event { get; }

Action<object> Handler { get; }
}

Expand Down Expand Up @@ -180,7 +179,7 @@ public IStash Stash
/// <summary>
/// Returns true if this persistent entity is currently recovering.
/// </summary>
public bool IsRecovering => _currentState?.IsRecoveryRunning ?? true;
public bool IsRecovering => _currentState?.IsRecoveryRunning() ?? true;

/// <summary>
/// Returns true if this persistent entity has successfully finished recovery.
Expand Down Expand Up @@ -297,6 +296,11 @@ public void DeleteSnapshots(SnapshotSelectionCriteria criteria)
/// <param name="handler">TBD</param>
public void Persist<TEvent>(TEvent @event, Action<TEvent> handler)
{
if (IsRecovering)
{
throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.");
}

_pendingStashingPersistInvocations++;
_pendingInvocations.AddLast(new StashingHandlerInvocation(@event, o => handler((TEvent)o)));
_eventBatch.AddFirst(new AtomicWrite(new Persistent(@event, persistenceId: PersistenceId,
Expand All @@ -313,17 +317,23 @@ public void Persist<TEvent>(TEvent @event, Action<TEvent> handler)
/// <param name="handler">TBD</param>
public void PersistAll<TEvent>(IEnumerable<TEvent> events, Action<TEvent> handler)
{
    // Persisting is rejected while the current state still reports recovery as running.
    if (IsRecovering)
    {
        throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.");
    }

    // A null sequence is treated as "nothing to persist".
    if (events == null) return;

    // Create the untyped adapter once and share it between all invocations, instead of
    // converting it to a new delegate for every event in the loop.
    Action<object> invokeHandler = o => handler((TEvent)o);

    var persistents = ImmutableList<IPersistentRepresentation>.Empty.ToBuilder();
    foreach (var @event in events)
    {
        // Exactly one stashing invocation is registered per event, so the pending counter
        // and the invocation queue stay in step.
        _pendingStashingPersistInvocations++;
        _pendingInvocations.AddLast(new StashingHandlerInvocation(@event, invokeHandler));
        persistents.Add(new Persistent(@event, persistenceId: PersistenceId,
            sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender));
    }

    // All events of one call go into a single AtomicWrite; an empty sequence adds nothing.
    if (persistents.Count > 0)
    {
        _eventBatch.AddFirst(new AtomicWrite(persistents.ToImmutable()));
    }
}
Expand Down Expand Up @@ -358,6 +368,11 @@ public void PersistAll<TEvent>(IEnumerable<TEvent> events, Action<TEvent> handle
/// <param name="handler">TBD</param>
public void PersistAsync<TEvent>(TEvent @event, Action<TEvent> handler)
{
if (IsRecovering)
{
throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.");
}

_pendingInvocations.AddLast(new AsyncHandlerInvocation(@event, o => handler((TEvent)o)));
_eventBatch.AddFirst(new AtomicWrite(new Persistent(@event, persistenceId: PersistenceId,
sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender)));
Expand All @@ -373,13 +388,20 @@ public void PersistAsync<TEvent>(TEvent @event, Action<TEvent> handler)
/// <param name="handler">TBD</param>
public void PersistAllAsync<TEvent>(IEnumerable<TEvent> events, Action<TEvent> handler)
{
    // Persisting is rejected while the current state still reports recovery as running.
    if (IsRecovering)
    {
        throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.");
    }

    // Same contract as PersistAll: a null sequence means "nothing to persist".
    if (events == null) return;

    // Materialize once: the sequence is walked twice below (handler registration and
    // building the write), and a lazy enumerable could yield different items each time.
    var materialized = events as TEvent[] ?? events.ToArray();

    // Same contract as PersistAll: no events, no AtomicWrite added to the batch.
    if (materialized.Length == 0) return;

    // Create the untyped adapter once and share it between all invocations.
    Action<object> invokeHandler = o => handler((TEvent)o);
    foreach (var @event in materialized)
    {
        _pendingInvocations.AddLast(new AsyncHandlerInvocation(@event, invokeHandler));
    }

    // Sequence numbers are assigned in order while the immutable list is built.
    _eventBatch.AddFirst(new AtomicWrite(materialized
        .Select(e => new Persistent(e, persistenceId: PersistenceId,
            sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender))
        .ToImmutableList<IPersistentRepresentation>()));
}

Expand All @@ -406,6 +428,11 @@ public void PersistAllAsync<TEvent>(IEnumerable<TEvent> events, Action<TEvent> h
/// <param name="handler">TBD</param>
public void DeferAsync<TEvent>(TEvent evt, Action<TEvent> handler)
{
if (IsRecovering)
{
throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.");
}

if (_pendingInvocations.Count == 0)
{
handler(evt);
Expand Down Expand Up @@ -442,8 +469,8 @@ protected virtual void OnRecoveryFailure(Exception reason, object message = null
{
if (message != null)
{
Log.Error(reason, "Exception in ReceiveRecover when replaying event type [{0}] with sequence number [{1}] for persistenceId [{2}]",
message.GetType(), LastSequenceNr, PersistenceId);
Log.Error(reason, "Exception in ReceiveRecover when replaying event type [{0}] with sequence number [{1}] for persistenceId [{2}]",
message.GetType(), LastSequenceNr, PersistenceId);
}
else
{
Expand Down Expand Up @@ -505,18 +532,18 @@ protected void RunTask(Func<Task> action)
var tcs = new TaskCompletionSource<object>();
t.ContinueWith(r =>
{
_asyncTaskRunning = false;
OnProcessingCommandsAroundReceiveComplete(r.IsFaulted || r.IsCanceled);
if (r.IsFaulted)
tcs.TrySetException(r.Exception);
else if (r.IsCanceled)
tcs.TrySetCanceled();
else
tcs.TrySetResult(null);
}, TaskContinuationOptions.AttachedToParent & TaskContinuationOptions.ExecuteSynchronously);
{
_asyncTaskRunning = false;
OnProcessingCommandsAroundReceiveComplete(r.IsFaulted || r.IsCanceled);
if (r.IsFaulted)
tcs.TrySetException(r.Exception);
else if (r.IsCanceled)
tcs.TrySetCanceled();
else
tcs.TrySetResult(null);
}, TaskContinuationOptions.AttachedToParent & TaskContinuationOptions.ExecuteSynchronously);
t = tcs.Task;
}
Expand Down

0 comments on commit 01f0463

Please sign in to comment.