Skip to content

Commit

Permalink
Logging per client in the broker.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 10, 2019
1 parent e7fd17e commit 399b741
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 48 deletions.
67 changes: 43 additions & 24 deletions mqttools/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .common import pack_unsuback
from .common import pack_pingresp
from .common import unpack_disconnect
from .common import format_packet


LOGGER = logging.getLogger(__name__)
Expand All @@ -32,7 +33,8 @@ class DisconnectError(Exception):

class Session(object):

def __init__(self):
def __init__(self, client_id):
self.client_id = client_id
self.subscribes = set()
self.expiry_time = None
self.client = None
Expand Down Expand Up @@ -60,7 +62,7 @@ def __init__(self, broker, reader, writer):
async def serve_forever(self):
addr = self._writer.get_extra_info('peername')

LOGGER.info('Serving client %r.', addr)
self.log_info('Serving client %r.', addr)

try:
packet_type, _, payload = await self.read_packet()
Expand All @@ -75,14 +77,14 @@ async def serve_forever(self):
except DisconnectError:
pass
except asyncio.IncompleteReadError:
LOGGER.debug('Client connection lost.')
self.log_debug('Client connection lost.')
except Exception as e:
LOGGER.debug('Reader task stopped by %r.', e)
self.log_debug('Reader task stopped by %r.', e)

if self._session is not None:
self._session.client = None

LOGGER.info('Closing client %r.', addr)
self.log_info('Closing client %r.', addr)

async def reader_loop(self):
while True:
Expand Down Expand Up @@ -118,9 +120,8 @@ async def read_packet(self):
data = await self._reader.readexactly(size)

if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("Received %s packet %s.",
control_packet_type_to_string(packet_type),
binascii.hexlify(buf + data))
for line in format_packet('Received', buf + data):
self.log_debug(line)

return packet_type, flags, PayloadReader(data)

Expand All @@ -130,12 +131,17 @@ def on_connect(self, payload):
self._session, session_present = self._broker.get_session(
client_id,
clean_start)

if session_present:
self.log_info('Session resumed with %d subscribes.',
len(self._session.subscribes))

self._session.client = self
self._write_packet(pack_connack(session_present,
0,
{PropertyIds.MAXIMUM_QOS: 0}))

LOGGER.info("Client '%s' connected.", client_id)
self.log_info('Client connected.')

def on_publish(self, payload):
topic, message, _ = unpack_publish(payload, 0)
Expand All @@ -145,7 +151,7 @@ def on_publish(self, payload):
session.client.publish(topic, message)

def on_subscribe(self, payload):
topics, packet_identifier = unpack_subscribe(payload)
packet_identifier, topics = unpack_subscribe(payload)

for topic in topics:
validate_topic(topic)
Expand All @@ -157,10 +163,15 @@ def on_subscribe(self, payload):
self._write_packet(pack_suback(packet_identifier))

def on_unsubscribe(self, payload):
packet_identifier, topic = unpack_unsubscribe(payload)
validate_topic(topic)
self._session.subscribes.remove(topic)
self._broker.remove_subscriber(topic, self._session)
packet_identifier, topics = unpack_unsubscribe(payload)

for topic in topics:
validate_topic(topic)

for topic in topics:
self._session.subscribes.remove(topic)
self._broker.remove_subscriber(topic, self._session)

self._write_packet(pack_unsuback(packet_identifier))

def on_pingreq(self):
Expand All @@ -176,13 +187,25 @@ def publish(self, topic, message):

def _write_packet(self, message):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"Sending %s packet %s.",
control_packet_type_to_string(unpack_packet_type(message)),
binascii.hexlify(message))
for line in format_packet('Sending', message):
self.log_debug(line)

self._writer.write(message)

def log_debug(self, fmt, *args):
if LOGGER.isEnabledFor(logging.DEBUG):
if self._session is None:
LOGGER.debug(fmt, *args)
else:
LOGGER.debug(f'{self._session.client_id} {fmt}', *args)

def log_info(self, fmt, *args):
if LOGGER.isEnabledFor(logging.INFO):
if self._session is None:
LOGGER.info(fmt, *args)
else:
LOGGER.info(f'{self._session.client_id} {fmt}', *args)


