Skip to content

Commit 20c3761

Browse files
author
yanglong.2017
committed
feat: simple delay queue
Change-Id: I8b51bd35c4860843a592a6ef74b15e31350861d1
1 parent a4a27c5 commit 20c3761

File tree

10 files changed

+502
-0
lines changed

10 files changed

+502
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@
1212

1313
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
1414
.glide/
15+
/later

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
## later
2+
later is a redis base delay queue .

main.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
6+
"github.com/btfak/later/queue"
7+
log "github.com/sirupsen/logrus"
8+
)
9+
10+
var (
11+
redisURL = flag.String("redis", "redis://127.0.0.1:6379/0", "redis address")
12+
address = flag.String("address", ":8080", "serve listen address")
13+
)
14+
15+
func init() {
16+
log.SetFormatter(&log.TextFormatter{FullTimestamp: true})
17+
flag.Parse()
18+
}
19+
20+
func main() {
21+
err := queue.InitRedis(*redisURL)
22+
if err != nil {
23+
log.Fatal(err)
24+
}
25+
queue.RunWorker()
26+
log.Infof("server listen on :%v", *address)
27+
err = queue.ListenAndServe(*address)
28+
if err != nil {
29+
log.Fatal(err)
30+
}
31+
}

queue/callback.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package queue
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"io/ioutil"
7+
"net/http"
8+
"time"
9+
10+
log "github.com/sirupsen/logrus"
11+
)
12+
13+
var HttpClient = &http.Client{
14+
Timeout: time.Second * 3,
15+
Transport: &http.Transport{
16+
MaxIdleConnsPerHost: 10,
17+
MaxIdleConns: 1024,
18+
IdleConnTimeout: time.Minute * 5,
19+
},
20+
}
21+
22+
type callbackRequest struct {
23+
ID string `json:"id"`
24+
Topic string `json:"topic"`
25+
Content string `json:"content"`
26+
}
27+
28+
type callbackResponse struct {
29+
Code int `json:"code"`
30+
}
31+
32+
const (
33+
CodeSuccess = 100
34+
CodeTooManyRequest = 101
35+
)
36+
37+
func post(task *Task) (int, error) {
38+
request := callbackRequest{
39+
ID: task.ID,
40+
Topic: task.Topic,
41+
Content: task.Content,
42+
}
43+
data, err := json.Marshal(request)
44+
if err != nil {
45+
log.WithError(err).Error("json marshal fail")
46+
return 0, err
47+
}
48+
49+
content := bytes.NewBuffer(data)
50+
resp, err := HttpClient.Post(task.Callback, "application/json", content)
51+
if err != nil {
52+
log.WithError(err).Error("http post fail")
53+
return 0, err
54+
}
55+
defer resp.Body.Close()
56+
57+
result, err := ioutil.ReadAll(resp.Body)
58+
if err != nil {
59+
log.WithError(err).Error("io read from backend fail")
60+
return 0, err
61+
}
62+
var response callbackResponse
63+
err = json.Unmarshal(result, &response)
64+
if err != nil {
65+
log.WithError(err).Error("json unmarshal fail")
66+
return 0, err
67+
}
68+
return response.Code, nil
69+
}

queue/config.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package queue
2+
3+
import (
4+
"time"
5+
)
6+
7+
var (
8+
RedisConnectTimeout = 50 * time.Millisecond
9+
RedisReadTimeout = 50 * time.Millisecond
10+
RedisWriteTimeout = 100 * time.Millisecond
11+
RedisPoolMaxIdle = 200
12+
RedisPoolIdleTimeout = 3 * time.Minute
13+
)
14+
15+
var (
16+
TaskTTL = 24 * 3600
17+
ZrangeOffset = 20
18+
DelayWorkerInterval = 100 * time.Millisecond
19+
UnackWorkerInterval = 1000 * time.Millisecond
20+
ErrorWorkerInterval = 1000 * time.Millisecond
21+
RetryInterval = 10 //second
22+
)

