Skip to content

Commit

Permalink
More broker testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 9, 2019
1 parent 8603613 commit 93d4044
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 5 deletions.
2 changes: 1 addition & 1 deletion mqttools/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def on_subscribe(self, payload):
self._write_packet(pack_suback(packet_identifier))

def on_unsubscribe(self, payload):
topic, packet_identifier = unpack_unsubscribe(payload)
packet_identifier, topic = unpack_unsubscribe(payload)
validate_topic(topic)
self._session.subscribes.remove(topic)
self._broker.remove_subscriber(topic, self._session)
Expand Down
2 changes: 1 addition & 1 deletion mqttools/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ def unpack_unsubscribe(payload):


def pack_unsuback(packet_identifier):
packed = pack_fixed_header(ControlPacketType.UNSUBSCRIBE, 0, 3)
packed = pack_fixed_header(ControlPacketType.UNSUBACK, 0, 3)
packed += pack_u16(packet_identifier)
packed += pack_properties('UNSUBACK', {})

Expand Down
88 changes: 85 additions & 3 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

class BrokerTest(unittest.TestCase):

def test_publish_to_two_subscribers(self):
asyncio.run(self.publish_to_two_subscribers())
def test_multiple_subscribers(self):
asyncio.run(self.multiple_subscribers())

async def publish_to_two_subscribers(self):
async def multiple_subscribers(self):
broker = mqttools.Broker('localhost', 0)

async def broker_wrapper():
Expand Down Expand Up @@ -50,6 +50,8 @@ async def tester():
writer_3.write(connect)
connack = await reader_3.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')

# Publish a topic.
publish = b'\x30\x0a\x00\x04/a/b\x00apa'
writer_3.write(publish)

Expand All @@ -59,13 +61,93 @@ async def tester():
publish = await reader_2.readexactly(12)
self.assertEqual(publish, b'\x30\x0a\x00\x04/a/b\x00apa')

# Cleanly disconnect subscriber 1.
disconnect = b'\xe0\x02\x00\x00'
writer_1.write(disconnect)
writer_1.close()

# Publish another topic, now only to subscriber 2.
publish = b'\x30\x0a\x00\x04/a/b\x00boo'
writer_3.write(publish)

# Receive the publish in subscriber 2.
publish = await reader_2.readexactly(12)
self.assertEqual(publish, b'\x30\x0a\x00\x04/a/b\x00boo')

writer_2.close()
writer_3.close()
broker_task.cancel()

await asyncio.wait_for(asyncio.gather(broker_task, tester()), 1)

def test_unsubscribe(self):
asyncio.run(self.unsubscribe())

async def unsubscribe(self):
broker = mqttools.Broker('localhost', 0)

async def broker_wrapper():
with self.assertRaises(asyncio.CancelledError):
await broker.run()

broker_task = asyncio.create_task(broker_wrapper())

async def tester():
address = await broker.getsockname()

# Setup the subscriber. Subscribe to /a/b and /a/c.
reader_1, writer_1 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x02\x00\x00\x00\x00\x03su1'
writer_1.write(connect)
connack = await reader_1.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')
subscribe = b'\x82\x0a\x00\x01\x00\x00\x04/a/b\x00'
writer_1.write(subscribe)
suback = await reader_1.readexactly(5)
self.assertEqual(suback, b'\x90\x03\x00\x01\x00')
subscribe = b'\x82\x0a\x00\x01\x00\x00\x04/a/c\x00'
writer_1.write(subscribe)
suback = await reader_1.readexactly(5)
self.assertEqual(suback, b'\x90\x03\x00\x01\x00')

# Setup a publisher.
reader_2, writer_2 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x02\x00\x00\x00\x00\x03pub'
writer_2.write(connect)
connack = await reader_2.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')

# Publish /a/b.
publish = b'\x30\x0a\x00\x04/a/b\x00apa'
writer_2.write(publish)

# Receive /a/b in the subscriber.
publish = await reader_1.readexactly(12)
self.assertEqual(publish, b'\x30\x0a\x00\x04/a/b\x00apa')

# Unsubscribe from /a/b.
unsubscribe = b'\xa2\x09\x00\x02\x00\x00\x04/a/b'
writer_1.write(unsubscribe)
unsuback = await reader_1.readexactly(5)
self.assertEqual(unsuback, b'\xb0\x03\x00\x02\x00')

# Publish /a/b and then /a/c, /a/b should not be received
# by the subscriber.
publish = b'\x30\x0a\x00\x04/a/b\x00apa'
writer_2.write(publish)
publish = b'\x30\x0a\x00\x04/a/c\x00apa'
writer_2.write(publish)

# Receive /a/c in the subscriber.
publish = await reader_1.readexactly(12)
self.assertEqual(publish, b'\x30\x0a\x00\x04/a/c\x00apa')

writer_1.close()
writer_2.close()
broker_task.cancel()

await asyncio.wait_for(asyncio.gather(broker_task, tester()), 1)


logging.basicConfig(level=logging.DEBUG)

Expand Down

0 comments on commit 93d4044

Please sign in to comment.