Skip to content

Commit

Permalink
fixed MQTT reconnection issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Claudio Nold committed Jan 28, 2021
1 parent 3aa16a5 commit f24e1b1
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 30 deletions.
4 changes: 2 additions & 2 deletions docroot/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<!DOCTYPE html>
<html lang="en">
<head>
<title>MQTT Web Socket Example</title>
<title>Safe Chicken</title>
<!-- Required meta tags -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
Expand All @@ -23,7 +23,7 @@
<script type="module" src="js/main.js"></script>

<div class="container-fluid">
<h1>Safe Chicken</h1>
<h1>Safe Chicken Door Control</h1>
<div class="form-row">
<div class="col-md boxbg">
<h3>Status</h3>
Expand Down
9 changes: 9 additions & 0 deletions docs/raspberry/base_setup.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ Disable Bluetooth

:code:`sudo systemctl disable bluetooth`

Disable Serial Console
----------------------

.. code::
sudo systemctl disable hciuart
sudo apt purge bluez
sudo apt autoremove
Copy your SSH public keys
-------------------------

Expand Down
7 changes: 3 additions & 4 deletions raspberry/copy_to_raspi.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

RASPIIP="${1}"
RASPISSH="pi@${RASPIIP}"
REMOTE_BASEDIR="/home/pi/safechicken"
REMOTE_BASEDIR="/home/pi/safe-chicken"

ssh "${RASPISSH}" "mkdir -p ${REMOTE_BASEDIR}"

# --- 1. copy everything except the venv

shopt -s extglob
scp -r !(venv) "${RASPISSH}:${REMOTE_BASEDIR}/"
scp -r docroot raspberry safechicken tests config.json requirements.txt "${RASPISSH}:${REMOTE_BASEDIR}/"

if [[ $? -ne 0 ]]; then
echo "copy failed, do not check venv"
echo "copy failed"
exit 1
fi

Expand Down
2 changes: 1 addition & 1 deletion raspberry/safechicken.service
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ After=multi-user.target

[Service]
Type=idle
ExecStart=/home/pi/safechicken/raspberry/startup_safechicken.sh &
ExecStart=/home/pi/safe-chicken/raspberry/startup_safechicken.sh &

[Install]
WantedBy=multi-user.target
2 changes: 1 addition & 1 deletion raspberry/startup_safechicken.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# this file is started in the system start and runs as root
#-------------------------------------------------------------------------------

PROJ_BASEPATH="/home/pi/safechicken"
PROJ_BASEPATH="/home/pi/safe-chicken"

cd ${PROJ_BASEPATH}

Expand Down
11 changes: 5 additions & 6 deletions safechicken/iocontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def __init__(self, config_dict):
active_state=self.config_dict['in_door_closed']['active_state'],
pull_up=None,
bounce_time=1.0)
self.in_door_closed.when_activated = self.on_door_closed
self.in_door_closed.when_deactivated = self.on_door_not_closed
self.in_door_closed.when_activated = self.on_door_not_closed
self.in_door_closed.when_deactivated = self.on_door_closed

self.systemtime_synced = False
self.clear_timer = None
Expand All @@ -39,7 +39,7 @@ def update_commands(self, command_out):
self._check_and_open('force')
elif command_out['current'] == 'close':
self._check_and_close('force')
self.last_command_out = command_out
self.last_command_out = copy.deepcopy(command_out)

def execute_pending_command(self):
if self.last_command_out['current']:
Expand Down Expand Up @@ -74,13 +74,13 @@ def get_door_state_log(self):

def on_door_closed(self):
logging.info('input: door closed')
self.door_closed_log.insert(0, {'state': 'open', 'datetime': datetime.now().isoformat(timespec='minutes')})
self.door_closed_log.insert(0, {'state': 'close', 'datetime': datetime.now().isoformat(timespec='minutes')})
if len(self.door_closed_log) > 10:
del self.door_closed_log[-1]

def on_door_not_closed(self):
logging.info('input: door NOT closed/opening/open')
self.door_closed_log.insert(0, {'state': 'close', 'datetime': datetime.now().isoformat(timespec='minutes')})
self.door_closed_log.insert(0, {'state': 'open', 'datetime': datetime.now().isoformat(timespec='minutes')})
if len(self.door_closed_log) > 10:
del self.door_closed_log[-1]

Expand Down Expand Up @@ -122,7 +122,6 @@ def _time_passed(self, check_time):
# do nothing of time is not correct
return False

