forked from HydroProtocol/hydro-sdk-backend
/
queue.go
89 lines (73 loc) · 1.58 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package common
import (
"context"
"errors"
"fmt"
"github.com/go-redis/redis"
"time"
)
// IQueue is the interface of a common queue service.
// Any backend may be used to handle messages as long as it implements
// these two methods.
type IQueue interface {
	// Push hands one message to the queue backend.
	Push(data []byte) error

	// Pop hands back one message from the queue backend.
	// Implementations should not block the current thread forever.
	Pop() (data []byte, err error)
}
// InitQueue builds an IQueue from a backend-specific configuration value.
//
// The dynamic type of config selects the backend. The only supported type is
// *RedisQueueConfig, which yields a *RedisQueue set up through
// RedisQueue.Init. A nil config (untyped nil or a nil *RedisQueueConfig) and
// any unsupported config type produce a nil queue and a non-nil error. An
// error reported by RedisQueue.Init is returned unchanged together with a nil
// queue.
func InitQueue(config interface{}) (queue IQueue, err error) {
	switch c := config.(type) {
	case nil:
		return nil, fmt.Errorf("Need Config to init queue")
	case *RedisQueueConfig:
		// A typed nil pointer does not match `case nil` above; reject it here
		// instead of handing a nil config to Init, which would dereference it.
		if c == nil {
			return nil, fmt.Errorf("Need Config to init queue")
		}
		client := &RedisQueue{}
		if err = client.Init(c); err != nil {
			// Return a literal nil so callers never see a half-built queue.
			return nil, err
		}
		return client, nil
	default:
		return nil, fmt.Errorf("Config is not support %v", config)
	}
}
// Redis-backed queue implementation.

// EXIT is the sentinel error returned by RedisQueue.Pop once the queue's
// context is done. Callers can compare against it to tell a shutdown apart
// from a Redis failure.
var (
	EXIT = errors.New("EXIT")
)
// RedisQueue is the queue implementation that talks to Redis.
// Its fields are filled in by Init from a RedisQueueConfig.
type RedisQueue struct {
	name   string          // key passed to LPush / BRPop
	ctx    context.Context // checked by Pop before every blocking read
	client *redis.Client   // Redis connection used by Push and Pop
}

// RedisQueueConfig carries the values that Init copies into a RedisQueue.
// Client must be non-nil; Init rejects the config otherwise.
type RedisQueueConfig struct {
	Name   string
	Ctx    context.Context
	Client *redis.Client
}
// Push sends data to Redis with an LPUSH on the queue's list key and returns
// the error, if any, reported for that command.
func (queue *RedisQueue) Push(data []byte) error {
	return queue.client.LPush(queue.name, data).Err()
}
// Pop waits for the next message on the queue's list key and returns it.
//
// Each round first checks the queue's context: if it is done, Pop returns
// (nil, EXIT). Otherwise it issues a BRPOP with a one-second timeout. A
// redis.Nil result (presumably the timeout expiring with no element; see the
// go-redis docs) starts another round, any other error is returned as is, and
// on success the second element of the reply is returned as bytes.
//
// Because the context is only inspected between BRPOP calls, cancellation is
// noticed after the current one-second wait finishes, not immediately.
func (queue *RedisQueue) Pop() ([]byte, error) {
	for {
		// Non-blocking cancellation check before each blocking read.
		select {
		case <-queue.ctx.Done():
			return nil, EXIT
		default:
		}

		reply, popErr := queue.client.BRPop(time.Second, queue.name).Result()
		switch {
		case popErr == redis.Nil:
			// Nothing arrived within the timeout; loop to re-check the context.
			continue
		case popErr != nil:
			return nil, popErr
		}

		// reply[0] is the list key, reply[1] the popped value.
		return []byte(reply[1]), nil
	}
}
// Init copies the Redis client, context and list name from config into the
// queue.
//
// It returns an error and leaves the queue unmodified when config is nil or
// when config.Client is nil. A nil config.Ctx is replaced with
// context.Background(), because Pop selects on the context's Done channel and
// would panic on a nil context.
func (queue *RedisQueue) Init(config *RedisQueueConfig) error {
	if config == nil {
		return fmt.Errorf("Need Config to init queue")
	}
	if config.Client == nil {
		return fmt.Errorf("No redis Connection")
	}

	ctx := config.Ctx
	if ctx == nil {
		// Never-cancelled default keeps Pop usable when no context was given.
		ctx = context.Background()
	}

	queue.client = config.Client
	queue.ctx = ctx
	queue.name = config.Name
	return nil
}