Skip to content

Commit

Permalink
Merge pull request #1760 from Odianosen25/mqtt-fixes
Browse files Browse the repository at this point in the history
Fix mqtt issues
  • Loading branch information
acockburn committed Apr 19, 2023
2 parents edac0a2 + c1104dc commit 7f831db
Showing 1 changed file with 41 additions and 36 deletions.
77 changes: 41 additions & 36 deletions appdaemon/plugins/mqtt/mqttplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import traceback
import ssl
import json
from threading import Lock

import appdaemon.utils as utils
from appdaemon.appdaemon import AppDaemon
Expand Down Expand Up @@ -135,6 +136,7 @@ def __init__(self, ad: AppDaemon, name, args):
}

self.mqtt_connect_event = None
self.mqtt_lock = Lock()

def stop(self):
self.logger.debug("stop() called for %s", self.name)
Expand All @@ -145,7 +147,8 @@ def stop(self):
self.mqtt_client_host,
self.mqtt_client_port,
)
for topic in self.mqtt_client_topics:
client_topics = copy.deepcopy(self.mqtt_client_topics)
for topic in client_topics:
self.mqtt_unsubscribe(topic)

self.mqtt_client.publish(
Expand Down Expand Up @@ -188,9 +191,9 @@ def mqtt_on_connect(self, client, userdata, flags, rc):
self.AD.services.register_service(self.namespace, "mqtt", "unsubscribe", self.call_plugin_service)
self.AD.services.register_service(self.namespace, "mqtt", "publish", self.call_plugin_service)

topics = copy.deepcopy(self.mqtt_client_topics)
client_topics = copy.deepcopy(self.mqtt_client_topics)

for topic in topics:
for topic in client_topics:
self.mqtt_subscribe(topic, self.mqtt_qos)

self.mqtt_connected = True
Expand Down Expand Up @@ -297,27 +300,29 @@ def mqtt_subscribe(self, topic, qos):

result = None
try:
self.update_perf(requests_sent=1, bytes_sent=len(json.dumps(topic)))

result = self.mqtt_client.subscribe(topic, qos)
if result[0] == 0:
self.logger.debug("Subscription to Topic %s Successful", topic)
if topic not in self.mqtt_client_topics:
self.mqtt_client_topics.append(topic)

if "#" in topic or "+" in topic:
# its a wildcard
self.add_mqtt_wildcard(topic)
self.update_perf(requests_sent=1, bytes_sent=len(json.dumps(topic)))
else:
if topic in self.mqtt_client_topics:
self.mqtt_client_topics.remove(topic)
self.update_perf(requests_sent=1, bytes_sent=len(json.dumps(topic)))
with self.mqtt_lock:
self.update_perf(requests_sent=1, bytes_sent=len(json.dumps(topic)))

self.logger.debug(
"Subscription to Topic %s Unsuccessful, as Client possibly not currently connected",
topic,
)
result = self.mqtt_client.subscribe(topic, qos)
if result[0] == 0:
self.logger.debug("Subscription to Topic %s Successful", topic)
if topic not in self.mqtt_client_topics:
self.mqtt_client_topics.append(topic)

if "#" in topic or "+" in topic:
# its a wildcard
self.add_mqtt_wildcard(topic)

else:
if topic in self.mqtt_client_topics:
self.mqtt_client_topics.remove(topic)

self.logger.debug(
"Subscription to Topic %s Unsuccessful, as Client possibly not currently connected",
topic,
)

self.update_perf(updates_recv=1, bytes_recv=len(json.dumps(result)))

except Exception as e:
self.logger.warning("There was an error while subscribing to topic %s, %s", topic, e)
Expand All @@ -330,21 +335,21 @@ def mqtt_unsubscribe(self, topic):

result = None
try:
self.update_perf(requests_sent=1, bytes_sent=len(json.dumps(topic)))
result = self.mqtt_client.unsubscribe(topic)
if result[0] == 0:
self.logger.debug("Unsubscription from Topic %s Successful", topic)
if topic in self.mqtt_client_topics:
self.mqtt_client_topics.remove(topic)
self.update_perf(requests_sent=1, bytes_sent=len(json.dumps(topic)))

self.remove_mqtt_binary(topic)
self.update_perf(requests_sent=1, bytes_sent=len(json.dumps(topic)))
self.remove_mqtt_wildcard(topic)
with self.mqtt_lock:
self.update_perf(requests_sent=1, bytes_sent=len(json.dumps(topic)))
result = self.mqtt_client.unsubscribe(topic)
if result[0] == 0:
self.logger.debug("Unsubscription from Topic %s Successful", topic)
if topic in self.mqtt_client_topics:
self.mqtt_client_topics.remove(topic)

else:
self.logger.warning("Unsubscription from Topic %s was not Successful", topic)
self.remove_mqtt_binary(topic)
self.remove_mqtt_wildcard(topic)

else:
self.logger.warning("Unsubscription from Topic %s was not Successful", topic)

self.update_perf(updates_recv=1, bytes_recv=len(json.dumps(result)))

except Exception as e:
self.logger.warning("There was an error while unsubscribing from topic %s, %s", topic, e)
Expand Down

0 comments on commit 7f831db

Please sign in to comment.