-
Notifications
You must be signed in to change notification settings - Fork 3
/
task.go
411 lines (337 loc) · 8.34 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
package axe
import (
"context"
"errors"
"time"
"github.com/256dpi/xo"
"gopkg.in/tomb.v2"
"github.com/256dpi/fire/coal"
"github.com/256dpi/fire/stick"
)
// Error is used to control retry and cancellation of a job. These errors are
// expected and are not forwarded to the reporter.
type Error struct {
	Reason string
	Retry  bool
}

// E is a shorthand to construct an error. If retry is true the job will be
// retried and if false it will be cancelled. These settings take precedence
// over the task's max attempts setting.
func E(reason string, retry bool) *Error {
	e := new(Error)
	e.Reason, e.Retry = reason, retry
	return e
}

// Error implements the error interface by returning the reason.
func (e *Error) Error() string {
	return e.Reason
}
// Context holds and stores contextual data for the execution of a single job.
// It embeds a context.Context and can therefore be passed wherever a standard
// context is expected.
type Context struct {
	// The context that is cancelled once the lifetime of the task has been
	// reached.
	//
	// Values: opentracing.Span, *xo.Tracer
	context.Context

	// The executed job.
	Job Job

	// The current attempt to execute the job.
	//
	// Usage: Read Only
	Attempt int

	// The task that executes this job.
	//
	// Usage: Read Only
	Task *Task

	// The queue this job was dequeued from.
	//
	// Usage: Read Only
	Queue *Queue

	// The current tracer.
	//
	// Usage: Read Only
	Tracer *xo.Tracer
}
// Update stores the provided execution status and progress on the job of this
// context using the store of the queue the job was dequeued from.
func (c *Context) Update(status string, progress float64) error {
	store := c.Queue.options.Store
	return Update(c, store, c.Job, status, progress)
}
// Task describes work that is managed using a job queue.
type Task struct {
	// The job this task should execute. A missing job causes a panic when the
	// task is prepared.
	Job Job

	// The callback that is called with jobs for execution. The handler may
	// return errors formatted with E to manually control the state of the job.
	// A missing handler causes a panic when the task is prepared.
	Handler func(ctx *Context) error

	// The callback that is called once a job has been completed or cancelled.
	// For cancelled jobs the reason carries the error message, for completed
	// jobs it is empty.
	Notifier func(ctx *Context, cancelled bool, reason string) error

	// The number of spawned workers that dequeue and execute jobs in parallel.
	//
	// Default: 2.
	Workers int

	// The maximum attempts to complete a task. Zero means that the job is
	// retried forever. The error retry field will take precedence over this
	// setting and allow retry beyond the configured maximum.
	//
	// Default: 0
	MaxAttempts int

	// The rate at which a worker will request a job from the queue.
	//
	// Default: 100ms.
	Interval time.Duration

	// The minimal delay after a failed task is retried.
	//
	// Default: 1s.
	MinDelay time.Duration

	// The maximal delay after a failed task is retried.
	//
	// Default: 10m.
	MaxDelay time.Duration

	// The exponential increase of the delay after individual attempts. Values
	// below one are replaced by the default.
	//
	// Default: 2.
	DelayFactor float64

	// Time after which the context of a job is cancelled and the execution
	// should be stopped. Should be several minutes less than timeout to prevent
	// race conditions. A lifetime greater than the timeout causes a panic when
	// the task is prepared.
	//
	// Default: 5m.
	Lifetime time.Duration

	// The time after which a task can be dequeued again in case the worker was
	// unable to set its state.
	//
	// Default: 10m.
	Timeout time.Duration

	// Set to let the system enqueue a job periodically every given interval.
	//
	// Default: 0.
	Periodicity time.Duration

	// The blueprint of the job that is periodically enqueued. It is required
	// if a periodicity is set: a missing or invalid blueprint job causes a
	// panic when the task is prepared.
	PeriodicJob Blueprint
}
// prepare validates the task and fills in defaults for all unset options. It
// panics if the job or handler is missing, if the lifetime exceeds the timeout
// or if a periodicity is configured without a valid periodic job.
func (t *Task) prepare() {
	// the job and handler are mandatory
	switch {
	case t.Job == nil:
		panic("axe: missing job")
	case t.Handler == nil:
		panic("axe: missing handler")
	}

	// apply defaults for unset options
	if t.Workers == 0 {
		t.Workers = 2
	}
	if t.Interval == 0 {
		t.Interval = 100 * time.Millisecond
	}
	if t.MinDelay == 0 {
		t.MinDelay = time.Second
	}
	if t.MaxDelay == 0 {
		t.MaxDelay = 10 * time.Minute
	}
	if t.DelayFactor < 1 {
		t.DelayFactor = 2
	}
	if t.Lifetime == 0 {
		t.Lifetime = 5 * time.Minute
	}
	if t.Timeout == 0 {
		t.Timeout = 10 * time.Minute
	}

	// the lifetime may not exceed the timeout
	if t.Lifetime > t.Timeout {
		panic("axe: lifetime must be less than timeout")
	}

	// the remaining checks only concern periodic tasks
	if t.Periodicity <= 0 {
		return
	}

	// a periodic task needs a job to enqueue
	if t.PeriodicJob.Job == nil {
		panic("axe: missing periodic job")
	}

	// the periodic job must be valid
	if err := t.PeriodicJob.Job.Validate(); err != nil {
		panic(err.Error())
	}
}
// start spawns the configured number of workers on the tomb of the queue and,
// if a periodicity is set, one additional goroutine that periodically
// enqueues the periodic job.
func (t *Task) start(queue *Queue) {
	// all workers run the same loop
	runWorker := func() error {
		return t.worker(queue)
	}

	// spawn workers
	for remaining := t.Workers; remaining > 0; remaining-- {
		queue.tomb.Go(runWorker)
	}

	// skip the periodic enqueuer if no interval is given
	if t.Periodicity <= 0 {
		return
	}

	// spawn periodic enqueuer
	queue.tomb.Go(func() error {
		return t.enqueuer(queue)
	})
}
// worker repeatedly requests jobs for this task from the queue and executes
// them. Execution errors are handed to the reporter of the queue if one is
// configured. It returns tomb.ErrDying as soon as the queue is closed.
func (t *Task) worker(queue *Queue) error {
	// resolve the name of the job handled by this task
	name := GetMeta(t.Job).Name

	// keep going while the queue is open
	for queue.tomb.Alive() {
		// try to obtain a job from the queue
		id, found := queue.get(name)
		if found {
			// run the job and report any error
			if err := t.execute(queue, name, id); err != nil && queue.options.Reporter != nil {
				queue.options.Reporter(err)
			}

			continue
		}

		// nothing available, pause for one interval unless the queue closes
		select {
		case <-time.After(t.Interval):
		case <-queue.tomb.Dying():
			return tomb.ErrDying
		}
	}

	return tomb.ErrDying
}
// enqueuer enqueues the periodic job of the task once per periodicity until
// the queue is closed, at which point it returns tomb.ErrDying.
//
// A failed enqueue is handed to the reporter of the queue if one is configured
// and is retried after one second instead of after the full periodicity. The
// short retry applies regardless of whether a reporter is configured.
func (t *Task) enqueuer(queue *Queue) error {
	// get job, delay and isolation
	job := t.PeriodicJob.Job
	delay := t.PeriodicJob.Delay
	isolation := t.PeriodicJob.Isolation

	// run forever
	for {
		// reset id
		job.GetBase().DocID = coal.New()

		// wait a full period before the next round by default
		wait := t.Periodicity

		// enqueue job
		_, err := queue.Enqueue(nil, job, delay, isolation)
		if err != nil {
			// report error if a reporter is available
			if queue.options.Reporter != nil {
				queue.options.Reporter(err)
			}

			// retry soon; this must not depend on the presence of a reporter,
			// otherwise failures without a reporter would silently skip a
			// whole period
			wait = time.Second
		}

		// wait for the next round or stop if the queue is closed
		select {
		case <-time.After(wait):
		case <-queue.tomb.Dying():
			return tomb.ErrDying
		}
	}
}
// execute dequeues the job with the specified id, runs the handler of the task
// with a context limited to the task lifetime and then stores the outcome:
//
//   - Errors created with E either fail the job for a later retry or cancel
//     it, depending on their retry flag, and are not returned.
//   - Other errors fail the job while attempts remain and cancel it otherwise;
//     the handler error is returned in both cases.
//   - Without an error the job is completed.
//
// The notifier, if set, is called after a cancellation or completion. If the
// handler ran longer than the lifetime, only an error is returned and the job
// state is left untouched.
func (t *Task) execute(queue *Queue, name string, id coal.ID) error {
	// open a tracer for this execution
	tracer, rootCtx := xo.CreateTracer(context.Background(), "TASK "+name)
	defer tracer.End()

	// build a fresh job instance carrying the id
	job := GetMeta(t.Job).Make()
	job.GetBase().DocID = id

	// all state changes go through the store of the queue
	store := queue.options.Store

	// dequeue job; it might have been dequeued already by another worker
	dequeued, attempt, err := Dequeue(rootCtx, store, job, t.Timeout)
	switch {
	case err != nil:
		return err
	case !dequeued:
		return nil
	}

	// remember when the execution began
	startedAt := time.Now()

	// limit the handler to the lifetime of the task
	jobCtx, cancel := context.WithTimeout(rootCtx, t.Lifetime)
	defer cancel()

	// assemble the context handed to the handler and notifier
	ctx := &Context{
		Context: jobCtx,
		Job:     job,
		Attempt: attempt,
		Task:    t,
		Queue:   queue,
		Tracer:  tracer,
	}

	// run the handler
	err = xo.Catch(func() error {
		tracer.Push("axe/Task.execute")
		defer tracer.Pop()

		return t.Handler(ctx)
	})

	// stop here if the lifetime has been exceeded, as another worker might
	// already have dequeued the job
	if time.Since(startedAt) > t.Lifetime {
		return xo.F(`task "%s" ran longer than the specified lifetime`, name)
	}

	// handle errors that explicitly control the job state
	var control *Error
	if errors.As(err, &control) {
		// fail the job if a retry has been requested
		if control.Retry {
			delay := stick.Backoff(t.MinDelay, t.MaxDelay, t.DelayFactor, attempt)
			return Fail(rootCtx, store, job, control.Reason, delay)
		}

		// otherwise cancel the job
		if err = Cancel(rootCtx, store, job, control.Reason); err != nil {
			return err
		}

		// notify about the cancellation if requested
		if t.Notifier == nil {
			return nil
		}
		if err = t.Notifier(ctx, true, control.Reason); err != nil {
			return xo.W(err)
		}

		return nil
	}

	// handle all other errors
	if err != nil {
		// cancel the job once the attempts are exhausted; the results of the
		// state change and notifier are dropped in favor of the handler error
		if t.MaxAttempts != 0 && attempt >= t.MaxAttempts {
			_ = Cancel(rootCtx, store, job, err.Error())
			if t.Notifier != nil {
				_ = t.Notifier(ctx, true, err.Error())
			}

			return err
		}

		// otherwise fail the job so that it is retried later
		delay := stick.Backoff(t.MinDelay, t.MaxDelay, t.DelayFactor, attempt)
		_ = Fail(rootCtx, store, job, err.Error(), delay)

		return err
	}

	// mark the job as completed
	if err = Complete(rootCtx, store, job); err != nil {
		return err
	}

	// notify about the completion if requested
	if t.Notifier == nil {
		return nil
	}
	if err = t.Notifier(ctx, false, ""); err != nil {
		return xo.W(err)
	}

	return nil
}