/
MQTTSNBrokers.py
443 lines (403 loc) · 17.5 KB
/
MQTTSNBrokers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
"""
*******************************************************************
Copyright (c) 2013, 2017 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
import traceback, random, sys, string, copy, threading, logging, socket, time, uuid
from mqtt.formats import MQTTSN
from .Brokers import Brokers
logger = logging.getLogger('MQTT broker')
def respond(address, callback, packet):
logger.info("out: "+repr(packet))
if hasattr(callback[1], "handlePacket"):
callback[1].handlePacket(packet)
else:
try:
respondfn = callback[0]
context = callback[1]
respondfn(context, packet.pack())
except:
traceback.print_exc()
class MQTTSNClients:
def __init__(self, anId, cleansession, keepalive, socket, broker):
self.id = anId # required
self.cleansession = cleansession
self.socket = socket
self.msgid = 1
self.outbound = [] # message objects - for ordering
self.outmsgs = {} # msgids to message objects
self.broker = broker
if broker.publish_on_pubrel:
self.inbound = {} # stored inbound QoS 2 publications
else:
self.inbound = []
self.connected = False
self.will = None
self.keepalive = keepalive
self.lastPacket = None
def resend(self):
logger.debug("resending unfinished publications %s", str(self.outbound))
if len(self.outbound) > 0:
logger.info("[MQTT-4.4.0-1] resending inflight QoS 1 and 2 messages")
for pub in self.outbound:
logger.debug("resending", pub)
logger.info("[MQTT-4.4.0-2] dup flag must be set on in re-publish")
if pub.fh.QoS == 0:
respond(self.socket, pub)
elif pub.fh.QoS == 1:
pub.fh.DUP = 1
logger.info("[MQTT-2.1.2-3] Dup when resending QoS 1 publish id %d", pub.messageIdentifier)
logger.info("[MQTT-2.3.1-4] Message id same as original publish on resend")
logger.info("[MQTT-4.3.2-1] Resending QoS 1 with DUP flag")
respond(self.socket, pub)
elif pub.fh.QoS == 2:
if pub.qos2state == "PUBREC":
logger.info("[MQTT-2.1.2-3] Dup when resending QoS 2 publish id %d", pub.messageIdentifier)
pub.fh.DUP = 1
logger.info("[MQTT-2.3.1-4] Message id same as original publish on resend")
logger.info("[MQTT-4.3.3-1] Resending QoS 2 with DUP flag")
respond(self.socket, pub)
else:
resp = MQTTSN.Pubrels()
logger.info("[MQTT-2.3.1-4] Message id same as original publish on resend")
resp.messageIdentifier = pub.messageIdentifier
respond(self.socket, resp)
def publishArrived(self, topic, msg, qos, retained=False):
pub = MQTTSN.Publishes()
logger.info("[MQTT-3.2.3-3] topic name must match the subscription's topic filter")
pub.topicName = topic
pub.data = msg
pub.fh.QoS = qos
pub.fh.RETAIN = retained
if retained:
logger.info("[MQTT-2.1.2-7] Last retained message on matching topics sent on subscribe")
if pub.fh.RETAIN:
logger.info("[MQTT-2.1.2-9] Set retained flag on retained messages")
if qos == 2:
pub.qos2state = "PUBREC"
if qos in [1, 2]:
pub.messageIdentifier = self.msgid
logger.debug("client id: %d msgid: %d", self.id, self.msgid)
if self.msgid == 65535:
self.msgid = 1
else:
self.msgid += 1
self.outbound.append(pub)
self.outmsgs[pub.messageIdentifier] = pub
logger.info("[MQTT-4.6.0-6] publish packets must be sent in order of receipt from any given client")
if self.connected:
respond(self.socket, pub)
else:
if qos == 0 and not self.broker.dropQoS0:
self.outbound.append(pub)
if qos in [1, 2]:
logger.info("[MQTT-3.1.2-5] storing of QoS 1 and 2 messages for disconnected client %s", self.id)
def puback(self, msgid):
if msgid in self.outmsgs.keys():
pub = self.outmsgs[msgid]
if pub.fh.QoS == 1:
self.outbound.remove(pub)
del self.outmsgs[msgid]
else:
logger.error("%s: Puback received for msgid %d, but QoS is %d", self.id, msgid, pub.fh.QoS)
else:
logger.error("%s: Puback received for msgid %d, but no message found", self.id, msgid)
def pubrec(self, msgid):
rc = False
if msgid in self.outmsgs.keys():
pub = self.outmsgs[msgid]
if pub.fh.QoS == 2:
if pub.qos2state == "PUBREC":
pub.qos2state = "PUBCOMP"
rc = True
else:
logger.error("%s: Pubrec received for msgid %d, but message in wrong state", self.id, msgid)
else:
logger.error("%s: Pubrec received for msgid %d, but QoS is %d", self.id, msgid, pub.fh.QoS)
else:
logger.error("%s: Pubrec received for msgid %d, but no message found", self.id, msgid)
return rc
def pubcomp(self, msgid):
if msgid in self.outmsgs.keys():
pub = self.outmsgs[msgid]
if pub.fh.QoS == 2:
if pub.qos2state == "PUBCOMP":
self.outbound.remove(pub)
del self.outmsgs[msgid]
else:
logger.error("Pubcomp received for msgid %d, but message in wrong state", msgid)
else:
logger.error("Pubcomp received for msgid %d, but QoS is %d", msgid, pub.fh.QoS)
else:
logger.error("Pubcomp received for msgid %d, but no message found", msgid)
def pubrel(self, msgid):
rc = None
if self.broker.publish_on_pubrel:
if msgid in self.inbound.keys():
pub = self.inbound[msgid]
if pub.fh.QoS == 2:
rc = pub
else:
logger.error("Pubrec received for msgid %d, but QoS is %d", msgid, pub.fh.QoS)
else:
rc = msgid in self.inbound
if not rc:
logger.error("Pubrec received for msgid %d, but no message found", msgid)
return rc
class MQTTSNBrokers:
def __init__(self, publish_on_pubrel=True,
overlapping_single=True,
dropQoS0=True,
zero_length_clientids=True,
lock=None, sharedData={}):
# optional behaviours
self.publish_on_pubrel = publish_on_pubrel
self.dropQoS0 = dropQoS0 # don't queue QoS 0 messages for disconnected clients
self.zero_length_clientids = zero_length_clientids
self.broker = Brokers(overlapping_single, sharedData=sharedData)
self.clients = {} # socket -> clients
if lock:
logger.info("Using shared lock %d", id(lock))
self.lock = lock
else:
self.lock = threading.RLock()
logger.info("MQTT-SN Paho Test Broker")
logger.info("Optional behaviour, publish on pubrel: %s", self.publish_on_pubrel)
logger.info("Optional behaviour, single publish on overlapping topics: %s", self.broker.overlapping_single)
logger.info("Optional behaviour, drop QoS 0 publications to disconnected clients: %s", self.dropQoS0)
logger.info("Optional behaviour, support zero length clientids: %s", self.zero_length_clientids)
def shutdown(self):
# do we need to do anything here?
pass
def setBroker3(self, broker3):
self.broker.setBroker3(broker3.broker)
def setBroker5(self, broker5):
self.broker.setBroker5(broker5.broker)
def reinitialize(self):
logger.info("Reinitializing broker")
self.clients = {}
self.broker.reinitialize()
def handleRequest(self, raw_packet, client_address, callback):
"this is going to be called from multiple threads, so synchronize"
self.lock.acquire()
terminate = False
try:
if raw_packet == None:
# will message
self.disconnect(sock, None, terminate=True)
terminate = True
else:
packet = MQTTSN.unpackPacket(raw_packet)
if packet:
terminate = self.handlePacket(packet, client_address, callback)
else:
raise MQTTSN.MQTTSNException("[MQTT-2.0.0-1] handleRequest: badly formed MQTT packet")
finally:
self.lock.release()
return terminate
def handlePacket(self, packet, sock, callback):
terminate = False
logger.info("in: "+str(packet))
if sock not in self.clients.keys() and not isinstance(packet, MQTTSN.Connects):
print(self.clients.keys(), sock)
self.disconnect(sock, packet)
raise MQTTSN.MQTTSNException("[MQTT-3.1.0-1] Connect was not first packet on socket")
else:
getattr(self, MQTTSN.Messages.Names[packet.messageType].lower())(sock, packet, callback)
if sock in self.clients.keys():
self.clients[sock].lastPacket = time.time()
if packet.messageType == MQTTSN.MessageTypes.DISCONNECT:
terminate = True
return terminate
def connect(self, sock, packet, callback):
if packet.ProtocolId != 1:
logger.error("[MQTT-3.1.2-2] Wrong protocol version %d", packet.ProtocolVersion)
resp = MQTTSN.Connacks()
resp.ReturnCode = 1
respond(sock, callback, resp)
logger.info("[MQTT-3.2.2-5] must close connection after non-zero connack")
self.disconnect(sock, None)
logger.info("[MQTT-3.1.4-5] When rejecting connect, no more data must be processed")
return
if sock in self.clients.keys(): # is socket is already connected?
self.disconnect(sock, None)
logger.info("[MQTT-3.1.4-5] When rejecting connect, no more data must be processed")
raise MQTTSN.MQTTSNException("[MQTT-3.1.0-2] Second connect packet")
if len(packet.ClientId) == 0:
if self.zero_length_clientids == False or packet.CleanSession == False:
if self.zero_length_clientids:
logger.info("[MQTT-3.1.3-8] Reject 0-length clientid with cleansession false")
logger.info("[MQTT-3.1.3-9] if clientid is rejected, must send connack 2 and close connection")
resp = MQTTSN.Connacks()
resp.returnCode = 2
respond(sock, callback, resp)
logger.info("[MQTT-3.2.2-5] must close connection after non-zero connack")
self.disconnect(sock, None)
logger.info("[MQTT-3.1.4-5] When rejecting connect, no more data must be processed")
return
else:
logger.info("[MQTT-3.1.3-7] 0-length clientid must have cleansession true")
packet.ClientId = uuid.uuid4() # give the client a unique clientid
logger.info("[MQTT-3.1.3-6] 0-length clientid must be assigned a unique id %s", packet.ClientId)
logger.info("[MQTT-3.1.3-5] Clientids of 1 to 23 chars and ascii alphanumeric must be allowed")
if packet.ClientId in [client.id for client in self.clients.values()]: # is this client already connected on a different socket?
for s in self.clients.keys():
if self.clients[s].id == packet.ClientId:
logger.info("[MQTT-3.1.4-2] Disconnecting old client %s", packet.ClientId)
self.disconnect(s, None)
break
me = None
if not packet.Flags.CleanSession:
me = self.broker.getClient(packet.ClientId) # find existing state, if there is any
if me:
logger.info("[MQTT-3.1.3-2] clientid used to retrieve client state")
resp = MQTTSN.Connacks()
if me == None:
me = MQTTSNClients(packet.ClientId, packet.Flags.CleanSession, packet.Duration, sock, self)
else:
me.socket = sock # set existing client state to new socket
me.cleansession = packet.Flags.CleanSession
me.keepalive = packet.Duration
logger.info("[MQTT-4.1.0-1] server must store data for at least as long as the network connection lasts")
self.clients[sock] = me
#me.will = (packet.WillTopic, packet.WillQoS, packet.WillMessage, packet.WillRETAIN) if packet.WillFlag else None
self.broker.connect(me)
logger.info("[MQTT-3.2.0-1] the first response to a client must be a connack")
resp.ReturnCode = 0
respond(sock, callback, resp)
me.resend()
def disconnect(self, sock, packet, terminate=False):
logger.info("[MQTT-3.14.4-2] Client must not send any more packets after disconnect")
if sock in self.clients.keys():
if terminate:
self.broker.terminate(self.clients[sock].id)
else:
self.broker.disconnect(self.clients[sock].id)
del self.clients[sock]
def disconnectAll(self, sock):
for sock in self.clients.keys():
self.disconnect(sock, None)
def subscribe(self, sock, packet):
topics = []
qoss = []
respqoss = []
for p in packet.data:
if p[0] == "test/nosubscribe":
respqoss.append(0x80)
else:
if p[0] == "test/QoS 1 only":
respqoss.append(min(1), p[1])
elif p[0] == "test/QoS 0 only":
respqoss.append(min(0), p[1])
else:
respqoss.append(p[1])
topics.append(p[0])
qoss.append(respqoss[-1])
if len(topics) > 0:
self.broker.subscribe(self.clients[sock].id, topics, qoss)
resp = MQTTSN.Subacks()
logger.info("[MQTT-2.3.1-7][MQTT-3.8.4-2] Suback has same message id as subscribe")
logger.info("[MQTT-3.8.4-1] Must respond with suback")
resp.messageIdentifier = packet.messageIdentifier
logger.info("[MQTT-3.8.4-5] return code must be returned for each topic in subscribe")
logger.info("[MQTT-3.9.3-1] the order of return codes must match order of topics in subscribe")
resp.data = respqoss
respond(sock, resp)
def unsubscribe(self, sock, packet):
self.broker.unsubscribe(self.clients[sock].id, packet.data)
resp = MQTTSN.Unsubacks()
logger.info("[MQTT-2.3.1-7] Unsuback has same message id as unsubscribe")
logger.info("[MQTT-3.10.4-4] Unsuback must be sent - same message id as unsubscribe")
me = self.clients[sock]
if len(me.outbound) > 0:
logger.info("[MQTT-3.10.4-3] sending unsuback has no effect on outward inflight messages")
resp.messageIdentifier = packet.messageIdentifier
respond(sock, resp)
def publish(self, sock, packet, callback):
if packet.Flags.QoS == 0:
if packet.Flags.TopicIdType == 2:
topic = MQTTSN.writeInt16(packet.TopicId).decode()
print("topic", topic)
self.broker.publish(self.clients[sock].id,
topic, packet.Data, packet.Flags.QoS, packet.Flags.RETAIN)
elif packet.fh.QoS == 1:
if packet.fh.DUP:
logger.info("[MQTT-3.3.1-3] Incoming publish DUP 1 ==> outgoing publish with DUP 0")
logger.info("[MQTT-4.3.2-2] server must store message in accordance with QoS 1")
self.broker.publish(self.clients[sock].id,
packet.topicName, packet.data, packet.fh.QoS, packet.fh.RETAIN)
resp = MQTTSN.Pubacks()
logger.info("[MQTT-2.3.1-6] puback messge id same as publish")
resp.messageIdentifier = packet.messageIdentifier
respond(sock, callback, resp)
elif packet.fh.QoS == 2:
myclient = self.clients[sock]
if self.publish_on_pubrel:
if packet.messageIdentifier in myclient.inbound.keys():
if packet.fh.DUP == 0:
logger.error("[MQTT-3.3.1-2] duplicate QoS 2 message id %d found with DUP 0", packet.messageIdentifier)
else:
logger.info("[MQTT-3.3.1-2] DUP flag is 1 on redelivery")
else:
myclient.inbound[packet.messageIdentifier] = packet
else:
if packet.messageIdentifier in myclient.inbound:
if packet.fh.DUP == 0:
logger.error("[MQTT-3.3.1-2] duplicate QoS 2 message id %d found with DUP 0", packet.messageIdentifier)
else:
logger.info("[MQTT-3.3.1-2] DUP flag is 1 on redelivery")
else:
myclient.inbound.append(packet.messageIdentifier)
logger.info("[MQTT-4.3.3-2] server must store message in accordance with QoS 2")
self.broker.publish(myclient, packet.topicName, packet.data, packet.fh.QoS, packet.fh.RETAIN)
resp = MQTTSN.Pubrecs()
logger.info("[MQTT-2.3.1-6] pubrec messge id same as publish")
resp.messageIdentifier = packet.messageIdentifier
respond(sock, callback, resp)
def pubrel(self, sock, packet):
myclient = self.clients[sock]
pub = myclient.pubrel(packet.messageIdentifier)
if pub:
if self.publish_on_pubrel:
self.broker.publish(myclient.id, pub.topicName, pub.data, pub.fh.QoS, pub.fh.RETAIN)
del myclient.inbound[packet.messageIdentifier]
else:
myclient.inbound.remove(packet.messageIdentifier)
resp = MQTTSN.Pubcomps()
logger.info("[MQTT-2.3.1-6] pubcomp messge id same as publish")
resp.messageIdentifier = packet.messageIdentifier
respond(sock, resp)
def pingreq(self, sock, packet):
resp = MQTTSN.Pingresps()
logger.info("[MQTT-3.12.4-1] sending pingresp in response to pingreq")
respond(sock, resp)
def puback(self, sock, packet):
"confirmed reception of qos 1"
self.clients[sock].puback(packet.messageIdentifier)
def pubrec(self, sock, packet):
"confirmed reception of qos 2"
myclient = self.clients[sock]
if myclient.pubrec(packet.messageIdentifier):
logger.info("[MQTT-3.5.4-1] must reply with pubrel in response to pubrec")
resp = MQTTSN.Pubrels()
resp.messageIdentifier = packet.messageIdentifier
respond(sock, resp)
def pubcomp(self, sock, packet):
"confirmed reception of qos 2"
self.clients[sock].pubcomp(packet.messageIdentifier)
def keepalive(self, sock):
if sock in self.clients.keys():
client = self.clients[sock]
if client.keepalive > 0 and time.time() - client.lastPacket > client.keepalive * 1.5:
# keep alive timeout
logger.info("[MQTT-3.1.2-22] keepalive timeout for client %s", client.id)
self.disconnect(sock, None, terminate=True)