Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
__pycache__/
.pytest_cache/
.tox/
.venv/
.*venv/
.vscode/
.vscode-test/
.vagra
DS_Store
hss_venv/
*.log
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This project is a home security system that uses a Raspberry Pi and a camera, wh
### Installation

```bash
$ sudo apt install -y python3-picamera2
$ virtualenv venv
$ source venv/bin/activate
$ pip install -r requirements.txt
Expand Down
6 changes: 5 additions & 1 deletion core/observers/observer/hss_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from core.observers.subject.wifi_subject import WiFiSubject
from core.observers.subject.eye_subject import EyeSubject
from core.utils.datatypes import EyeStates, WiFiStates
from core.utils.fileio_adaptor import upload_to_fileio, read_latest_file
from core.strategies.notifier.base_notifier_strategy import BaseNotifierStrategy

# Add logging support.
Expand Down Expand Up @@ -34,7 +35,10 @@ def update(self, subject: BaseSubject) -> None:

if self.wifi_state == WiFiStates.DISCONNECTED and self.eye_state == EyeStates.DETECTED:
logger.info("There is an intruder!")
self._notifier.notify_all("There is an intruder!")
fileio_link = upload_to_fileio(
read_latest_file("~/.home-security-system/images")
)
self._notifier.notify_all(f"There is an intruder! Here is the image: {fileio_link}.")

def set_notifier(self, notifier: BaseNotifierStrategy) -> None:
"""This method is called when the observer is updated."""
Expand Down
51 changes: 39 additions & 12 deletions core/observers/subject/eye_subject.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
This class inherits from IBaseSubject.
Concretes a subject for Eye/Camera features.
"""
import os
import logging
from datetime import datetime
from time import sleep
from threading import Thread
from threading import Thread, Lock
from typing import Optional

import cv2

from core.utils.datatypes import EyeStates, EyeStrategyResult
from core.utils.fileio_adaptor import upload_to_fileio
from core.observers.subject.base_subject import BaseSubject
from core.strategies.eye.base_eye_strategy import BaseEyeStrategy

Expand All @@ -22,37 +25,61 @@ class EyeSubject(BaseSubject):
This class inherits from IBaseSubject.
Concretes a subject for Eye/Camera features.
"""
DEFAULT_IMAGE_LOCATIONS: str = "~/.home-security-system/images"
DEFAULT_SLEEP_INTERVAL = 10
SLEEP_INTERVAL_DETECTED = 5

def __init__(self, image_path: str):
def __init__(self, image_path: str = DEFAULT_IMAGE_LOCATIONS):
super().__init__()
self._image_path = image_path
self._image_path = (
image_path
if '~' not in image_path
else os.path.expanduser(image_path)
)

# Create the default image directory if not exists.
os.makedirs(self._image_path, exist_ok=True)

@staticmethod
def get_default_state() -> EyeStates:
"""This method is called when the observer is updated."""
return EyeStates.UNREACHABLE

def run(self, eye_strategy: BaseEyeStrategy) -> None:
def run(self,
eye_strategy: BaseEyeStrategy,
wifi_lock: Optional[Lock] = None
) -> None:
"""This method is called when the observer is updated."""
thread = Thread(target=self._run_in_loop, args=(self, eye_strategy,))
thread = Thread(target=self._run_in_loop, args=(self, eye_strategy, wifi_lock))
thread.start()
logger.debug("EyeSubject is running...")

@staticmethod
def _run_in_loop(self, eye_strategy: BaseEyeStrategy) -> None:
def _run_in_loop(self,
eye_strategy: BaseEyeStrategy,
wifi_lock: Optional[Lock] = None
) -> None:
"""This method is called when the observer is updated."""
sleep_interval = EyeSubject.DEFAULT_SLEEP_INTERVAL

# Create a dummy lock instance if not given.
if wifi_lock is None:
wifi_lock = Lock()

while True:
result = eye_strategy.check_if_detected()
logger.debug("EyeStrategyResult: " + str(result.result))
# If WiFi subject would give rights to use camera,
# Check if any intruders detected.
if not wifi_lock.locked():
result = eye_strategy.check_if_detected()
logger.debug("EyeStrategyResult: " + str(result.result))

if result.result:
self.set_state(EyeStates.DETECTED)
self._save_image(result)
sleep_interval = EyeSubject.SLEEP_INTERVAL_DETECTED
if result.result:
self.set_state(EyeStates.DETECTED)
self._save_image(result)
sleep_interval = EyeSubject.SLEEP_INTERVAL_DETECTED

# If the WiFi subject does not give rights,
# aka: "There is protectors around the house."
else:
self.set_state(EyeStates.NOT_DETECTED)
sleep_interval = EyeSubject.DEFAULT_SLEEP_INTERVAL
Expand Down
21 changes: 20 additions & 1 deletion core/observers/subject/wifi_subject.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
Concretes a subject WiFi features.
"""
import logging
from threading import Thread
from threading import Thread, Lock
from time import sleep
from typing import Optional

from core.utils.datatypes import WiFiStates
from core.observers.subject.base_subject import BaseSubject
Expand All @@ -19,6 +20,8 @@ class WiFiSubject(BaseSubject):
This class inherits from IBaseSubject.
Concretes a subject for WiFiS features.
"""
SINGLETON_LOCK: Optional[Lock] = None

@staticmethod
def get_default_state() -> WiFiStates:
"""This method is called when the observer is updated."""
Expand All @@ -29,16 +32,32 @@ def run(self, wifi_strategy: BaseWiFiStrategy) -> None:
thread = Thread(target=self._run_in_loop, args=(self, wifi_strategy,))
thread.start()
logger.debug("WiFiSubject is running...")

@classmethod
def get_protector_lock(cls) -> Lock:
"""This method returns a Lock object where it can be
used to block camera when there is a WiFi connection
from protectors.
"""
if cls.SINGLETON_LOCK is None:
cls.SINGLETON_LOCK = Lock()
return cls.SINGLETON_LOCK

@staticmethod
def _run_in_loop(self, wifi_strategy: BaseWiFiStrategy) -> None:
"""This method is called when the observer is updated."""
protector_lock: Lock = self.get_protector_lock()

while True:
protectors = wifi_strategy.check_protectors()
logger.debug("Protectors: " + str(protectors.result) + " " + str(protectors.protector))

if protectors.result:
self.set_state(WiFiStates.CONNECTED)
if not protector_lock.locked():
protector_lock.acquire()
else:
self.set_state(WiFiStates.DISCONNECTED)
if protector_lock.locked():
protector_lock.release()
sleep(5)
81 changes: 81 additions & 0 deletions core/strategies/detectors/efficientdet_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
An TinyML detection technique using Efficientdet model.
"""
import time
from typing import Any
import cv2
import numpy
from tflite_runtime.interpreter import Interpreter
from .base_detector_strategy import BaseDetectorStrategy, DetectorResult


class EfficientdetStrategy(BaseDetectorStrategy):
"""
The Efficientdet strategy for detection of objects.
"""
MODEL_PATH: str = "models/efficientdet_1.tflite"
LABEL_PATH: str = "models/efficientdet_1_labelmap.txt"
DETECTION_THRES: float = 0.35

@classmethod
def detect_humans(cls, frame: numpy.ndarray) -> DetectorResult:
"""This method detects if there are any humans in the frame."""
# Create an model interpreter.
interpreter: Interpreter = Interpreter(model_path=cls.MODEL_PATH)
interpreter.allocate_tensors()

# Get model input and output details.
input_details: list[dict[str, Any]] = interpreter.get_input_details()
output_details: list[dict[str, Any]] = interpreter.get_output_details()
_, input_height, input_width, _ = input_details[0]['shape']

# Prepare image for input-tensor.
image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
image = cv2.resize(image, (input_width, input_height), interpolation=cv2.INTER_AREA)
image_height, image_width = image.shape[:2]

# Apply the frame into first tensor of the model.
input_data = numpy.expand_dims(image, axis=0)
interpreter.set_tensor(input_details[0]['index'], input_data)

# Calculate the output tensor.
interpreter.invoke()

# Recieve the output.
boxes = interpreter.get_tensor(output_details[0]['index'])[0]
classes = interpreter.get_tensor(output_details[1]['index'])[0]
scores = interpreter.get_tensor(output_details[2]['index'])[0]

# Read label-map.
with open(cls.LABEL_PATH, 'r', encoding="utf-8") as labelmap:
labels = [line.strip() for line in labelmap.readlines()]

# Create color legend for each class type.
colors = numpy.random.randint(0, 255, size=(len(labels), 3), dtype='uint8')

# Convert RGB to BGR again.
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

# Travers through detections.
detection_regions: list[tuple[int, int, int, int]] = []
for score, box, pred_class in zip(scores, boxes, classes):
if score < cls.DETECTION_THRES:
continue

class_name = labels[int(pred_class)]
if class_name == "person":
min_y = round(box[0] * image_height)
min_x = round(box[1] * image_width)
max_y = round(box[2] * image_height)
max_x = round(box[3] * image_width)
detection_regions.append((min_x, max_x, min_y, max_y))

cv2.rectangle(image, (min_x, min_y), (max_x, max_y), (0, 255, 0), 2)

result = DetectorResult(
image=image,
human_found=len(detection_regions) > 0,
regions=detection_regions,
num_detections=len(detection_regions),
)
return result
6 changes: 1 addition & 5 deletions core/strategies/eye/base_eye_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,4 @@ def check_if_detected(self) -> EyeStrategyResult:
frame = self.get_frame()
# Detect humans in the frame.
result = self._detect_humans(frame)

if result.human_found:
return EyeStrategyResult(image=frame, result=True)
return EyeStrategyResult(image=frame, result=False)

return EyeStrategyResult(image=result.image, result=result.human_found)
2 changes: 1 addition & 1 deletion core/strategies/notifier/whatsapp_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def _send_message(self, reciever: WhatsappReciever, message: str) -> bool:
response = requests.get(request_url, timeout=10)

# Check if the request was unsuccessful.
if response.status_code != 200:
if response.status_code != 200 or "ERROR" in response.text:
logger.error("Failed to send WhatsApp message to %s", reciever.telephone_number)
logger.error("Status code: %s", response.status_code)
return False
Expand Down
32 changes: 32 additions & 0 deletions core/utils/fileio_adaptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Any
import requests
from requests.auth import HTTPBasicAuth
import os
import glob

def read_latest_file(dir_path: str) -> str:
"""This method reads the latest file from the given directory."""
# Check if ~ is used.
if dir_path.startswith("~"):
dir_path = os.path.expanduser(dir_path)

# Check if the directory exists.
if not os.path.exists(dir_path):
raise FileNotFoundError(f"The given directory path does not exist: {dir_path}")

# Get the latest file.
list_of_files = glob.glob(dir_path + '/*')
return max(list_of_files, key=os.path.getctime)

def upload_to_fileio(file_path: str) -> str:
"""Uploads a image file to File.io server."""
response = requests.post(
'https://file.io/',
files={"file": open(file_path, 'rb')},
auth=HTTPBasicAuth("API_KEY_HERE", '')
)
res: dict[str, Any] = response.json()
if res['success'] == True:
return res['link']
else:
return "File upload failed!"
16 changes: 8 additions & 8 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
from core.observers.subject.wifi_subject import WiFiSubject
from core.observers.observer.hss_observer import HomeSecuritySystemObserver
from core.strategies.wifi.ipaddress_strategy import IpAddressStrategy
from core.strategies.eye.usbcamera_strategy import UsbCameraStrategy
from core.strategies.eye.picamera_strategy import PiCameraStrategy
from core.strategies.notifier.whatsapp_strategy import WhatsappStrategy
from core.strategies.detectors.hog_descriptor_strategy import HogDescriptorStrategy
from core.strategies.detectors.efficientdet_strategy import EfficientdetStrategy
from core.utils.datatypes import WhatsappReciever, Protector


Expand All @@ -22,18 +21,17 @@
filemode='a',
)


def main():
"""
This method is the entry point of the application.
"""
# Create a WhatsApp notifier.
whatsapp_notifier = WhatsappStrategy()
whatsapp_notifier.add_reciever(WhatsappReciever("Gokhan", "tel_no", "api_key"))
whatsapp_notifier.add_reciever(WhatsappReciever("RECIEVER_NAME", "TEL_NO", "API_KEY"))

# Create a Protector within IpAddressStrategy.
ip_address_strategy = IpAddressStrategy()
ip_address_strategy.add_protector(Protector("Gokhan_iPhone", "tel_ip"))
ip_address_strategy.add_protector(Protector("PROTOTECTOR_NAME", "IP_ADDR"))

# Create observer.
hss_observer = HomeSecuritySystemObserver()
Expand All @@ -42,14 +40,16 @@ def main():
# Create subjects to observe.
wifi_subject = WiFiSubject()
wifi_subject.attach(hss_observer)
eye_subject = EyeSubject("images/")
eye_subject = EyeSubject()
eye_subject.attach(hss_observer)

# Run subjects.
wifi_subject.run(ip_address_strategy)

# Set-up the camera to detect humans.
camera = PiCameraStrategy()
camera.set_detector(HogDescriptorStrategy())
eye_subject.run(camera)
camera.set_detector(EfficientdetStrategy())
eye_subject.run(camera, wifi_subject.get_protector_lock())


if __name__ == "__main__":
Expand Down
Binary file added models/efficientdet_1.tflite
Binary file not shown.
Loading