/
job.go
129 lines (109 loc) · 3.18 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package web
import (
"encoding/json"
"errors"
"net/http"
"github.com/gorilla/mux"
)
// serveJob handles a request that pushes a new job onto a queue.
//
// Only POST is accepted; any other method yields errMethodNotAllowed.
// The request body is decoded as a JSON IncomingJob. A body that cannot
// be decoded, a payload that DecodePayload rejects, or an empty "url"
// field yields errBadRequest carrying a detail message. The job category
// is always taken from the "category" route variable, replacing whatever
// the body supplied.
//
// On success the job is handed to app.Service.Push and the resulting ID
// and queue name are written back, together with the job itself, as JSON.
// Errors from Push or from marshaling the response are returned unchanged.
func (app *Application) serveJob(w http.ResponseWriter, req *http.Request) error {
	if req.Method != http.MethodPost {
		return errMethodNotAllowed
	}

	var job IncomingJob
	dec := json.NewDecoder(req.Body)
	dec.UseNumber()
	if decodeErr := dec.Decode(&job); decodeErr != nil {
		return errBadRequest.WithDetail(decodeErr.Error())
	}
	if payloadErr := job.DecodePayload(); payloadErr != nil {
		return errBadRequest.WithDetail(payloadErr.Error())
	}
	if job.URLField == "" {
		return errBadRequest.WithDetail("Missing field: url")
	}

	// The route, not the request body, decides the category.
	job.CategoryField = mux.Vars(req)["category"]

	pushed, err := app.Service.Push(&job)
	if err != nil {
		return err
	}

	body, err := json.Marshal(&PushResult{
		ID:          pushed.ID,
		QueueName:   pushed.QueueName,
		IncomingJob: job,
	})
	if err != nil {
		return err
	}

	writeJSON(w, body)
	return nil
}
// IncomingJob describes a job to be pushed in a queue.
//
// The exported fields map onto the JSON keys named in their tags. The
// unexported payloadField holds the decoded form of PayloadField and is
// filled in by DecodePayload.
type IncomingJob struct {
	// CategoryField is the category of the job.
	CategoryField string `json:"category"`
	// URLField is the URL of the job.
	URLField string `json:"url"`
	// PayloadField is the raw, still-encoded JSON value of "payload".
	PayloadField json.RawMessage `json:"payload"`
	// payloadField is the decoded payload; see DecodePayload. Being
	// unexported, it is ignored by encoding/json.
	payloadField string
	// RunAfterField is expressed in seconds.
	RunAfterField uint `json:"run_after"`
	// TimeoutField is expressed in seconds.
	TimeoutField uint `json:"timeout"`
	// RetryDelayField is expressed in seconds.
	RetryDelayField uint `json:"retry_delay"`
	// MaxRetriesField is the maximum number of retries.
	MaxRetriesField uint `json:"max_retries"`
}

// PushResult describes a job pushed to a queue.
//
// The embedded IncomingJob contributes its fields to the JSON encoding
// next to "id" and "queue_name". The field order is relied upon by
// unkeyed composite literals, so it must not be changed.
type PushResult struct {
	ID        uint64 `json:"id"`
	QueueName string `json:"queue_name"`
	IncomingJob
}

// Category returns the category of the job.
func (job *IncomingJob) Category() string {
	return job.CategoryField
}

// URL returns the URL of the job.
func (job *IncomingJob) URL() string {
	return job.URLField
}

// DecodePayload decodes PayloadField of the job.
//
// If PayloadField starts and ends with ", then it is decoded as a
// JSON string. If PayloadField is "null" then it is decoded to an
// empty string. Otherwise, the decoded value is the raw string of
// PayloadField (an absent payload therefore decodes to "").
//
// An error is returned only when PayloadField looks like a JSON string
// but cannot be unmarshaled; the previously decoded value is left
// untouched in that case.
//
// The decoded value can be retrieved by Payload() method.
func (job *IncomingJob) DecodePayload() error {
	payload := job.PayloadField
	switch {
	case len(payload) > 0 && payload[0] == '"' && payload[len(payload)-1] == '"':
		var decoded string
		if err := json.Unmarshal(payload, &decoded); err != nil {
			// The message text is surfaced to API clients, so it is
			// kept exactly as it has always been.
			return errors.New("The payload seems to be a string but is broken")
		}
		job.payloadField = decoded
	case string(payload) == "null":
		job.payloadField = ""
	default:
		job.payloadField = string(payload)
	}
	return nil
}

// Payload returns a decoded value of PayloadField.
//
// If no decoded value is cached yet (or the cached value is empty),
// PayloadField is decoded first. Payload cannot report an error, so a
// payload that fails to decode yields an empty string.
func (job *IncomingJob) Payload() string {
	if job.payloadField == "" {
		// Deliberately discarded: on failure payloadField stays empty
		// and that empty string is what the caller receives.
		_ = job.DecodePayload()
	}
	return job.payloadField
}

// NextDelay returns the delay for a next try of the job.
//
// The result is RunAfterField (seconds) multiplied by 1000, presumably
// milliseconds — confirm against the consumer of this value. The value is
// widened to uint64 before multiplying so the product cannot wrap on
// platforms where uint is 32 bits wide.
func (job *IncomingJob) NextDelay() uint64 {
	return uint64(job.RunAfterField) * 1000
}

// RetryCount returns the max retries of the job.
func (job *IncomingJob) RetryCount() uint {
	return job.MaxRetriesField
}

// RetryDelay returns the delay for retries of the job, as stored in
// RetryDelayField (seconds).
func (job *IncomingJob) RetryDelay() uint {
	return job.RetryDelayField
}

// Timeout returns the timeout of the job, as stored in TimeoutField
// (seconds).
func (job *IncomingJob) Timeout() uint {
	return job.TimeoutField
}