# Object Detection in Surveillance Videos

This notebook demonstrates how to use the Detector class from our project to identify objects in videos from an Aqara security camera.

In [1]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

import os
import time
from pathlib import Path
import threading
from queue import Queue

import cv2
import numpy as np
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

import torch
from PIL import Image as PILImage
from torchvision import transforms

from aqara_video.core.factory import TimelineFactory
from aqara_video.core.clip import Clip
from aqara_video.cli.video_loop import VideoLoop
from aqara_video.ml.detector import Detector

## Setup the Object Detector

We'll use the `Detector` class from our project, which uses a pre-trained Faster R-CNN model with a MobileNet backbone.

In [2]:
# Check if CUDA is available and set the device accordingly
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Initialize the detector with a batch size of 1 for single frame processing
detector = Detector(device=device, batch_size=1)

# Display the available object categories
print(f"The detector can recognize {len(detector.labels)} objects:")
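# Parenthesize the conditional so the label list is printed even when there are 20 or fewer labels
print(", ".join(detector.labels[:20]) + ("..." if len(detector.labels) > 20 else ""))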

Using device: cuda
The detector can recognize 91 objects:
__background__, person, bicycle, car, motorcycle, airplane, bus, train, truck, boat, traffic light, fire hydrant, N/A, stop sign, parking meter, bench, bird, cat, dog, horse...
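
Note that the label list contains placeholder entries (`__background__`, `N/A`), so the count of 91 is larger than the number of real object classes. The sketch below shows one way to filter them out. It uses only the first 20 labels printed above, not the full list, and `real_labels` is a hypothetical helper rather than part of `Detector`.

```python
# First 20 labels copied from the output above; the remaining entries are not shown here.
labels = [
    "__background__", "person", "bicycle", "car", "motorcycle", "airplane",
    "bus", "train", "truck", "boat", "traffic light", "fire hydrant", "N/A",
    "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse",
]

# Entries that do not name an object class (assumed from the printed list).
PLACEHOLDERS = {"__background__", "N/A"}


def real_labels(all_labels):
    """Return the labels that name an actual object class."""
    return [label for label in all_labels if label not in PLACEHOLDERS]


print(len(labels), len(real_labels(labels)))
```

With the real detector, the same filter could be applied to `detector.labels`.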


## Enhanced Detector Features

Our `Detector` class now includes performance optimizations for faster inference:

1. **Batch Processing**: Can process multiple frames at once for better throughput
2. **Optimized Preprocessing**: Direct BGR to RGB conversion without PIL intermediary
3. **Memory Efficiency**: Pre-allocated transform operations and tensor handling
4. **Hardware Acceleration**: Proper utilization of GPU when available
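
As a rough illustration of items 1 and 2, the sketch below converts a list of OpenCV-style BGR frames into a single RGB batch using NumPy only. It is not the `Detector` implementation: the function name `bgr_frames_to_batch`, the `[0, 1]` scaling, and the `N x 3 x H x W` layout are assumptions chosen to match what torchvision detection models commonly expect.

```python
import numpy as np


def bgr_frames_to_batch(frames):
    """Stack HxWx3 uint8 BGR frames into an Nx3xHxW float32 RGB batch in [0, 1]."""
    batch = np.stack(frames)             # (N, H, W, 3), channels in BGR order
    batch = batch[..., ::-1]             # reverse the channel axis: BGR -> RGB
    batch = batch.transpose(0, 3, 1, 2)  # (N, 3, H, W)
    return np.ascontiguousarray(batch, dtype=np.float32) / 255.0


# Hypothetical 2x2 frame that is pure blue in BGR order.
blue = np.zeros((2, 2, 3), dtype=np.uint8)
blue[..., 0] = 255
batch = bgr_frames_to_batch([blue, blue])
print(batch.shape, batch.dtype)
```

Reversing the channel axis on the array avoids a round trip through a PIL image, which is the idea behind item 2.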

## Create a VideoLoop for bounding box drawing

We'll use the VideoLoop class from our project to draw bounding boxes on detected objects.

In [3]:
# Create a VideoLoop instance just to use its drawing functions
video_loop = VideoLoop(video_producer=None)


# Helper function to process a frame with the detector and draw bounding boxes
def process_frame(frame, threshold=0.5):
    # Preprocess the frame for the model
    tensor = detector.preprocess(frame)

    # Make predictions
    predictions = detector.predict(tensor)

    # Draw bounding boxes on a copy of the frame
    result_frame = frame.copy()
    result_frame = video_loop.draw_boxes(result_frame, predictions, threshold=threshold)

    return result_frame, predictions
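
The later cells read predictions as `predictions[0]["scores"]`, `predictions[0]["boxes"]`, and `predictions[0]["categories"]`. Assuming that structure, a threshold filter can be sketched with plain Python values. The example data is made up and `filter_detections` is a hypothetical helper.

```python
def filter_detections(prediction, threshold=0.5):
    """Keep (category, score, box) triples whose score exceeds the threshold."""
    return [
        (category, score, box)
        for category, score, box in zip(
            prediction["categories"], prediction["scores"], prediction["boxes"]
        )
        if score > threshold
    ]


# Hypothetical prediction for one frame, using plain Python values.
example = {
    "categories": ["person", "dog", "car"],
    "scores": [0.92, 0.48, 0.71],
    "boxes": [[10, 20, 110, 220], [5, 5, 50, 50], [200, 80, 400, 300]],
}
kept = filter_detections(example, threshold=0.5)
print([category for category, _, _ in kept])
```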

## Load Timeline and Clips

Now let's load a timeline from your camera directory.

In [4]:
# Set the path to your camera directory
# Update this path to point to your Aqara camera directory
# CAMERA_DIR = Path("/mnt/hdd/diegocaro/aqara_video/lumi1.54ef44457bc9")
CAMERA_DIR = Path("/mnt/hdd/diegocaro/aqara_video/lumi1.54ef44603857")

# Create a timeline
timeline = TimelineFactory.create_timeline(CAMERA_DIR)
print(f"Camera ID: {timeline.camera_id}")
print(f"Number of clips: {len(timeline)}")

# Get available dates
dates = timeline.get_available_dates()
print(f"Available dates: {dates[:5]}{'...' if len(dates) > 5 else ''}")

Camera ID: lumi1.54ef44603857
Number of clips: 33516
Available dates: [datetime.date(2025, 2, 6), datetime.date(2025, 2, 7), datetime.date(2025, 2, 8), datetime.date(2025, 2, 9), datetime.date(2025, 2, 10)]...


## Create UI Controls for Selecting Clips and Configuration

Let's create UI elements to select a date and clip to process.

In [5]:
# Create widgets for date and clip selection
date_dropdown = widgets.Dropdown(
    options=[(str(date), date) for date in dates],
    description="Date:",
    disabled=False,
)

# Will be populated after date selection
clip_dropdown = widgets.Dropdown(
    options=[],
    description="Clip:",
    disabled=True,
)

# Confidence threshold slider
threshold_slider = widgets.FloatSlider(
    value=0.5,
    min=0.1,
    max=0.9,
    step=0.05,
    description="Threshold:",
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
    readout_format=".2f",
)

# Batch size slider for optimized detector
batch_size_slider = widgets.IntSlider(
    value=4,
    min=1,
    max=16,
    step=1,
    description="Batch Size:",
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
)

# Frame sampling rate slider
sample_rate_slider = widgets.IntSlider(
    value=2,
    min=1,
    max=10,
    step=1,
    description="Sample every n frames:",
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
)

# Processing status text
status_text = widgets.HTML(value="")


# Function to update clips dropdown when date changes
def on_date_change(change):
    if change["type"] == "change" and change["name"] == "value":
        selected_date = change["new"]
        # Filter clips for the selected date
        date_clips = [
            clip for clip in timeline.clips if clip.timestamp.date() == selected_date
        ]
        # Sort by timestamp
        date_clips.sort(key=lambda clip: clip.timestamp)
        # Update dropdown options
        clip_dropdown.options = [
            (clip.timestamp.strftime("%H:%M:%S"), clip) for clip in date_clips
        ]
        clip_dropdown.disabled = False


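# Register the callback for value changes only
date_dropdown.observe(on_date_change, names="value")

# Populate the clip list for the initially selected date,
# since no change event fires until the user picks a different date
if dates:
    on_date_change({"type": "change", "name": "value", "new": date_dropdown.value})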

# Display the widgets
display(
    widgets.VBox(
        [
            date_dropdown,
            clip_dropdown,
            widgets.HBox([threshold_slider, batch_size_slider, sample_rate_slider]),
            status_text,
        ]
    )
)

VBox(children=(Dropdown(description='Date:', options=(('2025-02-06', datetime.date(2025, 2, 6)), ('2025-02-07'…
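
The date callback scans every clip in the timeline on each change. With tens of thousands of clips, it may be worth building a date index once and looking clips up from it. The sketch below assumes only that each clip has a `timestamp` attribute, as used above. `FakeClip` and `index_clips_by_date` are hypothetical names for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime


@dataclass
class FakeClip:
    """Hypothetical stand-in for a clip; only `timestamp` is used."""
    timestamp: datetime


def index_clips_by_date(clips):
    """Group clips by calendar date, each group sorted by timestamp."""
    by_date = defaultdict(list)
    for clip in clips:
        by_date[clip.timestamp.date()].append(clip)
    for group in by_date.values():
        group.sort(key=lambda clip: clip.timestamp)
    return dict(by_date)


clips = [
    FakeClip(datetime(2025, 2, 7, 9, 30)),
    FakeClip(datetime(2025, 2, 6, 18, 0)),
    FakeClip(datetime(2025, 2, 6, 8, 15)),
]
index = index_clips_by_date(clips)
print({str(day): len(group) for day, group in index.items()})
```

The callback could then read `index.get(selected_date, [])` instead of filtering `timeline.clips` each time.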

## Process and Display Video with Optimized Object Detection

Now let's create functions to process and display a selected clip with our enhanced batch processing.
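
The core pattern in the cell that follows is: sample every n-th frame, collect the sampled frames into fixed-size batches, and flush the trailing partial batch at the end. Isolated as a generator, it might look like the sketch below. `iter_batches` is a hypothetical helper, and the string "frames" stand in for the arrays that `clip.frames()` yields.

```python
def iter_batches(frames, batch_size=4, sample_rate=2):
    """Yield (frame_ids, frames) lists of up to batch_size sampled frames."""
    batch_ids, batch_frames = [], []
    for frame_id, frame in frames:
        if frame_id % sample_rate != 0:
            continue  # skip frames that are not on the sampling grid
        batch_ids.append(frame_id)
        batch_frames.append(frame)
        if len(batch_frames) >= batch_size:
            yield batch_ids, batch_frames
            batch_ids, batch_frames = [], []
    if batch_frames:
        yield batch_ids, batch_frames  # trailing partial batch


# Stand-in for clip.frames(): ten (frame_id, frame) pairs.
fake_frames = [(i, f"frame-{i}") for i in range(10)]
for ids, _ in iter_batches(fake_frames, batch_size=2, sample_rate=3):
    print(ids)
```

Each yielded pair has the shape that `predict_batch(batch_frame_ids, batch_frames)` receives in the cell below.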

In [6]:
# Create an image widget for displaying video frames
output_widget = widgets.Image(format="jpeg", width=800, height=600)
display(output_widget)

# Global variables for video processing
processing_thread = None
stop_processing = threading.Event()


def process_clip_batch(clip, threshold=0.5, batch_size=4, sample_rate=2):
    """Process a clip using batch processing capabilities"""
    status_text.value = f"<b>Processing clip {clip.timestamp} with batch size {batch_size}, sampling every {sample_rate} frames</b>"
    stop_processing.clear()

    # Create detector with specified batch size
    batch_detector = Detector(device=device, batch_size=batch_size)

    # Process in a separate thread
    def process():
        # Store processed frames and their results for display
        processed_frames = {}
        prediction_results = {}

        # Collect frames for batch processing
        batch_frames = []
        batch_frame_ids = []

        for frame_id, frame in clip.frames():
            if stop_processing.is_set():
                break

            # Only process every n frames
            if frame_id % sample_rate != 0:
                continue

            # Store the original frame
            processed_frames[frame_id] = frame.copy()

            # Add to batch for processing
            batch_frames.append(frame)
            batch_frame_ids.append(frame_id)

            # Process batch when it reaches the target size
            if len(batch_frames) >= batch_size:
                batch_results = batch_detector.predict_batch(
                    batch_frame_ids, batch_frames
                )

                # Update prediction results with batch results
                for res_frame_id, predictions in batch_results:
                    prediction_results[res_frame_id] = predictions

                    # Update display with the processed frame
                    display_processed_frame(
                        res_frame_id, predictions, processed_frames, threshold
                    )

                # Clear batch buffers after processing
                batch_frames = []
                batch_frame_ids = []

        # Process any remaining frames in the batch
        if batch_frames:
            batch_results = batch_detector.predict_batch(batch_frame_ids, batch_frames)
            for res_frame_id, predictions in batch_results:
                prediction_results[res_frame_id] = predictions
                display_processed_frame(
                    res_frame_id, predictions, processed_frames, threshold
                )

        status_text.value = f"<b>Completed processing clip {clip.timestamp}</b>"

    def display_processed_frame(frame_id, predictions, frames_dict, threshold):
        """Helper function to display a processed frame with detections"""
        if frame_id in frames_dict:
            # Pop the stored copy so displayed frames do not accumulate in memory
            result_frame = frames_dict.pop(frame_id)
            result_frame = video_loop.draw_boxes(
                result_frame, predictions, threshold=threshold
            )

            # Count detections above threshold
            detection_count = 0
            if len(predictions) > 0:
                scores = predictions[0]["scores"]
                detection_count = sum(1 for score in scores if score > threshold)

            # Update status with frame info
            status_text.value = (
                f"<b>Processed frame {frame_id} - Found {detection_count} objects</b>"
            )

            # Convert to JPEG for display
            _, jpeg_data = cv2.imencode(".jpg", result_frame)
            output_widget.value = jpeg_data.tobytes()

    return threading.Thread(target=process, daemon=True)


# Process button for batch processing
process_button = widgets.Button(
    description="Process with Batching",
    disabled=False,
    button_style="success",
    tooltip="Process the selected clip using batch processing",
    icon="play",
)

# Process button (single frame method)
process_button_single = widgets.Button(
    description="Process Single Frames",
    disabled=False,
    button_style="warning",
    tooltip="Process the selected clip one frame at a time",
    icon="play",
)

# Stop button
stop_button = widgets.Button(
    description="Stop Processing",
    disabled=False,
    button_style="danger",
    tooltip="Stop processing",
    icon="stop",
)


def process_clip_single(clip, threshold=0.5):
    """Process a clip and update the output widget using single frame processing"""
    status_text.value = f"<b>Processing clip {clip.timestamp} one frame at a time</b>"
    stop_processing.clear()

    # Create processing function
    def process():
        for frame_id, frame in clip.frames():
            if stop_processing.is_set():
                break

            # Process frame and draw bounding boxes
            result_frame, predictions = process_frame(frame, threshold)

            # Count detections above threshold
            detection_count = 0
            if len(predictions) > 0:
                scores = predictions[0]["scores"]
                detection_count = sum(1 for score in scores if score > threshold)

            # Update status with frame info
            status_text.value = (
                f"<b>Frame {frame_id} - Found {detection_count} objects</b>"
            )

            # Convert to JPEG for display
            _, jpeg_data = cv2.imencode(".jpg", result_frame)
            # Update the image widget
            output_widget.value = jpeg_data.tobytes()

        status_text.value = f"<b>Completed processing clip {clip.timestamp}</b>"

    return threading.Thread(target=process, daemon=True)


def on_process_button_batch_click(b):
    global processing_thread
    # Stop any existing processing
    if processing_thread and processing_thread.is_alive():
        stop_processing.set()
        processing_thread.join(timeout=1)

    if clip_dropdown.value:
        # Start new processing thread with batch processing
        processing_thread = process_clip_batch(
            clip_dropdown.value,
            threshold_slider.value,
            batch_size_slider.value,
            sample_rate_slider.value,
        )
        processing_thread.start()
    else:
        status_text.value = "<b>Please select a clip first</b>"


def on_process_button_single_click(b):
    global processing_thread
    # Stop any existing processing
    if processing_thread and processing_thread.is_alive():
        stop_processing.set()
        processing_thread.join(timeout=1)

    if clip_dropdown.value:
        # Start new processing thread with original method
        processing_thread = process_clip_single(
            clip_dropdown.value, threshold_slider.value
        )
        processing_thread.start()
    else:
        status_text.value = "<b>Please select a clip first</b>"


def on_stop_button_click(b):
    stop_processing.set()
    status_text.value = "<b>Processing stopped</b>"


process_button.on_click(on_process_button_batch_click)
process_button_single.on_click(on_process_button_single_click)
stop_button.on_click(on_stop_button_click)

# Display the buttons
display(widgets.HBox([process_button, process_button_single, stop_button]))

Image(value=b'', format='jpeg', height='600', width='800')

HBox(children=(Button(button_style='success', description='Process with Batching', icon='play', style=ButtonSt…
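
The buttons above rely on a shared `threading.Event` to stop a background worker cooperatively: the worker checks the event between frames, and the click handler sets it and then joins with a timeout. A minimal, self-contained sketch of that pattern follows. `start_worker` is a hypothetical helper and the short sleep stands in for per-frame detection work.

```python
import threading
import time


def start_worker(items, stop_event, results):
    """Start a daemon thread that consumes items until stop_event is set."""
    def run():
        for item in items:
            if stop_event.is_set():
                break
            results.append(item)
            time.sleep(0.01)  # stands in for per-frame work

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


stop_event = threading.Event()
results = []
worker = start_worker(range(1000), stop_event, results)
time.sleep(0.05)
stop_event.set()        # request a stop, as the Stop button does
worker.join(timeout=1)  # wait briefly for the thread to finish
print(worker.is_alive(), len(results) < 1000)
```

Because the worker only checks the event between items, a stop request takes effect after the current item (or batch) finishes, not immediately.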

## Benchmark Processing Speed

Let's add a function to compare the processing speeds with different batch sizes.

In [9]:
def benchmark_methods(
    clip, num_frames=100, batch_sizes=(1, 2, 4, 8), sample_rates=(1, 2, 3)
):
    """Benchmark different processing methods and parameters"""
    results = []

    # Single frame method first as baseline
    status_text.value = "<b>Benchmarking single frame method...</b>"
    single_detector = Detector(device=device, batch_size=1)
    start_time = time.time()
    frame_count = 0

    for frame_id, frame in clip.frames():
        tensor = single_detector.preprocess(frame)
        predictions = single_detector.predict(tensor)
        frame_count += 1
        if frame_count >= num_frames:
            break

    orig_time = time.time() - start_time
    fps_orig = frame_count / orig_time
    results.append(("Single Frame", 1, 1, fps_orig))

    # Now benchmark batch method with different batch sizes and sample rates
    for batch_size in batch_sizes:
        for sample_rate in sample_rates:
            status_text.value = f"<b>Benchmarking batch method with size={batch_size}, sample_rate={sample_rate}...</b>"
            batch_detector = Detector(device=device, batch_size=batch_size)

            start_time = time.time()
            frame_count = 0
            processed_count = 0

            # Collect frames for batch processing
            batch_frames = []
            batch_frame_ids = []

            for frame_id, frame in clip.frames():
                frame_count += 1

                # Only run the detector on every sample_rate-th frame
                if frame_id % sample_rate == 0:
                    processed_count += 1
                    batch_frames.append(frame)
                    batch_frame_ids.append(frame_id)

                    # Process batch when it reaches the target size
                    if len(batch_frames) >= batch_size:
                        _ = batch_detector.predict_batch(batch_frame_ids, batch_frames)
                        batch_frames = []
                        batch_frame_ids = []

                # Check the limit on every frame, including skipped ones,
                # so the loop reads exactly num_frames frames
                if frame_count >= num_frames:
                    break

            # Process any remaining frames
            if batch_frames:
                _ = batch_detector.predict_batch(batch_frame_ids, batch_frames)

            opt_time = time.time() - start_time
            # Overall FPS counts every frame read, including skipped ones
            fps_opt = frame_count / opt_time
            # Effective FPS counts only frames run through the detector
            # (computed for reference; the results table reports fps_opt)
            effective_fps = processed_count / opt_time

            results.append(
                (f"Batch Size {batch_size}", batch_size, sample_rate, fps_opt)
            )

    # Display results as HTML table
    html = "<h3>Benchmark Results</h3>"
    html += "<table border='1'>"
    html += "<tr><th>Method</th><th>Batch Size</th><th>Sample Rate</th><th>FPS</th><th>Speedup</th></tr>"

    baseline_fps = results[0][3]  # Single frame method FPS

    for method, batch_size, sample_rate, fps in results:
        speedup = fps / baseline_fps
        html += f"<tr><td>{method}</td><td>{batch_size}</td><td>{sample_rate}</td><td>{fps:.2f}</td><td>{speedup:.2f}x</td></tr>"

    html += "</table>"
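    html += "<p><b>Note:</b> The FPS values count all frames read from the clip, including skipped ones. With sample_rate > 1, only every n-th frame is run through the detector, so the speedup reflects both batching and frame sampling.</p>"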

    status_text.value = html


# Benchmark button
benchmark_button = widgets.Button(
    description="Benchmark Methods",
    disabled=False,
    button_style="info",
    tooltip="Benchmark different processing methods",
    icon="dashboard",
)

# Frame count for benchmark
benchmark_frames = widgets.IntSlider(
    value=50,
    min=10,
    max=200,
    step=10,
    description="Frames to benchmark:",
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
)


def on_benchmark_button_click(b):
    if clip_dropdown.value:
        benchmark_methods(clip_dropdown.value, num_frames=benchmark_frames.value)
    else:
        status_text.value = "<b>Please select a clip first</b>"


benchmark_button.on_click(on_benchmark_button_click)

# Display benchmark controls
display(widgets.HBox([benchmark_button, benchmark_frames]))

HBox(children=(Button(button_style='info', description='Benchmark Methods', icon='dashboard', style=ButtonStyl…
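
The benchmark table reports frames read per second, which includes skipped frames. To separate the effect of sampling from the effect of batching, it can help to report the detector's own throughput alongside it. The sketch below measures both with `time.perf_counter`. `measure_fps` is a hypothetical helper and the sleep-based workload is a stand-in for inference.

```python
import time


def measure_fps(process, frames, sample_rate=1):
    """Return (overall_fps, effective_fps) for running process on sampled frames."""
    read_count = 0
    processed_count = 0
    start = time.perf_counter()
    for frame_id, frame in frames:
        read_count += 1
        if frame_id % sample_rate != 0:
            continue
        process(frame)
        processed_count += 1
    elapsed = time.perf_counter() - start
    return read_count / elapsed, processed_count / elapsed


# Hypothetical workload: sleep 2 ms per processed frame.
fake_frames = [(i, None) for i in range(20)]
overall, effective = measure_fps(
    lambda frame: time.sleep(0.002), fake_frames, sample_rate=2
)
print(f"overall={overall:.1f} fps, effective={effective:.1f} fps")
```

When timing GPU inference, CUDA work may still be in flight when the call returns. If the detector does not already copy results back to the CPU, calling `torch.cuda.synchronize()` before reading the clock gives a more reliable measurement.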

Exception in thread Thread-6 (process):
Traceback (most recent call last):
  File "/home/diegocaro/miniconda3/envs/ds/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/home/diegocaro/miniconda3/envs/ds/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/home/diegocaro/miniconda3/envs/ds/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_1451378/228633817.py", line 28, in process
  File "/home/diegocaro/notebooks/soylentgreen/aqara_video/core/video_reader.py", line 243, in frames
    stream = self.best_stream
             ^^^^^^^^^^^^^^^^
  File "/home/diegocaro/notebooks/soylentgreen/aqara_video/core/video_reader.py", line 139, in best_stream
    return self.metadata.get_best_stream()
           ^^^^^^^^^^^^^
  File "/home/diegocaro/notebooks/soylentgreen/aqara_video/core/video_reader.py", line 128, in metadata
    self.

## Clip Analysis with Batch Processing

We can use batch processing to efficiently analyze a clip and count objects.

In [None]:
def analyze_clip_batch(clip, threshold=0.5, batch_size=4, sample_rate=5):
    """Analyze a clip using batch processing"""
    detection_counts = {}
    processed_frames = 0

    status_text.value = f"<b>Analyzing clip {clip.timestamp} with batch size {batch_size}, sampling every {sample_rate} frames</b>"

    # Create detector with specified batch size
    batch_detector = Detector(device=device, batch_size=batch_size)

    # Helper function to accumulate detection counts.
    # Defined before the loop so it exists when the first batch is processed.
    def process_detection_results(batch_results, detection_counts, threshold):
        for _, predictions in batch_results:
            if len(predictions) > 0:
                scores = predictions[0]["scores"]
                categories = predictions[0]["categories"]

                # Count objects above threshold
                for category, score in zip(categories, scores):
                    if score > threshold:
                        detection_counts[category] = (
                            detection_counts.get(category, 0) + 1
                        )

    # Collect frames for batch processing
    batch_frames = []
    batch_frame_ids = []

    # Process frames in batches
    for frame_id, frame in clip.frames():
        # Only process every N frames
        if frame_id % sample_rate != 0:
            continue

        processed_frames += 1
        status_text.value = f"<b>Adding frame {frame_id} to batch</b>"

        # Add to batch for processing
        batch_frames.append(frame)
        batch_frame_ids.append(frame_id)

        # Process batch when it reaches the target size
        if len(batch_frames) >= batch_size:
            batch_results = batch_detector.predict_batch(batch_frame_ids, batch_frames)

            # Process detection results
            process_detection_results(batch_results, detection_counts, threshold)

            # Clear batch buffers
            batch_frames = []
            batch_frame_ids = []

    # Process any remaining frames in the batch
    if batch_frames:
        batch_results = batch_detector.predict_batch(batch_frame_ids, batch_frames)
        process_detection_results(batch_results, detection_counts, threshold)

    # Sort by count (descending)
    sorted_detections = sorted(
        detection_counts.items(), key=lambda x: x[1], reverse=True
    )

    # Prepare results HTML
    result_html = f"<h3>Analysis of clip {clip.timestamp}</h3>"
    result_html += f"<p>Processed {processed_frames} frames (sampling every {sample_rate} frames)</p>"
    result_html += "<h4>Detected Objects:</h4>"
    result_html += "<ul>"
    for category, count in sorted_detections:
        result_html += f"<li><b>{category}</b>: {count} instances</li>"
    result_html += "</ul>"

    status_text.value = result_html


# Analyze button
analyze_button = widgets.Button(
    description="Analyze Clip",
    disabled=False,
    button_style="info",
    tooltip="Analyze the selected clip with batch processing",
    icon="search",
)


def on_analyze_button_click(b):
    if clip_dropdown.value:
        analyze_clip_batch(
            clip_dropdown.value,
            threshold_slider.value,
            batch_size_slider.value,
            sample_rate_slider.value,
        )
    else:
        status_text.value = "<b>Please select a clip first</b>"


analyze_button.on_click(on_analyze_button_click)

# Display the analyze button
display(analyze_button)
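
The per-category tally in this cell can also be expressed with `collections.Counter`, which handles missing keys and sorting by count. The sketch below assumes the `(frame_id, predictions)` result shape used above. The example data is made up and `count_categories` is a hypothetical helper.

```python
from collections import Counter


def count_categories(batch_results, threshold=0.5):
    """Count detections per category across (frame_id, predictions) pairs."""
    counts = Counter()
    for _, predictions in batch_results:
        if not predictions:
            continue
        first = predictions[0]
        for category, score in zip(first["categories"], first["scores"]):
            if score > threshold:
                counts[category] += 1
    return counts


# Hypothetical results for two frames.
batch_results = [
    (0, [{"categories": ["person", "car"], "scores": [0.9, 0.4]}]),
    (5, [{"categories": ["person", "dog"], "scores": [0.8, 0.7]}]),
]
print(count_categories(batch_results).most_common())
```

Note that these counts are detections summed over frames, so one object visible in many sampled frames is counted once per frame.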

## Process a Single Frame

You can also process a single frame from a clip for detailed inspection.

In [None]:
def get_single_frame(clip, frame_num=0):
    """Get a specific frame from a clip"""
    for i, (frame_id, frame) in enumerate(clip.frames()):
        if i == frame_num:
            return frame
    return None


frame_slider = widgets.IntSlider(
    value=0,
    min=0,
    max=100,  # Fixed upper bound; nothing in this notebook updates it per clip
    step=1,
    description="Frame #:",
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
)

process_frame_button = widgets.Button(
    description="Process Frame",
    disabled=False,
    button_style="primary",
    tooltip="Process the selected frame",
    icon="camera",
)


def on_process_frame_button_click(b):
    if clip_dropdown.value:
        clip = clip_dropdown.value
        frame = get_single_frame(clip, frame_slider.value)
        if frame is not None:
            result_frame, predictions = process_frame(frame, threshold_slider.value)

            # Show detailed detection info
            detection_info = "<h4>Detections:</h4><ul>"
            if len(predictions) > 0:
                boxes = predictions[0]["boxes"]
                scores = predictions[0]["scores"]
                categories = predictions[0]["categories"]

                for box, category, score in zip(boxes, categories, scores):
                    if score > threshold_slider.value:
                        x1, y1, x2, y2 = box
                        detection_info += f"<li><b>{category}</b> (confidence: {score:.2f}) at [{x1}, {y1}, {x2}, {y2}]</li>"
            detection_info += "</ul>"

            # Convert to JPEG for display
            _, jpeg_data = cv2.imencode(".jpg", result_frame)
            output_widget.value = jpeg_data.tobytes()

            status_text.value = (
                f"<b>Frame {frame_slider.value} from clip {clip.timestamp}</b><br>"
                + detection_info
            )
        else:
            status_text.value = "<b>Could not retrieve the specified frame</b>"
    else:
        status_text.value = "<b>Please select a clip first</b>"


process_frame_button.on_click(on_process_frame_button_click)

# Display frame processing controls
display(widgets.HBox([frame_slider, process_frame_button]))
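
Fetching the n-th item from a frame iterator can also be written with `itertools.islice`. This is only a sketch over a generic iterable of `(frame_id, frame)` pairs, and `nth_frame` is a hypothetical helper. Like `get_single_frame`, it still has to consume the preceding frames, so it does not seek within the video.

```python
from itertools import islice


def nth_frame(frames, n):
    """Return the n-th frame (0-based) from an iterable of (frame_id, frame), or None."""
    return next((frame for _, frame in islice(frames, n, n + 1)), None)


# Stand-in for clip.frames(): five (frame_id, frame) pairs.
fake_frames = ((i, f"frame-{i}") for i in range(5))
print(nth_frame(fake_frames, 3))
```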

## Optimization Strategies

In this notebook, we've implemented several strategies to improve prediction speed without changing the ML model:

1. **Batch Processing**: By processing multiple frames in a batch, we leverage PyTorch's ability to parallelize operations, which is typically more efficient on a GPU than processing frames one by one.

2. **Frame Sampling**: Instead of processing every frame, we can skip frames (e.g., process every 2nd or 3rd frame), which reduces the computational load roughly in proportion to the sampling rate while still giving good results for many applications.

3. **Optimized Preprocessing**: We've streamlined the preprocessing steps to reduce redundant operations.

4. **Memory Management**: Pre-allocating the transforms and reusing tensors where possible helps reduce memory allocation overhead.

5. **Hardware Acceleration**: We ensure proper use of GPU acceleration when available.
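
To make the combined effect of sampling and batching concrete, the arithmetic can be sketched as follows. It assumes frame ids start at 0 and increase by 1, which this notebook does not verify, and `workload` is a hypothetical helper.

```python
import math


def workload(num_frames, sample_rate, batch_size):
    """Return (frames_processed, batches) for frame ids 0..num_frames-1."""
    frames_processed = math.ceil(num_frames / sample_rate)
    batches = math.ceil(frames_processed / batch_size)
    return frames_processed, batches


for sample_rate in (1, 2, 3):
    print(sample_rate, workload(100, sample_rate, batch_size=4))
```

For 100 frames and a batch size of 4, sampling every 2nd frame halves the frames sent to the detector (50 instead of 100) and cuts the number of model calls from 25 to 13.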

You can use the benchmark tool to compare the performance of different configurations and find the optimal settings for your specific hardware.