## Setting Up Your Python Environment

In [1]:
# %%capture
# # Install PyTorch with CUDA
# !pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# # Install additional dependencies
# !pip install pandas pillow opencv-python deep-sort-realtime

# # Install ONNX packages
# !pip install onnx onnxruntime

# # Install utility packages
# !pip install cjm_psl_utils cjm_pil_utils

## Importing the Required Dependencies

In [2]:
# Import Python Standard Library dependencies
import json
from pathlib import Path
import random
import time

# Import utility functions
from cjm_psl_utils.core import download_file
from cjm_pil_utils.core import resize_img

# Import YOLOX package
from cjm_yolox_pytorch.model import build_model
from cjm_yolox_pytorch.inference import YOLOXInferenceWrapper

# Import OpenCV
import cv2

# Class for displaying videos in Jupyter notebooks
from IPython.display import Video

# Import DeepSORT package
from deep_sort_realtime.deepsort_tracker import DeepSort, EMBEDDER_CHOICES

# Import numpy
import numpy as np

# Import the pandas package
import pandas as pd

# Import PIL for image manipulation
from PIL import Image, ImageDraw, ImageFont

# Import PyTorch dependencies
import torch

# Import ONNX dependencies
import onnx # Import the onnx module
from onnxsim import simplify # Import the method to simplify ONNX models
import onnxruntime as ort # Import the ONNX Runtime

# Import tqdm for progress bar
from tqdm.auto import tqdm

## Setting Up the Project

### Set the Directory Paths

In [3]:
# The name for the project
project_name = f"pytorch-yolox-object-detector"

# The path for the project folder
project_dir = Path(f"./{project_name}/")

# Create the project directory if it does not already exist
project_dir.mkdir(parents=True, exist_ok=True)

# The path to the checkpoint folder
checkpoint_dir = Path(project_dir/f"2023-08-17_16-14-43")
# checkpoint_dir = Path(project_dir/f"pretrained-coco")

pd.Series({
    "Project Directory:": project_dir,
    "Checkpoint Directory:": checkpoint_dir,
}).to_frame().style.hide(axis='columns')

0,1
Project Directory:,pytorch-yolox-object-detector
Checkpoint Directory:,pytorch-yolox-object-detector/2023-08-17_16-14-43


### Download a Font File

In [4]:
# File name of the Roboto font used later when drawing labels
font_file = 'KFOlCnqEu92Fr1MmEU9vAw.ttf'

# Fetch the font into the current directory
font_url = f"https://fonts.gstatic.com/s/roboto/v30/{font_file}"
download_file(font_url, "./")

The file ./KFOlCnqEu92Fr1MmEU9vAw.ttf already exists and overwrite is set to False.


## Loading the Checkpoint Data

### Load the Colormap

In [5]:
# Locate the colormap file inside the checkpoint folder (first match is used)
colormap_path = list(checkpoint_dir.glob('*colormap.json'))[0]

# Read the JSON colormap data
with colormap_path.open('r') as file:
    colormap_json = json.load(file)

# Map each class label to its color
colormap_dict = dict((item['label'], item['color']) for item in colormap_json['items'])

# Class names, in the order they appear in the colormap
class_names = [*colormap_dict]

# Integer copy of the colors: every channel is multiplied by 255 and truncated
int_colors = [
    tuple(int(channel*255) for channel in color)
    for color in colormap_dict.values()
]

### Set the Preprocessing and Post-Processing Parameters

In [6]:
# Input dimensions are trimmed down to multiples of this value
max_stride = 32
# Selects entries 2 and 3 of the input tensor's shape
input_dim_slice = slice(2, 4)

## Defining Utility Functions

### Define a Function to Prepare Images for Inference