# <------ TODO: avoid skipping a timed command because after the 'next_time' the time has alredy been changed to the next!
return check_time < datetime.now()


Expand Down
7 changes: 5 additions & 2 deletions safechicken/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ def main():
help='Set logging level. Example --loglevel debug|info|warning|error, default=info')
parser.add_argument('-lf', '--logfile', default=None, help='Logging to a file if set; default: to console')
args = parser.parse_args()
logging.basicConfig(filename=args.logfile, level=args.loglevel.upper())

log_format = '%(asctime)-15s|%(module)+11s.%(funcName)-20s| %(message)s'
logging.basicConfig(filename=args.logfile, level=args.loglevel.upper(), format=log_format)
logging.info('Starting Safe Chicken Door Control...')

global config_dict
config_dict = json.load(args.config_file)
Expand Down Expand Up @@ -191,7 +194,7 @@ def main():

while True:
# sleep time but note: MQTT client is listening and always active
time.sleep(60)
time.sleep(30)
# logging.info('.')

# periodical checks
Expand Down
55 changes: 41 additions & 14 deletions safechicken/mqttclient.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import functools
import json

Expand Down Expand Up @@ -28,39 +29,48 @@ def on_message(client, userdata, message):
client.loop_stop()


def _on_mqtt_message(mqtt_client, paho_client, userdata, message):
logging.info('received message {0}: {1}'.format(message.topic, str(message.payload.decode("utf-8"))))
topic_func = mqtt_client.get_topic_func(message.topic)
if topic_func:
topic_func(message.topic, json.loads(message.payload.decode("utf-8")))


class MqttClient:
def __init__(self, mqtt_config, topic_config):
self.mqtt_config = mqtt_config
self.topic_config = topic_config
self.client = mqtt.Client(mqtt_config['client_name'])
self.host = self.mqtt_config['broker_hostname']
self.topic_list = []
self.published_backup = {}

def connect_subscribe(self, topic_list):
try:
self.client.connect(self.mqtt_config['broker_hostname'])
self.topic_list = topic_list
self.client.on_connect = functools.partial(_on_connect, self)
self.client.on_message = functools.partial(_on_mqtt_message, self)
self.client.connect(self.host)
self.client.loop_start()
logging.info('MQTT connected to {0}'.format(self.mqtt_config['broker_hostname']))

self.topic_list = topic_list
except Exception as e:
logging.warning('Error on MQTT connection (retry later): {0} (topic list size: {1})'.
format(e, len(topic_list)))

def subscribe_now(self):
try:
for topic_elem in self.topic_list:
topic_name = topic_elem[0]
logging.info('Subscribe for topic {0}'.format(topic_name))
self.client.subscribe(topic_name)

# re-publish everything again. this is needed if an IO changes its state while the MQTT client is disconnected
for topic_elem in self.published_backup:
self.client.publish(topic=topic_elem, payload=self.published_backup[topic_elem], retain=True)

for topic_elem in topic_list:
self.client.subscribe(topic_elem[0])
except Exception as e:
logging.warning('Error on MQTT connection (retry later): {0}'.format(e))
logging.warning('Error on MQTT connection (retry later): {0} (topic list size: {1})'.
format(e, len(self.topic_list)))

def disconnect(self):
self.client.loop_stop()

def publish(self, topic, content_dict):
self.client.publish(topic=topic, payload=json.dumps(content_dict), retain=True)
self.published_backup[topic] = json.dumps(content_dict)
self.client.publish(topic=topic, payload=self.published_backup[topic], retain=True)

def publish_volatile(self, topic, content_dict):
self.client.publish(topic=topic, payload=json.dumps(content_dict), retain=False)
Expand All @@ -73,3 +83,20 @@ def get_topic_func(self, topic_name):

def is_connected(self):
return self.client.is_connected()


def _on_mqtt_message(mqtt_client: MqttClient, paho_client, userdata, message):
logging.info('received message {0}: {1}'.format(message.topic, str(message.payload.decode("utf-8"))))
topic_func = mqtt_client.get_topic_func(message.topic)
if topic_func:
topic_func(message.topic, json.loads(message.payload.decode("utf-8")))


def _on_connect(mqtt_client: MqttClient, client, userdata, flags, rc):
if rc == 0:
logging.info('MQTT connected to {0} (topic list size: {1})'.
format(mqtt_client.host, len(mqtt_client.topic_list)))
mqtt_client.subscribe_now()

else:
logging.info('Failed to connect to {0}, return code {1}'.format(mqtt_client.host, rc))

0 comments on commit f24e1b1

Please sign in to comment.