# Global Configs

# Drive

In [None]:
# Mount Google Drive at /content/drive (Colab-only helper); the project paths
# defined in the config cell live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

# Config

In [None]:
# Imports
from pathlib import Path
import torch
import os

# --- Project root (everything below is resolved relative to it) ---
ROOT_DIR = Path("/content/drive/MyDrive/Pós-Graduações/Computer_Vision_Master/Materias/DOIV/trabalho_final")

# --- YOLOv8 dataset location ---
DATASET_DIR_YOLO = ROOT_DIR / "dataset_marinedebris_yolov8"
DATASET_YAML = DATASET_DIR_YOLO / "data.yaml"

# --- Model checkpoints ---
MODEL_NAME_YOLO = "yolov8n.pt"
MODEL_NAME_YOLO_FINAL1 = ROOT_DIR / "yolov8n_marinedebris_best_final_1.pt"
WEIGHTS_YOLOV8_BASELINE = ROOT_DIR / "yolov8n_marinedebris_baseline.pt"
WEIGHTS_YOLOV8_BASELINE_ADJUSTED = ROOT_DIR / "yolov8n_marinedebris_baseline_adjusted.pt"
WEIGHTS_YOLOV8_BEST = ROOT_DIR / "yolov8n_marinedebris_best_final.pt"

# --- Names of the final models and of the parameter files saved with them ---
MODEL_NAME_YOLO_FINAL_TUNNED = "yolov8n_marinedebris_best_final_2.pt"
MODEL_NAME_YOLO_FINAL_PARAMS = ROOT_DIR / "best_params_used_final_model_2.csv"
MODEL_NAME_YOLO_FINAL_BASELINE_TUNNED = "yolov8n_marinedebris_best_baseline_tunned.pt"  # Best model
MODEL_NAME_YOLO_FINAL_BASELINE_PARAMS = ROOT_DIR / "best_params_used_baseline_tunned.csv"

# --- Optimizer (Optuna) results ---
OPTIMIZER_RESULTS = ROOT_DIR / "optuna_results.csv"

# --- Compute device: prefer CUDA, then Apple MPS, otherwise CPU ---
if torch.cuda.is_available():
    DEVICE = "cuda"
elif torch.backends.mps.is_available():
    DEVICE = "mps"
else:
    DEVICE = "cpu"

print(f"GPU is available? {torch.cuda.is_available()}")

print(f"Using {DEVICE} device")

# --- Sample inputs for the inference/prediction tests ---
TEST_IMAGE1 = ROOT_DIR / "test_image_marinedebris1.jpg"
TEST_IMAGE2 = ROOT_DIR / "test_image_marinedebris2.jpg"
TEST_IMAGE3 = ROOT_DIR / "test_image_marinedebris3.jpg"
TEST_IMAGE4 = ROOT_DIR / "test_image_marinedebris4.jpg"
TEST_IMAGE5 = ROOT_DIR / "test_image_marinedebris5.png"
TEST_IMAGE6 = ROOT_DIR / "test_image_marinedebris6.png"
TEST_VIDEO = ROOT_DIR / "marine-debris-polution.mp4"

In [None]:
# # RUN ONLY FOR CLEANING THE MEMORY
# # Cleaning GPU memory
# import numba
# from numba import cuda

# device = cuda.get_current_device()
# device.reset()

# # CPU
# import gc
# gc.collect()

# Pre Processing Datasets

## preprocessing_module

In [None]:
# imports
from pathlib import Path
from collections import Counter, defaultdict
import yaml
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random

