forked from vgarvardt/gue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker_option.go
107 lines (91 loc) · 3.05 KB
/
worker_option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package guex
import (
"context"
"time"
"go.uber.org/zap"
)
// WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.
type WorkerPoolOption func(pool *WorkerPool)
// WithWorkerPoolQueue overrides default worker queue name with the given value.
func WithWorkerPoolQueue(queue ...QueueLimit) WorkerPoolOption {
return func(w *WorkerPool) {
w.queue = append(w.queue, queue...)
}
}
// WithWorkerPanicStackBufSize sets max size for the stacktrace buffer for panicking jobs.
// Default value is 1024 that is enough for most of the cases. Be careful setting buffer suze to the big values
// as this may affect overall performance.
func WithWorkerPanicStackBufSize(size int) WorkerPoolOption {
return func(w *WorkerPool) {
w.panicStackBufSize = size
}
}
func WithWorkerPoolHandler(jobType string, h WorkFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.wm[jobType] = h
}
}
func WithLogger(l *zap.Logger) WorkerPoolOption {
return func(w *WorkerPool) {
w.logger = l
}
}
func WithWorkerPanicWorkerMap(workMap WorkMap) WorkerPoolOption {
return func(w *WorkerPool) {
w.wm = workMap
}
}
// WithPoolInterval overrides default poll interval with the given value.
// Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolInterval(d time.Duration) WorkerPoolOption {
return func(w *WorkerPool) {
w.interval = d
}
}
// WithPoolID sets worker pool ID for easier identification in logs
func WithPoolID(id string) WorkerPoolOption {
return func(w *WorkerPool) {
w.id = id
}
}
// WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.hooksUnknownJobType = hooks
}
}
// WithPoolHooksJobDone calls WithWorkerHooksJobDone for every worker in the pool.
func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.hooksJobDone = hooks
}
}
// WithPoolGracefulShutdown enables graceful shutdown mode for all workers in the pool.
// See WithWorkerGracefulShutdown for details.
func WithPoolGracefulShutdown(handlerCtx func() context.Context) WorkerPoolOption {
return func(w *WorkerPool) {
w.graceful = true
w.gracefulCtx = handlerCtx
}
}
// WithPoolPanicStackBufSize sets max size for the stacktrace buffer for panicking jobs.
// Default value is 1024 that is enough for most of the cases. Be careful setting buffer suze to the big values
// as this may affect overall performance.
func WithPoolPanicStackBufSize(size int) WorkerPoolOption {
return func(w *WorkerPool) {
w.panicStackBufSize = size
}
}
func WithPoolQueueRestore(restoreAfter, interval time.Duration) WorkerPoolOption {
return func(w *WorkerPool) {
w.queueRestoreAfter = restoreAfter
w.queueRestoreInterval = interval
}
}
// WithBackoff sets backoff implementation that will be applied to errored jobs
// within current client session.
func WithBackoff(backoff Backoff) WorkerPoolOption {
return func(c *WorkerPool) {
c.client.backoff = backoff
}
}