In [7]:
def prepare_image_for_inference(frame, target_sz, max_stride):
    """
    Turn a BGR video frame into a cropped RGB image ready for the model.

    Parameters:
    frame: The BGR frame to convert (presumably a NumPy array as returned by OpenCV).
    target_sz: Target size forwarded to `resize_img`.
    max_stride (int): Both input dimensions are reduced to a multiple of this value.

    Returns:
    tuple: `(rgb_img, input_dims, offsets, min_img_scale, input_img)` where
        rgb_img is the full-size RGB PIL image,
        input_dims is the `[width, height]` list of the cropped input image,
        offsets is the per-axis margin removed on each side of the resized image,
        min_img_scale is the ratio of the source image's smallest side to the resized image's smallest side,
        input_img is the resized image cropped to `input_dims`.
    """

    # OpenCV delivers BGR; convert to RGB before wrapping in a PIL image
    source_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    # Resize the image; no cropping happens at this step
    scaled_img = resize_img(source_img, target_sz=target_sz, divisor=1)
    scaled_size = np.array(scaled_img.size)

    # Round each dimension down to the nearest multiple of the max stride
    input_dims = [side - side % max_stride for side in scaled_img.size]
    # Half of the surplus on each axis is trimmed from either side
    offsets = (scaled_size - input_dims) / 2
    # Scale factor from the resized image back to the source image
    min_img_scale = min(source_img.size) / min(scaled_img.size)

    # Center-crop the resized image to the input dimensions
    left, top = offsets
    right, bottom = scaled_size - offsets
    input_img = scaled_img.crop(box=[left, top, right, bottom])

    return source_img, input_dims, offsets, min_img_scale, input_img

### Define Functions to Process YOLOX Output

#### Define a function to generate the output grids

In [8]:
def generate_output_grids_np(height, width, strides=(8, 16, 32)):
    """
    Generate a numpy array containing grid coordinates and strides for a given height and width.

    Args:
        height (int): The height of the image.
        width (int): The width of the image.
        strides (sequence of int): The strides to build a grid for. Defaults to (8, 16, 32).
            An immutable tuple is used as the default so it cannot be altered between calls.

    Returns:
        np.ndarray: An array with one row per grid cell holding
            (column index, row index, stride), with the grids for each stride
            concatenated in the order the strides are given.
    """

    per_stride_grids = []

    for stride in strides:
        # Number of grid cells along each axis for this stride
        grid_height = height // stride
        grid_width = width // stride

        # Row and column index of every cell ('ij' indexing: rows first)
        rows, cols = np.meshgrid(np.arange(grid_height), np.arange(grid_width), indexing='ij')

        # The stride repeated once per cell
        stride_values = np.full((grid_height, grid_width), stride)

        # One (column, row, stride) triple per cell
        per_stride_grids.append(
            np.stack((cols.flatten(), rows.flatten(), stride_values.flatten()), axis=-1)
        )

    # Join the grids of all strides along the first dimension
    return np.concatenate(per_stride_grids, axis=0)

#### Define a function to calculate bounding boxes and probabilities

In [9]:
def calculate_boxes_and_probs(model_output:np.ndarray, output_grids:np.ndarray) -> np.ndarray:
    """
    Decode raw model output into boxes, class labels, and probabilities.

    Parameters:
    model_output (numpy.ndarray): The output of the model. The last axis holds two centroid
        offsets, two log-sizes, one objectness score, and then the per-class scores.
    output_grids (numpy.ndarray): The output grids; the last axis holds two grid coordinates
        followed by the stride.

    Returns:
    numpy.ndarray: An array whose last axis is (x0, y0, w, h, label, max probability).
    """
    strides = output_grids[..., 2:]

    # Centroids and sizes, scaled by the stride of the grid cell
    centers = (model_output[..., :2] + output_grids[..., :2]) * strides
    sizes = np.exp(model_output[..., 2:4]) * strides

    # Top-left corner of each box
    top_left = centers - sizes / 2

    # Per-class probability = objectness * class score
    objectness = model_output[..., 4]
    class_probs = objectness[..., None] * model_output[..., 5:]

    # Best class and its probability for every proposal
    best_probs = class_probs.max(axis=-1)
    best_labels = class_probs.argmax(axis=-1)

    return np.stack(
        (
            top_left[:, :, 0],
            top_left[:, :, 1],
            sizes[:, :, 0],
            sizes[:, :, 1],
            best_labels,
            best_probs,
        ),
        axis=-1,
    )

#### Define a function to extract object proposals from the raw model output

In [10]:
def process_outputs(outputs, input_dims, bbox_conf_thresh):
    """
    Turn raw model output into confidence-filtered proposals, best first.

    Parameters:
    outputs (np.ndarray): The raw model output.
    input_dims: The (height, width) pair used to build the output grids.
    bbox_conf_thresh (float): Proposals must have a probability above this value to be kept.

    Returns:
    np.ndarray: The kept proposals, sorted by probability in descending order.
    """
    # Decode the raw output against the grids for this input size
    grids = generate_output_grids_np(*input_dims)
    decoded = calculate_boxes_and_probs(outputs, grids)

    # Keep only proposals whose probability exceeds the threshold
    confident = decoded[:, :, -1] > bbox_conf_thresh
    candidates = decoded[confident]

    # Highest probability first
    descending = candidates[..., -1].argsort()[::-1]
    return candidates[descending]

