-
Notifications
You must be signed in to change notification settings - Fork 590
/
acquirer.go
489 lines (459 loc) · 14 KB
/
acquirer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
package provisionerdserver
import (
"context"
"database/sql"
"encoding/json"
"strings"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/provisionerjobs"
"github.com/coder/coder/v2/coderd/database/pubsub"
)
const (
	// dbMaxBackoff caps the exponential backoff used when retrying the pubsub
	// subscription in subscribe.
	dbMaxBackoff = 10 * time.Second
	// backupPollDuration is the period for the backup polling described in Acquirer comment
	backupPollDuration = 30 * time.Second
)
// Acquirer is shared among multiple routines that need to call
// database.Store.AcquireProvisionerJob. The callers that acquire jobs are called "acquirees". The
// goal is to minimize polling the database (i.e. lower our average query rate) and simplify the
// acquiree's logic by handling retrying the database if a job is not available at the time of the
// call.
//
// When multiple acquirees share a set of provisioner types and tags, we define them as part of the
// same "domain". Only one acquiree from each domain may query the database at a time. If the
// database returns no jobs for that acquiree, the entire domain waits until the Acquirer is
// notified over the pubsub of a new job acceptable to the domain.
//
// As a backup to pubsub notifications, each domain is allowed to query periodically once every 30s.
// This ensures jobs are not stuck permanently if the service that created them fails to publish
// (e.g. a crash).
type Acquirer struct {
	// ctx governs the lifetime of all background work: domain backup polls and
	// the pubsub subscription.
	ctx    context.Context
	logger slog.Logger
	// store executes the actual AcquireProvisionerJob database query.
	store AcquirerStore
	// ps delivers job-posted notifications that wake waiting domains.
	ps pubsub.Pubsub
	// mu guards q and all domain/acquiree state reachable from it.
	mu sync.Mutex
	// q maps a domain key (org + provisioner types + tags) to its domain.
	q map[dKey]domain
	// testing only
	backupPollDuration time.Duration
}
// AcquirerOption configures an Acquirer at construction time.
type AcquirerOption func(*Acquirer)

