Skip to content

Commit

Permalink
A broker test.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 9, 2019
1 parent 75676e4 commit 8603613
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 7 deletions.
1 change: 1 addition & 0 deletions mqttools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .client import SessionResumeError
from .client import PublishError
from .client import QoS
from .broker import Broker


def main():
Expand Down
22 changes: 15 additions & 7 deletions mqttools/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,27 @@ def __init__(self, host, port):
self._port = port
self._sessions = {}
self._subscribers = defaultdict(list)
self._listener = None
self._listener_ready = asyncio.Event()

async def getsockname(self):
await self._listener_ready.wait()

return self._listener.sockets[0].getsockname()

async def run(self):
listener = await asyncio.start_server(self._serve_client,
self._host,
self._port)
listener_address = listener.sockets[0].getsockname()
self._listener = await asyncio.start_server(self.serve_client,
self._host,
self._port)
self._listener_ready.set()
listener_address = self._listener.sockets[0].getsockname()

LOGGER.info(f'Listening for clients on {listener_address}.')

async with listener:
await listener.serve_forever()
async with self._listener:
await self._listener.serve_forever()

async def _serve_client(self, reader, writer):
async def serve_client(self, reader, writer):
client = Client(self, reader, writer)
await client.serve_forever()

Expand Down
74 changes: 74 additions & 0 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import logging
import asyncio
import unittest

import mqttools


class BrokerTest(unittest.TestCase):

def test_publish_to_two_subscribers(self):
asyncio.run(self.publish_to_two_subscribers())

async def publish_to_two_subscribers(self):
broker = mqttools.Broker('localhost', 0)

async def broker_wrapper():
with self.assertRaises(asyncio.CancelledError):
await broker.run()

broker_task = asyncio.create_task(broker_wrapper())

async def tester():
address = await broker.getsockname()

# Setup subscriber 1.
reader_1, writer_1 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x02\x00\x00\x00\x00\x03su1'
writer_1.write(connect)
connack = await reader_1.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')
subscribe = b'\x82\x0a\x00\x01\x00\x00\x04/a/b\x00'
writer_1.write(subscribe)
suback = await reader_1.readexactly(5)
self.assertEqual(suback, b'\x90\x03\x00\x01\x00')

# Setup subscriber 2.
reader_2, writer_2 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x02\x00\x00\x00\x00\x03su2'
writer_2.write(connect)
connack = await reader_2.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')
subscribe = b'\x82\x0a\x00\x01\x00\x00\x04/a/b\x00'
writer_2.write(subscribe)
suback = await reader_2.readexactly(5)
self.assertEqual(suback, b'\x90\x03\x00\x01\x00')

# Setup a publisher.
reader_3, writer_3 = await asyncio.open_connection(*address)
connect = b'\x10\x10\x00\x04MQTT\x05\x02\x00\x00\x00\x00\x03pub'
writer_3.write(connect)
connack = await reader_3.readexactly(7)
self.assertEqual(connack, b'\x20\x05\x00\x00\x02\x24\x00')
publish = b'\x30\x0a\x00\x04/a/b\x00apa'
writer_3.write(publish)

# Receive the publish in both subscribers.
publish = await reader_1.readexactly(12)
self.assertEqual(publish, b'\x30\x0a\x00\x04/a/b\x00apa')
publish = await reader_2.readexactly(12)
self.assertEqual(publish, b'\x30\x0a\x00\x04/a/b\x00apa')

writer_1.close()
writer_2.close()
writer_3.close()
broker_task.cancel()

await asyncio.wait_for(asyncio.gather(broker_task, tester()), 1)


logging.basicConfig(level=logging.DEBUG)


if __name__ == '__main__':
unittest.main()

0 comments on commit 8603613

Please sign in to comment.