Skip to content

Commit

Permalink
Merge pull request #18776 from henrymai/master
Browse files Browse the repository at this point in the history
Add TransparentExponentialBackoffSupervisor
  • Loading branch information
rkuhn committed Nov 7, 2015
2 parents 1e36e5e + a0e9b01 commit 61c257b
Show file tree
Hide file tree
Showing 3 changed files with 342 additions and 9 deletions.
30 changes: 21 additions & 9 deletions akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ object BackoffSupervisor {

private case object StartChild extends DeadLetterSuppression
private case class ResetRestartCount(current: Int) extends DeadLetterSuppression

/**
* INTERNAL API
*
* Calculates an exponential back off delay.
*/
private[akka] def calculateDelay(
restartCount: Int,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): FiniteDuration = {
val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
if (restartCount >= 30) // Duration overflow protection (> 100 years)
maxBackoff
else
maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match {
case f: FiniteDuration f
case _ maxBackoff
}
}
}

/**
Expand Down Expand Up @@ -121,15 +141,7 @@ final class BackoffSupervisor(
def receive = {
case Terminated(ref) if child.contains(ref)
child = None
val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
val restartDelay =
if (restartCount >= 30) // Duration overflow protection (> 100 years)
maxBackoff
else
maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match {
case f: FiniteDuration f
case _ maxBackoff
}
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
restartCount += 1

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.contrib.pattern

import akka.actor._
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

object TransparentExponentialBackoffSupervisor {
private case class ScheduleRestart(childRef: ActorRef) extends DeadLetterSuppression
private case object StartChild extends DeadLetterSuppression
private case class ResetRestartCount(lastNumRestarts: Int) extends DeadLetterSuppression

/**
* Props for creating a [[TransparentExponentialBackoffSupervisor]] with a decider.
*
* @param childProps the [[akka.actor.Props]] of the child to be supervised.
* @param childName the name of the child actor.
* @param minBackoff the min time before the child is restarted.
* @param maxBackoff the max time (upperbound) for a child restart.
* @param randomFactor a random delay factor to add on top of the calculated exponential
* back off.
* The calculation is equivalent to:
* {{{
* final_delay = min(
* maxBackoff,
* (random_delay_factor * calculated_backoff) + calculated_backoff)
* }}}
* @param decider an [[akka.actor.SupervisorStrategy.Decider]] to specify how the supervisor
* should behave for different exceptions. If no cases are matched, the default decider of
* [[akka.actor.Actor]] is used. When the [[akka.actor.SupervisorStrategy.Restart]] directive
* is returned by the decider, this supervisor will apply an exponential back off restart.
*/
def propsWithDecider(
childProps: Props,
childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double)(decider: Decider): Props = {
Props(
new TransparentExponentialBackoffSupervisor(
childProps,
childName,
Some(decider),
minBackoff,
maxBackoff,
randomFactor))
}

/**
* Props for creating a [[TransparentExponentialBackoffSupervisor]] using the
* default [[akka.actor.Actor]] decider.
*
* @param childProps the [[akka.actor.Props]] of the child to be supervised.
* @param childName the name of the child actor.
* @param minBackoff the min time before the child is restarted.
* @param maxBackoff the max time (upperbound) for a child restart.
* @param randomFactor a random delay factor to add on top of the calculated exponential
* back off.
* The calculation is equivalent to:
* {{{
* final_delay = min(
* maxBackoff,
* (random_delay_factor * calculated_backoff) + calculated_backoff)
* }}}
*/
def props(
childProps: Props,
childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): Props = {
Props(
new TransparentExponentialBackoffSupervisor(
childProps,
childName,
None,
minBackoff,
maxBackoff,
randomFactor))
}
}

/**
* A supervising actor that restarts a child actor with an exponential back off.
*
* This explicit supervisor behaves similarly to the normal implicit supervision where
* if an actor throws an exception, the decider on the supervisor will decide when to
* `Stop`, `Restart`, `Escalate`, `Resume` the child actor.
*
* When the `Restart` directive is specified, the supervisor will delay the restart
* using an exponential back off strategy (bounded by minBackoff and maxBackoff).
*
* This supervisor is intended to be transparent to both the child actor and external actors.
* Where external actors can send messages to the supervisor as if it was the child and the
* messages will be forwarded. And when the child is `Terminated`, the supervisor is also
* `Terminated`.
* Transparent to the child means that the child does not have to be aware that it is being
* supervised specifically by the [[TransparentExponentialBackoffSupervisor]]. Just like it does
* not need to know when it is being supervised by the usual implicit supervisors.
* The only caveat is that the `ActorRef` of the child is not stable, so any user storing the
* `sender()` `ActorRef` from the child response may eventually not be able to communicate with
* the stored `ActorRef`. In general all messages to the child should be directed through the
* [[TransparentExponentialBackoffSupervisor]].
*
* An example of where this supervisor might be used is when you may have an actor that is
* responsible for continuously polling on a server for some resource that sometimes may be down.
* Instead of hammering the server continuously when the resource is unavailable, the actor will
* be restarted with an exponentially increasing back off until the resource is available again.
*
* '''***
* This supervisor should not be used with `Akka Persistence` child actors.
* `Akka Persistence` actors, currently, shutdown unconditionally on `persistFailure()`s rather
* than throw an exception on a failure like normal actors.
* [[akka.pattern.BackoffSupervisor]] should be used instead for cases where the child actor
* terminates itself as a failure signal instead of the normal behavior of throwing an exception.
* ***'''
*/
class TransparentExponentialBackoffSupervisor(
props: Props,
childName: String,
decider: Option[Decider],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double)
extends Actor
with Stash
with ActorLogging {

import TransparentExponentialBackoffSupervisor._
import context._

override val supervisorStrategy = OneForOneStrategy() {
case ex
val defaultDirective: Directive =
super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) Escalate)
val maybeDirective: Option[Directive] = decider
.map(_.applyOrElse(ex, (_: Any) defaultDirective))

