forked from aliyunmq/mq-http-go-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mq_producer.go
155 lines (128 loc) · 4.19 KB
/
mq_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package mq_http_sdk
import (
"fmt"
"strings"
)
// MQProducer is the MQ message producer.
type MQProducer interface {
	// TopicName returns the topic name.
	TopicName() string
	// InstanceId returns the instance ID; it may be empty.
	InstanceId() string
	// PublishMessage sends a message.
	PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
}
// MQTransProducer is the MQ transactional message producer.
type MQTransProducer interface {
	// TopicName returns the topic name.
	TopicName() string
	// InstanceId returns the instance ID; it may be empty.
	InstanceId() string
	// GroupId returns the group ID; it must not be empty.
	GroupId() string
	// PublishMessage sends a message.
	PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error)
	// ConsumeHalfMessage consumes transactional half messages. The outcome is
	// reported through respChan or errChan rather than a return value.
	ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64)
	// Commit commits a transactional message.
	Commit(receiptHandle string) (err error)
	// Rollback rolls back a transactional message.
	Rollback(receiptHandle string) (err error)
}
// AliyunMQProducer publishes messages to a single topic through an
// AliyunMQClient.
type AliyunMQProducer struct {
	// topicName is the topic inserted into the request resource path.
	topicName string
	// instanceId, when non-empty, is sent as the "ns" query parameter.
	instanceId string
	// client performs the actual requests via its Send method.
	client *AliyunMQClient
	// decoder is handed to client.Send when publishing a message.
	decoder MQDecoder
}
// TopicName returns the name of the topic this producer publishes to.
func (p *AliyunMQProducer) TopicName() string {
	return p.topicName
}
// InstanceId returns the instance ID configured for this producer; it is the
// empty string when no instance ID was set.
func (p *AliyunMQProducer) InstanceId() string {
	return p.instanceId
}
// PublishMessage posts message to the producer's topic.
//
// The request resource is "topics/<topic>/messages", with "?ns=<instanceId>"
// appended when an instance ID is configured. The message is first passed
// through ConstructPubMessage; if that fails its error is returned and no
// request is sent. Otherwise the message is sent with POST via the client and
// the response is decoded into resp.
func (p *AliyunMQProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error) {
	var resource strings.Builder
	if p.instanceId == "" {
		fmt.Fprintf(&resource, "topics/%s/%s", p.topicName, "messages")
	} else {
		fmt.Fprintf(&resource, "topics/%s/%s?ns=%s", p.topicName, "messages", p.instanceId)
	}

	if err = ConstructPubMessage(&message); err != nil {
		return resp, err
	}

	_, err = p.client.Send(p.decoder, POST, nil, message, resource.String(), &resp)
	return resp, err
}
// AliyunMQTransProducer is a transactional producer built on top of an
// AliyunMQProducer.
type AliyunMQTransProducer struct {
	// groupId is sent as the "consumer" query parameter when consuming half
	// messages.
	groupId string
	// producer supplies the topic name, instance ID and client, and handles
	// PublishMessage.
	producer *AliyunMQProducer
	// consumeDecoder is handed to client.Send when consuming half messages.
	consumeDecoder MQDecoder
}
// GroupId returns the group ID configured for this transactional producer.
func (p *AliyunMQTransProducer) GroupId() string {
	return p.groupId
}
// TopicName returns the topic name of the wrapped producer.
func (p *AliyunMQTransProducer) TopicName() string {
	return p.producer.TopicName()
}
// InstanceId returns the instance ID of the wrapped producer; it is the empty
// string when no instance ID was set.
func (p *AliyunMQTransProducer) InstanceId() string {
	return p.producer.InstanceId()
}
// PublishMessage sends message through the wrapped producer and returns its
// response and error unchanged.
func (p *AliyunMQTransProducer) PublishMessage(message PublishMessageRequest) (resp PublishMessageResponse, err error) {
	return p.producer.PublishMessage(message)
}
// ConsumeHalfMessage issues one GET request for transactional half messages
// and reports the outcome on exactly one of the two channels.
//
// numOfMessages values that are <= 0 or > 16 are replaced with
// DefaultNumOfMessages. waitseconds is only added to the request when it lies
// in the range 1..30. On a Send error the error is written to errChan;
// otherwise the received messages are passed through ConstructRecMessage and
// the response is written to respChan. The channel sends are plain sends, so
// the call blocks until the receiving side takes the value (or the channel has
// buffer space).
func (p *AliyunMQTransProducer) ConsumeHalfMessage(respChan chan ConsumeMessageResponse, errChan chan error, numOfMessages int32, waitseconds int64) {
	if numOfMessages <= 0 || numOfMessages > 16 {
		numOfMessages = DefaultNumOfMessages
	}

	var resource strings.Builder
	if instanceID := p.InstanceId(); instanceID == "" {
		fmt.Fprintf(&resource, "topics/%s/%s?consumer=%s&numOfMessages=%d&trans=pop",
			p.TopicName(), "messages", p.groupId, numOfMessages)
	} else {
		fmt.Fprintf(&resource, "topics/%s/%s?consumer=%s&ns=%s&numOfMessages=%d&trans=pop",
			p.TopicName(), "messages", p.groupId, instanceID, numOfMessages)
	}
	if waitseconds > 0 && waitseconds <= 30 {
		fmt.Fprintf(&resource, "&waitseconds=%d", waitseconds)
	}

	var received ConsumeMessageResponse
	if _, err := p.producer.client.Send(p.consumeDecoder, GET, nil, nil, resource.String(), &received); err != nil {
		errChan <- err
		return
	}

	ConstructRecMessage(&received.Messages)
	respChan <- received
}
// Commit calls CommitOrRollback for receiptHandle with the "commit" action and
// returns its error.
func (p *AliyunMQTransProducer) Commit(receiptHandle string) (err error) {
	return CommitOrRollback(p, receiptHandle, "commit")
}
// Rollback calls CommitOrRollback for receiptHandle with the "rollback" action
// and returns its error.
func (p *AliyunMQTransProducer) Rollback(receiptHandle string) (err error) {
	return CommitOrRollback(p, receiptHandle, "rollback")
}
// CommitOrRollback resolves the transactional message identified by
// receiptHandle on behalf of producer p.
//
// trans is written verbatim into the request's "trans" query parameter; Commit
// passes "commit" and Rollback passes "rollback". An empty receiptHandle is a
// no-op that returns nil. Otherwise the handle is sent in a DELETE request to
// "topics/<topic>/messages", with the producer's group ID as the "consumer"
// parameter and, when an instance ID is configured, that ID as the "ns"
// parameter. The error from the client's Send call is returned.
func CommitOrRollback(p *AliyunMQTransProducer, receiptHandle string, trans string) (err error) {
	if receiptHandle == "" {
		return nil
	}

	handlers := ReceiptHandles{}
	handlers.ReceiptHandles = append(handlers.ReceiptHandles, receiptHandle)

	resourceBuilder := strings.Builder{}
	if p.InstanceId() != "" {
		resourceBuilder.WriteString(fmt.Sprintf("topics/%s/%s?consumer=%s&ns=%s&trans=%s",
			p.TopicName(), "messages", p.groupId, p.InstanceId(), trans))
	} else {
		// The consumer is the group ID regardless of whether an instance ID is
		// set. This branch previously passed p.InstanceId(), which is always
		// empty here, producing "consumer=" in the request.
		resourceBuilder.WriteString(fmt.Sprintf("topics/%s/%s?consumer=%s&trans=%s",
			p.TopicName(), "messages", p.groupId, trans))
	}

	_, err = p.producer.client.Send(NewAliyunMQAckMsgDecoder(), DELETE, nil, handlers, resourceBuilder.String(), nil)
	return err
}