Skip to content

Commit

Permalink
Broker continued.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 9, 2019
1 parent d0f5429 commit 31b0272
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 24 deletions.
23 changes: 17 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ MQTT Tools

MQTT tools in Python 3.7 and later.

Features:
Both the client and the broker implments MQTT version 5.0 using
``asyncio``.

- MQTT 5.0 client using ``asyncio``.
Client features:

- Subscribe to and publish QoS level 0 messages.
- Subscribe to and publish QoS level 0 topics.

- Broker session resume (or clean start support) for less initial
communication.
Expand All @@ -19,10 +20,20 @@ Features:

- Monitor, subscribe and publish command line commands.

QoS level 1 and 2 messages are not yet supprted. A client side session
state storage is required to do so.
Broker features:

MQTT 5.0 specification:
- Subscribe to and publish QoS level 0 topics. Wildcards (# and +) are
not supported.

- Session resume (or clean start support) for less initial
communication. Session state storage in RAM.

- Broker command line command.

QoS level 1 and 2 messages are not yet supprted. A session state
storage is required to do so, both in the client and the broker.

MQTT version 5.0 specification:
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html

Project homepage: https://github.com/eerimoq/mqttools
Expand Down
10 changes: 8 additions & 2 deletions mqttools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def main():
parser = argparse.ArgumentParser(description='MQTT Tools.')

parser.add_argument('-d', '--debug', action='store_true')
parser.add_argument('-l', '--log-level',
choices=[
'debug', 'info', 'warning', 'error', 'critical'
],
help='Set the logging level.')
parser.add_argument('--version',
action='version',
version=__version__,
Expand All @@ -39,8 +44,9 @@ def main():

args = parser.parse_args()

if args.debug:
logging.basicConfig(level=logging.DEBUG)
if args.log_level:
level = logging.getLevelName(args.log_level.upper())
logging.basicConfig(level=level)

if args.debug:
args.func(args)
Expand Down
46 changes: 33 additions & 13 deletions mqttools/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import logging
import binascii
from collections import defaultdict
import enum

from .common import ControlPacketType
from .common import DisconnectReasonCode
from .common import PropertyIds
from .common import MalformedPacketError
from .common import PayloadReader
from .common import control_packet_type_to_string
Expand All @@ -30,10 +30,19 @@ class Session(object):
def __init__(self):
self.subscribes = set()
self.expiry_time = None
self.client = None

def clean(self):
self.subscribes = set()
self.expiry_time = None
self.client = None


def validate_topic(topic):
if '#' in topic or '+' in topic:
raise MalformedPacketError(
'# and + are not supported in topic patterns.')


class Client(object):

Expand Down Expand Up @@ -65,6 +74,9 @@ async def serve_forever(self):
if isinstance(e, MalformedPacketError):
self._disconnect_reason = DisconnectReasonCode.MALFORMED_PACKET

if self._session is not None:
self._session.client = None

LOGGER.info('Closing client %r.', addr)

async def reader_loop(self):
Expand Down Expand Up @@ -111,22 +123,30 @@ def on_connect(self, payload):
self._session, session_present = self._broker.get_session(
client_id,
clean_start)
self._write_packet(pack_connack(session_present, 0, {}))
self._session.client = self
self._write_packet(pack_connack(session_present,
0,
{PropertyIds.MAXIMUM_QOS: 0}))

LOGGER.info("Client '%s' connected.", client_id)

def on_publish(self, payload):
topic, message, _ = unpack_publish(payload, 0)
validate_topic(topic)

for client in self._broker.iter_subscribers(topic):
client.publish(topic, message)
for session in self._broker.iter_subscribers(topic):
session.client.publish(topic, message)

def on_subscribe(self, payload):
topic, packet_identifier = unpack_subscribe(payload)
validate_topic(topic)
self._session.subscribes.add(topic)
self._broker.add_subscriber(topic, self._session)
self._write_packet(pack_suback(packet_identifier))

def on_unsubscribe(self, payload):
topic, packet_identifier = unpack_unsubscribe(payload)
validate_topic(topic)
self._session.subscribes.remove(topic)
self._broker.remove_subscriber(topic, self._session)
self._write_packet(pack_unsuback(packet_identifier))
Expand All @@ -150,7 +170,7 @@ def _write_packet(self, message):


class Broker(object):
"""An MQTT 5.0 broker.
"""A limited MQTT version 5.0 broker.
`host` and `port` are the host and port to listen for clients on.
Expand All @@ -163,7 +183,7 @@ def __init__(self, host, port):
self._subscribers = defaultdict(list)

async def run(self):
listener = await asyncio.start_server(self.serve_client,
listener = await asyncio.start_server(self._serve_client,
self._host,
self._port)
listener_address = listener.sockets[0].getsockname()
Expand All @@ -173,7 +193,7 @@ async def run(self):
async with listener:
await listener.serve_forever()

async def serve_client(self, reader, writer):
async def _serve_client(self, reader, writer):
client = Client(self, reader, writer)
await client.serve_forever()

Expand All @@ -190,13 +210,9 @@ def remove_subscriber(self, topic, session):
del topic_sessions[topic_sessions.index(session)]

def iter_subscribers(self, topic):
topic_sessions = self._subscribers[topic]

if session in topic_sessions:
if topic in session.subscribes:
for session in self._subscribers[topic]:
if session.client is not None:
yield session
else:
del topic_sessions[topic_sessions.index(session)]

def get_session(self, client_id, clean_start):
session_present = False
Expand All @@ -210,6 +226,10 @@ def get_session(self, client_id, clean_start):

session.clean()
else:
LOGGER.info(
"Session resumed for client '%s' with %d subscribes.",
client_id,
len(session.subscribes))
session_present = True
else:
session = Session()
Expand Down
2 changes: 1 addition & 1 deletion mqttools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def set_completed(self, response):


class Client(object):
"""An MQTT 5.0 client.
"""An MQTT version 5.0 client.
`host` and `port` are the host and port of the broker.
Expand Down
4 changes: 2 additions & 2 deletions mqttools/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,10 +486,10 @@ def unpack_connect(payload):
unpack_u16(payload)

if payload.read(4) != b'MQTT':
raise MalformedPacketError()
raise MalformedPacketError('Invalid MQTT magic string.')

if unpack_u8(payload) != PROTOCOL_VERSION:
raise MalformedPacketError()
raise MalformedPacketError('Wrong protocol version.')

flags = unpack_u8(payload)
clean_start = bool(flags & CLEAN_START)
Expand Down
1 change: 1 addition & 0 deletions mqttools/subparsers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


def _do_broker(args):
print(f"Starting a broker at '{args.host}:{args.port}'.")
broker = Broker(args.host, args.port)
asyncio.run(broker.run())

Expand Down

0 comments on commit 31b0272

Please sign in to comment.