class Broker(object):
"""A limited MQTT version 5.0 broker.
Expand Down Expand Up @@ -211,7 +234,7 @@ async def run(self):
self._listener_ready.set()
listener_address = self._listener.sockets[0].getsockname()

LOGGER.info(f'Listening for clients on {listener_address}.')
LOGGER.info('Listening for clients on %s.', listener_address)

async with self._listener:
await self._listener.serve_forever()
Expand Down Expand Up @@ -249,13 +272,9 @@ def get_session(self, client_id, clean_start):

session.clean()
else:
LOGGER.info(
"Session resumed for client '%s' with %d subscribes.",
client_id,
len(session.subscribes))
session_present = True
else:
session = Session()
session = Session(client_id)
self._sessions[client_id] = session

return session, session_present
12 changes: 5 additions & 7 deletions mqttools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .common import Error
from .common import MalformedPacketError
from .common import PayloadReader
from .common import format_packet


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -535,10 +536,8 @@ async def _keep_alive_main(self):

def _write_packet(self, message):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug(
"Sending %s packet %s.",
control_packet_type_to_string(unpack_packet_type(message)),
binascii.hexlify(message))
for line in format_packet('Sending', message):
LOGGER.debug(line)

self._writer.write(message)

Expand All @@ -558,9 +557,8 @@ async def _read_packet(self):
data = await self._reader.readexactly(size)

if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug("Received %s packet %s.",
control_packet_type_to_string(packet_type),
binascii.hexlify(buf + data))
for line in format_packet('Received', buf + data):
LOGGER.debug(line)

return packet_type, flags, PayloadReader(data)

Expand Down
143 changes: 126 additions & 17 deletions mqttools/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import enum
from io import BytesIO
import bitstruct
from binascii import hexlify


# Connection flags.
Expand Down Expand Up @@ -293,19 +294,7 @@ def pack_property(property_id, value):
}[property_id](value)


def log_properties(packet_name, properties):
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug('%s properties:', packet_name)

for identifier, value in properties.items():
LOGGER.debug(' %s(%d): %s',
identifier.name,
identifier.value,
value)


def pack_properties(packet_name, properties):
log_properties(packet_name, properties)
packed = b''

for property_id, value in properties.items():
Expand Down Expand Up @@ -367,8 +356,6 @@ def unpack_properties(packet_name,
property_id = PropertyIds(property_id)
properties[property_id] = unpack_property(property_id, payload)

log_properties(packet_name, properties)

return properties


Expand Down Expand Up @@ -622,7 +609,7 @@ def unpack_subscribe(payload):
topics.append(unpack_string(payload))
unpack_u8(payload)

return topics, packet_identifier
return packet_identifier, topics


def pack_suback(packet_identifier):
Expand Down Expand Up @@ -662,9 +649,12 @@ def pack_unsubscribe(topic, packet_identifier):
def unpack_unsubscribe(payload):
packet_identifier = unpack_u16(payload)
unpack_u8(payload)
topic = unpack_string(payload)
topics = []

return packet_identifier, topic
while payload.is_data_available():
topics.append(unpack_string(payload))

return packet_identifier, topics


def pack_unsuback(packet_identifier):
Expand Down Expand Up @@ -735,3 +725,122 @@ def pack_pingreq():

def pack_pingresp():
return pack_fixed_header(ControlPacketType.PINGRESP, 0, 0)


def format_properties(properties):
if not properties:
return []

lines = [' Properties:']

for identifier, value in properties.items():
lines.append(f' {identifier.name}({identifier.value}): {value}')

return lines


def format_connect(payload):
client_id, clean_start, keep_alive_s, properties = unpack_connect(payload)

return [
f' ClientId: {client_id}',
f' CleanStart: {clean_start}',
f' KeepAlive: {keep_alive_s}'
] + format_properties(properties)


def format_connack(payload):
session_present, reason, properties = unpack_connack(payload)

return [
f' SessionPresent: {session_present}',
f' Reason: {reason}'
] + format_properties(properties)


def format_publish(payload):
topic, message, properties = unpack_publish(payload, 0)

return [
f' Topic: {topic}',
f' Message: {message}',
f' Properties:'
] + format_properties(properties)


def format_subscribe(payload):
packet_identifier, topics = unpack_subscribe(payload)

return [
f' PacketIdentifier: {packet_identifier}',
f' Topics:'
] + [f' {topic}' for topic in topics]


def format_suback(payload):
packet_identifier, properties = unpack_suback(payload)

return [
f' PacketIdentifier: {packet_identifier}',
f' Properties:'
] + format_properties(properties)


def format_unsubscribe(payload):
packet_identifier, topics = unpack_unsubscribe(payload)

return [
f' PacketIdentifier: {packet_identifier}',
f' Topics:'
] + [f' {topic}' for topic in topics]


def format_unsuback(payload):
packet_identifier, properties = unpack_unsuback(payload)

return [
f' PacketIdentifier: {packet_identifier}',
f' Properties:'
] + format_properties(properties)


def format_disconnect(payload):
reason, properties = unpack_disconnect(payload)

return [
f' Reason: {reason}',
f' Properties:'
] + format_properties(properties)


def format_packet(prefix, packet):
lines = []

try:
packet_type = unpack_packet_type(packet)
payload = PayloadReader(packet[1:])
size = unpack_variable_integer(payload)
packet_kind = packet_type.name
lines.append(
f'{prefix} {packet_kind}({packet_type.value}) packet {hexlify(packet)}')

if packet_kind == 'CONNECT':
lines += format_connect(payload)
elif packet_kind == 'CONNACK':
lines += format_connack(payload)
elif packet_kind == 'PUBLISH':
lines += format_publish(payload)
elif packet_kind == 'SUBSCRIBE':
lines += format_subscribe(payload)
elif packet_kind == 'SUBACK':
lines += format_suback(payload)
elif packet_kind == 'UNSUBSCRIBE':
lines += format_unsubscribe(payload)
elif packet_kind == 'UNSUBACK':
lines += format_unsuback(payload)
elif packet_kind == 'DISCONNECT':
lines += format_disconnect(payload)
except Exception:
lines.append(' *** Malformed packet ***')

return lines

0 comments on commit 399b741

Please sign in to comment.