From 3ada5a0e8bd59ac13f10a48b7de5c64f3e20a5c7 Mon Sep 17 00:00:00 2001 From: Ian Craggs Date: Tue, 9 Jan 2018 15:31:14 +0000 Subject: [PATCH] Add will delay in V5 and TCP bridge skeleton --- interoperability/client_test5.py | 104 ++++++++++++- interoperability/client_testing.conf | 1 - .../mqtt/brokers/SN/MQTTSNBrokers.py | 4 + .../mqtt/brokers/V311/MQTTBrokers.py | 3 + interoperability/mqtt/brokers/V5/Brokers.py | 35 +++-- .../mqtt/brokers/V5/MQTTBrokers.py | 50 ++++++- .../mqtt/brokers/bridges/TCPBridges.py | 141 ++++++++++++++++++ .../mqtt/brokers/listeners/HTTPListeners.py | 2 +- .../mqtt/brokers/listeners/TCPListeners.py | 4 +- .../mqtt/brokers/listeners/UDPListeners.py | 2 +- interoperability/mqtt/brokers/start.py | 22 ++- interoperability/mqtt/clients/V5/main.py | 8 +- 12 files changed, 347 insertions(+), 29 deletions(-) create mode 100644 interoperability/mqtt/brokers/bridges/TCPBridges.py diff --git a/interoperability/client_test5.py b/interoperability/client_test5.py index ee55157..093c633 100644 --- a/interoperability/client_test5.py +++ b/interoperability/client_test5.py @@ -927,13 +927,115 @@ def test_flow_control2(self): pubs += 1 # should get disconnected... - while (testcallback.disconnects == []): + while testcallback.disconnects == []: receiver.receive(testcallback) self.waitfor(testcallback.disconnects, 1, 1) self.assertEqual(len(testcallback.disconnects), 1, len(testcallback.disconnects)) self.assertEqual(testcallback.disconnects[0]["reasonCode"].value, 147, testcallback.disconnects[0]["reasonCode"].value) + def test_will_delay(self): + """ + the will message should be received earlier than the session expiry + + """ + callback.clear() + callback2.clear() + + will_properties = MQTTV5.Properties(MQTTV5.PacketTypes.WILLMESSAGE) + connect_properties = MQTTV5.Properties(MQTTV5.PacketTypes.CONNECT) + + # set the will delay and session expiry to the same value - + # then both should occur at the same time + will_properties.WillDelayInterval = 3 # in seconds + connect_properties.SessionExpiryInterval = 5 + + connack = aclient.connect(host=host, port=port, cleanstart=True, properties=connect_properties, + willProperties=will_properties, willFlag=True, willTopic=topics[0], willMessage=b"test_will_delay will message") + self.assertEqual(connack.reasonCode.getName(), "Success") + self.assertEqual(connack.sessionPresent, False) + + connack = bclient.connect(host=host, port=port, cleanstart=True) + bclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2)]) # subscribe to will message topic + self.waitfor(callback2.subscribeds, 1, 3) + + # terminate client a and wait for the will message + aclient.terminate() + start = time.time() + while callback2.messages == []: + time.sleep(.1) + duration = time.time() - start + #print(duration) + self.assertAlmostEqual(duration, 4, delta=1) + self.assertEqual(callback2.messages[0][0], topics[0]) + self.assertEqual(callback2.messages[0][1], b"test_will_delay will message") + + aclient.disconnect() + bclient.disconnect() + + callback.clear() + callback2.clear() + + # if session expiry is less than will delay then session expiry is used + will_properties.WillDelayInterval = 5 # in seconds + connect_properties.SessionExpiryInterval = 0 + + connack = aclient.connect(host=host, port=port, cleanstart=True, properties=connect_properties, + willProperties=will_properties, willFlag=True, willTopic=topics[0], willMessage=b"test_will_delay will message") + self.assertEqual(connack.reasonCode.getName(), "Success") + self.assertEqual(connack.sessionPresent, False) + + connack = bclient.connect(host=host, port=port, cleanstart=True) + bclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2)]) # subscribe to will message topic + self.waitfor(callback2.subscribeds, 1, 3) + + # terminate client a and wait for the will message + aclient.terminate() + start = time.time() + while callback2.messages == []: + time.sleep(.1) + duration = time.time() - start + #print(duration) + self.assertAlmostEqual(duration, 1, delta=1) + self.assertEqual(callback2.messages[0][0], topics[0]) + self.assertEqual(callback2.messages[0][1], b"test_will_delay will message") + + aclient.disconnect() + bclient.disconnect() + + callback.clear() + callback2.clear() + + # if session expiry is less than will delay then session expiry is used + will_properties.WillDelayInterval = 5 # in seconds + connect_properties.SessionExpiryInterval = 2 + + connack = aclient.connect(host=host, port=port, cleanstart=True, properties=connect_properties, + willProperties=will_properties, willFlag=True, willTopic=topics[0], willMessage=b"test_will_delay will message") + self.assertEqual(connack.reasonCode.getName(), "Success") + self.assertEqual(connack.sessionPresent, False) + + connack = bclient.connect(host=host, port=port, cleanstart=True) + bclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2)]) # subscribe to will message topic + self.waitfor(callback2.subscribeds, 1, 3) + + # terminate client a and wait for the will message + aclient.terminate() + start = time.time() + while callback2.messages == []: + time.sleep(.1) + duration = time.time() - start + #print(duration) + self.assertAlmostEqual(duration, 3, delta=1) + self.assertEqual(callback2.messages[0][0], topics[0]) + self.assertEqual(callback2.messages[0][1], b"test_will_delay will message") + + aclient.disconnect() + bclient.disconnect() + + callback.clear() + callback2.clear() + def setData(): global topics, wildtopics, nosubscribe_topics, host, port diff --git a/interoperability/client_testing.conf b/interoperability/client_testing.conf index dde498c..52c2fcb 100644 --- a/interoperability/client_testing.conf +++ b/interoperability/client_testing.conf @@ -50,5 +50,4 @@ keyfile tls_testing/keys/server/server-mitm.key require_certificate true #tls_version tlsv1 -listener 9983 INADDR_ANY mqttsn diff --git a/interoperability/mqtt/brokers/SN/MQTTSNBrokers.py b/interoperability/mqtt/brokers/SN/MQTTSNBrokers.py index 7eee2d3..5b8c711 100644 --- a/interoperability/mqtt/brokers/SN/MQTTSNBrokers.py +++ b/interoperability/mqtt/brokers/SN/MQTTSNBrokers.py @@ -198,6 +198,10 @@ def __init__(self, publish_on_pubrel=True, logger.info("Optional behaviour, drop QoS 0 publications to disconnected clients: %s", self.dropQoS0) logger.info("Optional behaviour, support zero length clientids: %s", self.zero_length_clientids) + def shutdown(self): + # do we need to do anything here? + pass + def setBroker3(self, broker3): self.broker.setBroker3(broker3.broker) diff --git a/interoperability/mqtt/brokers/V311/MQTTBrokers.py b/interoperability/mqtt/brokers/V311/MQTTBrokers.py index 1cad9fd..fe0878b 100644 --- a/interoperability/mqtt/brokers/V311/MQTTBrokers.py +++ b/interoperability/mqtt/brokers/V311/MQTTBrokers.py @@ -197,6 +197,9 @@ def __init__(self, publish_on_pubrel=True, logger.info("Optional behaviour, drop QoS 0 publications to disconnected clients: %s", self.dropQoS0) logger.info("Optional behaviour, support zero length clientids: %s", self.zero_length_clientids) + def shutdown(self): + self.disconnectAll() + def setBroker5(self, broker5): self.broker.setBroker5(broker5.broker) diff --git a/interoperability/mqtt/brokers/V5/Brokers.py b/interoperability/mqtt/brokers/V5/Brokers.py index 7bf1b99..e20b431 100644 --- a/interoperability/mqtt/brokers/V5/Brokers.py +++ b/interoperability/mqtt/brokers/V5/Brokers.py @@ -33,6 +33,7 @@ def __init__(self, overlapping_single=True, topicAliasMaximum=0, sharedData={}): self.overlapping_single = overlapping_single self.topicAliasMaximum = topicAliasMaximum self.__broker3 = None + self.willMessageClients = set() # set of clients for which will delay calculations are needed def setBroker3(self, broker3): self.__broker3 = broker3 @@ -61,20 +62,36 @@ def connect(self, aClient, clean=False): self.cleanSession(aClient.id) def sendWillMessage(self, aClientid): + "Sends the will message, if any, for a client" + self.__clients[aClientid].delayedWillTime = None + if aClientid in self.willMessageClients: + self.willMessageClients.remove(aClientid) + print("sendWillMessage", self.__clients[aClientid].will) + logger.info("[MQTT-3.1.2-8] sending will message for client %s", aClientid) + willtopic, willQoS, willmsg, willRetain = self.__clients[aClientid].will + if willRetain: + logger.info("[MQTT-3.1.2-17] sending will message retained for client %s", aClientid) + else: + logger.info("[MQTT-3.1.2-16] sending will message non-retained for client %s", aClientid) + self.publish(aClientid, willtopic, willmsg, willQoS, None, time.monotonic(), willRetain) + logger.info("[MQTT-3.1.2-10] will message is deleted after use or disconnect, for client %s", aClientid) + logger.info("[MQTT-3.14.4-3] on receipt of disconnect, will message is deleted") + self.__clients[aClientid].will = None + + def setupWillMessage(self, aClientid): "Sends the will message, if any, for a client" if aClientid in self.__clients.keys() and self.__clients[aClientid].connected: if self.__clients[aClientid].will != None: - logger.info("[MQTT-3.1.2-8] sending will message for client %s", aClientid) - willtopic, willQoS, willmsg, willRetain = self.__clients[aClientid].will - if willRetain: - logger.info("[MQTT-3.1.2-17] sending will message retained for client %s", aClientid) + if self.__clients[aClientid].willDelayInterval > 0: + self.__clients[aClientid].delayedWillTime = time.monotonic() + self.__clients[aClientid].willDelayInterval + self.__clients[aClientid].willDelayInterval = 0 # will be changed on next connect + self.willMessageClients.add(aClientid) else: - logger.info("[MQTT-3.1.2-16] sending will message non-retained for client %s", aClientid) - self.publish(aClientid, willtopic, willmsg, willQoS, None, time.monotonic(), willRetain) + self.sendWillMessage(aClientid) def disconnect(self, aClientid, willMessage=False, sessionExpiryInterval=-1): if willMessage: - self.sendWillMessage(aClientid) + self.setupWillMessage(aClientid) if aClientid in self.__clients.keys(): self.__clients[aClientid].connected = False if sessionExpiryInterval == 0: @@ -83,12 +100,8 @@ def disconnect(self, aClientid, willMessage=False, sessionExpiryInterval=-1): del self.__clients[aClientid] else: logger.info("[MQTT-3.1.2-4] broker must store the session data for client %s", aClientid) - #self.__clients[aClientid].timestamp = time.clock() self.__clients[aClientid].sessionEndedTime = time.monotonic() self.__clients[aClientid].connected = False - logger.info("[MQTT-3.1.2-10] will message is deleted after use or disconnect, for client %s", aClientid) - logger.info("[MQTT-3.14.4-3] on receipt of disconnect, will message is deleted") - self.__clients[aClientid].will = None def disconnectAll(self): for c in self.__clients.keys()[:]: # copy the array because disconnect will remove an element diff --git a/interoperability/mqtt/brokers/V5/MQTTBrokers.py b/interoperability/mqtt/brokers/V5/MQTTBrokers.py index 6276a12..ba571b6 100644 --- a/interoperability/mqtt/brokers/V5/MQTTBrokers.py +++ b/interoperability/mqtt/brokers/V5/MQTTBrokers.py @@ -55,7 +55,7 @@ def respond(sock, packet, maximumPacketSize=500): class MQTTClients: - def __init__(self, anId, cleanStart, sessionExpiryInterval, keepalive, socket, broker): + def __init__(self, anId, cleanStart, sessionExpiryInterval, willDelayInterval, keepalive, socket, broker): self.id = anId # required self.cleanStart = cleanStart self.sessionExpiryInterval = sessionExpiryInterval @@ -64,6 +64,8 @@ def __init__(self, anId, cleanStart, sessionExpiryInterval, keepalive, socket, b self.receiveMaximum = MQTTV5.MAX_PACKETID self.connected = False self.will = None + self.willDelayInterval = willDelayInterval + self.delayedWillTime = None self.socket = socket self.broker = broker # outbound messages @@ -226,6 +228,33 @@ def pubrel(self, msgid): logger.error("Pubrec received for msgid %d, but no message found", msgid) return rc +class cleanupThreads(threading.Thread): + """ + Most of the actions of the broker can be taken when provoked by an external stimulus, + which is generally a client taking some action. A few actions need to be assessed + asynchronously, such as the will delay. + """ + + def __init__(self, broker, lock=None): + threading.Thread.__init__(self) + self.broker = broker + self.lock = lock + self.running = False + self.start() + + def run(self): + self.running = True + while self.running: + time.sleep(1) + # will delay + for clientid in self.broker.willMessageClients.copy(): + client = self.broker.getClient(clientid) + if client and time.monotonic() >= client.delayedWillTime: + self.broker.sendWillMessage(clientid) + + def stop(self): + self.running = False + class MQTTBrokers: @@ -257,6 +286,8 @@ def __init__(self, publish_on_pubrel=True, else: self.lock = threading.RLock() + self.cleanupThread = cleanupThreads(self.broker) + logger.info("MQTT 5.0 Paho Test Broker") logger.info("Optional behaviour, publish on pubrel: %s", self.publish_on_pubrel) logger.info("Optional behaviour, single publish on overlapping topics: %s", self.broker.overlapping_single) @@ -272,6 +303,10 @@ def __init__(self, publish_on_pubrel=True, - topics which are max QoS 0, QoS 1 or unavailable """ + def shutdown(self): + self.disconnectAll() + self.cleanupThread.stop() + def setBroker3(self, broker3): self.broker.setBroker3(broker3.broker) @@ -400,13 +435,20 @@ def connect(self, sock, packet): sessionExpiryInterval = packet.properties.SessionExpiryInterval else: sessionExpiryInterval = -1 # no expiry + # will delay + willDelayInterval = 0 + if hasattr(packet.willProperties, "WillDelayInterval"): + willDelayInterval = packet.willProperties.WillDelayInterval + if willDelayInterval > sessionExpiryInterval: + willDelayInterval = sessionExpiryInterval if me == None: - me = MQTTClients(packet.ClientIdentifier, packet.CleanStart, sessionExpiryInterval, keepalive, sock, self) + me = MQTTClients(packet.ClientIdentifier, packet.CleanStart, sessionExpiryInterval, willDelayInterval, keepalive, sock, self) else: me.socket = sock # set existing client state to new socket me.cleanStart = packet.CleanStart me.keepalive = keepalive me.sessionExpiryInterval = sessionExpiryInterval + me.willDelayInterval = willDelayInterval # the topic alias maximum in the connect properties sets the maximum outgoing topic aliases for a client me.topicAliasMaximum = packet.properties.TopicAliasMaximum if hasattr(packet.properties, "TopicAliasMaximum") else 0 me.maximumPacketSize = packet.properties.MaximumPacketSize if hasattr(packet.properties, "MaximumPacketSize") else MQTTV5.MAX_PACKET_SIZE @@ -426,7 +468,7 @@ def disconnect(self, sock, packet=None, sendWillMessage=False, reasonCode=None, logger.info("[MQTT-3.14.4-2] Client must not send any more packets after disconnect") me = self.clients[sock] me.clearTopicAliases() - # Session expiry + # Session expiry if packet and hasattr(packet.properties, "SessionExpiryInterval"): if me.sessionExpiryInterval == 0 and packet.properties.SessionExpiryInterval > 0: raise MQTTV5.ProtocolError("[MQTT-3.1.0-2] Can't reset SessionExpiryInterval from 0") @@ -450,7 +492,7 @@ def disconnect(self, sock, packet=None, sendWillMessage=False, reasonCode=None, except: pass # doesn't matter if the socket has been closed at the other end already - def disconnectAll(self, sock): + def disconnectAll(self): for sock in self.clients.keys(): self.disconnect(sock, None) diff --git a/interoperability/mqtt/brokers/bridges/TCPBridges.py b/interoperability/mqtt/brokers/bridges/TCPBridges.py new file mode 100644 index 0000000..6efa9f1 --- /dev/null +++ b/interoperability/mqtt/brokers/bridges/TCPBridges.py @@ -0,0 +1,141 @@ +""" +******************************************************************* + Copyright (c) 2013, 2017 IBM Corp. + + All rights reserved. This program and the accompanying materials + are made available under the terms of the Eclipse Public License v1.0 + and Eclipse Distribution License v1.0 which accompany this distribution. + + The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html + and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + + Contributors: + Ian Craggs - initial implementation and/or documentation +******************************************************************* +""" + +import sys, traceback, socket, logging, getopt, hashlib, base64 +import threading, ssl, time + +import mqtt.clients.V5 +from mqtt.brokers.V5 import MQTTBrokers as MQTTV5Brokers +from mqtt.formats import MQTTV311, MQTTV5 + +server = None +logger = logging.getLogger('MQTT broker') + + +class Callbacks(mqtt.clients.V5.Callback): + + def __init__(self, broker): + self.messages = [] + self.messagedicts = [] + self.publisheds = [] + self.subscribeds = [] + self.unsubscribeds = [] + self.disconnects = [] + self.broker = broker + + def __str__(self): + return str(self.messages) + str(self.messagedicts) + str(self.publisheds) + \ + str(self.subscribeds) + str(self.unsubscribeds) + str(self.disconnects) + + def clear(self): + self.__init__() + + def disconnected(self, reasoncode, properties): + logging.info("disconnected %s %s", str(reasoncode), str(properties)) + self.disconnects.append({"reasonCode" : reasoncode, "properties" : properties}) + + def connectionLost(self, cause): + logging.info("connectionLost %s" % str(cause)) + + def publishArrived(self, topicName, payload, qos, retained, msgid, properties=None): + logging.info("publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties)) + self.messages.append((topicName, payload, qos, retained, msgid, properties)) + self.messagedicts.append({"topicname" : topicName, "payload" : payload, + "qos" : qos, "retained" : retained, "msgid" : msgid, "properties" : properties}) + + # add to local broker + self.broker.broker.publish(aClientid, topic, message, qos, properties, receivedTime, retained) + return True + + def published(self, msgid): + logging.info("published %d", msgid) + self.publisheds.append(msgid) + + def subscribed(self, msgid, data): + logging.info("subscribed %d", msgid) + self.subscribeds.append((msgid, data)) + + def unsubscribed(self, msgid): + logging.info("unsubscribed %d", msgid) + self.unsubscribeds.append(msgid) + + +class Bridges: + + def __init__(self, host, port): + self.host = host + self.port = port + self.client = mqtt.clients.V5.Client("local") + self.callback = Callbacks(broker5) + self.client.registerCallback(self.callback) + self.local_connect() + + def local_connect(self): + # connect locally with V5, so we get noLocal and retainAsPublished + connect = MQTTV5.Connects() + connect.ClientIdentifier = "local" + broker5.connect(self, connect) + subscribe = MQTTV5.Subscribes() + options = MQTTV5.SubscribeOptions() + options.noLocal = options.retainAsPublished = True + subscribe.data = [('+', options)] + broker5.subscribe(self, subscribe) + + def connect(self): + self.client.connect(host=self.host, port=self.port, cleanstart=True) + # subscribe if necessary + options = MQTTV5.SubscribeOptions() + options.noLocal = options.retainAsPublished = True + self.client.subscribe(["+"], [options]) + + def getPacket(self): + # get packet from remote + pass + + def handlePacket(self, packet): + # response from local broker + logger.info("from local broker %s", str(packet)) + if packet.fh.PacketType == MQTTV5.PacketTypes.PUBLISH: + self.client.publish(packet.topicName, packet.data, packet.fh.QoS) #retained=False, properties=None) + + def run(self): + while True: + self.connect() + time.sleep(300) + self.shutdown() + +def setBroker5(aBroker5): + global broker5 + broker5 = aBroker5 + +def create(port, host="", TLS=False, + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=None, certfile=None, keyfile=None): + + if host == "": + host = "localhost" + logger.info("Starting TCP bridge for address '%s' port %d %s", host, port, "with TLS support" if TLS else "") + bridge = Bridges(host, port) + thread = threading.Thread(target=bridge.run) + thread.start() + return bridge + + + + + diff --git a/interoperability/mqtt/brokers/listeners/HTTPListeners.py b/interoperability/mqtt/brokers/listeners/HTTPListeners.py index 6db227e..7c3da8c 100644 --- a/interoperability/mqtt/brokers/listeners/HTTPListeners.py +++ b/interoperability/mqtt/brokers/listeners/HTTPListeners.py @@ -248,7 +248,7 @@ def setSharedData(aLock, aSharedData): def create(port, host="", TLS=False, serve_forever=False, cert_reqs=ssl.CERT_REQUIRED, ca_certs=None, certfile=None, keyfile=None): - logger.info("Starting HTTP server on address '%s' port %d %s", host, port, "with TLS support" if TLS else "") + logger.info("Starting HTTP listener on address '%s' port %d %s", host, port, "with TLS support" if TLS else "") bind_address = "" if host not in ["", "INADDR_ANY"]: bind_address = host diff --git a/interoperability/mqtt/brokers/listeners/TCPListeners.py b/interoperability/mqtt/brokers/listeners/TCPListeners.py index 87d7267..0308b0b 100644 --- a/interoperability/mqtt/brokers/listeners/TCPListeners.py +++ b/interoperability/mqtt/brokers/listeners/TCPListeners.py @@ -208,7 +208,7 @@ def create(port, host="", TLS=False, serve_forever=False, cert_reqs=ssl.CERT_REQUIRED, ca_certs=None, certfile=None, keyfile=None): global server - logger.info("Starting MQTT server on address '%s' port %d %s", host, port, "with TLS support" if TLS else "") + logger.info("Starting TCP listener on address '%s' port %d %s", host, port, "with TLS support" if TLS else "") bind_address = "" if host not in ["", "INADDR_ANY"]: bind_address = host @@ -225,7 +225,7 @@ def create(port, host="", TLS=False, serve_forever=False, server.serve_forever() else: thread = threading.Thread(target = server.serve_forever) - thread.daemon = True + #thread.daemon = True thread.start() return server diff --git a/interoperability/mqtt/brokers/listeners/UDPListeners.py b/interoperability/mqtt/brokers/listeners/UDPListeners.py index 802e5e9..55471bc 100644 --- a/interoperability/mqtt/brokers/listeners/UDPListeners.py +++ b/interoperability/mqtt/brokers/listeners/UDPListeners.py @@ -52,7 +52,7 @@ def setBroker(aBrokerSN): brokerSN = aBrokerSN def create(port, host="", serve_forever=False): - logger.info("Starting MQTT-SN server on address '%s' port %d", host, port) + logger.info("Starting UDP listener on address '%s' port %d", host, port) bind_address = "" if host not in ["", "INADDR_ANY"]: bind_address = host diff --git a/interoperability/mqtt/brokers/start.py b/interoperability/mqtt/brokers/start.py index 238ff0a..b732c20 100644 --- a/interoperability/mqtt/brokers/start.py +++ b/interoperability/mqtt/brokers/start.py @@ -28,6 +28,7 @@ from mqtt.formats.MQTTV5 import MQTTException as MQTTV5Exception from mqtt.formats.MQTTSN import MQTTSNException from mqtt.brokers.listeners import TCPListeners, UDPListeners, HTTPListeners +from mqtt.brokers.bridges import TCPBridges logger = None @@ -91,6 +92,8 @@ def run(port=1883, config=None, brokerSN = MQTTSNBrokers(lock=lock, sharedData=sharedData) + brokers = [broker3, broker5, brokerSN] + broker3.setBroker5(broker5) broker5.setBroker3(broker3) @@ -105,6 +108,8 @@ def run(port=1883, config=None, try: if config == None: + TCPBridges.setBroker5(broker5) + TCPBridges.create(1886) servers.append(TCPListeners.create(1883, serve_forever=True)) else: servers_to_create = [] @@ -161,16 +166,21 @@ def run(port=1883, config=None, logger.exception("startBroker") # Stop incoming communications + import socket for server in servers: try: - logger.info("Stopping the MQTT server %s", str(server)) - server.socket.shutdown(socket.SHUT_RDWR) - server.socket.close() + logger.info("Stopping listener %s", str(server)) + server.shutdown() except: - pass #traceback.print_exc() + traceback.print_exc() - # Disconnect any still connected clients - broker3.disconnectAll() + logger.info("Shutdown brokers") + for broker in brokers: + try: + logger.info("Stopping broker %s", str(broker)) + broker.shutdown() + except: + traceback.print_exc() filter.measure() logger.debug("Ending sharedData %s", sharedData) diff --git a/interoperability/mqtt/clients/V5/main.py b/interoperability/mqtt/clients/V5/main.py index a859d03..79a1f7c 100755 --- a/interoperability/mqtt/clients/V5/main.py +++ b/interoperability/mqtt/clients/V5/main.py @@ -38,8 +38,12 @@ def sendtosocket(mysocket, data): logger.debug("out: %s", str(data)) sent = 0 length = len(data) - while sent < length: - sent += mysocket.send(data) + try: + while sent < length: + sent += mysocket.send(data) + except: + pass # could be socket error + return sent class Callback: