-
Notifications
You must be signed in to change notification settings - Fork 0
/
rmq.go
132 lines (121 loc) · 3.21 KB
/
rmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package mq
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-clients/golang"
"github.com/apache/rocketmq-clients/golang/credentials"
"github.com/gogf/gf/v2/frame/g"
)
// Package-level RocketMQ client state: the shared producer, the connection settings and the consumer polling parameters.
var (
	// mqProducer is the shared producer: InitMQ assigns and starts it,
	// SendDefaultMessage sends through it.
	mqProducer golang.Producer

	// endpoint, defaultTopic and defaultGroupName are filled in by InitMQConfig
	// from the "mq.endpoint", "mq.defaultTopic" and "mq.defaultGroupName"
	// configuration keys.
	endpoint         string
	defaultTopic     string
	defaultGroupName string

	// config is the client configuration rebuilt by InitMQConfig and handed to
	// every producer/consumer constructor in this file.
	config golang.Config

	// awaitDuration is passed to golang.WithAwaitDuration when a simple
	// consumer is constructed.
	awaitDuration = 5 * time.Second

	// maxMessageNum is the batch size passed to each Receive call.
	maxMessageNum int32 = 200

	// invisibleDuration is passed to each Receive call.
	// NOTE(review): the previous comment said this "should > 20s", yet the
	// value is exactly 20s — confirm the intended lower bound.
	invisibleDuration = 20 * time.Second
)
// InitMQ creates the producer through GetMqProducer, stores it in the
// package-level mqProducer and starts it.
//
// It panics with the returned error if the producer cannot be created or
// started. The producer is stored in mqProducer before Start is called, so a
// failed start leaves the unstarted producer assigned.
func InitMQ(ctx context.Context) {
	producer, createErr := GetMqProducer(ctx)
	if createErr != nil {
		panic(createErr)
	}

	// Publish first, then start: same order as before.
	mqProducer = producer

	startErr := producer.Start()
	if startErr != nil {
		panic(startErr)
	}
}
// InitMQConfig reads the MQ settings from the GoFrame configuration and
// rebuilds the package-level config from them.
//
// The keys "mq.endpoint", "mq.defaultTopic" and "mq.defaultGroupName" are read
// in that order and stored in endpoint, defaultTopic and defaultGroupName.
// config is then set with that endpoint, that consumer group and empty session
// credentials.
//
// It panics with a string message if any lookup returns an error. The message
// names the setting that failed and carries the underlying error.
//
// NOTE(review): only a lookup error triggers the panic; a key that is simply
// absent may come back without an error and leave the setting empty — confirm
// against the g.Cfg().Get contract.
func InitMQConfig(ctx context.Context) {
	// mustRead fetches one configuration value as a string, panicking with
	// failure (plus the cause) when the lookup itself fails.
	mustRead := func(key, failure string) string {
		val, err := g.Cfg().Get(ctx, key)
		if err != nil {
			panic(fmt.Sprintf("%s: %v", failure, err))
		}
		return val.String()
	}

	endpoint = mustRead("mq.endpoint", "not found mq nameserver")
	defaultTopic = mustRead("mq.defaultTopic", "not found mq defaultTopic")
	defaultGroupName = mustRead("mq.defaultGroupName", "not found mq defaultGroupName")

	config = golang.Config{
		Endpoint:      endpoint,
		ConsumerGroup: defaultGroupName,
		// Access key and secret are intentionally left as empty strings here.
		Credentials: &credentials.SessionCredentials{
			AccessKey:    "",
			AccessSecret: "",
		},
	}
}
// InitDefaultSimpleConsumer creates a simple consumer through
// GetMqSimpleConsumer, starts it and polls it from a background goroutine.
//
// Every received message is handed to fn. A message is acknowledged only when
// fn returns nil; when fn returns an error the failure is logged and the
// message is left unacknowledged. Receive and Ack failures are logged and do
// not stop the loop.
//
// The goroutine waits three seconds between polls. It exits, and stops the
// consumer via GracefulStop, once ctx is done.
//
// It panics if the consumer cannot be created or started; the panic value is
// an error wrapping the underlying cause.
func InitDefaultSimpleConsumer(ctx context.Context, fn func(m *golang.MessageView) error) {
	consumer, err := GetMqSimpleConsumer(ctx)
	if err != nil {
		panic(fmt.Errorf("create simple consumer: %w", err))
	}
	if err := consumer.Start(); err != nil {
		panic(fmt.Errorf("start simple consumer: %w", err))
	}

	// Pause between two consecutive polls.
	const pollInterval = 3 * time.Second

	go func() {
		// Release the consumer when the loop ends so it does not outlive ctx.
		defer func() {
			if err := consumer.GracefulStop(); err != nil {
				g.Log().Warningf(ctx, "Stop consumer error: %v", err)
			}
		}()

		for {
			// Do not issue another Receive on a context that is already done.
			if ctx.Err() != nil {
				return
			}

			mvs, err := consumer.Receive(ctx, maxMessageNum, invisibleDuration)
			if err != nil {
				// Keep polling: a failed Receive is not fatal for the loop.
				g.Log().Warningf(ctx, "Receive error: %v", err)
			}

			for _, mv := range mvs {
				if err := fn(mv); err != nil {
					// Leave the message unacknowledged when the handler fails.
					g.Log().Infof(ctx, "Receive Handle error: %v", err)
					continue
				}
				if err := consumer.Ack(ctx, mv); err != nil {
					g.Log().Warningf(ctx, "Ack error: %v", err)
				}
			}

			// Interruptible replacement for a plain sleep.
			select {
			case <-ctx.Done():
				return
			case <-time.After(pollInterval):
			}
		}
	}()
}
// GetMqSimpleConsumer reloads the MQ configuration through InitMQConfig and
// constructs a simple consumer from the package-level config.
//
// The consumer is built with awaitDuration and a single subscription mapping
// defaultTopic to golang.SUB_ALL. This function does not call Start on it.
// The result of golang.NewSimpleConsumer is returned unchanged.
func GetMqSimpleConsumer(ctx context.Context) (simpleConsumer golang.SimpleConsumer, err error) {
	InitMQConfig(ctx)

	subscriptions := map[string]*golang.FilterExpression{
		defaultTopic: golang.SUB_ALL,
	}

	simpleConsumer, err = golang.NewSimpleConsumer(
		&config,
		golang.WithAwaitDuration(awaitDuration),
		golang.WithSubscriptionExpressions(subscriptions),
	)
	return simpleConsumer, err
}
// GetMqPushConsumer reloads the MQ configuration through InitMQConfig and
// returns a consumer built by golang.NewSimpleConsumer.
//
// NOTE(review): despite its name this constructs a simple consumer, with the
// same options as GetMqSimpleConsumer (awaitDuration and a subscription of
// defaultTopic to golang.SUB_ALL) — confirm whether a push consumer was
// intended.
func GetMqPushConsumer(ctx context.Context) (simpleConsumer golang.SimpleConsumer, err error) {
	InitMQConfig(ctx)

	awaitOpt := golang.WithAwaitDuration(awaitDuration)
	subscribeOpt := golang.WithSubscriptionExpressions(
		map[string]*golang.FilterExpression{defaultTopic: golang.SUB_ALL},
	)

	return golang.NewSimpleConsumer(&config, awaitOpt, subscribeOpt)
}
// GetMqProducer reloads the MQ configuration through InitMQConfig and
// constructs a producer for defaultTopic from the package-level config.
//
// The named result mqProducer shadows the package-level variable of the same
// name inside this function; the package-level variable is not assigned here.
// This function does not call Start on the producer.
func GetMqProducer(ctx context.Context) (mqProducer golang.Producer, err error) {
	InitMQConfig(ctx)

	topicOpt := golang.WithTopics(defaultTopic)
	mqProducer, err = golang.NewProducer(&config, topicOpt)
	return mqProducer, err
}
func SendDefaultMessage(ctx context.Context, tag string, key string, data []byte) (bool, error) {
msg := &golang.Message{
Topic: defaultTopic,
Body: data,
}
// set keys and tag
msg.SetKeys(key)
msg.SetTag(tag)
// send message in sync
resp, err := mqProducer.Send(ctx, msg)
if err != nil {
return false, err
} else {
g.Log().Infof(ctx, "resp: %v", resp)
}
return true, nil
}