Skip to content

Commit

Permalink
Reason codes in SUBACK and UNSUBACK.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 11, 2019
1 parent 09ee771 commit 4ce9449
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 58 deletions.
43 changes: 28 additions & 15 deletions mqttools/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from collections import defaultdict

from .common import ControlPacketType
from .common import SubackReasonCode
from .common import UnsubackReasonCode
from .common import PropertyIds
from .common import MalformedPacketError
from .common import PayloadReader
Expand Down Expand Up @@ -41,10 +43,8 @@ def clean(self):
self.client = None


def validate_topic(topic):
if '#' in topic or '+' in topic:
raise MalformedPacketError(
'# and + are not supported in topic patterns.')
def is_valid_topic(topic):
return '#' not in topic and '+' not in topic


class Client(object):
Expand Down Expand Up @@ -141,34 +141,47 @@ def on_connect(self, payload):

def on_publish(self, payload):
topic, message, _ = unpack_publish(payload, 0)
validate_topic(topic)

if not is_valid_topic(topic):
raise MalformedPacketError('Invalid topic in publish.')

for session in self._broker.iter_subscribers(topic):
session.client.publish(topic, message)

def on_subscribe(self, payload):
packet_identifier, topics = unpack_subscribe(payload)
reasons = bytearray()

for topic in topics:
validate_topic(topic)
if is_valid_topic(topic):
self._session.subscribes.add(topic)
self._broker.add_subscriber(topic, self._session)
reason = SubackReasonCode.GRANTED_QOS_0
else:
reason = SubackReasonCode.IMPLEMENTATION_SPECIFIC_ERROR

for topic in topics:
self._session.subscribes.add(topic)
self._broker.add_subscriber(topic, self._session)
reasons.append(reason)

self._write_packet(pack_suback(packet_identifier))
self._write_packet(pack_suback(packet_identifier, reasons))

def on_unsubscribe(self, payload):
packet_identifier, topics = unpack_unsubscribe(payload)
reasons = bytearray()

for topic in topics:
validate_topic(topic)
if is_valid_topic(topic):
if topic in self._session.subscribes:
self._session.subscribes.remove(topic)
self._broker.remove_subscriber(topic, self._session)
reason = UnsubackReasonCode.SUCCESS
else:
reason = UnsubackReasonCode.NO_SUBSCRIPTION_EXISTED
else:
reason = UnsubackReasonCode.IMPLEMENTATION_SPECIFIC_ERROR

for topic in topics:
self._session.subscribes.remove(topic)
self._broker.remove_subscriber(topic, self._session)
reasons.append(reason)

self._write_packet(pack_unsuback(packet_identifier))
self._write_packet(pack_unsuback(packet_identifier, reasons))

def on_pingreq(self):
self._write_packet(pack_pingresp())
Expand Down
4 changes: 2 additions & 2 deletions mqttools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ async def on_publish(self, flags, payload):
await self._messages.put((topic, message))

def on_suback(self, payload):
packet_identifier, properties = unpack_suback(payload)
packet_identifier, properties, _ = unpack_suback(payload)

if packet_identifier in self.transactions:
self.transactions[packet_identifier].set_completed(None)
Expand All @@ -449,7 +449,7 @@ def on_suback(self, payload):
packet_identifier)

def on_unsuback(self, payload):
packet_identifier, properties = unpack_unsuback(payload)
packet_identifier, properties, _ = unpack_unsuback(payload)

if packet_identifier in self.transactions:
self.transactions[packet_identifier].set_completed(None)
Expand Down
97 changes: 77 additions & 20 deletions mqttools/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,40 @@ class PubrecReasonCode(enum.IntEnum):


class PubrelReasonCode(enum.IntEnum):
SUCCESS = 0
SUCCESS = 0
PACKET_IDENTIFIER_NOT_FOUND = 146


class PubcompReasonCode(enum.IntEnum):
SUCCESS = 0
SUCCESS = 0
PACKET_IDENTIFIER_NOT_FOUND = 146


class SubackReasonCode(enum.IntEnum):
GRANTED_QOS_0 = 0
GRANTED_QOS_1 = 1
GRANTED_QOS_2 = 2
UNSPECIFIED_ERROR = 128
IMPLEMENTATION_SPECIFIC_ERROR = 131
NOT_AUTHORIZED = 135
TOPIC_FILTER_INVALID = 143
PACKET_IDENTIFIER_IN_USE = 145
QUOTA_EXCEEDED = 151
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = 158
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 161
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 162