#### Define a function to calculate the intersection-over-union

In [11]:
def calc_iou(proposals:np.ndarray) -> np.ndarray:
    """
    Calculates the Intersection over Union (IoU) for all pairs of bounding boxes (x,y,w,h) in 'proposals'.

    The IoU is a measure of overlap between two bounding boxes. It is calculated as the area of
    intersection divided by the area of union of the two boxes.

    Pairs whose union area is not positive (for example two zero-area boxes) get an IoU of 0
    instead of triggering a division by zero.

    Parameters:
    proposals (2D np.array): A NumPy array of bounding boxes, where each box is an array [x, y, width, height].

    Returns:
    iou (2D np.array): The IoU matrix where each element i,j represents the IoU of boxes i and j.
    """

    left, top = proposals[:, 0], proposals[:, 1]
    right, bottom = left + proposals[:, 2], top + proposals[:, 3]

    # Corners of the pairwise intersection rectangles
    x1 = np.maximum(left, left[:, None])
    y1 = np.maximum(top, top[:, None])
    x2 = np.minimum(right, right[:, None])
    y2 = np.minimum(bottom, bottom[:, None])

    # Intersection areas; non-overlapping pairs are clamped to 0
    intersections = np.maximum(x2 - x1, 0) * np.maximum(y2 - y1, 0)

    # Union areas
    areas = proposals[:, 2] * proposals[:, 3]
    unions = areas[:, None] + areas - intersections

    # Divide only where the union is positive; the placeholder divisor of 1
    # keeps the division itself free of 0/0 and the result is forced to 0 there
    has_union = unions > 0
    safe_unions = np.where(has_union, unions, 1)
    iou = np.where(has_union, intersections / safe_unions, 0)

    # Return the iou matrix
    return iou

#### Define a function to filter bounding box proposals using Non-Maximum Suppression

In [12]:
def nms_sorted_boxes(iou:np.ndarray, iou_thresh:float=0.45) -> np.ndarray:
    """
    Run non-maximum suppression (NMS) over boxes that are already sorted by score.

    Walking the boxes in order, every box that is still kept suppresses all later
    boxes whose IoU with it exceeds the threshold.

    Parameters:
    iou (np.ndarray): An IoU matrix where each element i,j represents the IoU of boxes i and j.
    iou_thresh (float): The IoU threshold for suppression. Boxes with IoU > iou_thresh are suppressed.

    Returns:
    keep (np.ndarray): The indices of the boxes to keep after applying NMS.
    """
    num_boxes = iou.shape[0]
    indices = np.arange(num_boxes)

    # Every box starts out as kept
    kept = np.ones(num_boxes, dtype=bool)

    for current in range(num_boxes):
        # A suppressed box cannot suppress others
        if not kept[current]:
            continue
        # Drop later boxes that overlap the current one too much
        overlaps = iou[current] > iou_thresh
        kept[overlaps & (indices > current)] = False

    return indices[kept]

### Define a Function to Annotate Images with Bounding Boxes

