Skip to content

Commit

Permalink
Persistent actor stops on recovery failures
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Nov 4, 2015
1 parent dce684d commit 0310571
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 61 deletions.
58 changes: 10 additions & 48 deletions src/core/Akka.Persistence/Eventsourced.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,16 @@ private EventsourcedState ReplayStarted(Receive recoveryBehavior)
UpdateLastSequenceNr(m.Persistent);
base.AroundReceive(recoveryBehavior, m.Persistent);
}
catch (Exception exc)
catch (Exception cause)
{
var currentMessage = Context.AsInstanceOf<ActorCell>().CurrentMessage;
ChangeState(ReplayFailed(exc, currentMessage));
try
{
OnReplayFailure(cause, m.Persistent.Payload);
}
finally
{
Context.Stop(Self);
}
}
}
else if (message is ReplayMessagesSuccess)
Expand All @@ -138,58 +144,14 @@ private EventsourcedState ReplayStarted(Receive recoveryBehavior)
else if (message is ReplayMessagesFailure)
{
var failure = (ReplayMessagesFailure)message;
OnReplayFailure(failure.Cause);
OnReplayFailure(failure.Cause, message: null);
// FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
base.AroundReceive(recoveryBehavior, new RecoveryFailure(failure.Cause));
}
else _internalStash.Stash();
});
}

/// <summary>
/// Processes all remaining replayed messages and changes to <see cref="PrepareRestart"/>.
/// Message that caused and exception during replay, is re-added to the mailbox and re-received
/// in <see cref="PrepareRestart"/> state.
/// </summary>
private EventsourcedState ReplayFailed(Exception cause, object failureMessage)
{
return new EventsourcedState("replay failed", true, (receive, message) =>
{
if (message is ReplayMessagesFailure)
{
ReplayCompleted(cause, failureMessage);
// journal couldn't tell the maximum stored sequence number, hence the next
// replay must be a full replay (up to the highest stored sequence number)
// Recover(lastSequenceNr) is sent by PreRestart
LastSequenceNr = long.MaxValue;
}
else if (message is ReplayMessagesSuccess) ReplayCompleted(cause, failureMessage);
else if (message is ReplayedMessage) UpdateLastSequenceNr(((ReplayedMessage)message).Persistent);
else if (message is Recover) return; // ignore
else _internalStash.Stash();
});
}

private void ReplayCompleted(Exception cause, object failureMessage)
{
ChangeState(PrepareRestart(cause));

//TODO: this implementation requires mailbox.EnqueueFirst to be available, but that actually gives a large
// amount of composition constrains. If any of the casts below will go wrong, user-defined messages won't be handled by actors.
Context.EnqueueMessageFirst(failureMessage);
}

/// <summary>
/// Re-receives replayed message that caused an exception and re-throws the <paramref name="cause"/>.
/// </summary>
private EventsourcedState PrepareRestart(Exception cause)
{
return new EventsourcedState("prepare restart", true, (receive, message) =>
{
if (message is ReplayedMessage) throw cause;
});
}

/// <summary>
/// Processes messages with the highest stored sequence number in the journal and then switches to
/// <see cref="ProcessingCommands"/> state. All other messages are stashed.
Expand Down
28 changes: 20 additions & 8 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Event;
using Akka.Util.Internal;

namespace Akka.Persistence
Expand Down Expand Up @@ -73,6 +74,7 @@ public abstract partial class Eventsourced : ActorBase, IPersistentIdentity, IWi
private LinkedList<IPendingHandlerInvocation> _pendingInvocations = new LinkedList<IPendingHandlerInvocation>();

protected readonly PersistenceExtension Extension;
private readonly ILoggingAdapter _log;

protected Eventsourced()
{
Expand All @@ -83,8 +85,11 @@ protected Eventsourced()
_maxMessageBatchSize = Extension.Settings.Journal.MaxMessageBatchSize;
_currentState = RecoveryPending();
_internalStash = CreateStash();
_log = Context.GetLogger();
}


protected virtual ILoggingAdapter Log { get { return _log; } }

/// <summary>
/// Id of the persistent entity for which messages should be replayed.
/// </summary>
Expand Down Expand Up @@ -302,10 +307,22 @@ public void UnstashAll()
protected virtual void OnReplaySuccess() { }

/// <summary>
/// Called whenever a message replay fails.
/// Called whenever a message replay fails. By default it log the errors.
/// </summary>
/// <param name="reason">Reason of failure</param>
protected virtual void OnReplayFailure(Exception reason) { }
/// <param name="message">Message that caused a failure</param>
protected virtual void OnReplayFailure(Exception reason, object message = null)
{
if (message != null)
{
_log.Error(reason, "Exception in ReceiveRecover when replaying event type [{0}] with sequence number [{1}] for persistenceId [{2}]",
message.GetType(), LastSequenceNr, PersistenceId);
}
else
{
_log.Error(reason, "Persistence failure when replaying events for persistenceId [{0}]. Last known sequence number [{1}]", PersistenceId, LastSequenceNr);
}
}

private void ChangeState(EventsourcedState state)
{
Expand Down Expand Up @@ -333,11 +350,6 @@ private IStash CreateStash()
{
return Context.CreateStash(GetType());
}

private static bool CanUnstashFilterPredicate(object message)
{
return !(message is WriteMessageSuccess || message is ReplayedMessage);
}
}
}

13 changes: 8 additions & 5 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
Expand Down Expand Up @@ -52,14 +53,14 @@ public PersistenceExtension(ExtendedActorSystem system)
var configPath = _config.GetString("journal.plugin");
if (string.IsNullOrEmpty(configPath)) throw new NullReferenceException("Default journal plugin is not configured");
return configPath;
});
}, LazyThreadSafetyMode.ExecutionAndPublication);

_defaultSnapshotPluginId = new Lazy<string>(() =>
{
var configPath = _config.GetString("snapshot-store.plugin");
if (string.IsNullOrEmpty(configPath)) throw new NullReferenceException("Default snapshot-store plugin is not configured");
return configPath;
});
}, LazyThreadSafetyMode.ExecutionAndPublication);

Settings = new PersistenceSettings(_system, _config);
}
Expand All @@ -81,7 +82,7 @@ public IActorRef SnapshotStoreFor(string snapshotPluginId)
Lazy<PluginHolder> pluginContainer;
if (!_snapshotPluginExtensionIds.TryGetValue(configPath, out pluginContainer))
{
var plugin = new Lazy<PluginHolder>(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId));
var plugin = new Lazy<PluginHolder>(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _snapshotPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
}

Expand All @@ -101,7 +102,8 @@ public IActorRef JournalFor(string journalPluginId)
var plugin = new Lazy<PluginHolder>(() => CreatePlugin(configPath, type =>
typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId));
: DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
}

Expand All @@ -124,7 +126,8 @@ public EventAdapters AdaptersFor(string journalPluginId)
var plugin = new Lazy<PluginHolder>(() =>
CreatePlugin(configPath, type => typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId));
: DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
}

Expand Down

0 comments on commit 0310571

Please sign in to comment.