-
Notifications
You must be signed in to change notification settings - Fork 3
/
job.go
225 lines (181 loc) · 5.37 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package axe
import (
"time"
"github.com/256dpi/fire/coal"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
)
// Status defines the allowed statuses of a job. The value is stored as a plain
// string in the status field of a job.
type Status string
// The available job statuses.
const (
// StatusEnqueued is used as the initial status when jobs are created.
StatusEnqueued Status = "enqueued"
// StatusDequeued is set when a job has been successfully dequeued.
StatusDequeued Status = "dequeued"
// StatusCompleted is set when a job has been successfully executed.
StatusCompleted Status = "completed"
// StatusFailed is set when an execution of a job failed. Failed jobs are
// still matched by dequeue once they become available again.
StatusFailed Status = "failed"
// StatusCancelled is set when a job has been cancelled.
StatusCancelled Status = "cancelled"
)
// Model can be any BSON serializable type. It is the type of the data value
// that Enqueue accepts and marshals into the job.
type Model interface{}
// Job is a single job managed by a queue. It embeds coal.Base and is tagged
// for the "jobs" collection.
type Job struct {
coal.Base `json:"-" bson:",inline" coal:"jobs"`
// The name of the job.
Name string `json:"name" bson:"name"`
// The data that has been supplied on creation, kept as a raw BSON value.
Data bson.Raw `json:"data" bson:"data"`
// The current status of the job.
Status Status `json:"status" bson:"status"`
// The time when the job was created.
Created time.Time `json:"created-at" bson:"created_at"`
// The time when the job is available for execution. Enqueue sets it to the
// creation time plus the delay, dequeue moves it to the dequeue time plus
// the timeout and fail moves it to the failure time plus the delay.
Available time.Time `json:"available-at" bson:"available_at"`
// The time when the job was dequeued the last time.
Started *time.Time `json:"started-at" bson:"started_at"`
// The time when the last attempt ended (completed, failed or cancelled).
Ended *time.Time `json:"ended-at" bson:"ended_at"`
// The time when the job was finished (completed or cancelled).
Finished *time.Time `json:"finished-at" bson:"finished_at"`
// Attempts is incremented with each execution attempt (on every dequeue).
Attempts int `json:"attempts" bson:"attempts"`
// The result submitted during completion.
Result bson.M `json:"result" bson:"result"`
// The last message submitted when the job was failed or cancelled.
Reason string `json:"reason" bson:"reason"`
}
// AddJobIndexes registers the default job indexes on the specified indexer:
// one on the name, one on the status and one on the finished timestamp. The
// removeAfter duration is handed to the finished index, so that, if it is
// specified, completed and cancelled jobs are automatically removed when their
// finished timestamp falls behind that duration.
//
// Note: It is recommended to create custom indexes that support the exact
// nature of data and access patterns.
func AddJobIndexes(indexer *coal.Indexer, removeAfter time.Duration) {
	// name and status are added in that order, both with a zero duration
	for _, field := range []string{"Name", "Status"} {
		indexer.Add(&Job{}, false, 0, field)
	}

	// the finished index is the only one that receives removeAfter
	indexer.Add(&Job{}, false, removeAfter, "Finished")
}
// Enqueue creates a job with the specified name and data and inserts it into
// the job collection of the store. A nil data value is replaced by an empty
// document. The job starts in the enqueued status and becomes available for
// dequeuing once the specified delay has passed.
//
// It returns the inserted job, or the first error encountered while encoding
// the data or inserting the job.
func Enqueue(store *coal.SubStore, name string, data Model, delay time.Duration) (*Job, error) {
	// fall back to an empty document if no data has been supplied
	if data == nil {
		data = bson.M{}
	}

	// build the job using a single timestamp for creation and availability
	created := time.Now()
	job := coal.Init(&Job{
		Name:      name,
		Status:    StatusEnqueued,
		Created:   created,
		Available: created.Add(delay),
	}).(*Job)

	// encode the data and decode it again into the raw data field of the job
	encoded, err := bson.Marshal(data)
	if err != nil {
		return nil, err
	}
	if err = bson.Unmarshal(encoded, &job.Data); err != nil {
		return nil, err
	}

	// persist the job
	if err = store.C(job).Insert(job); err != nil {
		return nil, err
	}

	return job, nil
}
// dequeue tries to claim the job with the specified id. The job is only
// matched if its status is enqueued, dequeued or failed and its available
// time is not in the future. On a match the job is marked as dequeued, its
// started time is set to now, its available time is moved to now plus the
// timeout and its attempts counter is incremented by one.
//
// It returns the updated job, or nil without an error if no job matched.
func dequeue(store *coal.SubStore, id bson.ObjectId, timeout time.Duration) (*Job, error) {
	// use one timestamp for the query and the update
	now := time.Now()

	// resolve the field names used by the query and the update
	statusField := coal.F(&Job{}, "Status")
	availableField := coal.F(&Job{}, "Available")
	startedField := coal.F(&Job{}, "Started")
	attemptsField := coal.F(&Job{}, "Attempts")

	// match the job only if it is in a dequeueable status and available
	query := bson.M{
		"_id":          id,
		statusField:    bson.M{"$in": []Status{StatusEnqueued, StatusDequeued, StatusFailed}},
		availableField: bson.M{"$lte": now},
	}

	// claim the job and request the updated document
	change := mgo.Change{
		Update: bson.M{
			"$set": bson.M{
				statusField:    StatusDequeued,
				startedField:   now,
				availableField: now.Add(timeout),
			},
			"$inc": bson.M{attemptsField: 1},
		},
		ReturnNew: true,
	}

	// apply the change
	var job Job
	_, err := store.C(&Job{}).Find(query).Sort("_id").Apply(change, &job)
	switch {
	case err == mgo.ErrNotFound:
		// no matching job is not treated as an error
		return nil, nil
	case err != nil:
		return nil, err
	}

	return &job, nil
}
// complete marks the job with the specified id as completed. It stores the
// supplied result and sets both the ended and the finished time to now. The
// error of the update is returned as is.
func complete(store *coal.SubStore, id bson.ObjectId, result bson.M) error {
	// use one timestamp for both time fields
	now := time.Now()

	// collect the fields to set
	fields := bson.M{
		coal.F(&Job{}, "Status"):   StatusCompleted,
		coal.F(&Job{}, "Result"):   result,
		coal.F(&Job{}, "Ended"):    now,
		coal.F(&Job{}, "Finished"): now,
	}

	// update the job
	return store.C(&Job{}).UpdateId(id, bson.M{"$set": fields})
}
// fail marks the job with the specified id as failed. It stores the supplied
// reason, sets the ended time to now and moves the available time to now plus
// the specified delay. The error of the update is returned as is.
func fail(store *coal.SubStore, id bson.ObjectId, reason string, delay time.Duration) error {
	// use one timestamp for both time fields
	now := time.Now()

	// collect the fields to set
	fields := bson.M{
		coal.F(&Job{}, "Status"):    StatusFailed,
		coal.F(&Job{}, "Reason"):    reason,
		coal.F(&Job{}, "Ended"):     now,
		coal.F(&Job{}, "Available"): now.Add(delay),
	}

	// update the job
	return store.C(&Job{}).UpdateId(id, bson.M{"$set": fields})
}
// cancel marks the job with the specified id as cancelled. It stores the
// supplied reason and sets both the ended and the finished time to now. The
// error of the update is returned as is.
func cancel(store *coal.SubStore, id bson.ObjectId, reason string) error {
	// use one timestamp for both time fields
	now := time.Now()

	// collect the fields to set
	fields := bson.M{
		coal.F(&Job{}, "Status"):   StatusCancelled,
		coal.F(&Job{}, "Reason"):   reason,
		coal.F(&Job{}, "Ended"):    now,
		coal.F(&Job{}, "Finished"): now,
	}

	// update the job
	return store.C(&Job{}).UpdateId(id, bson.M{"$set": fields})
}