# Preliminary


In this section, we install and import all the necessary libraries and components required for the project. Additionally, we define the directory paths and configuration flags to ensure proper file handling and smooth execution of the pipeline.


## Imports


In [1]:
!pip install -r requirements.txt
!git clone https://github.com/WongKinYiu/yolov7.git

fatal: destination path 'yolov7' already exists and is not an empty directory.


In [2]:
cd yolov7/

/Users/jan/Documents/code/cv/project/yolov7


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


In [3]:
import os
from pathlib import Path

import cv2
import numpy as np
import torch
from tqdm import tqdm

# Local modules from the cloned yolov7 repository (the working directory must be yolov7/).
from models.yolo import Model
from utils.torch_utils import select_device

## Directories and Configuration


For the live demo, we selected a test video from the [LifeCLEF 2015](https://www.imageclef.org/lifeclef/2015/fish) Fish Dataset and saved it in the `./detect/video_in` directory. Other configuration files and different YOLO weights can be found in `./yolo_files`.

In [4]:
# Configuration Flags
# NOTE(review): neither flag is read by any cell in this part of the notebook; they are kept
# for compatibility with code that may reference them elsewhere.
SAVE_ORIGINAL = False  # Flag to save original frames
TRAIN = True  # Flag to switch between creating training images and creating validation images

# Base directory setup
# The project root differs per machine, so it can be overridden with the CV_FISH_BASE_DIR
# environment variable; the previous hard-coded location remains the default.
BASE_DIR = Path(
    os.environ.get("CV_FISH_BASE_DIR", "/home/jan/Documents/code/CV-Fish-Abundance")
)

# Input videos to run detection on, and the directory the annotated videos are written to
VIDEO_DIR_IN = BASE_DIR / "detect/video_in"
VIDEO_DIR_OUT = BASE_DIR / "detect/video_out"

# Path to the YOLO weights file (despite the name, this is a file, not a directory)
MODEL_DIR = BASE_DIR / "yolo_files/weights/v7_640_best.pt"

# Frame processing parameters: (width, height) every frame is resized to
FRAME_RESIZE = (640, 640)

# Optical flow parameters, passed as keyword arguments to cv2.calcOpticalFlowFarneback
FARNEBACK_PARAMS = {
    "pyr_scale": 0.95,
    "levels": 10,
    "winsize": 15,
    "iterations": 3,
    "poly_n": 5,
    "poly_sigma": 1.2,
    "flags": 0,
}

# Opacity parameters: weight (0 to 1) of each overlay when blended with the grayscale frame
OPACITY_FOREGROUND = 0.5
OPACITY_OPTICAL_FLOW = 0.5

---


# Image pipeline


In this section, we create the combined image by merging three sources: the Gaussian Mixture Model (GMM) output, optical flow, and the grayscale version of the original frame.


In [5]:
def adjust_gamma(image, gamma=1.0):
    """
    Applies gamma correction to an image through a 256-entry lookup table.

    Each intensity level ``i`` is mapped to ``(i / 255) ** (1 / gamma) * 255``,
    truncated to an unsigned 8-bit value.

    Args:
        image (np.ndarray): Input image, passed unchanged to cv2.LUT.
        gamma (float): Gamma value; the exponent applied is its reciprocal (default is 1.0).

    Returns:
        np.ndarray: Gamma adjusted image.
    """
    exponent = 1.0 / gamma

    # Build the mapping for every possible 8-bit intensity level.
    lut_values = []
    for level in range(256):
        lut_values.append((level / 255.0) ** exponent * 255)
    lookup = np.array(lut_values, dtype="uint8")

    return cv2.LUT(image, lookup)

In [6]:
def apply_gmm(frame, foreground_detector):
    """
    Extracts a cleaned, binary foreground mask from a frame using a GMM background subtractor.

    The raw mask is opened with a 3x3 ellipse, closed with a 5x5 ellipse and
    finally thresholded at 127.

    Args:
        frame (np.ndarray): Input frame.
        foreground_detector (cv2.BackgroundSubtractorMOG2): Foreground detector; its
            internal background model is updated by this call.

    Returns:
        np.ndarray: Filtered foreground mask containing only the values 0 and 255.
    """
    open_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    close_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))

    raw_mask = foreground_detector.apply(frame)
    opened_mask = cv2.morphologyEx(raw_mask, cv2.MORPH_OPEN, open_kernel)
    closed_mask = cv2.morphologyEx(opened_mask, cv2.MORPH_CLOSE, close_kernel)

    # Shadow removal: only pixels strictly above 127 survive as foreground (255); everything
    # else becomes 0. Presumably this drops the mid-grey shadow label the subtractor emits
    # when detectShadows is enabled (127 by default in OpenCV; verify for your build).
    binary_mask = cv2.threshold(closed_mask, 127, 255, cv2.THRESH_BINARY)[1]

    return binary_mask