# Classes
class PreProcessorYoloV8:
    """
    Utility class for inspecting and visualizing YOLOv8 datasets.

    This class provides tools to:
    - Load class definitions from a YOLO data.yaml file
    - Count object instances per class and dataset split
    - Visualize class distribution across splits
    - Plot example images with YOLO bounding box annotations

    It assumes a standard YOLO directory structure:
    data_path/
        train/
            images/
            labels/
        valid/
            images/
            labels/
        test/
            images/
            labels/
    """

    # Suffixes tried, in order, when looking for the image that belongs to a
    # label file. ".jpg" and ".png" come first to keep the historical lookup.
    IMAGE_SUFFIXES = (".jpg", ".png", ".jpeg", ".JPG", ".PNG", ".JPEG")

    def __init__(self, model_name, data_path, yaml_path):
        """
        Initialize the YOLOv8 dataset preprocessor.

        Parameters
        ----------
        model_name : str
            Name or identifier of the YOLO model (e.g., 'yolov8n', 'yolov8m').
            Stored for reference and logging purposes.
        data_path : str or pathlib.Path
            Root directory of the YOLO dataset.
        yaml_path : str or pathlib.Path
            Path to the YOLO data.yaml file containing class definitions.
        """

        self.model_name = model_name
        # Normalised to Path so that the "/" joins below also work for str input.
        self.data_path = Path(data_path)
        self.yaml_path = yaml_path
        self.class_map = self._load_classes()  # for yolov8
        self.results = None

    def _load_classes(self) -> dict:
        """
        Load class names from the YOLO data.yaml file.

        The 'names' entry may be a list (index = position) or a mapping
        from class index to class name.

        Returns
        -------
        dict
            Mapping from class index (int) to class name (str).

        Raises
        ------
        TypeError
            If the 'names' field in the YAML file is neither a list nor a dict.
        """

        with open(self.yaml_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)

        names = data["names"]

        if isinstance(names, dict):
            return {int(index): name for index, name in names.items()}

        if isinstance(names, list):
            return dict(enumerate(names))

        raise TypeError("Formato inválido para 'names' no data.yaml")

    def _count_split(self, split: str) -> Counter:
        """
        Count object instances per class for a given dataset split.

        Blank lines in label files are ignored.

        Parameters
        ----------
        split : str
            Dataset split name ('train', 'valid', or 'test').

        Returns
        -------
        collections.Counter
            Counter mapping class_id (int) to number of objects.
        """

        labels_path = self.data_path / split / "labels"
        counter = Counter()

        for label_file in labels_path.glob("*.txt"):
            with open(label_file) as f:
                for line in f:
                    fields = line.split()
                    if not fields:
                        # Empty / whitespace-only line: nothing to count.
                        continue
                    counter[int(fields[0])] += 1

        return counter

    def count_all(self) -> dict:
        """
        Count object instances per class for all dataset splits.

        The results are stored internally and returned as a dictionary
        indexed by split name and class name. Within each split the classes
        are inserted in ascending class-id order, so the layout does not
        depend on the order in which label files are listed.

        Returns
        -------
        dict
            Nested dictionary of the form:
            {
                'train': {'class_name': count, ...},
                'valid': {'class_name': count, ...},
                'test':  {'class_name': count, ...}
            }
        """

        self.results = {}

        for split in ["train", "valid", "test"]:
            split_counter = self._count_split(split)

            self.results[split] = {
                self.class_map[class_id]: count
                for class_id, count in sorted(split_counter.items())
            }

        return self.results

    def _autolabel(self, bars, values, total):
        """
        Attach percentage labels above bar plots.

        Parameters
        ----------
        bars : matplotlib.container.BarContainer
            Bars returned by matplotlib's bar() function.
        values : iterable
            Numerical values corresponding to each bar.
        total : float
            Total value used to compute percentages.
        """

        for bar, v in zip(bars, values):
            if v == 0:
                # Empty bars get no label (this also avoids dividing by a zero total).
                continue

            pct = 100 * v / total

            plt.text(
                bar.get_x() + bar.get_width() / 2,
                bar.get_height(),
                f"{pct:.1f}%",
                ha="center",
                va="bottom",
                fontsize=8,
            )

    def classes_show(self):
        """
        Plot class distribution per dataset split.

        Displays a grouped bar chart showing the number and percentage
        of objects per class for train, validation, and test splits.

        Raises
        ------
        RuntimeError
            If count_all() has not been executed beforehand.
        """

        if self.results is None:
            raise RuntimeError("Execute count_all() antes de chamar classes_show().")

        # Rows = class names, columns = splits; classes missing in a split count as 0.
        df = pd.DataFrame(self.results).fillna(0)

        classes = df.index
        x = np.arange(len(classes))
        width = 0.25
        totals = df.sum(axis=0)

        # Plot
        plt.figure(figsize=(10, 5))

        bars_train = plt.bar(x - width, df["train"], width, label="Train")
        bars_val = plt.bar(x, df["valid"], width, label="Val")
        bars_test = plt.bar(x + width, df["test"], width, label="Test")

        self._autolabel(bars_train, df["train"], totals["train"])
        self._autolabel(bars_val, df["valid"], totals["valid"])
        self._autolabel(bars_test, df["test"], totals["test"])

        plt.xticks(x, classes, rotation=45)  # type: ignore
        plt.ylabel("Number of objects")
        plt.title("YOLO class distribution (%) per split")
        plt.legend()
        plt.tight_layout()
        plt.show()

    def _find_image(self, images_path, stem):
        """Return the image path for a label stem, trying each known suffix in turn."""

        for suffix in self.IMAGE_SUFFIXES:
            candidate = images_path / (stem + suffix)
            if candidate.exists():
                return candidate

        # Nothing matched: keep the historical ".png" guess so that reading the
        # image fails with an error that names a concrete path.
        return images_path / (stem + ".png")

    def plot_class_examples(self, split: str = "train"):
        """
        Plot one example image per class with YOLO bounding boxes.

        For each class, a random labeled image is selected and all
        bounding boxes are drawn. The target class is highlighted.

        Parameters
        ----------
        split : str, optional
            Dataset split to visualize ('train', 'valid', or 'test'),
            by default 'train'.

        Raises
        ------
        ValueError
            If the split contains no labeled objects (nothing to plot).
        """

        images_path = self.data_path / split / "images"
        labels_path = self.data_path / split / "labels"

        # Map: class_id -> label files containing that class
        # (a file is listed once per matching object line).
        candidates = defaultdict(list)

        for label_file in labels_path.glob("*.txt"):
            with open(label_file) as f:
                for line in f:
                    fields = line.split()
                    if fields:
                        candidates[int(fields[0])].append(label_file)

        if not candidates:
            # Without this guard the grid computation below divides by zero.
            raise ValueError(f"No labeled objects found in {labels_path}")

        # One random label file per class, in ascending class-id order.
        class_examples = {
            cid: random.choice(files)
            for cid, files in sorted(candidates.items())
        }

        n_classes = len(class_examples)
        ncols = min(4, n_classes)
        nrows = int(np.ceil(n_classes / ncols))

        _, axes = plt.subplots(nrows, ncols, figsize=(4 * ncols, 4 * nrows))
        axes = np.array(axes).reshape(-1)

        for ax, (class_id, label_file) in zip(axes, class_examples.items()):
            image_file = self._find_image(images_path, label_file.stem)

            img = plt.imread(image_file)
            h, w = img.shape[:2]

            ax.imshow(img)
            ax.axis("off")

            with open(label_file) as f:
                for line in f:
                    fields = line.split()
                    if not fields:
                        continue

                    cid, xc, yc, bw, bh = map(float, fields)

                    # YOLO (normalized center/size) -> pixel corner coords
                    xmin = (xc - bw / 2) * w
                    ymin = (yc - bh / 2) * h
                    xmax = (xc + bw / 2) * w
                    ymax = (yc + bh / 2) * h

                    # Boxes of the class being showcased are blue, all others lime.
                    color = "blue" if int(cid) == class_id else "lime"

                    rect = plt.Rectangle(  # type: ignore
                        (xmin, ymin),
                        xmax - xmin,
                        ymax - ymin,
                        fill=False,
                        color=color,
                        linewidth=2,
                    )
                    ax.add_patch(rect)

            class_name = self.class_map[class_id]
            ax.set_title(class_name, fontsize=12, color="blue")

        # Remove empty axes
        for ax in axes[len(class_examples):]:
            ax.axis("off")

        plt.tight_layout()
        plt.show()


