-
Notifications
You must be signed in to change notification settings - Fork 1
/
node_options.go
101 lines (88 loc) · 2.67 KB
/
node_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package pool
import (
"time"
"goa.design/pulse/pulse"
)
// NodeOption is a node creation option. It is a function that mutates the
// nodeOptions it is given.
type NodeOption func(*nodeOptions)

// nodeOptions holds the settings that NodeOption functions modify.
type nodeOptions struct {
	// workerTTL is the duration after which a worker is removed from the
	// pool in case of network partitioning.
	workerTTL time.Duration
	// pendingJobTTL is the duration after which a job that wasn't started
	// is made available to other workers.
	pendingJobTTL time.Duration
	// workerShutdownTTL is the maximum time to wait for workers to
	// shutdown.
	workerShutdownTTL time.Duration
	// maxQueuedJobs is the maximum number of jobs that can be queued in
	// the pool.
	maxQueuedJobs int
	// clientOnly marks the pool as client only: jobs can be dispatched to
	// workers but no background goroutines are started to route jobs.
	clientOnly bool
	// jobSinkBlockDuration is the duration to block when reading from the
	// job stream.
	jobSinkBlockDuration time.Duration
	// logger is used to report temporary errors.
	logger pulse.Logger
}
// WithWorkerTTL returns an option that sets the worker TTL: the duration after
// which a worker is removed from the pool in case of network partitioning.
// The default is 10s. Lower values cause all workers to send keep-alive
// updates more frequently.
func WithWorkerTTL(ttl time.Duration) NodeOption {
	apply := func(opts *nodeOptions) {
		opts.workerTTL = ttl
	}
	return apply
}
// WithPendingJobTTL returns an option that sets the pending job TTL: the
// duration after which a job that wasn't started is made available to other
// workers. The default is 20s.
func WithPendingJobTTL(ttl time.Duration) NodeOption {
	return func(cfg *nodeOptions) { cfg.pendingJobTTL = ttl }
}
// WithWorkerShutdownTTL returns an option that sets the maximum time to wait
// for workers to shutdown. The default is 2 minutes.
func WithWorkerShutdownTTL(ttl time.Duration) NodeOption {
	setShutdownTTL := func(opts *nodeOptions) {
		opts.workerShutdownTTL = ttl
	}
	return setShutdownTTL
}
// WithMaxQueuedJobs returns an option that sets the maximum number of jobs
// that can be queued in the pool. The default is 1000.
func WithMaxQueuedJobs(max int) NodeOption {
	return func(cfg *nodeOptions) { cfg.maxQueuedJobs = max }
}
// WithClientOnly returns an option that makes the pool client only. A
// client-only pool only supports dispatching jobs to workers; it does not
// start background goroutines to route jobs.
func WithClientOnly() NodeOption {
	markClientOnly := func(opts *nodeOptions) {
		opts.clientOnly = true
	}
	return markClientOnly
}
// WithJobSinkBlockDuration returns an option that sets how long to block when
// reading from the job stream. The default is 5s. It is mostly useful for
// testing.
func WithJobSinkBlockDuration(d time.Duration) NodeOption {
	return func(cfg *nodeOptions) { cfg.jobSinkBlockDuration = d }
}
// WithLogger sets the logger used to report temporary errors. A nil logger is
// ignored so that the option never replaces the configured logger with nil.
func WithLogger(logger pulse.Logger) NodeOption {
	return func(o *nodeOptions) {
		if logger == nil {
			// Keep whatever logger is already configured.
			return
		}
		o.logger = logger
	}
}
// parseOptions builds the effective options: it starts from the value
// returned by defaultPoolOptions and applies each given option in order, so
// later options override earlier ones. Nil options are skipped, since calling
// a nil function value would panic.
func parseOptions(opts ...NodeOption) *nodeOptions {
	o := defaultPoolOptions()
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(o)
	}
	return o
}
// defaultPoolOptions returns a freshly allocated nodeOptions populated with
// the default values: 10s worker TTL, 20s pending job TTL, 2m worker shutdown
// TTL, 1000 queued jobs, 5s job sink block duration and a no-op logger.
// clientOnly keeps its zero value (false).
func defaultPoolOptions() *nodeOptions {
	defaults := new(nodeOptions)
	defaults.workerTTL = 10 * time.Second
	defaults.pendingJobTTL = 20 * time.Second
	defaults.workerShutdownTTL = 2 * time.Minute
	defaults.maxQueuedJobs = 1000
	defaults.jobSinkBlockDuration = 5 * time.Second
	defaults.logger = pulse.NoopLogger()
	return defaults
}