Skip to content

Commit

Permalink
Merge pull request #18 from jimbobbennett/topic_subscribe
Browse files Browse the repository at this point in the history
Updated to use topic specific subscriptions
  • Loading branch information
brentru committed Jun 10, 2020
2 parents 7f30d82 + b2b6035 commit 6735960
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 40 deletions.
51 changes: 12 additions & 39 deletions adafruit_azureiot/iot_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ def _create_mqtt_client(self) -> None:

# set actions to take throughout connection lifecycle
self._mqtts.on_connect = self._on_connect
self._mqtts.on_message = self._on_message
self._mqtts.on_log = self._on_log
self._mqtts.on_publish = self._on_publish
self._mqtts.on_disconnect = self._on_disconnect
Expand Down Expand Up @@ -172,7 +171,7 @@ def _on_publish(self, client, data, topic, msg_id) -> None:
self._logger.info("- iot_mqtt :: _on_publish :: " + str(data) + " on topic " + str(topic))

# pylint: disable=W0703
def _handle_device_twin_update(self, msg: str, topic: str) -> None:
def _handle_device_twin_update(self, client, topic: str, msg: str) -> None:
self._logger.debug("- iot_mqtt :: _echo_desired :: " + topic)
twin = None
desired = None
Expand Down Expand Up @@ -213,7 +212,7 @@ def _handle_device_twin_update(self, msg: str, topic: str) -> None:
for property_name, value in desired.items():
self._callback.device_twin_desired_updated(property_name, value, desired_version)

def _handle_direct_method(self, msg: str, topic: str) -> None:
def _handle_direct_method(self, client, topic: str, msg: str) -> None:
index = topic.find("$rid=")
method_id = 1
method_name = "None"
Expand Down Expand Up @@ -244,7 +243,7 @@ def _handle_direct_method(self, msg: str, topic: str) -> None:
self._logger.info("C2D: => " + next_topic + " with data " + ret_message + " and name => " + method_name)
self._send_common(next_topic, ret_message)

def _handle_cloud_to_device_message(self, msg: str, topic: str) -> None:
def _handle_cloud_to_device_message(self, client, topic: str, msg: str) -> None:
parts = topic.split("&")[1:]

properties = {}
Expand All @@ -255,38 +254,6 @@ def _handle_cloud_to_device_message(self, msg: str, topic: str) -> None:
self._callback.cloud_to_device_message_received(msg, properties)
gc.collect()

# pylint: disable=W0702, R0912
def _on_message(self, client, msg_topic, payload) -> None:
topic = ""
msg = None

self._logger.info("- iot_mqtt :: _on_message")

if payload is not None:
try:
msg = payload.decode("utf-8")
except:
msg = str(payload)

if msg_topic is not None:
try:
topic = msg_topic.decode("utf-8")
except:
topic = str(msg_topic)

if topic.startswith("$iothub/"):
if topic.startswith("$iothub/twin/PATCH/properties/desired/") or topic.startswith("$iothub/twin/res/200/?$rid="):
self._handle_device_twin_update(str(msg), topic)
elif topic.startswith("$iothub/methods"):
self._handle_direct_method(str(msg), topic)
else:
if not topic.startswith("$iothub/twin/res/"): # not twin response
self._logger.error("ERROR: unknown twin! - {}".format(msg))
elif topic.startswith("devices/{}/messages/devicebound".format(self._device_id)):
self._handle_cloud_to_device_message(str(msg), topic)
else:
self._logger.error("ERROR: (unknown message) - {}".format(msg))

def _send_common(self, topic: str, data) -> None:
# Convert data to a string
if isinstance(data, dict):
Expand Down Expand Up @@ -363,13 +330,19 @@ def __init__(
self._is_subscribed_to_twins = False

def _subscribe_to_core_topics(self):
self._mqtts.subscribe("devices/{}/messages/events/#".format(self._device_id))
self._mqtts.subscribe("devices/{}/messages/devicebound/#".format(self._device_id))
device_bound_topic = "devices/{}/messages/devicebound/#".format(self._device_id)
self._mqtts.add_topic_callback(device_bound_topic, self._handle_cloud_to_device_message)
self._mqtts.subscribe(device_bound_topic)

self._mqtts.add_topic_callback("$iothub/methods/#", self._handle_direct_method)
self._mqtts.subscribe("$iothub/methods/#")

def _subscribe_to_twin_topics(self):
self._mqtts.add_topic_callback("$iothub/twin/PATCH/properties/desired/#", self._handle_device_twin_update)
self._mqtts.subscribe("$iothub/twin/PATCH/properties/desired/#") # twin desired property changes
self._mqtts.subscribe("$iothub/twin/res/#") # twin properties response

self._mqtts.add_topic_callback("$iothub/twin/res/200/#", self._handle_device_twin_update)
self._mqtts.subscribe("$iothub/twin/res/200/#") # twin properties response

def connect(self) -> bool:
"""Connects to the MQTT broker
Expand Down
9 changes: 8 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
# Uncomment the below if you use native CircuitPython modules such as
# digitalio, micropython and busio. List the modules you use. Without it, the
# autodoc module docs will fail to generate with a warning.
autodoc_mock_imports = ["adafruit_binascii", "adafruit_logging", "adafruit_requests", "adafruit_hashlib", "adafruit_ntp"]
autodoc_mock_imports = [
"adafruit_binascii",
"adafruit_logging",
"adafruit_requests",
"adafruit_hashlib",
"adafruit_ntp",
"adafruit_minimqtt",
]


intersphinx_mapping = {
Expand Down

0 comments on commit 6735960

Please sign in to comment.