Skip to content

Commit

Permalink
Unsubscribe from topic.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 8, 2019
1 parent e5dc6a4 commit bfd015b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 0 deletions.
29 changes: 29 additions & 0 deletions examples/unsubscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio
import mqttools


async def unsubscriber():
client = mqttools.Client('broker.hivemq.com',
1883,
keep_alive_s=5)

await client.start()

print('Subscribing to /test/mqttools/foo.')
await client.subscribe('/test/mqttools/foo', 0)
topic, message = await client.messages.get()

print(f'Topic: {topic}')
print(f'Message: {message}')

print('Unsubscribing from /test/mqttools/foo.')
await client.unsubscribe('/test/mqttools/foo')

# Should only return when the broker connection is lost.
topic, message = await client.messages.get()

print(f'Topic: {topic}')
print(f'Message: {message}')


asyncio.run(unsubscriber())
51 changes: 51 additions & 0 deletions mqttools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,29 @@ def unpack_suback(payload):
return packet_identifier, properties


def pack_unsubscribe(topic, packet_identifier):
packed_topic = pack_string(topic)
packed = pack_fixed_header(ControlPacketType.UNSUBSCRIBE,
2,
len(packed_topic) + 3)
packed += struct.pack('>HB', packet_identifier, 0)
packed += packed_topic

return packed


def unpack_unsuback(payload):
packet_identifier = unpack_u16(payload)
properties = unpack_properties('UNSUBACK',
[
PropertyIds.REASON_STRING,
PropertyIds.USER_PROPERTY
],
payload)

return packet_identifier, properties


def pack_publish(topic, message, qos, packet_identifier, alias):
if alias is None:
properties = b'\x00'
Expand Down Expand Up @@ -1091,6 +1114,18 @@ async def subscribe(self, topic, qos):
transaction.packet_identifier))
await transaction.wait_until_completed()

async def unsubscribe(self, topic):
"""Unsubscribe from given topic.
>>> await client.unsubscribe('/my/topic')
"""

with Transaction(self) as transaction:
self._write_packet(pack_unsubscribe(topic,
transaction.packet_identifier))
await transaction.wait_until_completed()

def publish_qos_0(self, topic, alias, message):
with Transaction(self) as transaction:
self._write_packet(pack_publish(topic,
Expand Down Expand Up @@ -1308,6 +1343,20 @@ def on_suback(self, payload):
'Discarding unexpected SUBACK packet with identifier %d.',
packet_identifier)

def on_unsuback(self, payload):
try:
packet_identifier, properties = unpack_unsuback(payload)
except MalformedPacketError:
LOGGER.debug('Discarding malformed UNSUBACK packet.')
return

if packet_identifier in self.transactions:
self.transactions[packet_identifier].set_completed(None)
else:
LOGGER.debug(
'Discarding unexpected UNSUBACK packet with identifier %d.',
packet_identifier)

def on_pingresp(self):
self._pingresp_event.set()

Expand All @@ -1329,6 +1378,8 @@ async def reader_loop(self):
self.on_pubcomp(payload)
elif packet_type == ControlPacketType.SUBACK:
self.on_suback(payload)
elif packet_type == ControlPacketType.UNSUBACK:
self.on_unsuback(payload)
elif packet_type == ControlPacketType.PINGRESP:
self.on_pingresp()
else:
Expand Down
24 changes: 24 additions & 0 deletions tests/test_mqttools.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,30 @@ def test_subscribe(self):
self.assertEqual(message, b'c')
self.run_until_complete(client.stop())

def test_unsubscribe(self):
Broker.EXPECTED_DATA_STREAM = [
# CONNECT
('c2s', b'\x10\x10\x00\x04MQTT\x05\x02\x00\x00\x00\x00\x03bar'),
# CONNACK
('s2c', b'\x20\x03\x00\x00\x00'),
# SUBSCRIBE
('c2s', b'\x82\n\x00\x01\x00\x00\x04/a/b\x00'),
# SUBACK
('s2c', b'\x90\x04\x00\x01\x00\x00'),
# UNSUBSCRIBE
('c2s', b'\xa2\x09\x00\x02\x00\x00\x04/a/b'),
# UNSUBACK
('s2c', b'\xb0\x04\x00\x02\x00\x00'),
# DISCONNECT
('c2s', b'\xe0\x02\x00\x00')
]

client = mqttools.Client(*self.broker.address, 'bar')
self.run_until_complete(client.start())
self.run_until_complete(client.subscribe('/a/b', 0))
self.run_until_complete(client.unsubscribe('/a/b'))
self.run_until_complete(client.stop())

def test_publish_qos_0(self):
Broker.EXPECTED_DATA_STREAM = [
# CONNECT
Expand Down

0 comments on commit bfd015b

Please sign in to comment.