In [7]:
def apply_optical_flow(frame, prvs, hsv, farneback, frame_size):
    """
    Computes dense Farneback optical flow between two frames and renders it as a BGR image.

    Flow direction is written to the hue channel and normalised flow magnitude to
    the value channel of ``hsv``, which is modified in place.

    Args:
        frame (np.ndarray): Current frame in BGR.
        prvs (np.ndarray): Previous frame in grayscale.
        hsv (np.ndarray): HSV buffer for the flow visualisation; channels 0 and 2 are overwritten.
        farneback (dict): Keyword arguments for cv2.calcOpticalFlowFarneback.
        frame_size (tuple): Size the rendered flow image is resized to.

    Returns:
        tuple: (resized BGR flow image, grayscale version of ``frame``).
    """
    gray_now = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    dense_flow = cv2.calcOpticalFlowFarneback(prvs, gray_now, None, **farneback)

    flow_x = dense_flow[..., 0]
    flow_y = dense_flow[..., 1]
    magnitude, angle = cv2.cartToPolar(flow_x, flow_y)

    # Radians -> degrees, halved (presumably to fit OpenCV's 0-180 hue range for 8-bit images).
    hsv[..., 0] = angle * 180 / np.pi / 2
    # Stretch the magnitude of this frame pair to the full 0-255 range.
    hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

    flow_bgr = cv2.resize(cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR), frame_size)

    return flow_bgr, gray_now

