-
Notifications
You must be signed in to change notification settings - Fork 672
/
worker.go
173 lines (143 loc) · 3.78 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package health
import (
"errors"
"fmt"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/avalanchego/utils"
)
// errDuplicateCheck is wrapped by RegisterCheck when a check with the same
// name has already been registered.
var errDuplicateCheck = errors.New("duplicated check")

// worker owns a set of named health checks, runs them in the background
// (see Start and Stop), and records the most recent Result of each check.
type worker struct {
	// metrics is used here to track the number of currently failing checks
	// (failingChecks).
	metrics *metrics

	// checksLock guards checks.
	checksLock sync.RWMutex
	// checks maps a check name to the Checker that implements it.
	checks map[string]Checker

	// resultsLock guards results. RegisterCheck acquires it while already
	// holding checksLock.
	resultsLock sync.RWMutex
	// results maps a check name to the latest Result recorded for it.
	results map[string]Result

	// startOnce ensures Start launches at most one background goroutine.
	startOnce sync.Once
	// closeOnce ensures Stop closes closer at most once.
	closeOnce sync.Once
	// closer is closed by Stop to tell the background goroutine to exit.
	closer chan struct{}
}
// newWorker creates a worker with no registered checks. Its metrics are
// created by newMetrics(namespace, registerer).
//
// If the metrics cannot be created, the error is returned together with a
// nil worker. Previously a half-initialized worker (possibly with nil
// metrics) was returned alongside the error. Using it would panic on the
// first RegisterCheck call.
func newWorker(namespace string, registerer prometheus.Registerer) (*worker, error) {
	metrics, err := newMetrics(namespace, registerer)
	if err != nil {
		return nil, err
	}
	return &worker{
		metrics: metrics,
		checks:  make(map[string]Checker),
		results: make(map[string]Result),
		closer:  make(chan struct{}),
	}, nil
}
// RegisterCheck adds [checker] to the worker under the key [name]. Until the
// check has run for the first time, its recorded result is notYetRunResult
// and it is counted in the failingChecks metric.
//
// If [name] is already registered, nothing is modified and an error wrapping
// errDuplicateCheck is returned.
func (w *worker) RegisterCheck(name string, checker Checker) error {
	w.checksLock.Lock()
	defer w.checksLock.Unlock()

	if _, taken := w.checks[name]; taken {
		return fmt.Errorf("%w: %q", errDuplicateCheck, name)
	}

	// resultsLock is taken while checksLock is still held. A check therefore
	// never shows up in checks without a matching entry in results.
	w.resultsLock.Lock()
	defer w.resultsLock.Unlock()

	w.checks[name] = checker
	w.results[name] = notYetRunResult

	// A check that has never run is treated as failing.
	w.metrics.failingChecks.Inc()
	return nil
}
// RegisterMonotonicCheck registers [checker] under [name], wrapped so that
// once [checker] succeeds with non-nil details, those details are cached.
// Every later run then returns them with a nil error, without calling
// [checker] again. Until that happens, [checker] is invoked on every run.
func (w *worker) RegisterMonotonicCheck(name string, checker Checker) error {
	var cached utils.AtomicInterface
	monotonic := func() (interface{}, error) {
		if prev := cached.GetValue(); prev != nil {
			return prev, nil
		}
		details, err := checker.HealthCheck()
		if err == nil {
			cached.SetValue(details)
		}
		return details, err
	}
	return w.RegisterCheck(name, CheckerFunc(monotonic))
}
// Results returns a copy of the latest Result of every registered check. It
// also reports whether all of them are healthy, meaning none has a non-nil
// Error. The returned map is owned by the caller.
func (w *worker) Results() (map[string]Result, bool) {
	w.resultsLock.RLock()
	defer w.resultsLock.RUnlock()

	snapshot := make(map[string]Result, len(w.results))
	allHealthy := true
	for checkName, res := range w.results {
		snapshot[checkName] = res
		if res.Error != nil {
			allHealthy = false
		}
	}
	return snapshot, allHealthy
}
// Start launches the background goroutine, at most once. The goroutine runs
// every registered check immediately, then again on each tick of a ticker
// with period [freq], until Stop closes w.closer. Calls after the first are
// no-ops.
func (w *worker) Start(freq time.Duration) {
	w.startOnce.Do(func() {
		go func() {
			t := time.NewTicker(freq)
			defer t.Stop()

			for {
				w.runChecks()

				// Wait for the next tick, or exit once Stop has been called.
				select {
				case <-t.C:
				case <-w.closer:
					return
				}
			}
		}()
	})
}
// Stop signals the goroutine launched by Start to exit by closing w.closer.
// It may be called multiple times; only the first call closes the channel.
// It returns immediately and does not wait for a round of checks that is in
// progress.
func (w *worker) Stop() {
	signalExit := func() { close(w.closer) }
	w.closeOnce.Do(signalExit)
}
// runChecks runs every currently registered check concurrently, one goroutine
// per check. It blocks until all of them have finished and recorded their
// results.
func (w *worker) runChecks() {
	// Take a snapshot of the registered checks so the read lock is not held
	// while they execute. A check registered after this snapshot is taken
	// will first run during the next call to runChecks.
	w.checksLock.RLock()
	snapshot := make(map[string]Checker, len(w.checks))
	for checkName, checker := range w.checks {
		snapshot[checkName] = checker
	}
	w.checksLock.RUnlock()

	wg := &sync.WaitGroup{}
	for checkName, checker := range snapshot {
		wg.Add(1)
		go w.runCheck(wg, checkName, checker)
	}
	wg.Wait()
}
// runCheck executes [check] once and stores the outcome as the latest Result
// for [name]. It also updates the failingChecks metric whenever the check
// moves between passing and failing. wg.Done is called when it returns.
func (w *worker) runCheck(wg *sync.WaitGroup, name string, check Checker) {
	defer wg.Done()

	// No lock is held while the check itself runs. RegisterCheck needs
	// resultsLock, so if a caller of RegisterCheck holds a lock that
	// check.HealthCheck also takes, holding our lock here could deadlock.
	startedAt := time.Now()
	details, err := check.HealthCheck()
	finishedAt := time.Now()

	res := Result{
		Details:   details,
		Timestamp: finishedAt,
		Duration:  finishedAt.Sub(startedAt),
	}

	w.resultsLock.Lock()
	defer w.resultsLock.Unlock()

	prev := w.results[name]
	wasFailing := prev.Error != nil

	switch {
	case err != nil:
		msg := err.Error()
		res.Error = &msg
		res.ContiguousFailures = prev.ContiguousFailures + 1
		// Preserve the start time of an ongoing streak of failures;
		// otherwise this run starts a new streak.
		if prev.ContiguousFailures > 0 {
			res.TimeOfFirstFailure = prev.TimeOfFirstFailure
		} else {
			res.TimeOfFirstFailure = &finishedAt
		}
		if !wasFailing {
			w.metrics.failingChecks.Inc()
		}
	case wasFailing:
		// Transition from failing to passing.
		w.metrics.failingChecks.Dec()
	}

	w.results[name] = res
}