Permalink
Browse files

akka-persistence prototype

The most prominent changes compared to eventsourced are:

- No central processor and channel registry any more
- Auto-recovery of processors on start and restart (can be disabled)
- Recovery of processor networks doesn't require coordination
- Explicit channel activation not needed any more
- Message sequence numbers generated per processor (no gaps)
- Sender references are journaled along with messages
- Processors can determine their recovery status
- No custom API on extension object, only messages
- Journal created by extension from config, not by application
- Applications only interact with processors and channels via messages
- Internal design prepared for having processor-specific journal actors (for later optimization possibilities)

Further additions and changes during review:

- Allow processor implementation classes to use inherited stash
- Channel support to resolve (potentially invalid) sender references
- Logical intead of physical deletion of messages
- Pinned dispatcher for LevelDB journal
- Processor can handle failures during recovery
- Message renamed to Persistent

This prototype has the following limitations:

- Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later)
- The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later)

The following features will be added later using separate tickets:

- Snapshot-based recovery
- Reliable channels
- Journal plugin API
- Optimizations
- ...
  • Loading branch information...
1 parent 1187fec commit cdeea924ff84214025f1b5c147c444de803fd006 @krasserm krasserm committed Sep 14, 2013
Showing with 3,119 additions and 8 deletions.
  1. +10 −0 akka-actor/src/main/scala/akka/actor/Actor.scala
  2. +1 −1 akka-actor/src/main/scala/akka/actor/ActorCell.scala
  3. +24 −4 akka-actor/src/main/scala/akka/actor/Stash.scala
  4. +172 −0 akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java
  5. +1 −0 akka-docs/rst/java/index-actors.rst
  6. +234 −0 akka-docs/rst/java/persistence.rst
  7. +187 −0 akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala
  8. +1 −0 akka-docs/rst/scala/index-actors.rst
  9. +252 −0 akka-docs/rst/scala/persistence.rst
  10. +19 −0 akka-persistence/src/main/resources/reference.conf
  11. +271 −0 akka-persistence/src/main/scala/akka/persistence/Channel.scala
  12. +85 −0 akka-persistence/src/main/scala/akka/persistence/Journal.scala
  13. +66 −0 akka-persistence/src/main/scala/akka/persistence/Persistence.scala
  14. +103 −0 akka-persistence/src/main/scala/akka/persistence/Persistent.scala
  15. +415 −0 akka-persistence/src/main/scala/akka/persistence/Processor.scala
  16. +76 −0 akka-persistence/src/main/scala/akka/persistence/journal/InmemJournal.scala
  17. +278 −0 akka-persistence/src/main/scala/akka/persistence/journal/LeveldbJournal.scala
  18. +129 −0 akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala
  19. +46 −0 akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala
  20. +303 −0 akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala
  21. +127 −0 akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala
  22. +52 −0 ...amples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java
  23. +75 −0 ...amples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java
  24. +1 −0 akka-samples/akka-sample-persistence/src/main/resources/application.conf
  25. +58 −0 ...mples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala
  26. +47 −0 akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala
  27. +60 −0 akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala
  28. +26 −3 project/AkkaBuild.scala