## Preprocessor Main

In [None]:
def main():
    """
    Inspect the YOLOv8 dataset.

    Prints the number of objects per class for every split, then shows the
    class-distribution chart and one annotated example image per class.
    """

    preprocessor = PreProcessorYoloV8(
        model_name="yolov8",
        data_path=DATASET_DIR_YOLO,
        yaml_path=DATASET_YAML,
    )

    # Textual summary: one section per split, one line per class.
    for split_name, per_class in preprocessor.count_all().items():
        print(f"\n{split_name.upper()}")

        for class_name, total in per_class.items():
            print(f"{class_name}: {total}")

    # Visual summaries.
    preprocessor.classes_show()
    preprocessor.plot_class_examples()


if __name__ == "__main__":
    main()

# ModelYoloV8

## Install ultralytics library

In [None]:
!pip install -q ultralytics
!pip install -q optuna

## yolov8_module

In [None]:
# Imports
from ultralytics import YOLO
import shutil
from pathlib import Path

# Classes
class ModelYoloV8:
    """
    Wrapper class for training, evaluating, and saving YOLOv8 models.

    This class provides a thin abstraction over the Ultralytics YOLO API,
    exposing common workflows such as training, validation, metric extraction,
    and saving the best model weights.
    """

    # Attributes
    def __init__(self, model_name: str = MODEL_NAME_YOLO):
        """
        Initialize a YOLOv8 model.

        Parameters
        ----------
        model_name : str or pathlib.Path, optional
            Name or path of the YOLOv8 model to load (e.g., 'yolov8n.pt',
            'yolov8m.pt'). Defaults to MODEL_NAME_YOLO.
        """

        self.model = YOLO(model_name)
        self.model_name = model_name
        self.metrics = None  # filled by evaluate()

    # Methods
    def fit(self, **kwargs):
        """
        Train the YOLOv8 model.

        This method is a direct wrapper around ``YOLO.train`` and forwards
        all keyword arguments to the underlying Ultralytics API.

        Parameters
        ----------
        **kwargs
            Keyword arguments supported by ``YOLO.train`` (e.g., data, epochs,
            imgsz, batch, device, optimizer).

        Returns
        -------
        object
            Training results object returned by ``YOLO.train``.
        """

        return self.model.train(**kwargs)

    def evaluate(self, **kwargs):
        """
        Evaluate the YOLOv8 model on a validation or test dataset.

        This method runs model validation, stores the raw result in
        ``self.metrics`` and extracts the most common detection metrics
        related to bounding boxes.

        Parameters
        ----------
        **kwargs
            Keyword arguments supported by ``YOLO.val`` (e.g., data, split,
            imgsz, device).

        Returns
        -------
        dict
            Dictionary containing evaluation metrics:
            - 'map50_95': mean Average Precision at IoU 0.50:0.95
            - 'map50'   : mean Average Precision at IoU 0.50
            - 'map75'   : mean Average Precision at IoU 0.75
            - 'per_class_map': per-class mAP values

        Raises
        ------
        ValueError
            If evaluation fails or expected metrics are unavailable.
        """

        self.metrics = self.model.val(**kwargs)

        if self.metrics is None or not hasattr(self.metrics, "box"):
            raise ValueError("Evaluation failed or metrics are unavailable.")

        box = self.metrics.box
        return {
            "map50_95": box.map,
            "map50": box.map50,
            "map75": box.map75,
            "per_class_map": box.maps,
        }

    def save_model(self, weight_name_model):
        """
        Save the best model weights after training.

        This method copies the best-performing weights (as determined during
        training) to ``ROOT_DIR / weight_name_model``.

        Parameters
        ----------
        weight_name_model : str or pathlib.Path
            Filename (or relative path) to store the best model weights.

        Raises
        ------
        ValueError
            If training has not been completed or best weights are unavailable.
        """

        trainer = getattr(self.model, "trainer", None)
        best = getattr(trainer, "best", None)

        # A trainer can carry a 'best' path even when no such checkpoint was
        # written, so the file itself is checked as well; otherwise the copy
        # would fail with a bare FileNotFoundError instead of the documented
        # ValueError.
        if best is None or not Path(best).is_file():
            raise ValueError("Model training has not been completed or 'best' weights are unavailable.")

        shutil.copy(best, ROOT_DIR / weight_name_model)

## tuning_optimizer

In [None]:
# Imports
import torch
import random
import numpy as np
import gc

def set_seed(seed=42):
    """
    Seed the random number generators used in this notebook.

    The same seed is applied, in order, to Python's ``random`` module,
    NumPy's global generator and PyTorch (``torch.manual_seed``), so that
    repeated runs start from the same random state.

    Parameters
    ----------
    seed : int, optional
        Seed value passed to every generator. Defaults to 42.
    """

    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

