-
Notifications
You must be signed in to change notification settings - Fork 136
/
job.go
451 lines (391 loc) · 12.2 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
package jobdb
import (
"time"
"github.com/google/uuid"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"
"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
// Job is the scheduler-internal representation of a job.
//
// The With* methods on Job never mutate the receiver; they return a modified
// copy produced by copyJob. Copies are shallow: jobSchedulingInfo and the
// *JobRun values are shared between a job and its copies.
type Job struct {
	// String representation of the job id
	id string
	// Name of the queue this job belongs to.
	queue string
	// Jobset the job belongs to
	// We store this as it's needed for sending job event messages
	jobset string
	// Per-queue priority of this job.
	// JobPriorityComparer orders jobs with a larger value first.
	priority uint32
	// Requested per queue priority of this job.
	// This is used when syncing the postgres database with the scheduler-internal database
	requestedPriority uint32
	// Logical timestamp indicating the order in which jobs are submitted.
	// Jobs with identical Queue and Priority are sorted by this.
	created int64
	// True if the job is currently queued.
	// If this is set then the job will be considered by the scheduler for
	// assignment (see Queued).
	queued bool
	// The current version of the queued state
	queuedVersion int32
	// Scheduling requirements of this job.
	// May be nil (e.g. for jobs created via EmptyJob).
	jobSchedulingInfo *schedulerobjects.JobSchedulingInfo
	// True if the user has requested this job be cancelled
	cancelRequested bool
	// True if the user has requested this job's jobset be cancelled
	cancelByJobsetRequested bool
	// True if the scheduler has cancelled the job
	cancelled bool
	// True if the scheduler has failed the job
	failed bool
	// True if the scheduler has marked the job as succeeded
	succeeded bool
	// Job Runs by run id
	// Initialised to an empty map by NewJob and EmptyJob.
	runsById map[uuid.UUID]*JobRun
	// The currently active run, or nil if the job has no runs.
	// The run with the latest created timestamp is the active run.
	activeRun *JobRun
	// The created timestamp of the currently active run (zero if there is none).
	activeRunTimestamp int64
}
// EmptyJob returns a job with only its id set and an empty, non-nil run map.
// Every other field has its zero value. In particular jobSchedulingInfo is nil,
// and the job is neither queued nor in a terminal state.
func EmptyJob(id string) *Job {
	job := &Job{id: id}
	job.runsById = map[uuid.UUID]*JobRun{}
	return job
}
// NewJob creates a new scheduler job from its persisted attributes.
//
// The requested priority is initialised to priority. The job starts with no
// runs (an empty run map) and with succeeded and failed both false.
// schedulingInfo is stored by reference, not copied.
func NewJob(
	jobId string,
	jobset string,
	queue string,
	priority uint32,
	schedulingInfo *schedulerobjects.JobSchedulingInfo,
	queued bool,
	queuedVersion int32,
	cancelRequested bool,
	cancelByJobsetRequested bool,
	cancelled bool,
	created int64,
) *Job {
	job := &Job{
		id:                      jobId,
		queue:                   queue,
		jobset:                  jobset,
		priority:                priority,
		requestedPriority:       priority,
		created:                 created,
		queued:                  queued,
		queuedVersion:           queuedVersion,
		jobSchedulingInfo:       schedulingInfo,
		cancelRequested:         cancelRequested,
		cancelByJobsetRequested: cancelByJobsetRequested,
		cancelled:               cancelled,
		runsById:                map[uuid.UUID]*JobRun{},
	}
	return job
}
// Id returns the string form of the job's id.
func (job *Job) Id() string {
	id := job.id
	return id
}
// GetId returns the job's id. It is equivalent to Id and exists only to
// satisfy the LegacyJob interface.
func (job *Job) GetId() string {
	return job.Id()
}
// Jobset returns the name of the jobset this job is part of.
func (job *Job) Jobset() string {
	set := job.jobset
	return set
}
// GetJobSet returns the job's jobset. It is equivalent to Jobset and exists
// for compatibility with legacyJob.
func (job *Job) GetJobSet() string {
	return job.Jobset()
}
// Queue returns the name of the queue the job was submitted to.
func (job *Job) Queue() string {
	q := job.queue
	return q
}
// GetQueue returns the job's queue. It is equivalent to Queue and exists only
// to satisfy the LegacyJob interface.
func (job *Job) GetQueue() string {
	return job.Queue()
}
// Priority returns the job's current per-queue priority.
func (job *Job) Priority() uint32 {
	p := job.priority
	return p
}
// RequestedPriority returns the per-queue priority most recently requested
// for the job. It may differ from Priority.
func (job *Job) RequestedPriority() uint32 {
	p := job.requestedPriority
	return p
}
// WithPriority returns a copy of the job whose priority is set to priority.
// The receiver and the requested priority are left unchanged.
func (job *Job) WithPriority(priority uint32) *Job {
	updated := copyJob(*job)
	updated.priority = priority
	return updated
}
// WithRequestedPriority returns a copy of the job with the requested priority
// updated. The effective priority (see Priority) and the receiver are left
// unchanged.
func (job *Job) WithRequestedPriority(priority uint32) *Job {
	j := copyJob(*job)
	j.requestedPriority = priority
	return j
}
// JobSchedulingInfo returns the job's scheduling requirements. The pointer is
// shared with any copies of the job and may be nil (e.g. for jobs created via
// EmptyJob).
func (job *Job) JobSchedulingInfo() *schedulerobjects.JobSchedulingInfo {
	info := job.jobSchedulingInfo
	return info
}
// GetAnnotations returns the annotations from the job's pod requirements, or
// nil if the job has no pod requirements.
// This is needed for compatibility with interfaces.LegacySchedulerJob
func (job *Job) GetAnnotations() map[string]string {
	req := job.getPodRequirements()
	if req == nil {
		return nil
	}
	return req.Annotations
}
// GetRequirements returns the job's scheduling requirements. The priority
// class map argument is ignored.
// Needed for compatibility with interfaces.LegacySchedulerJob
func (job *Job) GetRequirements(_ map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo {
	return job.jobSchedulingInfo
}
// GetPriorityClassName returns the name of the priority class the job
// requested, or "" if the job has no scheduling info.
// Needed for compatibility with interfaces.LegacySchedulerJob
func (job *Job) GetPriorityClassName() string {
	// jobSchedulingInfo is nil for jobs built via EmptyJob. Guard against that,
	// as the other requirement getters already do, instead of dereferencing
	// unconditionally and panicking.
	if info := job.jobSchedulingInfo; info != nil {
		return info.PriorityClassName
	}
	return ""
}
// GetNodeSelector returns the node selector from the job's pod requirements,
// or nil if the job has no pod requirements.
// Needed for compatibility with interfaces.LegacySchedulerJob
func (job *Job) GetNodeSelector() map[string]string {
	req := job.getPodRequirements()
	if req == nil {
		return nil
	}
	return req.NodeSelector
}
// GetAffinity returns the affinity from the job's pod requirements, or nil if
// the job has no pod requirements.
// Needed for compatibility with interfaces.LegacySchedulerJob
func (job *Job) GetAffinity() *v1.Affinity {
	req := job.getPodRequirements()
	if req == nil {
		return nil
	}
	return req.Affinity
}
// GetTolerations returns the tolerations from the job's pod requirements, or
// nil if the job has no pod requirements.
// Needed for compatibility with interfaces.LegacySchedulerJob
func (job *Job) GetTolerations() []v1.Toleration {
	req := job.getPodRequirements()
	if req == nil {
		return nil
	}
	return req.Tolerations
}
// GetResourceRequirements returns the resource requirements from the job's pod
// requirements, or an empty v1.ResourceRequirements if the job has no pod
// requirements.
// Needed for compatibility with interfaces.LegacySchedulerJob
func (job *Job) GetResourceRequirements() v1.ResourceRequirements {
	req := job.getPodRequirements()
	if req == nil {
		return v1.ResourceRequirements{}
	}
	return req.ResourceRequirements
}
// getPodRequirements returns the pod requirements held by the first object
// requirement of the job's scheduling info. It returns nil when there are no
// object requirements or when the first one carries no pod requirements.
// Only the first entry is consulted; any further entries are ignored.
func (job *Job) getPodRequirements() *schedulerobjects.PodRequirements {
	objectReqs := job.jobSchedulingInfo.GetObjectRequirements()
	if len(objectReqs) > 0 {
		return objectReqs[0].GetPodRequirements()
	}
	return nil
}
// Queued reports whether the scheduler should consider the job for
// assignment.
func (job *Job) Queued() bool {
	isQueued := job.queued
	return isQueued
}
// WithQueued returns a copy of the job whose queued flag is set to queued.
// The receiver is left unchanged.
func (job *Job) WithQueued(queued bool) *Job {
	updated := copyJob(*job)
	updated.queued = queued
	return updated
}
// QueuedVersion returns the version number of the job's current queued state.
func (job *Job) QueuedVersion() int32 {
	v := job.queuedVersion
	return v
}
// WithQueuedVersion returns a copy of the job whose queued-state version is
// set to version. The receiver is left unchanged.
func (job *Job) WithQueuedVersion(version int32) *Job {
	updated := copyJob(*job)
	updated.queuedVersion = version
	return updated
}
// CancelRequested reports whether the user has asked for this job to be
// cancelled.
func (job *Job) CancelRequested() bool {
	requested := job.cancelRequested
	return requested
}
// CancelByJobsetRequested reports whether the user has asked for this job's
// whole jobset to be cancelled.
func (job *Job) CancelByJobsetRequested() bool {
	requested := job.cancelByJobsetRequested
	return requested
}
// WithCancelRequested returns a copy of the job whose cancelRequested flag is
// set to cancelRequested. The receiver is left unchanged.
func (job *Job) WithCancelRequested(cancelRequested bool) *Job {
	updated := copyJob(*job)
	updated.cancelRequested = cancelRequested
	return updated
}
// WithCancelByJobsetRequested returns a copy of the job whose
// cancelByJobsetRequested flag is set to cancelByJobsetRequested.
// The receiver is left unchanged.
func (job *Job) WithCancelByJobsetRequested(cancelByJobsetRequested bool) *Job {
	updated := copyJob(*job)
	updated.cancelByJobsetRequested = cancelByJobsetRequested
	return updated
}
// Cancelled reports whether the scheduler has cancelled the job.
func (job *Job) Cancelled() bool {
	done := job.cancelled
	return done
}
// WithCancelled returns a copy of the job whose cancelled flag is set to
// cancelled. The receiver is left unchanged.
func (job *Job) WithCancelled(cancelled bool) *Job {
	updated := copyJob(*job)
	updated.cancelled = cancelled
	return updated
}
// Succeeded reports whether the scheduler has marked the job as succeeded.
func (job *Job) Succeeded() bool {
	done := job.succeeded
	return done
}
// WithSucceeded returns a copy of the job whose succeeded flag is set to
// succeeded. The receiver is left unchanged.
func (job *Job) WithSucceeded(succeeded bool) *Job {
	updated := copyJob(*job)
	updated.succeeded = succeeded
	return updated
}
// Failed reports whether the scheduler has marked the job as failed.
func (job *Job) Failed() bool {
	done := job.failed
	return done
}
// WithFailed returns a copy of the job whose failed flag is set to failed.
// The receiver is left unchanged.
func (job *Job) WithFailed(failed bool) *Job {
	updated := copyJob(*job)
	updated.failed = failed
	return updated
}
// Created returns the job's logical creation timestamp. It is used to order
// jobs of equal priority.
func (job *Job) Created() int64 {
	ts := job.created
	return ts
}
// InTerminalState reports whether the job has finished, meaning the scheduler
// has marked it as succeeded, cancelled or failed.
func (job *Job) InTerminalState() bool {
	switch {
	case job.succeeded, job.cancelled, job.failed:
		return true
	default:
		return false
	}
}
// HasRuns returns true if at least one run has been recorded for the job.
// If this returns true then LatestRun is guaranteed to return a non-nil value,
// because both are based on the active run.
func (job *Job) HasRuns() bool {
	return job.activeRun != nil
}
// WithNewRun returns a copy of the job with an additional run on the given
// executor and node. The run gets a freshly generated id and a wall-clock
// creation time in nanoseconds. Because of that timestamp it normally becomes
// the job's active run (see WithUpdatedRun).
func (job *Job) WithNewRun(executor string, node string) *Job {
	return job.WithUpdatedRun(&JobRun{
		id:       uuid.New(),
		jobId:    job.id,
		created:  time.Now().UnixNano(),
		executor: executor,
		node:     node,
	})
}
// WithUpdatedRun returns a copy of the job in which run is stored under its
// id. It replaces any existing run with the same id. The run also becomes the
// active run if its created timestamp is not older than the current active
// run's. The receiver, including its run map, is left unmodified.
func (job *Job) WithUpdatedRun(run *JobRun) *Job {
	j := copyJob(*job)
	// Copy into a freshly allocated map rather than using maps.Clone. Clone
	// returns nil for a nil input (e.g. a zero-value Job), and the write below
	// would then panic. Sizing for one extra entry avoids growth when a new run
	// is added.
	j.runsById = make(map[uuid.UUID]*JobRun, len(job.runsById)+1)
	maps.Copy(j.runsById, job.runsById)
	// ">=" means that on a timestamp tie the most recently supplied run wins.
	if run.created >= j.activeRunTimestamp {
		j.activeRunTimestamp = run.created
		j.activeRun = run
	}
	j.runsById[run.id] = run
	return j
}
// NumReturned counts the job's runs that are flagged as returned by an
// executor. It scans every run, which is O(number of runs); that is expected
// to be small.
func (job *Job) NumReturned() uint {
	var count uint
	for _, r := range job.runsById {
		if !r.returned {
			continue
		}
		count++
	}
	return count
}
// NumAttempts counts the job's runs that an executor actually attempted. It
// scans every run, which is O(number of runs); that is expected to be small.
func (job *Job) NumAttempts() uint {
	var count uint
	for _, r := range job.runsById {
		if !r.runAttempted {
			continue
		}
		count++
	}
	return count
}
// AllRuns returns every run recorded for the job as a newly allocated slice.
// The order is unspecified because it follows map iteration.
func (job *Job) AllRuns() []*JobRun {
	runs := maps.Values(job.runsById)
	return runs
}
// LatestRun returns the job's active run, which is the run with the latest
// created timestamp, or nil if no run has been recorded yet.
// Guard against nil either explicitly or by checking HasRuns first.
func (job *Job) LatestRun() *JobRun {
	active := job.activeRun
	return active
}
// RunById looks up a run of this job by its id. It returns nil when the job
// has no run with that id.
func (job *Job) RunById(id uuid.UUID) *JobRun {
	run := job.runsById[id]
	return run
}
// WithJobset returns a copy of the job assigned to the given jobset.
// The receiver is left unchanged.
func (job *Job) WithJobset(jobset string) *Job {
	updated := copyJob(*job)
	updated.jobset = jobset
	return updated
}
// WithQueue returns a copy of the job assigned to the given queue.
// The receiver is left unchanged.
func (job *Job) WithQueue(queue string) *Job {
	updated := copyJob(*job)
	updated.queue = queue
	return updated
}
// WithCreated returns a copy of the job with its logical creation timestamp
// set to created. The receiver is left unchanged.
func (job *Job) WithCreated(created int64) *Job {
	updated := copyJob(*job)
	updated.created = created
	return updated
}
// WithJobSchedulingInfo returns a copy of the job that references the given
// scheduling info. The pointer is stored as-is, not copied. The receiver is
// left unchanged.
func (job *Job) WithJobSchedulingInfo(jobSchedulingInfo *schedulerobjects.JobSchedulingInfo) *Job {
	updated := copyJob(*job)
	updated.jobSchedulingInfo = jobSchedulingInfo
	return updated
}
// copyJob returns a pointer to a shallow copy of j. Since j is received by
// value, the struct is copied at the call site. Reference-typed fields such as
// runsById, jobSchedulingInfo and activeRun still point to the same data as
// the original.
func copyJob(j Job) *Job {
	clone := j
	return &clone
}
// JobPriorityComparer orders jobs for scheduling. See Compare.
type JobPriorityComparer struct{}

// Compare orders jobs first by priority, then by created timestamp and
// finally by id. A job with a larger priority value sorts first. Among equal
// priorities the earlier created timestamp sorts first, and after that the
// lexicographically smaller id.
// It returns -1 if a should come before b, 1 if a should come after b, and 0
// if the two jobs are the same pointer or agree on all three keys.
// NOTE(review): confirm that a larger priority value is meant to sort first.
func (j JobPriorityComparer) Compare(a, b *Job) int {
	if a == b {
		return 0
	}
	switch {
	case a.priority > b.priority:
		return -1
	case a.priority < b.priority:
		return 1
	case a.created < b.created:
		return -1
	case a.created > b.created:
		return 1
	case a.id < b.id:
		return -1
	case a.id > b.id:
		return 1
	default:
		return 0
	}
}