/
checker.go
139 lines (118 loc) · 2.58 KB
/
checker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package servicecheck
import (
"log"
"net"
"net/http"
"strings"
"time"
"github.com/brentahughes/service_tester/pkg/config"
"github.com/brentahughes/service_tester/pkg/models"
"github.com/dgraph-io/badger"
"github.com/digineo/go-ping"
"github.com/panjf2000/ants"
)
type Checker struct {
db *badger.DB
cfg *config.Config
pool *ants.PoolWithFunc
pinger pinger
httpClient *http.Client
}
func NewChecker(
db *badger.DB,
conf *config.Config,
) (*Checker, error) {
c := &Checker{
db: db,
cfg: conf,
httpClient: &http.Client{
Timeout: checkTimeout,
},
}
pool, err := ants.NewPoolWithFunc(conf.ParallelChecks, c.checkHost)
if err != nil {
return nil, err
}
c.pool = pool
var p pinger
p, err = ping.New("0.0.0.0", "")
if err != nil {
if opErr, ok := err.(*net.OpError); ok && strings.Contains(opErr.Err.Error(), "operation not permitted") {
p = &pingNoOp{}
} else {
return nil, err
}
}
c.pinger = p
return c, nil
}
func (c *Checker) Start() {
c.runCheck()
tick := time.NewTicker(c.cfg.CheckInterval)
for range tick.C {
c.runCheck()
}
}
func (c *Checker) Stop() {
log.Printf("Shutting down checker")
c.httpClient.CloseIdleConnections()
c.pinger.Close()
}
func (c *Checker) runCheck() {
c.discoverNewHosts()
hosts, err := models.GetHosts(c.db)
if err != nil {
log.Printf("error getting recent hosts: %v", err)
return
}
for _, host := range hosts {
c.pool.Invoke(host)
}
}
func (c *Checker) discoverNewHosts() {
var err error
ips := c.cfg.Hosts
if c.cfg.Discovery != "" {
ips, err = c.discoverHosts()
if err != nil {
log.Printf("error checking discovery endpoint (%s) %v", c.cfg.Discovery, err)
return
}
}
currentHost, err := models.GetCurrentHost(c.db)
if err != nil {
log.Printf("error getting current host %v", err)
return
}
for _, ip := range ips {
if currentHost.PublicIP == ip || currentHost.InternalIP == ip {
continue
}
host, err := models.GetHostByIP(c.db, ip)
if err != nil {
if err != badger.ErrKeyNotFound {
log.Printf("error looking up host by ip (%s) %v", ip, err)
continue
}
// Call health endpoint and save host information
c.newHost(ip)
} else {
// Update the last seen
if err := host.Save(c.db); err != nil {
log.Printf("error updating host (%s): %v", host.Hostname, err)
continue
}
}
}
}
func (c *Checker) discoverHosts() ([]string, error) {
addrs, err := net.LookupIP(c.cfg.Discovery)
if err != nil {
return nil, err
}
var hosts []string
for _, addr := range addrs {
hosts = append(hosts, addr.String())
}
return hosts, nil
}