// TestingBackupPollDuration overrides the period of the domains' backup
// database polling. For testing only.
func TestingBackupPollDuration(dur time.Duration) AcquirerOption {
	return func(a *Acquirer) {
		a.backupPollDuration = dur
	}
}
// AcquirerStore is the subset of database.Store that the Acquirer needs.
// Defined here (at the consumer) so tests can supply a fake store.
type AcquirerStore interface {
	AcquireProvisionerJob(context.Context, database.AcquireProvisionerJobParams) (database.ProvisionerJob, error)
}
// NewAcquirer creates an Acquirer backed by the given store and pubsub, applies
// any options, and blocks until the initial pubsub subscription for job
// postings is established (retried with backoff until ctx is canceled).
func NewAcquirer(ctx context.Context, logger slog.Logger, store AcquirerStore, ps pubsub.Pubsub,
	opts ...AcquirerOption,
) *Acquirer {
	acq := &Acquirer{
		ctx:                ctx,
		logger:             logger,
		store:              store,
		ps:                 ps,
		q:                  make(map[dKey]domain),
		backupPollDuration: backupPollDuration,
	}
	for _, applyOpt := range opts {
		applyOpt(acq)
	}
	acq.subscribe()
	return acq
}
// AcquireJob acquires a job with one of the given provisioner types and compatible
// tags from the database. The call blocks until a job is acquired, the context is
// done, or the database returns an error _other_ than that no jobs are available.
// If no jobs are available, this method handles retrying as appropriate.
func (a *Acquirer) AcquireJob(
	ctx context.Context, organization uuid.UUID, worker uuid.UUID, pt []database.ProvisionerType, tags Tags,
) (
	retJob database.ProvisionerJob, retErr error,
) {
	logger := a.logger.With(
		slog.F("organization_id", organization),
		slog.F("worker_id", worker),
		slog.F("provisioner_types", pt),
		slog.F("tags", tags))
	logger.Debug(ctx, "acquiring job")
	dk := domainKey(organization, pt, tags)
	dbTags, err := tags.ToJSON()
	if err != nil {
		return database.ProvisionerJob{}, err
	}
	// buffer of 1 so that cancel doesn't deadlock while writing to the channel
	clearance := make(chan struct{}, 1)
	for {
		// Register (or re-register after ErrNoRows) interest in this domain;
		// the Acquirer grants clearance to one acquiree per domain at a time.
		a.want(organization, pt, tags, clearance)
		select {
		case <-ctx.Done():
			err := ctx.Err()
			logger.Debug(ctx, "acquiring job canceled", slog.Error(err))
			// Deregister from the domain so another acquiree can be cleared.
			internalError := a.cancel(dk, clearance)
			if internalError != nil {
				// internalError takes precedence
				return database.ProvisionerJob{}, internalError
			}
			return database.ProvisionerJob{}, err
		case <-clearance:
			logger.Debug(ctx, "got clearance to call database")
			job, err := a.store.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
				OrganizationID: organization,
				StartedAt: sql.NullTime{
					Time:  dbtime.Now(),
					Valid: true,
				},
				WorkerID: uuid.NullUUID{
					UUID:  worker,
					Valid: true,
				},
				Types: pt,
				Tags:  dbTags,
			})
			if xerrors.Is(err, sql.ErrNoRows) {
				// No job matched; loop back to want() and wait for the next
				// clearance (pubsub notification or backup poll).
				logger.Debug(ctx, "no job available")
				continue
			}
			// we are not going to retry, so signal we are done
			internalError := a.done(dk, clearance)
			if internalError != nil {
				// internal error takes precedence
				return database.ProvisionerJob{}, internalError
			}
			if err != nil {
				logger.Warn(ctx, "error attempting to acquire job", slog.Error(err))
				return database.ProvisionerJob{}, xerrors.Errorf("failed to acquire job: %w", err)
			}
			logger.Debug(ctx, "successfully acquired job")
			return job, nil
		}
	}
}
// want signals that an acquiree wants clearance to query for a job with the given dKey.
// The first acquiree of a brand-new domain (and any acquiree whose domain was
// marked pending while it queried) is cleared immediately; otherwise clearance
// arrives later via done, cancel, a pubsub job posting, or the backup poll.
func (a *Acquirer) want(organization uuid.UUID, pt []database.ProvisionerType, tags Tags, clearance chan<- struct{}) {
	dk := domainKey(organization, pt, tags)
	a.mu.Lock()
	defer a.mu.Unlock()
	cleared := false
	d, ok := a.q[dk]
	if !ok {
		// No domain exists for this key yet: create one and start its backup
		// poll goroutine (stopped via d.cancel when the domain empties).
		ctx, cancel := context.WithCancel(a.ctx)
		d = domain{
			ctx:       ctx,
			cancel:    cancel,
			a:         a,
			key:       dk,
			pt:        pt,
			tags:      tags,
			acquirees: make(map[chan<- struct{}]*acquiree),
		}
		a.q[dk] = d
		go d.poll(a.backupPollDuration)
		// this is a new request for this dKey, so is cleared.
		cleared = true
	}
	w, ok := d.acquirees[clearance]
	if !ok {
		w = &acquiree{clearance: clearance}
		d.acquirees[clearance] = w
	}
	// pending means that we got a job posting for this dKey while we were
	// querying, so we should clear this acquiree to retry another time.
	if w.pending {
		cleared = true
		w.pending = false
	}
	w.inProgress = cleared
	if cleared {
		// this won't block because clearance is buffered.
		clearance <- struct{}{}
	}
}
// cancel signals that an acquiree no longer wants clearance to query. Any error returned is a serious internal error
// indicating that integrity of the internal state is corrupted by a code bug.
func (a *Acquirer) cancel(dk dKey, clearance chan<- struct{}) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	d, ok := a.q[dk]
	if !ok {
		// this is a code error, as something removed the domain early, or cancel
		// was called twice.
		err := xerrors.New("cancel for domain that doesn't exist")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	w, ok := d.acquirees[clearance]
	if !ok {
		// this is a code error, as something removed the acquiree early, or cancel
		// was called twice.
		err := xerrors.New("cancel for an acquiree that doesn't exist")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	delete(d.acquirees, clearance)
	if w.inProgress && len(d.acquirees) > 0 {
		// this one canceled before querying, so give another acquiree a chance
		// instead
		for _, other := range d.acquirees {
			if other.inProgress {
				// invariant violation: at most one acquiree per domain may be
				// in progress at any time.
				err := xerrors.New("more than one acquiree in progress for same key")
				a.logger.Critical(a.ctx, "internal error", slog.Error(err))
				return err
			}
			other.inProgress = true
			// won't block: clearance channels are buffered with capacity 1.
			other.clearance <- struct{}{}
			break // just one
		}
	}
	if len(d.acquirees) == 0 {
		// last acquiree left: stop the backup poll and remove the domain.
		d.cancel()
		delete(a.q, dk)
	}
	return nil
}
// done signals that the acquiree has completed acquiring a job (usually successfully, but we also get this call if
// there is a database error other than ErrNoRows). Any error returned is a serious internal error indicating that
// integrity of the internal state is corrupted by a code bug.
//
// The clearance parameter is send-only (chan<- struct{}) for consistency with
// want and cancel and with the acquirees map key type; callers passing a
// bidirectional channel are unaffected since it is assignable to chan<-.
func (a *Acquirer) done(dk dKey, clearance chan<- struct{}) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	d, ok := a.q[dk]
	if !ok {
		// this is a code error, as something removed the domain early, or done
		// was called twice.
		err := xerrors.New("done for a domain that doesn't exist")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	w, ok := d.acquirees[clearance]
	if !ok {
		// this is a code error, as something removed the dKey early, or done
		// was called twice.
		err := xerrors.New("done for an acquiree that doesn't exist")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	if !w.inProgress {
		// invariant violation: only an in-progress acquiree may report done.
		err := xerrors.New("done acquiree was not in progress")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	delete(d.acquirees, clearance)
	if len(d.acquirees) == 0 {
		// last acquiree left: stop the backup poll and remove the domain.
		d.cancel()
		delete(a.q, dk)
		return nil
	}
	// in the mainline, this means that the acquiree successfully got a job.
	// if any others are waiting, clear one of them to try to get a job next so
	// that we process the jobs until there are no more acquirees or the database
	// is empty of jobs meeting our criteria
	for _, other := range d.acquirees {
		if other.inProgress {
			err := xerrors.New("more than one acquiree in progress for same key")
			a.logger.Critical(a.ctx, "internal error", slog.Error(err))
			return err
		}
		other.inProgress = true
		// won't block: clearance channels are buffered with capacity 1.
		other.clearance <- struct{}{}
		break // just one
	}
	return nil
}
// subscribe establishes the pubsub subscription for job postings, retrying with
// exponential backoff (capped at dbMaxBackoff, indefinitely) until it succeeds
// or a.ctx is canceled. It blocks the caller until the subscription is live (or
// aborted), then a background goroutine holds it open for the life of a.ctx.
func (a *Acquirer) subscribe() {
	subscribed := make(chan struct{})
	go func() {
		// closing subscribed also unblocks the outer function if we abort
		// before successfully subscribing (e.g. a.ctx canceled).
		defer close(subscribed)
		eb := backoff.NewExponentialBackOff()
		eb.MaxElapsedTime = 0 // retry indefinitely
		eb.MaxInterval = dbMaxBackoff
		bkoff := backoff.WithContext(eb, a.ctx)
		var cancel context.CancelFunc
		err := backoff.Retry(func() error {
			cancelFn, err := a.ps.SubscribeWithErr(provisionerjobs.EventJobPosted, a.jobPosted)
			if err != nil {
				a.logger.Warn(a.ctx, "failed to subscribe to job postings", slog.Error(err))
				return err
			}
			cancel = cancelFn
			return nil
		}, bkoff)
		if err != nil {
			// Retry only fails when the backoff context is done; anything else
			// indicates a bug.
			if a.ctx.Err() == nil {
				a.logger.Error(a.ctx, "code bug: retry failed before context canceled", slog.Error(err))
			}
			return
		}
		defer cancel()
		bkoff.Reset()
		a.logger.Debug(a.ctx, "subscribed to job postings")
		// unblock the outer function from returning
		subscribed <- struct{}{}
		// hold subscriptions open until context is canceled
		<-a.ctx.Done()
	}()
	<-subscribed
}
// jobPosted is the pubsub callback for job postings. Dropped-message errors
// conservatively wake every domain (a relevant posting may have been missed);
// a parseable posting wakes at most one matching domain.
func (a *Acquirer) jobPosted(ctx context.Context, message []byte, err error) {
	switch {
	case xerrors.Is(err, pubsub.ErrDroppedMessages):
		a.logger.Warn(a.ctx, "pubsub may have dropped job postings")
		a.clearOrPendAll()
		return
	case err != nil:
		a.logger.Warn(a.ctx, "unhandled pubsub error", slog.Error(err))
		return
	}
	var posting provisionerjobs.JobPosting
	if unmarshalErr := json.Unmarshal(message, &posting); unmarshalErr != nil {
		a.logger.Error(a.ctx, "unable to parse job posting",
			slog.F("message", string(message)),
			slog.Error(unmarshalErr),
		)
		return
	}
	a.logger.Debug(ctx, "got job posting", slog.F("posting", posting))
	a.mu.Lock()
	defer a.mu.Unlock()
	for _, d := range a.q {
		if !d.contains(posting) {
			continue
		}
		a.clearOrPendLocked(d)
		// we only need to wake up a single domain since there is only one
		// new job available
		return
	}
}
// clearOrPendAll wakes (or marks pending) one acquiree in every known domain.
// Used when pubsub may have dropped messages and any domain could be affected.
func (a *Acquirer) clearOrPendAll() {
	a.mu.Lock()
	defer a.mu.Unlock()
	for _, dom := range a.q {
		a.clearOrPendLocked(dom)
	}
}
// clearOrPend is the locking wrapper used by the backup poll: it wakes (or
// marks pending) one acquiree in the given domain, tolerating a domain that
// has already emptied.
func (a *Acquirer) clearOrPend(d domain) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if len(d.acquirees) > 0 {
		a.clearOrPendLocked(d)
	}
	// An empty acquirees map can happen if the domain is removed right around
	// the time the backup poll triggers; there is nothing to do in that case.
}
// clearOrPendLocked either grants clearance to one acquiree in the domain or,
// if an acquiree is already in progress, marks it pending so that it retries
// even if its current query finds no job.
func (*Acquirer) clearOrPendLocked(d domain) {
	// MUST BE CALLED HOLDING THE a.mu LOCK
	var nominee *acquiree
	for _, w := range d.acquirees {
		if nominee == nil {
			nominee = w
		}
		// acquiree in progress always takes precedence, since we don't want to
		// wake up more than one acquiree per dKey at a time.
		if w.inProgress {
			nominee = w
			break
		}
	}
	// NOTE(review): assumes d.acquirees is non-empty (nominee would be nil and
	// panic below otherwise). Callers appear to guarantee this — domains are
	// deleted from a.q when their last acquiree leaves, and clearOrPend checks
	// len — but confirm before calling from new code paths.
	if nominee.inProgress {
		nominee.pending = true
		return
	}
	nominee.inProgress = true
	// won't block: clearance channels are buffered with capacity 1.
	nominee.clearance <- struct{}{}
}
// dKey is the canonical map key identifying a domain.
type dKey string

// domainKey generates a canonical map key for the given provisioner types and
// tags. It uses the null byte (0x00) as a delimiter because it is an
// unprintable control character and won't show up in any "reasonable" set of
// string tags, even in non-Latin scripts. It is important that Tags are
// validated not to contain this control character prior to use.
func domainKey(orgID uuid.UUID, pt []database.ProvisionerType, tags Tags) dKey {
	const sep = byte(0x00)
	var b strings.Builder
	_, _ = b.WriteString(orgID.String())
	_ = b.WriteByte(sep)
	// sort a copy of pt so the key is order-independent without mutating the
	// caller's slice or its underlying array.
	sortedTypes := append([]database.ProvisionerType(nil), pt...)
	slices.Sort(sortedTypes)
	for _, typ := range sortedTypes {
		_, _ = b.WriteString(string(typ))
		_ = b.WriteByte(sep)
	}
	_ = b.WriteByte(sep)
	// tags are emitted in sorted key order for determinism.
	tagKeys := make([]string, 0, len(tags))
	for k := range tags {
		tagKeys = append(tagKeys, k)
	}
	slices.Sort(tagKeys)
	for _, k := range tagKeys {
		_, _ = b.WriteString(k)
		_ = b.WriteByte(sep)
		_, _ = b.WriteString(tags[k])
		_ = b.WriteByte(sep)
	}
	return dKey(b.String())
}
// acquiree represents a specific client of Acquirer that wants to acquire a job.
type acquiree struct {
	// clearance is the acquiree's wake-up channel; a send grants it permission
	// to query the database. It is buffered (capacity 1) by the caller.
	clearance chan<- struct{}
	// inProgress is true when the acquiree was granted clearance and a query
	// is possibly in progress.
	inProgress bool
	// pending is true if we get a job posting while a query is in progress, so
	// that we know to try again, even if we didn't get a job on the query.
	pending bool
}
// domain represents a set of acquirees with the same provisioner types and
// tags. Acquirees in the same domain are restricted such that only one queries
// the database at a time.
type domain struct {
	// ctx is canceled (via cancel) when the domain's last acquiree leaves,
	// which also stops the backup poll goroutine.
	ctx    context.Context
	cancel context.CancelFunc
	// a points back to the owning Acquirer.
	a *Acquirer
	// key is this domain's canonical key, produced by domainKey.
	key dKey
	// pt and tags define which job postings this domain is interested in.
	pt   []database.ProvisionerType
	tags Tags
	// acquirees is keyed by each acquiree's clearance channel.
	acquirees map[chan<- struct{}]*acquiree
}
// contains reports whether the job posting is acceptable to this domain: its
// provisioner type must be one the domain handles, and every tag on the
// posting must be present in the domain's tags with an identical value.
func (d domain) contains(p provisionerjobs.JobPosting) bool {
	if !slices.Contains(d.pt, p.ProvisionerType) {
		return false
	}
	for name, value := range p.Tags {
		if have, ok := d.tags[name]; !ok || have != value {
			return false
		}
	}
	return true
}
// poll is the backup mechanism for waking the domain: every dur it lets one
// acquiree query the database in case a pubsub notification was missed (e.g.
// the publishing service crashed). It runs until the domain's ctx is canceled.
func (d domain) poll(dur time.Duration) {
	tkr := time.NewTicker(dur)
	defer tkr.Stop()
	for {
		select {
		case <-d.ctx.Done():
			return
		case <-tkr.C:
			d.a.clearOrPend(d)
		}
	}
}