Skip to content

Commit

Permalink
Merge pull request #42 from brentru/variable-headers-large-payload
Browse files Browse the repository at this point in the history
Fix publishing large payloads (>127 bytes)
  • Loading branch information
brentru committed Jul 3, 2020
2 parents 6ed8b95 + 2ce4f09 commit eb07565
Showing 1 changed file with 46 additions and 36 deletions.
82 changes: 46 additions & 36 deletions adafruit_minimqtt/adafruit_minimqtt.py
Expand Up @@ -62,11 +62,13 @@
MQTT_PINGRESP = const(0xD0)
MQTT_SUB = b"\x82"
MQTT_UNSUB = b"\xA2"
MQTT_PUB = bytearray(b"\x30\0")
# Variable CONNECT header [MQTT 3.1.2]
MQTT_VAR_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0")
MQTT_PUB = bytearray(b"\x30")
MQTT_DISCONNECT = b"\xe0\0"

# Variable CONNECT header [MQTT 3.1.2]
MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0")


CONNACK_ERRORS = {
const(0x01): "Connection Refused - Incorrect Protocol Version",
const(0x02): "Connection Refused - ID Rejected",
Expand Down Expand Up @@ -301,8 +303,11 @@ def connect(self, clean_session=True):
fixed_header = bytearray()
fixed_header.append(0x10)

# NOTE: Variable header is
# MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0")
# because final 4 bytes are 4, 2, 0, 0
# Variable Header
var_header = MQTT_VAR_HEADER
var_header = MQTT_HDR_CONNECT
var_header[6] = clean_session << 1

# Set up variable header and remaining_length
Expand Down Expand Up @@ -427,7 +432,6 @@ def publish(self, topic, msg, retain=False, qos=0):
.. code-block:: python
mqtt_client.publish('topics/piVal', 'threepointonefour')
"""
self.is_connected()
self._check_topic(topic)
Expand All @@ -443,42 +447,52 @@ def publish(self, topic, msg, retain=False, qos=0):
else:
raise MMQTTException("Invalid message data type.")
if len(msg) > MQTT_MSG_MAX_SZ:
raise MMQTTException("Message size larger than %db." % MQTT_MSG_MAX_SZ)
self._check_qos(qos)
pkt = MQTT_PUB
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
raise MMQTTException("Message size larger than %d bytes." % MQTT_MSG_MAX_SZ)
assert (
0 <= qos <= 1
), "Quality of Service Level 2 is unsupported by this library."

pub_hdr_fixed = bytearray() # fixed header
pub_hdr_fixed.extend(MQTT_PUB)
pub_hdr_fixed[0] |= retain | qos << 1 # [3.3.1.2], [3.3.1.3]

pub_hdr_var = bytearray() # variable header
pub_hdr_var.append(len(topic) >> 8) # Topic length, MSB
pub_hdr_var.append(len(topic) & 0xFF) # Topic length, LSB
pub_hdr_var.extend(topic.encode("utf-8")) # Topic name

remaining_length = 2 + len(msg) + len(topic)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7F:
pkt[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
# packet identifier where QoS level is 1 or 2. [3.3.2.2]
pid = self._pid
remaining_length += 2
pub_hdr_var.append(0x00)
pub_hdr_var.append(self._pid)
self._pid += 1

# Calculate remaining length [2.2.3]
if remaining_length > 0x7F:
while remaining_length > 0:
encoded_byte = remaining_length % 0x80
remaining_length = remaining_length // 0x80
if remaining_length > 0:
encoded_byte |= 0x80
pub_hdr_fixed.append(encoded_byte)
else:
pub_hdr_fixed.append(remaining_length)

if self.logger is not None:
self.logger.debug(
"Sending PUBLISH\nTopic: {0}\nMsg: {1}\
\nQoS: {2}\nRetain? {3}".format(
topic, msg, qos, retain
)
)
self._sock.send(pkt)
self._send_str(topic)
if qos == 0:
if self.on_publish is not None:
self.on_publish(self, self.user_data, topic, self._pid)
if qos > 0:
self._pid += 1
pid = self._pid
struct.pack_into("!H", pkt, 0, pid)
self._sock.send(pkt)
if self.on_publish is not None:
self.on_publish(self, self.user_data, topic, pid)
if self.logger is not None:
self.logger.debug("Sending PUBACK")
self._sock.send(pub_hdr_fixed)
self._sock.send(pub_hdr_var)
self._sock.send(msg)
if qos == 0 and self.on_publish is not None:
self.on_publish(self, self.user_data, topic, self._pid)
if qos == 1:
while True:
op = self._wait_for_msg()
Expand All @@ -491,10 +505,6 @@ def publish(self, topic, msg, retain=False, qos=0):
if self.on_publish is not None:
self.on_publish(self, self.user_data, topic, rcv_pid)
return
elif qos == 2:
assert 0
if self.on_publish is not None:
self.on_publish(self, self.user_data, topic, rcv_pid)

def subscribe(self, topic, qos=0):
"""Subscribes to a topic on the MQTT Broker.
Expand Down

0 comments on commit eb07565

Please sign in to comment.