Skip to content

Commit

Permalink
Broker-clients example and renamed broker run() to serve_forever().
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 11, 2019
1 parent 90235d4 commit 24a39cd
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 5 deletions.
95 changes: 95 additions & 0 deletions examples/broker_and_clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
+--------+ +--------+ +-------------+
| |--- ping -->| |--- ping -->| |
| client | | broker | | echo client |
| |<-- pong ---| |<-- pong ---| |
+--------+ +--------+ +-------------+
"""

import time
import asyncio
import mqttools


BROKER_PORT = 10008


async def start_client():
client = mqttools.Client('localhost', BROKER_PORT)

while True:
try:
await client.start()
break
except:
print('Client start failed. Retrying...')
await asyncio.sleep(0.2)

return client


async def client_main():
"""Publish the current time to /ping and wait for the echo client to
publish it back on /pong, with a one second interval.
"""

client = await start_client()
await client.subscribe('/pong')

while True:
print()
message = str(int(time.time())).encode('ascii')
print(f'client: Publishing {message} on /ping.')
client.publish('/ping', message)
topic, message = await client.messages.get()
print(f'client: Got {message} on {topic}.')

if topic is None:
print('Client connection lost.')
break

await asyncio.sleep(1)


async def echo_client_main():
"""Wait for the client to publish to /ping, and publish /pong in
response.
"""

client = await start_client()
await client.subscribe('/ping')

while True:
topic, message = await client.messages.get()
print(f'echo_client: Got {message} on {topic}.')

if topic is None:
print('Echo client connection lost.')
break

print(f'echo_client: Publishing {message} on /pong.')
client.publish('/pong', message)


async def broker_main():
"""The broker, serving both clients, forever.
"""

broker = mqttools.Broker('localhost', BROKER_PORT)
await broker.serve_forever()


async def main():
await asyncio.gather(
broker_main(),
echo_client_main(),
client_main()
)


asyncio.run(main())
2 changes: 1 addition & 1 deletion mqttools/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ async def getsockname(self):

return self._listener.sockets[0].getsockname()

async def run(self):
async def serve_forever(self):
"""Setup a listener socket and forever serve clients. This coroutine
only ends if cancelled by the user.
Expand Down
8 changes: 4 additions & 4 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def multiple_subscribers(self):

async def broker_wrapper():
with self.assertRaises(asyncio.CancelledError):
await broker.run()
await broker.serve_forever()

broker_task = asyncio.create_task(broker_wrapper())

Expand Down Expand Up @@ -95,7 +95,7 @@ async def unsubscribe(self):

async def broker_wrapper():
with self.assertRaises(asyncio.CancelledError):
await broker.run()
await broker.serve_forever()

broker_task = asyncio.create_task(broker_wrapper())

Expand Down Expand Up @@ -197,7 +197,7 @@ async def resume_session(self):

async def broker_wrapper():
with self.assertRaises(asyncio.CancelledError):
await broker.run()
await broker.serve_forever()

broker_task = asyncio.create_task(broker_wrapper())

Expand Down Expand Up @@ -274,7 +274,7 @@ async def ping(self):

async def broker_wrapper():
with self.assertRaises(asyncio.CancelledError):
await broker.run()
await broker.serve_forever()

broker_task = asyncio.create_task(broker_wrapper())

Expand Down

0 comments on commit 24a39cd

Please sign in to comment.