-
Notifications
You must be signed in to change notification settings - Fork 840
/
message.go
160 lines (141 loc) · 4.78 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package storage
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
import (
"encoding/xml"
"fmt"
"net/http"
"net/url"
"strconv"
"time"
)
// Message represents an Azure message.
//
// A Message is bound to a Queue and carries both the payload sent to the
// service and the metadata decoded from service responses.
type Message struct {
// Queue is the queue this message belongs to. Put, Update and Delete reach
// the service client and build request paths through it, so it must be
// non-nil before any of those methods is called.
Queue *Queue
// Text is the message body; Put and Update send it as the MessageText
// element of the request.
Text string `xml:"MessageText"`
// ID identifies the message; buildPath appends it to the queue's messages
// path to address this message in Update and Delete.
ID string `xml:"MessageId"`
// Insertion is decoded from the InsertionTime element.
// NOTE(review): presumably the time the service enqueued the message - confirm.
Insertion TimeRFC1123 `xml:"InsertionTime"`
// Expiration is decoded from the ExpirationTime element.
// NOTE(review): presumably the time the message expires - confirm.
Expiration TimeRFC1123 `xml:"ExpirationTime"`
// PopReceipt is sent as the "popreceipt" query parameter by Update (when
// non-empty) and by Delete. Update replaces it with the value of the
// x-ms-popreceipt response header.
PopReceipt string `xml:"PopReceipt"`
// NextVisible is decoded from the TimeNextVisible element. Update refreshes
// it from the x-ms-time-next-visible response header when that header is set.
NextVisible TimeRFC1123 `xml:"TimeNextVisible"`
// DequeueCount is decoded from the DequeueCount element.
// NOTE(review): presumably how many times the message was dequeued - confirm.
DequeueCount int `xml:"DequeueCount"`
}
// buildPath returns the resource path of this message: the owning queue's
// messages path followed by "/" and the message ID.
func (m *Message) buildPath() string {
	messagesPath := m.Queue.buildPathMessages()
	return fmt.Sprintf("%s/%s", messagesPath, m.ID)
}
// PutMessageOptions is the set of options that can be specified for the Put
// Message operation. A zero struct does not use any preferences for the request.
type PutMessageOptions struct {
// Timeout is handed to addTimeout to populate the request query.
// NOTE(review): presumably a server-side timeout in seconds - confirm.
Timeout uint
// VisibilityTimeout is sent as the "visibilitytimeout" query parameter when
// non-zero; a zero value omits the parameter.
// NOTE(review): presumably seconds - confirm against the REST API docs.
VisibilityTimeout int
// MessageTTL is sent as the "messagettl" query parameter when non-zero; a
// zero value omits the parameter.
// NOTE(review): presumably seconds - confirm against the REST API docs.
MessageTTL int
// RequestID is tagged for the x-ms-client-request-id header; Put passes the
// struct through headersFromStruct and merges the result into the request headers.
RequestID string `header:"x-ms-client-request-id"`
}
// Put operation adds a new message to the back of the message queue.
//
// The message text is marshalled into a QueueMessage request body and POSTed
// to the queue's messages path. When options is non-nil, its non-zero
// VisibilityTimeout and MessageTTL are added to the query string, its Timeout
// is applied through addTimeout, and its tagged header fields are merged into
// the request headers. The service must answer 201 Created; the response body
// is then decoded into m.
//
// See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Put-Message
func (m *Message) Put(options *PutMessageOptions) error {
	queue := m.Queue
	params := url.Values{}
	headers := queue.qsc.client.getStandardHeaders()

	payload, size, err := xmlMarshal(putMessageRequest{MessageText: m.Text})
	if err != nil {
		return err
	}
	headers["Content-Length"] = strconv.Itoa(size)

	if options != nil {
		if options.VisibilityTimeout != 0 {
			params.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
		}
		if options.MessageTTL != 0 {
			params.Set("messagettl", strconv.Itoa(options.MessageTTL))
		}
		params = addTimeout(params, options.Timeout)
		headers = mergeHeaders(headers, headersFromStruct(*options))
	}

	endpoint := queue.qsc.client.getEndpoint(queueServiceName, queue.buildPathMessages(), params)
	response, err := queue.qsc.client.exec(http.MethodPost, endpoint, headers, payload, queue.qsc.auth)
	if err != nil {
		return err
	}
	defer drainRespBody(response)

	if err := checkRespCode(response, []int{http.StatusCreated}); err != nil {
		return err
	}
	// Fill in the receiver from the response body.
	return xmlUnmarshal(response.Body, m)
}
// UpdateMessageOptions is the set of options that can be specified for the
// Update Message operation. A zero struct does not use any preferences for the
// request.
type UpdateMessageOptions struct {
// Timeout is handed to addTimeout to populate the request query.
// NOTE(review): presumably a server-side timeout in seconds - confirm.
Timeout uint
// VisibilityTimeout overrides the "visibilitytimeout" query parameter when
// non-zero; otherwise Update sends its default of "0".
// NOTE(review): presumably seconds - confirm against the REST API docs.
VisibilityTimeout int
// RequestID is tagged for the x-ms-client-request-id header; Update passes
// the struct through headersFromStruct and merges the result into the
// request headers.
RequestID string `header:"x-ms-client-request-id"`
}
// Update operation updates the specified message.
//
// The current Text is marshalled into a QueueMessage request body and PUT to
// the message's own path. The "popreceipt" query parameter is sent when
// m.PopReceipt is non-empty, and "visibilitytimeout" is always sent: "0" by
// default, or options.VisibilityTimeout when that is non-zero. When options is
// non-nil its Timeout is applied through addTimeout and its tagged header
// fields are merged into the request headers.
//
// The response status is validated (204 No Content) before the receiver is
// modified, so a failed update leaves m.PopReceipt and m.NextVisible intact
// and the caller can still retry or delete with the receipt it holds. On
// success m.PopReceipt is replaced from the x-ms-popreceipt header and
// m.NextVisible from x-ms-time-next-visible when that header is present. An
// error is returned if the request fails, the status is unexpected, or the
// next-visible time cannot be parsed as RFC 1123.
//
// See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Update-Message
func (m *Message) Update(options *UpdateMessageOptions) error {
	query := url.Values{}
	if m.PopReceipt != "" {
		query.Set("popreceipt", m.PopReceipt)
	}

	headers := m.Queue.qsc.client.getStandardHeaders()
	req := putMessageRequest{MessageText: m.Text}
	body, nn, err := xmlMarshal(req)
	if err != nil {
		return err
	}
	headers["Content-Length"] = strconv.Itoa(nn)

	// visibilitytimeout is required for Update (zero or greater) so set the default here
	query.Set("visibilitytimeout", "0")
	if options != nil {
		if options.VisibilityTimeout != 0 {
			query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
		}
		query = addTimeout(query, options.Timeout)
		headers = mergeHeaders(headers, headersFromStruct(*options))
	}

	uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), query)
	resp, err := m.Queue.qsc.client.exec(http.MethodPut, uri, headers, body, m.Queue.qsc.auth)
	if err != nil {
		return err
	}
	defer drainRespBody(resp)

	// Validate the status before touching the receiver: overwriting PopReceipt
	// from a response that did not succeed would discard the only receipt the
	// caller has for this message.
	if err := checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
		return err
	}

	// The update was accepted, so the previous receipt is superseded by the
	// one returned in the response.
	m.PopReceipt = resp.Header.Get("x-ms-popreceipt")
	if nextTimeStr := resp.Header.Get("x-ms-time-next-visible"); nextTimeStr != "" {
		nextTime, err := time.Parse(time.RFC1123, nextTimeStr)
		if err != nil {
			return err
		}
		m.NextVisible = TimeRFC1123(nextTime)
	}
	return nil
}
// Delete operation deletes the specified message.
//
// A DELETE request is issued against the message's own path with m.PopReceipt
// as the "popreceipt" query parameter. When options is non-nil, its Timeout is
// applied through addTimeout and its tagged header fields are merged into the
// request headers. The service must answer 204 No Content.
//
// See https://msdn.microsoft.com/en-us/library/azure/dd179347.aspx
func (m *Message) Delete(options *QueueServiceOptions) error {
	query := url.Values{}
	query.Set("popreceipt", m.PopReceipt)

	queue := m.Queue
	headers := queue.qsc.client.getStandardHeaders()
	if options != nil {
		query = addTimeout(query, options.Timeout)
		headers = mergeHeaders(headers, headersFromStruct(*options))
	}

	endpoint := queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), query)
	response, err := queue.qsc.client.exec(http.MethodDelete, endpoint, headers, nil, queue.qsc.auth)
	if err != nil {
		return err
	}
	defer drainRespBody(response)

	return checkRespCode(response, []int{http.StatusNoContent})
}
// putMessageRequest is the request body that Message.Put and Message.Update
// pass to xmlMarshal. Its tags describe a QueueMessage element with a single
// MessageText child.
type putMessageRequest struct {
// XMLName carries the QueueMessage tag that names the root element.
XMLName xml.Name `xml:"QueueMessage"`
// MessageText is the message body, taken from Message.Text by the callers.
MessageText string `xml:"MessageText"`
}