# Console Deployment

This notebook showcases how the **Drowning Detection System** is applied directly within the notebook for:

- **Video File Inference**  
  We run inference on pre-recorded swimming pool footage to detect and classify human behaviors such as *swimming*, *treading water*, and *drowning*.

- **Live Webcam Streaming**  
  The system captures frames from a live webcam feed, overlays predictions (with timestamp), and streams the output back in real time. This emulates real-world deployment for poolside surveillance or lifeguard assist systems.

In [1]:
from dotenv import load_dotenv

load_dotenv()

import sys
import os

sys.path.append(os.getenv('SRC_DIR'))

from utils.system import display_system_info

display_system_info(markdown=True)


**Last Updated**: 2025-04-22 13:48:34

**Python Version**: 3.11.12  
**OS**: Windows 10.0.26100  
**Architecture**: 64bit  
**Hostname**: ShenLaptop  
**Processor**: Intel64 Family 6 Model 186 Stepping 3, GenuineIntel  
**RAM Size**: 15.65 GB  
  
        

In [2]:
from ultralytics import YOLO
from classify import TorchClassifier
import cv2
from pygame import mixer
from datetime import datetime
import numpy as np

from collections import deque

pygame 2.6.1 (SDL 2.28.4, Python 3.11.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [6]:
YOLO_PATH = r"C:\Users\hp\Downloads\Drowning-Detection\models\detection\YOLO\jrom_yolo11n.pt"
# r"C:\Users\hp\Downloads\Drowning-Detection\models\detection\YOLO\jrom_best.pt"
CNN_PATH = r"C:\Users\hp\Downloads\Drowning-Detection\models\classification\CNN\test.pt"
SIREN_PATH = r"C:\Users\hp\Downloads\Drowning-Detection\data\audio\siren.wav"
VIDEO_PATH = r"C:\Users\hp\Downloads\Drowning-Detection\data\videos\manyswimmers.mp4"
# r"C:\Users\hp\Downloads\Drowning-Detection\data\videos\3397733457-preview.mp4"
LOOKBACK = 20
SENSITIVITY = 0.5

mixer.init()
mixer.music.load(SIREN_PATH)

detection_model = YOLO(YOLO_PATH)
classification_model = TorchClassifier(model="CNNClassifier", model_path=CNN_PATH)
siren = mixer.music

obj_confs_info = {}
class_confs_info = {}
predicted_classes_info = {}

In [7]:
def classify_activity(roi):
    import torch
    from torchvision import transforms
    from PIL import Image

    class RGBToHSV:
        def __call__(self, img):
            # Ensure the image is in PIL format before converting
            if isinstance(img, torch.Tensor):
                img = transforms.ToPILImage()(img)  # Convert tensor to PIL image

            # Convert the image to HSV using PIL
            img_hsv = img.convert("HSV")

            return img_hsv

    roi_pil = Image.fromarray(cv2.cvtColor(roi, cv2.COLOR_BGR2RGB))

    transform = transforms.Compose([
        transforms.Resize((128, 128)),
        RGBToHSV(),
        transforms.ToTensor()
    ])

    prediction = classification_model(
        img=roi_pil,
        transform=transform,
        prob=True
    )

    return prediction.cpu().numpy()


def detect_drowning(frame, sensitivity, lookback, activate_siren):
    CLASS_IDX_TO_NAME = {
        0: 'drowning',
        1: 'swimming',
        2: 'treadwater'
    }

    CLASS_NAME_TO_IDX = {
        value: key
        for key, value in CLASS_IDX_TO_NAME.items()
    }

    annotated_frame = frame.copy()

    result = detection_model.track(
        frame, persist=True, tracker="botsort.yaml", verbose=False)[0]

    if result.boxes and result.boxes.id is not None:
        obj_boxes = result.boxes.xyxy.cpu().numpy()
        obj_ids = result.boxes.id.cpu().numpy()
        obj_confs = result.boxes.conf.cpu().numpy()

        for box, obj_id, obj_conf in zip(obj_boxes, obj_ids, obj_confs):
            if obj_id not in obj_confs_info or obj_id not in class_confs_info or obj_id not in predicted_classes_info:
                current_obj_conf = None
                current_class_vec = None
                predicted_class_history = deque(maxlen=lookback)
            else:
                current_obj_conf = obj_confs_info[obj_id]
                current_class_vec = class_confs_info[obj_id]
                predicted_class_history = predicted_classes_info[obj_id]

            new_obj_conf = obj_confs_info[obj_id] = (
                (1 - sensitivity) * current_obj_conf + sensitivity * obj_conf
                if current_obj_conf is not None else
                obj_conf
            )

            if new_obj_conf < 0.5:
                continue

            x1, y1, x2, y2 = map(int, box)
            roi = frame[y1:y2, x1:x2]
            class_vec = classify_activity(roi)

            new_class_vec = class_confs_info[obj_id] = (
                (1 - sensitivity) * current_class_vec + sensitivity * class_vec
                if current_class_vec is not None else
                class_vec
            )

            predicted_class_idx = int(np.argmax(new_class_vec))
            predicted_class_history.append(predicted_class_idx)
            predicted_classes_info[obj_id] = predicted_class_history

            if len(predicted_class_history) < lookback:
                continue

            predicted_class_name = CLASS_IDX_TO_NAME[max(
                set(predicted_class_history), key=predicted_class_history.count)]
            # TODO: For testing only
            # predicted_class_name = "drowning" if predicted_class_name == "treadwater" else predicted_class_name
            # Normalize to 0–1
            drowning_prob = min(max(new_class_vec[CLASS_NAME_TO_IDX["drowning"]], 0), 100)

            # Interpolate: red = (255, 0, 0), green = (0, 255, 0)
            bgr = 0, int(255 * (1 - drowning_prob)), int(255 * drowning_prob)

            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), bgr, 2)
            cv2.putText(annotated_frame, f'{int(obj_id)} {predicted_class_name} {new_class_vec[predicted_class_idx] * 100:.2f}%', (
                x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, bgr, 2)

            if obj_conf < 0.5:
                continue

            if predicted_class_name == "drowning":
                activate_siren = True

                if activate_siren and not siren.get_busy():
                    siren.play()

                # log_drowning_info(obj_id, drowning_prob, roi)

    if not activate_siren and siren.get_busy():
        siren.fadeout(1000)

    cv2.putText(annotated_frame, f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    return annotated_frame, activate_siren

In [8]:
cap = cv2.VideoCapture(VIDEO_PATH)

if not cap.isOpened():
    print("Error: Unable to access the camera.")
    sys.exit(-1)

while True:
    ret, frame = cap.read()
    if not ret:
        print("Error: Unable to read from the camera.")
        break

    activate_siren = False

    annotated_frame, activate_siren = detect_drowning(frame, SENSITIVITY, LOOKBACK, activate_siren)

    cv2.imshow("Console Deployment", annotated_frame)

    if not activate_siren and siren.get_busy():
        siren.fadeout(1000)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
if not activate_siren and siren.get_busy():
    siren.fadeout(1000)
cv2.destroyAllWindows()

Error: Unable to read from the camera.
