watcher.go

package serviceregistration

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-set"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/structs"
)
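
// This file implements check watching: polling check statuses from the Nomad
// or Consul provider and restarting tasks whose checks fail according to
// their check_restart policy.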

// key is a composite of allocID + taskName for uniqueness.
type key string

type restarter struct {
	allocID        string
	taskName       string
	checkID        string
	checkName      string
	taskKey        key
	logger         hclog.Logger
	task           WorkloadRestarter
	grace          time.Duration
	interval       time.Duration
	timeLimit      time.Duration
	ignoreWarnings bool

	// unhealthyState is the time a check first went unhealthy. Set to the
	// zero value if the check passes before timeLimit.
	unhealthyState time.Time

	// graceUntil is when the check's grace period expires and unhealthy
	// checks should be counted.
	graceUntil time.Time
}
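
// To make the timing concrete (illustrative numbers, not from the source):
// with a check interval of 10s and a check_restart limit of 3, timeLimit is
// interval * (limit-1) = 20s, so a check that first fails at t=0 triggers a
// restart once it is still failing at t >= 20s, i.e. on roughly its third
// consecutive failing result.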

// apply restart state for the check and restart the task if necessary. The
// current timestamp is passed in so all check updates have the same view of
// time (and to ease testing).
//
// Returns true if a restart was triggered, in which case this check should be
// removed (checks are added on task startup).
func (r *restarter) apply(ctx context.Context, now time.Time, status string) bool {
	healthy := func() {
		if !r.unhealthyState.IsZero() {
			r.logger.Debug("canceling restart because check became healthy")
			r.unhealthyState = time.Time{}
		}
	}

	switch status {
	case "critical": // consul
	case string(structs.CheckFailure): // nomad
	case string(structs.CheckPending): // nomad
	case "warning": // consul
		if r.ignoreWarnings {
			// Warnings are ignored, reset state and exit
			healthy()
			return false
		}
	default:
		// All other statuses are ok, reset state and exit
		healthy()
		return false
	}
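
	// Note: in Go a case with an empty body does not fall through, so the
	// "critical", CheckFailure, and CheckPending cases simply exit the
	// switch and continue to the unhealthy-handling logic below.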

	if now.Before(r.graceUntil) {
		// In grace period, exit
		return false
	}

	if r.unhealthyState.IsZero() {
		// First failure, set restart deadline
		if r.timeLimit != 0 {
			r.logger.Debug("check became unhealthy. Will restart if check doesn't become healthy", "time_limit", r.timeLimit)
		}
		r.unhealthyState = now
	}

	// restart timeLimit after this check first became unhealthy
	restartAt := r.unhealthyState.Add(r.timeLimit)

	// Must test >= because if limit=1, restartAt == first failure
	if now.Equal(restartAt) || now.After(restartAt) {
		// hasn't become healthy by deadline, restart!
		r.logger.Debug("restarting due to unhealthy check")

		// Tell TaskRunner to restart due to failure
		reason := fmt.Sprintf("healthcheck: check %q unhealthy", r.checkName)
		event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason)
		go asyncRestart(ctx, r.logger, r.task, event)
		return true
	}

	return false
}

// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
// to be called in a goroutine.
func asyncRestart(ctx context.Context, logger hclog.Logger, task WorkloadRestarter, event *structs.TaskEvent) {
	// Check watcher restarts are always failures
	const failure = true

	// Restarting is asynchronous so there's no reason to allow this
	// goroutine to block indefinitely.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	if err := task.Restart(ctx, event, failure); err != nil {
		// Restart errors are not actionable and only relevant when
		// debugging allocation lifecycle management.
		logger.Debug("failed to restart task", "error", err, "event_time", event.Time, "event_type", event.Type)
	}
}

// CheckStatusGetter is implemented per-provider.
type CheckStatusGetter interface {
	// Get returns a map from CheckID -> (minimal) CheckStatus
	Get() (map[string]string, error)
}
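
// A minimal illustrative implementation (hypothetical, e.g. for tests) could
// simply return a fixed map:
//
//	type staticGetter map[string]string
//
//	func (s staticGetter) Get() (map[string]string, error) { return s, nil }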

// checkWatchUpdate adds or removes a check from the watcher.
type checkWatchUpdate struct {
	checkID string
	remove  bool
	restart *restarter
}

// A CheckWatcher watches for check failures and restarts tasks according to
// their check_restart policy.
type CheckWatcher interface {
	// Run the CheckWatcher. Maintains a background process to continuously
	// monitor active checks. Must be called before Watch or Unwatch. Must be
	// called as a goroutine.
	Run(ctx context.Context)

	// Watch the given check. If the check status enters a failing state, the
	// task associated with the check will be restarted according to its
	// check_restart policy via wr.
	Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, wr WorkloadRestarter)

	// Unwatch will cause the CheckWatcher to no longer monitor the check of
	// the given checkID.
	Unwatch(checkID string)
}
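
// Typical usage is a sketch along these lines (ctx, logger, getter, check,
// and restarter are assumed to be provided by the caller):
//
//	watcher := NewCheckWatcher(logger, getter)
//	go watcher.Run(ctx)
//	watcher.Watch(allocID, taskName, checkID, check, restarter)
//	// ...
//	watcher.Unwatch(checkID)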

// UniversalCheckWatcher is an implementation of CheckWatcher capable of
// watching checks in the Nomad or Consul service providers.
type UniversalCheckWatcher struct {
	logger hclog.Logger
	getter CheckStatusGetter

	// pollFrequency is how often to poll the checks API
	pollFrequency time.Duration

	// checkUpdateCh sends watches/removals to the main loop
	checkUpdateCh chan checkWatchUpdate

	// done is closed when Run has exited
	done chan struct{}

	// failedPreviousInterval indicates whether something went wrong during
	// the previous poll interval, so ongoing errors can be silenced
	failedPreviousInterval bool
}

func NewCheckWatcher(logger hclog.Logger, getter CheckStatusGetter) *UniversalCheckWatcher {
	return &UniversalCheckWatcher{
		logger:        logger.ResetNamed("watch.checks"),
		getter:        getter,
		pollFrequency: 1 * time.Second,
		checkUpdateCh: make(chan checkWatchUpdate, 8),
		done:          make(chan struct{}),
	}
}
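
// Note that checkUpdateCh is buffered, so a small burst of Watch/Unwatch
// calls can proceed without waiting on the Run loop to receive each update.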

// Watch a check and restart its task if unhealthy.
func (w *UniversalCheckWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, wr WorkloadRestarter) {
	if !check.TriggersRestarts() {
		return // check_restart not set; no-op
	}

	c := &restarter{
		allocID:        allocID,
		taskName:       taskName,
		checkID:        checkID,
		checkName:      check.Name,
		taskKey:        key(allocID + taskName),
		task:           wr,
		interval:       check.Interval,
		grace:          check.CheckRestart.Grace,
		graceUntil:     time.Now().Add(check.CheckRestart.Grace),
		timeLimit:      check.Interval * time.Duration(check.CheckRestart.Limit-1),
		ignoreWarnings: check.CheckRestart.IgnoreWarnings,
		logger:         w.logger.With("alloc_id", allocID, "task", taskName, "check", check.Name),
	}

	select {
	case w.checkUpdateCh <- checkWatchUpdate{
		checkID: checkID,
		restart: c,
	}: // activate watch
	case <-w.done: // exited; nothing to do
	}
}

// Unwatch a check.
func (w *UniversalCheckWatcher) Unwatch(checkID string) {
	select {
	case w.checkUpdateCh <- checkWatchUpdate{
		checkID: checkID,
		remove:  true,
	}: // deactivate watch
	case <-w.done: // exited; nothing to do
	}
}

func (w *UniversalCheckWatcher) Run(ctx context.Context) {
	defer close(w.done)

	// map of checkID to its restarter handle (contains only checks we are watching)
	watched := make(map[string]*restarter)

	checkTimer, cleanupCheckTimer := helper.NewSafeTimer(0)
	defer cleanupCheckTimer()

	stopCheckTimer := func() { // todo: refactor using that other pattern
		checkTimer.Stop()
		select {
		case <-checkTimer.C:
		default:
		}
	}
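
	// Stopping a timer does not drain its channel, so the non-blocking
	// receive above discards any pending tick that could otherwise fire
	// spuriously after a later Reset.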

	// initialize with checkTimer disabled
	stopCheckTimer()

	for {
		// disable polling if there are no checks
		if len(watched) == 0 {
			stopCheckTimer()
		}

		select {
		// caller cancelled us; goodbye
		case <-ctx.Done():
			return

		// received an update; add or remove check
		case update := <-w.checkUpdateCh:
			if update.remove {
				delete(watched, update.checkID)
				continue
			}

			watched[update.checkID] = update.restart
			allocID := update.restart.allocID
			taskName := update.restart.taskName
			checkName := update.restart.checkName
			w.logger.Trace("now watching check", "alloc_id", allocID, "task", taskName, "check", checkName)

			// turn on the timer if we are now active
			if len(watched) == 1 {
				stopCheckTimer()
				checkTimer.Reset(w.pollFrequency)
			}

		// poll time; refresh check statuses
		case now := <-checkTimer.C:
			w.interval(ctx, now, watched)
			checkTimer.Reset(w.pollFrequency)
		}
	}
}

func (w *UniversalCheckWatcher) interval(ctx context.Context, now time.Time, watched map[string]*restarter) {
	statuses, err := w.getter.Get()
	if err != nil {
		// log only on the first consecutive failure to avoid spamming, and
		// skip this interval rather than proceeding with no statuses
		if !w.failedPreviousInterval {
			w.failedPreviousInterval = true
			w.logger.Error("failed to retrieve check statuses", "error", err)
		}
		return
	}
	w.failedPreviousInterval = false

	// keep track of tasks restarted this interval
	restarts := set.New[key](len(statuses))

	// iterate over the status of every check, updating the state of the
	// checks we are watching
	for checkID, checkRestarter := range watched {
		if ctx.Err() != nil {
			return // short circuit; caller cancelled us
		}

		if restarts.Contains(checkRestarter.taskKey) {
			// skip; task is already being restarted
			delete(watched, checkID)
			continue
		}

		status, exists := statuses[checkID]
		if !exists {
			// warn only if outside the grace period, avoiding a race with
			// check registration
			if now.After(checkRestarter.graceUntil) {
				w.logger.Warn("watched check not found", "check_id", checkID)
			}
			continue
		}

		if checkRestarter.apply(ctx, now, status) {
			// check will be re-registered & re-watched on startup
			delete(watched, checkID)
			restarts.Insert(checkRestarter.taskKey)
		}
	}
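
	// Deleting map entries while ranging over the same map, as done above
	// and below, is well-defined in Go.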

	// purge passing checks of tasks that are being restarted
	if restarts.Size() > 0 {
		for checkID, checkRestarter := range watched {
			if restarts.Contains(checkRestarter.taskKey) {
				delete(watched, checkID)
			}
		}
	}
}