Skip to content

Commit

Permalink
Replay from application-defined sequence number (RBMHTechnology#302)
Browse files Browse the repository at this point in the history
  • Loading branch information
danbim committed Sep 19, 2016
1 parent 8a7b7de commit 5760801
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 5 deletions.
Expand Up @@ -308,7 +308,7 @@ trait EventsourcedView extends Actor with Stash {
val iid = instanceId

if (saveRequests.contains(metadata)) {
handler(Failure(new IllegalStateException(s"snapshot with metadata ${metadata} is currently being saved")))
handler(Failure(new IllegalStateException(s"snapshot with metadata $metadata is currently being saved")))
} else {
saveRequests += (metadata -> handler)
val snapshot = snapshotCaptured(prototype)
Expand All @@ -318,6 +318,17 @@ trait EventsourcedView extends Actor with Stash {
}
}

/**
* Override to provide an application-defined log sequence number from which event replay will start.
*
* If `Some(snr)` is returned snapshot loading will be skipped and replay will start from
* the given sequence number `snr`.
*
* If `None` is returned the actor proceeds with the regular snapshot loading procedure.
*/
def replayFromSequenceNr: Option[Long] =
None

/**
* Internal API.
*/
Expand All @@ -342,7 +353,10 @@ trait EventsourcedView extends Actor with Stash {
* Internal API.
*/
private[eventuate] def init(): Unit =
load()
replayFromSequenceNr match {
case Some(snr) => replay(snr, subscribe = true)
case None => load()
}

/**
* Internal API.
Expand Down
Expand Up @@ -88,6 +88,8 @@ trait EventsourcedWriter[R, W] extends EventsourcedView {

private var numPending: Int = 0

override final def replayFromSequenceNr: Option[Long] = None

/**
* Asynchronously reads an initial value from the target database, usually to obtain information about
* event processing progress. This method is called during initialization.
Expand Down Expand Up @@ -115,7 +117,7 @@ trait EventsourcedWriter[R, W] extends EventsourcedView {
* and can be overridden.
*/
def readSuccess(result: R): Option[Long] =
None
replayFromSequenceNr

/**
* Called with a write result after a `write` operation successfully completes. This method may update
Expand Down
Expand Up @@ -18,9 +18,9 @@ package com.rbmhtechnology.eventuate

import akka.actor._
import akka.testkit._

import org.scalatest._

import scala.concurrent.Future
import scala.util._

object EventsourcedViewSpec {
Expand All @@ -36,7 +36,8 @@ object EventsourcedViewSpec {
class TestEventsourcedView(
val logProbe: ActorRef,
val msgProbe: ActorRef,
customReplayBatchSize: Option[Int]) extends EventsourcedView {
customReplayBatchSize: Option[Int],
override val replayFromSequenceNr: Option[Long] = None) extends EventsourcedView {

val id = emitterIdA
val eventLog = logProbe
Expand Down Expand Up @@ -217,6 +218,10 @@ class EventsourcedViewSpec extends TestKit(ActorSystem("test")) with WordSpecLik
def recoveredGuardingView(): ActorRef =
processRecover(system.actorOf(Props(new TestGuardingView(logProbe.ref, msgProbe.ref))))

def replayControllingActor(snr: Option[Long]): ActorRef = {
system.actorOf(Props(new TestEventsourcedView(logProbe.ref, msgProbe.ref, None, snr)))
}

def processRecover(actor: ActorRef, instanceId: Int = instanceId): ActorRef = {
logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId))
logProbe.sender() ! LoadSnapshotSuccess(None, instanceId)
Expand Down Expand Up @@ -269,6 +274,18 @@ class EventsourcedViewSpec extends TestKit(ActorSystem("test")) with WordSpecLik
msgProbe.expectMsg(("b", event1b.vectorTimestamp, event1b.localSequenceNr))
msgProbe.expectMsg(("c", event1c.vectorTimestamp, event1c.localSequenceNr))
}
"replay from application-defined sequence number (not load snapshot)" in {
val actor = replayControllingActor(Some(2L))

logProbe.expectMsg(Replay(2L, Some(actor), instanceId))
logProbe.sender() ! ReplaySuccess(List(event1b, event1c), event1c.localSequenceNr, instanceId)

msgProbe.expectMsg(("b", event1b.vectorTimestamp, event1b.localSequenceNr))
msgProbe.expectMsg(("c", event1c.vectorTimestamp, event1c.localSequenceNr))
}
"not replay from application-defined sequence number (load snapshot)" in {
processRecover(replayControllingActor(None))
}
"stash commands during recovery and handle them after initial recovery" in {
val actor = unrecoveredEventsourcedView()

Expand Down
22 changes: 22 additions & 0 deletions src/sphinx/code/EventSourcingDoc.scala
Expand Up @@ -172,6 +172,28 @@ object LoadSnapshot {
//#
}

object AppDefinedSeqNr {
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor

//#replay-from-sequence-nr
class CustomRecoveryExampleActor(snr: Long) extends EventsourcedActor {

override def replayFromSequenceNr: Option[Long] = Some(snr)

// ...
//#

override def id: String = ???
override def eventLog: ActorRef = ???
override def onCommand: Receive = ???
override def onEvent: Receive = ???

//#replay-from-sequence-nr
}
//#
}

object BatchReplay {
import akka.actor._
import com.rbmhtechnology.eventuate.EventsourcedActor
Expand Down
7 changes: 7 additions & 0 deletions src/sphinx/reference/event-sourcing.rst
Expand Up @@ -189,6 +189,13 @@ Event-sourced components can override the configured default value by overriding

.. _snapshots:

Recovery using an application-defined log sequence number
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In order to keep recovery times small it is almost always sensible to recover using snapshots. However, in some very rare cases an event-sourced actor or view can recover quickly using an application-defined log sequence number. If defined, only events with a sequence number equal to or larger than the given sequence number are replayed.

.. includecode:: ../code/EventSourcingDoc.scala
:snippet: replay-from-sequence-nr

Snapshots
---------

Expand Down

0 comments on commit 5760801

Please sign in to comment.