throttler.go

package sync

import (
	"container/heap"
	"sync"
	"time"
)

// Throttler allows the controller to limit the number of items it is processing in parallel.
// Items are processed in priority order, and once processing starts, other items (including higher-priority items)
// will be kept pending until the processing is complete.
// Implementations should be idempotent.
type Throttler interface {
	// Add registers the item so that it can later be admitted for processing.
	Add(key string, priority int32, creationTime time.Time)
	// Admit returns true if the item should be processed.
	Admit(key string) bool
	// Remove notifies the throttler that item processing is no longer needed.
	Remove(key string)
}

type throttler struct {
	queue       func(key string)
	inProgress  map[string]bool
	pending     *priorityQueue
	lock        *sync.Mutex
	parallelism int
}

// NewThrottler returns a throttler that only runs `parallelism` items at once. When an item may need processing,
// `queue` is invoked.
func NewThrottler(parallelism int, queue func(key string)) Throttler {
	return &throttler{
		queue:       queue,
		inProgress:  make(map[string]bool),
		lock:        &sync.Mutex{},
		parallelism: parallelism,
		pending:     &priorityQueue{itemByKey: make(map[string]*item)},
	}
}
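
// Usage sketch (illustrative only, not part of the upstream file): a controller
// typically wires the Throttler to its workqueue so that at most `parallelism`
// keys are in progress at once. The `requeue` callback and the key below are
// hypothetical.
//
//	throttler := NewThrottler(2, requeue)          // requeue re-adds the key to the workqueue
//	throttler.Add("namespace/name", 0, time.Now()) // called when the item is first observed
//
//	// In the worker, before doing any work on the key:
//	if !throttler.Admit("namespace/name") {
//		return // over the limit; the key is re-queued via `requeue` once a slot frees up
//	}
//	// ... process the item; it stays in progress across syncs ...
//
//	// When the item no longer needs processing:
//	throttler.Remove("namespace/name")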

func (t *throttler) Add(key string, priority int32, creationTime time.Time) {
	t.lock.Lock()
	defer t.lock.Unlock()
	if t.parallelism == 0 {
		// Throttling is disabled; there is nothing to track.
		return
	}
	t.pending.add(key, priority, creationTime)
	t.queueThrottled()
}

func (t *throttler) Admit(key string) bool {
	t.lock.Lock()
	defer t.lock.Unlock()
	if t.parallelism == 0 || t.inProgress[key] {
		return true
	}
	t.queueThrottled()
	return false
}

func (t *throttler) Remove(key string) {
	t.lock.Lock()
	defer t.lock.Unlock()
	delete(t.inProgress, key)
	t.pending.remove(key)
	t.queueThrottled()
}

// queueThrottled promotes pending items to in-progress and queues them while
// there is spare parallelism. Callers must hold the lock.
func (t *throttler) queueThrottled() {
	for t.pending.Len() > 0 && t.parallelism > len(t.inProgress) {
		key := t.pending.pop().key
		t.inProgress[key] = true
		t.queue(key)
	}
}

type item struct {
	key          string
	creationTime time.Time
	priority     int32
	index        int
}

type priorityQueue struct {
	items     []*item
	itemByKey map[string]*item
}

func (pq *priorityQueue) pop() *item {
	return heap.Pop(pq).(*item)
}

func (pq *priorityQueue) peek() *item {
	return pq.items[0]
}

// add inserts the key, or updates its priority (and re-heapifies) if it is already queued.
func (pq *priorityQueue) add(key string, priority int32, creationTime time.Time) {
	if res, ok := pq.itemByKey[key]; ok {
		if res.priority != priority {
			res.priority = priority
			heap.Fix(pq, res.index)
		}
	} else {
		heap.Push(pq, &item{key: key, priority: priority, creationTime: creationTime})
	}
}

func (pq *priorityQueue) remove(key string) {
	if item, ok := pq.itemByKey[key]; ok {
		heap.Remove(pq, item.index)
		delete(pq.itemByKey, key)
	}
}

func (pq priorityQueue) Len() int { return len(pq.items) }

func (pq priorityQueue) Less(i, j int) bool {
	if pq.items[i].priority == pq.items[j].priority {
		return pq.items[i].creationTime.Before(pq.items[j].creationTime)
	}
	return pq.items[i].priority > pq.items[j].priority
}

func (pq priorityQueue) Swap(i, j int) {
	pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
	pq.items[i].index = i
	pq.items[j].index = j
}

func (pq *priorityQueue) Push(x interface{}) {
	n := len(pq.items)
	item := x.(*item)
	item.index = n
	pq.items = append(pq.items, item)
	pq.itemByKey[item.key] = item
}

func (pq *priorityQueue) Pop() interface{} {
	old := pq.items
	n := len(old)
	item := old[n-1]
	item.index = -1
	pq.items = old[0 : n-1]
	delete(pq.itemByKey, item.key)
	return item
}