-
Notifications
You must be signed in to change notification settings - Fork 54
/
client.go
298 lines (246 loc) · 9.89 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
/********************************************************************************
* Copyright 2019 Dell Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*******************************************************************************/
package mqtt
import (
"crypto/tls"
"encoding/json"
"fmt"
"time"
"github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
pahoMqtt "github.com/eclipse/paho.mqtt.golang"
)
// ClientCreator defines the function signature for creating an MQTT client.
type ClientCreator func(config types.MessageBusConfig, handler pahoMqtt.OnConnectHandler) (pahoMqtt.Client, error)
// MessageMarshaller defines the function signature for marshaling structs into []byte.
type MessageMarshaller func(v interface{}) ([]byte, error)
// MessageUnmarshaller defines the function signature for unmarshaling []byte into structs.
type MessageUnmarshaller func(data []byte, v interface{}) error
// Client facilitates communication to an MQTT server and provides functionality needed to send and receive MQTT
// messages.
type Client struct {
creator ClientCreator
configuration types.MessageBusConfig
mqttClient pahoMqtt.Client
marshaller MessageMarshaller
unmarshaller MessageUnmarshaller
activeSubscriptions []activeSubscription
}
type activeSubscription struct {
topic string
qos byte
handler pahoMqtt.MessageHandler
errors chan error
}
// NewMQTTClient constructs a new MQTT client based on the options provided.
func NewMQTTClient(config types.MessageBusConfig) (*Client, error) {
client := &Client{
creator: DefaultClientCreator(),
configuration: config,
marshaller: json.Marshal,
unmarshaller: json.Unmarshal,
}
return client, nil
}
// NewMQTTClientWithCreator constructs a new MQTT client based on the options and ClientCreator provided.
func NewMQTTClientWithCreator(
config types.MessageBusConfig,
marshaller MessageMarshaller,
unmarshaller MessageUnmarshaller,
creator ClientCreator) (*Client, error) {
client := &Client{
creator: creator,
configuration: config,
marshaller: marshaller,
unmarshaller: unmarshaller,
}
return client, nil
}
// Connect establishes a connection to a MQTT server.
// This must be called before any other functionality provided by the Client.
func (mc *Client) Connect() error {
if mc.mqttClient == nil {
// Move created MQTT Client here since we need to set the onConnectHandler which needs to have access to
// the Client's activeSubscriptions. This was not possible from the factory method.
mqttClient, err := mc.creator(mc.configuration, mc.onConnectHandler)
if err != nil {
return err
}
mc.mqttClient = mqttClient
}
// Avoid reconnecting if already connected.
if mc.mqttClient.IsConnected() {
return nil
}
optionsReader := mc.mqttClient.OptionsReader()
return getTokenError(
mc.mqttClient.Connect(),
optionsReader.ConnectTimeout(),
ConnectOperation,
"Unable to connect")
}
func (mc *Client) onConnectHandler(_ pahoMqtt.Client) {
optionsReader := mc.mqttClient.OptionsReader()
// activeSubscriptions will be empty on the first connection.
// On a re-connect is when the subscriptions must be re-created.
for _, subscription := range mc.activeSubscriptions {
token := mc.mqttClient.Subscribe(subscription.topic, subscription.qos, subscription.handler)
message := fmt.Sprintf("Failed to re-create subscription for topic=%s", subscription.topic)
err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, message)
if err != nil {
subscription.errors <- err
}
}
}
// Publish sends a message to the connected MQTT server.
func (mc *Client) Publish(message types.MessageEnvelope, topic string) error {
marshaledMessage, err := mc.marshaller(message)
if err != nil {
return NewOperationErr(PublishOperation, err.Error())
}
optionsReader := mc.mqttClient.OptionsReader()
return getTokenError(
mc.mqttClient.Publish(
topic,
optionsReader.WillQos(),
optionsReader.WillRetained(),
marshaledMessage),
optionsReader.ConnectTimeout(),
PublishOperation,
"Unable to publish message")
}
// Subscribe creates a subscription for the specified topics.
func (mc *Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) error {
optionsReader := mc.mqttClient.OptionsReader()
for _, topic := range topics {
handler := newMessageHandler(mc.unmarshaller, topic.Messages, messageErrors)
qos := optionsReader.WillQos()
token := mc.mqttClient.Subscribe(topic.Topic, qos, handler)
err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription")
if err != nil {
return err
}
mc.activeSubscriptions = append(mc.activeSubscriptions, activeSubscription{
topic: topic.Topic,
qos: qos,
handler: handler,
errors: messageErrors,
})
}
return nil
}
// Disconnect closes the connection to the connected MQTT server.
func (mc *Client) Disconnect() error {
// Specify a wait time equal to the write timeout so that we allow other any queued processing to complete before
// disconnecting.
optionsReader := mc.mqttClient.OptionsReader()
mc.mqttClient.Disconnect(uint(optionsReader.ConnectTimeout() * time.Millisecond))
return nil
}
// DefaultClientCreator returns a default function for creating MQTT clients.
func DefaultClientCreator() ClientCreator {
return func(config types.MessageBusConfig, handler pahoMqtt.OnConnectHandler) (pahoMqtt.Client, error) {
clientConfiguration, err := CreateMQTTClientConfiguration(config)
if err != nil {
return nil, err
}
clientOptions, err := createClientOptions(clientConfiguration, tls.X509KeyPair, tls.LoadX509KeyPair)
if err != nil {
return nil, err
}
clientOptions.OnConnect = handler
return pahoMqtt.NewClient(clientOptions), nil
}
}
// ClientCreatorWithCertLoader creates a ClientCreator which leverages the specified cert creator and loader when
// creating an MQTT client.
func ClientCreatorWithCertLoader(certCreator pkg.X509KeyPairCreator, certLoader pkg.X509KeyLoader) ClientCreator {
return func(options types.MessageBusConfig, handler pahoMqtt.OnConnectHandler) (pahoMqtt.Client, error) {
clientConfiguration, err := CreateMQTTClientConfiguration(options)
if err != nil {
return nil, err
}
clientOptions, err := createClientOptions(clientConfiguration, certCreator, certLoader)
if err != nil {
return nil, err
}
clientOptions.OnConnect = handler
return pahoMqtt.NewClient(clientOptions), nil
}
}
// newMessageHandler creates a function which meets the criteria for a MessageHandler and propagates the received
// messages to the proper channel.
func newMessageHandler(
unmarshaler MessageUnmarshaller,
messageChannel chan<- types.MessageEnvelope,
errorChannel chan<- error) pahoMqtt.MessageHandler {
return func(client pahoMqtt.Client, message pahoMqtt.Message) {
var messageEnvelope types.MessageEnvelope
payload := message.Payload()
err := unmarshaler(payload, &messageEnvelope)
if err != nil {
errorChannel <- err
}
messageEnvelope.ReceivedTopic = message.Topic()
messageChannel <- messageEnvelope
}
}
// getTokenError determines if a Token is in an errored state and if so returns the proper error message. Otherwise,
// nil.
//
// NOTE the paho.pahoMqtt.golang's recommended way for handling errors do not cover all cases. During manual verification
// with an MQTT server, it was observed that the Token.Error() was sometimes nil even when a token.WaitTimeout(...)
// returned false(indicating the operation has timed-out). Therefore, there are some additional checks that need to
// take place to ensure the error message is returned if it is present. One example scenario, if you attempt to connect
// without providing a ClientID.
func getTokenError(token pahoMqtt.Token, timeout time.Duration, operation string, defaultTimeoutMessage string) error {
hasTimedOut := !token.WaitTimeout(timeout)
if hasTimedOut && token.Error() != nil {
return NewTimeoutError(operation, token.Error().Error())
}
if hasTimedOut && token.Error() == nil {
return NewTimeoutError(operation, defaultTimeoutMessage)
}
if token.Error() != nil {
return NewOperationErr(operation, token.Error().Error())
}
return nil
}
// createClientOptions constructs mqtt.Client options from an MQTTClientConfig.
func createClientOptions(
clientConfiguration MQTTClientConfig,
certCreator pkg.X509KeyPairCreator,
certLoader pkg.X509KeyLoader) (*pahoMqtt.ClientOptions, error) {
clientOptions := pahoMqtt.NewClientOptions()
clientOptions.AddBroker(clientConfiguration.BrokerURL)
clientOptions.SetUsername(clientConfiguration.Username)
clientOptions.SetPassword(clientConfiguration.Password)
clientOptions.SetClientID(clientConfiguration.ClientId)
clientOptions.SetKeepAlive(time.Duration(clientConfiguration.KeepAlive) * time.Second)
clientOptions.WillQos = byte(clientConfiguration.Qos)
clientOptions.WillRetained = clientConfiguration.Retained
clientOptions.CleanSession = clientConfiguration.CleanSession
clientOptions.SetAutoReconnect(clientConfiguration.AutoReconnect)
clientOptions.SetConnectTimeout(time.Duration(clientConfiguration.ConnectTimeout) * time.Second)
tlsConfiguration, err := pkg.GenerateTLSForClientClientOptions(
clientConfiguration.BrokerURL,
clientConfiguration.TlsConfigurationOptions,
certCreator,
certLoader)
if err != nil {
return clientOptions, err
}
clientOptions.SetTLSConfig(tlsConfiguration)
return clientOptions, nil
}