Skip to content

Commit

Permalink
Check reason in SUBACK and UNSUBACK.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 11, 2019
1 parent 4ce9449 commit 3f586de
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 14 deletions.
9 changes: 6 additions & 3 deletions mqttools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

from .version import __version__
from .client import Client
from .client import MalformedPacketError
from .client import ConnectError
from .client import SessionResumeError
from .client import PublishError
from .client import QoS
from .client import SubscribeError
from .client import UnsubscribeError
from .broker import Broker
from .common import MalformedPacketError
from .common import SubackReasonCode
from .common import UnsubackReasonCode
from .common import QoS


def main():
Expand Down
40 changes: 29 additions & 11 deletions mqttools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from .common import ControlPacketType
from .common import ConnectReasonCode
from .common import PropertyIds
from .common import SubackReasonCode
from .common import UnsubackReasonCode
from .common import DisconnectReasonCode
from .common import QoS
from .common import pack_connect
Expand All @@ -29,6 +31,10 @@
LOGGER = logging.getLogger(__name__)


class SessionResumeError(Error):
pass


class ConnectError(Error):

def __init__(self, reason):
Expand All @@ -44,11 +50,7 @@ def __str__(self):
return message


class SessionResumeError(Error):
pass


class PublishError(Error):
class ReasonError(Error):

def __init__(self, reason):
super().__init__()
Expand All @@ -61,6 +63,14 @@ def __str__(self):
return f'UNKNOWN({self.reason})'


class SubscribeError(ReasonError):
pass


class UnsubscribeError(ReasonError):
pass


class Transaction(object):

def __init__(self, client):
Expand Down Expand Up @@ -375,7 +385,11 @@ async def subscribe(self, topic):
with Transaction(self) as transaction:
self._write_packet(pack_subscribe(topic,
transaction.packet_identifier))
await transaction.wait_until_completed()
reasons = await transaction.wait_until_completed()
reason = reasons[0]

if reason != SubackReasonCode.GRANTED_QOS_0:
raise SubscribeError(reason)

async def unsubscribe(self, topic):
"""Unsubscribe from given topic.
Expand All @@ -387,7 +401,11 @@ async def unsubscribe(self, topic):
with Transaction(self) as transaction:
self._write_packet(pack_unsubscribe(topic,
transaction.packet_identifier))
await transaction.wait_until_completed()
reasons = await transaction.wait_until_completed()
reason = reasons[0]

if reason != UnsubackReasonCode.SUCCESS:
raise UnsubscribeError(reason)

def publish(self, topic, message):
"""Publish given message to given topic with QoS 0.
Expand Down Expand Up @@ -439,20 +457,20 @@ async def on_publish(self, flags, payload):
await self._messages.put((topic, message))

def on_suback(self, payload):
packet_identifier, properties, _ = unpack_suback(payload)
packet_identifier, properties, reasons = unpack_suback(payload)

if packet_identifier in self.transactions:
self.transactions[packet_identifier].set_completed(None)
self.transactions[packet_identifier].set_completed(reasons)
else:
LOGGER.debug(
'Discarding unexpected SUBACK packet with identifier %d.',
packet_identifier)

def on_unsuback(self, payload):
packet_identifier, properties, _ = unpack_unsuback(payload)
packet_identifier, properties, reasons = unpack_unsuback(payload)

if packet_identifier in self.transactions:
self.transactions[packet_identifier].set_completed(None)
self.transactions[packet_identifier].set_completed(reasons)
else:
LOGGER.debug(
'Discarding unexpected UNSUBACK packet with identifier %d.',
Expand Down
20 changes: 20 additions & 0 deletions tests/test_mqttools.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ def test_subscribe(self):
('c2s', b'\x82\n\x00\x02\x00\x00\x04/a/c\x00'),
# SUBACK
('s2c', b'\x90\x04\x00\x02\x00\x00'),
# SUBSCRIBE with invalid topic
('c2s', b'\x82\x09\x00\x03\x00\x00\x03/a#\x00'),
# SUBACK
('s2c', b'\x90\x04\x00\x03\x00\xa2'),
# PUBLISH QoS 0
('s2c', b'\x30\x0a\x00\x04/a/b\x00apa'),
# DISCONNECT
Expand All @@ -132,6 +136,12 @@ def test_subscribe(self):
self.run_until_complete(client.start())
self.run_until_complete(client.subscribe('/a/b'))
self.run_until_complete(client.subscribe('/a/c'))

with self.assertRaises(mqttools.SubscribeError) as cm:
self.run_until_complete(client.subscribe('/a#'))

self.assertEqual(cm.exception.reason,
mqttools.SubackReasonCode.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED)
topic, message = self.run_until_complete(client.messages.get())
self.assertEqual(topic, '/a/b')
self.assertEqual(message, b'apa')
Expand All @@ -151,6 +161,10 @@ def test_unsubscribe(self):
('c2s', b'\xa2\x09\x00\x02\x00\x00\x04/a/b'),
# UNSUBACK
('s2c', b'\xb0\x04\x00\x02\x00\x00'),
# UNSUBSCRIBE from non-subscribed topic
('c2s', b'\xa2\x09\x00\x03\x00\x00\x04/a/d'),
# UNSUBACK
('s2c', b'\xb0\x04\x00\x03\x00\x11'),
# DISCONNECT
('c2s', b'\xe0\x02\x00\x00')
]
Expand All @@ -159,6 +173,12 @@ def test_unsubscribe(self):
self.run_until_complete(client.start())
self.run_until_complete(client.subscribe('/a/b'))
self.run_until_complete(client.unsubscribe('/a/b'))

with self.assertRaises(mqttools.UnsubscribeError) as cm:
self.run_until_complete(client.unsubscribe('/a/d'))

self.assertEqual(cm.exception.reason,
mqttools.UnsubackReasonCode.NO_SUBSCRIPTION_EXISTED)
self.run_until_complete(client.stop())

def test_publish_qos_0(self):
Expand Down

0 comments on commit 3f586de

Please sign in to comment.