diff --git a/.gitignore b/.gitignore index aba7bff..144fe3a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,9 @@ __pycache__/ .pytest_cache/ .tox/ -.venv/ +.*venv/ .vscode/ .vscode-test/ .vagra DS_Store -hss_venv/ \ No newline at end of file +*.log \ No newline at end of file diff --git a/README.md b/README.md index 8ab4040..ee36d08 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ This project is a home security system that uses a Raspberry Pi and a camera, wh ### Installation ```bash +$ sudo apt install -y python3-picamera2 $ virtualenv venv $ source venv/bin/activate $ pip install -r requirements.txt diff --git a/core/observers/observer/hss_observer.py b/core/observers/observer/hss_observer.py index b5fcb33..b0516fd 100644 --- a/core/observers/observer/hss_observer.py +++ b/core/observers/observer/hss_observer.py @@ -7,6 +7,7 @@ from core.observers.subject.wifi_subject import WiFiSubject from core.observers.subject.eye_subject import EyeSubject from core.utils.datatypes import EyeStates, WiFiStates +from core.utils.fileio_adaptor import upload_to_fileio, read_latest_file from core.strategies.notifier.base_notifier_strategy import BaseNotifierStrategy # Add logging support. @@ -34,7 +35,10 @@ def update(self, subject: BaseSubject) -> None: if self.wifi_state == WiFiStates.DISCONNECTED and self.eye_state == EyeStates.DETECTED: logger.info("There is an intruder!") - self._notifier.notify_all("There is an intruder!") + fileio_link = upload_to_fileio( + read_latest_file("~/.home-security-system/images") + ) + self._notifier.notify_all(f"There is an intruder! Here is the image: {fileio_link}.") def set_notifier(self, notifier: BaseNotifierStrategy) -> None: """This method is called when the observer is updated.""" diff --git a/core/observers/subject/eye_subject.py b/core/observers/subject/eye_subject.py index 8f2a691..bec6914 100644 --- a/core/observers/subject/eye_subject.py +++ b/core/observers/subject/eye_subject.py @@ -2,14 +2,17 @@ This class inherits from IBaseSubject. Concretes a subject for Eye/Camera features. """ +import os import logging from datetime import datetime from time import sleep -from threading import Thread +from threading import Thread, Lock +from typing import Optional import cv2 from core.utils.datatypes import EyeStates, EyeStrategyResult +from core.utils.fileio_adaptor import upload_to_fileio from core.observers.subject.base_subject import BaseSubject from core.strategies.eye.base_eye_strategy import BaseEyeStrategy @@ -22,37 +25,61 @@ class EyeSubject(BaseSubject): This class inherits from IBaseSubject. Concretes a subject for Eye/Camera features. """ + DEFAULT_IMAGE_LOCATIONS: str = "~/.home-security-system/images" DEFAULT_SLEEP_INTERVAL = 10 SLEEP_INTERVAL_DETECTED = 5 - def __init__(self, image_path: str): + def __init__(self, image_path: str = DEFAULT_IMAGE_LOCATIONS): super().__init__() - self._image_path = image_path + self._image_path = ( + image_path + if '~' not in image_path + else os.path.expanduser(image_path) + ) + + # Create the default image directory if not exists. + os.makedirs(self._image_path, exist_ok=True) @staticmethod def get_default_state() -> EyeStates: """This method is called when the observer is updated.""" return EyeStates.UNREACHABLE - def run(self, eye_strategy: BaseEyeStrategy) -> None: + def run(self, + eye_strategy: BaseEyeStrategy, + wifi_lock: Optional[Lock] = None + ) -> None: """This method is called when the observer is updated.""" - thread = Thread(target=self._run_in_loop, args=(self, eye_strategy,)) + thread = Thread(target=self._run_in_loop, args=(self, eye_strategy, wifi_lock)) thread.start() logger.debug("EyeSubject is running...") @staticmethod - def _run_in_loop(self, eye_strategy: BaseEyeStrategy) -> None: + def _run_in_loop(self, + eye_strategy: BaseEyeStrategy, + wifi_lock: Optional[Lock] = None + ) -> None: """This method is called when the observer is updated.""" sleep_interval = EyeSubject.DEFAULT_SLEEP_INTERVAL + # Create a dummy lock instance if not given. + if wifi_lock is None: + wifi_lock = Lock() + while True: - result = eye_strategy.check_if_detected() - logger.debug("EyeStrategyResult: " + str(result.result)) + # If WiFi subject would give rights to use camera, + # Check if any intruders detected. + if not wifi_lock.locked(): + result = eye_strategy.check_if_detected() + logger.debug("EyeStrategyResult: " + str(result.result)) - if result.result: - self.set_state(EyeStates.DETECTED) - self._save_image(result) - sleep_interval = EyeSubject.SLEEP_INTERVAL_DETECTED + if result.result: + self.set_state(EyeStates.DETECTED) + self._save_image(result) + sleep_interval = EyeSubject.SLEEP_INTERVAL_DETECTED + + # If the WiFi subject does not give rights, + # aka: "There is protectors around the house." else: self.set_state(EyeStates.NOT_DETECTED) sleep_interval = EyeSubject.DEFAULT_SLEEP_INTERVAL diff --git a/core/observers/subject/wifi_subject.py b/core/observers/subject/wifi_subject.py index a7503d1..e63e39a 100644 --- a/core/observers/subject/wifi_subject.py +++ b/core/observers/subject/wifi_subject.py @@ -3,8 +3,9 @@ Concretes a subject WiFi features. """ import logging -from threading import Thread +from threading import Thread, Lock from time import sleep +from typing import Optional from core.utils.datatypes import WiFiStates from core.observers.subject.base_subject import BaseSubject @@ -19,6 +20,8 @@ class WiFiSubject(BaseSubject): This class inherits from IBaseSubject. Concretes a subject for WiFiS features. """ + SINGLETON_LOCK: Optional[Lock] = None + @staticmethod def get_default_state() -> WiFiStates: """This method is called when the observer is updated.""" @@ -29,16 +32,32 @@ def run(self, wifi_strategy: BaseWiFiStrategy) -> None: thread = Thread(target=self._run_in_loop, args=(self, wifi_strategy,)) thread.start() logger.debug("WiFiSubject is running...") + + @classmethod + def get_protector_lock(cls) -> Lock: + """This method returns a Lock object where it can be + used to block camera when there is a WiFi connection + from protectors. + """ + if cls.SINGLETON_LOCK is None: + cls.SINGLETON_LOCK = Lock() + return cls.SINGLETON_LOCK @staticmethod def _run_in_loop(self, wifi_strategy: BaseWiFiStrategy) -> None: """This method is called when the observer is updated.""" + protector_lock: Lock = self.get_protector_lock() + while True: protectors = wifi_strategy.check_protectors() logger.debug("Protectors: " + str(protectors.result) + " " + str(protectors.protector)) if protectors.result: self.set_state(WiFiStates.CONNECTED) + if not protector_lock.locked(): + protector_lock.acquire() else: self.set_state(WiFiStates.DISCONNECTED) + if protector_lock.locked(): + protector_lock.release() sleep(5) diff --git a/core/strategies/detectors/efficientdet_strategy.py b/core/strategies/detectors/efficientdet_strategy.py new file mode 100644 index 0000000..a5a16b3 --- /dev/null +++ b/core/strategies/detectors/efficientdet_strategy.py @@ -0,0 +1,81 @@ +""" +An TinyML detection technique using Efficientdet model. +""" +import time +from typing import Any +import cv2 +import numpy +from tflite_runtime.interpreter import Interpreter +from .base_detector_strategy import BaseDetectorStrategy, DetectorResult + + +class EfficientdetStrategy(BaseDetectorStrategy): + """ + The Efficientdet strategy for detection of objects. + """ + MODEL_PATH: str = "models/efficientdet_1.tflite" + LABEL_PATH: str = "models/efficientdet_1_labelmap.txt" + DETECTION_THRES: float = 0.35 + + @classmethod + def detect_humans(cls, frame: numpy.ndarray) -> DetectorResult: + """This method detects if there are any humans in the frame.""" + # Create an model interpreter. + interpreter: Interpreter = Interpreter(model_path=cls.MODEL_PATH) + interpreter.allocate_tensors() + + # Get model input and output details. + input_details: list[dict[str, Any]] = interpreter.get_input_details() + output_details: list[dict[str, Any]] = interpreter.get_output_details() + _, input_height, input_width, _ = input_details[0]['shape'] + + # Prepare image for input-tensor. + image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + image = cv2.resize(image, (input_width, input_height), interpolation=cv2.INTER_AREA) + image_height, image_width = image.shape[:2] + + # Apply the frame into first tensor of the model. + input_data = numpy.expand_dims(image, axis=0) + interpreter.set_tensor(input_details[0]['index'], input_data) + + # Calculate the output tensor. + interpreter.invoke() + + # Recieve the output. + boxes = interpreter.get_tensor(output_details[0]['index'])[0] + classes = interpreter.get_tensor(output_details[1]['index'])[0] + scores = interpreter.get_tensor(output_details[2]['index'])[0] + + # Read label-map. + with open(cls.LABEL_PATH, 'r', encoding="utf-8") as labelmap: + labels = [line.strip() for line in labelmap.readlines()] + + # Create color legend for each class type. + colors = numpy.random.randint(0, 255, size=(len(labels), 3), dtype='uint8') + + # Convert RGB to BGR again. + image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) + + # Travers through detections. + detection_regions: list[tuple[int, int, int, int]] = [] + for score, box, pred_class in zip(scores, boxes, classes): + if score < cls.DETECTION_THRES: + continue + + class_name = labels[int(pred_class)] + if class_name == "person": + min_y = round(box[0] * image_height) + min_x = round(box[1] * image_width) + max_y = round(box[2] * image_height) + max_x = round(box[3] * image_width) + detection_regions.append((min_x, max_x, min_y, max_y)) + + cv2.rectangle(image, (min_x, min_y), (max_x, max_y), (0, 255, 0), 2) + + result = DetectorResult( + image=image, + human_found=len(detection_regions) > 0, + regions=detection_regions, + num_detections=len(detection_regions), + ) + return result diff --git a/core/strategies/eye/base_eye_strategy.py b/core/strategies/eye/base_eye_strategy.py index 9120168..d6c54f4 100644 --- a/core/strategies/eye/base_eye_strategy.py +++ b/core/strategies/eye/base_eye_strategy.py @@ -34,8 +34,4 @@ def check_if_detected(self) -> EyeStrategyResult: frame = self.get_frame() # Detect humans in the frame. result = self._detect_humans(frame) - - if result.human_found: - return EyeStrategyResult(image=frame, result=True) - return EyeStrategyResult(image=frame, result=False) - + return EyeStrategyResult(image=result.image, result=result.human_found) diff --git a/core/strategies/notifier/whatsapp_strategy.py b/core/strategies/notifier/whatsapp_strategy.py index cd6bd83..67f872e 100644 --- a/core/strategies/notifier/whatsapp_strategy.py +++ b/core/strategies/notifier/whatsapp_strategy.py @@ -32,7 +32,7 @@ def _send_message(self, reciever: WhatsappReciever, message: str) -> bool: response = requests.get(request_url, timeout=10) # Check if the request was unsuccessful. - if response.status_code != 200: + if response.status_code != 200 or "ERROR" in response.text: logger.error("Failed to send WhatsApp message to %s", reciever.telephone_number) logger.error("Status code: %s", response.status_code) return False diff --git a/core/utils/fileio_adaptor.py b/core/utils/fileio_adaptor.py new file mode 100644 index 0000000..d16d19b --- /dev/null +++ b/core/utils/fileio_adaptor.py @@ -0,0 +1,32 @@ +from typing import Any +import requests +from requests.auth import HTTPBasicAuth +import os +import glob + +def read_latest_file(dir_path: str) -> str: + """This method reads the latest file from the given directory.""" + # Check if ~ is used. + if dir_path.startswith("~"): + dir_path = os.path.expanduser(dir_path) + + # Check if the directory exists. + if not os.path.exists(dir_path): + raise FileNotFoundError(f"The given directory path does not exist: {dir_path}") + + # Get the latest file. + list_of_files = glob.glob(dir_path + '/*') + return max(list_of_files, key=os.path.getctime) + +def upload_to_fileio(file_path: str) -> str: + """Uploads a image file to File.io server.""" + response = requests.post( + 'https://file.io/', + files={"file": open(file_path, 'rb')}, + auth=HTTPBasicAuth("API_KEY_HERE", '') + ) + res: dict[str, Any] = response.json() + if res['success'] == True: + return res['link'] + else: + return "File upload failed!" diff --git a/main.py b/main.py index 55df18b..d9e16e3 100644 --- a/main.py +++ b/main.py @@ -6,10 +6,9 @@ from core.observers.subject.wifi_subject import WiFiSubject from core.observers.observer.hss_observer import HomeSecuritySystemObserver from core.strategies.wifi.ipaddress_strategy import IpAddressStrategy -from core.strategies.eye.usbcamera_strategy import UsbCameraStrategy from core.strategies.eye.picamera_strategy import PiCameraStrategy from core.strategies.notifier.whatsapp_strategy import WhatsappStrategy -from core.strategies.detectors.hog_descriptor_strategy import HogDescriptorStrategy +from core.strategies.detectors.efficientdet_strategy import EfficientdetStrategy from core.utils.datatypes import WhatsappReciever, Protector @@ -22,18 +21,17 @@ filemode='a', ) - def main(): """ This method is the entry point of the application. """ # Create a WhatsApp notifier. whatsapp_notifier = WhatsappStrategy() - whatsapp_notifier.add_reciever(WhatsappReciever("Gokhan", "tel_no", "api_key")) + whatsapp_notifier.add_reciever(WhatsappReciever("RECIEVER_NAME", "TEL_NO", "API_KEY")) # Create a Protector within IpAddressStrategy. ip_address_strategy = IpAddressStrategy() - ip_address_strategy.add_protector(Protector("Gokhan_iPhone", "tel_ip")) + ip_address_strategy.add_protector(Protector("PROTOTECTOR_NAME", "IP_ADDR")) # Create observer. hss_observer = HomeSecuritySystemObserver() @@ -42,14 +40,16 @@ def main(): # Create subjects to observe. wifi_subject = WiFiSubject() wifi_subject.attach(hss_observer) - eye_subject = EyeSubject("images/") + eye_subject = EyeSubject() eye_subject.attach(hss_observer) # Run subjects. wifi_subject.run(ip_address_strategy) + + # Set-up the camera to detect humans. camera = PiCameraStrategy() - camera.set_detector(HogDescriptorStrategy()) - eye_subject.run(camera) + camera.set_detector(EfficientdetStrategy()) + eye_subject.run(camera, wifi_subject.get_protector_lock()) if __name__ == "__main__": diff --git a/models/efficientdet_1.tflite b/models/efficientdet_1.tflite new file mode 100644 index 0000000..d61a02a Binary files /dev/null and b/models/efficientdet_1.tflite differ diff --git a/models/efficientdet_1_labelmap.txt b/models/efficientdet_1_labelmap.txt new file mode 100644 index 0000000..695772d --- /dev/null +++ b/models/efficientdet_1_labelmap.txt @@ -0,0 +1,90 @@ +person +bicycle +car +motorcycle +airplane +bus +train +truck +boat +traffic light +fire hydrant +??? +stop sign +parking meter +bench +bird +cat +dog +horse +sheep +cow +elephant +bear +zebra +giraffe +??? +backpack +umbrella +??? +??? +handbag +tie +suitcase +frisbee +skis +snowboard +sports ball +kite +baseball bat +baseball glove +skateboard +surfboard +tennis racket +bottle +??? +wine glass +cup +fork +knife +spoon +bowl +banana +apple +sandwich +orange +broccoli +carrot +hot dog +pizza +donut +cake +chair +couch +potted plant +bed +??? +dining table +??? +??? +toilet +??? +tv +laptop +mouse +remote +keyboard +cell phone +microwave +oven +toaster +sink +refrigerator +??? +book +clock +vase +scissors +teddy bear +hair drier +toothbrush diff --git a/requirements.txt b/requirements.txt index 68f6ccd..668d212 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ idna==3.4 numpy==1.26.1 opencv-python==4.8.1.78 requests==2.31.0 +tflite-runtime==2.14.0 urllib3==2.0.7