/
api_job.go
372 lines (332 loc) · 9.67 KB
/
api_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
package main
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
log "github.com/Sirupsen/logrus"
docker "github.com/fsouza/go-dockerclient"
)
// JobHandler dispatches API calls to /job based on the HTTP request method.
//
// GET requests are forwarded to JobListHandler and POST requests to JobSubmitHandler. Any other
// method is answered with a CodeMethodNotSupported APIError reported as 405 Method Not Allowed,
// together with an Allow header naming the supported methods.
func JobHandler(c *Context, w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		JobListHandler(c, w, r)
	case "POST":
		JobSubmitHandler(c, w, r)
	default:
		// A 405 response is expected to advertise which methods the resource does support.
		// The header is set before the error is reported so that it is in place when the
		// response is written.
		w.Header().Set("Allow", "GET, POST")

		APIError{
			Code:    CodeMethodNotSupported,
			Message: "Method not supported",
			Hint:    "Use GET or POST against this endpoint.",
			Retry:   false,
		}.Report(http.StatusMethodNotAllowed, w)
	}
}
// JobSubmitHandler enqueues one or more new jobs associated with the authenticated account.
//
// The request body is decoded as JSON of the form {"jobs": [...]}. Every job in the batch is
// validated before any of them is stored, so a validation failure rejects the request without
// enqueueing part of it. Each accepted job is wrapped in a SubmittedJob with StatusQueued, the
// current time and the account name, and handed to c.InsertJob.
//
// On success the response is JSON of the form {"jids": [...]}, holding one JID per submitted
// job in request order. Failures are reported as APIErrors:
//   - unparseable JSON: CodeInvalidJobJSON, 400 Bad Request
//   - a job that fails Validate: the error returned by Validate, 400 Bad Request
//   - a storage failure: CodeEnqueueFailure, 503 Service Unavailable
func JobSubmitHandler(c *Context, w http.ResponseWriter, r *http.Request) {
	type Request struct {
		Jobs []Job `json:"jobs"`
	}

	type Response struct {
		JIDs []uint64 `json:"jids"`
	}

	account, err := Authenticate(c, w, r)
	if err != nil {
		// Authenticate is handed the ResponseWriter; presumably it has already reported the
		// failure to the client, so only log here.
		log.WithFields(log.Fields{
			"error": err,
		}).Error("Authentication failure.")
		return
	}

	var req Request
	if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
		log.WithFields(log.Fields{
			"error":   err,
			"account": account.Name,
		}).Error("Unable to parse JSON.")

		APIError{
			Code:    CodeInvalidJobJSON,
			Message: fmt.Sprintf("Unable to parse job payload as JSON: %v", err),
			Hint:    "Please supply valid JSON in your request.",
			Retry:   false,
		}.Report(http.StatusBadRequest, w)
		return
	}

	// Validate the whole batch before storing anything. Otherwise an invalid job late in the
	// batch would produce a 400 response after earlier jobs had already been enqueued, and the
	// client would never learn their JIDs. Validate is called on the slice element itself so
	// that any adjustment it makes to the job is what gets stored below.
	for i := range req.Jobs {
		if invalid := req.Jobs[i].Validate(); invalid != nil {
			log.WithFields(log.Fields{
				"account": account.Name,
				"job":     req.Jobs[i],
				"error":   invalid,
			}).Error("Invalid job submitted.")

			invalid.Report(http.StatusBadRequest, w)
			return
		}
	}

	// NOTE(review): a storage failure part-way through this loop still leaves the earlier jobs
	// of the batch enqueued; InsertJob offers no batch or rollback here.
	jids := make([]uint64, len(req.Jobs))
	for i, job := range req.Jobs {
		// Pack the job into a SubmittedJob and store it.
		submitted := SubmittedJob{
			Job:       job,
			CreatedAt: StoreTime(time.Now()),
			Status:    StatusQueued,
			Account:   account.Name,
		}

		jid, err := c.InsertJob(submitted)
		if err != nil {
			log.WithFields(log.Fields{
				"account": account.Name,
				"error":   err,
			}).Error("Unable to enqueue a submitted job.")

			APIError{
				Code:    CodeEnqueueFailure,
				Message: "Unable to enqueue your job.",
				Retry:   true,
			}.Report(http.StatusServiceUnavailable, w)
			return
		}
		jids[i] = jid

		log.WithFields(log.Fields{
			"jid":     jid,
			"job":     job,
			"account": account.Name,
		}).Info("Successfully submitted a job.")
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(Response{JIDs: jids}); err != nil {
		// The response may already be partially written, so the failure can only be logged.
		log.WithFields(log.Fields{
			"account": account.Name,
			"error":   err,
		}).Error("Unable to write the job submission response.")
	}
}
// JobListHandler provides updated details about one or more jobs currently submitted to the
// cluster.
//
// Results are always restricted to the authenticated account and can be narrowed further with
// query parameters:
//   - jid, name, status: filters; each may be repeated
//   - limit: maximum result count; must be at least 1, values above 9999 are reduced to 9999,
//     and 1000 is used when the parameter is absent
//   - before, after: unsigned integers passed through as the query's Before and After bounds
//
// On success the response is JSON of the form {"jobs": [...]}. A parameter that cannot be
// parsed is reported as CodeUnableToParseQuery with 400 Bad Request; a failure of c.ListJobs is
// reported as CodeListFailure with 503 Service Unavailable.
func JobListHandler(c *Context, w http.ResponseWriter, r *http.Request) {
	account, err := Authenticate(c, w, r)
	if err != nil {
		// Authenticate is handed the ResponseWriter; presumably it has already reported the
		// failure to the client, so only log here.
		log.WithFields(log.Fields{
			"error": err,
		}).Error("Authentication failure.")
		return
	}

	if err := r.ParseForm(); err != nil {
		APIError{
			Code:    CodeUnableToParseQuery,
			Message: fmt.Sprintf("Unable to parse query parameters: %v", err),
			Hint:    "You broke Go's URL parsing somehow! Make URLs that suck less.",
			Retry:   false,
		}.Log(account).Report(http.StatusBadRequest, w)
		return
	}

	q := JobQuery{AccountName: account.Name}
	if apiErr := parseJobListQuery(r, &q); apiErr != nil {
		apiErr.Log(account).Report(http.StatusBadRequest, w)
		return
	}

	results, err := c.ListJobs(q)
	if err != nil {
		APIError{
			Code:    CodeListFailure,
			Message: fmt.Sprintf("Unable to list jobs: %v", err),
			Hint:    "This is most likely a database problem.",
			Retry:   true,
		}.Log(account).Report(http.StatusServiceUnavailable, w)
		return
	}

	var response struct {
		Jobs []SubmittedJob `json:"jobs"`
	}
	response.Jobs = results

	log.WithFields(log.Fields{
		"query":        q,
		"result count": len(results),
		"account":      account.Name,
	}).Debug("Successful job query.")

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(response); err != nil {
		// The response may already be partially written, so the failure can only be logged.
		log.WithFields(log.Fields{
			"account": account.Name,
			"error":   err,
		}).Error("Unable to write the job list response.")
	}
}

