-
Notifications
You must be signed in to change notification settings - Fork 0
/
job.go
160 lines (134 loc) · 3.79 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package goq
import (
"encoding/base32"
"encoding/json"
"errors"
"time"
)
// Status describes where a job is in its lifecycle.
//
// It is marshalled to JSON without struct tags, so the stored keys are
// "Code" and "Progress".
type Status struct {
	// Code is the lifecycle state: 0 waiting, 1 working, 2 completed, 3 failed.
	Code uint8
	// Progress is a percentage in the range 0-100.
	Progress uint8
}
// Job is a unit of work together with its result and status.
type Job struct {
	// ID is appended to the status and cache key prefixes to form redis keys.
	// The package-level GetCache and GetStatus helpers derive it as the
	// base32 encoding of the job JSON.
	ID string
	// JSON is the job payload; Fail adds this value to the failed-jobs set.
	JSON string
	// ResultJSON is the job result; SetCache stores it and GetCache loads it.
	ResultJSON string
	// Status is the last known status, updated by SetStatus and GetStatus.
	Status *Status
	// NOTE(review): processor is not referenced in this part of the file;
	// presumably it is the handler that executes the job — confirm.
	processor Processor
	// queueName is appended to the failed-jobs prefix to form the set key in Fail.
	queueName string
}
// Fail adds this job's JSON to the failed-jobs set of its queue and then
// sets the job status to failed (code 3, progress 0).
//
// It returns an error if the set insert fails, in which case the status is
// not touched, or if the status update fails.
func (j *Job) Fail() error {
	failedSetKey := JOB_FAILED_PREFIX + j.queueName
	if addErr := client.SAdd(failedSetKey, j.JSON).Err(); addErr != nil {
		return errors.New("Failed to add to failed jobs set of job " + j.ID + " : " + addErr.Error())
	}
	return j.SetStatus(3, 0)
}
// SetStatus updates this job's Status in memory and writes it to redis under
// JOB_STATUS_PREFIX+ID as JSON.
//
// code must be 0 (waiting), 1 (working), 2 (completed) or 3 (failed);
// progress must be in the range 0-100. Out-of-range arguments are rejected
// before anything is modified. The in-memory Status is updated before the
// redis write, so it keeps the new values even if that write fails.
func (j *Job) SetStatus(code, progress uint8) error {
	// Both arguments are unsigned, so only the upper bounds can be violated.
	if code > 3 {
		return errors.New("Failed to set status code of job " + j.ID + " : code must be 0-3")
	}
	if progress > 100 {
		return errors.New("Failed to set status progress of job " + j.ID + " : progress must be 0-100")
	}

	// A Job built without a Status (e.g. &Job{ID: id}) would otherwise panic
	// on the field writes below.
	if j.Status == nil {
		j.Status = &Status{}
	}
	j.Status.Code = code
	j.Status.Progress = progress

	statusJSON, err := json.Marshal(j.Status)
	if err != nil {
		return errors.New("Failed to marshal status of job " + j.ID + " : " + err.Error())
	}
	// The expiration argument is 0 (presumably "no expiry" — confirm against
	// the redis client in use).
	if err := client.Set(JOB_STATUS_PREFIX+j.ID, string(statusJSON), 0).Err(); err != nil {
		return errors.New("Failed to set status of job " + j.ID + " : " + err.Error())
	}
	return nil
}
// GetStatus refreshes this job's Status from the JSON stored in redis under
// JOB_STATUS_PREFIX+ID.
//
// If the job already has a Status it is decoded into in place, so existing
// pointers to it see the new values. If Status is nil, a new one is
// allocated and attached only once decoding has succeeded. It returns an
// error if the key cannot be read or its value cannot be decoded.
func (j *Job) GetStatus() error {
	dataJSON, err := client.Get(JOB_STATUS_PREFIX + j.ID).Result()
	if err != nil {
		return errors.New("Failed to get status of job " + j.ID + " : " + err.Error())
	}

	// json.Unmarshal rejects a nil pointer target, so a Job created without
	// a Status needs a fresh one to decode into.
	target := j.Status
	if target == nil {
		target = &Status{}
	}
	if err := json.Unmarshal([]byte(dataJSON), target); err != nil {
		return errors.New("Failed to unmarshal status of job " + j.ID + " : " + err.Error())
	}
	j.Status = target
	return nil
}
// SetCache stores this job's ResultJSON in redis under JOB_CACHE_PREFIX+ID.
//
// ttl is handed to the redis client as the key's expiration. It is a
// time.Duration, so pass e.g. 30*time.Second rather than a bare count of
// seconds.
func (j *Job) SetCache(ttl time.Duration) error {
	cacheKey := JOB_CACHE_PREFIX + j.ID
	if setErr := client.Set(cacheKey, j.ResultJSON, ttl).Err(); setErr != nil {
		return errors.New("Failed to set cache of job " + j.ID + " : " + setErr.Error())
	}
	return nil
}
// IsCached reports whether a cached result exists in redis for this job,
// i.e. whether the key JOB_CACHE_PREFIX+ID is present.
//
// On a redis error it returns false together with the wrapped error.
func (j *Job) IsCached() (bool, error) {
	found, lookupErr := client.Exists(JOB_CACHE_PREFIX + j.ID).Result()
	if lookupErr != nil {
		return false, errors.New("Failed to check existence of job " + j.ID + " : " + lookupErr.Error())
	}
	return found, nil
}
// GetCache loads this job's cached result from redis into ResultJSON.
//
// It first asks IsCached whether an entry exists and returns an error if
// that check fails or if the job is not cached; only then is the value read
// from JOB_CACHE_PREFIX+ID.
func (j *Job) GetCache() error {
	switch cached, checkErr := j.IsCached(); {
	case checkErr != nil:
		return checkErr
	case !cached:
		return errors.New("Failed to get cache of job " + j.ID + " : job is not cached")
	}

	value, getErr := client.Get(JOB_CACHE_PREFIX + j.ID).Result()
	// ResultJSON takes whatever the client returned, even when the read failed.
	j.ResultJSON = value
	if getErr != nil {
		return errors.New("Failed to get cache of job " + j.ID + " : " + getErr.Error())
	}
	return nil
}
// GetCache looks up the cached result for the job described by jobJSON.
//
// The job ID is the standard base32 encoding of jobJSON. It returns whether
// a cached result exists, the result JSON (empty when absent or on error),
// and any error from the existence check or the read.
func GetCache(jobJSON string) (bool, string, error) {
	jobID := base32.StdEncoding.EncodeToString([]byte(jobJSON))
	job := &Job{ID: jobID}

	cached, err := job.IsCached()
	switch {
	case err != nil:
		return false, "", errors.New("Failed to check if job " + jobID + " is cached : " + err.Error())
	case !cached:
		return false, "", nil
	}

	if loadErr := job.GetCache(); loadErr != nil {
		return false, "", errors.New("Failed to get cache of job " + jobID + " : " + loadErr.Error())
	}
	return true, job.ResultJSON, nil
}
// GetStatus fetches the status of the job described by jobJSON.
//
// The job ID is the standard base32 encoding of jobJSON. It returns the
// decoded status, or nil and the error reported by the job's GetStatus
// method.
func GetStatus(jobJSON string) (*Status, error) {
	id := base32.StdEncoding.EncodeToString([]byte(jobJSON))
	// The Status must be allocated up front: the method decodes the stored
	// JSON into j.Status, and decoding into a nil pointer always fails.
	j := &Job{
		ID:     id,
		Status: &Status{},
	}
	if err := j.GetStatus(); err != nil {
		return nil, err
	}
	return j.Status, nil
}