This repository has been archived by the owner on Oct 4, 2022. It is now read-only.
/
liveness.go
291 lines (259 loc) · 7.42 KB
/
liveness.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package funk
//
//Copyright 2019 Telenor Digital AS
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
//
import (
	"fmt"
	"net"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)
// LivenessChecker is a liveness checker. It runs a (very simple)
// high-frequency liveness check against each registered endpoint. When an
// endpoint keeps failing, its ID is published on the DeadEvents channel; when
// an endpoint that was reported dead starts responding again, its ID is
// published on the AliveEvents channel. Only a single subscriber is supported
// for each channel.
type LivenessChecker interface {
	// Add adds a new check for endpoint, identified by id.
	Add(id string, endpoint string)
	// Remove stops and removes the check registered under id.
	Remove(id string)
	// DeadEvents returns the dead-event channel. The ID passed to Add is
	// echoed on this channel when its endpoint stops responding.
	DeadEvents() <-chan string
	// AliveEvents returns the alive-event channel. The ID passed to Add is
	// echoed on this channel when its endpoint responds again after having
	// been reported dead.
	AliveEvents() <-chan string
	// Clear removes all endpoint checks.
	Clear()
	// Shutdown removes all checks and closes both event channels. The
	// checker is no longer in a usable state after this.
	Shutdown()
}
// LocalLivenessEndpoint is a running local liveness responder. It answers
// (short) UDP packets by echoing each packet back to its sender.
// This is not a *health* check, just a liveness check that the node is
// reachable on the network.
type LocalLivenessEndpoint interface {
	// Stop shuts the responder down and releases its UDP socket.
	Stop()
}

// udpLivenessClient is the UDP implementation of LocalLivenessEndpoint. A
// single goroutine, started by launch, owns the read/echo loop.
type udpLivenessClient struct {
	conn     net.PacketConn // listening socket; Stop closes it to interrupt a pending read
	stopCh   chan struct{}  // closed to ask the serving goroutine to exit
	doneCh   chan struct{}  // closed by the serving goroutine after it has closed conn
	stopOnce sync.Once      // makes Stop idempotent
}

// NewLivenessClient creates a new liveness endpoint listening on the
// specified address (host:port) and starts answering immediately. It panics
// if the address cannot be bound.
func NewLivenessClient(ep string) LocalLivenessEndpoint {
	ret := &udpLivenessClient{
		stopCh: make(chan struct{}),
		doneCh: make(chan struct{}),
	}
	ret.launch(ep)
	return ret
}

// Stop makes the liveness client stop. It closes the socket to interrupt any
// pending read and then waits for the serving goroutine to exit, so the port
// has been released by the time Stop returns. Calling Stop more than once
// (also concurrently) is safe; later calls just wait for the first shutdown.
func (u *udpLivenessClient) Stop() {
	u.stopOnce.Do(func() {
		close(u.stopCh)
		// Unblocks a ReadFrom that would otherwise wait out its 1s deadline.
		// The error is irrelevant: the serving goroutine closes conn again on
		// exit and ignores that result too.
		_ = u.conn.Close()
	})
	<-u.doneCh
}