def objective(trial):
    """
    Optuna objective: train and score one YOLOv8 configuration.

    Samples a set of hyperparameters from the trial, runs a short training
    with them, validates on the 'val' split and returns the resulting mAP.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Trial object used to sample the hyperparameters.

    Returns
    -------
    float
        Validation mAP (IoU 0.50:0.95), the value Optuna optimizes.
    """

    set_seed()

    # Hyperparameters sampled by Optuna for this trial.
    sampled = {
        "lr0": trial.suggest_float("lr0", 1e-4, 1e-2, log=True),
        "lrf": trial.suggest_float("lrf", 0.01, 0.2),
        "weight_decay": trial.suggest_float("weight_decay", 1e-5, 1e-3, log=True),
        "box": trial.suggest_float("box", 7., 10.0),
        "cls": trial.suggest_float("cls", 0.4, 0.9),
        "dfl": trial.suggest_float("dfl", 1.4, 2.0),
        "iou": trial.suggest_float("iou", 0.4, 0.7),
    }

    # Settings shared by every trial (short runs to keep the search cheap).
    fixed = {
        "data": DATASET_YAML,
        "device": DEVICE,
        "name": f"optuna_trial_{trial.number}",
        "project": "runs/optuna",
        "imgsz": 640,
        "epochs": 8,
        "patience": 2,
        "batch": 32,
        "freeze": 8,
        "optimizer": "AdamW",
        "warmup_epochs": 2,
        "warmup_bias_lr": 0.1,
        "momentum": 0.937,
        "verbose": False,
    }

    detector = ModelYoloV8(MODEL_NAME_YOLO)
    detector.fit(**fixed, **sampled)
    detector.evaluate(data=DATASET_YAML, device=DEVICE, split="val")

    score = detector.metrics.box.map  # type: ignore

    # Release the model before the next trial starts.
    del detector
    gc.collect()
    torch.cuda.empty_cache()

    return score


def load_best_params(
    csv_path,
    param_names=("lr0", "lrf", "weight_decay", "box", "cls", "dfl", "iou"),
):
    """
    Load the best hyperparameters from an Optuna trials CSV file.

    This function keeps only completed trials that have an objective value,
    selects the one with the highest value, and extracts the requested
    parameters from its ``params_<name>`` columns.

    Parameters
    ----------
    csv_path : str or pathlib.Path
        Path to the CSV file exported by Optuna containing trial results.
        It must provide the columns 'state', 'value' and one
        'params_<name>' column per requested parameter.
    param_names : iterable of str, optional
        Names of the hyperparameters to extract. Defaults to the seven
        parameters tuned in this notebook.

    Returns
    -------
    dict
        Dictionary mapping each parameter name to its value as a float,
        ready to be passed to a YOLOv8 training configuration.

    Raises
    ------
    ValueError
        If the file contains no completed trial with an objective value.
    """

    df = pd.read_csv(csv_path)

    # Only completed trials with a real objective value can be "best".
    completed = df[(df["state"] == "COMPLETE") & df["value"].notna()]

    if completed.empty:
        raise ValueError(
            f"No completed Optuna trial with an objective value found in {csv_path}"
        )

    # Best trial = highest objective value.
    best_row = completed.loc[completed["value"].idxmax()]

    return {name: float(best_row[f"params_{name}"]) for name in param_names}

## YoloModelV8 train_baseline

In [None]:
%%time

def main():
    """
    Train and test the YOLOv8 baseline.

    Trains a YOLOv8 model for marine debris detection with a fixed,
    hand-picked configuration, prints the metrics obtained on the test
    split and stores the best weights under ROOT_DIR.
    """

    # Fixed baseline configuration (640x640 input).
    train_config = {
        "data": DATASET_YAML,
        "device": DEVICE,
        "name": "baseline",
        "project": "runs/baseline",
        "imgsz": 640,
        "batch": 32,
        "epochs": 30,
        "patience": 5,
        "freeze": 8,
        # Optimizer
        "optimizer": "AdamW",
        "warmup_epochs": 5,
        "warmup_bias_lr": 0.1,
        "momentum": 0.937,
        # IoU threshold (default = 0.7)
        "iou": 0.5,
        "weight_decay": 0.0005,
        "lr0": 0.003,
        "lrf": 0.01,  # lr_final = lr0 * lrf
        # Loss weights
        "box": 10.,  # default = 7.5
        "cls": 0.8,  # default = 0.5
        "dfl": 2.,   # default = 1.5
    }

    model_yolov8 = ModelYoloV8()
    model_yolov8.fit(**train_config)

    # Test metrics
    print("\n" * 5)
    print("TEST METRICS")
    metrics_yolov8 = model_yolov8.evaluate(
        data=DATASET_YAML,
        device=DEVICE,
        split="test",
    )
    print(metrics_yolov8)

    # Copy the best weights to the project folder.
    model_yolov8.save_model(weight_name_model="yolov8n_marinedebris_baseline.pt")


if __name__ == "__main__":
    main()

## YoloModelV8 tune

In [None]:
%%time

# Imports
import optuna

# Main
def main():
    """
    Run the Optuna hyperparameter search for YOLOv8.

    Creates a maximizing study, runs 20 trials of ``objective``, prints the
    best value and parameters, and writes every trial to OPTIMIZER_RESULTS.
    """

    study = optuna.create_study(direction="maximize", study_name="yolov8_marine_debris")
    study.optimize(objective, n_trials=20, timeout=None)

    # Report the winning trial.
    print("Best trial:")
    print("  Value:", study.best_value)
    print("  Params:")
    for param_name, param_value in study.best_params.items():
        print(f"    {param_name}: {param_value}")

    # Persist all trials so that later cells can reload the best parameters.
    trials_table = study.trials_dataframe()
    trials_table.to_csv(OPTIMIZER_RESULTS, index=False)


if __name__ == "__main__":
    main()

## YoloModelV8 train_final_fase1

In [None]:
%%time

def main():
    """
    Final training (phase 1) with the Optuna-selected hyperparameters.

    Loads the best parameters from OPTIMIZER_RESULTS, trains a fresh YOLOv8
    model with them, prints the test-split metrics, and saves both the best
    weights and the parameters that were used.
    """

    best_params = load_best_params(OPTIMIZER_RESULTS)
    print(best_params)

    # Fixed part of the training setup; the tuned values are merged in below.
    fixed_config = {
        "data": DATASET_YAML,
        "device": DEVICE,
        "imgsz": 640,
        "batch": 32,
        "epochs": 50,
        "patience": 5,
        "freeze": 8,
        "optimizer": "AdamW",
        "warmup_epochs": 5,
        "warmup_bias_lr": 0.1,
        "momentum": 0.937,
    }

    model = ModelYoloV8(MODEL_NAME_YOLO)

    # Final train
    model.fit(**fixed_config, **best_params)

    metrics = model.evaluate(data=DATASET_YAML, device=DEVICE, split="test")
    print("FINAL TEST METRICS:", metrics)

    # Persist weights and the parameters used to obtain them.
    model.save_model(weight_name_model="yolov8n_marinedebris_best_final_1.pt")
    params_series = pd.Series(best_params)
    params_series.to_csv(ROOT_DIR / "best_params_used_final_model_1.csv")