queue/hack.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package queue
2+
3+
import (
4+
"reflect"
5+
"unsafe"
6+
)
7+
8+
// No copy to change slice to string, use your own risk
9+
func String(b []byte) (s string) {
10+
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
11+
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
12+
pstring.Data = pbytes.Data
13+
pstring.Len = pbytes.Len
14+
return
15+
}
16+
17+
// No copy to change string to slice, use your own risk
18+
func Slice(s string) (b []byte) {
19+
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
20+
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
21+
pbytes.Data = pstring.Data
22+
pbytes.Len = pstring.Len
23+
pbytes.Cap = pstring.Len
24+
return
25+
}

queue/redis.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package queue
2+
3+
import (
4+
"github.com/garyburd/redigo/redis"
5+
)
6+
7+
var pool *redis.Pool
8+
9+
func InitRedis(address string) error {
10+
dial := func() (redis.Conn, error) {
11+
return redis.DialURL(address,
12+
redis.DialConnectTimeout(RedisConnectTimeout),
13+
redis.DialReadTimeout(RedisReadTimeout),
14+
redis.DialWriteTimeout(RedisWriteTimeout))
15+
}
16+
pool = &redis.Pool{
17+
MaxIdle: RedisPoolMaxIdle,
18+
IdleTimeout: RedisPoolIdleTimeout,
19+
Dial: dial,
20+
}
21+
return nil
22+
}

queue/server.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package queue
2+
3+
import (
4+
"encoding/json"
5+
"io/ioutil"
6+
"net/http"
7+
"time"
8+
9+
"github.com/pborman/uuid"
10+
log "github.com/sirupsen/logrus"
11+
)
12+
13+
type CreateTaskRequest struct {
14+
// Topic use to classify tasks
15+
Topic string `json:"topic"`
16+
// Delay is the number of seconds that should elapse before the task execute
17+
Delay int64 `json:"delay"`
18+
// Retry is max deliver retry times
19+
Retry int `json:"retry"`
20+
// Callback is the deliver address
21+
Callback string `json:"callback"`
22+
// Content is the task content to deliver
23+
Content string `json:"content"`
24+
}
25+
26+
type CreateTaskResponse struct {
27+
ID string `json:"id"`
28+
}
29+
30+
func ListenAndServe(addr string) error {
31+
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
32+
if r.Method != http.MethodPost {
33+
w.WriteHeader(400)
34+
return
35+
}
36+
var request CreateTaskRequest
37+
data, err := ioutil.ReadAll(r.Body)
38+
if err != nil {
39+
log.WithError(err).Error("io read from frontend fail")
40+
w.WriteHeader(500)
41+
return
42+
}
43+
err = json.Unmarshal(data, &request)
44+
if err != nil {
45+
log.WithError(err).Error("json unmarshal fail")
46+
w.WriteHeader(400)
47+
return
48+
}
49+
task := &Task{
50+
ID: uuid.New(),
51+
Topic: request.Topic,
52+
ExecuteTime: time.Now().Unix() + request.Delay,
53+
MaxRetry: request.Retry,
54+
Callback: request.Callback,
55+
Content: request.Content,
56+
CreatTime: time.Now().Unix(),
57+
}
58+
err = createTask(task)
59+
if err != nil {
60+
log.WithError(err).Error("create task fail")
61+
w.WriteHeader(500)
62+
return
63+
}
64+
response := CreateTaskResponse{ID: task.ID}
65+
respData, err := json.Marshal(response)
66+
if err != nil {
67+
log.WithError(err).Error("json marshal fail")
68+
w.WriteHeader(500)
69+
return
70+
}
71+
w.Write(respData)
72+
})
73+
return http.ListenAndServe(addr, nil)
74+
}

