In [16]:
import sys
import numpy as np
import tensorflow as tf
import cv2 
import time  
from memory_profiler import memory_usage
sys.path.append("../inference")
from ssd_mobilenet_tflite_inference import SSDMobileNetTFLiteDetector

In [2]:
def load_labels(labels_path: str) -> dict:
    """
    Load labels from a file into a dictionary.

    Every non-blank line must contain an integer class index, some whitespace,
    and then the class name. The name may itself contain spaces
    (e.g. ``10 traffic light``). Blank and whitespace-only lines are skipped,
    so a trailing newline at the end of the file is harmless.

    Args:
        labels_path (str): Path to the label file.

    Returns:
        dict: A dictionary mapping class indices (int) to class names (str).

    Raises:
        ValueError: If a non-blank line has no class name after the index, or
            if the index is not an integer.
    """
    labels = {}
    with open(labels_path, 'r') as f:
        for line_number, line in enumerate(f, start=1):
            entry = line.strip()
            if not entry:
                # Blank separator / trailing line: nothing to parse.
                continue

            parts = entry.split(maxsplit=1)
            if len(parts) != 2:
                raise ValueError(
                    f"{labels_path}:{line_number}: expected '<index> <label>', got {entry!r}"
                )

            idx, label = parts
            labels[int(idx)] = label
    return labels


In [4]:
def inference_process(detector, input_frame_rgb):
    """
    Run one detection pass and report how long it took.

    The detector's ``detect_objects`` method is called once on the given
    image. Its return value is discarded; only the elapsed time is reported.

    Args:
        detector (object): An object detector that must have a `detect_objects` method.
        input_frame_rgb (numpy.ndarray): A color image in RGB format on which inference will be performed.

    Returns:
        float: The inference time in milliseconds.
    """
    # perf_counter() is monotonic and high resolution. The wall clock
    # (time.time()) can be adjusted while the call is running, which would
    # yield a wrong or even negative duration.
    start_time = time.perf_counter()
    detector.detect_objects(input_frame_rgb)
    return (time.perf_counter() - start_time) * 1000
    
def profile_function(func, *args, **kwargs):
    """
    Sample the memory used while a function runs, rebased to its lowest sample.

    ``func(*args, **kwargs)`` is executed under ``memory_usage`` from the
    `memory_profiler` library with ``interval=0.0001``. The smallest sample is
    then subtracted from every sample, so the returned series is non-negative
    and its minimum is exactly zero.

    Args:
        func (callable): The function to be profiled.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        list: The memory samples, in the order they were taken, each reduced
            by the smallest sample of the run.
    """
    samples = memory_usage((func, args, kwargs), interval=0.0001)

    # The baseline is the minimum sample (not necessarily the first one).
    baseline = min(samples)

    rebased = []
    for sample in samples:
        rebased.append(sample - baseline)
    return rebased

In [5]:
def process_video_performance(source: str, model_path: str, labels_path: str, confidence_threshold: float = 0.5):
    """
    Processes a video file or stream to perform object detection using a specified model, and measures performance metrics.

    Every frame is resized to 300x300, converted from BGR to RGB and passed to
    the detector. Detections whose score is above ``confidence_threshold`` are
    drawn (box and label) on the original frame, which is displayed in an
    OpenCV window together with the per-frame FPS and inference time. Pressing
    ``q`` in that window stops the run early. A summary (frame count, total
    detections, average inference time, average FPS) is printed at the end.

    Note that "Total detections" sums the detector's ``num_detections`` for
    every frame; it is not filtered by ``confidence_threshold``.

    Args:
        source (str): The path to the video file or a camera index (if using a webcam).
            An ``int`` index is accepted as well as its string form.
        model_path (str): The path to the object detection model file.
        labels_path (str): The path to the file containing class labels.
        confidence_threshold (float): The confidence threshold for filtering detected objects. Default is 0.5.

    Returns:
        None
    """

    labels = load_labels(labels_path)
    detector = SSDMobileNetTFLiteDetector(model_path)

    # Normalise so that an int camera index (0) works like its string form ("0").
    source = str(source)
    cap = cv2.VideoCapture(int(source) if source.isdigit() else source)

    if not cap.isOpened():
        print("Error: Unable to open video file.")
        cap.release()
        return

    frame_count = 0
    total_detections = 0
    total_inference_time = 0.0

    # try/finally guarantees the capture device and the display window are
    # released even if detection or drawing raises part-way through the video.
    try:
        while cap.isOpened():
            ret, frame = cap.read()

            if not ret:
                break

            input_frame = cv2.resize(frame, (300, 300))
            input_frame_rgb = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)

            # Object detection, timed with a monotonic high-resolution clock.
            start_time = time.perf_counter()
            num_detections, boxes, classes, scores = detector.detect_objects(input_frame_rgb)
            inference_time = (time.perf_counter() - start_time) * 1000

            # range() below needs a plain integer count.
            # NOTE(review): the detector's return type is not visible here - confirm.
            num_detections = int(num_detections)

            total_detections += num_detections
            total_inference_time += inference_time

            frame_height, frame_width = frame.shape[:2]
            for i in range(num_detections):
                # Honour the caller's threshold (previously hard-coded to 0.5).
                if scores[i] > confidence_threshold:
                    box = boxes[i]
                    # Box coordinates are scaled as [y_min, x_min, y_max, x_max]
                    # fractions of the original frame's height/width.
                    y_min = int(box[0] * frame_height)
                    x_min = int(box[1] * frame_width)
                    y_max = int(box[2] * frame_height)
                    x_max = int(box[3] * frame_width)
                    cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)

                    label = labels.get(int(classes[i]), 'Unknown')
                    label_text = f'{label}: {scores[i]:.2f}'
                    cv2.putText(frame, label_text, (x_min, y_min - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            fps = 1000 / inference_time if inference_time > 0 else 0
            cv2.putText(frame, f'FPS: {fps:.2f}, Inference Time: {inference_time:.2f}ms',
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            cv2.imshow('Object Detection', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            frame_count += 1
    finally:
        cap.release()
        cv2.destroyAllWindows()

    avg_inference_time = total_inference_time / frame_count if frame_count > 0 else 0
    # Same convention as the per-frame overlay: report 0 FPS instead of
    # dividing by zero when nothing was measured.
    avg_fps = 1000 / avg_inference_time if avg_inference_time > 0 else 0
    print(f"Processed {frame_count} frames.")
    print(f"Total detections: {total_detections}")
    print(f"Average inference time per frame: {avg_inference_time:.2f} ms")
    print(f"Average FPS: {avg_fps:.2f}")

In [9]:
def process_video_memory_performance(source: str, model_path: str, labels_path: str, confidence_threshold: float = 0.5):
    """
    Processes a video file or stream to perform object detection and profiles memory usage.

    Every frame is resized to 300x300, converted from BGR to RGB and passed to
    ``profile_function(inference_process, detector, frame)``. The peak of the
    memory series returned for each frame is recorded, and the mean of those
    peaks is printed at the end. The unannotated frame is shown in an OpenCV
    window; pressing ``q`` there stops the run early.

    Args:
        source (str): The path to the video file or a camera index (if using a webcam).
            An ``int`` index is accepted as well as its string form.
        model_path (str): The path to the object detection model file.
        labels_path (str): The path to the file containing class labels.
        confidence_threshold (float): The confidence threshold for filtering detected objects. Default is 0.5.
            Not used by this function (no detections are drawn); kept for a
            signature parallel to the timing variant.

    Returns:
        None
    """

    # The labels are not needed for memory profiling. The file is still parsed
    # so that an invalid labels_path fails up front, exactly as it did before.
    load_labels(labels_path)
    detector = SSDMobileNetTFLiteDetector(model_path)

    # Normalise so that an int camera index (0) works like its string form ("0").
    source = str(source)
    cap = cv2.VideoCapture(int(source) if source.isdigit() else source)

    if not cap.isOpened():
        print("Error: Unable to open video file.")
        cap.release()
        return

    frame_count = 0
    peak_mem_per_frame = []

    # try/finally guarantees the capture device and the display window are
    # released even if profiling raises part-way through the video.
    try:
        while cap.isOpened():
            ret, frame = cap.read()

            if not ret:
                break

            input_frame = cv2.resize(frame, (300, 300))
            input_frame_rgb = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)
            mem_usage_image = profile_function(inference_process, detector, input_frame_rgb)

            # Only the per-frame peak is needed for the final statistic.
            peak_mem_per_frame.append(np.max(mem_usage_image))

            cv2.imshow('Object Detection', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            frame_count += 1
    finally:
        cap.release()
        cv2.destroyAllWindows()

    print(f"Processed {frame_count} frames.")
    if peak_mem_per_frame:
        print(f"Profiling memory usage for object detection process with SSDMobileNetV1: {np.mean(peak_mem_per_frame)} MiB")
    else:
        # np.mean([]) would print "nan" and emit a RuntimeWarning.
        print("Profiling memory usage for object detection process with SSDMobileNetV1: no frames were profiled.")

## Performance evaluation of SSDMobileNetV1 on a video stream

In [10]:
# Inputs for the timing run: test clip, TFLite model and label map.
video_path = '../data/026c7465-309f6d33.mp4'
model_path = '../models/ssd_mobilenet_tflite/ssd_mobilenet.tflite'
label_path = '../models/label_map.txt'
confidence_threshold = 0.6

# Opens an OpenCV window; press "q" in it to stop before the clip ends.
process_video_performance(
    source=video_path,
    model_path=model_path,
    labels_path=label_path,
    confidence_threshold=confidence_threshold,
)

Processed 45 frames.
Total detections: 460
Average inference time per frame: 33.43 ms
Average FPS: 29.92


## Analysis of the SSDMobileNetV1 model's memory usage on a video

In [11]:
# Inputs for the memory-profiling run: test clip, TFLite model and label map.
video_path = '../data/026c7465-309f6d33.mp4'
model_path = '../models/ssd_mobilenet_tflite/ssd_mobilenet.tflite'
label_path = '../models/label_map.txt'
confidence_threshold = 0.6

# Opens an OpenCV window; press "q" in it to stop before the clip ends.
process_video_memory_performance(
    source=video_path,
    model_path=model_path,
    labels_path=label_path,
    confidence_threshold=confidence_threshold,
)

Processed 17 frames.
Profiling memory usage for object detection process with SSDMobileNetV1: 0.158203125 MiB
