-
Notifications
You must be signed in to change notification settings - Fork 85
/
queue.go
65 lines (53 loc) · 1.4 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package controller
import (
"sync"
"time"
)
// Queue is a thread-safe, insertion-ordered set of nodes.
//
// Queue is intended for tracking nodes that have been updated for later reevaluation.
type Queue struct {
mut sync.Mutex
queuedSet map[*QueuedNode]struct{}
queuedOrder []*QueuedNode
updateCh chan struct{}
}
type QueuedNode struct {
Node BlockNode
LastUpdatedTime time.Time
}
// NewQueue returns a new queue.
func NewQueue() *Queue {
return &Queue{
updateCh: make(chan struct{}, 1),
queuedSet: make(map[*QueuedNode]struct{}),
queuedOrder: make([]*QueuedNode, 0),
}
}
// Enqueue inserts a new BlockNode into the Queue. Enqueue is a no-op if the
// BlockNode is already in the Queue.
func (q *Queue) Enqueue(c *QueuedNode) {
q.mut.Lock()
defer q.mut.Unlock()
// Skip if already queued.
if _, ok := q.queuedSet[c]; ok {
return
}
q.queuedOrder = append(q.queuedOrder, c)
q.queuedSet[c] = struct{}{}
select {
case q.updateCh <- struct{}{}:
default:
}
}
// Chan returns a channel which is written to when the queue is non-empty.
func (q *Queue) Chan() <-chan struct{} { return q.updateCh }
// DequeueAll removes all BlockNode from the queue and returns them.
func (q *Queue) DequeueAll() []*QueuedNode {
q.mut.Lock()
defer q.mut.Unlock()
all := q.queuedOrder
q.queuedOrder = make([]*QueuedNode, 0)
q.queuedSet = make(map[*QueuedNode]struct{})
return all
}