Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor mqtt service status #87

Merged
merged 32 commits into from
Sep 19, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
17316d9
feat: alarm status link to device
mxmaxime Sep 14, 2020
70927af
refactor: mqtt status manage thread topics
mxmaxime Sep 14, 2020
730241c
wip refactor: camera status & mqtt connection topic
mxmaxime Sep 14, 2020
e508f2f
refactor: mqtt sound/speaker topics
mxmaxime Sep 15, 2020
493ac7e
feat(mqtt json): try/catch if json decode raise decode error
mxmaxime Sep 17, 2020
02237ca
feat: get device_id from mqtt topic
mxmaxime Sep 17, 2020
0a200bb
feat: stop the speaker when nobody
mxmaxime Sep 17, 2020
009cf05
refactor: mqtt status publish via a new specialized class
mxmaxime Sep 17, 2020
916ffe2
clean: remove payload from mqtt message
mxmaxime Sep 17, 2020
991066c
feat: working on boolean payload + config clean_session
mxmaxime Sep 18, 2020
14efd4e
feat: subscribe to utf8 payload topic
mxmaxime Sep 18, 2020
ea6f5d0
wip: camera merge topic
mxmaxime Sep 18, 2020
8a08ddf
fix#92 apply brg to rbg transform opencv capture
mxmaxime Sep 18, 2020
d66ada0
feat: camera status retain
mxmaxime Sep 18, 2020
f3a7072
feat: thread manager clean for boolean
mxmaxime Sep 18, 2020
cc072fc
feat: messaging send status as real boolean
mxmaxime Sep 18, 2020
46188e7
fix: tiredness
mxmaxime Sep 18, 2020
048af8b
clean
mxmaxime Sep 19, 2020
11df32b
refactor: extract things to files
mxmaxime Sep 19, 2020
f1734aa
todo: ref issue
mxmaxime Sep 19, 2020
5cd950b
fix: wrong extend
mxmaxime Sep 19, 2020
5380b58
refactor: extract django init for standalone scripts to function
mxmaxime Sep 19, 2020
0ff75bf
feat: improve mqtt callback logging
mxmaxime Sep 19, 2020
6bfc97a
fix: things
mxmaxime Sep 19, 2020
ef510f6
fix: when client_id is none clean_session has to be True
mxmaxime Sep 19, 2020
04f4561
feat: log when up/off smart camera
mxmaxime Sep 19, 2020
f0a253f
clean
mxmaxime Sep 19, 2020
11e61c7
fix: camera initialization
mxmaxime Sep 19, 2020
64475f5
clean
mxmaxime Sep 19, 2020
5689739
improve: send boolean as bytes and not str for connected topic
mxmaxime Sep 19, 2020
c238995
clean: old print
mxmaxime Sep 19, 2020
e88dc43
feat: detect when we lost mqtt connection with a device service
mxmaxime Sep 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions raspberrypi_central/smart-camera/app/alarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
from mqtt.mqtt_client import get_mqtt_client
from camera.videostream import VideoStream

device_id = os.environ['DEVICE_ID']

mqtt_client = get_mqtt_client(f"{os.environ['DEVICE_ID']}-rpi4-alarm-motion")

MQTT_ALARM_CAMERA_TOPIC = 'status/alarm'
mqtt_client = get_mqtt_client(f"{device_id}-rpi4-alarm-motion")


def run():
Expand All @@ -24,14 +23,14 @@ def run():


manager = ThreadManager(run)
MqttStatusManageThread(mqtt_client, manager, MQTT_ALARM_CAMERA_TOPIC)
MqttStatusManageThread(device_id, 'camera', mqtt_client, manager)


def run_sound():
PlaySound()


sound_manager = ThreadManager(run_sound)
MqttStatusManageThread(mqtt_client, sound_manager, 'status/sound')
MqttStatusManageThread(device_id, 'speaker', mqtt_client, sound_manager)

mqtt_client.loop_forever()
37 changes: 19 additions & 18 deletions raspberrypi_central/smart-camera/app/camera/camera.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from camera.detect_motion import DetectMotion
import json
import datetime
import struct


