// BackoffOnRestartSupervisor.scala
/**
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.pattern
import scala.concurrent.duration._
import akka.actor._
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
/**
 * Back-off supervisor that stops and starts a child actor when the child actor restarts.
 * This back-off supervisor is created by using ``akka.pattern.BackoffSupervisor.props``
 * with ``akka.pattern.Backoff.onFailure``.
 *
 * Every `Restart` directive produced by the supplied strategy is turned into a `Stop`;
 * once the stopped child's `Terminated` is observed, a `StartChild` message is scheduled
 * after a delay computed by `BackoffSupervisor.calculateDelay`.
 *
 * @param childProps        not read in this class; presumably used by `HandleBackoff` to create the child
 * @param childName         not read in this class; presumably used by `HandleBackoff` to name the child
 * @param minBackoff        passed to `BackoffSupervisor.calculateDelay`
 * @param maxBackoff        passed to `BackoffSupervisor.calculateDelay`
 * @param reset             not read in this class; presumably consumed by `HandleBackoff`
 * @param randomFactor      passed to `BackoffSupervisor.calculateDelay`
 * @param strategy          user strategy whose retry limit, time range, logging flag and decider are reused
 * @param replyWhileStopped not read in this class; presumably consumed by `HandleBackoff`
 */
private class BackoffOnRestartSupervisor(
  val childProps: Props,
  val childName: String,
  minBackoff: FiniteDuration,
  maxBackoff: FiniteDuration,
  val reset: BackoffReset,
  randomFactor: Double,
  strategy: OneForOneStrategy,
  val replyWhileStopped: Option[Any])
  extends Actor with HandleBackoff
  with ActorLogging {

  import context._
  import BackoffSupervisor._

  /**
   * Wraps the user strategy: the user's decider is consulted first, falling back to the
   * inherited strategy's decider and finally to `Escalate`. A resulting `Restart` is handled
   * here and answered with `Stop`; any other directive is passed through unchanged.
   */
  override val supervisorStrategy =
    OneForOneStrategy(strategy.maxNrOfRetries, strategy.withinTimeRange, strategy.loggingEnabled) {
      case ex ⇒
        val defaultDirective: Directive =
          super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate)

        strategy.decider.applyOrElse(ex, (_: Any) ⇒ defaultDirective) match {
          // Whatever the final Directive is, we will translate all Restarts
          // to our own Restarts, which involves stopping the child.
          case Restart ⇒
            val nextRestartCount = restartCount + 1

            strategy.withinTimeRange match {
              case window: FiniteDuration if restartCount == 0 ⇒
                // If the user has defined a time range for the maxNrOfRetries, schedule a message
                // to ourselves for when that range elapses, to reset the restart counter. It is
                // only queued while the counter is still zero, to avoid queuing it unnecessarily.
                //
                // The message carries the value the counter will hold once this restart has been
                // recorded. Carrying the current value (always 0 in this branch) could never match
                // the incremented counter. NOTE(review): relies on the ResetRestartCount handler
                // in HandleBackoff comparing the carried value with the live counter — confirm.
                system.scheduler.scheduleOnce(window, self, ResetRestartCount(nextRestartCount))
              case _ ⇒ // no finite time range, or the counter is already non-zero
            }

            // The failing child is the sender of the failure being decided on.
            val childRef = sender()

            if (strategy.maxNrOfRetries >= 0 && nextRestartCount > strategy.maxNrOfRetries) {
              // If we've exceeded the maximum # of retries allowed by the Strategy, die.
              log.debug(
                "Terminating on restart #{} which exceeds max allowed restarts ({})",
                nextRestartCount, strategy.maxNrOfRetries)
              become(receive)
              stop(self)
            } else {
              become(waitChildTerminatedBeforeBackoff(childRef) orElse handleBackoff)
            }
            Stop

          case other ⇒ other
        }
    }

  /**
   * Behavior used after the child has been told to stop and before it is started again.
   *
   * When `Terminated` for `childRef` arrives, the actor switches back to `receive`, clears
   * `child`, schedules `StartChild` after the delay calculated from the current
   * `restartCount`, and only then increments `restartCount`.
   *
   * @param childRef the child whose termination is awaited
   */
  def waitChildTerminatedBeforeBackoff(childRef: ActorRef): Receive = {
    case Terminated(`childRef`) ⇒
      become(receive)
      child = None
      // The delay is derived from the count *before* this restart is recorded.
      val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
      system.scheduler.scheduleOnce(restartDelay, self, StartChild)
      restartCount += 1

    case StartChild ⇒ // Ignore it, we will schedule a new one once current child terminated.
  }

  /**
   * Handles a `Terminated` received in the normal state by logging and stopping this
   * supervisor. The pattern matches any `Terminated` message; the bound name is deliberately
   * not `child`, so it does not shadow the `child` member.
   */
  def onTerminated: Receive = {
    case Terminated(ref) ⇒
      log.debug("Terminating, because child [{}] terminated itself", ref)
      stop(self)
  }

  /** Normal behavior: termination handling first, then the shared `handleBackoff` cases. */
  def receive = onTerminated orElse handleBackoff
}