In [8]:
def apply_combination(
    frame,
    filtered_foreground,
    bgr_resized,
    opacity_foreground=0.5,
    opacity_optical_flow=0.5,
):
    """
    Builds the three-channel combined image from the frame, the GMM mask and the optical flow.

    Channel layout of the result:
        0: grayscale version of ``frame``
        1: grayscale blended with the min-max normalised foreground mask
        2: grayscale blended with channel 0 (blue) of the optical flow image

    Args:
        frame (np.ndarray): Original frame in BGR.
        filtered_foreground (np.ndarray): Foreground mask obtained from GMM.
        bgr_resized (np.ndarray): Optical flow visualization in BGR format.
        opacity_foreground (float): Opacity for filtered foreground mask (0 to 1).
        opacity_optical_flow (float): Opacity for optical flow visualization (0 to 1).

    Returns:
        np.ndarray: Combined image with the same shape and dtype as ``frame``.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    def _blend_with_gray(layer, opacity):
        # Weighted sum: (1 - opacity) * gray + opacity * layer
        return cv2.addWeighted(gray, 1 - opacity, layer, opacity, 0)

    mask_stretched = cv2.normalize(filtered_foreground, None, 0, 255, cv2.NORM_MINMAX)

    planes = (
        gray,
        _blend_with_gray(mask_stretched, opacity_foreground),
        _blend_with_gray(bgr_resized[:, :, 0], opacity_optical_flow),
    )

    combined = np.zeros_like(frame)
    for channel, plane in enumerate(planes):
        combined[:, :, channel] = plane

    return combined

In [9]:
def process_frame(
    frame,
    frame1,
    foreground_detector,
    prvs,
    hsv,
    farneback,
    frame_size,
    opacity_foreground,
    opacity_optical_flow,
):
    """
    Runs the image pipeline for one frame: GMM foreground mask, optical flow, then
    the three-channel combination of both with the grayscale frame.

    Nothing is written to disk here; both results are handed back to the caller.

    Args:
        frame (numpy.ndarray): Pre-processed frame used for the GMM mask and the combination.
        frame1 (numpy.ndarray): Frame the optical flow is computed on, relative to ``prvs``.
        foreground_detector (cv2.BackgroundSubtractor): Foreground detector based on GMM.
        prvs (numpy.ndarray): The previous grayscale frame used for optical flow calculation.
        hsv (numpy.ndarray): The HSV image used for visualizing optical flow (modified in place).
        farneback (dict): Parameters for the Farneback optical flow algorithm.
        frame_size (tuple): Resized image size.
        opacity_foreground (float): Opacity for filtered foreground mask (0 to 1).
        opacity_optical_flow (float): Opacity for optical flow visualization (0 to 1).

    Returns:
        tuple: ``(next_frame, combined_frame)``: the grayscale version of ``frame1``, to be
        passed as ``prvs`` on the next call, and the combined three-channel image.
    """
    # Foreground mask from the background subtractor
    mask = apply_gmm(frame, foreground_detector)

    # Flow between the previous grayscale frame and frame1, rendered as BGR
    flow_bgr, gray_next = apply_optical_flow(frame1, prvs, hsv, farneback, frame_size)

    # Merge grayscale frame, mask and flow into a single image
    merged = apply_combination(
        frame, mask, flow_bgr, opacity_foreground, opacity_optical_flow
    )

    return gray_next, merged

## Video detection


In this section, each video frame is first converted into the combined image described above and then passed through our trained YOLO model. The model detects fish and generates bounding boxes around them, which are then drawn onto the resized original frame (detections with a confidence below 0.5 are discarded). These annotated frames are sequentially written to a new video file that includes the detection results, providing a visual output of the model’s performance on the input video.


In [10]:
def create_model(path_or_model, autoshape=True):
    """Custom model loading function.

    Rebuilds a fresh ``Model`` from a checkpoint, copies the weights and class
    names into it, optionally wraps it with ``autoshape()`` and moves it to the
    first CUDA device if one is available, otherwise to the CPU.

    Arguments:
        path_or_model (str | pathlib.Path): Path to the model file (e.g., 'path/to/model.pt').
        path_or_model (dict): Loaded model dictionary (torch.load('path/to/model.pt')).
        path_or_model (nn.Module): Pre-loaded model (torch.load('path/to/model.pt')['model']).
        autoshape (bool): If True, return the ``autoshape()``-wrapped model.

    Returns:
        hub_model (nn.Module): PyTorch model.
    """

    # Accept pathlib.Path as well as str: previously a Path fell through to the
    # "already loaded" branch and failed later with an AttributeError.
    if isinstance(path_or_model, (str, Path)):
        # SECURITY NOTE: torch.load unpickles the file and can execute arbitrary code;
        # only load checkpoints from a trusted source.
        # NOTE(review): recent PyTorch releases default to weights_only=True, which
        # presumably rejects full-model checkpoints like this one; confirm against
        # the installed version.
        model = torch.load(str(path_or_model), map_location=torch.device('cpu'))
    else:  # Otherwise, treat it as a loaded model or dictionary
        model = path_or_model

    if isinstance(model, dict):
        model = model['ema' if model.get('ema') else 'model']  # Use 'ema' if available, otherwise use 'model'

    source_device = next(model.parameters()).device
    model_cfg = getattr(model, 'yaml', None)

    if model_cfg is not None:
        # Rebuild the architecture from the configuration stored in the checkpoint
        hub_model = Model(model_cfg).to(source_device)
    else:
        print("Warning: 'yaml' attribute not found. Loading model without YAML configuration.")
        # Fall back to a manually specified configuration:
        # 3 input channels (RGB) and 80 classes (COCO dataset)
        hub_model = Model(ch=3, nc=80).to(source_device)

    hub_model.load_state_dict(model.float().state_dict())  # Load weights
    hub_model.names = model.names  # Copy class names

    if autoshape:
        hub_model = hub_model.autoshape()

    device = select_device('0' if torch.cuda.is_available() else 'cpu')
    return hub_model.to(device)  # Move the model to the selected device

In [11]:
def _draw_detections(frame, detections, class_names, conf_threshold=0.5):
    """
    Draws bounding boxes and confidence labels for the detections onto ``frame`` in place.

    Args:
        frame (np.ndarray): BGR image to annotate.
        detections: Rows of (x1, y1, x2, y2, confidence, ..., class index) with box
            coordinates normalised to the 0-1 range.
        class_names: Lookup from class index to class label.
        conf_threshold (float): Detections with a lower confidence are skipped.
    """
    height, width = frame.shape[:2]

    for det in detections:
        confidence = det[4]
        if confidence >= conf_threshold:
            # Scale the normalised coordinates back to pixel positions
            x1, y1 = int(det[0] * width), int(det[1] * height)
            x2, y2 = int(det[2] * width), int(det[3] * height)

            # Draw a rectangle around the object
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Put label on the detected object
            label = f"{class_names[int(det[-1])]}: {confidence:.2f}"
            cv2.putText(
                frame,
                label,
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.9,
                (255, 0, 0),
                2,
            )


def process_video(
    video_path,
    farneback,
    frame_size,
    out_dir,
    opacity_foreground,
    opacity_optical_flow,
    model=None,
    conf_threshold=0.5,
):
    """
    Runs the detection pipeline on one video and writes an annotated copy of it.

    Every frame is resized, gamma-adjusted and blurred, turned into the combined
    GMM / optical-flow image and passed to the YOLO model. Detections at or above
    ``conf_threshold`` are drawn onto the resized original frame, which is then
    appended to ``<out_dir>/<last 15 characters of the video stem>.mp4``.

    Args:
        video_path (Path): Path to the input video file.
        farneback (dict): Parameters for the Farneback optical flow algorithm.
        frame_size (tuple): Resized image size (width, height).
        out_dir (Path): Directory the output video is saved to; created if missing.
        opacity_foreground (float): Opacity for filtered foreground mask (0 to 1).
        opacity_optical_flow (float): Opacity for optical flow visualization (0 to 1).
        model: Optional pre-loaded detection model. It must be callable on an image,
            return an object exposing ``xyxyn`` and provide ``names``. When omitted,
            the model is loaded from ``MODEL_DIR``.
        conf_threshold (float): Minimum confidence for a detection to be drawn.

    Returns:
        None: Errors opening the input or output video are printed and end the call early.
    """

    video_name_short = video_path.stem[-15:]
    output_path = out_dir / (video_name_short + ".mp4")

    # Open the video file
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        print(f"Error: Could not open video file: {video_path}")
        return

    out = None
    try:
        # Load YOLOv7 model only when the caller did not supply one
        if model is None:
            model = create_model(path_or_model=str(MODEL_DIR))

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS) or 30  # Default to 30 if FPS is invalid

        foreground_detector = cv2.createBackgroundSubtractorMOG2(
            history=250, varThreshold=16, detectShadows=True
        )

        # The writer cannot create missing directories itself
        out_dir.mkdir(parents=True, exist_ok=True)

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        # Pass the path as str: older OpenCV builds reject pathlib.Path objects
        out = cv2.VideoWriter(
            str(output_path), fourcc, fps, (frame_size[0], frame_size[1])
        )

        if not out.isOpened():
            print("Error: Could not open VideoWriter.")
            return

        ret, frame1 = cap.read()
        if not ret:
            print(f"Failed to read the video file: {video_name_short}")
            return

        # Convert first frame to grayscale for optical flow
        prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
        hsv = np.zeros_like(frame1)
        hsv[..., 1] = 255

        # Process each frame of the video; some containers report no usable frame
        # count, in which case the progress bar runs without a total
        with tqdm(
            total=total_frames if total_frames > 0 else None,
            desc=f"Processing {video_name_short}",
        ) as video_pbar:
            while ret:
                # Keep an untouched resized copy for drawing; the gamma-adjusted and
                # blurred version only feeds the image pipeline
                original_resized_frame = cv2.resize(frame1, frame_size)
                frame_blurred = cv2.GaussianBlur(
                    adjust_gamma(original_resized_frame, 1.5), (5, 5), 0
                )

                # Process the current frame (optical flow and foreground detection)
                next_frame, combined_frame = process_frame(
                    frame_blurred,
                    frame1,
                    foreground_detector,
                    prvs,
                    hsv,
                    farneback,
                    frame_size,
                    opacity_foreground,
                    opacity_optical_flow,
                )

                results = model(combined_frame)

                # Draw bounding boxes and labels on the frame
                _draw_detections(
                    original_resized_frame,
                    results.xyxyn[0],
                    model.names,
                    conf_threshold,
                )

                # Write the processed frame to the output video
                out.write(original_resized_frame)

                # Update progress bar and move to next frame
                video_pbar.update(1)
                prvs = next_frame
                ret, frame1 = cap.read()

    finally:
        # Release video capture and writer resources, also when an error occurred
        cap.release()
        if out is not None:
            out.release()

In [12]:
# Main entry point: run the detection pipeline on every .flv video in VIDEO_DIR_IN.
# Sorting makes the processing order reproducible between runs.
video_files = sorted(VIDEO_DIR_IN.glob("*.flv"))

# Without this message a wrong input directory would silently do nothing.
if not video_files:
    print(f"No .flv videos found in {VIDEO_DIR_IN}")

for video in video_files:
    process_video(
        video,
        FARNEBACK_PARAMS,
        FRAME_RESIZE,
        VIDEO_DIR_OUT,
        OPACITY_FOREGROUND,
        OPACITY_OPTICAL_FLOW,
    )