Skip to content

Commit

Permalink
Simplify backoff supervision API #26156
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Mar 7, 2019
2 parents 4117ce4 + 35207d5 commit c01e6b6
Show file tree
Hide file tree
Showing 15 changed files with 1,180 additions and 622 deletions.
9 changes: 9 additions & 0 deletions akka-actor/src/main/mima-filters/2.5.21.backwards.excludes
Expand Up @@ -4,3 +4,12 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.AskableActorRef
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.util.ccompat.package.fromCanBuildFrom")
# Add optional field to ApiMayChange annotation #26409
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.annotation.ApiMayChange.issue")

# Simplify backoff supervision API #19016
ProblemFilters.exclude[MissingClassProblem]("akka.pattern.BackoffOnRestartSupervisor")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.internal.BackoffOnRestartSupervisor.replyWhileStopped")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.internal.BackoffOnRestartSupervisor.finalStopMessage")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.internal.BackoffOnRestartSupervisor.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.HandleBackoff.replyWhileStopped")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.HandleBackoff.finalStopMessage")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.handleMessageToChild")
611 changes: 611 additions & 0 deletions akka-actor/src/main/scala/akka/pattern/Backoff.scala

Large diffs are not rendered by default.

502 changes: 136 additions & 366 deletions akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala

Large diffs are not rendered by default.

322 changes: 111 additions & 211 deletions akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala

Large diffs are not rendered by default.

69 changes: 69 additions & 0 deletions akka-actor/src/main/scala/akka/pattern/HandleBackoff.scala
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.pattern

import akka.actor.{ Actor, ActorRef, Props }
import akka.annotation.InternalApi
import akka.pattern.internal.{ BackoffOnRestartSupervisor, BackoffOnStopSupervisor }

/**
* INTERNAL API
*
* Implements basic backoff handling for [[BackoffOnRestartSupervisor]] and [[BackoffOnStopSupervisor]].
*/
@InternalApi private[akka] trait HandleBackoff {
this: Actor
def childProps: Props
def childName: String
def reset: BackoffReset
protected def handleMessageToChild(m: Any): Unit

var child: Option[ActorRef] = None
var restartCount = 0
var finalStopMessageReceived = false

import BackoffSupervisor._
import context.dispatcher

override def preStart(): Unit = startChild()

def startChild(): Unit = if (child.isEmpty) {
child = Some(context.watch(context.actorOf(childProps, childName)))
}

def handleBackoff: Actor.Receive = {
case StartChild
startChild()
reset match {
case AutoReset(resetBackoff)
context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount))
case _ // ignore
}

case Reset
reset match {
case ManualReset restartCount = 0
case msg unhandled(msg)
}

case ResetRestartCount(current)
if (current == restartCount) {
restartCount = 0
}

case GetRestartCount
sender() ! RestartCount(restartCount)

case GetCurrentChild
sender() ! CurrentChild(child)

case msg if child.contains(sender())
// use the BackoffSupervisor as sender
context.parent ! msg

case msg
handleMessageToChild(msg)
}
}
Expand Up @@ -2,34 +2,36 @@
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.pattern
package akka.pattern.internal

import scala.concurrent.duration._

import akka.actor._
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import akka.actor.{ OneForOneStrategy, _ }
import akka.annotation.InternalApi
import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff }

import scala.concurrent.duration._

/**
* INTERNAL API
*
* Back-off supervisor that stops and starts a child actor when the child actor restarts.
* This back-off supervisor is created by using ``akka.pattern.BackoffSupervisor.props``
* with ``akka.pattern.Backoff.onFailure``.
* with ``akka.pattern.BackoffOpts.onFailure``.
*/
private class BackoffOnRestartSupervisor(
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: OneForOneStrategy,
val replyWhileStopped: Option[Any],
val finalStopMessage: Option[Any Boolean])
@InternalApi private[pattern] class BackoffOnRestartSupervisor(
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: OneForOneStrategy,
replyWhileStopped: Option[Any])
extends Actor with HandleBackoff
with ActorLogging {

import context._
import BackoffSupervisor._
import context._

override val supervisorStrategy = OneForOneStrategy(strategy.maxNrOfRetries, strategy.withinTimeRange, strategy.loggingEnabled) {
case ex
Expand Down Expand Up @@ -81,6 +83,15 @@ private class BackoffOnRestartSupervisor(
stop(self)
}

def receive = onTerminated orElse handleBackoff
def receive: Receive = onTerminated orElse handleBackoff

protected def handleMessageToChild(msg: Any): Unit = child match {
case Some(c)
c.forward(msg)
case None
replyWhileStopped match {
case None context.system.deadLetters.forward(msg)
case Some(r) sender() ! r
}
}
}
@@ -0,0 +1,93 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.pattern.internal

import akka.actor.SupervisorStrategy.{ Directive, Escalate }
import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated }
import akka.annotation.InternalApi
import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff }

