/
options.go
52 lines (45 loc) · 1.26 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package pool
// Option is a functional option: a function that receives the *Workers
// value and adjusts one of its settings.
type Option func(p *Workers)
// ChunkFn returns an Option that installs chunkFn as the pool's chunk
// function, which distributes records to particular workers.
// chunkFn should return a key string identifying the record; records with
// the same key are guaranteed to be processed by the same worker.
func ChunkFn(chunkFn func(val interface{}) string) Option {
	apply := func(w *Workers) {
		w.chunkFn = chunkFn
	}
	return apply
}
// ResChanSize returns an Option that sets the buffer size of the response
// channel. A negative size is ignored and the current setting is kept.
func ResChanSize(size int) Option {
	return func(w *Workers) {
		if size < 0 {
			return // negative size: leave the existing value untouched
		}
		w.resChanSize = size
	}
}
// WorkerChanSize returns an Option that sets the size of the worker
// channel(s). A negative size is ignored and the current setting is kept.
func WorkerChanSize(size int) Option {
	return func(w *Workers) {
		if size < 0 {
			return // negative size: leave the existing value untouched
		}
		w.workerChanSize = size
	}
}
// Batch returns an Option that sets the batch size, i.e. how many incoming
// records are collected in a buffer before being sent to workers.
// A size below 1 is ignored and the current setting is kept.
func Batch(size int) Option {
	return func(w *Workers) {
		if size < 1 {
			return // non-positive size: leave the existing value untouched
		}
		w.batchSize = size
	}
}
// ContinueOnError changes the default behavior of early termination on the
// first error, so that processing continues after an error.
// It sets the continueOnError flag on p to true. Its signature matches
// Option's underlying type, func(*Workers).
func ContinueOnError(p *Workers) {
p.continueOnError = true
}
// OnCompletion returns an Option that registers completeFn as the function
// called on completion for each worker id.
func OnCompletion(completeFn CompleteFn) Option {
	return Option(func(w *Workers) {
		w.completeFn = completeFn
	})
}