forked from denverdino/aliyungo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
153 lines (133 loc) · 4.13 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package mq
import (
"encoding/json"
"errors"
"fmt"
"strconv"
)
type Client struct {
AccessKey string
SecretKey string
Endpoint string
Topic string
ProducerId string
ConsumerId string
Key string
Tag string
}
type MessageResponse struct {
Body string `json:"body"`
BornTime int64 `json:"bornTime"` // UTC time in Unix
}
func NewClient(ak string, sk string, endpoint string, topic string,
producerId string, consumerId string, key string, tag string) (client *Client) {
client = &Client{
AccessKey: ak,
SecretKey: sk,
Endpoint: endpoint,
Topic: topic,
ProducerId: producerId,
ConsumerId: consumerId,
Key: key,
Tag: tag,
}
return client
}
func getSendUrl(endpoint string, topic string, time int64, tag string, key string) string {
return endpoint + "/message/?topic=" + topic + "&time=" +
strconv.FormatInt(time, 10) + "&tag=" + tag + "&key=" + key
}
func getReceiveUrl(endpoint, topic string, time int64, tag string, num int) string {
return endpoint + "/message/?topic=" + topic + "&time=" +
strconv.FormatInt(time, 10) + "&tag=" + tag + "&num=" + strconv.Itoa(num)
}
func getSendSign(topic string, producerId string, messageBody []byte, time int64, sk string) (sign string) {
signStr := topic + newline + producerId + newline + Md5(messageBody) + newline + strconv.FormatInt(time, 10)
sign = HamSha1(signStr, []byte(sk))
return sign
}
func getReceiveSign(topic string, consumerId string, time int64, sk string) string {
// [topic+”\n”+ cid+”\n”+time]
signStr := topic + newline + consumerId + newline + strconv.FormatInt(time, 10)
return HamSha1(signStr, []byte(sk))
}
func getReceiveHeader(ak, sign, consumerId string) (map[string]string, error) {
if consumerId == "" {
return nil, fmt.Errorf("consumer id is not provided")
}
header := make(map[string]string)
header["AccessKey"] = ak
header["Signature"] = sign
header["ConsumerId"] = consumerId
return header, nil
}
func getSendHeader(ak string, sign string, producerId string) (header map[string]string, err error) {
if producerId == "" {
return nil, fmt.Errorf("producer id is not provided")
}
header = make(map[string]string, 0)
header["AccessKey"] = ak
header["Signature"] = sign
header["ProducerId"] = producerId
return header, nil
}
func (client *Client) Send(time int64, message []byte) (msgId string, err error) {
url := getSendUrl(client.Endpoint, client.Topic, time, client.Tag, client.Key)
sign := getSendSign(client.Topic, client.ProducerId, message, time, client.SecretKey)
header, err := getSendHeader(client.AccessKey, sign, client.ProducerId)
if err != nil {
return "", err
}
response, status, err := httpPost(url, header, message)
if err != nil {
return "", err
}
fmt.Printf("receive message: %s %d", response, status)
statusMessage := getStatusCodeMessage(status)
if statusMessage != "" {
return "", errors.New(statusMessage)
}
var rs interface{}
err = json.Unmarshal(response, &rs)
if err != nil {
return "", err
}
result := rs.(map[string]interface{})
sendStatus := result["sendStatus"].(string)
if sendStatus != "SEND_OK" {
return "", errors.New(sendStatus)
}
return result["msgId"].(string), nil
}
func (client *Client) ReceiveMessage(messageChan chan string, errChan chan error) {
// only listen for the latest message
time := GetCurrentUnixMicro()
url := getReceiveUrl(client.Endpoint, client.Topic, time, client.Tag, 1)
sign := getReceiveSign(client.Topic, client.ConsumerId, time, client.SecretKey)
header, err := getReceiveHeader(client.AccessKey, sign, client.ConsumerId)
if err != nil {
errChan <- err
return
}
response, status, err := HttpGet(url, header)
if err != nil {
errChan <- err
return
}
fmt.Printf("receive message: %s %d", response, status)
statusMessage := getStatusCodeMessage(status)
if statusMessage != "" {
errChan <- errors.New(statusMessage)
return
}
messages := make([]MessageResponse, 0)
json.Unmarshal(response, &messages)
if len(messages) > 0 {
fmt.Printf("size of messages is %d", len(messages))
message := messages[0]
messageChan <- message.Body
} else {
errChan <- fmt.Errorf("no message available")
return
}
}