class UnsubackReasonCode(enum.IntEnum):
SUCCESS = 0
NO_SUBSCRIPTION_EXISTED = 17
UNSPECIFIED_ERROR = 128
IMPLEMENTATION_SPECIFIC_ERROR = 131
NOT_AUTHORIZED = 135
TOPIC_FILTER_INVALID = 143
PACKET_IDENTIFIER_IN_USE = 145


class PropertyIds(enum.IntEnum):
PAYLOAD_FORMAT_INDICATOR = 1
MESSAGE_EXPIRY_INTERVAL = 2
Expand Down Expand Up @@ -603,13 +628,14 @@ def unpack_subscribe(payload):
return packet_identifier, topics


def pack_suback(packet_identifier):
def pack_suback(packet_identifier, reasons):
properties = pack_properties('SUBACK', {})
packed = pack_fixed_header(ControlPacketType.SUBACK,
0,
2 + len(properties))
2 + len(properties) + len(reasons))
packed += pack_u16(packet_identifier)
packed += properties
packed += reasons

return packed

Expand All @@ -622,8 +648,19 @@ def unpack_suback(payload):
PropertyIds.USER_PROPERTY
],
payload)
reasons = []

while payload.is_data_available():
reason = unpack_u8(payload)

try:
reason = SubackReasonCode(reason)
except ValueError:
pass

return packet_identifier, properties
reasons.append(reason)

return packet_identifier, properties, reasons


def pack_unsubscribe(topic, packet_identifier):
Expand All @@ -648,10 +685,11 @@ def unpack_unsubscribe(payload):
return packet_identifier, topics


def pack_unsuback(packet_identifier):
packed = pack_fixed_header(ControlPacketType.UNSUBACK, 0, 3)
def pack_unsuback(packet_identifier, reasons):
packed = pack_fixed_header(ControlPacketType.UNSUBACK, 0, 3 + len(reasons))
packed += pack_u16(packet_identifier)
packed += pack_properties('UNSUBACK', {})
packed += reasons

return packed

Expand All @@ -664,8 +702,19 @@ def unpack_unsuback(payload):
PropertyIds.USER_PROPERTY
],
payload)
reasons = []

while payload.is_data_available():
reason = unpack_u8(payload)

try:
reason = UnsubackReasonCode(reason)
except ValueError:
pass

return packet_identifier, properties
reasons.append(reason)

return packet_identifier, properties, reasons


def pack_publish(topic, message, alias):
Expand Down Expand Up @@ -745,7 +794,7 @@ def format_connack(payload):

return [
f' SessionPresent: {session_present}',
f' Reason: {reason}'
f' Reason: {reason.name}({reason.value})'
] + format_properties(properties)


Expand All @@ -755,7 +804,7 @@ def format_publish(payload):
return [
f' Topic: {topic}',
f' Message: {message}',
f' Properties:'
' Properties:'
] + format_properties(properties)


Expand All @@ -764,43 +813,51 @@ def format_subscribe(payload):

return [
f' PacketIdentifier: {packet_identifier}',
f' Topics:'
' Topics:'
] + [f' {topic}' for topic in topics]


def format_suback(payload):
packet_identifier, properties = unpack_suback(payload)
packet_identifier, properties, reasons = unpack_suback(payload)

return [
f' PacketIdentifier: {packet_identifier}',
f' Properties:'
] + format_properties(properties)
' Properties:'
] + format_properties(properties) + [
' Reasons:'
] + [
f' {reason.name}({reason.value})' for reason in reasons
]


def format_unsubscribe(payload):
packet_identifier, topics = unpack_unsubscribe(payload)

return [
f' PacketIdentifier: {packet_identifier}',
f' Topics:'
' Topics:'
] + [f' {topic}' for topic in topics]


def format_unsuback(payload):
packet_identifier, properties = unpack_unsuback(payload)
packet_identifier, properties, reasons = unpack_unsuback(payload)

return [
f' PacketIdentifier: {packet_identifier}',
f' Properties:'
] + format_properties(properties)
' Properties:'
] + format_properties(properties) + [
' Reasons:'
] + [
f' {reason.name}({reason.value})' for reason in reasons
]


def format_disconnect(payload):
reason, properties = unpack_disconnect(payload)

return [
f' Reason: {reason}',
f' Properties:'
f' Reason: {reason.name}({reason.value})',
' Properties:'
] + format_properties(properties)


Expand Down

0 comments on commit 4ce9449

Please sign in to comment.