-
Notifications
You must be signed in to change notification settings - Fork 6
/
monitor.go
95 lines (90 loc) · 2.29 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package hypervisors
import (
"flag"
"time"
"github.com/Cloud-Foundations/Dominator/lib/srpc"
hyper_proto "github.com/Cloud-Foundations/Dominator/proto/hypervisor"
)
// Command-line flags controlling how Hypervisor connections are monitored.
var (
	// hypervisorProbeTimeout is the quiet period after which a probe is sent
	// to a Hypervisor.
	hypervisorProbeTimeout = flag.Duration(
		"hypervisorProbeTimeout",
		5*time.Second,
		"time after which a probe is sent to a quiet Hypervisor",
	)

	// hypervisorResponseTimeout is the quiet period after which a Hypervisor
	// is marked as unresponsive.
	hypervisorResponseTimeout = flag.Duration(
		"hypervisorResponseTimeout",
		19*time.Second,
		"time after which a Hypervisor is marked as unresponsive",
	)
)
// monitorLoop watches h.receiveChannel for activity and shuts the client down
// if nothing arrives within *hypervisorResponseTimeout.
//
// It starts pingLoop in its own goroutine, handing it conn and a channel used
// to postpone pings; that channel is closed when monitorLoop returns.
//
// On every receive the ping loop is nudged (without blocking), the time of the
// receive is recorded and the probe status is set to probeStatusConnected.
// The loop returns when h.receiveChannel is closed. If the response timeout
// expires first, the probe status is set to probeStatusUnreachable, h.conn is
// cleared, client is closed and the loop returns.
func (h *hypervisorType) monitorLoop(client *srpc.Client, conn *srpc.Conn) {
	deferPings := make(chan struct{})
	defer close(deferPings)
	go h.pingLoop(conn, deferPings)

	lastHeard := time.Now()
	for {
		// Wait only for what is left of the response window, but always for
		// a positive duration.
		remaining := time.Millisecond
		if left := *hypervisorResponseTimeout - time.Since(lastHeard); left > 0 {
			remaining = left
		}
		watchdog := time.NewTimer(remaining)

		select {
		case <-watchdog.C:
			h.mutex.Lock()
			h.probeStatus = probeStatusUnreachable
			h.conn = nil
			h.mutex.Unlock()
			h.logger.Debugln(0, "shutting down unresponsive client")
			client.Close()
			return

		case _, open := <-h.receiveChannel:
			// Release the watchdog, draining its channel if it already fired.
			if stopped := watchdog.Stop(); !stopped {
				<-watchdog.C
			}
			if !open {
				return
			}
			// Non-blocking: skip the nudge if the ping loop is not currently
			// waiting to receive it.
			select {
			case deferPings <- struct{}{}:
			default:
			}
			lastHeard = time.Now()
			h.mutex.Lock()
			h.probeStatus = probeStatusConnected
			h.mutex.Unlock()
		}
	}
}
// pingLoop sends a ping (an empty GetUpdatesRequest) on conn each time
// *hypervisorProbeTimeout elapses without a value arriving on
// pingDeferChannel.
//
// Each value received on pingDeferChannel restarts the probe timer, sets the
// probe status to probeStatusConnected and clears the count of pings sent
// since the last deferral. The loop returns when pingDeferChannel is closed.
// Errors from encoding or flushing a ping are logged and the loop keeps
// running.
//
// One timer is shared across all iterations and re-armed with Reset, so no
// armed timer is left behind per iteration; it is stopped when the loop exits.
func (h *hypervisorType) pingLoop(conn *srpc.Conn,
	pingDeferChannel <-chan struct{}) {
	pingsSinceLastDefer := 0
	timer := time.NewTimer(*hypervisorProbeTimeout)
	defer timer.Stop()
	for {
		select {
		case _, ok := <-pingDeferChannel:
			if !ok {
				return
			}
			// The timer is always armed or fired-but-unread at this point, so
			// draining after a failed Stop cannot block.
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(*hypervisorProbeTimeout)
			h.mutex.Lock()
			h.probeStatus = probeStatusConnected
			h.mutex.Unlock()
			pingsSinceLastDefer = 0
		case <-timer.C:
			pingsSinceLastDefer++
			if pingsSinceLastDefer > 1 {
				h.logger.Debugf(0, "sending ping #%d since last activity\n",
					pingsSinceLastDefer)
			} else {
				h.logger.Debugln(1, "sending first ping since last activity")
			}
			if err := conn.Encode(hyper_proto.GetUpdatesRequest{}); err != nil {
				h.logger.Printf("error sending ping: %s\n", err)
			} else if err := conn.Flush(); err != nil {
				h.logger.Printf("error flushing ping: %s\n", err)
			}
			// The timer's value was just consumed, so Reset is safe. Re-arming
			// after the send measures the next interval from ping completion.
			timer.Reset(*hypervisorProbeTimeout)
		}
	}
}