-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
154 lines (128 loc) · 3.7 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deadlinequeue
import (
"container/heap"
"sync"
"time"
)
// DeadlineQueue defines the interface of a deadline queue implementation.
// Items with a deadline can be enqueued, and when the deadline expires,
// dequeue operation will return back the item.
type DeadlineQueue interface {
// Enqueue is used to enqueue a queue item with a deadline
Enqueue(qi QueueItem, deadline time.Time)
// Dequeue is a blocking call to wait for the next queue item
// whose deadline expires.
Dequeue(stopChan <-chan struct{}) QueueItem
}
// NewDeadlineQueue returns a deadline queue object.
func NewDeadlineQueue(mtx *QueueMetrics) DeadlineQueue {
q := &deadlineQueue{
pq: &priorityQueue{},
queueChanged: make(chan struct{}, 1),
mtx: mtx,
}
heap.Init(q.pq)
return q
}
// deadlineQueue implemments the DeadlineQueue interface.
type deadlineQueue struct {
sync.RWMutex // mutex to access objects in the deadline queue
pq *priorityQueue // a priority queue
queueChanged chan struct{} // channel to indicate queue has changed
mtx *QueueMetrics // track queue metrics
}
func (q *deadlineQueue) nextDeadline() time.Time {
if q.pq.Len() == 0 {
return time.Time{}
}
return q.pq.NextDeadline()
}
func (q *deadlineQueue) popIfReady() QueueItem {
if q.pq.Len() == 0 {
return nil
}
qi := heap.Pop(q.pq).(QueueItem)
q.mtx.queuePopDelay.Record(time.Since(qi.Deadline()))
qi.SetDeadline(time.Time{})
q.mtx.queueLength.Update(float64(q.pq.Len()))
return qi
}
func (q *deadlineQueue) update(item QueueItem) {
// Check if it's not in the queue.
if item.Index() == -1 {
if item.Deadline().IsZero() {
// Should not be scheduled.
return
}
heap.Push(q.pq, item)
q.mtx.queueLength.Update(float64(q.pq.Len()))
return
}
// It's in the queue. Remove if it should not be scheduled.
if item.Deadline().IsZero() {
heap.Remove(q.pq, item.Index())
q.mtx.queueLength.Update(float64(q.pq.Len()))
return
}
heap.Fix(q.pq, item.Index())
}
// Enqueue will be used to enqueue a queue item into a deadline queue
func (q *deadlineQueue) Enqueue(qi QueueItem, deadline time.Time) {
q.Lock()
defer q.Unlock()
// Override only if deadline is earlier.
if !qi.Deadline().IsZero() && !deadline.Before(qi.Deadline()) {
return
}
qi.SetDeadline(deadline)
q.update(qi)
select {
case q.queueChanged <- struct{}{}:
default:
}
}
// Dequeue will be used to dequeue the next queue item whose deadline expires.
// Currently this implementation works only when there is one thread which
// is dequeueing, though multiple threads can enqueue.
func (q *deadlineQueue) Dequeue(stopChan <-chan struct{}) QueueItem {
for {
q.RLock()
deadline := q.nextDeadline()
q.RUnlock()
var timer *time.Timer
var timerChan <-chan time.Time
if !deadline.IsZero() {
timer = time.NewTimer(time.Until(deadline))
timerChan = timer.C
}
select {
case <-timerChan:
q.Lock()
r := q.popIfReady()
q.Unlock()
if r != nil {
return r
}
case <-q.queueChanged:
// Wake up to process the next item in the queue
case <-stopChan:
return nil
}
if timer != nil {
timer.Stop()
}
}
}