Skip to content

Commit

Permalink
Merge pull request #18198 from akka/wip-15816-circuit-breaker-patriknw
Browse files Browse the repository at this point in the history
+per #15816 Use CircuitBreaker
  • Loading branch information
ktoso committed Aug 12, 2015
2 parents 9993e03 + 39c2d6d commit 5c5decf
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 52 deletions.
18 changes: 18 additions & 0 deletions akka-persistence/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ akka.persistence {
}
}
}
# CircuitBreaker setting if not defined in plugin config section named circuit-breaker
default-circuit-breaker {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
}
}

# Protobuf serialization for the persistent extension messages.
Expand Down Expand Up @@ -124,6 +130,12 @@ akka.persistence.snapshot-store.local {
# yet older snapshot files are available. Each recovery attempt will try
# to recover using an older than previously failed-on snapshot file (if any are present).
max-load-attempts = 3
# CircuitBreaker settings
circuit-breaker {
max-failures = 5
call-timeout = 20s
reset-timeout = 60s
}
}

# LevelDB journal plugin.
Expand All @@ -143,6 +155,12 @@ akka.persistence.journal.leveldb {
checksum = off
# Native LevelDB (via JNI) or LevelDB Java port.
native = on
# CircuitBreaker settings
circuit-breaker {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
}
}

# Shared LevelDB journal plugin (for testing only).
Expand Down
32 changes: 11 additions & 21 deletions akka-persistence/src/main/scala/akka/persistence/Persistence.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
case Some(extensionId)
extensionId(system).adapters
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)(journalDispatchSelector)
val adapters = createAdapters(configPath)
PluginHolder(plugin, adapters)
}
}
val extensionId = new PluginHolderExtensionId(configPath)
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
adaptersFor(journalPluginId) // Recursive invocation.
}
Expand Down Expand Up @@ -226,13 +220,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
case Some(extensionId)
extensionId(system).actor
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)(journalDispatchSelector)
val adapters = createAdapters(configPath)
PluginHolder(plugin, adapters)
}
}
val extensionId = new PluginHolderExtensionId(configPath)
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
journalFor(journalPluginId) // Recursive invocation.
}
Expand All @@ -251,13 +239,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
case Some(extensionId)
extensionId(system).actor
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)(snapshotDispatchSelector)
val adapters = createAdapters(configPath)
PluginHolder(plugin, adapters)
}
}
val extensionId = new PluginHolderExtensionId(configPath)
snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
snapshotStoreFor(snapshotPluginId) // Recursive invocation.
}
Expand Down Expand Up @@ -288,4 +270,12 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {

private def id(ref: ActorRef) = ref.path.toStringWithoutAddress

private class PluginHolderExtensionId(configPath: String) extends ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)(journalDispatchSelector)
val adapters = createAdapters(configPath)
PluginHolder(plugin, adapters)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ trait AsyncRecovery {
* The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]]
* and what the user specified as recovery [[akka.persistence.Recovery]] parameter.
*
* This call is NOT protected with a circuit-breaker because it may take long time
* to replay all events. The plugin implementation itself must protect against
* an unresponsive backend store and make sure that the returned Future is
* completed with success or failure within reasonable time. It is not allowed
* to ignore completing the future.
*
* @param persistenceId persistent actor id.
* @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive).
Expand All @@ -46,6 +52,8 @@ trait AsyncRecovery {
* This sequence number is also used as `toSequenceNr` in subsequent call
* to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`.
*
* This call is protected with a circuit-breaker.
*
* @param persistenceId persistent actor id.
* @param fromSequenceNr hint where to start searching for the highest sequence
* number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

package akka.persistence.journal

import scala.concurrent.duration._
import akka.actor._
import akka.pattern.pipe
import akka.persistence._

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import akka.pattern.CircuitBreaker

/**
* Abstract journal, optimized for asynchronous, non-blocking writes.
Expand All @@ -28,6 +29,19 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
private val resequencer = context.actorOf(Props[Resequencer]())
private var resequencerCounter = 1L

private val breaker = {
val cfg = context.system.settings.config
val cbConfig =
if (cfg.hasPath(self.path.name + ".circuit-breaker"))
cfg.getConfig(self.path.name + ".circuit-breaker")
.withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker"))
else cfg.getConfig("akka.persistence.default-circuit-breaker")
val maxFailures = cbConfig.getInt("max-failures")
val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis
val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)
}

final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal)

final val receiveWriteJournal: Actor.Receive = {
Expand All @@ -39,8 +53,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
val prepared = Try(preparePersistentBatch(messages))
val writeResult = (prepared match {
case Success(prep)
// in case the asyncWriteMessages throws
try asyncWriteMessages(prep) catch { case NonFatal(e) Future.failed(e) }
// try in case the asyncWriteMessages throws
try breaker.withCircuitBreaker(asyncWriteMessages(prep))
catch { case NonFatal(e) Future.failed(e) }
case f @ Failure(_)
// exception from preparePersistentBatch => rejected
Future.successful(messages.collect { case a: AtomicWrite f })
Expand Down Expand Up @@ -96,30 +111,32 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {

case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor)

asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr
val toSeqNr = math.min(toSequenceNr, highSeqNr)
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)
Future.successful(highSeqNr)
else {
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
}.map(_ highSeqNr)
breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, fromSequenceNr))
.flatMap { highSeqNr
val toSeqNr = math.min(toSequenceNr, highSeqNr)
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)
Future.successful(highSeqNr)
else {
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
// not possible to use circuit breaker here
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
}.map(_ highSeqNr)
}
}.map {
highSeqNr RecoverySuccess(highSeqNr)
}.recover {
case e ReplayMessagesFailure(e)
}.pipeTo(persistentActor).onSuccess {
case _ if publish context.system.eventStream.publish(r)
}
}.map {
highSeqNr RecoverySuccess(highSeqNr)
}.recover {
case e ReplayMessagesFailure(e)
}.pipeTo(persistentActor).onSuccess {
case _ if publish context.system.eventStream.publish(r)
}

case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor)
asyncDeleteMessagesTo(persistenceId, toSequenceNr) map {
breaker.withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr)) map {
case _ DeleteMessagesSuccess(toSequenceNr)
} recover {
case e DeleteMessagesFailure(e, toSequenceNr)
Expand Down Expand Up @@ -173,12 +190,16 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
*
* It is possible but not mandatory to reduce number of allocations by returning
* `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected.
*
* This call is protected with a circuit-breaker.
*/
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]

