-
Notifications
You must be signed in to change notification settings - Fork 663
/
detector.go
373 lines (331 loc) · 10.7 KB
/
detector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
package unhanger
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math/rand" //#nosec // this is only used for shuffling an array to pick random jobs to unhang
"time"
"golang.org/x/xerrors"
"github.com/google/uuid"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/provisionersdk"
)
const (
	// HungJobDuration is the duration of time since the last update to a job
	// before it is considered hung.
	//
	// NOTE: the human-readable "5 minutes" wording in HungJobLogMessages and
	// in the job failure message assumes this value; keep them in sync if
	// this constant changes.
	HungJobDuration = 5 * time.Minute
	// HungJobExitTimeout is the duration of time that provisioners should allow
	// for a graceful exit upon cancellation due to failing to send an update to
	// a job.
	//
	// Provisioners should avoid keeping a job "running" for longer than this
	// time after failing to send an update to the job.
	HungJobExitTimeout = 3 * time.Minute
	// MaxJobsPerRun is the maximum number of hung jobs that the detector will
	// terminate in a single run. Limiting the batch keeps a single run within
	// its context timeout.
	MaxJobsPerRun = 10
)
// HungJobLogMessages are written to provisioner job logs when a job is hung and
// terminated. The "5 minutes" wording mirrors HungJobDuration.
var HungJobLogMessages = []string{
	"",
	"====================",
	"Coder: Build has been detected as hung for 5 minutes and will be terminated.",
	"====================",
	"",
}
// acquireLockError signals that another client already holds the advisory
// lock for a job, so the current attempt to unhang it is skipped. Callers
// treat it as an expected, non-fatal condition.
type acquireLockError struct{}

// Error implements the error interface.
func (acquireLockError) Error() string {
	return "lock is held by another client"
}
// jobInelligibleError signals that a job can no longer be terminated (for
// example it already completed or received a recent update). Callers treat
// it as an expected, non-fatal condition.
//
// NOTE(review): the name misspells "ineligible"; renaming would touch call
// sites elsewhere in the file, so the existing spelling is preserved.
type jobInelligibleError struct {
	Err error
}

