Skip to content

Commit

Permalink
SSL support in broker.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed Aug 13, 2019
1 parent 1b5f643 commit acd1991
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 13 deletions.
43 changes: 37 additions & 6 deletions mqttools/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,37 +307,52 @@ def log_info(self, fmt, *args):
class Broker(object):
"""A limited MQTT version 5.0 broker.
`host` and `port` are the host and port to listen for clients on.
`host`, `port` and `secure_port` are the host and ports to listen
for clients on.
`kwargs` are passed to ``asyncio.start_server()``.
`secure_ssl` is an SSL context passed to `asyncio.start_server()`
as `ssl`.
"""

def __init__(self, host, port, **kwargs):
def __init__(self, host, port=1883, secure_port=8883, secure_ssl=None):
self._host = host
self._port = port
self._kwargs = kwargs
self._secure_port = secure_port
self._secure_ssl = secure_ssl
self._sessions = {}
self._subscribers = defaultdict(list)
self._wildcard_subscribers = []
self._listener = None
self._listener_ready = asyncio.Event()
self._secure_listener = None
self._secure_listener_ready = asyncio.Event()

async def getsockname(self):
await self._listener_ready.wait()

return self._listener.sockets[0].getsockname()

async def secure_getsockname(self):
await self._secure_listener_ready.wait()

return self._secure_listener.sockets[0].getsockname()

async def serve_forever(self):
"""Setup a listener socket and forever serve clients. This coroutine
only ends if cancelled by the user.
"""

await asyncio.gather(
self.insecure_serve_forever(),
self.secure_serve_forever()
)

async def insecure_serve_forever(self):
self._listener = await asyncio.start_server(self.serve_client,
self._host,
self._port,
**self._kwargs)
self._port)
self._listener_ready.set()
listener_address = self._listener.sockets[0].getsockname()

Expand All @@ -346,6 +361,22 @@ async def serve_forever(self):
async with self._listener:
await self._listener.serve_forever()

async def secure_serve_forever(self):
if self._secure_ssl is None:
return

self._secure_listener = await asyncio.start_server(self.serve_client,
self._host,
self._secure_port,
ssl=self._secure_ssl)
self._secure_listener_ready.set()
listener_address = self._secure_listener.sockets[0].getsockname()

LOGGER.info('Listening for secure clients on %s.', listener_address)

async with self._secure_listener:
await self._secure_listener.serve_forever()

async def serve_client(self, reader, writer):
client = Client(self, reader, writer)
await client.serve_forever()
Expand Down
16 changes: 10 additions & 6 deletions mqttools/subparsers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ def _do_broker(args):
print(f"CA File: '{args.cafile}'")
print(f"Check hostname: {not args.no_check_hostname}")

ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,
cafile=args.cafile)
ssl_context.check_hostname = not args.no_check_hostname
ssl_context.load_cert_chain(certfile=args.certfile, keyfile=args.keyfile)
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,
cafile=args.cafile)
context.check_hostname = not args.no_check_hostname
context.load_cert_chain(certfile=args.certfile, keyfile=args.keyfile)
else:
ssl_context = None
context = None

broker = Broker(args.host, args.port, ssl=ssl_context)
broker = Broker(args.host, args.port, args.secure_port, secure_ssl=context)
asyncio.run(broker.serve_forever())


Expand All @@ -34,6 +34,10 @@ def add_subparser(subparsers):
type=int,
default=1883,
help='Broker port (default: %(default)s).')
subparser.add_argument('--secure-port',
type=int,
default=8883,
help='Secure broker port (default: %(default)s).')
subparser.add_argument(
'--cafile',
default='',
Expand Down
2 changes: 1 addition & 1 deletion mqttools/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.29.0'
__version__ = '0.30.0'

0 comments on commit acd1991

Please sign in to comment.