// launch binds endpoint and starts the goroutine that echoes packets back to
// their senders until Stop is called. Packets are read into a 2-byte buffer,
// so only packets of up to 2 bytes are echoed in full.
func (u *udpLivenessClient) launch(endpoint string) {
	conn, err := net.ListenPacket("udp", endpoint)
	if err != nil {
		// This is panic-worthy material since the other members of the cluster might think that
		// this node is dead
		panic(fmt.Sprintf("liveness: unable to listen on %s: %v", endpoint, err))
	}
	u.conn = conn
	go func() {
		// Deferred calls run LIFO: the socket is closed before doneCh is
		// closed, which is what lets Stop promise the port is free.
		defer close(u.doneCh)
		defer conn.Close()

		stopping := func() bool {
			select {
			case <-u.stopCh:
				return true
			default:
				return false
			}
		}

		buf := make([]byte, 2)
		for !stopping() {
			// The deadline guarantees the stop flag is polled at least once a second.
			if err := conn.SetReadDeadline(time.Now().Add(1 * time.Second)); err != nil {
				if stopping() {
					// Stop closed the socket under us; this is not worth a warning.
					return
				}
				log.WithError(err).Warning("Can't set deadline for socket")
				select {
				case <-u.stopCh:
					return
				case <-time.After(1 * time.Second):
				}
				continue
			}
			n, addr, err := conn.ReadFrom(buf)
			if err != nil {
				// Timeouts and reads interrupted by Stop both land here; the
				// loop condition decides whether to keep going.
				continue
			}
			// Echo exactly the bytes received; writing all of buf would send a
			// stale second byte in reply to a 1-byte probe. A failed write is
			// deliberately ignored: the remote checker counts a missing reply
			// as a failure.
			_, _ = conn.WriteTo(buf[:n], addr)
		}
	}()
}
// This type checks a single client for liveness.
type singleChecker struct {
deadCh chan<- string
aliveCh chan<- string
stopCh chan struct{}
maxErrors int
}
func newSingleChecker(id, endpoint string, deadCh chan<- string, aliveCh chan<- string, interval time.Duration, retries int) singleChecker {
ret := singleChecker{
deadCh: deadCh,
aliveCh: aliveCh,
stopCh: make(chan struct{}),
maxErrors: retries,
}
go ret.checkerProc(id, endpoint, interval)
return ret
}
// getConnection dials endpoint over UDP with a dial timeout of interval. It
// makes up to maxErrors attempts and sleeps for interval after every failed
// one. It returns nil when all attempts fail, or at once when maxErrors <= 0.
func (c *singleChecker) getConnection(endpoint string, interval time.Duration) net.Conn {
	dialer := net.Dialer{Timeout: interval}
	for attempt := 1; attempt <= c.maxErrors; attempt++ {
		conn, err := dialer.Dial("udp", endpoint)
		if err == nil {
			return conn
		}
		// Dialing UDP normally succeeds (failures tend to show up on write),
		// but back off and retry defensively.
		time.Sleep(interval)
	}
	return nil
}
// checkerProc is the probe loop for one endpoint. It runs on its own goroutine
// (started by newSingleChecker) until stopCh is signalled.
//
// Every round it sends the two-byte datagram "Yo" to endpoint and expects a
// reply within interval. A failed dial, write or read increments an error
// counter. A reply decrements it, and the counter is then clamped to
// [-maxErrors, maxErrors]. When the counter reaches maxErrors, the id is
// published on deadCh and probing slows to 10*interval. When the counter falls
// back to zero or below, the id is published on aliveCh and probing resumes at
// interval.
//
// Every wait also watches stopCh: pauses between rounds and event sends
// blocked on a full channel. A stop request is therefore honoured promptly,
// and the UDP connection is closed whenever the loop exits.
func (c *singleChecker) checkerProc(id, endpoint string, interval time.Duration) {
	buffer := make([]byte, 2)
	// Counts up on failures and down on successes. Named errCount so it does
	// not shadow the standard errors package.
	errCount := 0
	alive := true
	// interval while the endpoint is alive, 10*interval while it is dead.
	waitInterval := interval

	var conn net.Conn
	defer func() {
		if conn != nil {
			conn.Close()
		}
	}()

	// pause sleeps for d; it returns false if the checker was stopped meanwhile.
	pause := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-c.stopCh:
			return false
		case <-timer.C:
			return true
		}
	}
	// retryLater drops the current connection (so the next round redials) and
	// pauses for the current waitInterval; false means the checker was stopped.
	retryLater := func() bool {
		if conn != nil {
			conn.Close()
			conn = nil
		}
		return pause(waitInterval)
	}
	// report publishes id on ch; it returns false if the checker is stopped
	// while the send is blocked on a full channel.
	report := func(ch chan<- string) bool {
		select {
		case ch <- id:
			return true
		case <-c.stopCh:
			return false
		}
	}