if __name__ == "__main__":
    main()

## YoloModelV8 train_baseline_adjusted

In [None]:
%%time

def main():
    """
    Train and test the adjusted YOLOv8 baseline.

    Same workflow as the baseline training, with adjusted ``iou`` (0.6) and
    ``cls`` (0.6) values and class-agnostic NMS during the test evaluation.
    Prints the test metrics, plots the confusion matrix and stores the best
    weights under ROOT_DIR.
    """

    # Imported here because this cell comes before the notebook cell that
    # imports seaborn at top level.
    import seaborn as sns

    model_yolov8 = ModelYoloV8()  # 640x640 Size

    # Training
    model_yolov8.fit(
        data=DATASET_YAML,
        device=DEVICE,
        name="baseline_adjusted",
        project="runs/baseline_adjusted",
        imgsz=640,

        batch=32,
        epochs=30,
        patience=5,
        freeze=8,

        # Optimizer
        optimizer="AdamW",
        warmup_epochs=5,
        warmup_bias_lr=0.1,
        momentum=0.937,

        # IoU threshold
        iou=0.6,  # default = 0.7

        weight_decay=0.0005,
        lr0=0.003,
        lrf=0.01,  # lr_final = lr0 * lrf

        # Losses
        box=10.,  # default = 7.5
        cls=0.6,  # default = 0.5
        dfl=2.,   # default = 1.5
    )

    # Test Metrics
    print("\n" * 5)
    print("TEST METRICS")
    metrics_yolov8 = model_yolov8.evaluate(
        data=DATASET_YAML,
        device=DEVICE,
        split="test",
        agnostic_nms=True,
    )
    print(metrics_yolov8)

    # Confusion Matrix
    # Cast to int so the "d" annotation format is valid whatever dtype the
    # matrix is stored in (same approach as the baseline-tuned cell).
    cm = model_yolov8.metrics.confusion_matrix.matrix.astype(int)
    sns.heatmap(cm, annot=True, fmt="d")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

    # Copy the best weights to the project folder.
    model_yolov8.save_model(weight_name_model="yolov8n_marinedebris_baseline_adjusted.pt")


if __name__ == "__main__":
    main()

## YoloModelV8 train_final_fase2

In [None]:
%%time

def main():
    """
    Final training (phase 2): fine-tune the phase-1 model.

    Loads the phase-1 weights (MODEL_NAME_YOLO_FINAL1), fine-tunes all layers
    with a learning rate ten times smaller than the Optuna-selected ``lr0``,
    evaluates on the test split, plots the confusion matrix, and saves the
    tuned weights together with the parameters loaded from Optuna.
    """

    # Imported here because this cell comes before the notebook cell that
    # imports seaborn at top level.
    import seaborn as sns

    best_params = load_best_params(OPTIMIZER_RESULTS)
    print(best_params)

    model = ModelYoloV8(MODEL_NAME_YOLO_FINAL1)

    # New Fine-tuning
    model.fit(
        data=DATASET_YAML,
        device=DEVICE,
        imgsz=640,
        batch=16,
        epochs=20,
        patience=5,
        freeze=0,
        lr0=best_params["lr0"] * 0.1,
    )

    metrics = model.evaluate(
        data=DATASET_YAML,
        device=DEVICE,
        split="test",
    )

    # Confusion Matrix
    # `metrics` is the plain dict returned by evaluate(); the confusion matrix
    # lives on the raw validation result kept in `model.metrics`. Cast to int
    # so the "d" annotation format is valid.
    cm = model.metrics.confusion_matrix.matrix.astype(int)
    sns.heatmap(cm, annot=True, fmt="d")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

    print("FINAL TEST METRICS:", metrics)

    model.save_model(weight_name_model=MODEL_NAME_YOLO_FINAL_TUNNED)
    pd.Series(best_params).to_csv(MODEL_NAME_YOLO_FINAL_PARAMS)


if __name__ == "__main__":
    main()

## YoloModelV8 train_final_baseline_tunned

In [None]:
%%time
import seaborn as sns

def main():
    """
    Fine-tune the trained YOLOv8 baseline and evaluate it.

    Starts from the baseline weights (WEIGHTS_YOLOV8_BASELINE), fine-tunes
    with a reduced learning rate, evaluates on the test split, shows the
    confusion matrix as a heatmap of counts, and saves the tuned weights.
    """

    finetune_config = {
        "data": DATASET_YAML,
        "device": DEVICE,
        "imgsz": 640,
        "batch": 16,
        "epochs": 20,
        "patience": 5,
        "freeze": 0,
        "lr0": 0.003 * 0.1,  # one tenth of the baseline learning rate
    }

    model = ModelYoloV8(WEIGHTS_YOLOV8_BASELINE)

    # Fine-tuning
    model.fit(**finetune_config)

    metrics = model.evaluate(data=DATASET_YAML, device=DEVICE, split="test")

    # Confusion matrix as integer counts, taken from the raw validation result.
    counts = model.metrics.confusion_matrix.matrix.astype(int)
    heatmap_style = {
        "annot": True,
        "fmt": "d",
        "cmap": "Greens",
        "cbar": True,
        "linewidths": 0.5,
        "linecolor": "white",
    }
    sns.heatmap(counts, **heatmap_style)

    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix (Counts)")
    plt.tight_layout()
    plt.show()

    print("FINAL TEST METRICS:", metrics)

    model.save_model(weight_name_model=MODEL_NAME_YOLO_FINAL_BASELINE_TUNNED)


