forked from samsungnext/nomad
/
drain_heap.go
160 lines (136 loc) · 3.61 KB
/
drain_heap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package drainer
import (
"context"
"sync"
"time"
)
// DrainDeadlineNotifier allows batch notification of nodes that have reached
// their drain deadline.
type DrainDeadlineNotifier interface {
	// NextBatch returns a channel that emits the next batch of nodes that
	// have reached their deadline.
	NextBatch() <-chan []string

	// Remove removes the given node from being tracked for a deadline.
	// Removing a node that is not tracked is a no-op.
	Remove(nodeID string)

	// Watch marks the given node for being watched for its deadline.
	// Watching an already-tracked node replaces its deadline.
	Watch(nodeID string, deadline time.Time)
}
// deadlineHeap implements the DrainDeadlineNotifier. Despite the name, it is
// currently backed by a plain map that is scanned linearly to determine the
// next deadlining node (not an actual min-heap). It also supports coalescing
// several deadlines into a single emission.
type deadlineHeap struct {
	// ctx cancels the background watch loop when done.
	ctx context.Context

	// coalesceWindow is how far past the earliest deadline other deadlines
	// may fall and still be emitted in the same batch.
	coalesceWindow time.Duration

	// batch is the channel on which deadlined node IDs are emitted.
	batch chan []string

	// nodes maps a tracked node ID to its drain deadline. Guarded by mu.
	nodes map[string]time.Time

	// trigger is a capacity-1 signal channel used to wake the watch loop
	// after Watch/Remove mutate the node set.
	trigger chan struct{}

	// mu guards nodes.
	mu sync.Mutex
}
// NewDeadlineHeap returns a new deadline heap that coalesces for the given
// duration and will stop watching when the passed context is cancelled.
func NewDeadlineHeap(ctx context.Context, coalesceWindow time.Duration) *deadlineHeap {
	h := &deadlineHeap{
		ctx:            ctx,
		coalesceWindow: coalesceWindow,
		batch:          make(chan []string),
		nodes:          make(map[string]time.Time, 64),
		trigger:        make(chan struct{}, 1),
	}

	// The watch loop runs until ctx is cancelled.
	go h.watch()
	return h
}
// watch is the long-running loop that waits for tracked nodes to pass their
// deadline and emits them in coalesced batches on d.batch. It exits when the
// heap's context is cancelled.
func (d *deadlineHeap) watch() {
	// Start with a stopped, drained timer so the timer case cannot fire
	// before a real deadline has been scheduled.
	timer := time.NewTimer(0)
	timer.Stop()
	select {
	case <-timer.C:
	default:
	}
	defer timer.Stop()

	// nextDeadline is the deadline the timer is currently armed for; it is
	// also the cutoff used to collect the batch when the timer fires.
	var nextDeadline time.Time

	for {
		select {
		case <-d.ctx.Done():
			return
		case <-timer.C:
			// Collect every node whose deadline is at or before the
			// armed deadline.
			var batch []string

			d.mu.Lock()
			for nodeID, nodeDeadline := range d.nodes {
				if !nodeDeadline.After(nextDeadline) {
					batch = append(batch, nodeID)
					delete(d.nodes, nodeID)
				}
			}
			d.mu.Unlock()

			if len(batch) > 0 {
				// Block until the batch is consumed or we are
				// cancelled.
				select {
				case d.batch <- batch:
				case <-d.ctx.Done():
					return
				}
			}
		case <-d.trigger:
		}

		// Recompute the next deadline; if no nodes remain there is
		// nothing to arm.
		deadline, ok := d.calculateNextDeadline()
		if !ok {
			continue
		}

		// Re-arm the timer for the computed deadline. Stop and drain it
		// first so a stale expiry cannot leak into the next iteration
		// (this goroutine is the only receiver on timer.C, so the
		// non-blocking drain is safe). Re-arming unconditionally —
		// rather than only when the deadline differs from nextDeadline —
		// fixes a missed wakeup: previously a node added with a deadline
		// equal to the stale nextDeadline of an already-fired timer was
		// never emitted. A zero deadline (force drain) yields a negative
		// duration and fires immediately.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(time.Until(deadline))
		nextDeadline = deadline
	}
}
// calculateNextDeadline returns the next deadline in which to scan for
// deadlined nodes, applying the coalesce window. The boolean is false when no
// nodes are being tracked.
func (d *deadlineHeap) calculateNextDeadline() (time.Time, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if len(d.nodes) == 0 {
		return time.Time{}, false
	}

	// Find the earliest deadline among all tracked nodes.
	var earliest time.Time
	for _, t := range d.nodes {
		if earliest.IsZero() || t.Before(earliest) {
			earliest = t
		}
	}

	// Coalesce: of the deadlines strictly inside the window starting at
	// the earliest deadline, pick the latest so they all land in one batch.
	limit := earliest.Add(d.coalesceWindow)
	var next time.Time
	for _, t := range d.nodes {
		if !t.Before(limit) {
			continue
		}
		if next.IsZero() || t.After(next) {
			next = t
		}
	}
	return next, true
}
// NextBatch returns the channel on which the next batch of deadlined node IDs
// will be emitted.
func (d *deadlineHeap) NextBatch() <-chan []string {
	return d.batch
}
// Remove stops tracking the given node's deadline and wakes the watch loop so
// it can re-arm its timer. Removing an untracked node is a no-op.
func (d *deadlineHeap) Remove(nodeID string) {
	d.mu.Lock()
	delete(d.nodes, nodeID)
	d.mu.Unlock()

	// Non-blocking send: a pending signal already guarantees a wakeup.
	select {
	case d.trigger <- struct{}{}:
	default:
	}
}
// Watch tracks the given node's drain deadline (replacing any existing one)
// and wakes the watch loop so it can re-arm its timer.
func (d *deadlineHeap) Watch(nodeID string, deadline time.Time) {
	d.mu.Lock()
	d.nodes[nodeID] = deadline
	d.mu.Unlock()

	// Non-blocking send: a pending signal already guarantees a wakeup.
	select {
	case d.trigger <- struct{}{}:
	default:
	}
}