queue/task.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package queue
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
7+
"github.com/garyburd/redigo/redis"
8+
)
9+
10+
// Task is the task to execute
11+
type Task struct {
12+
// ID is a global unique id
13+
ID string
14+
// Topic use to classify tasks
15+
Topic string
16+
// ExecuteTime is the time to deliver
17+
ExecuteTime int64
18+
// MaxRetry is max deliver retry times
19+
MaxRetry int
20+
//HasRetry is the current retry times
21+
HasRetry int
22+
// Callback is the deliver address
23+
Callback string
24+
// Content is the task content to deliver
25+
Content string
26+
// CreatTime is the time task created
27+
CreatTime int64
28+
}
29+
30+
const (
31+
DelayBucket = "later_delay"
32+
UnackBucket = "later_unack"
33+
ErrorBucket = "later_error"
34+
)
35+
36+
func createTask(task *Task) error {
37+
data, err := json.Marshal(task)
38+
if err != nil {
39+
return err
40+
}
41+
c := pool.Get()
42+
defer c.Close()
43+
_, err = c.Do("SET", task.ID, String(data), "EX", TaskTTL)
44+
if err != nil {
45+
return err
46+
}
47+
_, err = c.Do("ZADD", DelayBucket, task.ExecuteTime, task.ID)
48+
return err
49+
}
50+
51+
func updateTask(task *Task) error {
52+
data, err := json.Marshal(task)
53+
if err != nil {
54+
return err
55+
}
56+
c := pool.Get()
57+
defer c.Close()
58+
ttl, err := redis.Int(c.Do("TTL", task.ID))
59+
if err != nil {
60+
return err
61+
}
62+
_, err = c.Do("SET", task.ID, String(data), "EX", ttl)
63+
return err
64+
}
65+
66+
func getTask(id string) (*Task, error) {
67+
c := pool.Get()
68+
defer c.Close()
69+
data, err := redis.String(c.Do("GET", id))
70+
if err != nil {
71+
return nil, err
72+
}
73+
var task Task
74+
err = json.Unmarshal(Slice(data), &task)
75+
return &task, err
76+
}
77+
78+
func getTasks(bucket string, begin int64, end int64) ([]string, error) {
79+
c := pool.Get()
80+
defer c.Close()
81+
return redis.Strings(c.Do("ZRANGEBYSCORE", bucket, begin, end, "LIMIT", "0", fmt.Sprintf("%v", ZrangeOffset)))
82+
}
83+
84+
func delayToUnack(id string, score int64) (bool, error) {
85+
return bucketTransfer(DelayBucket, UnackBucket, id, score)
86+
}
87+
88+
func unackToDelay(id string, score int64) (bool, error) {
89+
return bucketTransfer(UnackBucket, DelayBucket, id, score)
90+
}
91+
92+
func errorToDelay(id string, score int64) (bool, error) {
93+
return bucketTransfer(ErrorBucket, DelayBucket, id, score)
94+
}
95+
96+
func bucketTransfer(from string, to string, id string, score int64) (bool, error) {
97+
c := pool.Get()
98+
defer c.Close()
99+
reply, err := redis.Int(c.Do("ZADD", to, score, id))
100+
if err != nil {
101+
return false, err
102+
}
103+
if reply == 0 {
104+
return false, nil
105+
}
106+
_, err = c.Do("ZREM", from, id)
107+
return true, err
108+
}
109+
110+
func unackToError(id string, score int64) error {
111+
c := pool.Get()
112+
defer c.Close()
113+
_, err := c.Do("ZADD", ErrorBucket, score, id)
114+
if err != nil {
115+
return err
116+
}
117+
_, err = c.Do("ZREM", UnackBucket, id)
118+
return err
119+
}
120+
121+
func deleteTask(id string) error {
122+
c := pool.Get()
123+
defer c.Close()
124+
_, err := c.Do("DEL", id)
125+
if err != nil {
126+
return err
127+
}
128+
_, err = c.Do("ZREM", DelayBucket, id)
129+
if err != nil {
130+
return err
131+
}
132+
_, err = c.Do("ZREM", UnackBucket, id)
133+
if err != nil {
134+
return err
135+
}
136+
_, err = c.Do("ZREM", ErrorBucket, id)
137+
return err
138+
}

0 commit comments

Comments
 (0)