package constraintenforcer

import (
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/genericresource"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/constraint"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
)

// ConstraintEnforcer watches for updates to nodes and shuts down tasks that no
// longer satisfy scheduling constraints or resource limits.
type ConstraintEnforcer struct {
	store    *store.MemoryStore
	stopChan chan struct{}
	doneChan chan struct{}
}

// New creates a new ConstraintEnforcer.
func New(store *store.MemoryStore) *ConstraintEnforcer {
	return &ConstraintEnforcer{
		store:    store,
		stopChan: make(chan struct{}),
		doneChan: make(chan struct{}),
	}
}

// Run is the ConstraintEnforcer's main loop.
func (ce *ConstraintEnforcer) Run() {
	defer close(ce.doneChan)

	watcher, cancelWatch := state.Watch(ce.store.WatchQueue(), api.EventUpdateNode{})
	defer cancelWatch()

	var (
		nodes []*api.Node
		err   error
	)
	ce.store.View(func(readTx store.ReadTx) {
		nodes, err = store.FindNodes(readTx, store.All)
	})
	if err != nil {
		log.L.WithError(err).Error("failed to check nodes for noncompliant tasks")
	} else {
		for _, node := range nodes {
			ce.rejectNoncompliantTasks(node)
		}
	}

	for {
		select {
		case event := <-watcher:
			node := event.(api.EventUpdateNode).Node
			ce.rejectNoncompliantTasks(node)
		case <-ce.stopChan:
			return
		}
	}
}

func (ce *ConstraintEnforcer) rejectNoncompliantTasks(node *api.Node) {
	// If the availability is "drain", the orchestrator will
	// shut down all tasks.
	// If the availability is "pause", we shouldn't touch
	// the tasks on this node.
	if node.Spec.Availability != api.NodeAvailabilityActive {
		return
	}

	var (
		tasks []*api.Task
		err   error
	)

	services := map[string]*api.Service{}
	ce.store.View(func(tx store.ReadTx) {
		tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
		if err != nil {
			return
		}

		// Deduplicate service IDs using the services map. It's okay for the
		// values to be nil for now, we will look them up from the store next.
		for _, task := range tasks {
			services[task.ServiceID] = nil
		}

		for serviceID := range services {
			services[serviceID] = store.GetService(tx, serviceID)
		}
	})

	if err != nil {
		log.L.WithError(err).Errorf("failed to list tasks for node ID %s", node.ID)
	}

	available := &api.Resources{}
	var fakeStore []*api.GenericResource

	if node.Description != nil && node.Description.Resources != nil {
		available = node.Description.Resources.Copy()
	}

	removeTasks := make(map[string]*api.Task)

	// TODO(aaronl): The set of tasks removed will be
	// nondeterministic because it depends on the order of
	// the slice returned from FindTasks. We could do
	// a separate pass over the tasks for each type of
	// resource, and sort by the size of the reservation
	// to remove the most resource-intensive tasks.
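
	// An illustrative sketch of that idea (not part of this package):
	// sorting the tasks by ascending memory reservation before the loop
	// below would let the smallest tasks claim resources first, so the
	// tasks with the largest reservations are the ones most likely to be
	// removed when the node is over capacity:
	//
	//	sort.Slice(tasks, func(i, j int) bool {
	//		return memoryReservation(tasks[i]) < memoryReservation(tasks[j])
	//	})
	//
	// where memoryReservation is a hypothetical helper returning
	// t.Spec.Resources.Reservations.MemoryBytes (or 0 when unset).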
loop:
	for _, t := range tasks {
		if t.DesiredState < api.TaskStateAssigned || t.DesiredState > api.TaskStateCompleted {
			continue
		}
		// Ensure that the node still satisfies placement constraints.
		// NOTE: If the task is associated with a service then we must use the
		// constraints from the current service spec rather than the
		// constraints from the task spec because they may be outdated. This
		// will happen if the service was previously updated in a way which
		// only changes the placement constraints and the node matched the
		// placement constraints both before and after that update. In the case
		// of such updates, the tasks are not considered "dirty" and are not
		// restarted, but it will mean that the task spec's placement
		// constraints are outdated. Consider this example:
		// - A service is created with no constraints and a task is scheduled
		//   to a node.
		// - The node is updated to add a label; this doesn't affect the task
		//   on that node because it has no constraints.
		// - The service is updated to add a node label constraint which
		//   matches the label which was just added to the node. The updater
		//   does not shut down the task because only the constraints have
		//   changed and the node still matches the updated constraints.
		// - The node is updated to remove the node label. The node no longer
		//   satisfies the placement constraints of the service, so the task
		//   should be shut down. However, the task's spec still has the
		//   original and outdated constraints (that are still satisfied by
		//   the node). If we used those original constraints then the task
		//   would incorrectly not be removed. This is why the constraints
		//   from the service spec should be used instead.
		var placement *api.Placement
		if service := services[t.ServiceID]; service != nil {
			// This task is associated with a service, so we use the service's
			// current placement constraints.
			placement = service.Spec.Task.Placement
		} else {
			// This task is not associated with a service (or the service no
			// longer exists), so we use the placement constraints from the
			// original task spec.
			placement = t.Spec.Placement
		}

		if placement != nil && len(placement.Constraints) > 0 {
			constraints, _ := constraint.Parse(placement.Constraints)
			if !constraint.NodeMatches(constraints, node) {
				removeTasks[t.ID] = t
				continue
			}
		}
		// Ensure that the task assigned to the node
		// still satisfies the resource limits.
		if t.Spec.Resources != nil && t.Spec.Resources.Reservations != nil {
			if t.Spec.Resources.Reservations.MemoryBytes > available.MemoryBytes {
				removeTasks[t.ID] = t
				continue
			}

			if t.Spec.Resources.Reservations.NanoCPUs > available.NanoCPUs {
				removeTasks[t.ID] = t
				continue
			}

			for _, ta := range t.AssignedGenericResources {
				// Type change or no longer available
				if !genericresource.HasResource(ta, available.Generic) {
					removeTasks[t.ID] = t
					break loop
				}
			}

			available.MemoryBytes -= t.Spec.Resources.Reservations.MemoryBytes
			available.NanoCPUs -= t.Spec.Resources.Reservations.NanoCPUs

			genericresource.ClaimResources(&available.Generic,
				&fakeStore, t.AssignedGenericResources)
		}
	}
	if len(removeTasks) != 0 {
		err := ce.store.Batch(func(batch *store.Batch) error {
			for _, t := range removeTasks {
				err := batch.Update(func(tx store.Tx) error {
					t = store.GetTask(tx, t.ID)
					if t == nil || t.DesiredState > api.TaskStateCompleted {
						return nil
					}

					// We set the observed state to
					// REJECTED, rather than the desired
					// state. Desired state is owned by the
					// orchestrator, and setting it directly
					// will bypass actions such as
					// restarting the task on another node
					// (if applicable).
					t.Status.State = api.TaskStateRejected
					t.Status.Message = "task rejected by constraint enforcer"
					t.Status.Err = "assigned node no longer meets constraints"
					t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())

					return store.UpdateTask(tx, t)
				})
				if err != nil {
					log.L.WithError(err).Errorf("failed to shut down task %s", t.ID)
				}
			}
			return nil
		})
		if err != nil {
			log.L.WithError(err).Errorf("failed to shut down tasks")
		}
	}
}

// Stop stops the ConstraintEnforcer and waits for the main loop to exit.
func (ce *ConstraintEnforcer) Stop() {
	close(ce.stopChan)
	<-ce.doneChan
}
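
// Typical wiring (an illustrative sketch only, not code from this package):
// the enforcer is created with the manager's MemoryStore, run in its own
// goroutine, and stopped during shutdown.
//
//	enforcer := constraintenforcer.New(memoryStore)
//	go enforcer.Run()
//	// ...
//	enforcer.Stop() // blocks until Run has returned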