Skip to content

Commit

Permalink
More broker testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 9, 2019
1 parent 93d4044 commit 17edcf9
Showing 1 changed file with 77 additions and 0 deletions.
77 changes: 77 additions & 0 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,83 @@ async def tester():

await asyncio.wait_for(asyncio.gather(broker_task, tester()), 1)

def test_resume_session(self):
asyncio.run(self.resume_session())

async def resume_session(self):
broker = mqttools.Broker('localhost', 0)

async def broker_wrapper():
with self.assertRaises(asyncio.CancelledError):
await broker.run()

broker_task = asyncio.create_task(broker_wrapper())

async def tester():
address = await broker.getsockname()

# Try to resume a session that does not exist.
reader_1, writer_1 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x00\x00\x00\x00\x00\x03res'
writer_1.write(connect)
connack = await reader_1.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')
subscribe = b'\x82\x0a\x00\x01\x00\x00\x04/a/b\x00'
writer_1.write(subscribe)
suback = await reader_1.readexactly(5)
self.assertEqual(suback, b'\x90\x03\x00\x01\x00')
writer_1.close()

# Resume the session.
reader_1, writer_1 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x00\x00\x00\x00\x00\x03res'
writer_1.write(connect)
connack = await reader_1.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x01\x00\x02\x24\x00')

# Setup a publisher.
reader_2, writer_2 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x02\x00\x00\x00\x00\x03pub'
writer_2.write(connect)
connack = await reader_2.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')

# Publish /a/b.
publish = b'\x30\x0a\x00\x04/a/b\x00apa'
writer_2.write(publish)

# Receive /a/b in the subscriber.
publish = await reader_1.readexactly(12)
self.assertEqual(publish, b'\x30\x0a\x00\x04/a/b\x00apa')

# Perform a clean start and subscribe to /a/c.
writer_1.close()
reader_1, writer_1 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x02\x00\x00\x00\x00\x03res'
writer_1.write(connect)
connack = await reader_1.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')
subscribe = b'\x82\x0a\x00\x01\x00\x00\x04/a/c\x00'
writer_1.write(subscribe)
suback = await reader_1.readexactly(5)
self.assertEqual(suback, b'\x90\x03\x00\x01\x00')

# Publish /a/b and then /a/c.
publish = b'\x30\x0a\x00\x04/a/b\x00apa'
writer_2.write(publish)
publish = b'\x30\x0a\x00\x04/a/c\x00apa'
writer_2.write(publish)

# Receive /a/c in the subscriber.
publish = await reader_1.readexactly(12)
self.assertEqual(publish, b'\x30\x0a\x00\x04/a/c\x00apa')

writer_1.close()
writer_2.close()
broker_task.cancel()

await asyncio.wait_for(asyncio.gather(broker_task, tester()), 1)


logging.basicConfig(level=logging.DEBUG)

Expand Down

0 comments on commit 17edcf9

Please sign in to comment.