import scala.concurrent.duration.FiniteDuration

/**
* INTERNAL API
*
* Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops.
* This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props`
* with `BackoffOpts.onStop`.
*/
@InternalApi private[pattern] class BackoffOnStopSupervisor(
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: SupervisorStrategy,
replyWhileStopped: Option[Any],
finalStopMessage: Option[Any Boolean])
extends Actor with HandleBackoff
with ActorLogging {

import BackoffSupervisor._
import context.dispatcher

override val supervisorStrategy = strategy match {
case oneForOne: OneForOneStrategy
OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) {
case ex
val defaultDirective: Directive =
super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) Escalate)

strategy.decider.applyOrElse(ex, (_: Any) defaultDirective)
}
case s s
}

def onTerminated: Receive = {
case Terminated(ref) if child.contains(ref)
child = None
if (finalStopMessageReceived) {
context.stop(self)
} else {
val maxNrOfRetries = strategy match {
case oneForOne: OneForOneStrategy oneForOne.maxNrOfRetries
case _ -1
}

val nextRestartCount = restartCount + 1

if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) {
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
restartCount = nextRestartCount
} else {
log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries)
context.stop(self)
}
}

}

def receive: Receive = onTerminated orElse handleBackoff

protected def handleMessageToChild(msg: Any): Unit = child match {
case Some(c)
c.forward(msg)
if (!finalStopMessageReceived) finalStopMessage match {
case Some(fsm) finalStopMessageReceived = fsm(msg)
case None
}
case None
replyWhileStopped match {
case Some(r) sender() ! r
case None context.system.deadLetters.forward(msg)
}
finalStopMessage match {
case Some(fsm) if fsm(msg) context.stop(self)
case _
}
}
}
Expand Up @@ -7,7 +7,7 @@ package akka.cluster.sharding
import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props }
import akka.cluster.Cluster
import akka.cluster.sharding.ShardRegion.Passivate
import akka.pattern.{ Backoff, BackoffSupervisor }
import akka.pattern.{ Backoff, BackoffOpts, BackoffSupervisor }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import com.typesafe.config.ConfigFactory

Expand Down Expand Up @@ -64,7 +64,7 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend

import SupervisionSpec._

"Supervision for a sharded actor" must {
"Supervision for a sharded actor (deprecated)" must {

"allow passivation" in {

Expand Down Expand Up @@ -99,4 +99,38 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend
}
}

"Supervision for a sharded actor" must {

"allow passivation" in {

val supervisedProps = BackoffSupervisor.props(BackoffOpts.onStop(
Props(new PassivatingActor()),
childName = "child",
minBackoff = 1.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
).withFinalStopMessage(_ == StopMessage))

Cluster(system).join(Cluster(system).selfAddress)
val region = ClusterSharding(system).start(
"passy",
supervisedProps,
ClusterShardingSettings(system),
idExtractor,
shardResolver
)

region ! Msg(10, "hello")
val response = expectMsgType[Response](5.seconds)
watch(response.self)

region ! Msg(10, "passivate")
expectTerminated(response.self)

// This would fail before as sharded actor would be stuck passivating
region ! Msg(10, "hello")
expectMsgType[Response](20.seconds)
}
}

}
2 changes: 2 additions & 0 deletions akka-docs/src/main/paradox/cluster-sharding.md
Expand Up @@ -413,6 +413,8 @@ Java

Note that stopped entities will be started again when a new message is targeted to the entity.