In [13]:
def draw_bboxes_pil(image, boxes, labels, colors, font, width:int=2, font_size:int=18, probs=None):
    """
    Annotates an image with bounding boxes, labels, and optional probability scores.

    This function draws bounding boxes on a copy of the provided image using the given box
    coordinates, colors, and labels. If probabilities are provided, they will be added to the labels.

    Parameters:
    image (PIL.Image): The input image on which annotations will be drawn.
    boxes (list of tuples): A list of bounding box coordinates where each tuple is (x, y, w, h).
    labels (list of str): A list of labels corresponding to each bounding box.
    colors (list of tuples): A numeric color for each bounding box and its label. The mean of the
        channels is compared against 127.5 to choose black or white label text.
    font (str): Path to the font file to be used for displaying the labels.
    width (int, optional): Width of the bounding box lines. Defaults to 2.
    font_size (int, optional): Size of the font for the labels at the reference diagonal. Defaults to 18.
    probs (list of float, optional): A list of probability scores corresponding to each label. Defaults to None.

    Returns:
    annotated_image (PIL.Image): The image annotated with bounding boxes, labels, and optional probability scores.
    """

    # Define a reference diagonal
    REFERENCE_DIAGONAL = 1000

    # Scale the font size using the hypotenuse of the image; never let it drop below 1
    font_size = max(1, int(font_size * (np.hypot(*image.size) / REFERENCE_DIAGONAL)))

    # Add probability scores to labels
    if probs is not None:
        labels = [f"{label}: {prob*100:.2f}%" for label, prob in zip(labels, probs)]

    # Create a copy of the image
    annotated_image = image.copy()

    # Nothing to draw: return the copy without touching the font file
    if len(labels) == 0:
        return annotated_image

    # Create an ImageDraw object for drawing on the image
    draw = ImageDraw.Draw(annotated_image)

    # Load the font once; it is the same for every label
    fnt = ImageFont.truetype(font, font_size)

    # Loop through the labels and their matching boxes and colors
    for i, label in enumerate(labels):
        # Get the bounding box coordinates and color
        x, y, w, h = boxes[i]
        color = colors[i]

        # Draw the bounding box on the image
        draw.rectangle((x, y, x+w, y+h), outline=color, width=width)

        # Draw a filled label box directly above the bounding box
        label_w, label_h = draw.textbbox(xy=(0,0), text=label, font=fnt)[2:]
        draw.rectangle((x, y-label_h, x+label_w, y), outline=color, fill=color, width=width)

        # Pick a text color that contrasts with the label box
        text_fill = 'black' if np.mean(color) > 127.5 else 'white'

        # Draw the label on the image
        draw.multiline_text((x, y-label_h), label, font=fnt, fill=text_fill)

    return annotated_image

## Performing Inference with ONNX Runtime

### Create an Inference Session

In [14]:
# Use the first ONNX model file found in the checkpoint folder
onnx_matches = list(checkpoint_dir.glob('*.onnx'))
onnx_file_path = onnx_matches[0]

In [15]:
# Session options (left at their defaults)
sess_options = ort.SessionOptions()

# Execution providers to use
providers = [
    'CPUExecutionProvider',
    # "CUDAExecutionProvider",
]

# Load the model and create an InferenceSession
session = ort.InferenceSession(
    onnx_file_path,
    sess_options=sess_options,
    providers=providers,
)

### Select a Test Video

In [16]:
# Folder for the test videos and the name of the clip to use
video_dir = "./videos/"
test_video_name = "pexels-rodnae-productions-10373924.mp4"
# test_video_name = "cars_on_highway.mp4"

# Remote location of the clip
test_video_url = f"https://huggingface.co/datasets/cj-mills/pexels-object-tracking-test-videos/resolve/main/{test_video_name}"

# Local path of the clip
video_path = f"{video_dir}{test_video_name}"

# Download the clip into the video folder (third argument presumably disables overwriting)
download_file(test_video_url, video_dir, False)

# Display the clip in the notebook
Video(video_path)

The file ./videos/pexels-rodnae-productions-10373924.mp4 already exists and overwrite is set to False.


### Initialize a `VideoCapture` Object

In [17]:
# Open the test video for reading
video_capture = cv2.VideoCapture(video_path)

# Fail loudly instead of continuing with zeroed properties and an empty output video
if not video_capture.isOpened():
    raise IOError(f"Could not open video file: {video_path}")

# Read the stream properties through the named OpenCV constants
# (the same values as the bare indices 3, 4 and 5)
frame_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
# NOTE: int() drops any fractional part of the reported frame rate
frame_fps = int(video_capture.get(cv2.CAP_PROP_FPS))
frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))

# Show the properties as a table with the column header hidden
pd.Series({
    "Frame Width:": frame_width,
    "Frame Height:": frame_height,
    "Frame FPS:": frame_fps,
    "Frames:": frames
}).to_frame().style.hide(axis='columns')

0,1
Frame Width:,1080
Frame Height:,1920
Frame FPS:,29
Frames:,226


### Initialize a `VideoWriter` Object

In [18]:
# The annotated video is written to the video folder, named after the source clip
video_out_path = f"{(video_dir)}{Path(video_path).stem}-deep-sort.mp4"

