This repository has been archived by the owner on Dec 14, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 276
/
client.go
265 lines (224 loc) · 7.29 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// Copyright © 2017 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.
package mqtt
import (
"fmt"
"sync"
"time"
"github.com/TheThingsNetwork/go-utils/log"
"github.com/TheThingsNetwork/ttn/core/types"
"github.com/TheThingsNetwork/ttn/utils/random"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// QoS indicates the MQTT Quality of Service level.
// 0: The broker/client will deliver the message once, with no confirmation.
// 1: The broker/client will deliver the message at least once, with confirmation required.
// 2: The broker/client will deliver the message exactly once by using a four step handshake.
var (
PublishQoS byte = 0x00
SubscribeQoS byte = 0x00
)
// Client connects to the MQTT server and can publish/subscribe on uplink, downlink and activations from devices
type Client interface {
Connect() error
Disconnect()
IsConnected() bool
// Uplink pub/sub
PublishUplink(payload types.UplinkMessage) Token
PublishUplinkFields(appID string, devID string, fields map[string]interface{}) Token
SubscribeDeviceUplink(appID string, devID string, handler UplinkHandler) Token
SubscribeAppUplink(appID string, handler UplinkHandler) Token
SubscribeUplink(handler UplinkHandler) Token
UnsubscribeDeviceUplink(appID string, devID string) Token
UnsubscribeAppUplink(appID string) Token
UnsubscribeUplink() Token
// Downlink pub/sub
PublishDownlink(payload types.DownlinkMessage) Token
SubscribeDeviceDownlink(appID string, devID string, handler DownlinkHandler) Token
SubscribeAppDownlink(appID string, handler DownlinkHandler) Token
SubscribeDownlink(handler DownlinkHandler) Token
UnsubscribeDeviceDownlink(appID string, devID string) Token
UnsubscribeAppDownlink(appID string) Token
UnsubscribeDownlink() Token
// Event pub/sub
PublishAppEvent(appID string, eventType types.EventType, payload interface{}) Token
PublishDeviceEvent(appID string, devID string, eventType types.EventType, payload interface{}) Token
SubscribeAppEvents(appID string, eventType types.EventType, handler AppEventHandler) Token
SubscribeDeviceEvents(appID string, devID string, eventType types.EventType, handler DeviceEventHandler) Token
UnsubscribeAppEvents(appID string, eventType types.EventType) Token
UnsubscribeDeviceEvents(appID string, devID string, eventType types.EventType) Token
// Activation pub/sub
PublishActivation(payload types.Activation) Token
SubscribeDeviceActivations(appID string, devID string, handler ActivationHandler) Token
SubscribeAppActivations(appID string, handler ActivationHandler) Token
SubscribeActivations(handler ActivationHandler) Token
UnsubscribeDeviceActivations(appID string, devID string) Token
UnsubscribeAppActivations(appID string) Token
UnsubscribeActivations() Token
}
// Token is returned on asyncronous functions
type Token interface {
// Wait for the function to finish
Wait() bool
// Wait for the function to finish or return false after a certain time
WaitTimeout(time.Duration) bool
// The error associated with the result of the function (nil if everything okay)
Error() error
}
type simpleToken struct {
err error
}
// Wait always returns true
func (t *simpleToken) Wait() bool {
return true
}
// WaitTimeout always returns true
func (t *simpleToken) WaitTimeout(_ time.Duration) bool {
return true
}
// Error contains the error if present
func (t *simpleToken) Error() error {
return t.err
}
type token struct {
sync.RWMutex
complete chan bool
ready bool
err error
}
func newToken() *token {
return &token{
complete: make(chan bool),
}
}
func (t *token) Wait() bool {
t.Lock()
defer t.Unlock()
if !t.ready {
<-t.complete
t.ready = true
}
return t.ready
}
func (t *token) WaitTimeout(d time.Duration) bool {
t.Lock()
defer t.Unlock()
if !t.ready {
select {
case <-t.complete:
t.ready = true
case <-time.After(d):
}
}
return t.ready
}
func (t *token) flowComplete() {
close(t.complete)
}
func (t *token) Error() error {
t.RLock()
defer t.RUnlock()
return t.err
}
// DefaultClient is the default MQTT client for The Things Network
type DefaultClient struct {
opts *MQTT.ClientOptions
mqtt MQTT.Client
ctx log.Interface
subscriptions map[string]MQTT.MessageHandler
}
// NewClient creates a new DefaultClient
func NewClient(ctx log.Interface, id, username, password string, brokers ...string) Client {
if ctx == nil {
ctx = log.Get()
}
ttnClient := &DefaultClient{
opts: MQTT.NewClientOptions(),
ctx: ctx,
subscriptions: make(map[string]MQTT.MessageHandler),
}
for _, broker := range brokers {
ttnClient.opts.AddBroker(broker)
}
ttnClient.opts.SetClientID(fmt.Sprintf("%s-%s", id, random.String(16)))
ttnClient.opts.SetUsername(username)
ttnClient.opts.SetPassword(password)
// TODO: Some tuning of these values probably won't hurt:
ttnClient.opts.SetKeepAlive(30 * time.Second)
ttnClient.opts.SetPingTimeout(10 * time.Second)
ttnClient.opts.SetCleanSession(true)
ttnClient.opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
ctx.Warnf("mqtt: received unhandled message: %v", msg)
})
var reconnecting bool
ttnClient.opts.SetConnectionLostHandler(func(client MQTT.Client, err error) {
ctx.Warnf("mqtt: disconnected (%s), reconnecting...", err)
reconnecting = true
})
ttnClient.opts.SetOnConnectHandler(func(client MQTT.Client) {
ctx.Info("mqtt: connected")
if reconnecting {
for topic, handler := range ttnClient.subscriptions {
ctx.Infof("mqtt: re-subscribing to topic: %s", topic)
ttnClient.subscribe(topic, handler)
}
reconnecting = false
}
})
ttnClient.mqtt = MQTT.NewClient(ttnClient.opts)
return ttnClient
}
var (
// ConnectRetries says how many times the client should retry a failed connection
ConnectRetries = 10
// ConnectRetryDelay says how long the client should wait between retries
ConnectRetryDelay = time.Second
)
// Connect to the MQTT broker. It will retry for ConnectRetries times with a delay of ConnectRetryDelay between retries
func (c *DefaultClient) Connect() error {
if c.mqtt.IsConnected() {
return nil
}
var err error
for retries := 0; retries < ConnectRetries; retries++ {
token := c.mqtt.Connect()
finished := token.WaitTimeout(1 * time.Second)
if !finished {
c.ctx.Warn("mqtt: connection took longer than expected...")
token.Wait()
}
err = token.Error()
if err == nil {
break
}
c.ctx.Warnf("mqtt: could not connect (%s), retrying...", err)
<-time.After(ConnectRetryDelay)
}
if err != nil {
return fmt.Errorf("Could not connect to MQTT Broker (%s)", err)
}
return nil
}
func (c *DefaultClient) publish(topic string, msg []byte) Token {
return c.mqtt.Publish(topic, PublishQoS, false, msg)
}
func (c *DefaultClient) subscribe(topic string, handler MQTT.MessageHandler) Token {
c.subscriptions[topic] = handler
return c.mqtt.Subscribe(topic, SubscribeQoS, handler)
}
func (c *DefaultClient) unsubscribe(topic string) Token {
delete(c.subscriptions, topic)
return c.mqtt.Unsubscribe(topic)
}
// Disconnect from the MQTT broker
func (c *DefaultClient) Disconnect() {
if !c.mqtt.IsConnected() {
return
}
c.ctx.Debug("mqtt: disconnecting")
c.mqtt.Disconnect(25)
}
// IsConnected returns true if there is a connection to the MQTT broker
func (c *DefaultClient) IsConnected() bool {
return c.mqtt.IsConnected()
}