class Camera():
Expand All @@ -10,37 +10,38 @@ def __init__(self, detect_motion: DetectMotion, get_mqtt_client, device_id):
self.get_mqtt_client = get_mqtt_client
self._last_time_people_detected = None

self._initialize = True

self.detect_motion = detect_motion
self._start()

def _start(self):
self.mqtt_client = self.get_mqtt_client(client_name=f'{self._device_id}-rpi4-alarm-motion-DETECT')
self.mqtt_client.loop_start()

def _noMorePresence(self):
payload = {
'device_id': self._device_id,
}
def _needToPublishNoMotion(self):
if self._initialize is True:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have changed the way status are working so we need to tell the system if nobody is here when the camera is starting up. So basically, if in the first frame we don't detect anybody, we have to publish a message to inform the system.

self._initialize = False
return True

time_lapsed = (self._last_time_people_detected is not None) and (
datetime.datetime.now() - self._last_time_people_detected).seconds >= 5

if time_lapsed:
self._last_time_people_detected = None

self.mqtt_client.publish('motion/camera/no_more', payload=json.dumps(payload), qos=1)
return time_lapsed

def processFrame(self, frame):
result, byteArr = self.detect_motion.process_frame(frame)

if result is True:
if self._last_time_people_detected is None:
payload = {
'device_id': self._device_id,
}

self.mqtt_client.publish('motion/camera', payload=json.dumps(payload), qos=1)
self.mqtt_client.publish('motion/picture', payload=byteArr, qos=1)
self.mqtt_client.publish(f'motion/camera/{self._device_id}', struct.pack('?', 1), qos=1, retain=True)
self.mqtt_client.publish(f'motion/picture/{self._device_id}', payload=byteArr, qos=1)

self._last_time_people_detected = datetime.datetime.now()

time_lapsed = (self._last_time_people_detected is not None) and (
datetime.datetime.now() - self._last_time_people_detected).seconds >= 5

if time_lapsed:
self._last_time_people_detected = None
self._noMorePresence()
if self._needToPublishNoMotion():
print('PUBLISH NO MOTION')
self.mqtt_client.publish(f'motion/camera/{self._device_id}', payload=struct.pack('?', 0), qos=1, retain=True)
7 changes: 6 additions & 1 deletion raspberrypi_central/smart-camera/app/camera/detect_motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ def _detect_objects(self, interpreter, image, threshold):
return results

