This repository has been archived by the owner on May 18, 2021. It is now read-only.
/
strategy.go
92 lines (76 loc) · 2.98 KB
/
strategy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package actor
import (
"math/rand"
"time"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/eventstream"
"github.com/sirupsen/logrus"
)
// NewExponentialBackoffStrategy returns an actor.SupervisorStrategy that asks
// decider what to do with every child failure. When decider answers
// actor.RestartDirective, the child is restarted after a delay that grows with
// the number of failures recorded while still inside backoffWindow;
// initialBackoff is the per-failure unit of that delay.
func NewExponentialBackoffStrategy(backoffWindow time.Duration, initialBackoff time.Duration, decider actor.DeciderFunc) actor.SupervisorStrategy {
	logger := logrus.WithField("logger", "supervisor_strategy")

	s := &exponentialBackoffStrategy{
		decider:        decider,
		initialBackoff: initialBackoff,
		backoffWindow:  backoffWindow,
		log:            logger,
	}
	return s
}
// exponentialBackoffStrategy is the concrete strategy returned by
// NewExponentialBackoffStrategy.
type exponentialBackoffStrategy struct {
	// log receives debug output while failures are being handled.
	log *logrus.Entry

	// backoffWindow is passed to RestartStatistics.IsWithinDuration to decide
	// whether the failure count keeps accumulating or is reset;
	// initialBackoff is the per-failure delay unit used when scheduling a
	// restart.
	backoffWindow, initialBackoff time.Duration

	// decider maps a failure reason to the directive to apply to the child.
	decider actor.DeciderFunc
}
// publishFailureEvent announces a supervision decision through the
// package-level eventstream.Publish, as an *actor.SupervisorEvent carrying
// the failing child, the failure reason and the directive that was applied.
func publishFailureEvent(child *actor.PID, reason interface{}, directive actor.Directive) {
	evt := &actor.SupervisorEvent{
		Directive: directive,
		Reason:    reason,
		Child:     child,
	}
	eventstream.Publish(evt)
}
// HandleFailure implements actor.SupervisorStrategy. It asks the configured
// decider what to do about reason and applies that directive to child:
//
//   - RestartDirective: delegate to handleRestart, which restarts later.
//   - ResumeDirective: publish a SupervisorEvent and resume the child.
//   - StopDirective: publish a SupervisorEvent and stop the child (the
//     restart statistics are not involved).
//   - EscalateDirective: hand reason and message to supervisor.EscalateFailure
//     so the parent deals with (and logs) the failure; nothing is published
//     here.
//
// Any other directive value is ignored.
func (strategy *exponentialBackoffStrategy) HandleFailure(supervisor actor.Supervisor, child *actor.PID, rs *actor.RestartStatistics, reason interface{}, message interface{}) {
	strategy.log.WithField("child_id", child.Id).Debug("Handling child actor failure")

	switch d := strategy.decider(reason); d {
	case actor.RestartDirective:
		// The restart itself (and its event) happens after a backoff delay.
		strategy.handleRestart(supervisor, child, rs, reason, message)
	case actor.ResumeDirective:
		publishFailureEvent(child, reason, d)
		supervisor.ResumeChildren(child)
	case actor.StopDirective:
		publishFailureEvent(child, reason, d)
		supervisor.StopChildren(child)
	case actor.EscalateDirective:
		supervisor.EscalateFailure(reason, message)
	}
}
// handleRestart records the failure in rs and schedules a restart of child
// once a delay derived from the updated failure count has elapsed.
//
// The delay is rs.FailureCount * strategy.initialBackoff plus rand.Intn(500)
// nanoseconds of noise. NOTE(review): despite the strategy's name the delay
// grows linearly with the failure count, not exponentially, and the noise is
// below half a microsecond — confirm both are intended.
//
// The SupervisorEvent is published and RestartChildren is called from the
// time.AfterFunc callback, which runs in its own goroutine after the delay.
// message is accepted for signature symmetry with HandleFailure and unused.
func (strategy *exponentialBackoffStrategy) handleRestart(supervisor actor.Supervisor, child *actor.PID, rs *actor.RestartStatistics, reason interface{}, message interface{}) {
	strategy.setFailureCount(rs)

	// Multiply in time.Duration (int64) instead of int: converting
	// initialBackoff.Nanoseconds() to int truncates any backoff above ~2.1s
	// on platforms where int is 32 bits, yielding wrong or negative delays.
	backoff := time.Duration(rs.FailureCount) * strategy.initialBackoff
	noise := time.Duration(rand.Intn(500))
	dur := backoff + noise

	strategy.log.WithFields(logrus.Fields{
		"child_id":      child.Id,
		"delay_sec":     dur.Seconds(),
		"failure_count": rs.FailureCount,
	}).Debug("Scheduling delayed child restart")

	time.AfterFunc(dur, func() {
		publishFailureEvent(child, reason, actor.RestartDirective)
		supervisor.RestartChildren(child)
	})
}
// setFailureCount records a new failure in rs via rs.Fail. Afterwards, if
// rs.IsWithinDuration(strategy.backoffWindow) reports false (we are past the
// backoff window), the statistics are cleared with rs.Reset.
//
// NOTE(review): Reset runs after Fail, so when the window has elapsed the
// failure just recorded is discarded as well (presumably leaving
// FailureCount at zero for handleRestart) — confirm this is intended rather
// than Reset-then-Fail.
func (strategy *exponentialBackoffStrategy) setFailureCount(rs *actor.RestartStatistics) {
	rs.Fail()
	if !rs.IsWithinDuration(strategy.backoffWindow) {
		rs.Reset()
	}
}