forked from rapidpro/mailroom
/
queue.go
130 lines (103 loc) · 3.13 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package queue
import (
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/gomodule/redigo/redis"
)
// Task is a utility struct for encoding a task
type Task struct {
Type string `json:"type"`
OrgID int `json:"org_id"`
Task json.RawMessage `json:"task"`
}
// Priority is the priority for the task
type Priority int
const (
queuePattern = "%s:%d"
activePattern = "%s:active"
// DefaultPriority is the default priority for tasks
DefaultPriority = Priority(0)
// HighPriority is the highest priority for tasks
HighPriority = Priority(-10000000)
// LowPriority is the lowest priority for tasks
LowPriority = Priority(+10000000)
)
// AddTask adds the passed in task to our queue for execution
func AddTask(rc redis.Conn, queue string, taskType string, orgID int, task interface{}, priority Priority) error {
score := strconv.FormatFloat(float64(time.Now().UnixNano()/int64(time.Microsecond))/float64(1000000)+float64(priority), 'f', 6, 64)
taskBody, err := json.Marshal(task)
if err != nil {
return err
}
payload := Task{
Type: taskType,
OrgID: orgID,
Task: taskBody,
}
jsonPayload, err := json.Marshal(payload)
if err != nil {
return err
}
rc.Send("zadd", fmt.Sprintf(queuePattern, queue, orgID), score, jsonPayload)
rc.Send("zincrby", fmt.Sprintf(activePattern, queue), 0, orgID)
_, err = rc.Do("")
return err
}
var popTask = redis.NewScript(1, `-- KEYS: [QueueName]
-- first get what is the active queue
local result = redis.call("zrange", KEYS[1] .. ":active", 0, 0, "WITHSCORES")
-- nothing? return nothing
local group = result[1]
if not group then
return {"empty", ""}
end
local queue = KEYS[1] .. ":" .. group
-- pop off our queue
local result = redis.call("zrangebyscore", queue, 0, "+inf", "WITHSCORES", "LIMIT", 0, 1)
-- found a result?
if result[1] then
-- then remove it from the queue
redis.call('zremrangebyrank', queue, 0, 0)
-- and add a worker to this queue
redis.call("zincrby", KEYS[1] .. ":active", 1, group)
return {group, result[1]}
else
-- no result found, remove this group from active queues
redis.call("zrem", KEYS[1] .. ":active", group)
return {"retry", ""}
end
`)
// PopNextTask pops the next task off our queue
func PopNextTask(rc redis.Conn, queue string) (*Task, error) {
task := Task{}
for {
values, err := redis.Strings(popTask.Do(rc, queue))
if err != nil {
return nil, err
}
if values[0] == "empty" {
return nil, nil
}
if values[0] == "retry" {
continue
}
err = json.Unmarshal([]byte(values[1]), &task)
return &task, err
}
}
var markComplete = redis.NewScript(2, `-- KEYS: [QueueName] [TaskGroup]
-- decrement our active
local active = tonumber(redis.call("zincrby", KEYS[1] .. ":active", -1, KEYS[2]))
-- reset to zero if we somehow go below
if active < 0 then
redis.call("zadd", KEYS[1] .. ":active", 0, KEYS[2])
end
`)
// MarkTaskComplete marks the passed in task as complete. Callers must call this in order
// to maintain fair workers across orgs
func MarkTaskComplete(rc redis.Conn, queue string, orgID int) error {
_, err := markComplete.Do(rc, queue, strconv.FormatInt(int64(orgID), 10))
return err
}