/
producer.go
94 lines (85 loc) · 2.92 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Package queuehelper implements a queue object.
// It is well suited to serve as the middleware of a simple producer/consumer pattern.
package queuehelper
import (
"context"
"time"
"github.com/Golang-Tools/idgener"
"github.com/Golang-Tools/optparams"
"github.com/Golang-Tools/redishelper/v2/clientIdhelper"
"github.com/Golang-Tools/redishelper/v2/pchelper"
"github.com/go-redis/redis/v8"
)
// Producer is the producer side of the queue. It keeps the Redis client used
// to push messages together with the options it was built with.
type Producer struct {
// cli is the Redis client every queue command is issued through.
cli redis.UniversalClient
// opt holds the producer options: defaultOptions with the caller's overrides applied.
opt Options
// ProducerConsumerABC carries the shared producer/consumer settings; Publish reads
// Opt.SerializeProtocol from it and PubEvent reads Opt.UUIDType.
*pchelper.ProducerConsumerABC
// ClientIDAbc is the client-ID helper built in NewProducer.
// NOTE(review): presumably the source of the promoted ClientID() method used by PubEvent — confirm.
*clientIdhelper.ClientIDAbc
}
// NewProducer builds a queue producer bound to the given Redis client.
//
// The producer options start as a copy of defaultOptions and are then
// overridden by opts. The client-ID helper is created from opt.ClientIDOpts
// and the shared producer/consumer helper from opt.ProducerConsumerOpts.
//
// @params cli redis.UniversalClient the Redis client the producer sends commands through
// @params opts ...optparams.Option[Options] producer configuration options
// @returns *Producer the new producer; nil together with the error when clientIdhelper.New fails
func NewProducer(cli redis.UniversalClient, opts ...optparams.Option[Options]) (*Producer, error) {
	producer := &Producer{
		cli: cli,
		opt: defaultOptions,
	}
	optparams.GetOption(&producer.opt, opts...)

	// The client-ID helper is the only step that can fail, so it is resolved
	// before the producer/consumer helper is created.
	idHelper, err := clientIdhelper.New(producer.opt.ClientIDOpts...)
	if err != nil {
		return nil, err
	}
	producer.ClientIDAbc = idHelper
	producer.ProducerConsumerABC = pchelper.New(producer.opt.ProducerConsumerOpts...)
	return producer, nil
}
// Client returns the Redis client this producer sends its commands through.
func (p *Producer) Client() redis.UniversalClient {
	return p.cli
}
// Publish serializes payload and pushes it onto the head of the Redis list
// named topic (LPUSH).
//
// The payload is encoded by pchelper.ToBytes using the producer's configured
// serialization protocol; per the original documentation it accepts string,
// bytes, bool, numbers, and objects that can be serialized as JSON or msgpack.
//
// @params ctx context.Context the request context
// @params topic string the list (queue) the message is pushed to
// @params payload interface{} the message payload
// @params opts ...optparams.Option[pchelper.PublishOptions] accepted for interface compatibility; not used
// @returns error the serialization error or the error reported by the LPUSH command
func (p *Producer) Publish(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[pchelper.PublishOptions]) error {
	protocol := p.ProducerConsumerABC.Opt.SerializeProtocol
	data, err := pchelper.ToBytes(protocol, payload)
	if err != nil {
		return err
	}
	// Only the command error matters here; the resulting list length is discarded.
	return p.cli.LPush(ctx, topic, data).Err()
}
// PubEvent wraps payload in a pchelper.Event and pushes that event onto the queue.
//
// The event records the current time in nanoseconds, the topic, the payload,
// the producer's client ID as sender (when it is non-empty), and an event ID
// generated by idgener.Next for the configured UUID type. The event is then
// sent through Publish.
//
// @params ctx context.Context the request context
// @params topic string the queue the event is pushed to
// @params payload interface{} the event payload
// @params opts ...optparams.Option[pchelper.PublishOptions] accepted for interface compatibility; not used
// @returns *pchelper.Event the event that was sent; nil together with the error when ID generation or publishing fails
func (p *Producer) PubEvent(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[pchelper.PublishOptions]) (*pchelper.Event, error) {
	// The timestamp is taken first so it reflects when the call started,
	// not when ID generation finished.
	event := pchelper.Event{
		EventTime: time.Now().UnixNano(),
		Payload:   payload,
		Topic:     topic,
	}
	if sender := p.ClientID(); sender != "" {
		event.Sender = sender
	}

	eventID, err := idgener.Next(p.ProducerConsumerABC.Opt.UUIDType)
	if err != nil {
		return nil, err
	}
	event.EventID = eventID

	if err := p.Publish(ctx, topic, event); err != nil {
		return nil, err
	}
	return &event, nil
}
// Len reports the current length of the queue by running LLEN on the list
// named topic.
//
// @params ctx context.Context the request context
// @params topic string the name of the queue to inspect
// @returns int64 the list length, and the error reported by the LLEN command
func (p *Producer) Len(ctx context.Context, topic string) (int64, error) {
	length, err := p.cli.LLen(ctx, topic).Result()
	return length, err
}