forked from aliyunmq/mq-http-go-sdk
-
Notifications
You must be signed in to change notification settings - Fork 1
/
message.go
227 lines (199 loc) · 7.14 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package mq
import (
"encoding/xml"
"strconv"
"strings"
"github.com/gogap/errors"
)
const (
// StartDeliverTime .
StartDeliverTime = "__STARTDELIVERTIME"
// TransCheckImmunityTime .
TransCheckImmunityTime = "__TransCheckT"
// Keys .
Keys = "KEYS"
// SHARDING .
SHARDING = "__SHARDINGKEY"
)
// MessageResponse .
type MessageResponse struct {
XMLName xml.Name `xml:"Message" json:"-"`
// 错误码
Code string `xml:"Code,omitempty" json:"code,omitempty"`
// 错误描述
Message string `xml:"Message,omitempty" json:"message,omitempty"`
// 请求ID
RequestID string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
// 请求HOST
HostID string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}
// ErrorResponse .
type ErrorResponse struct {
XMLName xml.Name `xml:"Error" json:"-"`
// 错误码
Code string `xml:"Code,omitempty" json:"code,omitempty"`
// 错误描述
Message string `xml:"Message,omitempty" json:"message,omitempty"`
// 请求ID
RequestID string `xml:"RequestId,omitempty" json:"request_id,omitempty"`
// 请求HOST
HostID string `xml:"HostId,omitempty" json:"host_id,omitempty"`
}
// PublishMessageRequest .
type PublishMessageRequest struct {
XMLName xml.Name `xml:"Message" json:"-"`
// 消息体
MessageBody string `xml:"MessageBody" json:"message_body"`
// 消息标签
MessageTag string `xml:"MessageTag,omitempty" json:"message_tag,omitempty"`
// 消息属性
Properties map[string]string `xml:"-" json:"-"`
// 消息KEY
MessageKey string `xml:"-" json:"-"`
// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
StartDeliverTime int64 `xml:"-" json:"-"`
// 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息, 10~300s
TransCheckImmunityTime int `xml:"-" json:"-"`
// 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。全局顺序消息,该字段可以设置为任意非空字符串。
ShardingKey string `xml:"-" json:"-"`
// 序列化属性请勿使用
string `xml:"Properties,omitempty" json:"properties,omitempty"`
}
// PublishMessageResponse .
type PublishMessageResponse struct {
MessageResponse
// 消息ID
MessageID string `xml:"MessageId" json:"message_id"`
// 消息体MD5
MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
// 消息句柄只有事务消息才有
ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
}
// ReceiptHandles .
type ReceiptHandles struct {
XMLName xml.Name `xml:"ReceiptHandles"`
// 消息句柄
ReceiptHandles []string `xml:"ReceiptHandle"`
}
// AckMessageErrorEntry .
type AckMessageErrorEntry struct {
XMLName xml.Name `xml:"Error" json:"-"`
// Ack消息出错的错误码
ErrorCode string `xml:"ErrorCode" json:"error_code"`
// Ack消息出错的错误描述
ErrorMessage string `xml:"ErrorMessage" json:"error_messages"`
// Ack消息出错的消息句柄
ReceiptHandle string `xml:"ReceiptHandle,omitempty" json:"receipt_handle"`
}
// AckMessageErrorResponse .
type AckMessageErrorResponse struct {
XMLName xml.Name `xml:"Errors" json:"-"`
FailedMessages []AckMessageErrorEntry `xml:"Error" json:"errors"`
}
// ConsumeMessageEntry .
type ConsumeMessageEntry struct {
MessageResponse
// 消息ID
MessageID string `xml:"MessageId" json:"message_id"`
// 消息句柄
ReceiptHandle string `xml:"ReceiptHandle" json:"receipt_handle"`
// 消息体MD5
MessageBodyMD5 string `xml:"MessageBodyMD5" json:"message_body_md5"`
// 消息体
MessageBody string `xml:"MessageBody" json:"message_body"`
// 消息发送时间
PublishTime int64 `xml:"PublishTime" json:"publish_time"`
// 下次消费消息的时间(如果这次消费的消息没有Ack)
NextConsumeTime int64 `xml:"NextConsumeTime" json:"next_consume_time"`
// 第一次消费的时间,此值对于顺序消费没有意义
FirstConsumeTime int64 `xml:"FirstConsumeTime" json:"first_consume_time"`
// 消费次数
ConsumedTimes int64 `xml:"ConsumedTimes" json:"consumed_times"`
// 消息标签
MessageTag string `xml:"MessageTag" json:"message_tag"`
// 消息属性
Properties map[string]string `xml:"-" json:"-"`
// 序列化属性请勿使用
PropInner string `xml:"Properties,omitempty" json:"properties,omitempty"`
// 消息KEY
MessageKey string `xml:"-" json:"-"`
// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
StartDeliverTime int64 `xml:"-" json:"-"`
// 顺序消息分区Key
ShardingKey string `xml:"-" json:"-"`
// 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息
TransCheckImmunityTime int `xml:"-" json:"-"`
}
// ConsumeMessageResponse .
type ConsumeMessageResponse struct {
XMLName xml.Name `xml:"Messages" json:"-"`
Messages []ConsumeMessageEntry `xml:"Message" json:"messages"`
}
// ConstructPubMessage .
func ConstructPubMessage(pubMsgReq *PublishMessageRequest) (err error) {
if pubMsgReq.Properties == nil {
pubMsgReq.Properties = make(map[string]string)
}
if pubMsgReq.MessageKey != "" {
pubMsgReq.Properties[Keys] = pubMsgReq.MessageKey
}
if pubMsgReq.StartDeliverTime > 0 {
pubMsgReq.Properties[StartDeliverTime] = strconv.FormatInt(pubMsgReq.StartDeliverTime, 10)
}
if pubMsgReq.TransCheckImmunityTime > 0 {
pubMsgReq.Properties[TransCheckImmunityTime] = strconv.Itoa(pubMsgReq.TransCheckImmunityTime)
}
if pubMsgReq.ShardingKey != "" {
pubMsgReq.Properties[SHARDING] = pubMsgReq.ShardingKey
}
if pubMsgReq.Properties == nil {
return nil
}
for k, v := range pubMsgReq.Properties {
if ContainsSpecialChar(k) || ContainsSpecialChar(v) {
return ErrMessageProperty.New(errors.Params{"err": k + ":" + v})
}
pubMsgReq.string += k + ":" + v + "|"
}
return nil
}
// ConstructRecMessage .
func ConstructRecMessage(entries *[]ConsumeMessageEntry) {
for i := range *entries {
(*entries)[i].Properties = make(map[string]string)
if (*entries)[i].PropInner == "" {
continue
}
kvArray := strings.Split((*entries)[i].PropInner, "|")
for _, kv := range kvArray {
if kv == "" {
continue
}
kAndV := strings.Split(kv, ":")
if len(kAndV) != 2 || kAndV[0] == "" || kAndV[1] == "" {
continue
}
(*entries)[i].Properties[kAndV[0]] = kAndV[1]
}
(*entries)[i].MessageKey = (*entries)[i].Properties[Keys]
(*entries)[i].StartDeliverTime, _ = strconv.ParseInt((*entries)[i].Properties[StartDeliverTime], 10, 64)
(*entries)[i].TransCheckImmunityTime, _ = strconv.Atoi((*entries)[i].Properties[TransCheckImmunityTime])
(*entries)[i].ShardingKey = (*entries)[i].Properties[SHARDING]
delete((*entries)[i].Properties, Keys)
delete((*entries)[i].Properties, StartDeliverTime)
delete((*entries)[i].Properties, TransCheckImmunityTime)
delete((*entries)[i].Properties, SHARDING)
(*entries)[i].PropInner = ""
}
return
}
// ContainsSpecialChar .
func ContainsSpecialChar(input string) (result bool) {
return strings.Contains(input, "'") ||
strings.Contains(input, "\"") ||
strings.Contains(input, "<") ||
strings.Contains(input, ">") ||
strings.Contains(input, "&") ||
strings.Contains(input, ":") ||
strings.Contains(input, "|")
}