Skip to content

Commit

Permalink
Better timeout error handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 11, 2019
1 parent 2e53ad4 commit a78c6e9
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 3 deletions.
1 change: 1 addition & 0 deletions mqttools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .client import UnsubscribeError
from .broker import Broker
from .common import MalformedPacketError
from .common import TimeoutError
from .common import SubackReasonCode
from .common import UnsubackReasonCode
from .common import QoS
Expand Down
17 changes: 14 additions & 3 deletions mqttools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .common import pack_pingreq
from .common import Error
from .common import MalformedPacketError
from .common import TimeoutError
from .common import PayloadReader
from .common import format_packet

Expand Down Expand Up @@ -335,7 +336,7 @@ async def connect(self, resume_session):
await asyncio.wait_for(self._connack_event.wait(),
self.response_timeout)
except asyncio.TimeoutError:
raise Error('Timeout waiting for CONNACK from the broker.')
raise TimeoutError('Timeout waiting for CONNACK from the broker.')

session_present, reason, properties = self._connack

Expand Down Expand Up @@ -385,7 +386,12 @@ async def subscribe(self, topic):
with Transaction(self) as transaction:
self._write_packet(pack_subscribe(topic,
transaction.packet_identifier))
reasons = await transaction.wait_until_completed()

try:
reasons = await transaction.wait_until_completed()
except asyncio.TimeoutError:
raise TimeoutError('Timeout waiting for SUBACK from the broker.')

reason = reasons[0]

if reason != SubackReasonCode.GRANTED_QOS_0:
Expand All @@ -401,7 +407,12 @@ async def unsubscribe(self, topic):
with Transaction(self) as transaction:
self._write_packet(pack_unsubscribe(topic,
transaction.packet_identifier))
reasons = await transaction.wait_until_completed()

try:
reasons = await transaction.wait_until_completed()
except asyncio.TimeoutError:
raise TimeoutError('Timeout waiting for UNSUBACK from the broker.')

reason = reasons[0]

if reason != UnsubackReasonCode.SUCCESS:
Expand Down
4 changes: 4 additions & 0 deletions mqttools/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ class MalformedPacketError(Error):
pass


class TimeoutError(Error):
pass


class PayloadReader(BytesIO):

def read(self, size):
Expand Down
103 changes: 103 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import logging
import asyncio
import unittest

import mqttools


class ClientTest(unittest.TestCase):

def test_connack_timeout(self):
asyncio.run(self.connack_timeout())

async def connack_timeout(self):
def on_client_connected(reader, writer):
pass

listener = await asyncio.start_server(on_client_connected, 'localhost', 0)

async def broker_main():
async with listener:
try:
await listener.serve_forever()
except asyncio.CancelledError:
pass

async def client_main():
client = mqttools.Client(*listener.sockets[0].getsockname(),
'connack',
response_timeout=0.1)

with self.assertRaises(mqttools.TimeoutError):
await client.start()

listener.close()

await asyncio.wait_for(asyncio.gather(broker_main(), client_main()), 1)

def test_subscribe_timeout(self):
asyncio.run(self.subscribe_timeout())

async def subscribe_timeout(self):
def on_client_connected(reader, writer):
# CONNACK
writer.write(b'\x20\x03\x00\x00\x00')

listener = await asyncio.start_server(on_client_connected, 'localhost', 0)

async def broker_main():
async with listener:
try:
await listener.serve_forever()
except asyncio.CancelledError:
pass

async def client_main():
client = mqttools.Client(*listener.sockets[0].getsockname(),
'suback',
response_timeout=0.1)
await client.start()

with self.assertRaises(mqttools.TimeoutError):
await client.subscribe('/foo')

listener.close()

await asyncio.wait_for(asyncio.gather(broker_main(), client_main()), 1)

def test_unsubscribe_timeout(self):
asyncio.run(self.unsubscribe_timeout())

async def unsubscribe_timeout(self):
def on_client_connected(reader, writer):
# CONNACK
writer.write(b'\x20\x03\x00\x00\x00')

listener = await asyncio.start_server(on_client_connected, 'localhost', 0)

async def broker_main():
async with listener:
try:
await listener.serve_forever()
except asyncio.CancelledError:
pass

async def client_main():
client = mqttools.Client(*listener.sockets[0].getsockname(),
'unsuback',
response_timeout=0.1)
await client.start()

with self.assertRaises(mqttools.TimeoutError):
await client.unsubscribe('/foo')

listener.close()

await asyncio.wait_for(asyncio.gather(broker_main(), client_main()), 1)


logging.basicConfig(level=logging.DEBUG)


if __name__ == '__main__':
unittest.main()

0 comments on commit a78c6e9

Please sign in to comment.