This repository has been archived by the owner on Nov 1, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
job.go
158 lines (133 loc) · 4.18 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package jobs
import (
"encoding/json"
"errors"
"time"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/guid"
)
// Queue names.
const (
	// DefaultQueue is the queue to use if none is set.
	DefaultQueue = "default"
)

// Job method names; UnmarshalJSON compares Job.Method against these to
// decide how to decode a job's params.
const (
	// ReleaseJob is the method for a release job.
	ReleaseJob = "release"

	// AutomatedInstanceJob is the method for a job that checks an
	// automated instance.
	AutomatedInstanceJob = "automated_instance"
)

// Job priorities.
const (
	// PriorityBackground is the priority for background jobs.
	PriorityBackground = 100

	// PriorityInteractive is the priority for interactive jobs.
	PriorityInteractive = 200
)
// Sentinel errors of the jobs package. Each is a distinct value created
// once at package initialisation, so callers can compare against them.
var (
	// ErrJobAlreadyQueued reports that the job is already queued.
	ErrJobAlreadyQueued = errors.New("job is already queued")

	// ErrNoJobAvailable reports that there is no job available.
	ErrNoJobAvailable = errors.New("no job available")

	// ErrNoSuchJob reports that the requested job was not found.
	ErrNoSuchJob = errors.New("no such release job found")

	// ErrUnknownJobMethod reports a job whose method is not recognised.
	ErrUnknownJobMethod = errors.New("unknown job method")
)
// JobStore is the complete job store interface: the method sets of
// JobReadPusher and JobWritePopper, plus GC.
type JobStore interface {
JobReadPusher
JobWritePopper
// GC presumably garbage-collects jobs held by the store; what exactly is
// collected is not visible here — confirm against the implementations.
GC() error
}
// JobReadPusher is the part of a job store used to look jobs up and to
// submit new ones, always on behalf of a particular instance.
type JobReadPusher interface {
	// GetJob fetches the job with the given ID for the given instance.
	GetJob(instance flux.InstanceID, id JobID) (Job, error)

	// PutJob submits a job for the given instance and returns a JobID,
	// presumably the ID under which the job was stored — confirm against
	// the implementations.
	PutJob(instance flux.InstanceID, job Job) (JobID, error)

	// PutJobIgnoringDuplicates has the same shape as PutJob.
	// NOTE(review): going by the name it skips whatever duplicate
	// detection PutJob performs (see Job.Key and ErrJobAlreadyQueued) —
	// confirm against the implementations.
	PutJobIgnoringDuplicates(instance flux.InstanceID, job Job) (JobID, error)
}
// JobWritePopper combines the method sets of JobUpdater and JobPopper: it can
// hand out the next job and record updates to jobs.
type JobWritePopper interface {
JobUpdater
JobPopper
}
// JobUpdater is the part of a job store used to record changes to a job
// that already exists.
type JobUpdater interface {
	// UpdateJob stores the given job; presumably it replaces the stored
	// job that has the same ID — confirm against the implementations.
	UpdateJob(job Job) error

	// Heartbeat is called with the ID of a job. NOTE(review): presumably
	// it refreshes that job's Heartbeat timestamp to show it is still
	// being worked on — confirm against the implementations.
	Heartbeat(id JobID) error
}
// JobPopper is the interface for taking the next job to work on.
type JobPopper interface {
// NextJob returns a job drawn from the given queues.
// NOTE(review): presumably it returns ErrNoJobAvailable when no job is ready,
// and picks by priority and scheduled time — confirm against the
// implementations.
NextJob(queues []string) (Job, error)
}
// JobID identifies a job. It is a string-based type; NewJobID creates new
// values.
type JobID string
// NewJobID returns a new JobID, made by converting the value returned by
// guid.New.
func NewJobID() JobID {
	id := guid.New()
	return JobID(id)
}
// Job describes a worker job.
//
// The struct tags define the JSON wire format. Decoding goes through the
// custom UnmarshalJSON method, which chooses the concrete type of Params
// from Method.
//
// Note that `omitempty` has no effect on the time.Time fields below:
// encoding/json only omits false, 0, nil pointers/interfaces and empty
// arrays, slices, maps and strings, so a zero time.Time is still encoded.
type Job struct {
// Instance is the ID of the instance the job belongs to.
Instance flux.InstanceID `json:"instanceID"`
// ID is the job's identifier.
ID JobID `json:"id"`
// To be set when scheduling the job
Queue string `json:"queue"`
// Method names the kind of job (e.g. ReleaseJob, AutomatedInstanceJob).
Method string `json:"method"`
// Params holds the method-specific parameters; its concrete type depends
// on Method.
Params interface{} `json:"params"`
ScheduledAt time.Time `json:"scheduled_at"`
Priority int `json:"priority"`
// Key is an optional field, and can be used to create jobs iff a pending
// job with the same key doesn't exist.
Key string `json:"key,omitempty"`
// To be used by the worker
Submitted time.Time `json:"submitted"`
Claimed time.Time `json:"claimed,omitempty"`
Heartbeat time.Time `json:"heartbeat,omitempty"`
Finished time.Time `json:"finished,omitempty"`
Log []string `json:"log,omitempty"`
Status string `json:"status"`
Done bool `json:"done"`
Success bool `json:"success"` // only makes sense after done is true
}
// UnmarshalJSON implements json.Unmarshaler for Job.
//
// Job.Params is declared as interface{}, so the default decoder could only
// fill it with generic maps. Instead the params are first captured as raw
// JSON and then decoded into the concrete type selected by Method:
//
//   - ReleaseJob:           ReleaseJobParams
//   - AutomatedInstanceJob: AutomatedInstanceJobParams
//
// For any other method Params is left nil.
//
// On success *j is replaced wholesale with the decoded job. On failure the
// decoding error is returned and *j is left untouched.
func (j *Job) UnmarshalJSON(data []byte) error {
	// wire mirrors Job field for field (same names and tags), except that
	// Params is kept as raw JSON until Method is known. Keep it in sync
	// with Job when fields are added or removed.
	var wire struct {
		Instance flux.InstanceID `json:"instanceID"`
		ID       JobID           `json:"id"`

		// To be set when scheduling the job
		Queue       string          `json:"queue"`
		Method      string          `json:"method"`
		Params      json.RawMessage `json:"params"`
		ScheduledAt time.Time       `json:"scheduled_at"`
		Priority    int             `json:"priority"`

		// Key is an optional field, and can be used to create jobs iff a
		// pending job with the same key doesn't exist.
		Key string `json:"key,omitempty"`

		// To be used by the worker
		Submitted time.Time `json:"submitted"`
		Claimed   time.Time `json:"claimed,omitempty"`
		Heartbeat time.Time `json:"heartbeat,omitempty"`
		Finished  time.Time `json:"finished,omitempty"`
		Log       []string  `json:"log,omitempty"`
		Status    string    `json:"status"`
		Done      bool      `json:"done"`
		Success   bool      `json:"success"` // only makes sense after done is true
	}
	if err := json.Unmarshal(data, &wire); err != nil {
		return err
	}

	// Build the result in a local so that a params decoding failure below
	// does not leave the receiver half-populated.
	job := Job{
		Instance:    wire.Instance,
		ID:          wire.ID,
		Queue:       wire.Queue,
		Method:      wire.Method,
		ScheduledAt: wire.ScheduledAt,
		Priority:    wire.Priority,
		Key:         wire.Key,
		Submitted:   wire.Submitted,
		Claimed:     wire.Claimed,
		Heartbeat:   wire.Heartbeat,
		Finished:    wire.Finished,
		Log:         wire.Log,
		Status:      wire.Status,
		Done:        wire.Done,
		Success:     wire.Success,
	}

	switch job.Method {
	case ReleaseJob:
		var p ReleaseJobParams
		if err := json.Unmarshal(wire.Params, &p); err != nil {
			return err
		}
		job.Params = p
	case AutomatedInstanceJob:
		var p AutomatedInstanceJobParams
		// Payloads for this method were accepted without a "params" key
		// before params were decoded here, so an absent key is tolerated
		// and yields zero-valued params rather than an error.
		if len(wire.Params) > 0 {
			if err := json.Unmarshal(wire.Params, &p); err != nil {
				return err
			}
		}
		job.Params = p
	}

	*j = job
	return nil
}
// ReleaseJobParams are the params for a release job; Job.UnmarshalJSON
// decodes a job's params into this type when the job's method is ReleaseJob.
//
// It is a distinct defined type whose underlying type is flux.ReleaseSpec, so
// values convert freely between the two, but any methods declared on
// flux.ReleaseSpec (including custom JSON marshalling, if it has any) do not
// carry over to ReleaseJobParams.
type ReleaseJobParams flux.ReleaseSpec
// AutomatedInstanceJobParams are the params for an automated_instance job
// (method AutomatedInstanceJob).
type AutomatedInstanceJobParams struct {
// InstanceID has no json tag, so encoding/json encodes it under the key
// "InstanceID".
InstanceID flux.InstanceID
}