/
workers.go
527 lines (480 loc) · 18 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
package api
import (
"context"
"fmt"
"log"
"time"
"github.com/brigadecore/brigade-foundations/crypto"
libCrypto "github.com/brigadecore/brigade/v2/apiserver/internal/lib/crypto"
"github.com/brigadecore/brigade/v2/apiserver/internal/meta"
"github.com/pkg/errors"
)
// LogLevel is a string-based type naming the granularity desired of a Worker's
// log output.
type LogLevel string

const (
	// LogLevelInfo is the LogLevel that requests INFO level granularity in
	// Worker log output.
	LogLevelInfo LogLevel = "INFO"
)
// WorkerPhase represents where a Worker is within its lifecycle. Each phase is
// identified by one of the string constants declared below.
type WorkerPhase string
const (
// WorkerPhaseAborted represents the state wherein a Worker was forcefully
// stopped during execution.
WorkerPhaseAborted WorkerPhase = "ABORTED"
// WorkerPhaseCanceled represents the state wherein a pending Worker was
// canceled prior to execution.
WorkerPhaseCanceled WorkerPhase = "CANCELED"
// WorkerPhaseFailed represents the state wherein a Worker has run to
// completion but experienced errors.
WorkerPhaseFailed WorkerPhase = "FAILED"
// WorkerPhasePending represents the state wherein a Worker is awaiting
// execution.
WorkerPhasePending WorkerPhase = "PENDING"
// WorkerPhaseRunning represents the state wherein a Worker is currently
// being executed.
WorkerPhaseRunning WorkerPhase = "RUNNING"
// WorkerPhaseSchedulingFailed represents the state wherein a Worker was not
// scheduled due to some unexpected and unrecoverable error encountered by the
// scheduler.
WorkerPhaseSchedulingFailed WorkerPhase = "SCHEDULING_FAILED"
// WorkerPhaseStarting represents the state wherein a Worker is starting on
// the substrate but isn't running yet.
WorkerPhaseStarting WorkerPhase = "STARTING"
// WorkerPhaseSucceeded represents the state wherein a Worker has run to
// completion without error.
WorkerPhaseSucceeded WorkerPhase = "SUCCEEDED"
// WorkerPhaseTimedOut represents the state wherein a Worker has not completed
// within a designated timeframe.
WorkerPhaseTimedOut WorkerPhase = "TIMED_OUT"
// WorkerPhaseUnknown represents the state wherein a Worker's state is
// unknown. Note that this is possible if and only if the underlying Worker
// execution substrate (Kubernetes), for some unanticipated reason, does not
// know the Worker's (Pod's) state.
WorkerPhaseUnknown WorkerPhase = "UNKNOWN"
)
// WorkerPhasesAll returns a slice holding every possible WorkerPhase. The
// slice is built afresh on each call, rather than being exposed as a shared
// package-level variable, so that no caller can modify this important
// collection for anyone else at runtime.
func WorkerPhasesAll() []WorkerPhase {
	// A local array gives every call its own backing storage.
	phases := [...]WorkerPhase{
		WorkerPhaseAborted,
		WorkerPhaseCanceled,
		WorkerPhaseFailed,
		WorkerPhasePending,
		WorkerPhaseRunning,
		WorkerPhaseSchedulingFailed,
		WorkerPhaseStarting,
		WorkerPhaseSucceeded,
		WorkerPhaseTimedOut,
		WorkerPhaseUnknown,
	}
	return phases[:]
}
// IsTerminal reports whether the WorkerPhase is terminal. The aborted,
// canceled, failed, scheduling-failed, succeeded, and timed-out phases are
// terminal; every other value is not.
func (w WorkerPhase) IsTerminal() bool {
	switch w {
	case WorkerPhaseAborted,
		WorkerPhaseCanceled,
		WorkerPhaseFailed,
		WorkerPhaseSchedulingFailed,
		WorkerPhaseSucceeded,
		WorkerPhaseTimedOut:
		return true
	default:
		return false
	}
}
// Worker represents a component that orchestrates handling of a single Event.
// It bundles the Worker's specification, its current status, and the Jobs it
// has spawned.
type Worker struct {
// Spec is the technical blueprint for the Worker.
Spec WorkerSpec `json:"spec" bson:"spec"`
// Status contains details of the Worker's current state.
Status WorkerStatus `json:"status" bson:"status"`
// Jobs contains details of all Jobs spawned by the Worker during handling of
// the Event. The field is omitted from JSON output when empty; its bson tag
// carries no omitempty option.
Jobs []Job `json:"jobs,omitempty" bson:"jobs"`
}
// Job looks up one of the Worker's Jobs by name. When a Job with the given
// name exists, the first such Job is returned along with true. Otherwise a
// zero-value Job is returned along with false.
func (w *Worker) Job(jobName string) (Job, bool) {
	for i := range w.Jobs {
		if w.Jobs[i].Name != jobName {
			continue
		}
		return w.Jobs[i], true
	}
	var none Job
	return none, false
}
// WorkerSpec is the technical blueprint for a Worker.
//
// Every field carries both a json and a bson struct tag so that the API and
// data store representations use the same keys and omit the same optional
// values.
type WorkerSpec struct {
	// Container specifies the details of an OCI container that forms the
	// cornerstone of the Worker.
	Container *ContainerSpec `json:"container,omitempty" bson:"container,omitempty"` // nolint: lll
	// UseWorkspace indicates whether the Worker and/or any Jobs it may spawn
	// requires access to a shared workspace. When false, no such workspace is
	// provisioned prior to Worker creation. This is a generally useful feature,
	// but by opting out of it (or rather, not opting-in), Job results can be made
	// cacheable and Jobs resumable/retriable-- something which cannot be done
	// otherwise since managing the state of the shared volume would require a
	// layered file system that we currently do not have.
	UseWorkspace bool `json:"useWorkspace" bson:"useWorkspace"`
	// WorkspaceSize specifies the size of a volume that will be provisioned as
	// a shared workspace for the Worker and any Jobs it spawns.
	// The value can be expressed in bytes (as a plain integer) or as a
	// fixed-point integer using one of these suffixes: E, P, T, G, M, K.
	// Power-of-two equivalents may also be used: Ei, Pi, Ti, Gi, Mi, Ki.
	WorkspaceSize string `json:"workspaceSize,omitempty" bson:"workspaceSize,omitempty"` // nolint: lll
	// Git contains git-specific Worker details. Like the other optional fields,
	// it is omitted from both the JSON and BSON encodings when nil.
	Git *GitConfig `json:"git,omitempty" bson:"git,omitempty"`
	// Kubernetes contains Kubernetes-specific Worker details.
	Kubernetes *KubernetesConfig `json:"kubernetes,omitempty" bson:"kubernetes,omitempty"` // nolint: lll
	// JobPolicies specifies policies for any Jobs spawned by the Worker.
	JobPolicies *JobPolicies `json:"jobPolicies,omitempty" bson:"jobPolicies,omitempty"` // nolint: lll
	// LogLevel specifies the desired granularity of Worker log output.
	LogLevel LogLevel `json:"logLevel,omitempty" bson:"logLevel,omitempty"`
	// ConfigFilesDirectory specifies a directory within the Worker's workspace
	// where any relevant configuration files (e.g. brigade.js, package.json,
	// etc.) can be located.
	ConfigFilesDirectory string `json:"configFilesDirectory,omitempty" bson:"configFilesDirectory,omitempty"` // nolint: lll
	// DefaultConfigFiles is a map of configuration file names to configuration
	// file content. This is useful for Workers that do not integrate with any
	// source control system and would like to embed configuration (e.g.
	// package.json) or scripts (e.g. brigade.js) directly within the WorkerSpec.
	DefaultConfigFiles map[string]string `json:"defaultConfigFiles,omitempty" bson:"defaultConfigFiles,omitempty"` // nolint: lll
	// TimeoutDuration specifies the time duration that must elapse before a
	// running Worker should be considered to have timed out. This duration
	// string is a possibly signed sequence of decimal numbers, each with optional
	// fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m".
	// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
	TimeoutDuration string `json:"timeoutDuration,omitempty" bson:"timeoutDuration,omitempty"` // nolint: lll
}
// GitConfig represents git-specific Worker details: where a repository can be
// cloned from, which revision to check out, and whether to clone submodules.
type GitConfig struct {
// CloneURL specifies the location from where a source code repository may
// be cloned.
CloneURL string `json:"cloneURL,omitempty" bson:"cloneURL,omitempty"`
// Commit specifies a revision (by SHA) to be checked out. If non-empty, this
// field takes precedence over any value in the Ref field.
Commit string `json:"commit,omitempty" bson:"commit,omitempty"`
// Ref is a symbolic reference to a revision to be checked out. If the Commit
// field is non-empty, its value supersedes any value in this field. Example
// uses of this field include referencing a branch (refs/heads/<branch name>)
// or a tag (refs/tags/<tag name>). If left blank, this field is interpreted
// as a reference to the repository's default branch.
Ref string `json:"ref,omitempty" bson:"ref,omitempty"`
// InitSubmodules indicates whether to clone the repository's submodules.
InitSubmodules bool `json:"initSubmodules" bson:"initSubmodules"`
}
// KubernetesConfig represents Kubernetes-specific Worker or Job configuration.
type KubernetesConfig struct {
// ImagePullSecrets enumerates any image pull secrets that Kubernetes may use
// when pulling the OCI image on which a Worker's or Job's container is based.
// This field only needs to be utilized in the case of private, custom Worker
// or Job images. The image pull secrets in question must be created
// out-of-band by a sufficiently authorized user of the Kubernetes cluster.
// NOTE(review): each element is presumably the name of such a secret --
// confirm against the code that consumes this field.
ImagePullSecrets []string `json:"imagePullSecrets,omitempty" bson:"imagePullSecrets,omitempty"` // nolint: lll
}
// JobPolicies represents policies for any Jobs spawned by a Worker.
type JobPolicies struct {
// AllowPrivileged specifies whether the Worker is permitted to launch Jobs
// that utilize privileged containers.
AllowPrivileged bool `json:"allowPrivileged" bson:"allowPrivileged"`
// AllowDockerSocketMount specifies whether the Worker is permitted to launch
// Jobs that mount the underlying host's Docker socket into its own file
// system.
//
// Note: This is being removed for the 2.0.0 release because of security
// issues AND declining usefulness. (Many Kubernetes distros now use
// containerd instead of Docker.) This can be put back in the future if the
// need is proven AND if it can be done safely. Until then, the field
// declaration below is deliberately left commented out.
//
// For more details, see https://github.com/brigadecore/brigade/issues/1666
//
// nolint: lll
// AllowDockerSocketMount bool `json:"allowDockerSocketMount" bson:"allowDockerSocketMount"`
}
// WorkerStatus represents the status of a Worker: when it started, when it
// ended, and which lifecycle phase it is in. All three fields are omitted from
// the JSON and BSON encodings when empty.
type WorkerStatus struct {
// Started indicates the time the Worker began execution. It will be nil for
// a Worker that is not yet executing.
Started *time.Time `json:"started,omitempty" bson:"started,omitempty"`
// Ended indicates the time the Worker concluded execution. It will be nil
// for a Worker that is not done executing (or hasn't started).
Ended *time.Time `json:"ended,omitempty" bson:"ended,omitempty"`
// Phase indicates where the Worker is in its lifecycle.
Phase WorkerPhase `json:"phase,omitempty" bson:"phase,omitempty"`
}
// WorkersService is the specialized interface for managing Workers. It's
// decoupled from underlying technology choices (e.g. data store, message bus,
// etc.) to keep business logic reusable and consistent while the underlying
// tech stack remains free to change.
//
// Every method takes a context.Context as its first argument and identifies
// the Worker in question by the ID of the Event it belongs to.
type WorkersService interface {
// Start starts the indicated Event's Worker on Brigade's workload
// execution substrate. If the specified Event does not exist, implementations
// MUST return a *meta.ErrNotFound.
Start(ctx context.Context, eventID string) error
// GetStatus returns an Event's Worker's status. If the specified Event does
// not exist, implementations MUST return a *meta.ErrNotFound.
GetStatus(
ctx context.Context,
eventID string,
) (WorkerStatus, error)
// WatchStatus returns a receive-only channel over which an Event's Worker's
// status is streamed. The channel receives a new WorkerStatus every time
// there is any change in that status. If the specified Event does not exist,
// implementations MUST return a *meta.ErrNotFound.
WatchStatus(
ctx context.Context,
eventID string,
) (<-chan WorkerStatus, error)
// UpdateStatus updates the status of an Event's Worker. If the specified
// Event does not exist, implementations MUST return a *meta.ErrNotFound.
UpdateStatus(
ctx context.Context,
eventID string,
status WorkerStatus,
) error
// Cleanup removes Worker-related resources from the substrate, presumably
// upon completion, without deleting the Worker from the data store.
Cleanup(ctx context.Context, eventID string) error
// Timeout updates the status of an Event's Worker that has timed out and
// then proceeds to remove Worker-related resources from the substrate.
Timeout(ctx context.Context, eventID string) error
}
// workersService is the WorkersService implementation returned by
// NewWorkersService. It holds the collaborators its methods delegate to.
type workersService struct {
// authorize is called at the start of every exported method to check that
// the caller holds the role that method requires.
authorize AuthorizeFn
// projectsStore is used to retrieve the Project an Event belongs to.
projectsStore ProjectsStore
// eventsStore is used to retrieve Events (and with them, their Workers).
eventsStore EventsStore
// workersStore is used to update Worker status and hashed tokens and to
// time Workers out.
workersStore WorkersStore
// substrate is used to start Workers and to delete Workers and their Jobs.
substrate Substrate
}
// NewWorkersService builds a WorkersService from the supplied authorization
// function, stores, and substrate. The arguments are stored as given; none of
// them is validated here.
func NewWorkersService(
	authorizeFn AuthorizeFn,
	projectsStore ProjectsStore,
	eventsStore EventsStore,
	workersStore WorkersStore,
	substrate Substrate,
) WorkersService {
	svc := new(workersService)
	svc.authorize = authorizeFn
	svc.projectsStore = projectsStore
	svc.eventsStore = eventsStore
	svc.workersStore = workersStore
	svc.substrate = substrate
	return svc
}
// Start launches the Worker of the Event identified by eventID on the
// substrate. The caller must be authorized as RoleScheduler. A
// *meta.ErrConflict is returned if the Worker is not in the PENDING phase.
// Before the substrate is asked to start the Worker, a hash of a freshly
// generated token is saved and the Worker's status is set to STARTING in the
// store.
func (w *workersService) Start(ctx context.Context, eventID string) error {
	if authErr := w.authorize(ctx, RoleScheduler, ""); authErr != nil {
		return authErr
	}

	evt, err := w.eventsStore.Get(ctx, eventID)
	if err != nil {
		return errors.Wrapf(err, "error retrieving event %q from store", eventID)
	}

	// Only a Worker that is still pending may be started.
	if phase := evt.Worker.Status.Phase; phase != WorkerPhasePending {
		conflict := &meta.ErrConflict{Type: EventKind, ID: evt.ID}
		conflict.Reason = fmt.Sprintf(
			"Event %q worker has already been started.",
			evt.ID,
		)
		return conflict
	}

	proj, err := w.projectsStore.Get(ctx, evt.ProjectID)
	if err != nil {
		return errors.Wrapf(
			err,
			"error retrieving project %q from store",
			evt.ProjectID,
		)
	}

	// The token is unique to this Event. The Event's Worker presents it when
	// talking to the API server (e.g. to spawn a new Job), so that only THIS
	// event's worker can create new Jobs for THIS event. Only a hash of the
	// token is handed to the store.
	plainToken := libCrypto.NewToken(256)
	err = w.workersStore.UpdateHashedToken(
		ctx,
		eventID,
		crypto.Hash("", plainToken),
	)
	if err != nil {
		return errors.Wrapf(
			err,
			"error updating event %q worker hashed token in store",
			eventID,
		)
	}

	starting := WorkerStatus{Phase: WorkerPhaseStarting}
	err = w.workersStore.UpdateStatus(ctx, eventID, starting)
	if err != nil {
		return errors.Wrapf(
			err,
			"error updating status of event %q worker in store",
			eventID,
		)
	}

	err = w.substrate.StartWorker(ctx, proj, evt, plainToken)
	if err != nil {
		return errors.Wrapf(err, "error starting worker for event %q", evt.ID)
	}
	return nil
}
// GetStatus returns the status of the Worker belonging to the Event
// identified by eventID. The caller must be authorized as RoleReader. On any
// error, a zero-value WorkerStatus accompanies it.
func (w *workersService) GetStatus(
	ctx context.Context,
	eventID string,
) (WorkerStatus, error) {
	var none WorkerStatus
	if authErr := w.authorize(ctx, RoleReader, ""); authErr != nil {
		return none, authErr
	}
	evt, err := w.eventsStore.Get(ctx, eventID)
	if err != nil {
		return none,
			errors.Wrapf(err, "error retrieving event %q from store", eventID)
	}
	return evt.Worker.Status, nil
}
// WatchStatus returns a channel on which the status of the Worker belonging
// to the Event identified by eventID is delivered. The caller must be
// authorized as RoleReader.
//
// A background goroutine re-reads the Event from the store every two seconds
// and sends the Worker's status on the channel. The goroutine stops, and the
// channel is closed, once ctx is done or reading the Event fails.
func (w *workersService) WatchStatus(
	ctx context.Context,
	eventID string,
) (<-chan WorkerStatus, error) {
	if authErr := w.authorize(ctx, RoleReader, ""); authErr != nil {
		return nil, authErr
	}

	// Retrieve the event once before streaming so that a missing event is
	// reported to the caller directly.
	_, err := w.eventsStore.Get(ctx, eventID)
	if err != nil {
		return nil,
			errors.Wrapf(err, "error retrieving event %q from store", eventID)
	}

	updates := make(chan WorkerStatus)
	poll := func() {
		defer close(updates)
		tick := time.NewTicker(2 * time.Second)
		defer tick.Stop()
		for {
			// Wait for the next tick unless the caller has gone away.
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
			}
			evt, getErr := w.eventsStore.Get(ctx, eventID)
			if getErr != nil {
				log.Printf(
					"error retrieving event %q from store: %s",
					eventID,
					getErr,
				)
				return
			}
			// The channel is unbuffered, so also watch ctx while sending.
			select {
			case <-ctx.Done():
				return
			case updates <- evt.Worker.Status:
			}
		}
	}
	go poll()
	return updates, nil
}
// UpdateStatus records a new status for the Worker belonging to the Event
// identified by eventID. The caller must be authorized as RoleObserver. The
// Event is retrieved first so that the shared updateStatus helper can check
// the Worker's current phase before writing.
func (w *workersService) UpdateStatus(
	ctx context.Context,
	eventID string,
	status WorkerStatus,
) error {
	if authErr := w.authorize(ctx, RoleObserver, ""); authErr != nil {
		return authErr
	}
	evt, err := w.eventsStore.Get(ctx, eventID)
	if err != nil {
		return errors.Wrapf(err, "error retrieving event %q from store", eventID)
	}
	return w.updateStatus(ctx, evt, status)
}
// Cleanup removes from the substrate the resources of the Worker belonging
// to the Event identified by eventID. The caller must be authorized as
// RoleObserver. The actual removal is delegated to the shared cleanup helper.
func (w *workersService) Cleanup(ctx context.Context, eventID string) error {
	if authErr := w.authorize(ctx, RoleObserver, ""); authErr != nil {
		return authErr
	}
	evt, err := w.eventsStore.Get(ctx, eventID)
	if err != nil {
		return errors.Wrapf(err, "error retrieving event %q from store", eventID)
	}
	return w.cleanup(ctx, evt)
}
// Timeout times out, in the store, the Worker belonging to the Event
// identified by eventID and then removes the Worker's resources from the
// substrate via the shared cleanup helper. The caller must be authorized as
// RoleObserver.
func (w *workersService) Timeout(ctx context.Context, eventID string) error {
	if authErr := w.authorize(ctx, RoleObserver, ""); authErr != nil {
		return authErr
	}
	evt, err := w.eventsStore.Get(ctx, eventID)
	if err != nil {
		return errors.Wrapf(err, "error retrieving event %q from store", eventID)
	}
	err = w.workersStore.Timeout(ctx, eventID)
	if err != nil {
		return errors.Wrapf(err, "error timing out worker for event %q", eventID)
	}
	return w.cleanup(ctx, evt)
}
// updateStatus holds the status-update logic shared by exported functions
// that have already retrieved the Event in question. It returns a
// *meta.ErrConflict when the Worker's current phase is terminal. Otherwise it
// writes the new status to the store and wraps any error that produces.
func (w *workersService) updateStatus(
	ctx context.Context,
	event Event,
	status WorkerStatus,
) error {
	// A Worker already in a terminal phase must not change status again.
	if current := event.Worker.Status.Phase; current.IsTerminal() {
		conflict := &meta.ErrConflict{Type: EventKind, ID: event.ID}
		conflict.Reason = fmt.Sprintf(
			"Event %q worker has already reached a terminal phase.",
			event.ID,
		)
		return conflict
	}
	// Wrapf passes a nil error through as nil, so success needs no special
	// case here.
	storeErr := w.workersStore.UpdateStatus(ctx, event.ID, status)
	return errors.Wrapf(
		storeErr,
		"error updating status of event %q worker in store",
		event.ID,
	)
}
// cleanup holds the cleanup logic shared by exported functions that have
// already retrieved the Event in question. It looks up the Event's Project
// and then asks the substrate to delete the Event's Worker and Jobs, wrapping
// any error from either step.
func (w *workersService) cleanup(ctx context.Context, event Event) error {
	proj, err := w.projectsStore.Get(ctx, event.ProjectID)
	if err != nil {
		return errors.Wrapf(
			err,
			"error retrieving project %q from store",
			event.ProjectID,
		)
	}
	// Wrapf passes a nil error through as nil, so success needs no special
	// case here.
	deleteErr := w.substrate.DeleteWorkerAndJobs(ctx, proj, event)
	return errors.Wrapf(
		deleteErr,
		"error deleting event %q worker and jobs from the substrate",
		event.ID,
	)
}
// WorkersStore is an interface for components that implement Worker persistence
// concerns. Every method identifies the Worker in question by the ID of the
// Event it belongs to.
type WorkersStore interface {
// UpdateStatus stores the given status for the specified Event's Worker.
UpdateStatus(
ctx context.Context,
eventID string,
status WorkerStatus,
) error
// UpdateHashedToken stores the given hashed token for the specified Event's
// Worker. Callers in this file pass a hash of the token, never the token
// itself.
UpdateHashedToken(
ctx context.Context,
eventID string,
hashedToken string,
) error
// Timeout times out the specified Event's Worker in the store.
// NOTE(review): presumably this moves the Worker to the TIMED_OUT phase --
// confirm against the store implementations.
Timeout(ctx context.Context, eventID string) error
}