-
Notifications
You must be signed in to change notification settings - Fork 56
/
stripe_executor.go
93 lines (83 loc) · 2.53 KB
/
stripe_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package invocation
import (
"math/rand"
"runtime"
"sync"
)
var (
// Default values differ from java impl. Also queue size is calculated differently.
// Java Client: queueSize per worker = defaultEventQueueCapacity / defaultEventWorkerCount
// Go Client: queueSize per worker = defaultEventQueueCapacity
//
// defaultEventQueueCapacity is the buffer size newStripeExecutor passes as queueSize,
// i.e. the capacity of each individual task queue.
defaultEventQueueCapacity = 10000
// defaultEventWorkerCount is the queueCount newStripeExecutor uses: the value
// runtime.NumCPU returns when the package variables are initialized.
defaultEventWorkerCount = runtime.NumCPU()
)
// executor represents the function that will run on workers of stripeExecutor.
// start launches it once per task queue, passing that queue, the executor's quit
// channel, and a pointer to the WaitGroup that stop waits on.
type executor func(queue chan func(), quit chan struct{}, wg *sync.WaitGroup)
// stripeExecutor executes given "tasks" preserving the order among the ones that are given with the same key.
// dispatch maps a key to one of the task queues, and start runs one worker per queue.
type stripeExecutor struct {
// quit is closed by stop and handed to every worker as its stop signal.
quit chan struct{}
// execFn is the function each worker runs; newStripeExecutorWithConfig sets it to defaultExecFn.
execFn executor
// taskQueues holds the buffered task channels, one per worker.
taskQueues []chan func()
// queueCount is the number of task queues; dispatch uses it as the modulus when picking a queue.
queueCount int
// wg is incremented by queueCount in start and waited on in stop.
wg sync.WaitGroup
}
// newStripeExecutor builds a stripeExecutor from the package defaults:
// defaultEventWorkerCount queues, each able to buffer defaultEventQueueCapacity tasks.
func newStripeExecutor() *stripeExecutor {
	workers, capacity := defaultEventWorkerCount, defaultEventQueueCapacity
	return newStripeExecutorWithConfig(workers, capacity)
}
// newStripeExecutorWithConfig returns a new stripeExecutor that owns queueCount task
// queues, each buffered to hold queueSize tasks, and that uses defaultExecFn as its
// worker function. No worker is started here; that is done by start.
// It panics if queueCount or queueSize is not greater than zero.
func newStripeExecutorWithConfig(queueCount, queueSize int) *stripeExecutor {
	switch {
	case queueCount <= 0:
		panic("queueCount must be greater than 0")
	case queueSize <= 0:
		panic("queueSize must be greater than 0")
	}

	// Build every buffered queue up front so the slice ends with exactly queueCount entries.
	queues := make([]chan func(), 0, queueCount)
	for len(queues) < queueCount {
		queues = append(queues, make(chan func(), queueSize))
	}

	return &stripeExecutor{
		quit:       make(chan struct{}),
		execFn:     defaultExecFn,
		taskQueues: queues,
		queueCount: queueCount,
	}
}
// start launches one worker goroutine per task queue. The WaitGroup is raised by
// queueCount before any worker is spawned so that stop can wait for all of them.
func (se *stripeExecutor) start() {
	se.wg.Add(se.queueCount)
	for _, queue := range se.taskQueues {
		go se.execFn(queue, se.quit, &se.wg)
	}
}
// dispatch hands task to the queue selected by key (key modulo queueCount), so tasks
// that share a key always land on the same queue. A negative key means "no affinity":
// a queue is picked at random. The send never blocks; dispatch reports true when the
// task was enqueued and false when the chosen queue was full.
func (se *stripeExecutor) dispatch(key int, task func()) bool {
	if key < 0 {
		// No affinity requested: pick any queue.
		key = rand.Intn(se.queueCount)
	}
	target := se.taskQueues[key%se.queueCount]

	select {
	case target <- task:
		return true
	default:
		// Queue is full; give up instead of blocking the caller.
		return false
	}
}
// stop closes the quit channel to signal the workers and then blocks until all workers are stopped.
// With defaultExecFn, tasks still queued when a worker sees the closed quit channel may be left unexecuted.
// NOTE(review): a second call would close the already-closed quit channel and panic.
func (se *stripeExecutor) stop() {
// Closing must come first: the workers only return once quit is closed.
close(se.quit)
se.wg.Wait()
}
// defaultExecFn is the worker loop installed by newStripeExecutorWithConfig. It runs
// tasks received from queue one at a time, in the order they are received, until quit
// is closed (or receives a value), and calls wg.Done on the way out.
// If both a task and the quit signal are ready, select may pick either one, so tasks
// still buffered in queue at that point are not guaranteed to run.
func defaultExecFn(queue chan func(), quit chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		select {
		case <-quit:
			return
		case job := <-queue:
			job()
		}
	}
}