// parseJobListQuery copies the job filters found in the request's parsed form into q.
//
// r.ParseForm must already have succeeded. The return value is nil on success, or an APIError
// describing the first parameter that could not be used; in that case q may have been partially
// populated and should be discarded.
func parseJobListQuery(r *http.Request, q *JobQuery) *APIError {
	if rawJIDs, ok := r.Form["jid"]; ok {
		jids := make([]uint64, len(rawJIDs))
		for i, rawJID := range rawJIDs {
			jid, err := strconv.ParseUint(rawJID, 10, 64)
			if err != nil {
				return &APIError{
					Code:    CodeUnableToParseQuery,
					Message: fmt.Sprintf("Unable to parse JID [%s]: %v", rawJID, err),
					Hint:    "Please only use valid JIDs.",
					Retry:   false,
				}
			}
			jids[i] = jid
		}
		q.JIDs = jids
	}

	if names, ok := r.Form["name"]; ok {
		q.Names = names
	}

	if statuses, ok := r.Form["status"]; ok {
		q.Statuses = statuses
	}

	// Default page size; replaced below when the caller supplies an explicit limit.
	q.Limit = 1000
	if rawLimit := r.FormValue("limit"); rawLimit != "" {
		limit, err := strconv.ParseInt(rawLimit, 10, 0)
		if err != nil {
			return &APIError{
				Code:    CodeUnableToParseQuery,
				Message: fmt.Sprintf("Unable to parse limit [%s]: %v", rawLimit, err),
				Hint:    "Please specify a valid integral limit.",
				Retry:   false,
			}
		}
		if limit < 1 {
			return &APIError{
				Code:    CodeUnableToParseQuery,
				Message: fmt.Sprintf("Invalid negative or zero limit [%d]", limit),
				Hint:    "Please specify a valid, positive integral limit.",
				Retry:   false,
			}
		}
		// Oversized limits are reduced to the maximum rather than rejected.
		if limit > 9999 {
			limit = 9999
		}
		q.Limit = int(limit)
	}

	// NOTE(review): the hints below call Before a "lower bound" and After an "upper bound",
	// which looks swapped relative to the parameter names. Confirm against ListJobs before
	// changing these client-visible strings.
	if rawBefore := r.FormValue("before"); rawBefore != "" {
		before, err := strconv.ParseUint(rawBefore, 10, 64)
		if err != nil {
			return &APIError{
				Code:    CodeUnableToParseQuery,
				Message: fmt.Sprintf(`Unable to parse Before bound [%s]: %v`, rawBefore, err),
				Hint:    "Please specify a valid integral JID as the lower bound.",
				Retry:   false,
			}
		}
		q.Before = before
	}

	if rawAfter := r.FormValue("after"); rawAfter != "" {
		after, err := strconv.ParseUint(rawAfter, 10, 64)
		if err != nil {
			return &APIError{
				Code:    CodeUnableToParseQuery,
				Message: fmt.Sprintf(`Unable to parse After bound [%s]: %v`, rawAfter, err),
				Hint:    "Please specify a valid integral JID as the upper bound.",
				Retry:   false,
			}
		}
		q.After = after
	}

	return nil
}
// JobKillHandler allows a user to prematurely terminate a running job.
//
// The request is form-encoded and carries:
//   - jid: the unsigned integer ID of the job to kill (required)
//   - sudo: when exactly "true", the job lookup is not restricted to the caller's account
//
// The job is marked KillRequested and, if it is still StatusQueued, moved to StatusKilled. When
// the job has a container ID, that container is killed as well. On success OKResponse is
// written. Failures are reported as APIErrors:
//   - unparseable form or jid: CodeInvalidJobForm, 400 Bad Request
//   - job lookup failure: CodeListFailure, 500 Internal Server Error
//   - no matching job: CodeJobNotFound, 404 Not Found
//   - more than one matching job: CodeWTF, 500 Internal Server Error
//   - job update failure: CodeJobUpdateFailure, 500 Internal Server Error
//   - container kill failure: CodeJobKillFailure, 500 Internal Server Error
func JobKillHandler(c *Context, w http.ResponseWriter, r *http.Request) {
	account, err := Authenticate(c, w, r)
	if err != nil {
		// Authenticate is handed the ResponseWriter; presumably it has already reported the
		// failure to the client, so only log here.
		log.WithFields(log.Fields{
			"error": err,
		}).Error("Authentication failure.")
		return
	}

	if err = r.ParseForm(); err != nil {
		APIError{
			Code:    CodeInvalidJobForm,
			Message: fmt.Sprintf("Unable to parse Job: Kill payload as a POST body: %v", err),
			Hint:    "Please use valid form encoding in your request.",
			Retry:   false,
		}.Log(account).Report(http.StatusBadRequest, w)
		return
	}

	jidstr := r.PostFormValue("jid")
	jid, err := strconv.ParseUint(jidstr, 10, 64)
	if err != nil {
		APIError{
			Code:    CodeInvalidJobForm,
			Message: fmt.Sprintf("Unable to parse Job: Kill payload as a valid JID: %v", err),
			Hint:    "Please provide a valid integer job ID to Job: Kill.",
			Retry:   false,
		}.Log(account).Report(http.StatusBadRequest, w)
		return
	}

	// NOTE(review): sudo is taken directly from the request. Nothing in this handler checks
	// that the authenticated account is permitted to act on other accounts' jobs; confirm that
	// this is enforced elsewhere.
	sudo := r.PostFormValue("sudo") == "true"

	query := JobQuery{JIDs: []uint64{jid}}
	if !sudo {
		query.AccountName = account.Name
	}

	jobs, err := c.ListJobs(query)
	if err != nil {
		APIError{
			Code:    CodeListFailure,
			Message: "Unable to list jobs.",
			Hint:    "This is probably a storage error on our end.",
			Retry:   true,
		}.Log(account).Report(http.StatusInternalServerError, w)
		return
	}

	switch {
	case len(jobs) == 0:
		// jid is a uint64, so it is formatted with %d.
		APIError{
			Code:    CodeJobNotFound,
			Message: fmt.Sprintf("Unable to find a job with ID [%d].", jid),
			Hint:    "Make sure that the JID is still valid.",
			Retry:   false,
		}.Log(account).Report(http.StatusNotFound, w)
		return
	case len(jobs) != 1:
		APIError{
			Code: CodeWTF,
			Message: fmt.Sprintf(
				"Job query for JID [%d] on account [%s] returned [%d] results.",
				jid, account.Name, len(jobs),
			),
			Hint:  "Duplicate JID. No clue how that happened.",
			Retry: false,
		}.Log(account).Report(http.StatusInternalServerError, w)
		return
	}

	job := &jobs[0]
	job.KillRequested = true

	// If the container ID hasn't been assigned yet, the job most likely isn't running.
	// If it's already left StatusQueued, let the job runner handle the transition to
	// StatusKilled. Otherwise, set it to StatusKilled ourselves to remove it from the queue.
	if job.Status == StatusQueued {
		job.Status = StatusKilled
	}

	// Persist the kill request before touching the container, so the request is recorded even
	// if the container kill below fails.
	if err = c.UpdateJob(job); err != nil {
		APIError{
			Code:    CodeJobUpdateFailure,
			Message: fmt.Sprintf("Unable to request a job kill: %v", err),
			Hint:    "This is probably a storage error on our end.",
			Retry:   true,
		}.Log(account).Report(http.StatusInternalServerError, w)
		return
	}

	if job.ContainerID == "" {
		log.WithFields(log.Fields{
			"jid":     job.JID,
			"account": account.Name,
			"sudo":    sudo,
		}).Info("Job kill requested.")

		OKResponse(w)
		return
	}

	err = c.KillContainer(docker.KillContainerOptions{ID: job.ContainerID})
	if err != nil {
		APIError{
			Code:    CodeJobKillFailure,
			Message: fmt.Sprintf("Unable to kill a running job: %v", err),
			Hint:    "The container is misbehaving somehow.",
			Retry:   true,
		}.Log(account).Report(http.StatusInternalServerError, w)
		return
	}

	log.WithFields(log.Fields{
		"jid":     job.JID,
		"account": account.Name,
		"sudo":    sudo,
	}).Info("Running job killed.")

	OKResponse(w)
}
// JobKillAllHandler is intended to allow a user to terminate all jobs associated with their
// account.
//
// NOTE(review): not implemented yet. The body is empty, so this handler neither authenticates
// the request nor writes a response of its own.
func JobKillAllHandler(c *Context, w http.ResponseWriter, r *http.Request) {
	// TODO: implement.
}
// JobQueueStatsHandler is intended to allow a user to view statistics about the jobs that they
// have submitted.
//
// NOTE(review): not implemented yet. The body is empty, so this handler neither authenticates
// the request nor writes a response of its own.
func JobQueueStatsHandler(c *Context, w http.ResponseWriter, r *http.Request) {
	// TODO: implement.
}