-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
71 lines (57 loc) · 1.59 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
"sync"
)
var topic = "tiger_topic_01"
var nameSrv, _ = primitive.NewNamesrvAddr("127.0.0.1:9876")
var producerGroupName = "tiger_producer_group_01"
var consumerGroupName = "tiger_consumer_group_02"
var ctx = context.Background()
var maxMessageCount = 10
var locker = sync.WaitGroup{}
func main() {
if len(os.Args) > 0 {
nameSrv, _ = primitive.NewNamesrvAddr(os.Args[1])
}
locker.Add(maxMessageCount)
startOneProducer()
startOneConsumer()
locker.Wait()
}
func startOneConsumer() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName(consumerGroupName),
consumer.WithNameServer(nameSrv),
consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
)
c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
fmt.Println(fmt.Sprintf("[消费消息] %s , %s", msg.MsgId, string(msg.Body)))
}
return consumer.ConsumeSuccess, nil
})
c.Start()
}
func startOneProducer() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer(nameSrv),
producer.WithGroupName(producerGroupName),
)
p.Start()
for i := 1; i <= maxMessageCount; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
res, _ := p.SendSync(ctx, msg)
fmt.Println(res.String())
}
}