-
Notifications
You must be signed in to change notification settings - Fork 134
/
job.go
172 lines (157 loc) · 5.57 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package validation
import (
"github.com/pkg/errors"
"github.com/armadaproject/armada/internal/scheduler"
"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/pkg/api"
)
// ValidateApiJobs validates a batch of jobs submitted together.
//
// Gang-level constraints spanning the whole batch are checked first (see
// validateGangs). Each job is then validated individually with
// ValidateApiJob, in slice order. Validation stops at, and returns, the
// first error encountered; nil means every check passed.
func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) error {
	err := validateGangs(jobs)
	for i := 0; err == nil && i < len(jobs); i++ {
		err = ValidateApiJob(jobs[i], config)
	}
	return err
}
// validateGangs checks that the gang jobs among jobs form complete, internally
// consistent gangs.
//
// Gang membership, gang id and cardinality are read from each job's
// annotations via scheduler.GangIdAndCardinalityFromAnnotations. Jobs that are
// not gang jobs are skipped. The first member seen for a gang fixes the
// expected values for the rest of that gang. The function checks that:
//   - the gang id is non-empty,
//   - every member declares the same cardinality,
//   - every member with a pod spec uses the same PriorityClassName as the
//     first member (an empty name if the first member had no pod spec),
//   - every member carries the same node uniformity label annotation, and
//   - the number of members present in jobs equals the declared cardinality.
//
// The first violation found is returned as an error. Incomplete gangs are
// reported in the order their first member appears in jobs, so the returned
// error is deterministic.
func validateGangs(jobs []*api.Job) error {
	type gangDetails struct {
		actualCardinality           int
		expectedCardinality         int
		expectedPriorityClassName   string
		expectedNodeUniformityLabel string
	}
	gangDetailsByGangId := make(map[string]gangDetails)
	// Go randomises map iteration order. Remember the order in which gangs
	// are first seen so the cardinality check below reports the same gang on
	// every run.
	var gangIds []string
	for i, job := range jobs {
		annotations := job.Annotations
		gangId, gangCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations)
		if err != nil {
			return errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId)
		}
		if !isGangJob {
			continue
		}
		if gangId == "" {
			return errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id)
		}
		nodeUniformityLabel := annotations[configuration.GangNodeUniformityLabelAnnotation]
		podSpec := util.PodSpecFromJob(job)

		details, ok := gangDetailsByGangId[gangId]
		if !ok {
			// First member of this gang: its values become the expected values
			// for every later member.
			details.actualCardinality = 1
			details.expectedCardinality = gangCardinality
			if podSpec != nil {
				details.expectedPriorityClassName = podSpec.PriorityClassName
			}
			details.expectedNodeUniformityLabel = nodeUniformityLabel
			gangDetailsByGangId[gangId] = details
			gangIds = append(gangIds, gangId)
			continue
		}

		if details.expectedCardinality != gangCardinality {
			return errors.Errorf(
				"inconsistent gang cardinality for %d-th job with id %s in gang %s: expected %d but got %d",
				i, job.Id, gangId, details.expectedCardinality, gangCardinality,
			)
		}
		if podSpec != nil && details.expectedPriorityClassName != podSpec.PriorityClassName {
			return errors.Errorf(
				"inconsistent PriorityClassName for %d-th job with id %s in gang %s: expected %s but got %s",
				i, job.Id, gangId, details.expectedPriorityClassName, podSpec.PriorityClassName,
			)
		}
		if nodeUniformityLabel != details.expectedNodeUniformityLabel {
			return errors.Errorf(
				"inconsistent nodeUniformityLabel for %d-th job with id %s in gang %s: expected %s but got %s",
				i, job.Id, gangId, details.expectedNodeUniformityLabel, nodeUniformityLabel,
			)
		}
		details.actualCardinality++
		gangDetailsByGangId[gangId] = details
	}
	for _, gangId := range gangIds {
		details := gangDetailsByGangId[gangId]
		if details.expectedCardinality != details.actualCardinality {
			return errors.Errorf(
				"unexpected number of jobs for gang %s: expected %d jobs but got %d",
				gangId, details.expectedCardinality, details.actualCardinality,
			)
		}
	}
	return nil
}
// ValidateApiJob validates a single job against config.
//
// It first checks the job's pod specs with ValidateApiJobPodSpecs, which may
// rewrite job in place. It then checks the priority class of job.PodSpec,
// followed by each entry of job.PodSpecs, with validatePodSpecPriorityClass
// against config.Preemption.PriorityClasses. The first error encountered is
// returned.
func ValidateApiJob(job *api.Job, config configuration.SchedulingConfig) error {
	if err := ValidateApiJobPodSpecs(job); err != nil {
		return err
	}
	allowed := config.Preemption.PriorityClasses
	err := validatePodSpecPriorityClass(job.PodSpec, true, allowed)
	for idx := 0; err == nil && idx < len(job.PodSpecs); idx++ {
		err = validatePodSpecPriorityClass(job.PodSpecs[idx], true, allowed)
	}
	return err
}
// ValidateApiJobPodSpecs checks that j carries exactly one pod spec and
// normalises the job so that spec is stored in j.PodSpec.
//
// A job that sets only j.PodSpecs, with a single element, is rewritten in
// place: that element is moved to j.PodSpec and j.PodSpecs is cleared.
//
// An *armadaerrors.ErrInvalidArgument (wrapped with a stack trace) is returned
// in any of these cases:
//   - the job has no pod spec at all;
//   - j.PodSpecs consists of a single nil entry;
//   - j.PodSpecs is still non-empty after normalisation, meaning either
//     multiple pods or a mix of j.PodSpec and j.PodSpecs.
func ValidateApiJobPodSpecs(j *api.Job) error {
	if j.PodSpec == nil && len(j.PodSpecs) == 0 {
		return errors.WithStack(&armadaerrors.ErrInvalidArgument{
			Name:    "PodSpec",
			Value:   j.PodSpec,
			Message: "Job does not contain at least one PodSpec",
		})
	}
	// We only support jobs with a single PodSpec, and it must be set to j.PodSpec.
	if j.PodSpec == nil && len(j.PodSpecs) == 1 {
		// Without this check, a lone nil entry would be promoted to a nil
		// j.PodSpec and the job would pass validation with no pod spec.
		// Reject it before mutating j, so the caller's job is untouched on error.
		if j.PodSpecs[0] == nil {
			return errors.WithStack(&armadaerrors.ErrInvalidArgument{
				Name:    "PodSpecs",
				Value:   j.PodSpecs,
				Message: "Job does not contain at least one PodSpec",
			})
		}
		j.PodSpec = j.PodSpecs[0]
		j.PodSpecs = nil
	}
	// I'm not convinced that the code to create services/ingresses when multiple pods are submitted is correct.
	// In particular, I think job.populateServicesIngresses is wrong.
	// Hence, we return an error until we can make sure that the code is correct.
	// The next error is redundant with this one, but we leave both since we may wish to remove this one.
	// - Albin
	if len(j.PodSpecs) > 0 {
		return errors.WithStack(&armadaerrors.ErrInvalidArgument{
			Name:    "PodSpecs",
			Value:   j.PodSpecs,
			Message: "Jobs with multiple pods are not supported",
		})
	}
	// I'm not convinced the code is correct when combining j.PodSpec and j.PodSpecs.
	// We should do more testing to make sure it's safe before we allow it.
	// (Currently unreachable because of the check above; kept deliberately.)
	// - Albin
	if len(j.PodSpecs) > 0 && j.PodSpec != nil {
		return errors.WithStack(&armadaerrors.ErrInvalidArgument{
			Name:    "PodSpec",
			Value:   j.PodSpec,
			Message: "PodSpec must be nil if PodSpecs is provided (i.e., these are exclusive)",
		})
	}
	return nil
}
// ValidateJobSubmitRequestItem validates a single job submit request item.
// The only check currently performed is on its ingress configurations (see
// validateIngressConfigs). It returns nil if the item is valid.
func ValidateJobSubmitRequestItem(request *api.JobSubmitRequestItem) error {
	if err := validateIngressConfigs(request); err != nil {
		return err
	}
	return nil
}
// validateIngressConfigs checks the ingress configurations of item.
//
// Every entry in item.Ingress must list at least one port, and no port may
// appear in more than one entry. For a duplicate, the error names the indexes
// of both conflicting entries. Configs are examined in order and the first
// violation is returned.
func validateIngressConfigs(item *api.JobSubmitRequestItem) error {
	// Maps each port to the index of the ingress config that first claimed it.
	ownerByPort := make(map[uint32]int)
	for cfgIdx, cfg := range item.Ingress {
		if len(cfg.Ports) == 0 {
			return errors.Errorf("ingress contains zero ports. Each ingress should have at least one port.")
		}
		for _, p := range cfg.Ports {
			prevIdx, claimed := ownerByPort[p]
			if claimed {
				return errors.Errorf(
					"port %d has two ingress configurations, specified in ingress configs with indexes %d, %d. Each port should at maximum have one ingress configuration",
					p,
					prevIdx,
					cfgIdx,
				)
			}
			ownerByPort[p] = cfgIdx
		}
	}
	return nil
}