-
Notifications
You must be signed in to change notification settings - Fork 7
/
queue.go
127 lines (114 loc) · 3.41 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"context"
"fmt"
"time"
"github.com/byte-power/gorich/cloud"
"github.com/byte-power/gorich/cloud/queue"
)
// Configure Addr/Addrs and Password to run the standalone/cluster Redis examples.
// Configure token, url, topic_name and subscription_name to run the Tencent Cloud example.
// Configure secret_id, secret_key, region and queue_name to run the AWS example.
func main() {
	// Standalone (single-node) Redis. Uncomment this and comment out the
	// cluster option below to run against a single node instead.
	// redisOpt := queue.StandaloneRedisQueueOption{
	// 	Addr:          "localhost:6379",
	// 	Password:      "",
	// 	ConsumerGroup: "save_task_consumer_group",
	// 	Idle:          10,
	// }

	// Redis cluster.
	redisOpt := queue.ClusterRedisQueueOption{
		Addrs: []string{
			"localhost:7000",
			"localhost:7001",
			"localhost:7002",
			"localhost:7003",
			"localhost:7004",
			"localhost:7005",
		},
		Password:      "",
		ConsumerGroup: "save_task_consumer_group",
		Idle:          10,
	}
	queue_examples("test_queue_name", redisOpt)

	// Tencent Cloud: the topic and subscription names are combined into the
	// single name argument by queue.GenerateTopicAndSubName.
	const (
		tencentTopic        = "pulsar-xxxxxx/namespace_name/topic_name"
		tencentSubscription = "subscription_name"
	)
	tencentOpt := queue.TencentCloudQueueOption{
		Token: "access_jwt_token_xxx",
		URL:   "http://pulsar-xxxxxxxxx.tdmq.ap-gz.public.tencenttdmq.com:8080",
	}
	queue_examples(queue.GenerateTopicAndSubName(tencentTopic, tencentSubscription), tencentOpt)

	// AWS.
	awsOpt := cloud.CommonOption{
		Provider:  cloud.AWSProvider,
		SecretID:  "aws_secret_id_xxxx",
		SecretKey: "aws_secret_key_xxxx",
		Region:    "aws_region_xxx",
	}
	queue_examples("aws_queue_name", awsOpt)

	// Redis cluster through the V7 option type; DialTimeout is a pointer
	// field, so the duration needs an addressable variable.
	redisDialTimeout := 5 * time.Second
	redisV7Opt := queue.ClusterRedisQueueOptionV7{
		ClusterRedisQueueOption: queue.ClusterRedisQueueOption{
			Addrs: []string{
				"localhost:30001",
				"localhost:30002",
				"localhost:30003",
			},
			ConsumerGroup: "save_task_consumer_group_2",
			DialTimeout:   &redisDialTimeout,
			Idle:          10,
		},
	}
	queue_examples("redis_cluster_queue_v7", redisV7Opt)
}
// queue_examples runs a send/receive/ack round trip against the queue service
// selected by option.
//
// It obtains a service for queueOrTopicName via queue.GetQueueService, creates
// a producer and a consumer from it, sends three messages whose text is
// derived from the current Unix timestamp, calls ReceiveMessages with an
// argument of 10 (presumably the maximum batch size; confirm against the queue
// package), prints every received body and then acknowledges each message.
//
// Nothing is returned: progress and errors are printed to stdout, and the
// first error aborts the remaining steps. The service, producer and consumer
// are closed via defer on every exit path after they were created.
func queue_examples(queueOrTopicName string, option cloud.Option) {
	service, err := queue.GetQueueService(queueOrTopicName, option)
	if err != nil {
		fmt.Printf("get queue service error %s %+v %s\n", queueOrTopicName, option, err)
		return
	}
	defer service.Close()
	fmt.Printf("get service %+v\n", service)

	producer, err := service.CreateProducer()
	if err != nil {
		fmt.Printf("create producer error %s\n", err)
		return
	}
	defer producer.Close()

	consumer, err := service.CreateConsumer()
	if err != nil {
		fmt.Printf("create consumer error %s\n", err)
		return
	}
	defer consumer.Close()

	// The example has no deadline or cancellation to propagate, so a single
	// placeholder context is shared by all calls.
	ctx := context.TODO()

	// Derive message text from the current time so repeated runs produce
	// distinguishable payloads.
	const messageCount = 3
	ts := int(time.Now().Unix())
	messages := make([]string, 0, messageCount)
	for i := 0; i < messageCount; i++ {
		messages = append(messages, fmt.Sprintf("message %d", ts+i))
	}

	for _, message := range messages {
		if err := producer.SendMessage(ctx, message); err != nil {
			// Trailing newline added: without it this line ran into the
			// output of whatever printed next.
			fmt.Printf("producer send message error %s\n", err)
			return
		}
		fmt.Printf("producer send message %s\n", message)
	}

	receivedMsgs, err := consumer.ReceiveMessages(ctx, 10)
	if err != nil {
		// Trailing newline added for the same reason as above.
		fmt.Printf("receive messages error %s\n", err)
		return
	}
	for _, message := range receivedMsgs {
		fmt.Printf("received message %s\n", message.Body())
	}

	// Acknowledge only after everything has been printed; stop at the first
	// failed ack, leaving the remaining messages unacknowledged.
	for _, message := range receivedMsgs {
		if err := consumer.AckMessage(ctx, message); err != nil {
			fmt.Printf("ack message error %s %s\n", message.Body(), err)
			return
		}
		fmt.Printf("ack message %s\n", message.Body())
	}
}