forked from hashicorp/nomad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
watch_nodes.go
246 lines (207 loc) · 7 KB
/
watch_nodes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
package drainer
import (
"context"
"log"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/time/rate"
)
// DrainingNodeWatcher is the interface for watching for draining nodes.
//
// NOTE(review): the interface declares no methods, so every type satisfies
// it; confirm whether a method set is still meant to be added here.
type DrainingNodeWatcher interface{}
// TrackedNodes returns a snapshot of the nodes that are currently tracked.
// The result is a freshly allocated map, built while holding the read lock,
// that pairs each key of n.nodes with the value of that entry's GetNode().
func (n *NodeDrainer) TrackedNodes() map[string]*structs.Node {
	n.l.RLock()
	defer n.l.RUnlock()

	snapshot := make(map[string]*structs.Node, len(n.nodes))
	for id, draining := range n.nodes {
		snapshot[id] = draining.GetNode()
	}

	return snapshot
}
// Remove removes the given node from being tracked. It holds the write lock
// n.l for the whole call, deletes the nodeID entry from n.nodes and then asks
// the deadline notifier to remove the same ID.
func (n *NodeDrainer) Remove(nodeID string) {
n.l.Lock()
defer n.l.Unlock()
// TODO test the notifier is updated
// Remove it from being tracked and remove it from the deadline notifier.
delete(n.nodes, nodeID)
n.deadlineNotifier.Remove(nodeID)
}
// Update refreshes the tracked copy of node, or starts tracking it when it is
// not yet known. A nil node is ignored. The write lock n.l is held for the
// whole call.
//
// After the tracked entry is refreshed, Update:
//   - registers or removes the node's deadline with the deadline notifier,
//   - registers the node's draining jobs with the job watcher,
//   - and, if the node already reports itself done, drains whatever allocs
//     remain and asks raft to mark the drain complete.
//
// Failures are logged and never returned to the caller.
func (n *NodeDrainer) Update(node *structs.Node) {
	n.l.Lock()
	defer n.l.Unlock()

	if node == nil {
		return
	}

	// Refresh the existing entry or create a new one.
	entry, known := n.nodes[node.ID]
	if known {
		entry.Update(node)
	} else {
		entry = NewDrainingNode(node, n.state)
		n.nodes[node.ID] = entry
	}

	// TODO test the notifier is updated
	// A node with an infinite deadline must not be tracked for deadlining.
	infinite, deadline := node.DrainStrategy.DeadlineTime()
	if infinite {
		n.deadlineNotifier.Remove(node.ID)
	} else {
		n.deadlineNotifier.Watch(node.ID, deadline)
	}

	// TODO Test this
	// Register interest in the draining jobs.
	drainingJobs, err := entry.DrainingJobs()
	if err != nil {
		n.logger.Printf("[ERR] nomad.drain: error retrieving draining jobs on node %q: %v", node.ID, err)
		return
	}
	n.logger.Printf("[TRACE] nomad.drain: node %q has %d draining jobs on it", node.ID, len(drainingJobs))
	n.jobWatcher.RegisterJobs(drainingJobs)

	// TODO Test at this layer as well that a node drain on a node without
	// allocs immediately gets unmarked as draining

	// If an operator drains a node with nothing on it, the node is done
	// right away and the drain has to be unset here.
	finished, err := entry.IsDone()
	if err != nil {
		n.logger.Printf("[ERR] nomad.drain: failed to check if node %q is done draining: %v", node.ID, err)
		return
	}
	if !finished {
		return
	}

	// Node is done draining. Stop remaining system allocs before marking the
	// node as complete. A failure here is logged but does not prevent the
	// completion call below.
	leftover, err := entry.RemainingAllocs()
	switch {
	case err != nil:
		n.logger.Printf("[ERR] nomad.drain: error getting remaining allocs on drained node %q: %v", node.ID, err)
	case len(leftover) > 0:
		batch := structs.NewBatchFuture()
		n.drainAllocs(batch, leftover)
		if waitErr := batch.Wait(); waitErr != nil {
			n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done node %q: %v", len(leftover), node.ID, waitErr)
		}
	}

	completedAt, err := n.raft.NodesDrainComplete([]string{node.ID})
	if err != nil {
		n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
		return
	}
	n.logger.Printf("[INFO] nomad.drain: node %q completed draining at index %d", node.ID, completedAt)
}
// nodeDrainWatcher is used to watch nodes that are entering, leaving or
// changing their drain strategy. Changes it observes are reported to the
// tracker.
type nodeDrainWatcher struct {
// ctx is handed to the rate limiter and to blocking queries; the watch loop
// returns once it is done.
ctx context.Context
// logger receives the watcher's trace and error messages.
logger *log.Logger
// state is the state that is watched for state changes.
state *state.StateStore
// limiter is used to limit the rate of blocking queries
limiter *rate.Limiter
// tracker is the object that is tracking the nodes and provides us with the
// needed callbacks
tracker NodeTracker
}
// NewNodeDrainWatcher builds a node drain watcher from the given dependencies
// and starts its watch loop in a new goroutine before returning it.
func NewNodeDrainWatcher(ctx context.Context, limiter *rate.Limiter, state *state.StateStore, logger *log.Logger, tracker NodeTracker) *nodeDrainWatcher {
	watcher := new(nodeDrainWatcher)
	watcher.ctx = ctx
	watcher.logger = logger
	watcher.state = state
	watcher.limiter = limiter
	watcher.tracker = tracker

	// The loop runs in the background for as long as getNodes keeps working
	// and ctx has not been cancelled.
	go watcher.watch()

	return watcher
}
// watch is the long lived watching routine that detects node changes. Each
// iteration fetches the node set via getNodes, compares it with the tracker's
// current view and calls the tracker's Update/Remove callbacks accordingly.
// The loop returns when getNodes reports context.Canceled or when w.ctx is
// done while backing off after an error.
func (w *nodeDrainWatcher) watch() {
	var minIndex uint64 = 1

	for {
		w.logger.Printf("[TRACE] nomad.drain.node_watcher: getting nodes at index %d", minIndex)
		latest, respIndex, err := w.getNodes(minIndex)
		// Note: this trace reports the index that was requested, not the one
		// that was returned.
		w.logger.Printf("[TRACE] nomad.drain.node_watcher: got nodes %d at index %d: %v", len(latest), minIndex, err)

		if err == context.Canceled {
			w.logger.Printf("[TRACE] nomad.drain.node_watcher: shutting down")
			return
		}
		if err != nil {
			w.logger.Printf("[ERR] nomad.drain.node_watcher: error watching node updates at index %d: %v", minIndex, err)

			// Back off before retrying, unless we are asked to stop first.
			select {
			case <-w.ctx.Done():
				w.logger.Printf("[TRACE] nomad.drain.node_watcher: shutting down")
				return
			case <-time.After(stateReadErrorDelay):
			}
			continue
		}

		// update index for next run
		minIndex = respIndex

		// known is a snapshot; tracker calls below do not alter it.
		known := w.tracker.TrackedNodes()
		for id, node := range latest {
			isDraining := node.DrainStrategy != nil
			current, isTracked := known[id]

			if isTracked && !isDraining {
				// Tracked but no longer draining: untrack.
				w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q is no longer draining", id)
				w.tracker.Remove(id)
			} else if !isTracked && isDraining {
				// Draining but not yet tracked: track.
				w.logger.Printf("[TRACE] nomad.drain.node_watcher: untracked node %q is draining", id)
				w.tracker.Update(node)
			} else if isTracked && isDraining && !current.DrainStrategy.Equal(node.DrainStrategy) {
				// Tracked and still draining, but the strategy changed: update.
				w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q has updated drain", id)
				w.tracker.Update(node)
			} else {
				w.logger.Printf("[TRACE] nomad.drain.node_watcher: node %q at index %v: tracked %v, draining %v", id, node.ModifyIndex, isTracked, isDraining)
			}

			// TODO(schmichael) handle the case of a lost node
		}

		// Untrack anything the tracker knew about that is absent from the
		// latest node set.
		for id := range known {
			if _, present := latest[id]; present {
				continue
			}
			w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q is no longer exists", id)
			w.tracker.Remove(id)
		}
	}
}
// getNodes waits on the rate limiter and then runs a blocking query against
// the state store with minIndex as the minimum index. It returns the nodes as
// a map together with the index reported by the query. On any error the map
// is nil and the index is 0.
func (w *nodeDrainWatcher) getNodes(minIndex uint64) (map[string]*structs.Node, uint64, error) {
	if waitErr := w.limiter.Wait(w.ctx); waitErr != nil {
		return nil, 0, waitErr
	}

	raw, queryIndex, queryErr := w.state.BlockingQuery(w.getNodesImpl, minIndex, w.ctx)
	if queryErr != nil {
		return nil, 0, queryErr
	}

	// getNodesImpl always yields this concrete map type on success.
	nodes := raw.(map[string]*structs.Node)
	return nodes, queryIndex, nil
}
// getNodesImpl is the query function handed to BlockingQuery. It reads every
// node from the given state store, registering ws with the node iterator, and
// returns them as a map[string]*structs.Node keyed by node ID along with the
// value of state.Index("nodes").
func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
	nodeIter, err := state.Nodes(ws)
	if err != nil {
		return nil, 0, err
	}

	tableIndex, err := state.Index("nodes")
	if err != nil {
		return nil, 0, err
	}

	byID := make(map[string]*structs.Node, 64)
	for item := nodeIter.Next(); item != nil; item = nodeIter.Next() {
		n := item.(*structs.Node)
		byID[n.ID] = n
	}

	return byID, tableIndex, nil
}