if __name__ == "__main__":
    main()

# PredictionInference YOLOV8

## inference.py

In [None]:
#!pip -q install norfair
!pip install -q --force-reinstall "numpy<2.0" "scipy<1.12" norfair

In [None]:
!pip uninstall -y numpy scipy norfair
!pip install numpy==1.26.4
!pip install scipy==1.11.4


In [None]:
!pip install norfair

In [None]:
import cv2
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow
from ultralytics import YOLO
import numpy as np
from norfair import Detection, Tracker, Video, draw_tracked_objects

# Drawing colour per class name. The colour names in the comments match the
# tuples when read in BGR channel order (the order OpenCV uses).
CLASS_COLORS = {
    "can": (255, 0, 0),  # blue
    "foam": (0, 255, 255),  # yellow
    "plastic": (0, 255, 0),  # green
    "plastic bottle": (0, 165, 255),  # orange
    "unknow": (128, 128, 128),  # gray
}

class InferencePicture():
    """
    Run a YOLOv8 model on one image and show the detections.

    The model is loaded from the given weights when the object is created;
    ``results()`` performs the prediction, displays the annotated image and
    hands back the raw prediction output.
    """

    def __init__(self, weights_yolo, image_path):
        """
        Load the model and remember which image to process.

        Parameters
        ----------
        weights_yolo : str or pathlib.Path
            Path to the YOLOv8 weights file.
        image_path : str or pathlib.Path
            Path to the image on which inference is run.
        """

        self.model = YOLO(weights_yolo)
        self.image_path = image_path

    def results(self):
        """
        Predict on the stored image, display it, and return the predictions.

        The first result is rendered with its bounding boxes, its channel
        order is reversed before display (presumably BGR -> RGB for
        matplotlib), and the figure is sized so that the image is shown at
        its pixel size for a DPI of 100.

        Returns
        -------
        list
            YOLOv8 Results objects returned by ``predict``.
        """

        predictions = self.model.predict(
            source=self.image_path,
            imgsz=640,
            agnostic_nms=True,
        )

        # Annotated image of the first result, with the channel axis reversed.
        annotated = predictions[0].plot()[:, :, ::-1]

        dpi = 100
        height, width = annotated.shape[:2]
        figure_size = (width / dpi, height / dpi)

        plt.figure(figsize=figure_size, dpi=dpi)
        plt.imshow(annotated)
        plt.axis("off")
        plt.show()

        return predictions

class InferenceVideo():
    """
    Detect and track objects across the frames of a video.

    Frames are read and written through a norfair ``Video`` object. Each
    frame goes through the YOLO model, the detections are fed to a
    norfair ``Tracker``, and every tracked object is drawn on the frame
    with its ID, class name and confidence.
    """

    def __init__(self, input_path: str, model_path):
        """
        Set up the model, the video reader/writer and the tracker.

        Parameters
        ----------
        input_path : str
            Path to the input video file.
        model_path : str
            Path to the trained YOLO model weights.
        """

        self.input_path = input_path
        self.model = YOLO(model_path)
        self.video = Video(input_path=self.input_path)
        self.tracker = Tracker(distance_function="euclidean", distance_threshold=100)

    def _frame_detections(self, frame):
        """Run YOLO on one frame and wrap every box as a norfair Detection."""

        found = []
        for result in self.model(frame, agnostic_nms=True, conf=0.4):
            boxes = result.boxes
            if boxes is None or len(boxes) == 0:
                continue

            for corners, class_idx, score in zip(boxes.xyxy, boxes.cls, boxes.conf):
                left, top, right, bottom = corners.tolist()
                confidence = float(score)

                # The tracker follows the box center; the full box and the
                # class information travel along in `data` for drawing.
                midpoint = np.array(
                    [(left + right) / 2, (top + bottom) / 2],
                    dtype=np.float32,
                )
                payload = {
                    "bbox": (int(left), int(top), int(right), int(bottom)),
                    "class_name": self.model.names[int(class_idx)],
                    "conf": confidence,
                }
                found.append(
                    Detection(
                        points=midpoint,
                        scores=np.array([confidence]),
                        data=payload,
                    )
                )
        return found

    def _annotate(self, frame, tracked_objects):
        """Draw the last known box and label of each tracked object on the frame."""

        for obj in tracked_objects:
            latest = obj.last_detection
            if latest is None or latest.data is None:
                continue

            left, top, right, bottom = latest.data["bbox"]
            label = latest.data["class_name"]
            conf = latest.data["conf"]

            # Classes missing from CLASS_COLORS are drawn in white.
            color = CLASS_COLORS.get(label, (255, 255, 255))
            caption = f"ID {obj.id} | {label} {conf:.2f}"
            # Keep the caption inside the frame when the box touches the top edge.
            caption_origin = (left, max(0, top - 8))

            cv2.rectangle(frame, (left, top), (right, bottom), color, 2)
            cv2.putText(
                frame,
                caption,
                caption_origin,
                cv2.FONT_HERSHEY_SIMPLEX,
                0.55,
                color,
                2,
                cv2.LINE_AA,
            )

    def run(self):
        """
        Process every frame of the video: detect, track, draw, write.

        For each frame the YOLO detections (confidence >= 0.4,
        class-agnostic NMS) update the tracker, the tracked objects are
        drawn on the frame, and the frame is written through the norfair
        ``Video`` object.
        """

        for frame in self.video:
            detections = self._frame_detections(frame)
            tracked_objects = self.tracker.update(detections=detections)
            self._annotate(frame, tracked_objects)
            self.video.write(frame)

In [None]:
from pathlib import Path
import cv2
from ultralytics import YOLO  # type: ignore
import numpy as np
from norfair import Detection, Tracker  # type: ignore

