-
Notifications
You must be signed in to change notification settings - Fork 5
/
fault_detector.go
179 lines (158 loc) · 5.13 KB
/
fault_detector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package heartbeat
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/antongulenko/RTP/protocols"
"github.com/antongulenko/golib"
)
// ======================= Server for receiving heartbeats =======================
// tokenRand draws the random tokens that identify each HeartbeatFaultDetector
// registered with a HeartbeatServer.
//
// It is seeded with nanosecond resolution. A seconds-resolution seed would make
// processes started within the same second generate identical token sequences.
// Note: a *rand.Rand built with rand.New is not safe for concurrent use by
// multiple goroutines, so callers must not use tokenRand concurrently.
var (
	tokenRand = rand.New(rand.NewSource(time.Now().UnixNano()))
)
// HeartbeatServer receives heartbeat packets and hands each one to the
// HeartbeatFaultDetector registered under the packet's token.
type HeartbeatServer struct {
*protocols.Server
// detectors maps a heartbeat token to the detector created for it by
// ObserveServer; entries are removed by HeartbeatFaultDetector.Close.
detectors map[int64]*HeartbeatFaultDetector
}
// serverStopper is the value passed to protocols.NewServer by
// NewHeartbeatServer. It pairs a protocols.Protocol (MiniProtocol in that call)
// with the owning HeartbeatServer, so that StopServer can close every
// registered detector.
// NOTE(review): presumably protocols.Server calls StopServer on shutdown — confirm.
type serverStopper struct {
protocols.Protocol
*HeartbeatServer
}
// NewHeartbeatServer builds a HeartbeatServer on top of a protocols.Server
// created for local_addr. It then registers the HeartbeatServer as the handler
// for incoming heartbeat packets. If either step fails, it returns nil and the
// error.
func NewHeartbeatServer(local_addr string) (*HeartbeatServer, error) {
	hbServer := &HeartbeatServer{
		detectors: make(map[int64]*HeartbeatFaultDetector),
	}
	stopper := &serverStopper{MiniProtocol, hbServer}

	baseServer, err := protocols.NewServer(local_addr, stopper)
	if err != nil {
		return nil, err
	}
	hbServer.Server = baseServer

	err = RegisterServerHandler(baseServer, hbServer)
	if err != nil {
		return nil, err
	}
	return hbServer, nil
}
// Start starts the embedded protocols.Server, then starts every
// HeartbeatFaultDetector registered so far. It returns the stop channel
// produced by the embedded server.
func (server *HeartbeatServer) Start(wg *sync.WaitGroup) golib.StopChan {
	stopChan := server.Server.Start(wg)
	for _, d := range server.detectors {
		d.Start()
	}
	return stopChan
}
// StopServer closes every registered HeartbeatFaultDetector. A failure while
// closing one detector is logged rather than returned, so the remaining
// detectors are still closed. Each Close deletes its own map entry, which Go
// permits while ranging over the map.
func (server *serverStopper) StopServer() {
	for _, d := range server.detectors {
		closeErr := d.Close()
		if closeErr == nil {
			continue
		}
		server.LogError(fmt.Errorf("Error closing %v: %v", d, closeErr))
	}
}
// HeartbeatReceived forwards beat, with the local receive time, to the detector
// registered under beat.Token. A heartbeat with an unknown token is logged as
// an error and otherwise dropped.
func (server *HeartbeatServer) HeartbeatReceived(beat *HeartbeatPacket) {
	now := time.Now()
	detector, known := server.detectors[beat.Token]
	if !known {
		server.LogError(fmt.Errorf("Unexpected heartbeat (seq %v) from %v", beat.Seq, beat.Source))
		return
	}
	detector.heartbeatReceived(now, beat)
}
// ======================= FaultDetector interface =======================
// HeartbeatFaultDetector judges a remote server by the heartbeats it sends to
// a HeartbeatServer. The check fails when no heartbeat has arrived within
// acceptableTimeout.
type HeartbeatFaultDetector struct {
*protocols.FaultDetectorBase
// server is the HeartbeatServer this detector is registered with.
server *HeartbeatServer
// client is used to (re)configure heartbeats on the observed server and is
// closed by Close.
client *Client
// configError is the result of the most recent ConfigureHeartbeat call; it is
// appended to the timeout error reported by Check.
configError error
// acceptableTimeout is the longest gap since the last received heartbeat that
// Check accepts; Start also uses it as the initial wait and the LoopCheck argument.
acceptableTimeout time.Duration
// heartbeatFrequency is passed to ConfigureHeartbeat when configuring the
// observed server.
heartbeatFrequency time.Duration
// token is this detector's key in server.detectors and is sent to the
// observed server by ConfigureHeartbeat.
token int64
// seq is the sequence number expected on the next heartbeat; 0 means no
// expectation yet (reset whenever the observed server is configured).
seq uint64
// lastHeartbeatSent is the TimeSent value of the most recent heartbeat.
lastHeartbeatSent time.Time
// lastHeartbeatReceived is the local time the most recent heartbeat arrived;
// it is initialised to the detector's creation time.
lastHeartbeatReceived time.Time
}
// String names the detector after its HeartbeatServer's protocol and the
// server it observes.
func (detector *HeartbeatFaultDetector) String() string {
	protocolName := detector.server.Protocol().Name()
	observed := detector.ObservedServer()
	return fmt.Sprintf("%v-HeartbeatFaultDetector for %v", protocolName, observed)
}
// ObserveServer creates a HeartbeatFaultDetector for the remote server at
// endpoint and registers it with this HeartbeatServer under a fresh random
// token.
//
// heartbeatFrequency is later passed to the remote server when it is
// configured. acceptableTimeout is the longest gap between received heartbeats
// that Check accepts.
//
// The detector is not started here; HeartbeatServer.Start starts registered
// detectors. Token 0 is never assigned, because Close sends token 0 when
// telling the remote server to stop sending heartbeats.
func (server *HeartbeatServer) ObserveServer(endpoint string, heartbeatFrequency time.Duration, acceptableTimeout time.Duration) (protocols.FaultDetector, error) {
	client, err := NewClientFor(endpoint)
	if err != nil {
		return nil, err
	}

	// Keep drawing until the token is non-zero and not already in use.
	token := tokenRand.Int63()
	for {
		_, taken := server.detectors[token]
		if token != 0 && !taken {
			break
		}
		token = tokenRand.Int63()
	}

	detector := &HeartbeatFaultDetector{
		FaultDetectorBase:     protocols.NewFaultDetectorBase(server.Protocol(), client.Server()),
		client:                client,
		acceptableTimeout:     acceptableTimeout,
		heartbeatFrequency:    heartbeatFrequency,
		server:                server,
		token:                 token,
		lastHeartbeatReceived: time.Now(),
	}
	server.detectors[token] = detector
	return detector, nil
}
// heartbeatReceived records a heartbeat forwarded by the HeartbeatServer and
// then re-runs Check.
//
// detector.seq holds the sequence number expected next. When it is 0 (no
// expectation, e.g. right after configuring the remote server), no sequence
// jump is reported.
func (detector *HeartbeatFaultDetector) heartbeatReceived(received time.Time, beat *HeartbeatPacket) {
	expected := detector.seq
	if expected != 0 && expected != beat.Seq {
		detector.server.LogError(fmt.Errorf("Heartbeat sequence jump (%v -> %v) for %v", expected, beat.Seq, detector))
	}
	detector.seq = beat.Seq + 1
	detector.lastHeartbeatReceived, detector.lastHeartbeatSent = received, beat.TimeSent
	detector.Check()
}
// IsStopped reports whether this detector has been closed or its
// HeartbeatServer has stopped.
func (detector *HeartbeatFaultDetector) IsStopped() bool {
	if detector.Closed.Enabled() {
		return true
	}
	return detector.server.Stopped
}
// Check runs the heartbeat check through PerformCheck. The check fails with a
// timeout error when more than acceptableTimeout has elapsed since the last
// heartbeat was received. That error also mentions the last configuration
// error, if any.
//
// If the detector is not Online afterwards, configureObservedServer is called
// again to re-configure the remote server.
func (detector *HeartbeatFaultDetector) Check() {
	detector.PerformCheck(func() error {
		elapsed := time.Since(detector.lastHeartbeatReceived)
		if elapsed > detector.acceptableTimeout {
			suffix := ""
			if detector.configError != nil {
				suffix = fmt.Sprintf(". Error configuring remote server: %v", detector.configError)
			}
			return fmt.Errorf("Heartbeat timeout: last heartbeat %v ago%s", elapsed, suffix)
		}
		return nil
	})
	if detector.Online() {
		return
	}
	detector.configureObservedServer()
}
// configureObservedServer calls client.ConfigureHeartbeat with this detector's
// HeartbeatServer, token and heartbeatFrequency.
//
// The expected sequence number is cleared first, so the next heartbeat is not
// reported as a jump. The call's result is stored in configError; on failure
// the client connection is reset.
func (detector *HeartbeatFaultDetector) configureObservedServer() {
	detector.seq = 0
	client := detector.client
	err := client.ConfigureHeartbeat(detector.server.Server, detector.token, detector.heartbeatFrequency)
	detector.configError = err
	if err == nil {
		return
	}
	client.ResetConnection()
}
// Start does nothing if the detector is already stopped. Otherwise it
// configures the observed server once. It then launches a goroutine that waits
// acceptableTimeout (giving the first heartbeat a chance to arrive) before
// handing Check and acceptableTimeout to LoopCheck.
func (detector *HeartbeatFaultDetector) Start() {
	if detector.IsStopped() {
		return
	}
	detector.configureObservedServer() // Once when starting up
	go func() {
		// TODO sleep something less than the timeout. This is random and probably will not scale.
		interval := detector.acceptableTimeout
		time.Sleep(interval) // Give the first heartbeat time to arrive before checking.
		detector.LoopCheck(detector.Check, interval)
	}()
}
// Close marks the detector as closed via Closed.Enable. The shutdown callback
// does three things:
//   - unregisters the detector from its HeartbeatServer;
//   - if the detector is Online, tells the remote server to stop sending
//     heartbeats;
//   - closes the client.
//
// Errors from these steps are collected and returned together (nil if none).
// NOTE(review): presumably Enable runs the callback only on the first Close — confirm.
func (detector *HeartbeatFaultDetector) Close() error {
	var errs golib.MultiError
	shutdown := func() {
		delete(detector.server.detectors, detector.token)
		// Token 0 / frequency 0 notifies the remote server to stop sending heartbeats.
		if detector.Online() {
			errs.Add(detector.client.ConfigureHeartbeat(detector.server.Server, 0, 0))
		}
		errs.Add(detector.client.Close())
	}
	detector.Closed.Enable(shutdown)
	return errs.NilOrError()
}