package scheduler

import (
	"time"

	"github.com/gogo/protobuf/proto"
	lru "github.com/hashicorp/golang-lru"
	"github.com/pkg/errors"
	"k8s.io/apimachinery/pkg/util/clock"

	"github.com/armadaproject/armada/internal/common/armadacontext"
	"github.com/armadaproject/armada/internal/common/stringinterner"
	"github.com/armadaproject/armada/internal/common/types"
	"github.com/armadaproject/armada/internal/scheduler/configuration"
	"github.com/armadaproject/armada/internal/scheduler/constraints"
	schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
	"github.com/armadaproject/armada/internal/scheduler/database"
	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
	"github.com/armadaproject/armada/internal/scheduler/jobdb"
	"github.com/armadaproject/armada/internal/scheduler/nodedb"
	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// PoolAssigner allows jobs to be assigned to a pool.
// Note that this is intended only for use with metrics calculation.
type PoolAssigner interface {
	Refresh(ctx *armadacontext.Context) error
	AssignPool(j *jobdb.Job) (string, error)
}
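
// A minimal usage sketch (illustrative only; ctx, config, repo, factory, and
// job are assumed to be supplied by the caller):
//
//	assigner, err := NewPoolAssigner(10*time.Minute, config, repo, factory)
//	if err != nil {
//		return err
//	}
//	if err := assigner.Refresh(ctx); err != nil {
//		return err
//	}
//	pool, err := assigner.AssignPool(job)

// executor pairs a node database snapshot of an executor's nodes with the
// minimum job size that executor accepts.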
type executor struct {
	nodeDb         *nodedb.NodeDb
	minimumJobSize schedulerobjects.ResourceList
}
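
// DefaultPoolAssigner is the default PoolAssigner implementation. It assigns
// pools from executor state cached by Refresh and memoizes results per
// scheduling key in an LRU cache.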
type DefaultPoolAssigner struct {
	executorTimeout     time.Duration
	priorityClasses     map[string]types.PriorityClass
	priorities          []int32
	indexedResources    []configuration.ResourceType
	indexedTaints       []string
	indexedNodeLabels   []string
	wellKnownNodeTypes  []configuration.WellKnownNodeType
	poolByExecutorId    map[string]string
	executorsByPool     map[string][]*executor
	executorRepository  database.ExecutorRepository
	poolCache           *lru.Cache
	clock               clock.Clock
	resourceListFactory *internaltypes.ResourceListFactory
}
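
// NewPoolAssigner returns a DefaultPoolAssigner configured from the given
// scheduling config.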
func NewPoolAssigner(executorTimeout time.Duration,
	schedulingConfig configuration.SchedulingConfig,
	executorRepository database.ExecutorRepository,
	resourceListFactory *internaltypes.ResourceListFactory,
) (*DefaultPoolAssigner, error) {
	poolCache, err := lru.New(10000)
	if err != nil {
		return nil, errors.Wrap(err, "error creating PoolAssigner pool cache")
	}
	return &DefaultPoolAssigner{
		executorTimeout:     executorTimeout,
		priorityClasses:     schedulingConfig.PriorityClasses,
		executorsByPool:     map[string][]*executor{},
		poolByExecutorId:    map[string]string{},
		priorities:          types.AllowedPriorities(schedulingConfig.PriorityClasses),
		indexedResources:    schedulingConfig.IndexedResources,
		indexedTaints:       schedulingConfig.IndexedTaints,
		wellKnownNodeTypes:  schedulingConfig.WellKnownNodeTypes,
		indexedNodeLabels:   schedulingConfig.IndexedNodeLabels,
		executorRepository:  executorRepository,
		poolCache:           poolCache,
		clock:               clock.RealClock{},
		resourceListFactory: resourceListFactory,
	}, nil
}

// Refresh updates the executor state.
func (p *DefaultPoolAssigner) Refresh(ctx *armadacontext.Context) error {
	executors, err := p.executorRepository.GetExecutors(ctx)
	if err != nil {
		return err
	}
	executorsByPool := map[string][]*executor{}
	poolByExecutorId := map[string]string{}
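	// Rebuild the pool mappings, skipping executors that have not reported
	// within executorTimeout and are therefore considered stale.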
	for _, e := range executors {
		if p.clock.Since(e.LastUpdateTime) < p.executorTimeout {
			poolByExecutorId[e.Id] = e.Pool
			nodeDb, err := p.constructNodeDb(e.Nodes)
			if err != nil {
				return errors.WithMessagef(err, "could not construct node db for executor %s", e.Id)
			}
			executorsByPool[e.Pool] = append(executorsByPool[e.Pool], &executor{
				nodeDb:         nodeDb,
				minimumJobSize: e.MinimumJobSize,
			})
		}
	}
	p.executorsByPool = executorsByPool
	p.poolByExecutorId = poolByExecutorId
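	// The executor state has changed, so cached pool assignments may be stale.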
	p.poolCache.Purge()
	return nil
}

// AssignPool returns the pool associated with the job, or the empty string if
// no pool is valid for it.
func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error) {
	// If the job is running, use the pool of the executor it was assigned to.
	if !j.Queued() && j.HasRuns() {
		return p.poolByExecutorId[j.LatestRun().Executor()], nil
	}
	// See if we have this set of requirements cached.
	schedulingKey := j.SchedulingKey()
	if cachedPool, ok := p.poolCache.Get(schedulingKey); ok {
		return cachedPool.(string), nil
	}
	req := p.clearAnnotations(j.PodRequirements())
	// Otherwise iterate through each pool and return the first one the job is
	// potentially schedulable on.
	// TODO: We should use the real scheduler instead, since this check may go
	// out of sync with the scheduler.
	for pool, executors := range p.executorsByPool {
		for _, e := range executors {
			requests := req.GetResourceRequirements().Requests
			if ok, _ := constraints.RequestsAreLargeEnough(schedulerobjects.ResourceListFromV1ResourceList(requests), e.minimumJobSize); !ok {
				continue
			}
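			// Trial-place the job in a write transaction that is always
			// aborted, so the node db snapshot itself is never modified.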
			nodeDb := e.nodeDb
			txn := nodeDb.Txn(true)
			jctx := &schedulercontext.JobSchedulingContext{
				Created:         time.Now(),
				JobId:           j.Id(),
				Job:             j,
				PodRequirements: j.PodRequirements(),
				GangInfo:        schedulercontext.EmptyGangInfo(j),
			}
			node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx)
			txn.Abort()
			if err != nil {
				return "", errors.WithMessagef(err, "error selecting node for job %s", j.Id())
			}
			if node != nil {
				p.poolCache.Add(schedulingKey, pool)
				return pool, nil
			}
		}
	}
	return "", nil
}
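
// constructNodeDb builds an in-memory node database from an executor's
// reported nodes, against which AssignPool can test job schedulability.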
func (p *DefaultPoolAssigner) constructNodeDb(nodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) {
	// Nodes to be considered by the scheduler.
	nodeDb, err := nodedb.NewNodeDb(
		p.priorityClasses,
		0,
		p.indexedResources,
		p.indexedTaints,
		p.indexedNodeLabels,
		p.wellKnownNodeTypes,
		stringinterner.New(1024),
		p.resourceListFactory,
	)
	if err != nil {
		return nil, err
	}
	txn := nodeDb.Txn(true)
	defer txn.Abort()
	for _, node := range nodes {
		if err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil {
			return nil, err
		}
	}
	txn.Commit()
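	// Clear allocations so every node appears empty; this checks whether the
	// job could ever fit on the executor, not whether it fits right now.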
	err = nodeDb.ClearAllocated()
	if err != nil {
		return nil, err
	}
	return nodeDb, nil
}

// clearAnnotations returns a copy of reqs in which every annotation value is
// overwritten with a fixed placeholder.
func (p *DefaultPoolAssigner) clearAnnotations(reqs *schedulerobjects.PodRequirements) *schedulerobjects.PodRequirements {
	reqsCopy := proto.Clone(reqs).(*schedulerobjects.PodRequirements)
	for key := range reqsCopy.GetAnnotations() {
		reqsCopy.Annotations[key] = "poolassigner"
	}
	return reqsCopy
}