# Drawing color per detector class name. Tuples are in BGR channel order
# (the order OpenCV drawing calls use), which is why (255, 0, 0) is blue.
CLASS_COLORS = {
    "can": (255, 0, 0),  # blue
    "foam": (0, 255, 255),  # yellow
    "plastic": (0, 255, 0),  # green
    "plastic bottle": (0, 165, 255),  # orange
    "unknow": (128, 128, 128),  # gray
}

class InferencePicture():
    """
    Single-image YOLOv8 inference that returns the annotated image.

    Unlike a notebook-oriented variant, nothing is displayed: `run()`
    hands back the rendered detections as an array so the caller can
    encode, save or post-process it.
    """

    def __init__(self, weights_yolo, image_path):
        """
        Build the YOLO model and remember which image to process.

        Parameters
        ----------
        weights_yolo : str or pathlib.Path
            Location of the YOLOv8 weights file to load.
        image_path : str or pathlib.Path
            Location of the image that `run()` will process.
        """

        self.model = YOLO(weights_yolo)
        self.image_path = image_path

    def run(self):
        """
        Predict on the stored image and return the rendered detections.

        The prediction runs at ``imgsz=640`` with class-agnostic NMS.

        Returns
        -------
        np.ndarray
            Output of ``results[0].plot()``, returned without any channel
            conversion. Ultralytics renders this array in BGR order, so
            flip the last axis before handing it to an RGB consumer.
        """

        predictions = self.model.predict(
            source=self.image_path,
            imgsz=640,
            agnostic_nms=True,
        )

        # Only the first result is rendered (one source image in, one result out).
        return predictions[0].plot()