If 'on stop' backoff supervision strategy is used, a final termination message must be set and used for passivation, see @ref:[Supervision](general/supervision.md#Sharding)

## Graceful Shutdown

You can send the @scala[`ShardRegion.GracefulShutdown`] @java[`ShardRegion.gracefulShutdownInstance`] message
Expand Down
36 changes: 35 additions & 1 deletion akka-docs/src/main/paradox/general/supervision.md
Expand Up @@ -207,6 +207,26 @@ to recover before the persistent actor is started.

> <a id="1" href="#^1">[1]</a> A failure can be indicated in two different ways; by an actor stopping or crashing.
<a id="supervision-strategies"></a>
#### Supervision strategies

There are two basic supervision strategies available for backoff:
* 'On failure': The supervisor will restart the supervised actor once it crashes, but terminate if the actor stops normaly (e.g. through `context.stop`)
* 'On stop': The supervisor will restart the supervised actor if it terminates in any way (consider this for `PersistentActor` since they stop on persistence failures instead of crashing)

#### Sharding
If the 'on stop' strategy is used for sharded actors a final termination message should be configured and used to terminate the actor on passivation. Otherwise the supervisor will just restart the actor again.

The termination message is configured with:

@@snip [BackoffSupervisorDocSpec.scala](/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala) { #backoff-sharded }

And must be used for passivation:

@@snip [BackoffSupervisorDocSpec.scala](/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala) { #backoff-sharded-passivation }

#### Simple backoff

The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor after it has stopped
because of a failure, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds:

Expand Down Expand Up @@ -239,7 +259,21 @@ The above is equivalent to this Java code:

@@snip [BackoffSupervisorDocTest.java](/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java) { #backoff-fail }

The `akka.pattern.BackoffOptions` can be used to customize the behavior of the back-off supervisor actor, below are some examples:
#### Customization

The `akka.pattern.BackoffOnFailureOptions` and `akka.pattern.BackoffOnRestartOptions` can be used to customize the behavior of the back-off supervisor actor.
Options are:
* `withAutoReset`: The backoff is reset if no failure/stop occurs within the duration. This is the default behaviour with `minBackoff` as default value
* `withManualReset`: The child must send `BackoffSupervisor.Reset` to its backoff supervisor (parent)
* `withSupervisionStrategy`: Sets a custom `OneForOneStrategy` (as each backoff supervisor only has one child). The default strategy uses the `akka.actor.SupervisorStrategy.defaultDecider` which restarts on exceptions.
* `withMaxNrOfRetries`: Sets the maximum number of retries until the supervisor will give up (`-1` is default which means no limit of retries). Note: This is set on the supervision strategy, so setting a different strategy resets the `maxNrOfRetries`.
* `withReplyWhileStopped`: By default all messages received while the child is stopped are forwarded to dead letters. With this set, the supervisor will reply to the sender instead.

Only available on `BackoffOnStopOptions`:
* `withDefaultStoppingStrategy`: Sets a `OneForOneStrategy` with the stopping decider that stops the child on all exceptions.
* `withFinalStopMessage`: Allows to define a predicate to decide on finally stopping the child (and supervisor). Used for passivate sharded actors - see above.

Some examples:

@@snip [BackoffSupervisorDocSpec.scala](/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala) { #backoff-custom-stop }

Expand Down
2 changes: 2 additions & 0 deletions akka-docs/src/main/paradox/persistence.md
Expand Up @@ -466,6 +466,8 @@ Scala
Java
: @@snip [LambdaPersistenceDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #backoff }

See @ref:[Supervision strategies](general/supervision.md#supervision-strategies) for more details about actor supervision.

If persistence of an event is rejected before it is stored, e.g. due to serialization error,
`onPersistRejected` will be invoked (logging a warning by default), and the actor continues with
next message.
Expand Down
Expand Up @@ -6,6 +6,7 @@

import akka.actor.*;
import akka.pattern.Backoff;
import akka.pattern.BackoffOpts;
import akka.pattern.BackoffSupervisor;
import akka.testkit.TestActors.EchoActor;
// #backoff-imports
Expand All @@ -20,7 +21,7 @@ void exampleStop(ActorSystem system) {

final Props supervisorProps =
BackoffSupervisor.props(
Backoff.onStop(
BackoffOpts.onStop(
childProps,
"myEcho",
Duration.ofSeconds(3),
Expand All @@ -37,7 +38,7 @@ void exampleFailure(ActorSystem system) {

final Props supervisorProps =
BackoffSupervisor.props(
Backoff.onFailure(
BackoffOpts.onFailure(
childProps,
"myEcho",
Duration.ofSeconds(3),
Expand Down

0 comments on commit c01e6b6

Please sign in to comment.