probe:
	for {
		select {
		case <-c.stopCh:
			break probe
		default:
			// keep on running
		}

		if alive && errCount >= c.maxErrors {
			alive = false
			if !report(c.deadCh) {
				break probe
			}
			waitInterval = 10 * interval
		}
		if !alive && errCount <= 0 {
			alive = true
			if !report(c.aliveCh) {
				break probe
			}
			waitInterval = interval
		}

		if conn == nil {
			if conn = c.getConnection(endpoint, waitInterval); conn == nil {
				errCount++
				if !pause(waitInterval) {
					break probe
				}
				continue
			}
		}

		// A successful round lasts at least waitInterval in total.
		nextRound := time.Now().Add(waitInterval)

		if err := conn.SetWriteDeadline(time.Now().Add(interval)); err != nil {
			log.WithError(err).Warning("Can't set deadline for liveness check socket")
			if !retryLater() {
				break probe
			}
			continue
		}
		// Writes will usually succeed since UDP is a black hole, but this
		// *might* fail; count it and reopen the connection.
		if _, err := conn.Write([]byte("Yo")); err != nil {
			errCount++
			if !retryLater() {
				break probe
			}
			continue
		}
		if err := conn.SetReadDeadline(time.Now().Add(interval)); err != nil {
			log.WithError(err).Warning("Can't set deadline for liveness check socket")
			if !retryLater() {
				break probe
			}
			continue
		}
		// The read fails (e.g. times out) when the endpoint is dead.
		if _, err := conn.Read(buffer); err != nil {
			errCount++
			if !retryLater() {
				break probe
			}
			continue
		}

		errCount--
		if errCount > c.maxErrors {
			errCount = c.maxErrors
		}
		if errCount < -c.maxErrors {
			errCount = -c.maxErrors
		}
		if !pause(time.Until(nextRound)) {
			break probe
		}
	}
	log.Infof("Terminating leveness checker to %s", endpoint)
}
// Stop asks the checker's probe goroutine to exit by closing stopCh.
// Closing, rather than handing over a value, makes the request stick. A
// handoff send with a short timeout is lost whenever the goroutine is busy
// dialing, reading or sleeping. That leaves the goroutine running and still
// publishing events for an id that is no longer registered. A closed channel
// is observed by every later check the goroutine makes.
//
// Stop never blocks and may be called more than once. It must not be called
// concurrently for the same checker: two racing calls could both find the
// channel open and close it twice.
func (c *singleChecker) Stop() {
	select {
	case <-c.stopCh:
		// Already stopped by an earlier call.
	default:
		close(c.stopCh)
	}
}
// livenessChecker implements LivenessChecker as a registry of per-endpoint
// singleCheckers that all publish into two shared event channels. The
// registry is a plain map with no lock, so the methods are not safe for
// concurrent use.
type livenessChecker struct {
	checkers    map[string]singleChecker // active checkers keyed by the id given to Add
	deadEvents  chan string
	aliveEvents chan string
	retries     int           // handed to each singleChecker as its error threshold
	interval    time.Duration // probe interval handed to each singleChecker
}

// NewLivenessChecker creates a LivenessChecker whose checks probe at the
// given interval and use retries as their error threshold. Both event
// channels are buffered for 10 events.
func NewLivenessChecker(interval time.Duration, retries int) LivenessChecker {
	const eventBuffer = 10

	lc := new(livenessChecker)
	lc.interval = interval
	lc.retries = retries
	lc.checkers = make(map[string]singleChecker)
	lc.deadEvents = make(chan string, eventBuffer)
	lc.aliveEvents = make(chan string, eventBuffer)
	return lc
}
// Add starts checking endpoint under id, using the checker's interval and
// retry settings and publishing to its shared event channels. If id is
// already registered, the previous checker is stopped first. Overwriting the
// map entry alone would leave the old probe goroutine running and emitting
// events for id.
func (l *livenessChecker) Add(id string, endpoint string) {
	if existing, ok := l.checkers[id]; ok {
		existing.Stop()
	}
	l.checkers[id] = newSingleChecker(id, endpoint, l.deadEvents, l.aliveEvents, l.interval, l.retries)
}
// Remove stops the checker registered under id and drops it from the
// registry. Unknown ids are ignored.
func (l *livenessChecker) Remove(id string) {
	chk, found := l.checkers[id]
	if !found {
		return
	}
	chk.Stop()
	delete(l.checkers, id)
}
// DeadEvents returns a receive-only view of the channel on which the ids of
// endpoints that stopped responding are published.
func (l *livenessChecker) DeadEvents() <-chan string {
	var events <-chan string = l.deadEvents
	return events
}
// AliveEvents returns a receive-only view of the channel on which the ids of
// endpoints that respond again after being reported dead are published.
func (l *livenessChecker) AliveEvents() <-chan string {
	var events <-chan string = l.aliveEvents
	return events
}
// Clear stops every registered checker and empties the registry. Deleting
// entries while ranging over the map is permitted in Go.
func (l *livenessChecker) Clear() {
	for id := range l.checkers {
		chk := l.checkers[id]
		chk.Stop()
		delete(l.checkers, id)
	}
}
// Shutdown stops and removes every checker via Clear, then closes the dead
// and alive event channels, in that order. The checker must not be used
// afterwards; a second Shutdown panics because the channels are already
// closed.
//
// NOTE(review): Clear only signals the probe goroutines and does not visibly
// wait for them to exit. Confirm that no probe can still be sending when the
// channels are closed here, since a send on a closed channel panics.
func (l *livenessChecker) Shutdown() {
	l.Clear()
	for _, events := range []chan string{l.deadEvents, l.aliveEvents} {
		close(events)
	}
}