class InferenceVideo():
    """
    Detect and track objects in a video and write an annotated copy.

    Frames are read and written explicitly with OpenCV. Each frame goes
    through the YOLO model, the detections update a norfair ``Tracker``,
    and every tracked object is drawn on the frame with its ID, class
    name and confidence before the frame is written to the output file.
    """

    def __init__(self, input_path: str, model_path):
        """
        Set up the model and the tracker.

        Parameters
        ----------
        input_path : str
            Path to the input video file.
        model_path : str
            Path to the trained YOLO model weights.
        """

        self.input_path = input_path
        self.model = YOLO(model_path)
        self.tracker = Tracker(distance_function="euclidean", distance_threshold=100)

    def _detect(self, frame):
        """Run YOLO on one frame and wrap every box as a norfair Detection."""

        detections = []
        for r in self.model(frame, agnostic_nms=True, conf=0.4):
            boxes = r.boxes
            if boxes is None or len(boxes) == 0:
                continue

            for xyxy, cls_id, conf in zip(boxes.xyxy, boxes.cls, boxes.conf):
                x1, y1, x2, y2 = xyxy.tolist()
                score = float(conf)

                # The tracker follows the box center; the full box and the
                # class information travel along in `data` for drawing.
                center = np.array(
                    [(x1 + x2) / 2, (y1 + y2) / 2],
                    dtype=np.float32,
                )
                detections.append(
                    Detection(
                        points=center,
                        scores=np.array([score]),
                        data={
                            "bbox": (int(x1), int(y1), int(x2), int(y2)),
                            "class_name": self.model.names[int(cls_id)],
                            "conf": score,
                        },
                    )
                )
        return detections

    def _draw_tracks(self, frame, tracked_objects):
        """Draw the last known box and label of each tracked object on the frame."""

        for obj in tracked_objects:
            det = obj.last_detection
            if det is None or det.data is None:
                continue

            x1, y1, x2, y2 = det.data["bbox"]
            label = det.data["class_name"]
            conf = det.data["conf"]

            # Classes missing from CLASS_COLORS are drawn in white.
            color = CLASS_COLORS.get(label, (255, 255, 255))

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(
                frame,
                f"ID {obj.id} | {label} {conf:.2f}",
                (x1, max(0, y1 - 8)),  # keep the label inside the frame
                cv2.FONT_HERSHEY_SIMPLEX,
                0.55,
                color,
                2,
                cv2.LINE_AA,
            )

    def run(self):
        """
        Run inference and tracking over the entire video.

        Video reading and writing are controlled explicitly with OpenCV.
        The annotated video is written next to the input file as
        ``<input stem>_annotated.mp4`` using the ``mp4v`` codec, at the
        input's frame rate (30 fps when the container reports none).

        Returns
        -------
        str
            Path of the annotated output video.

        Raises
        ------
        RuntimeError
            If the input video cannot be opened, or if the output writer
            cannot be opened (for example, codec unavailable).
        """

        # Normalise once so a pathlib.Path input works for both cv2 and Path().
        source = str(self.input_path)

        cap = cv2.VideoCapture(source)
        writer = None
        try:
            if not cap.isOpened():
                raise RuntimeError(f"Could not open video: {self.input_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps or fps <= 0:
                fps = 30.0

            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            in_path = Path(source)
            output_path = str(in_path.with_name(in_path.stem + "_annotated.mp4"))
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # type: ignore
            writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

            # A writer that failed to open drops every frame silently; fail
            # loudly instead of returning a path to a file never written.
            if not writer.isOpened():
                raise RuntimeError(f"Could not open video writer: {output_path}")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                tracked_objects = self.tracker.update(detections=self._detect(frame))
                self._draw_tracks(frame, tracked_objects)
                writer.write(frame)
        finally:
            # Release the handles even when inference or drawing raises.
            cap.release()
            if writer is not None:
                writer.release()

        return output_path

In [None]:
# For images: run the tuned baseline weights on one test picture.
# NOTE(review): `results()` is defined only by the first InferencePicture
# definition in this notebook; the later definition exposes `run()` instead,
# so this cell depends on which class cell was executed last.
path = f"{str(ROOT_DIR)}/{MODEL_NAME_YOLO_FINAL_BASELINE_TUNNED}"
inference = InferencePicture(weights_yolo=path, image_path=TEST_IMAGE6)
inference.results()

In [None]:
# For videos: run detection + tracking with the tuned baseline weights.
path_model = f"{ROOT_DIR}/{MODEL_NAME_YOLO_FINAL_BASELINE_TUNNED}"  # already a str
video_infer = InferenceVideo(input_path=str(TEST_VIDEO), model_path=path_model)

video_infer.run()

In [None]:
# Main inference
def main(path=None):
    """
    Run inference on one media file, dispatching on its file extension.

    Images (.jpg, .jpeg, .png, .bmp) go through ``InferencePicture`` and
    videos (.mp4, .avi, .mov, .mkv) through ``InferenceVideo``, both using
    the tuned baseline weights stored under ``ROOT_DIR``.

    Parameters
    ----------
    path : str or pathlib.Path, optional
        Media file to process. Defaults to ``TEST_VIDEO``.

    Raises
    ------
    ValueError
        If the file extension is neither a supported image nor a
        supported video format.
    """
    image_suffixes = {".jpg", ".jpeg", ".png", ".bmp"}
    video_suffixes = {".mp4", ".avi", ".mov", ".mkv"}

    if path is None:
        path = TEST_VIDEO  # TEST_VIDEO or TEST_IMAGE6
    path = Path(path)
    suffix = path.suffix.lower()

    # Reject unsupported files before loading any model.
    if suffix not in image_suffixes and suffix not in video_suffixes:
        raise ValueError(f"Invalid format: {path.suffix}")

    weights = ROOT_DIR / MODEL_NAME_YOLO_FINAL_BASELINE_TUNNED

    if suffix in image_suffixes:
        # Image
        picture = InferencePicture(weights, path)
        # This notebook defines InferencePicture twice: one variant exposes
        # results(), the other run(). Call whichever the active definition
        # provides instead of failing with AttributeError.
        infer = getattr(picture, "results", None) or picture.run
        infer()
    else:
        # Video
        InferenceVideo(str(path), str(weights)).run()

if __name__ == "__main__":
    main()

In [None]:
# Imports
import cv2
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow
from ultralytics import YOLO

# Inference YOLOv8: predict on one test image with the baseline weights,
# using class-agnostic NMS.
model = YOLO(WEIGHTS_YOLOV8_BASELINE)
results = model.predict(source=TEST_IMAGE6, agnostic_nms=True)

# Render the detections of the first result and show them inline in Colab.
img_det = results[0].plot()
cv2_imshow(img_det)

In [None]:
#!pip -q install norfair
!pip install -q --force-reinstall "numpy<2.0" "scipy<1.12" norfair

In [None]:
import numpy as np
from norfair import Detection, Tracker, Video, draw_tracked_objects

# Config Norfair: video reader/writer, center-point tracker, baseline model
video = Video(input_path=str(TEST_VIDEO))
tracker = Tracker(distance_function="euclidean", distance_threshold=150)
model = YOLO(WEIGHTS_YOLOV8_BASELINE)

for frame in video:
    detections = []
    results = model(frame)  # Inference

    # One Detection per bounding box, located at the box center.
    for result in results:
        for x1, y1, x2, y2 in result.boxes.xyxy.tolist():
            center = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
            detections.append(Detection(center))

    # Update the tracks, draw them with norfair's default renderer, and
    # write the frame through the norfair Video object.
    tracked_objects = tracker.update(detections=detections)
    draw_tracked_objects(frame, tracked_objects)
    video.write(frame)


In [None]:
import numpy as np
import cv2
from norfair import Detection, Tracker, Video

path = f"{str(ROOT_DIR)}/{MODEL_NAME_YOLO_FINAL_BASELINE_TUNNED}"
model = YOLO(path)

# Drawing color per detector class name. Tuples are in BGR channel order
# (the order OpenCV drawing calls use), which is why (255, 0, 0) is blue.
CLASS_COLORS = {
    "can": (255, 0, 0),  # blue
    "foam": (0, 255, 255),  # yellow
    "plastic": (0, 255, 0),  # green
    "plastic bottle": (0, 165, 255),  # orange
    "unknow": (128, 128, 128),  # gray
}

video = Video(input_path=str(TEST_VIDEO))
tracker = Tracker(distance_function="euclidean", distance_threshold=80)

for frame in video:
    detections = []
    results = model(frame, agnostic_nms=True)

    # Convert every YOLO box into a norfair Detection.
    for r in results:
        boxes = r.boxes
        if boxes is None or len(boxes) == 0:
            continue

        for xyxy, cls_id, conf in zip(boxes.xyxy, boxes.cls, boxes.conf):
            x1, y1, x2, y2 = xyxy.tolist()
            score = float(conf)

            # The tracker follows the box center; the box itself and the
            # class information ride along in `data` for drawing.
            center = np.array([(x1 + x2) / 2, (y1 + y2) / 2], dtype=np.float32)
            payload = {
                "bbox": (int(x1), int(y1), int(x2), int(y2)),
                "class_name": model.names[int(cls_id)],
                "conf": score,
            }
            detections.append(
                Detection(points=center, scores=np.array([score]), data=payload)
            )

    tracked_objects = tracker.update(detections=detections)

    # Draw the last known box and label of each tracked object.
    for obj in tracked_objects:
        det = obj.last_detection
        if det is None or det.data is None:
            continue

        x1, y1, x2, y2 = det.data["bbox"]
        label = det.data["class_name"]
        conf = det.data["conf"]

        # Classes missing from CLASS_COLORS are drawn in white.
        color = CLASS_COLORS.get(label, (255, 255, 255))
        caption = f"ID {obj.id} | {label} {conf:.2f}"
        # Keep the caption inside the frame when the box touches the top edge.
        caption_origin = (x1, max(0, y1 - 8))

        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        cv2.putText(
            frame,
            caption,
            caption_origin,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.55,
            color,
            2,
            cv2.LINE_AA,
        )

    video.write(frame)