-
Notifications
You must be signed in to change notification settings - Fork 781
/
scheduling_queue.go
173 lines (148 loc) · 3.82 KB
/
scheduling_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package ruler
import (
"container/heap"
"time"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// itemEvaluationLatency is a histogram of how late items leave the queue: the
// number of seconds between an item's Scheduled time and the wall-clock time
// at which queueState.Dequeue removed it.
var itemEvaluationLatency = promauto.NewHistogram(prometheus.HistogramOpts{
	Namespace: "cortex",
	Name:      "scheduling_queue_item_latency",
	Help:      "Difference between the items scheduled evaluation time and actual evaluation time",
	Buckets:   prometheus.DefBuckets,
})
// ScheduledItem is an item in a queue of scheduled items.
type ScheduledItem interface {
// Key returns the string that identifies the item within the queue.
// Enqueueing an item whose key is already queued replaces the queued item.
Key() string
// Scheduled returns the earliest possible time the item is available for
// dequeueing.
Scheduled() time.Time
}
// queueState is the set of pending items, kept as a heap ordered by Scheduled
// time (see Less) and manipulated through container/heap.
type queueState struct {
// items is the heap storage. While the heap invariant holds, items[0] is
// the item with the earliest Scheduled time.
items []ScheduledItem
// hit maps each queued item's Key to its current index in items, so an
// already-queued item can be found and replaced in place.
hit map[string]int
}
// Less implements heap.Interface. The item at index i sorts before the item at
// index j when it is scheduled strictly earlier, which keeps the earliest
// scheduled item at the root of the heap.
func (q queueState) Less(i, j int) bool {
	left := q.items[i].Scheduled()
	right := q.items[j].Scheduled()
	return left.Before(right)
}
// Pop implements heap.Interface. It removes and returns the last element of
// items (container/heap moves the minimum there before calling Pop) and drops
// that element's entry from the key index.
func (q *queueState) Pop() interface{} {
	old := q.items
	n := len(old)
	x := old[n-1]
	// Clear the vacated slot: the backing array outlives the re-slice below,
	// and a stale reference there would keep the popped item reachable.
	old[n-1] = nil
	delete(q.hit, x.Key())
	q.items = old[:n-1]
	return x
}
// Push implements heap.Interface. It appends x, which must be a ScheduledItem,
// to the end of items and records that position in the key index.
func (q *queueState) Push(x interface{}) {
	item := x.(ScheduledItem)
	// The new element lands at the current length, i.e. the next free index.
	q.hit[item.Key()] = len(q.items)
	q.items = append(q.items, item)
}
// Swap implements heap.Interface. It exchanges the items at indexes i and j
// and updates the key index so each key points at its item's new position.
func (q *queueState) Swap(i, j int) {
	first, second := q.items[i], q.items[j]
	q.hit[first.Key()] = j
	q.hit[second.Key()] = i
	q.items[i], q.items[j] = second, first
}
// Enqueue adds op to the queue. If an item with the same Key is already
// queued, it is replaced by op and the heap is re-ordered around that slot;
// otherwise op is pushed as a new element.
func (q *queueState) Enqueue(op ScheduledItem) {
	idx, found := q.hit[op.Key()]
	if !found {
		heap.Push(q, op)
		return
	}
	// Same key: overwrite in place, then restore heap order, since the
	// replacement may carry a different Scheduled time.
	q.items[idx] = op
	heap.Fix(q, idx)
}
// Dequeue removes and returns the item with the earliest Scheduled time, and
// records in itemEvaluationLatency how many seconds have passed since that
// time. The queue must be non-empty.
func (q *queueState) Dequeue() ScheduledItem {
	item := heap.Pop(q).(ScheduledItem)
	itemEvaluationLatency.Observe(time.Since(item.Scheduled()).Seconds())
	return item
}
// Front returns the item at the head of the heap without removing it. It
// indexes items[0] directly, so it panics on an empty queue; callers must
// check Len first.
func (q *queueState) Front() ScheduledItem {
return q.items[0]
}
// Len implements heap.Interface and reports the number of queued items.
func (q *queueState) Len() int {
return len(q.items)
}
// SchedulingQueue is like a priority queue, but the first item is the oldest
// scheduled item. Items are only able to be dequeued after the time they are
// scheduled to be run.
type SchedulingQueue struct {
// clock supplies the current time and the timers the run loop waits on
// while the front item is not yet due.
clock clockwork.Clock
// add carries newly enqueued items to the run goroutine; next carries
// ready items from the run goroutine to Dequeue callers.
// NewSchedulingQueue creates both as unbuffered channels.
add, next chan ScheduledItem
}
// NewSchedulingQueue builds a scheduling queue that reads time from clock and
// starts the goroutine that services it. That goroutine runs until Close has
// been called and every remaining item has been dequeued.
func NewSchedulingQueue(clock clockwork.Clock) *SchedulingQueue {
	sq := new(SchedulingQueue)
	sq.clock = clock
	sq.add = make(chan ScheduledItem)
	sq.next = make(chan ScheduledItem)

	go sq.run()
	return sq
}
// Close closes the scheduling queue. No more items can be added: Enqueue
// sends on the channel closed here, so calling Enqueue after Close panics, as
// does calling Close a second time. Items can be dequeued until there are
// none left.
func (sq *SchedulingQueue) Close() {
close(sq.add)
}
// run is the queue's event loop, started as a goroutine by NewSchedulingQueue.
// It owns the heap of pending items: it accepts new items from sq.add, offers
// the front item on sq.next once its Scheduled time has been reached, and
// otherwise waits for whichever comes first, the front item becoming due or
// another item being added. When sq.add has been closed and the heap is empty
// it closes sq.next and returns.
func (sq *SchedulingQueue) run() {
	q := queueState{
		items: []ScheduledItem{},
		hit:   map[string]int{},
	}

	// add is set to nil once sq.add is observed closed. A receive on a closed
	// channel is always ready, so leaving it in the selects below would make
	// the loop spin (creating a timer on every pass) while the remaining
	// items drain. A receive on a nil channel never proceeds, which simply
	// disables those cases.
	add := sq.add

	for {
		// Nothing on the queue? Wait for something to be added.
		if q.Len() == 0 {
			// Input already closed and everything drained: finish.
			if add == nil {
				close(sq.next)
				return
			}
			next, open := <-add
			// If sq.add is closed (and there is nothing on the queue),
			// we can close sq.next and stop this goroutine
			if !open {
				close(sq.next)
				return
			}
			q.Enqueue(next)
			continue
		}

		next := q.Front()
		delay := next.Scheduled().Sub(sq.clock.Now())

		// Item on the queue that is ready now?
		if delay <= 0 {
			select {
			case sq.next <- next:
				q.Dequeue()
			case justAdded, open := <-add:
				if open {
					q.Enqueue(justAdded)
				} else {
					add = nil
				}
			}
			continue
		}

		// Item on the queue that needs waiting for?
		// Wait on a timer _or_ for something to be added.
		select {
		case <-sq.clock.After(delay):
		case justAdded, open := <-add:
			if open {
				q.Enqueue(justAdded)
			} else {
				add = nil
			}
		}
	}
}
// Enqueue schedules an item for later Dequeueing. It sends the item to the
// queue's run goroutine and blocks until that goroutine receives it. It must
// not be called after Close, because sending on the closed channel panics.
func (sq *SchedulingQueue) Enqueue(item ScheduledItem) {
sq.add <- item
}
// Dequeue takes an item from the queue.
// If there are no items, or the first item isn't ready to be scheduled, it
// blocks. If the queue has been closed and fully drained, this will return
// nil.
func (sq *SchedulingQueue) Dequeue() ScheduledItem {
return <-sq.next
}