Merge pull request #19700 from akka/wip-19694-journal-protocol-RK
prohibit concurrent write requests from same persistenceId #19694
rkuhn committed Feb 10, 2016
2 parents b19b940 + edc92eb commit cf81bdc
Showing 10 changed files with 331 additions and 73 deletions.
16 changes: 7 additions & 9 deletions akka-docs/rst/java/persistence.rst
@@ -330,15 +330,13 @@ command, i.e. ``onPersistRejected`` is called with an exception (typically ``Uns
Batch writes
------------

In order to optimize throughput a persistent actor internally batches events to be stored under high load before
writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads
to a configurable maximum size (default is ``200``) under high load. When using ``persistAsync`` this increases
the maximum throughput dramatically.

.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#max-message-batch-size

A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed
writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.
In order to optimize throughput when using ``persistAsync``, a persistent actor
internally batches events to be stored under high load before writing them to
the journal (as a single batch). The batch size is dynamically determined by
how many events are emitted during the time of a journal round-trip: after
sending a batch to the journal, no further batch can be sent until confirmation
has been received that the previous batch has been written. Batch writes are never
timer-based, which keeps latencies at a minimum.

Message deletion
----------------
16 changes: 7 additions & 9 deletions akka-docs/rst/scala/persistence.rst
@@ -317,15 +317,13 @@ command, i.e. ``onPersistRejected`` is called with an exception (typically ``Uns
Batch writes
------------

In order to optimize throughput, a persistent actor internally batches events to be stored under high load before
writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads
to a configurable maximum size (default is ``200``) under high load. When using ``persistAsync`` this increases
the maximum throughput dramatically.

.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#max-message-batch-size

A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed
writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.
In order to optimize throughput when using ``persistAsync``, a persistent actor
internally batches events to be stored under high load before writing them to
the journal (as a single batch). The batch size is dynamically determined by
how many events are emitted during the time of a journal round-trip: after
sending a batch to the journal, no further batch can be sent until confirmation
has been received that the previous batch has been written. Batch writes are never
timer-based, which keeps latencies at a minimum.

Message deletion
----------------
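To make the batching behaviour described above concrete, here is a minimal sketch of a persistent actor that relies on it via ``persistAsync`` (the ``Counter`` actor and its message handling are made-up illustrations, not part of this change):

import akka.persistence.PersistentActor

class Counter extends PersistentActor {
  override def persistenceId: String = "counter-1"

  private var state = 0

  override def receiveCommand: Receive = {
    case n: Int ⇒
      // events emitted while the journal round-trip for the previous batch
      // is in flight are accumulated and written as the next single batch
      persistAsync(n) { evt ⇒ state += evt }
  }

  override def receiveRecover: Receive = {
    case n: Int ⇒ state += n
  }
}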
@@ -46,9 +46,17 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVe
super.beforeEach()
senderProbe = TestProbe()
receiverProbe = TestProbe()
preparePersistenceId(pid)
writeMessages(1, 5, pid, senderProbe.ref, writerUuid)
}

/**
* Overridable hook that is called before populating the journal for the next
* test case. `pid` is the `persistenceId` that will be used in the test.
* This method may be needed to clean pre-existing events from the log.
*/
def preparePersistenceId(pid: String): Unit = ()
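As an illustration, a plugin's test suite could override this hook to wipe stored events; ``MyJournalSpec`` and ``MyJournalStore`` below are hypothetical names, not part of this change:

import akka.persistence.journal.JournalSpec
import com.typesafe.config.ConfigFactory

class MyJournalSpec extends JournalSpec(
  config = ConfigFactory.parseString(
    """akka.persistence.journal.plugin = "my-journal"""")) {

  override def preparePersistenceId(pid: String): Unit =
    MyJournalStore.deleteAllEvents(pid) // hypothetical cleanup call: start each test from an empty journal
}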

/**
* Implementation may override and return false if it does not
* support atomic writes of several events, as emitted by `persistAll`.
4 changes: 3 additions & 1 deletion akka-persistence/src/main/resources/reference.conf
@@ -105,7 +105,9 @@ akka.persistence {
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"

# Maximum size of a persistent message batch written to the journal.
# Removed: used to be the maximum size of a persistent message batch written to the journal.
# This setting now has no effect; a PersistentActor will write as many messages
# as it has accumulated since the last write.
max-message-batch-size = 200

circuit-breaker {
67 changes: 25 additions & 42 deletions akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
@@ -51,6 +51,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
private val writerUuid = UUID.randomUUID.toString

private var journalBatch = Vector.empty[PersistentEnvelope]
// no longer used, but kept for binary compatibility
private val maxMessageBatchSize = extension.journalConfigFor(journalPluginId).getInt("max-message-batch-size")
private var writeInProgress = false
private var sequenceNr: Long = 0L
@@ -232,11 +233,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
sequenceNr
}

private def flushJournalBatch(): Unit = {
journal ! WriteMessages(journalBatch, self, instanceId)
journalBatch = Vector.empty
writeInProgress = true
}
private def flushJournalBatch(): Unit =
if (!writeInProgress) {
journal ! WriteMessages(journalBatch, self, instanceId)
journalBatch = Vector.empty
writeInProgress = true
}

private def log: LoggingAdapter = Logging(context.system, this)

@@ -291,7 +293,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
pendingStashingPersistInvocations += 1
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
eventBatch = AtomicWrite(PersistentRepr(event, sender = sender())) :: eventBatch
eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId,
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender()))
}

/**
@@ -308,7 +311,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
pendingStashingPersistInvocations += 1
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
}
eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch
eventBatch ::= AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId,
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender())))
}
}

@@ -341,7 +345,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
*/
def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
eventBatch = AtomicWrite(PersistentRepr(event, sender = sender())) :: eventBatch
eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId,
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender()))
}

/**
@@ -357,7 +362,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
events.foreach { event ⇒
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
}
eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch
eventBatch ::= AtomicWrite(events.map(PersistentRepr(_, persistenceId = persistenceId,
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender())))
}

@deprecated("use persistAllAsync instead", "2.4")
@@ -501,42 +507,17 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
}

private def flushBatch() {
def addToBatch(p: PersistentEnvelope): Unit = p match {
case a: AtomicWrite ⇒
journalBatch :+= a.copy(payload =
a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid)))
case r: PersistentEnvelope ⇒
journalBatch :+= r
}

def maxBatchSizeReached: Boolean =
journalBatch.size >= maxMessageBatchSize

// When using only `persistAsync` and `defer` max throughput is increased by using
// batching, but when using `persist` we want to use one atomic WriteMessages
// for the emitted events.
// Flush previously collected events, if any, separately from the `persist` batch
if (pendingStashingPersistInvocations > 0 && journalBatch.nonEmpty)
flushJournalBatch()

eventBatch.reverse.foreach { p ⇒
addToBatch(p)
if (!writeInProgress || maxBatchSizeReached) flushJournalBatch()
}

eventBatch = Nil
}
private def flushBatch() {
if (eventBatch.nonEmpty) {
journalBatch ++= eventBatch.reverse
eventBatch = Nil
}

if (journalBatch.nonEmpty) flushJournalBatch()
}

private def peekApplyHandler(payload: Any): Unit = {
val batchSizeBeforeApply = eventBatch.size
try pendingInvocations.peek().handler(payload)
finally {
val batchSizeAfterApply = eventBatch.size

if (batchSizeAfterApply > batchSizeBeforeApply)
flushBatch()
}
}
private def peekApplyHandler(payload: Any): Unit =
try pendingInvocations.peek().handler(payload)
finally flushBatch()

/**
* Common receive handler for processingCommands and persistingEvents
@@ -573,14 +554,16 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
// while message is in flight, in that case we ignore the call to the handler
if (id == instanceId) {
try {
pendingInvocations.peek().handler(l)
peekApplyHandler(l)
onWriteMessageComplete(err = false)
} catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e }
}
case WriteMessagesSuccessful ⇒
if (journalBatch.isEmpty) writeInProgress = false else flushJournalBatch()
writeInProgress = false
if (journalBatch.nonEmpty) flushJournalBatch()

case WriteMessagesFailed(_) ⇒
writeInProgress = false
() // it will be stopped by the first WriteMessageFailure message
}

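The net effect of the ``Eventsourced`` changes above can be captured in a small standalone model of the write protocol (an illustrative sketch only, not Akka source): at most one ``WriteMessages`` request is outstanding at a time, and everything that accumulates in the meantime forms the next batch.

final class WriteBatcher(sendToJournal: Vector[String] ⇒ Unit) {
  private var journalBatch = Vector.empty[String]
  private var writeInProgress = false

  def persist(event: String): Unit = {
    journalBatch :+= event // accumulate, possibly while a write is in flight
    flushJournalBatch()
  }

  // called when the journal confirms the previous batch
  def writeMessagesSuccessful(): Unit = {
    writeInProgress = false
    if (journalBatch.nonEmpty) flushJournalBatch() // send what accumulated meanwhile
  }

  private def flushJournalBatch(): Unit =
    if (!writeInProgress && journalBatch.nonEmpty) {
      sendToJournal(journalBatch)
      journalBatch = Vector.empty
      writeInProgress = true
    }
}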
@@ -26,6 +26,8 @@ trait AsyncRecovery {
*
* The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]]
* and what the user specified as recovery [[akka.persistence.Recovery]] parameter.
* This implies that this call is always preceded by reading the highest sequence
* number for the given `persistenceId`.
*
* This call is NOT protected with a circuit-breaker because it may take long time
* to replay all events. The plugin implementation itself must protect against
@@ -55,6 +57,10 @@ trait AsyncRecovery {
*
* This call is protected with a circuit-breaker.
*
* Please also note that requests for the highest sequence number may be made concurrently
* with writes executing for the same `persistenceId`; in particular, it is possible that
* a restarting actor tries to recover before its outstanding writes have completed.
*
* @param persistenceId persistent actor id.
* @param fromSequenceNr hint where to start searching for the highest sequence
* number. When a persistent actor is recovering this
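For orientation, the two recovery calls compose roughly as follows; ``RecoverySketch`` is an illustrative sketch of the calling side, not a real Akka API:

import scala.concurrent.{ ExecutionContext, Future }
import akka.persistence.PersistentRepr
import akka.persistence.journal.AsyncRecovery

object RecoverySketch {
  def recover(journal: AsyncRecovery, pid: String, fromSeqNr: Long, userToSeqNr: Long, max: Long)(
    replayCallback: PersistentRepr ⇒ Unit)(implicit ec: ExecutionContext): Future[Long] =
    journal.asyncReadHighestSequenceNr(pid, fromSeqNr).flatMap { highest ⇒
      // replay up to the lower of the user-specified bound and the stored highest
      val toSeqNr = math.min(highest, userToSeqNr)
      journal.asyncReplayMessages(pid, fromSeqNr, toSeqNr, max)(replayCallback).map(_ ⇒ highest)
    }
}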
@@ -60,7 +60,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
{
case WriteMessages(messages, persistentActor, actorInstanceId) ⇒
val cctr = resequencerCounter
resequencerCounter += messages.foldLeft(0)((acc, m) ⇒ acc + m.size) + 1
resequencerCounter += messages.foldLeft(1)((acc, m) ⇒ acc + m.size)

val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
val prepared = Try(preparePersistentBatch(messages))
@@ -215,11 +220,20 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
* work in asynchronous tasks it is alright that they complete the futures in any order,
* but the actual writes for a specific persistenceId should be serialized to avoid
* issues such as events of a later write being visible to consumers (query side, or replay)
* before the events of an earlier write are visible. This can also be done with
* consistent hashing if it is too fine grained to do it on the persistenceId level.
* Normally a `PersistentActor` will only have one outstanding write request to the journal but
* it may emit several write requests when `persistAsync` is used and the max batch size
* is reached.
* before the events of an earlier write are visible.
* A PersistentActor will not send a new WriteMessages request before the previous one
* has been completed.
*
* Please note that the `sender` field of the contained PersistentRepr objects has been
* nulled out (i.e. set to `ActorRef.noSender`) in order to not use space in the journal
* for a sender reference that will likely be obsolete during replay.
*
* Please also note that requests for the highest sequence number may be made concurrently
* with this call executing for the same `persistenceId`; in particular, it is possible that
* a restarting actor tries to recover before its outstanding writes have completed. In
* the latter case it is highly desirable to defer reading the highest sequence number
* until all outstanding writes have completed, otherwise the PersistentActor may reuse
* sequence numbers.
*
* This call is protected with a circuit-breaker.
*/
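One way a plugin could follow that advice is to remember the outstanding write per ``persistenceId`` and chain the highest-sequence-number read after it. A minimal sketch, assuming all mutation happens on the journal actor's thread; every name here is hypothetical:

import scala.concurrent.{ ExecutionContext, Future }

trait DeferredHighestSeqNrReads {
  implicit def ec: ExecutionContext

  // last outstanding write per persistenceId; a real implementation would
  // also remove entries once the corresponding write completes
  private var pendingWrites = Map.empty[String, Future[Unit]]

  protected def trackWrite(pid: String, write: Future[Unit]): Future[Unit] = {
    pendingWrites += pid -> write
    write
  }

  // run `read` only after the outstanding write (if any) has completed,
  // regardless of whether it succeeded or failed
  protected def afterOutstandingWrite[T](pid: String)(read: () ⇒ Future[T]): Future[T] =
    pendingWrites.get(pid) match {
      case Some(w) ⇒ w.recover { case _ ⇒ () }.flatMap(_ ⇒ read())
      case None    ⇒ read()
    }
}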
@@ -8,7 +8,7 @@ import java.io.File
import akka.actor._
import akka.persistence.EndToEndEventAdapterSpec.NewA
import akka.persistence.journal.{ EventSeq, EventAdapter }
import akka.testkit.{ ImplicitSender, WatchedByCoroner, AkkaSpec, TestProbe }
import akka.testkit.{ ImplicitSender, WatchedByCoroner, AkkaSpec, TestProbe, EventFilter }
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.commons.io.FileUtils
import org.scalatest.{ WordSpecLike, Matchers, BeforeAndAfterAll }
@@ -127,6 +127,7 @@ abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Conf
| }
| }
|}
|akka.loggers = ["akka.testkit.TestEventListener"]
""".stripMargin)

val newAdaptersConfig = ConfigFactory.parseString(
@@ -226,11 +227,13 @@ abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Conf
.withoutPath(s"$journalPath.event-adapters.a")
.withoutPath(s"""$journalPath.event-adapter-bindings."${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A"""")

intercept[IllegalArgumentException] {
withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 ⇒
Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String])
}
}.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)")
withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 ⇒
EventFilter[ActorInitializationException](occurrences = 1, pattern = ".*undefined event-adapter.*") intercept {
intercept[IllegalArgumentException] {
Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String])
}.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)")
}
}
}
}
}