@@ -470,6 +470,16 @@ trait Actor {
//#receive
/**
+ * INTERNAL API.
+ *
+ * Can be overridden to intercept calls to this actor's current behavior.
+ *
+ * @param receive current behavior.
+ * @param msg current message.
+ */
+ protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
+
+ /**
* User overridable definition the strategy to use for supervising
* child actors.
*/
@@ -495,7 +495,7 @@ private[akka] class ActorCell(
}
}
- final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
+ final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
/*
* ACTOR CONTEXT IMPLEMENTATION
@@ -77,10 +77,13 @@ trait UnrestrictedStash extends Actor {
config.getInt("stash-capacity")
}
- /* The actor's deque-based message queue.
+ /**
+ * INTERNAL API.
+ *
+ * The actor's deque-based message queue.
* `mailbox.queue` is the underlying `Deque`.
*/
- private val mailbox: DequeBasedMessageQueueSemantics = {
+ protected[akka] val mailbox: DequeBasedMessageQueueSemantics = {
context.asInstanceOf[ActorCell].mailbox.messageQueue match {
case queue: DequeBasedMessageQueueSemantics queue
case other throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" +
@@ -116,9 +119,26 @@ trait UnrestrictedStash extends Actor {
*
* The stash is guaranteed to be empty after calling `unstashAll()`.
*/
- def unstashAll(): Unit = {
+ def unstashAll(): Unit = unstashAll(_ true)
+
+ /**
+ * INTERNAL API.
+ *
+ * Prepends selected messages in the stash, applying `filterPredicate`, to the
+ * mailbox, and then clears the stash.
+ *
+ * Messages from the stash are enqueued to the mailbox until the capacity of the
+ * mailbox (if any) has been reached. In case a bounded mailbox overflows, a
+ * `MessageQueueAppendFailedException` is thrown.
+ *
+ * The stash is guaranteed to be empty after calling `unstashAll(Any => Boolean)`.
+ *
+ * @param filterPredicate only stashed messages selected by this predicate are
+ * prepended to the mailbox.
+ */
+ protected[akka] def unstashAll(filterPredicate: Any Boolean): Unit = {
try {
- val i = theStash.reverseIterator
+ val i = theStash.reverseIterator.filter(envelope filterPredicate(envelope.message))
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
} finally {
theStash = Vector.empty[Envelope]
@@ -0,0 +1,172 @@
+package docs.persistence;
+
+import scala.Option;
+
+import akka.actor.*;
+import akka.persistence.*;
+
+public class PersistenceDocTest {
+
+ public interface ProcessorMethods {
+ //#processor-id
+ public String processorId();
+ //#processor-id
+ //#recovery-status
+ public boolean recoveryRunning();
+ public boolean recoveryFinished();
+ //#recovery-status
+ //#current-message
+ public Persistent getCurrentPersistentMessage();
+ //#current-message
+ }
+
+ static Object o1 = new Object() {
+ //#definition
+ class MyProcessor extends UntypedProcessor {
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof Persistent) {
+ // message has been written to journal
+ Persistent persistent = (Persistent)message;
+ Object payload = persistent.payload();
+ Long sequenceNr = persistent.sequenceNr();
+ // ...
+ } else {
+ // message has not been written to journal
+ }
+ }
+ }
+ //#definition
+
+ class MyActor extends UntypedActor {
+ ActorRef processor;
+
+ public MyActor() {
+ //#usage
+ processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor");
+
+ processor.tell(Persistent.create("foo"), null);
+ processor.tell("bar", null);
+ //#usage
+ }
+
+ public void onReceive(Object message) throws Exception {
+ // ...
+ }
+
+ private void recover() {
+ //#recover-explicit
+ processor.tell(Recover.create(), null);
+ //#recover-explicit
+ }
+ }
+ };
+
+ static Object o2 = new Object() {
+ abstract class MyProcessor1 extends UntypedProcessor {
+ //#recover-on-start-disabled
+ @Override
+ public void preStartProcessor() {}
+ //#recover-on-start-disabled
+
+ //#recover-on-restart-disabled
+ @Override
+ public void preRestartProcessor(Throwable reason, Option<Object> message) {}
+ //#recover-on-restart-disabled
+ }
+
+ abstract class MyProcessor2 extends UntypedProcessor {
+ //#recover-on-start-custom
+ @Override
+ public void preStartProcessor() {
@jboner
jboner Sep 17, 2013 Member

Why call it preStartProcessor? Why not something like preStartRecovery or preStartReplay or something with semantic meaning? That is of course if the main purpose of it is to perform recovery—as the docs state.

@jboner
jboner Sep 17, 2013 Member

I guess my point is: why not have methods with good names and well defined semantic meaning rather than just blindly adopting the preX/postX scheme from Actor (which are so generic just because it is only the user implementing them). In this example just call the method: recover or replay.

@krasserm
krasserm Sep 17, 2013 Contributor

The processor-specific life cycle hooks are planned to be removed anyway, as discussed at:

Pull request is coming soon. The recovery logic will then directly be in preStart and preRestart. We still can factor the recovery logic out into separate (overridable) methods.

@jboner
jboner Sep 17, 2013 Member

Ok. I think that could make sense. preStart/preRestart are even less specific. But there might be drawbacks.

+ getSelf().tell(Recover.create(457L), null);
+ }
+ //#recover-on-start-custom
+ }
+
+ abstract class MyProcessor3 extends UntypedProcessor {
+ //#deletion
+ @Override
+ public void preRestartProcessor(Throwable reason, Option<Object> message) throws Exception {
+ if (message.isDefined() && message.get() instanceof Persistent) {
+ delete((Persistent) message.get());
+ }
+ super.preRestartProcessor(reason, message);
+ }
+ //#deletion
+ }
+
+ class MyProcessor4 extends UntypedProcessor implements ProcessorMethods {
+ //#processor-id-override
+ @Override
+ public String processorId() {
+ return "my-stable-processor-id";
+ }
+ //#processor-id-override
+ @Override
+ public void onReceive(Object message) throws Exception {}
+ }
+ };
+
+ static Object o3 = new Object() {
+ //#channel-example
+ class MyProcessor extends UntypedProcessor {
+ private final ActorRef destination;
+ private final ActorRef channel;
+
+ public MyProcessor() {
+ this.destination = getContext().actorOf(Props.create(MyDestination.class));
+ this.channel = getContext().actorOf(Channel.props(), "myChannel");
+ }
+
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof Persistent) {
+ Persistent p = (Persistent)message;
+ Persistent out = p.withPayload("done " + p.payload());
+ channel.tell(Deliver.create(out, destination), getSelf());
+ }
+ }
+ }
+
+ class MyDestination extends UntypedActor {
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof Persistent) {
+ Persistent p = (Persistent)message;
+ System.out.println("received " + p.payload());
+ p.confirm();
@jboner
jboner Sep 17, 2013 Member

What happens if I call confirm on a message not delivered through a channel? no-op?
The destination actor can't, and should not know, if it receives messages through a channel or not right?

@krasserm
krasserm Sep 17, 2013 Contributor

What happens if I call confirm on a message not delivered through a channel? no-op?

yes, a no-op.

The destination actor can't, and should not know, if it receives messages through a channel or not right?

the destination actor (or any of its downstream actors) is responsible for confirming messages delivered through a channel. Therefore, it must know.

@jboner
jboner Sep 17, 2013 Member

Ok. Is confirm used anywhere else? If not why is it on the generic Persistent?
Have you thought about having a special ConfirmablePersistent or ChannelRoutedPersistent message or wrap it in an envelope with the confirm method or similar?
So it is clear what the intent is. Just asking.

@krasserm
krasserm Sep 17, 2013 Contributor

I like the idea with ConfirmablePersistent. Need to think more about possible implications. Added to my todo list.

@jboner
jboner Sep 17, 2013 Member

Ok. Cool.

@krasserm
krasserm Nov 4, 2013 Contributor

@jboner I didn't forget about ConfirmablePersistent. Will come with my next pull request.

+ }
+ }
+ }
+ //#channel-example
+
+ class MyProcessor2 extends UntypedProcessor {
+ private final ActorRef destination;
+ private final ActorRef channel;
+
+ public MyProcessor2(ActorRef destination) {
+ this.destination = getContext().actorOf(Props.create(MyDestination.class));
+ //#channel-id-override
+ this.channel = getContext().actorOf(Channel.props("my-stable-channel-id"));
+ //#channel-id-override
+ }
+
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof Persistent) {
+ Persistent p = (Persistent)message;
+ Persistent out = p.withPayload("done " + p.payload());
+ channel.tell(Deliver.create(out, destination), getSelf());
+
+ //#channel-example-reply
+ channel.tell(Deliver.create(out, getSender()), getSelf());
+ //#channel-example-reply
+ //#resolve-destination
+ channel.tell(Deliver.create(out, getSender(), Resolve.destination()), getSelf());
+ //#resolve-destination
+ //#resolve-sender
+ channel.tell(Deliver.create(out, destination, Resolve.sender()), getSender());
+ //#resolve-sender
+
+ }
+ }
+ }
+ };
+}
@@ -11,4 +11,5 @@ Actors
mailboxes
routing
fsm
+ persistence
testing
Oops, something went wrong.

1 comment on commit cdeea92

@ahjohannessen

This is epic! :)

Please sign in to comment.