forked from hashicorp/nomad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
delay_heap.go
167 lines (141 loc) · 3.97 KB
/
delay_heap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package lib
import (
"container/heap"
"fmt"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
// DelayHeap wraps a heap and gives operations other than Push/Pop.
// The inner heap is sorted by the time in the WaitUntil field of delayHeapNode
type DelayHeap struct {
index map[structs.NamespacedID]*delayHeapNode
heap delayedHeapImp
}
// HeapNode is an interface type implemented by objects stored in the DelayHeap
type HeapNode interface {
Data() interface{} // The data object
ID() string // ID of the object, used in conjunction with namespace for deduplication
Namespace() string // Namespace of the object, can be empty
}
// delayHeapNode encapsulates the node stored in DelayHeap
// WaitUntil is used as the sorting criteria
type delayHeapNode struct {
// Node is the data object stored in the delay heap
Node HeapNode
// WaitUntil is the time delay associated with the node
// Objects in the heap are sorted by WaitUntil
WaitUntil time.Time
index int
}
type delayedHeapImp []*delayHeapNode
func (h delayedHeapImp) Len() int {
return len(h)
}
func (h delayedHeapImp) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
// Sort such that zero times are at the end of the list.
iZero, jZero := h[i].WaitUntil.IsZero(), h[j].WaitUntil.IsZero()
if iZero && jZero {
return false
} else if iZero {
return false
} else if jZero {
return true
}
return h[i].WaitUntil.Before(h[j].WaitUntil)
}
func (h delayedHeapImp) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *delayedHeapImp) Push(x interface{}) {
node := x.(*delayHeapNode)
n := len(*h)
node.index = n
*h = append(*h, node)
}
func (h *delayedHeapImp) Pop() interface{} {
old := *h
n := len(old)
node := old[n-1]
node.index = -1 // for safety
*h = old[0 : n-1]
return node
}
func NewDelayHeap() *DelayHeap {
return &DelayHeap{
index: make(map[structs.NamespacedID]*delayHeapNode),
heap: make(delayedHeapImp, 0),
}
}
func (p *DelayHeap) Push(dataNode HeapNode, next time.Time) error {
tuple := structs.NamespacedID{
ID: dataNode.ID(),
Namespace: dataNode.Namespace(),
}
if _, ok := p.index[tuple]; ok {
return fmt.Errorf("node %q (%s) already exists", dataNode.ID(), dataNode.Namespace())
}
delayHeapNode := &delayHeapNode{dataNode, next, 0}
p.index[tuple] = delayHeapNode
heap.Push(&p.heap, delayHeapNode)
return nil
}
func (p *DelayHeap) Pop() *delayHeapNode {
if len(p.heap) == 0 {
return nil
}
delayHeapNode := heap.Pop(&p.heap).(*delayHeapNode)
tuple := structs.NamespacedID{
ID: delayHeapNode.Node.ID(),
Namespace: delayHeapNode.Node.Namespace(),
}
delete(p.index, tuple)
return delayHeapNode
}
func (p *DelayHeap) Peek() *delayHeapNode {
if len(p.heap) == 0 {
return nil
}
return p.heap[0]
}
func (p *DelayHeap) Contains(heapNode HeapNode) bool {
tuple := structs.NamespacedID{
ID: heapNode.ID(),
Namespace: heapNode.Namespace(),
}
_, ok := p.index[tuple]
return ok
}
func (p *DelayHeap) Update(heapNode HeapNode, waitUntil time.Time) error {
tuple := structs.NamespacedID{
ID: heapNode.ID(),
Namespace: heapNode.Namespace(),
}
if existingHeapNode, ok := p.index[tuple]; ok {
// Need to update the job as well because its spec can change.
existingHeapNode.Node = heapNode
existingHeapNode.WaitUntil = waitUntil
heap.Fix(&p.heap, existingHeapNode.index)
return nil
}
return fmt.Errorf("heap doesn't contain object with ID %q (%s)", heapNode.ID(), heapNode.Namespace())
}
func (p *DelayHeap) Remove(heapNode HeapNode) error {
tuple := structs.NamespacedID{
ID: heapNode.ID(),
Namespace: heapNode.Namespace(),
}
if node, ok := p.index[tuple]; ok {
heap.Remove(&p.heap, node.index)
delete(p.index, tuple)
return nil
}
return fmt.Errorf("heap doesn't contain object with ID %q (%s)", heapNode.ID(), heapNode.Namespace())
}
func (p *DelayHeap) Length() int {
return len(p.heap)
}