rqueue.go
package rqueue

import (
	"container/heap"
	"sync"
	"time"

	"go.uber.org/atomic"

	"github.com/gohornet/hornet/pkg/model/hornet"
	"github.com/gohornet/hornet/pkg/model/milestone"
)
// Queue implements a queue which contains requests for needed data.
type Queue interface {
// Next returns the next request to send, pops it from the queue and marks it as pending.
Next() *Request
// Peek returns the next request to send without popping it from the queue.
Peek() *Request
// Enqueue enqueues the given request if it isn't already queued or pending.
Enqueue(*Request) (enqueued bool)
	// IsQueued tells whether a request for the given transaction hash is queued.
	IsQueued(hash hornet.Hash) bool
	// IsPending tells whether a request for the given transaction hash was popped from the queue and is now pending.
	IsPending(hash hornet.Hash) bool
	// IsProcessing tells whether a request for the given transaction hash was received and is now being processed.
	IsProcessing(hash hornet.Hash) bool
// Received marks a request as received and thereby removes it from the pending set.
// It is added to the processing set.
// Returns the origin request which was pending or nil if the hash was not requested.
Received(hash hornet.Hash) *Request
	// Processed marks a request as fulfilled and thereby removes it from the processing set.
	// Returns the origin request which was processing or nil if the hash was not requested.
	Processed(hash hornet.Hash) *Request
	// EnqueuePending enqueues all pending requests back into the queue.
	// If the queue is not yet empty, nothing is re-enqueued and the current queue size is returned.
	// It also discards any pending request whose enqueue time exceeds the given threshold.
	// If discardOlderThan is zero, no requests are discarded.
	EnqueuePending(discardOlderThan time.Duration) (queued int)
	// Size returns the number of currently queued, pending and processing requests.
	Size() (queued int, pending int, processing int)
	// Empty tells whether the queue has no queued, pending or processing requests.
	Empty() bool
// Requests returns a snapshot of all queued, pending and processing requests in the queue.
Requests() (queued []*Request, pending []*Request, processing []*Request)
	// AvgLatency returns the average latency, in milliseconds, between enqueueing and receiving a request.
	AvgLatency() int64
// Filter adds the given filter function to the queue. Passing nil resets the current one.
// Setting a filter automatically clears all queued and pending requests which do not fulfill
// the filter criteria.
Filter(f FilterFunc)
}
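// Usage sketch (illustrative only; "txHash", "msIndex" and the send call are
// hypothetical placeholders, not part of this package):
//
//	q := New()
//	q.Enqueue(&Request{Hash: txHash, MilestoneIndex: msIndex})
//	if r := q.Next(); r != nil {
//		send(r) // r is now tracked in the pending set
//	}
//	// once the requested transaction arrives:
//	if r := q.Received(txHash); r != nil {
//		// ... validate/store the transaction ...
//		q.Processed(txHash)
//	}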
// FilterFunc is a function which determines whether a request should be enqueued or not.
type FilterFunc func(r *Request) bool
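// An illustrative FilterFunc ("maxIndex" is a hypothetical variable): only
// admit requests at or below a given milestone index:
//
//	q.Filter(func(r *Request) bool {
//		return r.MilestoneIndex <= maxIndex
//	})

// DefaultLatencyResolution is the default number of received requests over
// which the average latency is computed.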
const DefaultLatencyResolution = 100
// New creates a new Queue where requests are prioritized by their milestone index (lower = higher priority).
func New(latencyResolution ...int32) Queue {
	q := &priorityqueue{
		queue:      make([]*Request, 0),
		queued:     make(map[string]*Request),
		pending:    make(map[string]*Request),
		processing: make(map[string]*Request),
	}
	if len(latencyResolution) == 0 {
		q.latencyResolution = DefaultLatencyResolution
	} else {
		q.latencyResolution = int64(latencyResolution[0])
	}
	heap.Init(q)
	return q
}
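// Construction sketch (illustrative): New() averages the request latency over
// DefaultLatencyResolution received requests, while e.g. New(50) uses windows
// of 50:
//
//	q := New(50)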
// Request is a request for a particular transaction.
type Request struct {
// The hash of the transaction to request.
Hash hornet.Hash
// The milestone index under which this request is linked.
MilestoneIndex milestone.Index
// internal to the priority queue
index int
	// Tells the request queue not to discard this request even if its
	// enqueue time exceeds the threshold passed to EnqueuePending.
	PreventDiscard bool
// the time at which this request was first enqueued.
// do not modify this time
EnqueueTime time.Time
}
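// Sketch (illustrative; "txHash" and "msIndex" are hypothetical placeholders):
// a request which survives the discard threshold of EnqueuePending:
//
//	req := &Request{Hash: txHash, MilestoneIndex: msIndex, PreventDiscard: true}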
// implements a priority queue where requests with the lowest milestone index are popped first.
type priorityqueue struct {
// must be first field for 64-bit alignment.
// otherwise it crashes under 32-bit ARM systems
// see: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
avgLatency atomic.Int64
queue []*Request
queued map[string]*Request
pending map[string]*Request
processing map[string]*Request
latencyResolution int64
latencySum int64
latencyEntries int64
filter FilterFunc
sync.RWMutex
}
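// A request moves through three states, each tracked in its own set:
// queued (in the heap, waiting to be sent) -> pending (popped via Next and
// sent out) -> processing (marked via Received) -> removed (marked via Processed).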
func (pq *priorityqueue) Next() (r *Request) {
pq.Lock()
defer pq.Unlock()
// Pop() doesn't gracefully handle empty queues, so we check it ourselves
if len(pq.queued) == 0 {
return nil
}
return heap.Pop(pq).(*Request)
}
func (pq *priorityqueue) Enqueue(r *Request) bool {
pq.Lock()
defer pq.Unlock()
if _, queued := pq.queued[string(r.Hash)]; queued {
return false
}
if _, pending := pq.pending[string(r.Hash)]; pending {
return false
}
if _, processing := pq.processing[string(r.Hash)]; processing {
return false
}
if pq.filter != nil && !pq.filter(r) {
return false
}
r.EnqueueTime = time.Now()
heap.Push(pq, r)
return true
}
func (pq *priorityqueue) IsQueued(hash hornet.Hash) bool {
pq.RLock()
_, k := pq.queued[string(hash)]
pq.RUnlock()
return k
}
func (pq *priorityqueue) IsPending(hash hornet.Hash) bool {
pq.RLock()
_, k := pq.pending[string(hash)]
pq.RUnlock()
return k
}
func (pq *priorityqueue) IsProcessing(hash hornet.Hash) bool {
pq.RLock()
_, k := pq.processing[string(hash)]
pq.RUnlock()
return k
}
func (pq *priorityqueue) Received(hash hornet.Hash) *Request {
pq.Lock()
defer pq.Unlock()
if req, wasPending := pq.pending[string(hash)]; wasPending {
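		// compute a windowed moving average: accumulate latencies until
		// latencyResolution entries have been collected, then publish the
		// average and start a new window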
pq.latencySum += time.Since(req.EnqueueTime).Milliseconds()
pq.latencyEntries++
if pq.latencyEntries == pq.latencyResolution {
pq.avgLatency.Store(pq.latencySum / pq.latencyResolution)
pq.latencySum = 0
pq.latencyEntries = 0
}
delete(pq.pending, string(hash))
		// reset the latency tracking whenever nothing is pending anymore
		if len(pq.pending) == 0 {
			pq.latencySum = 0
			pq.latencyEntries = 0
			pq.avgLatency.Store(0)
		}
// add the request to processing
pq.processing[string(hash)] = req
return req
}
	// check whether the request is back in the queue
	// (it may have been re-enqueued after it was requested)
	return pq.queued[string(hash)]
}
func (pq *priorityqueue) Processed(hash hornet.Hash) *Request {
pq.Lock()
req, wasProcessing := pq.processing[string(hash)]
if wasProcessing {
delete(pq.processing, string(hash))
}
pq.Unlock()
return req
}
func (pq *priorityqueue) EnqueuePending(discardOlderThan time.Duration) int {
pq.Lock()
defer pq.Unlock()
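	// only re-enqueue the pending requests once the queue itself has been
	// drained; otherwise, just report the current number of queued requests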
if len(pq.queued) != 0 {
return len(pq.queued)
}
enqueued := len(pq.pending)
s := time.Now()
for k, v := range pq.pending {
if pq.filter != nil && !pq.filter(v) {
delete(pq.pending, k)
enqueued--
continue
}
if discardOlderThan == 0 || v.PreventDiscard || s.Sub(v.EnqueueTime) < discardOlderThan {
			// no need to examine the queued set here, since additions to and
			// removals from it are kept in sync by Push and Pop
heap.Push(pq, v)
continue
}
// discard request from the queue
delete(pq.pending, k)
enqueued--
}
return enqueued
}
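// EnqueuePending is typically driven periodically by the requester (sketch;
// the interval and discard threshold are hypothetical choices):
//
//	for range time.Tick(5 * time.Second) {
//		// re-queue stalled requests, dropping those pending for over a minute
//		q.EnqueuePending(time.Minute)
//	}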
func (pq *priorityqueue) Size() (int, int, int) {
pq.RLock()
x := len(pq.queued)
y := len(pq.pending)
z := len(pq.processing)
pq.RUnlock()
return x, y, z
}
func (pq *priorityqueue) Empty() bool {
pq.RLock()
empty := len(pq.queued) == 0 && len(pq.pending) == 0 && len(pq.processing) == 0
pq.RUnlock()
return empty
}
func (pq *priorityqueue) AvgLatency() int64 {
return pq.avgLatency.Load()
}
func (pq *priorityqueue) Requests() (queued []*Request, pending []*Request, processing []*Request) {
	pq.RLock()
	defer pq.RUnlock()
queued = make([]*Request, len(pq.queue))
var i int
for _, v := range pq.queued {
queued[i] = v
i++
}
pending = make([]*Request, len(pq.pending))
var j int
for _, v := range pq.pending {
pending[j] = v
j++
}
processing = make([]*Request, len(pq.processing))
var k int
for _, v := range pq.processing {
processing[k] = v
k++
}
return queued, pending, processing
}
func (pq *priorityqueue) Filter(f FilterFunc) {
pq.Lock()
defer pq.Unlock()
if f != nil {
filteredQueue := make([]*Request, 0)
for _, r := range pq.queue {
if !f(r) {
delete(pq.queued, string(r.Hash))
continue
}
filteredQueue = append(filteredQueue, r)
}
		pq.queue = filteredQueue
		// removing arbitrary elements may have broken the heap invariants,
		// so re-establish them
		heap.Init(pq)
for k, v := range pq.pending {
if !f(v) {
delete(pq.pending, k)
}
}
}
pq.filter = f
}
func (pq *priorityqueue) Len() int { return len(pq.queue) }
func (pq *priorityqueue) Less(i, j int) bool {
// requests for older milestones (lower number) have priority
return pq.queue[i].MilestoneIndex < pq.queue[j].MilestoneIndex
}
func (pq *priorityqueue) Swap(i, j int) {
pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
pq.queue[i].index = i
pq.queue[j].index = j
}
func (pq *priorityqueue) Push(x interface{}) {
	r := x.(*Request)
	r.index = len(pq.queue)
	pq.queue = append(pq.queue, r)
	// mark as queued and remove from pending
	delete(pq.pending, string(r.Hash))
	pq.queued[string(r.Hash)] = r
}
func (pq *priorityqueue) Pop() interface{} {
old := pq.queue
n := len(pq.queue)
r := old[n-1]
old[n-1] = nil // avoid memory leak
pq.queue = old[0 : n-1]
// mark as pending and remove from queued
delete(pq.queued, string(r.Hash))
pq.pending[string(r.Hash)] = r
return r
}
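// Push and Pop implement heap.Interface and are meant to be called only
// through heap.Push/heap.Pop (as done in Enqueue and Next), which maintain the
// heap order. They are also the single place where a request transitions
// between the queued and pending sets, keeping those sets consistent with the
// heap contents.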
func (pq *priorityqueue) Peek() *Request {
	pq.RLock()
	defer pq.RUnlock()
	if len(pq.queue) == 0 {
		return nil
	}
	// the heap root holds the request with the lowest milestone index,
	// i.e. the request Next would pop
	return pq.queue[0]
}