-
Notifications
You must be signed in to change notification settings - Fork 23
/
backend.go
514 lines (416 loc) · 11.9 KB
/
backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
package broker
import (
"errors"
"sync"
"time"
"github.com/256dpi/gomqtt/packet"
"github.com/256dpi/gomqtt/session"
"github.com/256dpi/gomqtt/topic"
)
// memorySession is the per-client session state kept by the MemoryBackend. It
// extends the embedded session.MemorySession with the client's subscriptions
// and the queues of messages waiting to be dequeued by the client.
type memorySession struct {
	*session.MemorySession
	// subscriptions holds the *packet.Subscription values stored by Subscribe,
	// keyed by topic filter.
	subscriptions *topic.Tree
	// storedQueue receives published messages with a QOS greater than zero.
	// It is kept as is when the session is reused.
	storedQueue chan *packet.Message
	// temporaryQueue receives published QOS 0 messages and the retained
	// messages queued by Subscribe. It is replaced by reuse.
	temporaryQueue chan *packet.Message
	// activeClient is the client currently attached to the session, or nil
	// after Terminate has released the session.
	activeClient *Client
}
// newMemorySession allocates an empty session without an active client. Both
// the stored and the temporary queue are buffered channels that hold up to
// backlog messages each.
func newMemorySession(backlog int) *memorySession {
	sess := new(memorySession)
	sess.MemorySession = session.NewMemorySession()
	sess.subscriptions = topic.NewStandardTree()
	sess.storedQueue = make(chan *packet.Message, backlog)
	sess.temporaryQueue = make(chan *packet.Message, backlog)

	return sess
}
// lookupSubscription returns the first subscription of the session that
// matches the given topic, or nil if the session has no matching subscription.
func (s *memorySession) lookupSubscription(topic string) *packet.Subscription {
	// ask the tree for the first match and unwrap it if there is one
	if match := s.subscriptions.MatchFirst(topic); match != nil {
		return match.(*packet.Subscription)
	}

	return nil
}
// applyQOS caps the QOS of the message at the QOS granted by the session's
// matching subscription. The message is returned unchanged if no subscription
// matches or its QOS is already within the limit; otherwise a copy with the
// lowered QOS is returned and the original message is left untouched.
func (s *memorySession) applyQOS(msg *packet.Message) *packet.Message {
	// nothing to do without a subscription or if the qos is within bounds
	granted := s.lookupSubscription(msg.Topic)
	if granted == nil || msg.QOS <= granted.QOS {
		return msg
	}

	// downgrade a copy as the message may be shared with other sessions
	capped := msg.Copy()
	capped.QOS = granted.QOS

	return capped
}
// reuse prepares the session for a new client by replacing the temporary
// queue with an empty one of the same capacity. The stored queue and the
// subscriptions are kept.
func (s *memorySession) reuse() {
	size := cap(s.temporaryQueue)
	s.temporaryQueue = make(chan *packet.Message, size)
}
// Errors returned by the MemoryBackend.
var (
	// ErrQueueFull is returned to a client that attempts to write to its own
	// full queue, which would result in a deadlock.
	ErrQueueFull = errors.New("queue full")

	// ErrClosing is returned to a client if the backend is closing.
	ErrClosing = errors.New("closing")

	// ErrKillTimeout is returned to a client if the existing client does not
	// close in time.
	ErrKillTimeout = errors.New("kill timeout")
)
// A MemoryBackend stores everything in memory: sessions, subscriptions,
// queued messages and retained messages.
type MemoryBackend struct {
	// The size of the session queue. It is used as the buffer size of both
	// queues of every session the backend creates.
	SessionQueueSize int
	// The time after which an error is returned while waiting for a killed
	// existing client to exit.
	KillTimeout time.Duration
	// Client configuration options. See broker.Client for details. The values
	// are copied to every client during Setup.
	ClientMaximumKeepAlive time.Duration
	ClientParallelPublishes int
	ClientParallelSubscribes int
	ClientInflightMessages int
	ClientTokenTimeout time.Duration
	// A map of usernames and passwords that grant read and write access. All
	// logins are accepted if the map is nil.
	Credentials map[string]string
	// The Logger callback handles incoming log events.
	Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error)
	// activeClients maps client ids to the clients registered by Setup.
	activeClients map[string]*Client
	// storedSessions maps client ids to the sessions created for non-clean
	// setups. Entries are only removed by a clean setup with the same id.
	storedSessions map[string]*memorySession
	// temporarySessions maps clients to the sessions created for setups
	// without a client id or with the clean flag. Entries are removed by
	// Terminate.
	temporarySessions map[*Client]*memorySession
	// retainedMessages holds the retained *packet.Message values by topic.
	retainedMessages *topic.Tree
	// globalMutex is held by the methods that access the maps above, the
	// session's active client and the closing flag.
	globalMutex sync.Mutex
	// setupMutex serializes Setup calls. It stays locked while Setup
	// temporarily releases globalMutex to wait for a killed client.
	setupMutex sync.Mutex
	// closing is set by Close and makes Authenticate and Setup return
	// ErrClosing.
	closing bool
}
// NewMemoryBackend returns a new MemoryBackend with a session queue size of
// 100 messages, a kill timeout of five seconds and empty internal state. The
// client options, credentials and logger are left at their zero values.
func NewMemoryBackend() *MemoryBackend {
	backend := new(MemoryBackend)

	// defaults of the public settings
	backend.SessionQueueSize = 100
	backend.KillTimeout = 5 * time.Second

	// internal state
	backend.activeClients = make(map[string]*Client)
	backend.storedSessions = make(map[string]*memorySession)
	backend.temporarySessions = make(map[*Client]*memorySession)
	backend.retainedMessages = topic.NewStandardTree()

	return backend
}
// Authenticate checks the credentials of a client. It returns ErrClosing if
// the backend is closing. Otherwise every login is accepted if no Credentials
// are configured, and else only logins whose user is known and whose password
// matches exactly.
func (m *MemoryBackend) Authenticate(_ *Client, user, password string) (bool, error) {
	// hold the global mutex for the whole check
	m.globalMutex.Lock()
	defer m.globalMutex.Unlock()

	switch {
	case m.closing:
		// refuse logins while closing
		return false, ErrClosing
	case m.Credentials == nil:
		// allow all if there are no credentials
		return true, nil
	}

	// check login
	expected, known := m.Credentials[user]

	return known && expected == password, nil
}
// Setup will close existing clients and return an appropriate session.
//
// The client is first configured with the Client* options of the backend. An
// empty id always yields a fresh temporary session. For a non-empty id, a
// client that currently owns the session for that id is closed first and Setup
// waits up to KillTimeout for it to exit, returning ErrKillTimeout if it does
// not. Afterwards a clean setup deletes any stored session for the id and
// yields a fresh temporary session, while a non-clean setup reuses the stored
// session for the id or creates and stores a new one.
//
// The returned boolean is true only if an existing stored session has been
// resumed. ErrClosing is returned if the backend is closing.
func (m *MemoryBackend) Setup(client *Client, id string, clean bool) (Session, bool, error) {
	// acquire setup mutex
	m.setupMutex.Lock()
	defer m.setupMutex.Unlock()

	// acquire global mutex
	m.globalMutex.Lock()
	defer m.globalMutex.Unlock()

	// return error if closing
	if m.closing {
		return nil, false, ErrClosing
	}

	// apply client settings
	client.MaximumKeepAlive = m.ClientMaximumKeepAlive
	client.ParallelPublishes = m.ClientParallelPublishes
	client.ParallelSubscribes = m.ClientParallelSubscribes
	client.InflightMessages = m.ClientInflightMessages
	client.TokenTimeout = m.ClientTokenTimeout

	// return a new temporary session if id is zero
	if len(id) == 0 {
		// create session
		sess := newMemorySession(m.SessionQueueSize)

		// set active client
		sess.activeClient = client

		// save session
		m.temporarySessions[client] = sess

		return sess, false, nil
	}

	// client id is available

	// retrieve existing client. try stored sessions before temporary sessions
	existingSession, ok := m.storedSessions[id]
	if !ok {
		if existingClient, ok2 := m.activeClients[id]; ok2 {
			existingSession, ok = m.temporarySessions[existingClient]
		}
	}

	// kill existing client if session is taken
	if ok && existingSession.activeClient != nil {
		// remember the client while the global mutex is still held: as soon
		// as the mutex is released, Terminate may reset the session's
		// activeClient field to nil, so the field must not be read again
		killed := existingSession.activeClient

		// close client
		killed.Close()

		// release global mutex to allow publish and termination, but leave the
		// setup mutex to prevent setups
		m.globalMutex.Unlock()

		// wait for client to close
		var err error
		select {
		case <-killed.Closed():
			// continue
		case <-time.After(m.KillTimeout):
			err = ErrKillTimeout
		}

		// acquire mutex again (released by the deferred unlock)
		m.globalMutex.Lock()

		// return err if set
		if err != nil {
			return nil, false, err
		}
	}

	// delete any stored session and return a temporary session if a clean
	// session is requested
	if clean {
		// delete any stored session
		delete(m.storedSessions, id)

		// create new session
		sess := newMemorySession(m.SessionQueueSize)

		// set active client
		sess.activeClient = client

		// save session
		m.temporarySessions[client] = sess

		// save client
		m.activeClients[id] = client

		return sess, false, nil
	}

	// attempt to reuse a stored session
	storedSession, ok := m.storedSessions[id]
	if ok {
		// reuse session
		storedSession.reuse()

		// set active client
		storedSession.activeClient = client

		// save client
		m.activeClients[id] = client

		return storedSession, true, nil
	}

	// otherwise create fresh session
	storedSession = newMemorySession(m.SessionQueueSize)

	// set active client
	storedSession.activeClient = client

	// save session
	m.storedSessions[id] = storedSession

	// save client
	m.activeClients[id] = client

	return storedSession, false, nil
}
// Restore does nothing and always returns nil, as this backend currently has
// no state that needs to be restored for a client.
func (m *MemoryBackend) Restore(_ *Client) error {
	return nil
}
// Subscribe will store the subscriptions in the client's session, call ack if
// provided and then queue all retained messages that match the subscribed
// topics on the session's temporary queue.
//
// ErrQueueFull is returned if the temporary queue has no room left for a
// retained message. At that point the subscriptions are already stored and ack
// has already been called.
func (m *MemoryBackend) Subscribe(client *Client, subs []packet.Subscription, ack Ack) error {
	// acquire global mutex
	m.globalMutex.Lock()
	defer m.globalMutex.Unlock()

	// get session
	sess := client.Session().(*memorySession)

	// save subscriptions
	for _, sub := range subs {
		// store a pointer to a per-iteration copy: before Go 1.22 the range
		// variable is shared by all iterations, so storing its address would
		// make every entry point at the last subscription
		sub := sub
		sess.subscriptions.Set(sub.Topic, &sub)
	}

	// call ack if provided
	if ack != nil {
		ack()
	}

	// handle all subscriptions
	for _, sub := range subs {
		// get retained messages
		values := m.retainedMessages.Search(sub.Topic)

		// publish messages
		for _, value := range values {
			// add to temporary queue or return error if queue is full
			select {
			case sess.temporaryQueue <- value.(*packet.Message):
			default:
				return ErrQueueFull
			}
		}
	}

	return nil
}
// Unsubscribe removes the subscriptions for the given topics from the client's
// session and calls ack if provided. It always returns nil.
func (m *MemoryBackend) Unsubscribe(client *Client, topics []string, ack Ack) error {
	// get the subscription tree of the client's session
	tree := client.Session().(*memorySession).subscriptions

	// delete subscriptions
	for i := range topics {
		tree.Empty(topics[i])
	}

	// call ack if provided
	if ack != nil {
		ack()
	}

	return nil
}
// Publish will handle retained messages and add the message to the session queues.
//
// A message with the retain flag set replaces the retained message of its
// topic, or clears it if the payload is empty. The retain flag of msg itself
// is then reset before the message is handed to every temporary and stored
// session that has a matching subscription. Messages with a QOS greater than
// zero go to a session's stored queue, all others to its temporary queue.
//
// ErrQueueFull is returned if the publishing client's own queue is full. In
// that case ack is not called, and sessions visited earlier may already have
// received the message (sessions are visited in map iteration order).
// Otherwise ack is called if provided and nil is returned.
func (m *MemoryBackend) Publish(client *Client, msg *packet.Message, ack Ack) error {
	// acquire global mutex
	m.globalMutex.Lock()
	defer m.globalMutex.Unlock()

	// this implementation is very basic and will block the backend on every
	// publish. clients that stay connected but won't drain their queue will
	// eventually deadlock the broker. full queues are skipped if the client is
	// offline or is going offline

	// check retain flag
	if msg.Retain {
		if len(msg.Payload) > 0 {
			// retain a copy of the message. the copy is taken before the
			// retain flag is reset below
			m.retainedMessages.Set(msg.Topic, msg.Copy())
		} else {
			// clear already retained message
			m.retainedMessages.Empty(msg.Topic)
		}
	}

	// reset retained flag on the message that is forwarded to the sessions
	msg.Retain = false

	// use temporary queue by default. the queue is selected per session at
	// send time
	queue := func(s *memorySession) chan *packet.Message {
		return s.temporaryQueue
	}

	// use stored queue if qos > 0
	if msg.QOS > 0 {
		queue = func(s *memorySession) chan *packet.Message {
			return s.storedQueue
		}
	}

	// add message to temporary sessions
	for _, sess := range m.temporarySessions {
		if sub := sess.lookupSubscription(msg.Topic); sub != nil {
			if sess.activeClient == client {
				// detect deadlock when adding to own queue: never block on
				// the publisher's own queue, fail instead
				select {
				case queue(sess) <- msg:
				default:
					return ErrQueueFull
				}
			} else {
				// wait for room since client is online. the message is
				// dropped for this session if the client starts closing
				select {
				case queue(sess) <- msg:
				case <-sess.activeClient.Closing():
				}
			}
		}
	}

	// add message to stored sessions
	for _, sess := range m.storedSessions {
		if sub := sess.lookupSubscription(msg.Topic); sub != nil {
			if sess.activeClient == client {
				// detect deadlock when adding to own queue: never block on
				// the publisher's own queue, fail instead
				select {
				case queue(sess) <- msg:
				default:
					return ErrQueueFull
				}
			} else if sess.activeClient != nil {
				// wait for room since client is online. the message is
				// dropped for this session if the client starts closing
				select {
				case queue(sess) <- msg:
				case <-sess.activeClient.Closing():
				}
			} else {
				// session has no active client: queue the message without
				// blocking and ignore it if the offline queue is full
				select {
				case queue(sess) <- msg:
				default:
				}
			}
		}
	}

	// call ack if available
	if ack != nil {
		ack()
	}

	return nil
}
// Dequeue blocks until a message is available on the temporary or the stored
// queue of the client's session and returns it with its QOS capped by the
// matching subscription. If the client starts closing first, a nil message is
// returned. The returned Ack and error are always nil.
func (m *MemoryBackend) Dequeue(client *Client) (*packet.Message, Ack, error) {
	// mutex locking not needed

	// get session
	sess := client.Session().(*memorySession)

	// this implementation is very basic: messages are removed from the queue
	// immediately and no ack is returned, so a message is lost if the client
	// fails to handle it

	// wait for the next message from either queue or for the client to close
	var next *packet.Message
	select {
	case next = <-sess.temporaryQueue:
	case next = <-sess.storedQueue:
	case <-client.Closing():
		return nil, nil, nil
	}

	return sess.applyQOS(next), nil, nil
}
// Terminate will disassociate the session from the client: the session's
// active client is cleared, and the client's temporary session and its entry
// in the active clients are removed. A stored session stays available for
// later reuse. It always returns nil.
func (m *MemoryBackend) Terminate(client *Client) error {
	// acquire global mutex
	m.globalMutex.Lock()
	defer m.globalMutex.Unlock()

	// get session. the two-value assertion yields a nil session instead of
	// panicking if the client has no memory session attached
	sess, _ := client.Session().(*memorySession)

	// release session if available
	if sess != nil {
		sess.activeClient = nil
	}

	// remove any temporary session
	delete(m.temporarySessions, client)

	// remove any saved client
	delete(m.activeClients, client.ID())

	return nil
}
// Log forwards the event and its details to the configured Logger callback.
// It does nothing if no Logger is set.
func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error) {
	// skip if there is no logger
	if m.Logger == nil {
		return
	}

	m.Logger(event, client, pkt, msg, err)
}
// Close marks the backend as closing, closes the active clients of all
// temporary and stored sessions and waits for them to exit. It returns true if
// there were no clients or all of them closed before the timeout elapsed, and
// false as soon as the timeout is reached.
func (m *MemoryBackend) Close(timeout time.Duration) bool {
	// acquire global mutex
	m.globalMutex.Lock()

	// set closing
	m.closing = true

	// collect every client that gets closed
	var pending []*Client
	shutdown := func(c *Client) {
		c.Close()
		pending = append(pending, c)
	}

	// close temporary sessions
	for _, sess := range m.temporarySessions {
		shutdown(sess.activeClient)
	}

	// close stored sessions that have an active client
	for _, sess := range m.storedSessions {
		if sess.activeClient == nil {
			continue
		}

		shutdown(sess.activeClient)
	}

	// release mutex to allow termination
	m.globalMutex.Unlock()

	// return early if empty
	if len(pending) == 0 {
		return true
	}

	// a single timer bounds the total waiting time for all clients
	expired := time.After(timeout)

	// wait for clients to close
	for _, c := range pending {
		select {
		case <-c.Closed():
			// wait for the next client
		case <-expired:
			return false
		}
	}

	return true
}