// Get the directive from the specified decider or fallback to
// the default decider.
// Whatever the final Directive is, we will translate all Restarts
// to our own Restarts, which involves stopping the child.
maybeDirective.getOrElse(defaultDirective) match {
case Restart
val childRef = sender
become({
case Terminated(`childRef`)
unbecome()
self ! ScheduleRestart(childRef)
case _
stash()
}, discardOld = false)
Stop
case other other
}
}

// Initialize by starting up and watching the child
self ! StartChild

def receive = waitingToStart(-1, false)

def waitingToStart(numRestarts: Int, scheduleCounterReset: Boolean): Receive = {
case StartChild
val childRef = actorOf(props, childName)
watch(childRef)
unstashAll()
if (scheduleCounterReset) {
system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(numRestarts + 1))
}
become(watching(childRef, numRestarts + 1))
case _ stash()
}

// Steady state
def watching(childRef: ActorRef, numRestarts: Int): Receive = {
case ScheduleRestart(`childRef`)
val delay = akka.pattern.BackoffSupervisor.calculateDelay(
numRestarts, minBackoff, maxBackoff, randomFactor)
system.scheduler.scheduleOnce(delay, self, StartChild)
become(waitingToStart(numRestarts, true))
log.info(s"Restarting child in: $delay; numRestarts: $numRestarts")
case ResetRestartCount(last)
if (last == numRestarts) {
log.debug(s"Last restart count [$last] matches current count; resetting")
become(watching(childRef, 0))
} else {
log.debug(s"Last restart count [$last] does not match the current count [$numRestarts]")
}
case Terminated(`childRef`)
log.debug(s"Terminating, because child [$childRef] terminated itself")
stop(self)
case msg if sender() == childRef
parent.forward(msg)
case msg
childRef.forward(msg)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.contrib.pattern

import akka.testkit.AkkaSpec
import akka.testkit.TestProbe
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor._
import scala.language.postfixOps

object TestActor {
class StoppingException extends Exception("stopping exception")
def props(probe: ActorRef): Props = Props(new TestActor(probe))
}

class TestActor(probe: ActorRef) extends Actor {
import context.dispatcher

probe ! "STARTED"

def receive = {
case "DIE" context.stop(self)
case "THROW" throw new Exception("normal exception")
case "THROW_STOPPING_EXCEPTION" throw new TestActor.StoppingException
case ("TO_PARENT", msg) context.parent ! msg
case other probe ! other
}
}

object TestParentActor {
def props(probe: ActorRef, supervisorProps: Props): Props =
Props(new TestParentActor(probe, supervisorProps))
}
class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor {
val supervisor = context.actorOf(supervisorProps)

def receive = {
case other probe.forward(other)
}
}

class TransparentExponentialBackoffSupervisorSpec extends AkkaSpec {

def supervisorProps(probeRef: ActorRef) = TransparentExponentialBackoffSupervisor.propsWithDecider(
TestActor.props(probeRef),
"someChildName",
200 millis,
10 seconds,
0.0) {
case _: TestActor.StoppingException SupervisorStrategy.Stop
}

trait Setup {
val probe = TestProbe()
val supervisor = system.actorOf(supervisorProps(probe.ref))
probe.expectMsg("STARTED")
}

trait Setup2 {
val probe = TestProbe()
val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref)))
probe.expectMsg("STARTED")
val child = probe.lastSender
}

"TransparentExponentialBackoffSupervisor" must {
"forward messages to child" in new Setup {
supervisor ! "some message"
probe.expectMsg("some message")
}

"terminate when child terminates" in new Setup {
probe.watch(supervisor)
supervisor ! "DIE"
probe.expectTerminated(supervisor)
}

"restart the child with an exponential back off" in new Setup {
// Exponential back off restart test
probe.within(1.4 seconds, 2 seconds) {
supervisor ! "THROW"
// numRestart = 0 ~ 200 millis
probe.expectMsg(300 millis, "STARTED")

supervisor ! "THROW"
// numRestart = 1 ~ 400 millis
probe.expectMsg(500 millis, "STARTED")

supervisor ! "THROW"
// numRestart = 2 ~ 800 millis
probe.expectMsg(900 millis, "STARTED")
}

// Verify that we only have one child at this point by selecting all the children
// under the supervisor and broadcasting to them.
// If there exists more than one child, we will get more than one reply.
val supervisorChildSelection = system.actorSelection(supervisor.path / "*")
supervisorChildSelection.tell("testmsg", probe.ref)
probe.expectMsg("testmsg")
probe.expectNoMsg
}

"stop on exceptions as dictated by the decider" in new Setup {
probe.watch(supervisor)
// This should cause the supervisor to stop the child actor and then
// subsequently stop itself.
supervisor ! "THROW_STOPPING_EXCEPTION"
probe.expectTerminated(supervisor)
}

"forward messages from the child to the parent of the supervisor" in new Setup2 {
child ! (("TO_PARENT", "TEST_MESSAGE"))
probe.expectMsg("TEST_MESSAGE")
}
}
}

0 comments on commit 61c257b

Please sign in to comment.