# Codec and frame size for the output; the frame rate matches the source clip
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
frame_size = (frame_width, frame_height)
video_writer = cv2.VideoWriter(video_out_path, fourcc, frame_fps, frame_size)

### Define Inference Parameters

In [19]:
# Target size passed to `prepare_image_for_inference` for every frame
test_sz = 288
# test_sz = 384
# Proposals must have a probability above this value to be kept by `process_outputs`
bbox_conf_thresh = 0.5
# IoU threshold handed to `nms_sorted_boxes`
iou_thresh = 0.45

### Initialize a Tracker

In [20]:
# Display the embedder names exported by deep_sort_realtime as a table
pd.DataFrame(EMBEDDER_CHOICES)

Unnamed: 0,0
0,mobilenet
1,torchreid
2,clip_RN50
3,clip_RN101
4,clip_RN50x4
5,clip_RN50x16
6,clip_ViT-B/32
7,clip_ViT-B/16


In [21]:
# Create the tracker with the first embedder choice; frames are declared as non-BGR
tracker = DeepSort(
    max_age=15,
    embedder=EMBEDDER_CHOICES[0],
    half=True,
    embedder_gpu=True,
    bgr=False,
)

### Detect, Track, and Annotate Objects in Video Frames

In [22]:
# Start from an empty tracker
tracker.delete_all_tracks()

try:
    with tqdm(total=frames, desc="Processing frames") as pbar:
        while video_capture.isOpened():
            ret, frame = video_capture.read()
            # Stop as soon as no further frame can be read
            if not ret:
                break

            # Prepare an input image for inference
            rgb_img, input_dims, offsets, min_img_scale, input_img = prepare_image_for_inference(frame, test_sz, max_stride)

            # Convert the input image to a normalized (1, C, H, W) float32 array
            input_tensor_np = np.array(input_img, dtype=np.float32).transpose((2, 0, 1))[None]/255

            # Start performance counter
            start_time = time.perf_counter()

            # Run inference
            outputs = session.run(None, {"input": input_tensor_np})[0]

            # Process the model output
            proposals = process_outputs(outputs, input_tensor_np.shape[input_dim_slice], bbox_conf_thresh)

            # Apply non-max suppression to the proposals with the specified threshold
            proposal_indices = nms_sorted_boxes(calc_iou(proposals[:, :-2]), iou_thresh)
            proposals = proposals[proposal_indices]

            # Detections stay in input-image coordinates because the tracker
            # is given the input image as its frame
            bbox_list = proposals[:,:4]
            label_list = [class_names[int(idx)] for idx in proposals[:,4]]
            probs_list = proposals[:,5]

            detections = list(zip(bbox_list, probs_list, label_list))

            # Update tracker with detections.
            tracks = tracker.update_tracks(detections, frame=np.array(input_img))

            # End performance counter
            end_time = time.perf_counter()
            # Calculate the combined FPS for object detection and tracking
            fps = 1 / (end_time - start_time)
            # Display the frame rate in the progress bar
            pbar.set_postfix(fps=fps)

            # Force an (N, 4) shape so a frame without tracks yields an empty (0, 4)
            # array instead of a (0,) array that cannot be broadcast against the offsets
            track_boxes = np.array([track.to_tlwh() for track in tracks]).reshape(-1, 4)
            # Map the boxes from input-image coordinates back to the source frame
            bbox_list = (track_boxes + [*offsets, 0, 0]) * min_img_scale
            label_list = [track.det_class for track in tracks]
            # Tracks without a confidence value are shown with a confidence of 0
            probs_list = [track.det_conf if track.det_conf is not None else 0 for track in tracks]

            # Annotate the current frame with bounding boxes and tracking IDs
            annotated_img = draw_bboxes_pil(
                image=rgb_img,
                boxes=bbox_list,
                labels=[f"{track.track_id}-{track.det_class}" for track in tracks],
                probs=probs_list,
                colors=[int_colors[class_names.index(i)] for i in label_list],
                font=font_file
            )
            annotated_frame = cv2.cvtColor(np.array(annotated_img), cv2.COLOR_RGB2BGR)

            video_writer.write(annotated_frame)
            pbar.update(1)
finally:
    # Always release the capture and the writer, even when a frame fails to process
    video_capture.release()
    video_writer.release()

Processing frames:   0%|          | 0/226 [00:00<?, ?it/s]