-
Notifications
You must be signed in to change notification settings - Fork 134
/
submit.go
206 lines (171 loc) · 6.38 KB
/
submit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package job
import (
"fmt"
"regexp"
"sync"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/executor/configuration"
"github.com/armadaproject/armada/internal/executor/context"
"github.com/armadaproject/armada/internal/executor/domain"
util2 "github.com/armadaproject/armada/internal/executor/util"
"github.com/armadaproject/armada/pkg/api"
)
// Submitter submits jobs to the cluster, returning details of any submissions that failed.
type Submitter interface {
	// SubmitApiJobs converts API jobs into submittable jobs (applying pod defaults) and submits them.
	SubmitApiJobs(jobsToSubmit []*api.Job) []*FailedSubmissionDetails
	// SubmitJobs submits already-prepared jobs.
	SubmitJobs(jobsToSubmit []*SubmitJob) []*FailedSubmissionDetails
}
// SubmitService submits jobs to Kubernetes using a pool of worker goroutines.
type SubmitService struct {
	// clusterContext is used to create pods, services and ingresses in the cluster.
	clusterContext context.ClusterContext
	// podDefaults supplies executor-level defaults (e.g. ingress hostname and cert suffixes) applied at submit time.
	podDefaults *configuration.PodDefaults
	// submissionThreadCount is the number of concurrent submission worker goroutines.
	submissionThreadCount int
	// fatalPodSubmissionErrors holds regex patterns; a submission error matching any of them is treated as unrecoverable.
	fatalPodSubmissionErrors []string
}
// NewSubmitter constructs a SubmitService from the supplied cluster context,
// pod defaults, worker count and fatal-error patterns.
func NewSubmitter(
	clusterContext context.ClusterContext,
	podDefaults *configuration.PodDefaults,
	submissionThreadCount int,
	fatalPodSubmissionErrors []string,
) *SubmitService {
	service := &SubmitService{
		clusterContext:           clusterContext,
		podDefaults:              podDefaults,
		submissionThreadCount:    submissionThreadCount,
		fatalPodSubmissionErrors: fatalPodSubmissionErrors,
	}
	return service
}
// FailedSubmissionDetails captures everything needed to report a failed job submission.
type FailedSubmissionDetails struct {
	// JobRunMeta identifies the job run that failed to submit.
	JobRunMeta *RunMeta
	// Pod is the pod that was submitted (or attempted).
	Pod *v1.Pod
	// Error is the error returned during submission.
	Error error
	// Recoverable indicates whether the submission may succeed if retried.
	Recoverable bool
}
// SubmitApiJobs converts the given API jobs into submit jobs (applying the
// configured pod defaults) and submits them, returning details of any failures.
func (submitService *SubmitService) SubmitApiJobs(jobsToSubmit []*api.Job) []*FailedSubmissionDetails {
	return submitService.submitJobs(CreateSubmitJobsFromApiJobs(jobsToSubmit, submitService.podDefaults))
}
// SubmitJobs submits the given jobs to the cluster, returning details of any failed submissions.
func (submitService *SubmitService) SubmitJobs(jobsToSubmit []*SubmitJob) []*FailedSubmissionDetails {
	return submitService.submitJobs(jobsToSubmit)
}
// submitJobs fans the given jobs out to a pool of submission workers and
// collects the details of every submission that failed.
func (submitService *SubmitService) submitJobs(jobsToSubmit []*SubmitJob) []*FailedSubmissionDetails {
	var workers sync.WaitGroup
	jobs := make(chan *SubmitJob)
	// Buffered to the job count so workers can always report a failure
	// without blocking, even if every single submission fails.
	failures := make(chan *FailedSubmissionDetails, len(jobsToSubmit))

	// Start the worker pool.
	for worker := 0; worker < submitService.submissionThreadCount; worker++ {
		workers.Add(1)
		go submitService.submitWorker(&workers, jobs, failures)
	}

	// Feed the workers, then wait for them to drain the channel.
	for _, jobToSubmit := range jobsToSubmit {
		jobs <- jobToSubmit
	}
	close(jobs)
	workers.Wait()
	close(failures)

	failedJobs := make([]*FailedSubmissionDetails, 0, len(failures))
	for failure := range failures {
		failedJobs = append(failedJobs, failure)
	}
	return failedJobs
}
// submitWorker consumes jobs from jobsToSubmitChannel until it is closed,
// submitting each to Kubernetes and reporting any failures on failedJobsChannel.
func (submitService *SubmitService) submitWorker(wg *sync.WaitGroup, jobsToSubmitChannel chan *SubmitJob, failedJobsChannel chan *FailedSubmissionDetails) {
	defer wg.Done()
	for job := range jobsToSubmitChannel {
		pod, err := submitService.submitPod(job)
		if err == nil {
			continue
		}
		log.Errorf("Failed to submit job %s because %s", job.Meta.RunMeta.JobId, err)
		failedJobsChannel <- &FailedSubmissionDetails{
			JobRunMeta:  job.Meta.RunMeta,
			Pod:         pod,
			Error:       err,
			Recoverable: submitService.isRecoverable(err),
		}
		// remove just created pods
		submitService.clusterContext.DeletePods([]*v1.Pod{pod})
	}
}
// submitPod submits a pod to k8s together with any services and ingresses bundled with the Armada job.
// This function may fail partly, i.e., it may successfully create a subset of the requested objects before failing.
// In case of failure, any already created objects are not cleaned up by this function
// (the caller, submitWorker, deletes the pod on failure).
func (submitService *SubmitService) submitPod(job *SubmitJob) (*v1.Pod, error) {
	pod := job.Pod

	// Ensure the K8SService and K8SIngress fields are populated
	submitService.applyExecutorSpecificIngressDetails(job)

	if len(job.Ingresses) > 0 || len(job.Services) > 0 {
		// Record the associated-object counts on the pod so other executor
		// components can discover them later.
		pod.Annotations = util.MergeMaps(pod.Annotations, map[string]string{
			domain.HasIngress:               "true",
			domain.AssociatedServicesCount:  fmt.Sprintf("%d", len(job.Services)),
			domain.AssociatedIngressesCount: fmt.Sprintf("%d", len(job.Ingresses)),
		})
	}

	submittedPod, err := submitService.clusterContext.SubmitPod(pod, job.Meta.Owner, job.Meta.OwnershipGroups)
	if err != nil {
		return pod, err
	}

	// Parent services and ingresses to the submitted pod so Kubernetes
	// garbage-collects them when the pod is deleted.
	for _, service := range job.Services {
		service.ObjectMeta.OwnerReferences = []metav1.OwnerReference{util2.CreateOwnerReference(submittedPod)}
		if _, err := submitService.clusterContext.SubmitService(service); err != nil {
			return pod, err
		}
	}
	for _, ingress := range job.Ingresses {
		ingress.ObjectMeta.OwnerReferences = []metav1.OwnerReference{util2.CreateOwnerReference(submittedPod)}
		if _, err := submitService.clusterContext.SubmitIngress(ingress); err != nil {
			return pod, err
		}
	}

	// All error paths return above; return nil explicitly rather than the
	// stale (necessarily nil) err from the last loop iteration.
	return pod, nil
}
// applyExecutorSpecificIngressDetails populates the executor specific details on ingresses.
// These objects are mostly created server side, however some details are not known until
// submit time, so the executor must fill them in before it creates the objects in kubernetes.
func (submitService *SubmitService) applyExecutorSpecificIngressDetails(job *SubmitJob) {
	ingressDefaults := submitService.podDefaults.Ingress
	for _, ingress := range job.Ingresses {
		ingress.Annotations = util.MergeMaps(
			ingress.Annotations,
			ingressDefaults.Annotations,
		)
		// Mutate via index: Spec.Rules and Spec.TLS hold values, not pointers,
		// so a range copy would not update the originals.
		for ruleIdx := range ingress.Spec.Rules {
			ingress.Spec.Rules[ruleIdx].Host += ingressDefaults.HostnameSuffix
		}
		for tlsIdx := range ingress.Spec.TLS {
			tlsEntry := &ingress.Spec.TLS[tlsIdx]
			tlsEntry.SecretName += ingressDefaults.CertNameSuffix
			for hostIdx := range tlsEntry.Hosts {
				tlsEntry.Hosts[hostIdx] += ingressDefaults.HostnameSuffix
			}
		}
	}
}
// isRecoverable reports whether a pod submission error is worth retrying.
//
// Kubernetes API errors with reason Invalid or Forbidden are permanent, as is
// any error whose message matches one of the configured
// fatalPodSubmissionErrors regex patterns; every other API error is treated
// as transient. Armada's own ErrCreateResource errors are recoverable;
// anything else is not.
func (submitService *SubmitService) isRecoverable(err error) bool {
	if apiStatus, ok := err.(k8s_errors.APIStatus); ok {
		status := apiStatus.Status()
		if status.Reason == metav1.StatusReasonInvalid ||
			status.Reason == metav1.StatusReasonForbidden {
			return false
		}
		// Use distinct names for the regexp results so the outer err is not
		// shadowed inside the loop (the original `ok, err :=` shadowed both).
		for _, fatalPattern := range submitService.fatalPodSubmissionErrors {
			matched, matchErr := regexp.MatchString(fatalPattern, err.Error())
			if matchErr == nil && matched {
				return false
			}
		}
		return true
	}

	var e *armadaerrors.ErrCreateResource
	if errors.As(err, &e) {
		return true
	}

	return false
}