package controller

import (
	"sync"

	utilwait "k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/flowcontrol"
)

// NOTE: scheduler's semantics do not lend themselves to reuse elsewhere, and its use in
// this package quite probably has some odd corner cases/race conditions. If
// these cause problems in the future, this implementation should be replaced
// with a new and simpler one based on container/heap. End users looking for a
// component like this: see if k8s.io/client-go/util/workqueue.NewDelayingQueue
// suits your needs.

// scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke
// an action on all items in a bucket before moving to the next bucket. A rate limiter sets
// an upper bound on the number of buckets processed per unit time. Each item has a key and a
// value, so both uniqueness and equality can be tested (the key must be unique, the value can
// carry info for the next processing). Items remain in the queue until removed by a call to Remove().
type scheduler struct {
	handle   func(key, value interface{})
	position int
	limiter  flowcontrol.RateLimiter

	mu      sync.Mutex
	buckets []bucket
}

type bucket map[interface{}]interface{}

// newScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting
// the rate at which buckets are processed, and a function to invoke when items are scanned in
// a bucket.
// TODO: remove DEBUG statements from this file once this logic has been adequately validated.
func newScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *scheduler {
	// Add one more bucket to serve as the "current" bucket.
	bucketCount++
	buckets := make([]bucket, bucketCount)
	for i := range buckets {
		buckets[i] = make(bucket)
	}
	return &scheduler{
		handle:  fn,
		buckets: buckets,
		limiter: bucketLimiter,
	}
}

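// Example usage (an illustrative sketch, not part of the upstream file): the
// bucket count, limiter rate, handler, and key/value below are hypothetical.
//
//	limiter := flowcontrol.NewTokenBucketRateLimiter(0.1, 1) // admit ~1 bucket per 10s
//	s := newScheduler(3, limiter, func(key, value interface{}) {
//		// re-process the item identified by key; value carries its last-known state
//	})
//	stopCh := make(chan struct{})
//	s.RunUntil(stopCh)
//	s.Add("ns/stream", "rv1")
//	// ... close(stopCh) to stop the background loop
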
// RunUntil launches the scheduler and runs it until ch is closed.
func (s *scheduler) RunUntil(ch <-chan struct{}) {
	go utilwait.Until(s.RunOnce, 0, ch)
}

// RunOnce takes a single item out of the current bucket and processes it. If
// the bucket is empty, it waits on the rate limiter before returning.
func (s *scheduler) RunOnce() {
	key, value, last := s.next()
	if last {
		s.limiter.Accept()
		return
	}
	s.handle(key, value)
}

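// Note that s.limiter.Accept() is reached only when next() reports the current
// bucket as drained, so the limiter bounds how quickly the scheduler advances
// from one bucket to the next; items within a single bucket are handled
// back-to-back without waiting on the limiter.
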
// at returns the bucket index offset by inc relative to the current bucket.
func (s *scheduler) at(inc int) int {
	return (s.position + inc + len(s.buckets)) % len(s.buckets)
}

// next removes one key from the current bucket, places it in the last bucket,
// and returns the removed key and value. The boolean result is true if the
// current bucket was empty and no key or value was returned.
func (s *scheduler) next() (interface{}, interface{}, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	last := s.buckets[s.position]
	// Grab the first item in the bucket, move it to the end, and return it.
	for k, v := range last {
		delete(last, k)
		s.buckets[s.at(-1)][k] = v
		return k, v, false
	}
	// The bucket was empty. Advance to the next bucket.
	s.position = s.at(1)
	return nil, nil, true
}

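// A rough worked example of the resulting cadence (numbers are illustrative):
// with newScheduler(3, ...) the ring holds 4 buckets. An item handled now is
// moved into the bucket at position-1, the last one the ring reaches before
// wrapping around, so it is not handled again until the other buckets have
// been drained. With a limiter admitting about one bucket every 10 seconds,
// each item is therefore revisited roughly every 30 seconds, plus whatever
// time is spent handling items.
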
// Add places the key in the bucket with the fewest entries (excluding the current bucket). The key is
// used to determine uniqueness, while value can be used to associate additional data for later retrieval.
// An Add removes any previous entry for the key and places the item in a new bucket. This allows callers
// to ensure that adding a new item to the queue purges old versions of the item, while Remove can be
// conditional on removing only the known old version.
func (s *scheduler) Add(key, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, bucket := range s.buckets {
		delete(bucket, key)
	}
	// Pick the bucket with the fewest entries that is furthest from the current position.
	n := len(s.buckets)
	base := s.position + n
	target, least := 0, 0
	for i := n - 1; i > 0; i-- {
		position := (base + i) % n
		size := len(s.buckets[position])
		if size == 0 {
			target = position
			break
		}
		if size < least || least == 0 {
			target = position
			least = size
		}
	}
	s.buckets[target][key] = value
}

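// For example (hypothetical key and values): calling s.Add("ns/stream", "rv2")
// while ("ns/stream", "rv1") is still queued leaves only the "rv2" entry and
// places it in a bucket far from the current position, so the stale "rv1"
// value is never handed to the handler again.
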
// Remove takes the key out of all buckets. If value is non-nil, an entry is only removed if it
// holds the same value. Returns false if the key was found with a different value (that entry is
// left in place); otherwise returns true.
func (s *scheduler) Remove(key, value interface{}) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	match := true
	for _, bucket := range s.buckets {
		if value != nil {
			if old, ok := bucket[key]; ok && old != value {
				match = false
				continue
			}
		}
		delete(bucket, key)
	}
	return match
}

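// For example (hypothetical key and values): s.Remove("ns/stream", "rv1")
// deletes the entry only while its queued value is still "rv1"; if a later
// Add replaced it with "rv2", the call returns false and the newer entry
// stays scheduled.
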
// Delay moves the key into the last bucket in the ring (the one that will be
// processed furthest in the future), if the key exists.
func (s *scheduler) Delay(key interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	last := s.at(-1)
	for i, bucket := range s.buckets {
		if i == last {
			continue
		}
		if value, ok := bucket[key]; ok {
			delete(bucket, key)
			s.buckets[last][key] = value
		}
	}
}

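// Unlike Add, Delay keeps the existing value and simply pushes the key's next
// handling back by close to a full trip around the ring.
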
// Len returns the number of scheduled items.
func (s *scheduler) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	count := 0
	for _, bucket := range s.buckets {
		count += len(bucket)
	}
	return count
}

// Map returns a copy of the scheduler contents, but does not copy the keys or values themselves.
// If values and keys are not immutable, changing the value will affect the value in the queue.
func (s *scheduler) Map() map[interface{}]interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[interface{}]interface{})
	for _, bucket := range s.buckets {
		for k, v := range bucket {
			out[k] = v
		}
	}
	return out
}