package scheduler

import (
	"math"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

const (
	// serviceJobAntiAffinityPenalty is the penalty applied
	// to the score for placing an alloc on a node that
	// already has an alloc for this job.
	serviceJobAntiAffinityPenalty = 10.0

	// batchJobAntiAffinityPenalty is the same as the
	// serviceJobAntiAffinityPenalty but for batch type jobs.
	batchJobAntiAffinityPenalty = 5.0
)

// Stack is a chained collection of iterators. The stack is used to
// make placement decisions. Different schedulers may customize the
// stack they use to vary the way placements are made.
type Stack interface {
	// SetNodes is used to set the base set of potential nodes
	SetNodes([]*structs.Node)

	// SetJob is used to set the job for selection
	SetJob(job *structs.Job)

	// Select is used to select a node for the task group
	Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources)
}
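
// The helper below is an illustrative sketch only (not part of the
// scheduler): it shows the typical calling sequence for a Stack, using the
// GenericStack defined later in this file. The function name and its
// arguments are hypothetical.
func exampleStackUsage(ctx Context, nodes []*structs.Node, job *structs.Job) {
	// Build the stack, seed it with the candidate nodes and the job, then
	// ask it to pick a node for each task group.
	stack := NewGenericStack(false, ctx)
	stack.SetNodes(nodes)
	stack.SetJob(job)
	for _, tg := range job.TaskGroups {
		option, size := stack.Select(tg)
		if option == nil {
			// No feasible node was found for this task group.
			continue
		}
		// option holds the chosen ranked node; size is the aggregate
		// resource ask of the task group as computed by Select.
		_, _ = option, size
	}
}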

// GenericStack is the Stack used for the Generic scheduler. It is
// designed to make better placement decisions at the cost of performance.
type GenericStack struct {
	batch  bool
	ctx    Context
	source *StaticIterator

	wrappedChecks           *FeasibilityWrapper
	jobConstraint           *ConstraintChecker
	taskGroupDrivers        *DriverChecker
	taskGroupConstraint     *ConstraintChecker
	proposedAllocConstraint *ProposedAllocConstraintIterator
	binPack                 *BinPackIterator
	jobAntiAff              *JobAntiAffinityIterator
	limit                   *LimitIterator
	maxScore                *MaxScoreIterator
}

// NewGenericStack constructs a stack used for selecting service or batch
// job placements.
func NewGenericStack(batch bool, ctx Context) *GenericStack {
	// Create a new stack
	s := &GenericStack{
		batch: batch,
		ctx:   ctx,
	}

	// Create the source iterator. We randomize the order we visit nodes
	// to reduce collisions between schedulers and to do basic load
	// balancing across eligible nodes.
	s.source = NewRandomIterator(ctx, nil)

	// Attach the job constraints. The job is filled in later.
	s.jobConstraint = NewConstraintChecker(ctx, nil)

	// Filter on task group drivers first as they are faster
	s.taskGroupDrivers = NewDriverChecker(ctx, nil)

	// Filter on task group constraints second
	s.taskGroupConstraint = NewConstraintChecker(ctx, nil)

	// Create the feasibility wrapper, which wraps the feasibility checks so
	// that they can be skipped if the computed node class has previously
	// been marked as eligible or ineligible. Generally these are checks
	// that only need to examine a single node to determine feasibility.
	jobs := []FeasibilityChecker{s.jobConstraint}
	tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint}
	s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs)

	// Filter on constraints that are affected by proposed allocations.
	s.proposedAllocConstraint = NewProposedAllocConstraintIterator(ctx, s.wrappedChecks)

	// Upgrade from feasible to rank iterator
	rankSource := NewFeasibleRankIterator(ctx, s.proposedAllocConstraint)

	// Apply the bin packing; this depends on the resources needed
	// by a particular task group. Only enable eviction for the service
	// scheduler as that logic is expensive.
	evict := !batch
	s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0)

	// Apply the job anti-affinity iterator. This is used to avoid placing
	// multiple allocations on the same node for this job. The penalty is
	// lower for batch jobs as collocation matters less.
	penalty := serviceJobAntiAffinityPenalty
	if batch {
		penalty = batchJobAntiAffinityPenalty
	}
	s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, penalty, "")

	// Apply a limit function. This is to avoid scanning *every* possible node.
	s.limit = NewLimitIterator(ctx, s.jobAntiAff, 2)

	// Select the node with the maximum score for placement
	s.maxScore = NewMaxScoreIterator(ctx, s.limit)
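
	// For reference, the chain built above runs, from source to final
	// selection:
	//
	//	source (randomized StaticIterator) -> wrappedChecks (job + task
	//	group feasibility) -> proposedAllocConstraint ->
	//	FeasibleRankIterator -> binPack -> jobAntiAff -> limit -> maxScore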
	return s
}

func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
	// Shuffle base nodes
	shuffleNodes(baseNodes)

	// Update the set of base nodes
	s.source.SetNodes(baseNodes)

	// Recompute the scan limit. This is to avoid scanning *every* possible
	// node. For batch jobs we only need to evaluate 2 options and depend on
	// the power of two choices. For service jobs we need to visit "enough"
	// nodes. Using log2 of the total number of nodes is a good restriction,
	// with at least 2 as the floor.
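	//
	// Worked example (illustrative numbers, not from the source): with 100
	// eligible nodes, ceil(log2(100)) = 7, so up to 7 ranked options are
	// considered for a service job; with 3 nodes, ceil(log2(3)) = 2, so the
	// limit stays at the floor of 2; batch jobs always stop after 2 options.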
	limit := 2
	if n := len(baseNodes); !s.batch && n > 0 {
		logLimit := int(math.Ceil(math.Log2(float64(n))))
		if logLimit > limit {
			limit = logLimit
		}
	}
	s.limit.SetLimit(limit)
}

func (s *GenericStack) SetJob(job *structs.Job) {
	s.jobConstraint.SetConstraints(job.Constraints)
	s.proposedAllocConstraint.SetJob(job)
	s.binPack.SetPriority(job.Priority)
	s.jobAntiAff.SetJob(job.ID)
	s.ctx.Eligibility().SetJob(job)
}

func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) {
	// Reset the max selector and context
	s.maxScore.Reset()
	s.ctx.Reset()
	start := time.Now()

	// Get the task group's constraints.
	tgConstr := taskGroupConstraints(tg)

	// Update the parameters of the iterators
	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
	s.proposedAllocConstraint.SetTaskGroup(tg)
	s.wrappedChecks.SetTaskGroup(tg.Name)
	s.binPack.SetTaskGroup(tg)

	// Find the node with the max score
	option := s.maxScore.Next()

	// Ensure that the task resources were specified
	if option != nil && len(option.TaskResources) != len(tg.Tasks) {
		for _, task := range tg.Tasks {
			option.SetTaskResources(task, task.Resources)
		}
	}

	// Store the compute time
	s.ctx.Metrics().AllocationTime = time.Since(start)
	return option, tgConstr.size
}

// SelectPreferringNodes returns a node where an allocation of the task group
// can be placed; the nodes passed to it are preferred over the other
// available nodes.
func (s *GenericStack) SelectPreferringNodes(tg *structs.TaskGroup, nodes []*structs.Node) (*RankedNode, *structs.Resources) {
	originalNodes := s.source.nodes
	s.source.SetNodes(nodes)
	if option, resources := s.Select(tg); option != nil {
		s.source.SetNodes(originalNodes)
		return option, resources
	}

	s.source.SetNodes(originalNodes)
	return s.Select(tg)
}
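
// The helper below is a hypothetical sketch (not part of the scheduler) of
// driving SelectPreferringNodes: try to keep a task group on a previously
// used node and fall back to the full node set otherwise. The function name
// and the notion of a "previous" node are illustrative assumptions.
func exampleSelectWithPreference(stack *GenericStack, tg *structs.TaskGroup, previous *structs.Node) (*RankedNode, *structs.Resources) {
	// SelectPreferringNodes temporarily narrows the source iterator to the
	// preferred nodes and internally falls back to a plain Select against
	// the original node set if none of them yields a feasible option.
	return stack.SelectPreferringNodes(tg, []*structs.Node{previous})
}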

// SystemStack is the Stack used for the System scheduler. It is designed to
// attempt to make placements on all nodes.
type SystemStack struct {
	ctx    Context
	source *StaticIterator

	wrappedChecks       *FeasibilityWrapper
	jobConstraint       *ConstraintChecker
	taskGroupDrivers    *DriverChecker
	taskGroupConstraint *ConstraintChecker
	binPack             *BinPackIterator
}

// NewSystemStack constructs a stack used for selecting system job placements
func NewSystemStack(ctx Context) *SystemStack {
	// Create a new stack
	s := &SystemStack{ctx: ctx}

	// Create the source iterator. We visit nodes in a linear order because we
	// have to evaluate on all nodes.
	s.source = NewStaticIterator(ctx, nil)

	// Attach the job constraints. The job is filled in later.
	s.jobConstraint = NewConstraintChecker(ctx, nil)

	// Filter on task group drivers first as they are faster
	s.taskGroupDrivers = NewDriverChecker(ctx, nil)

	// Filter on task group constraints second
	s.taskGroupConstraint = NewConstraintChecker(ctx, nil)

	// Create the feasibility wrapper, which wraps the feasibility checks so
	// that they can be skipped if the computed node class has previously
	// been marked as eligible or ineligible. Generally these are checks
	// that only need to examine a single node to determine feasibility.
	jobs := []FeasibilityChecker{s.jobConstraint}
	tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint}
	s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs)

	// Upgrade from feasible to rank iterator
	rankSource := NewFeasibleRankIterator(ctx, s.wrappedChecks)

	// Apply the bin packing; this depends on the resources needed
	// by a particular task group. Enable eviction as system jobs are high
	// priority.
	s.binPack = NewBinPackIterator(ctx, rankSource, true, 0)
	return s
}

func (s *SystemStack) SetNodes(baseNodes []*structs.Node) {
	// Update the set of base nodes
	s.source.SetNodes(baseNodes)
}

func (s *SystemStack) SetJob(job *structs.Job) {
	s.jobConstraint.SetConstraints(job.Constraints)
	s.binPack.SetPriority(job.Priority)
	s.ctx.Eligibility().SetJob(job)
}

func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) {
	// Reset the binpack selector and context
	s.binPack.Reset()
	s.ctx.Reset()
	start := time.Now()

	// Get the task group's constraints.
	tgConstr := taskGroupConstraints(tg)

	// Update the parameters of the iterators
	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
	s.binPack.SetTaskGroup(tg)
	s.wrappedChecks.SetTaskGroup(tg.Name)

	// Get the next option that satisfies the constraints.
	option := s.binPack.Next()

	// Ensure that the task resources were specified
	if option != nil && len(option.TaskResources) != len(tg.Tasks) {
		for _, task := range tg.Tasks {
			option.SetTaskResources(task, task.Resources)
		}
	}

	// Store the compute time
	s.ctx.Metrics().AllocationTime = time.Since(start)
	return option, tgConstr.size
}
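
// The helper below is a hypothetical sketch (not part of the scheduler) of
// driving a SystemStack. Because the system scheduler targets every node, a
// caller might narrow the stack to one node at a time before selecting; the
// loop shape and function name here are illustrative assumptions, not the
// scheduler's actual call pattern.
func exampleSystemStackUsage(ctx Context, nodes []*structs.Node, job *structs.Job) {
	stack := NewSystemStack(ctx)
	stack.SetJob(job)
	for _, node := range nodes {
		// Evaluate feasibility of this single node for every task group.
		stack.SetNodes([]*structs.Node{node})
		for _, tg := range job.TaskGroups {
			if option, _ := stack.Select(tg); option != nil {
				// A feasible, ranked option exists on this node; a real
				// scheduler would record a placement here.
				_ = option
			}
		}
	}
}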