forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
throttler.go
153 lines (130 loc) · 3.67 KB
/
throttler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package controller
import (
"container/heap"
"sync"
"time"
"k8s.io/client-go/util/workqueue"
)
// Throttler allows CRD controller to limit number of items it is processing in parallel.
type Throttler interface {
Add(key interface{}, priority int32, creationTime time.Time)
// Next returns true if item should be processed by controller now or return false.
Next(key interface{}) (interface{}, bool)
// Remove notifies throttler that item processing is done. In responses the throttler triggers processing of previously throttled items.
Remove(key interface{})
// SetParallelism update throttler parallelism limit.
SetParallelism(parallelism int)
}
type throttler struct {
queue workqueue.RateLimitingInterface
inProgress map[interface{}]bool
pending *priorityQueue
lock *sync.Mutex
parallelism int
}
func NewThrottler(parallelism int, queue workqueue.RateLimitingInterface) Throttler {
return &throttler{
queue: queue,
inProgress: make(map[interface{}]bool),
lock: &sync.Mutex{},
parallelism: parallelism,
pending: &priorityQueue{itemByKey: make(map[interface{}]*item)},
}
}
func (t *throttler) SetParallelism(parallelism int) {
t.lock.Lock()
defer t.lock.Unlock()
if t.parallelism != parallelism {
t.parallelism = parallelism
t.queueThrottled()
}
}
func (t *throttler) Add(key interface{}, priority int32, creationTime time.Time) {
t.lock.Lock()
defer t.lock.Unlock()
t.pending.add(key, priority, creationTime)
}
func (t *throttler) Next(key interface{}) (interface{}, bool) {
t.lock.Lock()
defer t.lock.Unlock()
if _, isInProgress := t.inProgress[key]; isInProgress || t.pending.Len() == 0 {
return key, true
}
if t.parallelism < 1 || t.parallelism > len(t.inProgress) {
next := t.pending.pop()
t.inProgress[next.key] = true
return next.key, true
}
return key, false
}
func (t *throttler) Remove(key interface{}) {
t.lock.Lock()
defer t.lock.Unlock()
delete(t.inProgress, key)
t.pending.remove(key)
t.queueThrottled()
}
func (t *throttler) queueThrottled() {
for t.pending.Len() > 0 && (t.parallelism < 1 || t.parallelism > len(t.inProgress)) {
next := t.pending.pop()
t.inProgress[next.key] = true
t.queue.Add(next.key)
}
}
type item struct {
key interface{}
creationTime time.Time
priority int32
index int
}
type priorityQueue struct {
items []*item
itemByKey map[interface{}]*item
}
func (pq *priorityQueue) pop() *item {
return heap.Pop(pq).(*item)
}
func (pq *priorityQueue) add(key interface{}, priority int32, creationTime time.Time) {
if res, ok := pq.itemByKey[key]; ok {
if res.priority != priority {
res.priority = priority
heap.Fix(pq, res.index)
}
} else {
heap.Push(pq, &item{key: key, priority: priority, creationTime: creationTime})
}
}
func (pq *priorityQueue) remove(key interface{}) {
if item, ok := pq.itemByKey[key]; ok {
heap.Remove(pq, item.index)
delete(pq.itemByKey, key)
}
}
func (pq priorityQueue) Len() int { return len(pq.items) }
func (pq priorityQueue) Less(i, j int) bool {
if pq.items[i].priority == pq.items[j].priority {
return pq.items[i].creationTime.Before(pq.items[j].creationTime)
}
return pq.items[i].priority > pq.items[j].priority
}
func (pq priorityQueue) Swap(i, j int) {
pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
pq.items[i].index = i
pq.items[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(pq.items)
item := x.(*item)
item.index = n
pq.items = append(pq.items, item)
pq.itemByKey[item.key] = item
}
func (pq *priorityQueue) Pop() interface{} {
old := pq.items
n := len(old)
item := old[n-1]
item.index = -1
pq.items = old[0 : n-1]
delete(pq.itemByKey, item.key)
return item
}