def process_frame(self, stream):
image = Image.fromarray(stream).convert('RGB').resize(
"""
From Tensorflow examples, we have .convert('RGB') before the resize.
We removed it because the RGB is created at the stream level (opencv or picamera).
And actually it didn't transform the stream to RGB.
"""
image = Image.fromarray(stream).resize(
(self.input_width, self.input_height), Image.ANTIALIAS)

results = self._detect_objects(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ def _update(self):
# keep looping infinitely until the thread is stopped
while True:
(_, frame) = self.stream.read()
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.processFrame(frame)
# TODO issue #79: self.stream.release() to release resources when turning off the camera.
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import json
import ssl
import paho.mqtt.client as mqtt
import struct
from thread.thread_manager import ThreadManager


Expand All @@ -9,28 +7,25 @@ class MqttStatusManageThread():
This class synchronise the alarm status with MQTT.
If we receive a message to switch on/off the alarm, we're doing it here.
"""
def __init__(self, mqtt_client, thread_manager: ThreadManager, mqtt_topic: str):
def __init__(self, device_id: str, service_name: str, mqtt_client, thread_manager: ThreadManager):
self._thread_manager = thread_manager

mqtt_topic = mqtt_topic.lstrip('/')
mqtt_topic = f'status/{service_name}/{device_id}'

mqtt_client.subscribe(mqtt_topic)
mqtt_client.message_callback_add(mqtt_topic, self._switch_on_or_off)

mqtt_client.publish(f'ask/{mqtt_topic}', payload=True)
mqtt_client.publish(f'connected/{service_name}/{device_id}', payload=True, qos=1, retain=True)
mxmaxime marked this conversation as resolved.
Show resolved Hide resolved
mqtt_client.will_set(f'connected/{service_name}/{device_id}', payload=False, qos=1, retain=True)

def _switch_on_or_off(self, client, userdata, msg):
message = msg.payload.decode()
mxmaxime marked this conversation as resolved.
Show resolved Hide resolved
message = msg.payload
(status) = struct.unpack('?', message)

print(f"I've received a message: {message}")

if message == 'True':
if status:
self._thread_manager.running = True
elif message == 'False':
self._thread_manager.running = False
else:
t = type(message)
raise ValueError(f'Value ({t}) "{message}" incorrect')
self._thread_manager.running = False

# WIP: work with TLS.
# os.environ['REQUESTS_CA_BUNDLE'] = "/usr/local/share/ca-certificates/ca.cert"
Expand All @@ -45,4 +40,3 @@ def _switch_on_or_off(self, client, userdata, msg):
# client.tls_set(ca_certs=CA, certfile=CERT_FILE,
# keyfile=KEY_FILE, tls_version=ssl.PROTOCOL_TLSv1_2)
# client.tls_insecure_set(True)

45 changes: 45 additions & 0 deletions raspberrypi_central/webapp/app/alarm/messaging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import struct


class MqttStatus():
mxmaxime marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, mqtt_client):
self._mqtt_client = mqtt_client

def publish(self, topic, message: bool):
status_bytes = struct.pack('?', message)
print(f'send status: {status_bytes} on {topic}')
self._mqtt_client.publish(topic, status_bytes, qos=1, retain=True)


class SpeakerMessaging():
def __init__(self, mqtt_status: MqttStatus):
self._mqtt_status = mqtt_status

def publish_speaker_status(self, device_id: str, status: bool):
self._mqtt_status.publish(f'status/speaker/{device_id}', status)


class AlarmMessaging():

def __init__(self, mqtt_status: MqttStatus, speaker_messaging: SpeakerMessaging):
self._mqtt_status = mqtt_status
self._speaker_messaging = speaker_messaging

def publish_alarm_status(self, device_id: str, status: bool):
self._mqtt_status.publish(f'status/camera/{device_id}', status)

if status is False:
self._speaker_messaging.publish_speaker_status(device_id, False)


def speaker_messaging_factory(mqtt_client):
mqtt_status = MqttStatus(mqtt_client)

return SpeakerMessaging(mqtt_status)


def alarm_messaging_factory(mqtt_client):
mqtt_status = MqttStatus(mqtt_client)
speaker = speaker_messaging_factory(mqtt_client)

return AlarmMessaging(mqtt_status, speaker)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Generated by Django 3.0.7 on 2020-09-14 15:36

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('devices', '0002_auto_20200731_2128'),
('alarm', '0006_auto_20200809_1008'),
]

operations = [
migrations.AddField(
model_name='alarmstatus',
name='device',
field=models.ForeignKey(default=1, on_delete=django.db.models.deletion.PROTECT, to='devices.Device'),
preserve_default=False,
),
]
13 changes: 8 additions & 5 deletions raspberrypi_central/webapp/app/alarm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from django_celery_beat.models import CrontabSchedule, PeriodicTask
from django.db import models
from house.models import House
import pytz
from . import tasks
from devices.models import Device

Expand Down Expand Up @@ -95,27 +94,31 @@ def model_boolean_fields_to_cron_days():

super().save(*args, **kwargs)


class AlarmStatusManager(models.Manager):
def get_status(self):
return self.all().first().running
return self.all().first()
mxmaxime marked this conversation as resolved.
Show resolved Hide resolved


class AlarmStatus(models.Model):
objects = AlarmStatusManager()

running = models.BooleanField()
device = models.ForeignKey(Device, on_delete=models.PROTECT)

# only one row can be created, otherwise: IntegrityError is raised.
# from https://books.agiliq.com/projects/django-orm-cookbook/en/latest/singleton.html
# TODO: we will remove this for issue #86
def save(self, *args, **kwargs):
if self.__class__.objects.count():
self.pk = self.__class__.objects.first().pk
tasks.alarm_status_changed.delay(self.running)

tasks.alarm_status_changed.delay(self.device_id, self.running)

super().save(*args, **kwargs)

def __str__(self):
return f'Status is {self.running}'
return f'Status is {self.running} for {self.device}'


class CameraMotionDetected(models.Model):
Expand Down
63 changes: 43 additions & 20 deletions raspberrypi_central/webapp/app/alarm/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
import uuid
from standalone.mqtt import MqttTopicFilterSubscription, MqttTopicSubscription, MqttMessage, MqttTopicSubscriptionJson, MQTT
import logging
from standalone.mqtt import MqttTopicSubscriptionBoolean, MqttTopicFilterSubscription, MqttTopicSubscription, MqttMessage, MQTT
from alarm.tasks import camera_motion_picture, camera_motion_detected
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from functools import partial
from alarm.models import AlarmStatus
from .messaging import alarm_messaging_factory, speaker_messaging_factory

_LOGGER = logging.getLogger(__name__)

def on_motion_camera(message: MqttMessage):
payload = message.payload

data = {
'device_id': payload['device_id']
def split_camera_topic(topic: str):
data = topic.split('/')

return {
'type': data[0],
'service': data[1],
'device_id': data[2]
}

camera_motion_detected.apply_async(kwargs=data)

def on_motion_camera(client: MQTT, message: MqttMessage):
topic = split_camera_topic(message.topic)

print(f'on motion {message}')

if message.payload is True:
print('on motion camera true!')
data = {
'device_id': topic['device_id']
}
camera_motion_detected.apply_async(kwargs=data)
else:
print('on motion camera false!')
speaker = speaker_messaging_factory(client)
speaker.publish_speaker_status(topic['device_id'], False)


def on_motion_picture(message: MqttMessage):
topic = split_camera_topic(message.topic)

random = uuid.uuid4()
file_name = f'{random}.jpg'

Expand All @@ -28,24 +51,25 @@ def on_motion_picture(message: MqttMessage):
picture_path = default_storage.path(filename)

data = {
'device_id': topic['device_id'],
'picture_path': picture_path
}

camera_motion_picture.apply_async(kwargs=data)


def on_motion_camera_no_more(client: MQTT, message: MqttMessage):
client.publish('status/sound', str(False), qos=1)

def on_connected_speaker(client: MQTT, message: MqttMessage):
topic = split_camera_topic(message.topic)
speaker_messaging_factory(client).publish_speaker_status(topic['device_id'], False)

def on_status_alarm(client: MQTT, message: MqttMessage):
status = AlarmStatus.objects.get_status()

client.publish('status/alarm', message=str(status), qos=1)
def on_connected_camera(client: MQTT, message: MqttMessage):
topic = split_camera_topic(message.topic)

device_id = topic['device_id']

def on_status_sound(client: MQTT, message: MqttMessage):
client.publish('status/sound', message=str(False), qos=1)
device_status = AlarmStatus.objects.get(device__device_id=device_id)
alarm_messaging_factory(client).publish_alarm_status(device_status.device.device_id, device_status.running)


def register(mqtt: MQTT):
Expand All @@ -54,18 +78,17 @@ def register(mqtt: MQTT):
topic='motion/#',
qos=1,
topics=[
MqttTopicSubscriptionJson('motion/camera', on_motion_camera),
MqttTopicSubscriptionBoolean('motion/camera/+', partial(on_motion_camera, mqtt)),
# encoding is set to None because this topic receives a picture as bytes -> decode utf-8 on it will raise an Exception.
MqttTopicSubscription('motion/picture', on_motion_picture, encoding=None),
MqttTopicSubscription('motion/camera/no_more', partial(on_motion_camera_no_more, mqtt)),
MqttTopicSubscription('motion/picture/+', on_motion_picture),
],
),
MqttTopicFilterSubscription(
topic='ask/#',
topic='connected/camera/+',
qos=1,
topics=[
MqttTopicSubscription('ask/status/alarm', partial(on_status_alarm, mqtt)),
MqttTopicSubscription('ask/status/sound', partial(on_status_sound, mqtt)),
MqttTopicSubscription('connected/camera/+', partial(on_connected_camera, mqtt)),
MqttTopicSubscription('connected/speaker/+', partial(on_connected_speaker, mqtt)),
]
)
])
Loading