Skip to content

Commit

Permalink
Stale device info cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
FlyingDiver committed Aug 3, 2019
1 parent 1e321b4 commit c7dd3a2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 28 deletions.
9 changes: 5 additions & 4 deletions MQTT.indigoPlugin/Contents/Server Plugin/mqtt_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, device):

self.useTLS = device.pluginProps.get(u'useTLS', False)

self.logger.debug(u"Broker __init__ address = {}, port = {}, protocol = {}, transport = {}".format(self.address, self.port, self.protocol, self.transport))
self.logger.debug(u"{}: Broker __init__ address = {}, port = {}, protocol = {}, transport = {}".format(device.name, self.address, self.port, self.protocol, self.transport))

self.device.updateStateOnServer(key="status", value="Not Connected")
self.device.updateStateImageOnServer(indigo.kStateImageSel.SensorOff)
Expand Down Expand Up @@ -55,7 +55,6 @@ def __init__(self, device):
self.device.updateStateImageOnServer(indigo.kStateImageSel.SensorTripped)

def __del__(self):

self.client.disconnect()
self.device.updateStateOnServer(key="status", value="Not Connected")
self.device.updateStateImageOnServer(indigo.kStateImageSel.SensorOff)
Expand All @@ -72,15 +71,17 @@ def subscribe(self, topic, qos=0):
def unsubscribe(self, topic):
self.client.unsubscribe(topic)

def refreshFromServer(self):
self.device.refreshFromServer()
self.logger.debug(u"{}: refreshFromServer complete".format(self.device.name))


################################################################################
# Callbacks
################################################################################

def on_connect(self, client, userdata, flags, rc):
self.logger.debug(u"{}: Connected with result code {}".format(self.device.name, rc))

# make sure the local copy of the subscription list is up to date
self.device.refreshFromServer()

# Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions will be renewed.
Expand Down
51 changes: 27 additions & 24 deletions MQTT.indigoPlugin/Contents/Server Plugin/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,16 @@ def startup(self):

def shutdown(self):
self.logger.info(u"Shutting down MQTT")

if self.server:
self.server.kill()

def runConcurrentThread(self):

def runConcurrentThread(self):
try:
while True:

for broker in self.brokers.values():
broker.loop()

self.sleep(0.1)

except self.stopThread:
pass

Expand Down Expand Up @@ -135,6 +131,9 @@ def didDeviceCommPropertyChange(self, origDev, newDev):
return True
if origDev.pluginProps.get('useTLS', None) != newDev.pluginProps.get('useTLS', None):
return True

# a bit of a hack to make sure name changes get pushed down immediately
self.brokers[newDev.id].refreshFromServer()

return False

Expand Down Expand Up @@ -210,10 +209,10 @@ def clearAllSubscriptionsMenu(self, valuesDict, typeId):
return True

def printSubscriptionsMenu(self, valuesDict, typeId):
broker = self.brokers[int(valuesDict["brokerID"])]
self.logger.info(u"{}: Current topic subscriptions:".format(broker.device.name))
for topic in broker.device.pluginProps[u'subscriptions']:
self.logger.info(u"{}:\t\t{}".format(broker.device.name, topic))
device = indigo.devices[int(valuesDict["brokerID"])]
self.logger.info(u"{}: Current topic subscriptions:".format(device.name))
for topic in device.pluginProps[u'subscriptions']:
self.logger.info(u"{}:\t\t{}".format(device.name, topic))
return True

