diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala
index 90b17ea725b..97ff8172d18 100644
--- a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala
+++ b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala
@@ -61,6 +61,34 @@ object BackoffSupervisor {
 
   private case object StartChild extends DeadLetterSuppression
   private case class ResetRestartCount(current: Int) extends DeadLetterSuppression
+
+  /**
+   * INTERNAL API
+   *
+   * Calculates an exponential back off delay.
+   *
+   * The exponential part `minBackoff * 2^restartCount` is capped at `maxBackoff` and then
+   * multiplied by a random factor of `1.0 + random * randomFactor`, `random` in `[0, 1)`,
+   * so the result may exceed `maxBackoff`. `maxBackoff` itself is returned when
+   * `restartCount >= 30` or when the calculation does not yield a `FiniteDuration`.
+   */
+  private[akka] def calculateDelay(
+    restartCount: Int,
+    minBackoff: FiniteDuration,
+    maxBackoff: FiniteDuration,
+    randomFactor: Double): FiniteDuration = {
+    val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
+    if (restartCount >= 30) // Duration overflow protection (> 100 years)
+      maxBackoff
+    else
+      // `FiniteDuration * Double` throws IllegalArgumentException when the product does not
+      // fit into a Long of nanoseconds (e.g. minBackoff = 1 minute, restartCount = 28), so the
+      // restartCount guard alone is not sufficient: any failure falls back to maxBackoff.
+      scala.util.Try(maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd) match {
+        case scala.util.Success(f: FiniteDuration) ⇒ f
+        case _ ⇒ maxBackoff
+      }
+  }
 }
 
 /**
@@ -121,15 +149,7 @@ final class BackoffSupervisor(
   def receive = {
     case Terminated(ref) if child.contains(ref) ⇒
       child = None
-      val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
-      val restartDelay =
-        if (restartCount >= 30) // Duration overflow protection (> 100 years)
-          maxBackoff
-        else
-          maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match {
-            case f: FiniteDuration ⇒ f
-            case _ ⇒ maxBackoff
-          }
+      val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
       context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
       restartCount += 1
 
diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisor.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisor.scala
new file mode 100644
index 00000000000..067b0acf2ce
--- /dev/null
+++ 
b/akka-contrib/src/main/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisor.scala
@@ -0,0 +1,214 @@
+/**
+ * Copyright (C) 2015 Typesafe Inc.
+ */
+
+package akka.contrib.pattern
+
+import akka.actor._
+import akka.actor.OneForOneStrategy
+import akka.actor.SupervisorStrategy._
+import scala.concurrent.duration._
+
+object TransparentExponentialBackoffSupervisor {
+  // Sent to self once a failed child has terminated, to schedule its delayed restart.
+  private case class ScheduleRestart(childRef: ActorRef) extends DeadLetterSuppression
+  // Sent to self to create (or re-create) and watch the child.
+  private case object StartChild extends DeadLetterSuppression
+  // Scheduled after a restart; resets the restart counter if it still equals lastNumRestarts.
+  private case class ResetRestartCount(lastNumRestarts: Int) extends DeadLetterSuppression
+
+  /**
+   * Props for creating a [[TransparentExponentialBackoffSupervisor]] with a decider.
+   *
+   * @param childProps the [[akka.actor.Props]] of the child to be supervised.
+   * @param childName the name of the child actor.
+   * @param minBackoff the min time before the child is restarted.
+   * @param maxBackoff the upper bound for the calculated exponential back off. The random
+   *   factor is applied after this cap, so the final delay may exceed it.
+   * @param randomFactor a random delay factor to add on top of the calculated exponential
+   *   back off.
+   *   The calculation is equivalent to:
+   *   {{{
+   *   final_delay =
+   *     min(maxBackoff, calculated_backoff) * (1 + random * randomFactor)
+   *   }}}
+   *   where `random` is drawn from `[0, 1)`.
+   * @param decider an [[akka.actor.SupervisorStrategy.Decider]] to specify how the supervisor
+   *   should behave for different exceptions. If no cases are matched, the default decider of
+   *   [[akka.actor.Actor]] is used. When the [[akka.actor.SupervisorStrategy.Restart]] directive
+   *   is returned by the decider, this supervisor will apply an exponential back off restart.
+   */
+  def propsWithDecider(
+    childProps: Props,
+    childName: String,
+    minBackoff: FiniteDuration,
+    maxBackoff: FiniteDuration,
+    randomFactor: Double)(decider: Decider): Props = {
+    Props(
+      new TransparentExponentialBackoffSupervisor(
+        childProps,
+        childName,
+        Some(decider),
+        minBackoff,
+        maxBackoff,
+        randomFactor))
+  }
+
+  /**
+   * Props for creating a [[TransparentExponentialBackoffSupervisor]] using the
+   * default [[akka.actor.Actor]] decider.
+   *
+   * @param childProps the [[akka.actor.Props]] of the child to be supervised.
+   * @param childName the name of the child actor.
+   * @param minBackoff the min time before the child is restarted.
+   * @param maxBackoff the upper bound for the calculated exponential back off. The random
+   *   factor is applied after this cap, so the final delay may exceed it.
+   * @param randomFactor a random delay factor to add on top of the calculated exponential
+   *   back off.
+   *   The calculation is equivalent to:
+   *   {{{
+   *   final_delay =
+   *     min(maxBackoff, calculated_backoff) * (1 + random * randomFactor)
+   *   }}}
+   *   where `random` is drawn from `[0, 1)`.
+   */
+  def props(
+    childProps: Props,
+    childName: String,
+    minBackoff: FiniteDuration,
+    maxBackoff: FiniteDuration,
+    randomFactor: Double): Props = {
+    Props(
+      new TransparentExponentialBackoffSupervisor(
+        childProps,
+        childName,
+        None,
+        minBackoff,
+        maxBackoff,
+        randomFactor))
+  }
+}
+
+/**
+ * A supervising actor that restarts a child actor with an exponential back off.
+ *
+ * This explicit supervisor behaves similarly to the normal implicit supervision where
+ * if an actor throws an exception, the decider on the supervisor will decide when to
+ * `Stop`, `Restart`, `Escalate`, `Resume` the child actor.
+ *
+ * When the `Restart` directive is specified, the supervisor will delay the restart using an
+ * exponential back off strategy (starting at minBackoff, capped at maxBackoff before jitter).
+ *
+ * This supervisor is intended to be transparent to both the child actor and external actors.
+ * External actors can send messages to the supervisor as if it were the child and the
+ * messages will be forwarded. When the child is `Terminated`, the supervisor is also
+ * `Terminated`.
+ * Transparent to the child means that the child does not have to be aware that it is being
+ * supervised specifically by the [[TransparentExponentialBackoffSupervisor]], just like it does
+ * not need to know when it is being supervised by the usual implicit supervisors.
+ * The only caveat is that the `ActorRef` of the child is not stable, so any user storing the
+ * `sender()` `ActorRef` from the child response may eventually not be able to communicate with
+ * the stored `ActorRef`. In general all messages to the child should be directed through the
+ * [[TransparentExponentialBackoffSupervisor]].
+ *
+ * An example of where this supervisor might be used is when you may have an actor that is
+ * responsible for continuously polling on a server for some resource that sometimes may be down.
+ * Instead of hammering the server continuously when the resource is unavailable, the actor will
+ * be restarted with an exponentially increasing back off until the resource is available again.
+ *
+ * '''***
+ * This supervisor should not be used with `Akka Persistence` child actors.
+ * `Akka Persistence` actors, currently, shut down unconditionally on `persistFailure()`s rather
+ * than throw an exception on a failure like normal actors.
+ * [[akka.pattern.BackoffSupervisor]] should be used instead for cases where the child actor
+ * terminates itself as a failure signal instead of the normal behavior of throwing an exception.
+ * ***'''
+ */
+class TransparentExponentialBackoffSupervisor(
+  props: Props,
+  childName: String,
+  decider: Option[Decider],
+  minBackoff: FiniteDuration,
+  maxBackoff: FiniteDuration,
+  randomFactor: Double)
+    extends Actor
+    with Stash
+    with ActorLogging {
+
+  import TransparentExponentialBackoffSupervisor._
+  import context._
+
+  override val supervisorStrategy = OneForOneStrategy() {
+    case ex ⇒
+      val defaultDirective: Directive =
+        super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate)
+      val maybeDirective: Option[Directive] = decider
+        .map(_.applyOrElse(ex, (_: Any) ⇒ defaultDirective))
+
+      // Get the directive from the specified decider or fallback to
+      // the default decider.
+      // Whatever the final Directive is, we will translate all Restarts
+      // to our own Restarts, which involves stopping the child.
+      maybeDirective.getOrElse(defaultDirective) match {
+        case Restart ⇒
+          // sender() is taken to be the failed child whose failure is being decided on.
+          val childRef = sender()
+          // Stash everything until the stopped child has terminated, then go back to the
+          // previous behavior and ask ourselves to schedule the delayed restart.
+          become({
+            case Terminated(`childRef`) ⇒
+              // NOTE(review): messages already queued ahead of ScheduleRestart are handled by the
+              // restored behavior and forwarded to the terminated child - confirm this is acceptable.
+              unbecome()
+              self ! ScheduleRestart(childRef)
+            case _ ⇒
+              stash()
+          }, discardOld = false)
+          Stop
+        case other ⇒ other
+      }
+  }
+
+  // Initialize by starting up and watching the child
+  self ! StartChild
+
+  // Start at -1 so that the first StartChild brings the restart count to 0.
+  def receive: Receive = waitingToStart(numRestarts = -1, scheduleCounterReset = false)
+
+  // No child is running: stash everything until StartChild creates and watches a new child.
+  def waitingToStart(numRestarts: Int, scheduleCounterReset: Boolean): Receive = {
+    case StartChild ⇒
+      val childRef = actorOf(props, childName)
+      watch(childRef)
+      unstashAll()
+      if (scheduleCounterReset) {
+        system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(numRestarts + 1))
+      }
+      become(watching(childRef, numRestarts + 1))
+    case _ ⇒ stash()
+  }
+
+  // Steady state
+  def watching(childRef: ActorRef, numRestarts: Int): Receive = {
+    case ScheduleRestart(`childRef`) ⇒
+      val delay = akka.pattern.BackoffSupervisor.calculateDelay(
+        numRestarts, minBackoff, maxBackoff, randomFactor)
+      system.scheduler.scheduleOnce(delay, self, StartChild)
+      become(waitingToStart(numRestarts, scheduleCounterReset = true))
+      log.info(s"Restarting child in: $delay; numRestarts: $numRestarts")
+    case ResetRestartCount(last) ⇒
+      if (last == numRestarts) {
+        log.debug(s"Last restart count [$last] matches current count; resetting")
+        become(watching(childRef, 0))
+      } else {
+        log.debug(s"Last restart count [$last] does not match the current count [$numRestarts]")
+      }
+    case Terminated(`childRef`) ⇒
+      log.debug(s"Terminating, because child [$childRef] terminated itself")
+      stop(self)
+    case msg if sender() == childRef ⇒
+      parent.forward(msg)
+    case msg ⇒
+      childRef.forward(msg)
+  }
+}
diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisorSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisorSpec.scala
new file mode 100644
index 00000000000..07441cb66fe
--- /dev/null
+++ b/akka-contrib/src/test/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisorSpec.scala
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2015 Typesafe Inc.
+ */
+
+package akka.contrib.pattern
+
+import akka.testkit.AkkaSpec
+import akka.testkit.TestProbe
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import akka.actor._
+import scala.language.postfixOps
+
+object TestActor {
+  class StoppingException extends Exception("stopping exception")
+  def props(probe: ActorRef): Props = Props(new TestActor(probe))
+}
+
+// Child under test: reports every (re)start to the probe and obeys a few command strings.
+class TestActor(probe: ActorRef) extends Actor {
+  probe ! "STARTED"
+
+  def receive = {
+    case "DIE" ⇒ context.stop(self)
+    case "THROW" ⇒ throw new Exception("normal exception")
+    case "THROW_STOPPING_EXCEPTION" ⇒ throw new TestActor.StoppingException
+    case ("TO_PARENT", payload) ⇒ context.parent ! payload
+    case anythingElse ⇒ probe ! anythingElse
+  }
+}
+
+object TestParentActor {
+  def props(probe: ActorRef, supervisorProps: Props): Props =
+    Props(new TestParentActor(probe, supervisorProps))
+}
+// Parent of the supervisor under test: relays whatever it receives to the probe.
+class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor {
+  val supervisor = context.actorOf(supervisorProps)
+
+  def receive = {
+    case incoming ⇒ probe.forward(incoming)
+  }
+}
+
+class TransparentExponentialBackoffSupervisorSpec extends AkkaSpec {
+
+  def supervisorProps(probeRef: ActorRef) = TransparentExponentialBackoffSupervisor.propsWithDecider(
+    childProps = TestActor.props(probeRef),
+    childName = "someChildName",
+    minBackoff = 200 millis,
+    maxBackoff = 10 seconds,
+    randomFactor = 0.0) {
+      case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop
+    }
+
+  trait Setup {
+    val probe = TestProbe()
+    val supervisor = system.actorOf(supervisorProps(probe.ref))
+    probe.expectMsg("STARTED")
+  }
+
+  trait Setup2 {
+    val probe = TestProbe()
+    val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref)))
+    probe.expectMsg("STARTED")
+    val child = probe.lastSender
+  }
+
+  "TransparentExponentialBackoffSupervisor" must {
+    "forward messages to child" in new Setup {
+      supervisor ! "some message"
+      probe.expectMsg("some message")
+    }
+
+    "terminate when child terminates" in new Setup {
+      probe.watch(supervisor)
+      supervisor ! "DIE"
+      probe.expectTerminated(supervisor)
+    }
+
+    "restart the child with an exponential back off" in new Setup {
+      // Three consecutive failures; with randomFactor 0.0 the delays are 200 + 400 + 800 millis.
+      probe.within(1.4 seconds, 2 seconds) {
+        supervisor ! "THROW"
+        // first restart: about 200 millis
+        probe.expectMsg(300 millis, "STARTED")
+
+        supervisor ! "THROW"
+        // second restart: about 400 millis
+        probe.expectMsg(500 millis, "STARTED")
+
+        supervisor ! "THROW"
+        // third restart: about 800 millis
+        probe.expectMsg(900 millis, "STARTED")
+      }
+
+      // Exactly one child must exist now: broadcast to every child of the supervisor
+      // through a wildcard selection and expect a single echo.
+      // More than one child would produce more than one reply.
+      val allChildren = system.actorSelection(supervisor.path / "*")
+      allChildren.tell("testmsg", probe.ref)
+      probe.expectMsg("testmsg")
+      probe.expectNoMsg()
+    }
+
+    "stop on exceptions as dictated by the decider" in new Setup {
+      probe.watch(supervisor)
+      // The decider maps StoppingException to Stop: the child is stopped and the
+      // supervisor then stops itself.
+      supervisor ! "THROW_STOPPING_EXCEPTION"
+      probe.expectTerminated(supervisor)
+    }
+
+    "forward messages from the child to the parent of the supervisor" in new Setup2 {
+      child ! (("TO_PARENT", "TEST_MESSAGE"))
+      probe.expectMsg("TEST_MESSAGE")
+    }
+  }
+}