/**
* Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
* (inclusive).
*
* This call is protected with a circuit-breaker.
*/
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]

Expand All @@ -187,6 +208,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
*
* Allows plugin implementers to use `f pipeTo self` and
* handle additional messages for implementing advanced features
*
*/
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
//#journal-plugin-api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

package akka.persistence.snapshot

import scala.concurrent.duration._
import scala.concurrent.Future

import akka.actor._
import akka.pattern.pipe
import akka.persistence._
import akka.pattern.CircuitBreaker

/**
* Abstract snapshot store.
Expand All @@ -21,19 +22,32 @@ trait SnapshotStore extends Actor with ActorLogging {
private val extension = Persistence(context.system)
private val publish = extension.settings.internal.publishPluginCommands

private val breaker = {
val cfg = context.system.settings.config
val cbConfig =
if (cfg.hasPath(self.path.name + ".circuit-breaker"))
cfg.getConfig(self.path.name + ".circuit-breaker")
.withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker"))
else cfg.getConfig("akka.persistence.default-circuit-breaker")
val maxFailures = cbConfig.getInt("max-failures")
val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis
val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)
}

final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal)

final val receiveSnapshotStore: Actor.Receive = {
case LoadSnapshot(persistenceId, criteria, toSequenceNr)
loadAsync(persistenceId, criteria.limit(toSequenceNr)) map {
breaker.withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) map {
sso LoadSnapshotResult(sso, toSequenceNr)
} recover {
case e LoadSnapshotResult(None, toSequenceNr)
} pipeTo senderPersistentActor()

case SaveSnapshot(metadata, snapshot)
val md = metadata.copy(timestamp = System.currentTimeMillis)
saveAsync(md, snapshot) map {
breaker.withCircuitBreaker(saveAsync(md, snapshot)) map {
_ SaveSnapshotSuccess(md)
} recover {
case e SaveSnapshotFailure(metadata, e)
Expand All @@ -44,11 +58,11 @@ trait SnapshotStore extends Actor with ActorLogging {
case evt @ SaveSnapshotFailure(metadata, _)
try {
tryReceivePluginInternal(evt)
deleteAsync(metadata)
breaker.withCircuitBreaker(deleteAsync(metadata))
} finally senderPersistentActor() ! evt // sender is persistentActor

case d @ DeleteSnapshot(metadata)
deleteAsync(metadata).map {
breaker.withCircuitBreaker(deleteAsync(metadata)).map {
case _ DeleteSnapshotSuccess(metadata)
}.recover {
case e DeleteSnapshotFailure(metadata, e)
Expand All @@ -62,7 +76,7 @@ trait SnapshotStore extends Actor with ActorLogging {
try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt

case d @ DeleteSnapshots(persistenceId, criteria)
deleteAsync(persistenceId, criteria).map {
breaker.withCircuitBreaker(deleteAsync(persistenceId, criteria)).map {
case _ DeleteSnapshotsSuccess(criteria)
}.recover {
case e DeleteSnapshotsFailure(criteria, e)
Expand All @@ -87,6 +101,8 @@ trait SnapshotStore extends Actor with ActorLogging {
/**
* Plugin API: asynchronously loads a snapshot.
*
* This call is protected with a circuit-breaker.
*
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for loading.
*/
Expand All @@ -95,6 +111,8 @@ trait SnapshotStore extends Actor with ActorLogging {
/**
* Plugin API: asynchronously saves a snapshot.
*
* This call is protected with a circuit-breaker.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
Expand All @@ -103,14 +121,17 @@ trait SnapshotStore extends Actor with ActorLogging {
/**
* Plugin API: deletes the snapshot identified by `metadata`.
*
* This call is protected with a circuit-breaker.
*
* @param metadata snapshot metadata.
*/

def deleteAsync(metadata: SnapshotMetadata): Future[Unit]

/**
* Plugin API: deletes all snapshots matching `criteria`.
*
* This call is protected with a circuit-breaker.
*
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for deleting.
*/
Expand Down

0 comments on commit 5c5decf

Please sign in to comment.