# doesn't do anything, just needed to force other menus to dynamically refresh
Expand All @@ -231,8 +230,8 @@ def publishMessageAction(self, pluginAction, brokerDevice, callerWaitingForResul
payload = indigo.activePlugin.substitute(pluginAction.props["payload"])
qos = int(pluginAction.props["qos"])
retain = bool(pluginAction.props["retain"])
self.logger.debug(u"{}: publishMessageAction {}: {}".format(brokerDevice.name, topic, payload))
broker.publish(topic=topic, payload=payload, qos=qos, retain=retain)
self.logger.debug(u"{}: publishMessageAction {}: {}".format(broker.device.name, topic, payload))

def publishDeviceAction(self, pluginAction, brokerDevice, callerWaitingForResult):
broker = self.brokers[brokerDevice.id]
Expand All @@ -251,7 +250,7 @@ def publishDeviceAction(self, pluginAction, brokerDevice, callerWaitingForResult
payload['pluginProps'] = {}
for key in pubDevice.pluginProps:
payload['states'] [key] = pubDevice.pluginProps[key]
self.logger.debug(u"{}: publishDeviceAction {}: {}".format(broker.device.name, topic, payload))
self.logger.debug(u"{}: publishDeviceAction {}: {}".format(brokerDevice.name, topic, payload))
broker.publish(topic=topic, payload=json.dumps(payload), qos=qos, retain=retain)

def addSubscriptionAction(self, pluginAction, brokerDevice, callerWaitingForResult):
Expand All @@ -269,8 +268,9 @@ def clearAllSubscriptionsAction(self, pluginAction, brokerDevice, callerWaitingF

def pickBroker(self, filter=None, valuesDict=None, typeId=0, targetId=0):
retList = []
for broker in self.brokers.values():
retList.append((broker.device.id, broker.device.name))
for brokerID in self.brokers:
device = indigo.devices[int(brokerID)]
retList.append((device.id, device.name))
retList.sort(key=lambda tup: tup[1])
return retList

Expand All @@ -280,41 +280,44 @@ def pickBroker(self, filter=None, valuesDict=None, typeId=0, targetId=0):

def addSubscription(self, brokerID, topic, qos):
broker = self.brokers[brokerID]
device = indigo.devices[int(brokerID)]
broker.subscribe(topic=topic, qos=qos)
self.logger.debug(u"{}: addSubscription {} ({})".format(broker.device.name, topic, qos))
self.logger.debug(u"{}: addSubscription {} ({})".format(device.name, topic, qos))

updatedPluginProps = broker.device.pluginProps
updatedPluginProps = device.pluginProps
if not 'subscriptions' in updatedPluginProps:
subList = []
else:
subList = updatedPluginProps[u'subscriptions']
if not topic in subList:
subList.append(topic)
updatedPluginProps[u'subscriptions'] = subList
self.logger.debug(u"{}: subscriptions after update :\n{}".format(broker.device.name, updatedPluginProps[u'subscriptions']))
broker.device.replacePluginPropsOnServer(updatedPluginProps)
self.logger.debug(u"{}: subscriptions after update :\n{}".format(device.name, updatedPluginProps[u'subscriptions']))
device.replacePluginPropsOnServer(updatedPluginProps)

def delSubscription(self, brokerID, topic):
broker = self.brokers[brokerID]
device = indigo.devices[int(brokerID)]
broker.unsubscribe(topic=topic)
self.logger.debug(u"{}: delSubscription {}".format(broker.device.name, topic))
self.logger.debug(u"{}: delSubscription {}".format(device.name, topic))

updatedPluginProps = broker.device.pluginProps
updatedPluginProps = device.pluginProps
if not 'subscriptions' in updatedPluginProps:
self.logger.error(u"{}: delSubscription error, subList is empty".format(broker.device.name))
self.logger.error(u"{}: delSubscription error, subList is empty".format(device.name))
return
subList = updatedPluginProps[u'subscriptions']
if not topic in subList:
self.logger.debug(u"{}: Topic {} not in subList.".format(broker.device.name, topics))
self.logger.debug(u"{}: Topic {} not in subList.".format(device.name, topics))
return
subList.remove(topic)
updatedPluginProps[u'subscriptions'] = subList
self.logger.debug(u"{}: subscriptions after update :\n{}".format(broker.device.name, updatedPluginProps[u'subscriptions']))
broker.device.replacePluginPropsOnServer(updatedPluginProps)
self.logger.debug(u"{}: subscriptions after update :\n{}".format(device.name, updatedPluginProps[u'subscriptions']))
device.replacePluginPropsOnServer(updatedPluginProps)

def clearAllSubscriptions(self, brokerID):
broker = self.brokers[brokerID]
for topic in broker.device.pluginProps[u'subscriptions']:
device = indigo.devices[int(brokerID)]
for topic in device.pluginProps[u'subscriptions']:
self.delSubscription(brokerID, topic)


Expand Down

0 comments on commit c7dd3a2

Please sign in to comment.