forked from gocelery/gocelery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
redis_broker.go
164 lines (149 loc) · 4.32 KB
/
redis_broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright (c) 2019 Sick Yoon
// This file is part of gocelery which is released under MIT license.
// See file LICENSE for full license details.
package gocelery
import (
"encoding/json"
"fmt"
"time"
"github.com/gomodule/redigo/redis"
)
// RedisCeleryBroker is celery broker for redis.
//
// It embeds *redis.Pool, so the pool's methods (for example Get, which the
// broker methods in this file call to obtain a connection) are available
// directly on the broker value.
type RedisCeleryBroker struct {
*redis.Pool
// QueueName is the redis list key the broker pushes to (LPUSH) and pops
// from (BRPOP). The constructors in this file set it to "celery".
QueueName string
// map[taskName]queueName
// TaskQueue is consulted by SendCeleryMessageV2: when the message's task
// name has an entry here, the message is pushed to that queue instead of
// QueueName. A nil map is fine; lookups then simply find nothing.
TaskQueue map[string]string
}
// NewRedisBroker creates new RedisCeleryBroker with given redis connection pool.
//
// conn becomes the broker's embedded pool and taskQueue (task name -> queue
// name) is stored as-is, without copying. The default queue name is "celery".
func NewRedisBroker(conn *redis.Pool, taskQueue map[string]string) *RedisCeleryBroker {
	broker := new(RedisCeleryBroker)
	broker.Pool = conn
	broker.QueueName = "celery"
	broker.TaskQueue = taskQueue
	return broker
}
// NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri.
//
// The connection pool is built with NewRedisPool(uri), the queue name is
// "celery", and no per-task queue mapping is configured (TaskQueue is nil).
//
// Deprecated: NewRedisCeleryBroker exists for historical compatibility
// and should not be used. Use NewRedisBroker instead to create new RedisCeleryBroker.
func NewRedisCeleryBroker(uri string) *RedisCeleryBroker {
	broker := &RedisCeleryBroker{QueueName: "celery"}
	broker.Pool = NewRedisPool(uri)
	return broker
}
// SendCeleryMessage sends CeleryMessage to redis queue.
//
// The message is serialized to JSON and pushed with LPUSH onto cb.QueueName
// using a connection taken from the pool; the connection is closed before
// returning. The JSON encoding error or the redis command error is returned
// unchanged.
func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error {
	payload, marshalErr := json.Marshal(message)
	if marshalErr != nil {
		return marshalErr
	}

	conn := cb.Get()
	defer conn.Close()

	_, pushErr := conn.Do("LPUSH", cb.QueueName, payload)
	return pushErr
}
// GetCeleryMessage retrieves celery message from redis queue.
//
// It issues BRPOP on cb.QueueName with a one-second timeout and decodes the
// popped payload as JSON into a CeleryMessage. The pooled connection is
// closed before returning.
//
// An error is returned when the redis command fails, when the reply is nil
// (nothing was popped), when the reply is not a [key, payload] pair of byte
// slices, when the key differs from cb.QueueName, or when the payload cannot
// be unmarshalled. Malformed replies are reported as errors, never as panics.
func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error) {
	conn := cb.Get()
	defer conn.Close()

	reply, err := conn.Do("BRPOP", cb.QueueName, "1")
	if err != nil {
		return nil, err
	}
	if reply == nil {
		return nil, fmt.Errorf("null message received from redis")
	}

	// Use checked assertions and a length guard so an unexpected reply shape
	// surfaces as an error instead of crashing the caller.
	items, ok := reply.([]interface{})
	if !ok || len(items) < 2 {
		return nil, fmt.Errorf("unexpected BRPOP reply: %v", reply)
	}
	key, ok := items[0].([]byte)
	if !ok {
		return nil, fmt.Errorf("unexpected BRPOP key type %T", items[0])
	}
	if string(key) != cb.QueueName {
		// %s renders the key as text; %v on a []byte would print numbers.
		return nil, fmt.Errorf("not a celery message: %s", key)
	}
	payload, ok := items[1].([]byte)
	if !ok {
		return nil, fmt.Errorf("unexpected BRPOP payload type %T", items[1])
	}

	var message CeleryMessage
	if err := json.Unmarshal(payload, &message); err != nil {
		return nil, err
	}
	return &message, nil
}
// GetTaskMessage retrieves task message from redis queue.
//
// It fetches the next message via GetCeleryMessage and returns the result of
// that message's GetTaskMessage method. Any error from GetCeleryMessage is
// returned unchanged together with a nil task message.
func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error) {
	envelope, fetchErr := cb.GetCeleryMessage()
	if fetchErr != nil {
		return nil, fetchErr
	}
	task := envelope.GetTaskMessage()
	return task, nil
}
// SendCeleryMessageV2 sends CeleryMessageV2 to redis queue.
//
// The message is serialized to JSON and pushed with LPUSH. The destination
// is the queue mapped to message.Headers.Task in cb.TaskQueue when such an
// entry exists, and cb.QueueName otherwise. The pooled connection is closed
// before returning. The JSON encoding error or the redis command error is
// returned unchanged.
func (cb *RedisCeleryBroker) SendCeleryMessageV2(message *CeleryMessageV2) error {
	payload, err := json.Marshal(message)
	if err != nil {
		return err
	}

	conn := cb.Get()
	defer conn.Close()

	// Prefer the task-specific queue; fall back to the broker default.
	target, routed := cb.TaskQueue[message.Headers.Task]
	if !routed {
		target = cb.QueueName
	}
	// Per-message queue override, currently disabled:
	// if message.Queue != "" {
	// queueName = message.Queue
	// }

	_, err = conn.Do("LPUSH", target, payload)
	return err
}
// GetCeleryMessageV2 retrieves celery message from redis queue.
//
// It issues BRPOP on cb.QueueName with a one-second timeout and decodes the
// popped payload as JSON into a CeleryMessageV2. The pooled connection is
// closed before returning.
//
// An error is returned when the redis command fails, when the reply is nil
// (nothing was popped), when the reply is not a [key, payload] pair of byte
// slices, when the key differs from cb.QueueName, or when the payload cannot
// be unmarshalled. Malformed replies are reported as errors, never as panics.
func (cb *RedisCeleryBroker) GetCeleryMessageV2() (*CeleryMessageV2, error) {
	conn := cb.Get()
	defer conn.Close()

	reply, err := conn.Do("BRPOP", cb.QueueName, "1")
	if err != nil {
		return nil, err
	}
	if reply == nil {
		return nil, fmt.Errorf("null message received from redis")
	}

	// Use checked assertions and a length guard so an unexpected reply shape
	// surfaces as an error instead of crashing the caller.
	items, ok := reply.([]interface{})
	if !ok || len(items) < 2 {
		return nil, fmt.Errorf("unexpected BRPOP reply: %v", reply)
	}
	key, ok := items[0].([]byte)
	if !ok {
		return nil, fmt.Errorf("unexpected BRPOP key type %T", items[0])
	}
	// Compare against the queue that was actually popped, not a literal
	// "celery", so brokers with a custom QueueName can receive messages.
	if string(key) != cb.QueueName {
		// %s renders the key as text; %v on a []byte would print numbers.
		return nil, fmt.Errorf("not a celery message: %s", key)
	}
	payload, ok := items[1].([]byte)
	if !ok {
		return nil, fmt.Errorf("unexpected BRPOP payload type %T", items[1])
	}

	var message CeleryMessageV2
	if err := json.Unmarshal(payload, &message); err != nil {
		return nil, err
	}
	return &message, nil
}
// GetTaskMessageV2 retrieves task message from redis queue.
//
// It fetches the next message via GetCeleryMessageV2 and returns the result
// of that message's GetTaskMessageV2 method. Any error from
// GetCeleryMessageV2 is returned unchanged together with a nil task message.
func (cb *RedisCeleryBroker) GetTaskMessageV2() (*TaskMessageV2, error) {
	envelope, fetchErr := cb.GetCeleryMessageV2()
	if fetchErr != nil {
		return nil, fetchErr
	}
	task := envelope.GetTaskMessageV2()
	return task, nil
}
// NewRedisPool creates pool of redis connections from given connection string.
//
// The pool keeps at most 3 idle connections, discards connections idle for
// longer than 240 seconds, dials with redis.DialURL(uri), and issues a PING
// on every connection it hands out to check that it is still usable.
//
// Deprecated: NewRedisPool exists for historical compatibility
// and should not be used. Pool should be initialized outside of gocelery package.
func NewRedisPool(uri string) *redis.Pool {
	// dial opens a fresh connection to the configured URI.
	dial := func() (redis.Conn, error) {
		conn, dialErr := redis.DialURL(uri)
		if dialErr != nil {
			return nil, dialErr
		}
		return conn, nil
	}

	// ping verifies a pooled connection before it is handed out.
	ping := func(c redis.Conn, _ time.Time) error {
		_, pingErr := c.Do("PING")
		return pingErr
	}

	pool := &redis.Pool{
		MaxIdle:      3,
		IdleTimeout:  240 * time.Second,
		Dial:         dial,
		TestOnBorrow: ping,
	}
	return pool
}