In [1]:
import cv2

import sys
import os
import datetime
import threading
import queue


# Add the src directory to the sys.path list
sys.path.insert(0, os.path.abspath('../src'))
from services.event_service import add_event
from db_models.init_db import create_tables
from surveillance_daemon.object_detector import ObjectDetector
from surveillance_daemon.video_capture import VideoCapture
from surveillance_daemon.motion_detector import MotionDetector
from surveillance_daemon.video_recorder import VideoRecorder
from surveillance_daemon.notification import Notification

In [2]:
# Initialize the database session
create_tables()

# Initialize system components
object_detector = ObjectDetector(
    "../models/yolov7-tiny/yolov7-tiny.cfg",
    "../models/yolov7-tiny/yolov7-tiny.weights",
    "../data/coco.names",
)
motion_detector = MotionDetector()
notification = Notification(to_email="gunes314@gmail.com")

2024-05-17 22:21:20,062 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-17 22:21:20,062 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("events")
2024-05-17 22:21:20,062 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-05-17 22:21:20,063 INFO sqlalchemy.engine.Engine COMMIT


In [3]:
# Queue to store video paths to be processed
video_queue = queue.Queue()

def worker():
    """Worker thread that processes video files from the queue."""
    while True:
        video_path = video_queue.get()
        if video_path is None:
            break
        detected_objects, video_path = object_detector.process_video(video_path)
        add_event(video_path, str(detected_objects))
        # send a notification if an object is detected
        if detected_objects:
            notification.send(detected_objects)
        video_queue.task_done()

# Create and start the worker thread
num_worker_threads = 1
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

In [4]:
recording = False
start_time = None

with VideoCapture(device=0) as camera:
    while True:
        frame = camera.read_frame()
        if frame is None:
            break

        font = cv2.FONT_HERSHEY_SIMPLEX
        color = (0, 255, 0)

        # Detect motion and start recording video
        if motion_detector.detect_motion(frame):
            cv2.putText(frame, "M", (10, 30), font, 0.5, color, 2)
            if not recording:
                recording = True
                start_time = datetime.datetime.now()
                video_recorder = VideoRecorder(folder="../videos", fps=camera.get_fps(), width=camera.get_frame_width(), height=camera.get_frame_height())
        
        # Record video for 5 seconds
        if recording:
            cv2.putText(frame, "R", (30, 30), font, 0.5, color, 2)
            video_recorder.write_frame(frame)
            if (datetime.datetime.now() - start_time).total_seconds() > 5:
                # Start object detection on a separate thread when video recording stops
                video_queue.put(video_recorder.get_video_path())

                # Clear the video recorder object
                recording = False
                video_recorder.release()
                del video_recorder

        cv2.imshow("Object detection", frame)
        if cv2.waitKey(1) == 27:
            break

    cv2.destroyAllWindows()

2024-05-17 22:21:31,114 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-17 22:21:31,115 INFO sqlalchemy.engine.Engine INSERT INTO events (video_path, objects, created_at) VALUES (?, ?, ?)
2024-05-17 22:21:31,115 INFO sqlalchemy.engine.Engine [generated in 0.00026s] ('../videos/2024-05-17T22:21:24_annotated.avi', "{'sofa', 'tvmonitor', 'bed'}", datetime.datetime(2024, 5, 17, 20, 21, 19, 809970, tzinfo=datetime.timezone.utc))
2024-05-17 22:21:31,116 INFO sqlalchemy.engine.Engine COMMIT
{'sofa', 'tvmonitor', 'bed'}


Releasing camera


In [None]:
object_detector.process_video("../videos/2024-05-15T23:14:42.avi")

In [None]:
camera.release()
cv2.destroyAllWindows()