// Error implements the error interface, wrapping the underlying reason.
func (e jobInelligibleError) Error() string {
	return fmt.Sprintf("job is no longer eligible to be terminated: %s", e.Err)
}
// Detector automatically detects hung provisioner jobs, sends messages into the
// build log and terminates them as failed.
type Detector struct {
	// ctx is canceled via cancel to stop the detector's goroutine.
	ctx    context.Context
	cancel context.CancelFunc
	// done is closed when the detector's goroutine exits.
	done chan struct{}

	db     database.Store
	pubsub pubsub.Pubsub
	log    slog.Logger
	// tick drives detection runs; the detector stops when it is closed.
	tick <-chan time.Time
	// stats, if non-nil, receives a Stats value after every run. Pushes are
	// blocking; this is only set by WithStatsChannel (tests only).
	stats chan<- Stats
}
// Stats contains statistics about the last run of the detector.
type Stats struct {
	// TerminatedJobIDs contains the IDs of all jobs that were detected as hung and
	// terminated.
	TerminatedJobIDs []uuid.UUID
	// Error is the fatal error that occurred during the last run of the
	// detector, if any. Error may be set to acquireLockError if the detector
	// failed to acquire a lock.
	Error error
}
// New returns a new hang detector. The detector's internal context is
// derived from ctx with hang-detector permissions applied; call Start to
// begin detection on ticks from tick.
func New(ctx context.Context, db database.Store, pub pubsub.Pubsub, log slog.Logger, tick <-chan time.Time) *Detector {
	//nolint:gocritic // Hang detector has a limited set of permissions.
	ctx, cancel := context.WithCancel(dbauthz.AsHangDetector(ctx))
	// stats stays nil until WithStatsChannel is called.
	return &Detector{
		ctx:    ctx,
		cancel: cancel,
		done:   make(chan struct{}),
		db:     db,
		pubsub: pub,
		log:    log,
		tick:   tick,
	}
}
// WithStatsChannel will cause the Detector to push a Stats value to ch after
// every tick. This push is blocking, so if ch is not read, the detector will
// hang. This should only be used in tests.
func (d *Detector) WithStatsChannel(ch chan<- Stats) *Detector {
	d.stats = ch
	return d
}
// Start launches the background goroutine that detects and unhangs
// provisioner jobs on every tick from the detector's channel. The goroutine
// exits when the detector's context is done or when the tick channel is
// closed.
//
// Start should only be called once.
func (d *Detector) Start() {
	go func() {
		// cancel runs before close(d.done) so any in-flight work is
		// interrupted before waiters are released.
		defer close(d.done)
		defer d.cancel()
		for {
			select {
			case <-d.ctx.Done():
				return
			case tickedAt, ok := <-d.tick:
				if !ok {
					// Tick channel closed: shut down.
					return
				}
				stats := d.run(tickedAt)
				// Lock contention is expected when multiple replicas run
				// the detector; only warn on other errors.
				if stats.Error != nil && !xerrors.As(stats.Error, &acquireLockError{}) {
					d.log.Warn(d.ctx, "error running workspace build hang detector once", slog.Error(stats.Error))
				}
				if d.stats == nil {
					continue
				}
				// Blocking push to the stats channel (tests only).
				select {
				case <-d.ctx.Done():
					return
				case d.stats <- stats:
				}
			}
		}
	}()
}
// Wait will block until the detector is stopped. If Start was never called,
// Wait blocks forever because done is only closed by Start's goroutine.
func (d *Detector) Wait() {
	<-d.done
}
// Close stops the detector by canceling its context and blocks until its
// goroutine has exited.
func (d *Detector) Close() {
	d.cancel()
	d.Wait()
}
// run performs a single detection pass relative to time t: it fetches jobs
// that have not been updated within HungJobDuration, terminates up to
// MaxJobsPerRun of them (randomly chosen if more are found), and reports the
// outcome in the returned Stats.
func (d *Detector) run(t time.Time) Stats {
	// Bound the whole pass so a stuck database call cannot wedge the loop.
	ctx, cancel := context.WithTimeout(d.ctx, 5*time.Minute)
	defer cancel()

	res := Stats{
		TerminatedJobIDs: []uuid.UUID{},
		Error:            nil,
	}

	// Find all provisioner jobs that are currently running but have not
	// received an update within the hang threshold.
	hungSince := t.Add(-HungJobDuration)
	jobs, err := d.db.GetHungProvisionerJobs(ctx, hungSince)
	if err != nil {
		res.Error = xerrors.Errorf("get hung provisioner jobs: %w", err)
		return res
	}

	// Cap the work per pass to avoid timing out; shuffle first so the
	// subset we act on is a random sample.
	if len(jobs) > MaxJobsPerRun {
		rand.Shuffle(len(jobs), func(i, j int) {
			jobs[j], jobs[i] = jobs[i], jobs[j]
		})
		jobs = jobs[:MaxJobsPerRun]
	}

	// Write a notice into each hung job's build log and mark it failed.
	for _, job := range jobs {
		logger := d.log.With(slog.F("job_id", job.ID))
		if err := unhangJob(ctx, logger, d.db, d.pubsub, job.ID); err != nil {
			// Lock contention and ineligibility are expected; only log
			// anything else.
			expected := xerrors.As(err, &acquireLockError{}) || xerrors.As(err, &jobInelligibleError{})
			if !expected {
				logger.Error(ctx, "error forcefully terminating hung provisioner job", slog.Error(err))
			}
			continue
		}
		res.TerminatedJobIDs = append(res.TerminatedJobIDs, job.ID)
	}
	return res
}
// unhangJob forcefully fails a single hung provisioner job. Inside one
// transaction it: acquires a per-job advisory lock, re-checks eligibility,
// appends HungJobLogMessages to the build log, marks the job completed with
// an error, and (for workspace builds with no state) copies provisioner
// state from the previous build. After the transaction commits it publishes
// a log notification so streamers pick up the new log rows.
//
// Returns acquireLockError when another detector holds the lock, and
// jobInelligibleError when the job should no longer be terminated; both are
// expected, non-fatal conditions for the caller.
func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubsub.Pubsub, jobID uuid.UUID) error {
	// ID of the first log row inserted; used after the transaction to tell
	// log streamers where to begin fetching.
	var lowestLogID int64

	err := db.InTx(func(db database.Store) error {
		// Claim a job-specific advisory lock so only one detector instance
		// terminates this job at a time.
		locked, err := db.TryAcquireLock(ctx, database.GenLockID(fmt.Sprintf("hang-detector:%s", jobID)))
		if err != nil {
			return xerrors.Errorf("acquire lock: %w", err)
		}
		if !locked {
			// This error is ignored.
			return acquireLockError{}
		}

		// Refetch the job while we hold the lock.
		job, err := db.GetProvisionerJobByID(ctx, jobID)
		if err != nil {
			return xerrors.Errorf("get provisioner job: %w", err)
		}

		// Check if we should still unhang it.
		if !job.StartedAt.Valid {
			// This shouldn't be possible to hit because the query only selects
			// started and not completed jobs, and a job can't be "un-started".
			return jobInelligibleError{
				Err: xerrors.New("job is not started"),
			}
		}
		if job.CompletedAt.Valid {
			return jobInelligibleError{
				Err: xerrors.Errorf("job is completed (status %s)", job.JobStatus),
			}
		}
		// The job may have received an update between the caller's query and
		// acquiring the lock here.
		if job.UpdatedAt.After(time.Now().Add(-HungJobDuration)) {
			return jobInelligibleError{
				Err: xerrors.New("job has been updated recently"),
			}
		}

		// NOTE(review): cdr.dev/slog loggers conventionally take slog.F(...)
		// fields; the raw "threshold"/value pair here may not be recorded as
		// a structured field — verify against the slog version in use.
		log.Warn(
			ctx, "detected hung provisioner job, forcefully terminating",
			"threshold", HungJobDuration,
		)

		// First, get the latest logs from the build so we can make sure
		// our messages are in the latest stage.
		logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
			JobID:        job.ID,
			CreatedAfter: 0,
		})
		if err != nil {
			return xerrors.Errorf("get logs for hung job: %w", err)
		}
		// Reuse the last log's stage so our messages appear in the stage the
		// job hung in; fall back to "Unknown" when there are no logs.
		logStage := ""
		if len(logs) != 0 {
			logStage = logs[len(logs)-1].Stage
		}
		if logStage == "" {
			logStage = "Unknown"
		}

		// Insert the messages into the build log. The params hold parallel
		// slices, one element per message.
		insertParams := database.InsertProvisionerJobLogsParams{
			JobID:     job.ID,
			CreatedAt: nil,
			Source:    nil,
			Level:     nil,
			Stage:     nil,
			Output:    nil,
		}
		now := dbtime.Now()
		for i, msg := range HungJobLogMessages {
			// Set the created at in a way that ensures each message has
			// a unique timestamp so they will be sorted correctly.
			insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
			insertParams.Level = append(insertParams.Level, database.LogLevelError)
			insertParams.Stage = append(insertParams.Stage, logStage)
			insertParams.Source = append(insertParams.Source, database.LogSourceProvisionerDaemon)
			insertParams.Output = append(insertParams.Output, msg)
		}
		newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
		if err != nil {
			return xerrors.Errorf("insert logs for hung job: %w", err)
		}
		// HungJobLogMessages is non-empty, so at least one row was inserted.
		lowestLogID = newLogs[0].ID

		// Mark the job as failed.
		now = dbtime.Now()
		err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
			ID:        job.ID,
			UpdatedAt: now,
			CompletedAt: sql.NullTime{
				Time:  now,
				Valid: true,
			},
			Error: sql.NullString{
				String: "Coder: Build has been detected as hung for 5 minutes and has been terminated by hang detector.",
				Valid:  true,
			},
			ErrorCode: sql.NullString{
				Valid: false,
			},
		})
		if err != nil {
			return xerrors.Errorf("mark job as failed: %w", err)
		}

		// If the provisioner job is a workspace build, copy the
		// provisioner state from the previous build to this workspace
		// build.
		if job.Type == database.ProvisionerJobTypeWorkspaceBuild {
			build, err := db.GetWorkspaceBuildByJobID(ctx, job.ID)
			if err != nil {
				return xerrors.Errorf("get workspace build for workspace build job by job id: %w", err)
			}

			// Only copy the provisioner state if there's no state in
			// the current build.
			if len(build.ProvisionerState) == 0 {
				// Get the previous build if it exists.
				prevBuild, err := db.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
					WorkspaceID: build.WorkspaceID,
					BuildNumber: build.BuildNumber - 1,
				})
				// sql.ErrNoRows means there is no previous build, which is
				// fine — the state copy is simply skipped.
				if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
					return xerrors.Errorf("get previous workspace build: %w", err)
				}
				if err == nil {
					err = db.UpdateWorkspaceBuildProvisionerStateByID(ctx, database.UpdateWorkspaceBuildProvisionerStateByIDParams{
						ID:               build.ID,
						UpdatedAt:        dbtime.Now(),
						ProvisionerState: prevBuild.ProvisionerState,
					})
					if err != nil {
						return xerrors.Errorf("update workspace build by id: %w", err)
					}
				}
			}
		}

		return nil
	}, nil)
	if err != nil {
		return xerrors.Errorf("in tx: %w", err)
	}

	// Publish the new log notification to pubsub. Use the lowest log ID
	// inserted so the log stream will fetch everything after that point.
	data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{
		CreatedAfter: lowestLogID - 1,
		EndOfLogs:    true,
	})
	if err != nil {
		return xerrors.Errorf("marshal log notification: %w", err)
	}
	err = pub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(jobID), data)
	if err != nil {
		return xerrors.Errorf("publish log notification: %w", err)
	}
	return nil
}