# <font style="color:blue">Project 3: Object Detection</font>





#### Maximum Points: 100



|Sr. no.|Section|Points|
|:-------|:-------|:------:|
|1|Plot Ground Truth Bounding Boxes|20|
|2|Training|25|
|3|Inference|15|
|4|COCO Detection Evaluation|25|
|5|Run Inference on a Video|15|

# <font style="color:purple">Download the config file and trainer package</font> 

In [None]:
!wget "https://raw.githubusercontent.com/RadimKozl/OpenCV_PyTorch_Project3/refs/heads/main/trainer.zip" -O ./trainer.zip
!ls /kaggle/working/
!unzip /kaggle/working/trainer.zip
!rm /kaggle/working/trainer.zip

In [None]:
!wget "https://raw.githubusercontent.com/RadimKozl/OpenCV_PyTorch_Project3/refs/heads/main/config.yaml" -O ./config.yaml

# <font style="color:purple">Install external libraries</font> 

In [None]:
!pip install psutil 
!pip install h5py
!pip install pycocotools

In [None]:
# `!export` only affects a temporary subshell; %env sets the variable for the kernel process
%env PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True

# <font style="color:purple">Import libraries</font> 

In [None]:
import os
import gc
import json
import h5py
import csv
import yaml
import shutil
import pandas as pd
import numpy as np
import random
import time
import psutil
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import IPython
import cv2  # used by draw_bboxes() and infer_video() in the Experiment class
from PIL import Image
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

from operator import itemgetter
import multiprocessing as mp
mp.set_start_method('spawn', force=True)

from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset
from torchvision.transforms import functional as F

from torchvision import datasets, transforms
from torch.optim.lr_scheduler import MultiStepLR
from torch.nn.parallel import DataParallel
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import faster_rcnn
import torchvision.models as models

torch.multiprocessing.set_start_method('spawn', force=True)

from torchvision.ops import MultiScaleRoIAlign
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection import FasterRCNN

In [None]:
from trainer import RCNNTrainer, hooks, configuration
from trainer.utils import patch_configs
from trainer.utils import setup_system

from trainer.metrics import APEstimator
from trainer.matplotlib_visualizer import MatplotlibVisualizer, DataEmbedingVisualizer, PRVisualizer
from trainer.matplotlib_visualizer import TensorBoardVisualizer, ModelVisualizer, WeightsHistogramVisualizer
from trainer.tensorboard_visualizer import set_writer

from trainer.datasets import HDF5Dataset
from trainer.utils import collate_fn
from trainer.configuration import load_config_from_yaml


## <font style="color:orange">Function for checking resource usage</font> 

In [None]:
def memory_management():
    """Print the current CPU, RAM and disk usage as percentages."""
    cpu_percent = psutil.cpu_percent(interval=1)
    print(f"CPU Usage: {cpu_percent}%")
    
    memory_usage = psutil.virtual_memory()
    print(f"Memory Usage: {memory_usage.percent}%")
    
    disk_usage = psutil.disk_usage('/')
    print(f"Disk Usage: {disk_usage.percent}%")

# <font style="color:purple">Start TensorBoard through an ngrok tunnel</font> 

In [None]:
!tensorboard --version

In [None]:
!wget https://bin.equinox.io/c/bNyj1mQVY4c/ngrok-v3-stable-linux-amd64.tgz
!tar xf ./ngrok-v3-stable-linux-amd64.tgz -C /usr/local/bin

In [None]:
# launch TensorBoard and the ngrok tunnel as background shell commands
commands = [
    "tensorboard --logdir /kaggle/working/faster_rcnn --load_fast=false --host 0.0.0.0 --port 6006 &",
    "/usr/local/bin/ngrok http 6006 &",
]
# one worker per command is enough
pool = mp.Pool(processes=len(commands))
results_of_processes = [pool.apply_async(os.system, args=(cmd,)) for cmd in commands]

In [None]:
! curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

# <font style="color:purple">Download the Dataset</font> 



Download the Vehicle Registration Plate dataset from [here](https://www.dropbox.com/s/k81ljpmzy3fgtx9/Dataset.zip?dl=1) and unzip it. 



We will have the following directory structure:



```
Dataset
├── train
│   └── Vehicle registration plate
│       └── Label
└── validation
    └── Vehicle registration plate
        └── Label
```



Unzipping the file gives you a directory `Dataset` with two folders, `train` and `validation`. Each of them contains a `Vehicle registration plate` folder holding the `.jpg` images and a `Label` subfolder. The `Label` folder stores the bounding box data for the images.





For example,

For image: `Dataset/train/Vehicle registration plate/bf4689922cdfd532.jpg`

Label file is  `Dataset/train/Vehicle registration plate/Label/bf4689922cdfd532.txt`
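
The image-to-label mapping above can be sketched with `pathlib`. The helper name `label_path_for` is hypothetical (it is not part of the notebook or the trainer package), and the sketch assumes POSIX-style paths as used on Kaggle.

```python
from pathlib import PurePosixPath


def label_path_for(image_path: str) -> str:
    """Return the label file path that belongs to an image path.

    Hypothetical helper: assumes labels live in a `Label` folder next to
    the images and share the image's file name with a `.txt` extension.
    """
    image = PurePosixPath(image_path)
    return str(image.parent / "Label" / (image.stem + ".txt"))


print(label_path_for("Dataset/train/Vehicle registration plate/bf4689922cdfd532.jpg"))
```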



There are one or more lines in each `.txt` file. Each line represents one bounding box.

For example,

```
Vehicle registration plate 385.28 445.15 618.24 514.225
Vehicle registration plate 839.68 266.066462 874.24 289.091462
```



This is a single-class detection problem (`Vehicle registration plate`). Because the class name consists of three words, the bounding box values start at the fourth column of each row.



Representation is in `xmin`, `ymin`, `xmax`, and `ymax` format.
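
As a minimal sketch of this format, one label line can be split into the class name and its four coordinates. The function name `parse_label_line` is an assumption made for illustration. The notebook's own parsing lives in `TransformDataset` and additionally rounds the coordinates to integers.

```python
def parse_label_line(line, class_name="Vehicle registration plate"):
    """Split one label line into (class name, (xmin, ymin, xmax, ymax)).

    Hypothetical helper: the number of leading columns taken as the label
    is derived from the number of words in `class_name`.
    """
    n_words = len(class_name.split())
    parts = line.strip().split()
    label = " ".join(parts[:n_words])
    xmin, ymin, xmax, ymax = (float(value) for value in parts[n_words:n_words + 4])
    return label, (xmin, ymin, xmax, ymax)


label, box = parse_label_line("Vehicle registration plate 385.28 445.15 618.24 514.225")
print(label, box)
```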



**The dataset has `5308` training images and `386` validation images.**



Data is downloaded from [Open Images Dataset](https://storage.googleapis.com/openimages/web/index.html)

#  <font style="color:green">1. Plot Ground Truth Bounding Boxes [20 Points]</font> 



**You have to show three images from validation data with the bounding boxes.**



The plotted images should be similar to the following:



<img src='https://www.learnopencv.com/wp-content/uploads/2020/04/c3-w10-g1.png'>







<img src='https://www.learnopencv.com/wp-content/uploads/2020/04/c3-w10-g2.png'>







<img src='https://www.learnopencv.com/wp-content/uploads/2020/04/c3-w10-g3.png'>




##  <font style="color:blue">1.1. Class for creating the HDF5 file of datasets</font>

We store the `train`, `valid` and `test` datasets in a single HDF5 file, which the trainer package's standard dataset module reads to build the PyTorch DataLoaders. The test samples come from the [OpenCV_evalution_ALPR_dataset](https://www.kaggle.com/datasets/radimkzl/opencv-evalution-alpr-dataset).

In [None]:
class TransformDataset:
    def __init__(self, name_hdf5_file='datasets.hdf5', hdf5_image=False):
        self.name_hdf5_file = name_hdf5_file
        self.path_test_images = os.path.join('/kaggle', 'input', 'opencv-evalution-alpr-dataset', 'cars_ALPR_test', 'images')
        self.path_test_annotations = os.path.join('/kaggle', 'input', 'opencv-evalution-alpr-dataset', 'cars_ALPR_test','Annotations')
        self.path_train_images = os.path.join('/kaggle', 'input', 'vehicle-registration-plate', 'Dataset', 'train', 'Vehicle registration plate')
        self.path_valid_images = os.path.join('/kaggle', 'input', 'vehicle-registration-plate', 'Dataset', 'validation', 'Vehicle registration plate')
        self.hdf5_file = None
        self.hdf5_image = hdf5_image
        

    def __select_files(self, root_path, extensions=('.jpg', '.png')):
        """
        Select image files from the given directory with specified extensions.

        Args:
            root_path (str): Path to the directory containing images.
            extensions (tuple): Valid file extensions.

        Returns:
            list: List of file paths matching the extensions.
        """
        if not os.path.exists(root_path):
            print(f"Warning: Path {root_path} does not exist!")
            return []

        return [os.path.join(root_path, filename) for filename in os.listdir(root_path) if filename.endswith(extensions)]

    def __get_image_shape(self, image_path):
        """
        Get the shape (width, height, channels) of an image.

        Args:
            image_path (str): Path to the image file.

        Returns:
            list: [width, height, channels]
        """
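        # Use a context manager so the file handle is released promptly
        with Image.open(image_path) as img:
            w, h = img.size
            # Number of bands: 3 for RGB, 1 for grayscale ('L'), 4 for RGBA/CMYK
            channel = len(img.getbands())
        return [w, h, channel]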

    def __select_data(self, file_path):
        """
        Parse annotation data from a file.

        Args:
            file_path (str): Path to the annotation file.

        Returns:
            tuple: Tuple of data (label_id, box_coordinates)
        """
        box_coordinates = []
        labels_id = []
        list_class_name = ['__background__', 'Vehicle registration plate']

        if not os.path.isfile(file_path):
            print(f"Warning: Annotation file {file_path} not found!")
            return []

        with open(file_path, 'r') as input_file:
            lines = input_file.readlines()

        for line in lines:
            parts = line.strip().split()
            # a valid line has a 3-word class name followed by 4 coordinates
            if len(parts) < 7:
                print(f"Warning: Invalid annotation format in file {file_path}: {line.strip()}")
                continue

            label = parts[0] + ' ' + parts[1] + ' ' + parts[2]
            numbers = [round(float(num)) for num in parts[3:]]

            labels_id.append(list_class_name.index(label))
            box_coordinates.append(numbers)

        return (labels_id, box_coordinates)

    def process_files(self, file_paths, dataset_type):
        """
        Process image and annotation files and add them to the HDF5 dataset.

        Args:
            file_paths (list): List of image file paths.
            dataset_type (str): Dataset type ('train', 'valid', 'test').
        """
        for path in tqdm(file_paths, desc=f"Processing {dataset_type} files"):
            try:
                # Get image dimensions
                img_dim = self.__get_image_shape(path)
                name_dir, name_input = os.path.split(path)
                id_name, file_ext = os.path.splitext(name_input)

                # Determine label file path
                if dataset_type == 'test':
                    # Test dataset annotations are in a separate directory
                    path_label = os.path.join(self.path_test_annotations, f"{id_name}.txt")
                else:
                    annotations_dir = os.path.join(name_dir, 'Label')
                    path_label = os.path.join(annotations_dir, f"{id_name}.txt")

                # Read and process annotation data
                sub_data = self.__select_data(path_label)
                if not sub_data:
                    print(f"Warning: No annotations found for {path_label}. Skipping.")
                    continue

                # Prepare data dictionary
                data = {
                    "name_id": str(id_name),
                    "name": str(name_input),
                    "path": str(path),
                    "type": file_ext.lstrip('.'),
                    "dimension": img_dim,
                    "labels": sub_data[0],
                    "boxes": sub_data[1]
                }

                try:
                    # Serialize data and write to HDF5
                    # set dataset group
                    dataset_group = self.hdf5_file[dataset_type]
                    # Create a subgroup for the image
                    image_group = dataset_group.create_group(data['name_id'])
                    if self.hdf5_image:
                        # Add image path, labels, and boxes to the image group
                        img = Image.open(path)
                        img_array = np.array(img)
                        image_group.create_dataset('image', data=img_array)
                    
                        image_group.attrs['name'] = np.array(data['name'], dtype='S100')
                        image_group.attrs['type'] = np.array(data['type'], dtype='S3')
                        image_group.attrs['dimension'] = np.array(data['dimension'], dtype='float64')
                        image_group.attrs['labels'] = data['labels']
                        image_group.attrs['boxes'] = data['boxes']
                        image_group.attrs['link'] = np.array(path, dtype='S200')
                    else:
                        image_group.create_dataset('image_link', data=[path])
                        image_group.attrs['name'] = np.array(data['name'], dtype='S100')
                        image_group.attrs['type'] = np.array(data['type'], dtype='S3')
                        image_group.attrs['dimension'] = np.array(data['dimension'], dtype='float64')
                        image_group.attrs['labels'] = data['labels']
                        image_group.attrs['boxes'] = data['boxes']
                        
                except Exception as e:
                    print(f"Error processing: {e}")
            

            except Exception as e:
                print(f"Error processing file {path}: {e}")

    def create_hdf5_data(self):
        """
        Create HDF5 dataset file with train, valid, and test groups.
        """
        # Select files for each dataset type
        list_train_files = self.__select_files(self.path_train_images, ('.jpg', '.png'))
        list_valid_files = self.__select_files(self.path_valid_images, ('.jpg', '.png'))
        list_test_files = self.__select_files(self.path_test_images, ('.jpg', '.png'))

        # Validate file lists
        if not list_train_files:
            print("No training files found!")
        if not list_valid_files:
            print("No validation files found!")
        if not list_test_files:
            print("No test files found!")

        with h5py.File(os.path.join('/kaggle', 'working', self.name_hdf5_file), 'w') as self.hdf5_file:
            # Create dataset groups
            self.hdf5_file.create_group('train')
            self.hdf5_file.create_group('valid')
            self.hdf5_file.create_group('test')

            # Add metadata attributes
            self.hdf5_file.attrs['class_number'] = 2
            self.hdf5_file.attrs['names_class'] = ['__background__', 'Vehicle registration plate']

            # Process and add files
            self.process_files(list_train_files, 'train')
            self.process_files(list_valid_files, 'valid')
            self.process_files(list_test_files, 'test')

    def close(self):
        """Close the HDF5 file handle if it is still open."""
        # create_hdf5_data() already closes the file when its `with` block exits;
        # calling close() on an already closed h5py file is a no-op.
        if self.hdf5_file is not None:
            self.hdf5_file.close()
            self.hdf5_file = None

        print("HDF5 dataset created successfully!")

##  <font style="color:blue">1.2. Create HDF5 file of datasets</font>

In [None]:
%%time
transform_dataset = TransformDataset(name_hdf5_file='datasets.hdf5', hdf5_image=False)
transform_dataset.create_hdf5_data()

In [None]:
transform_dataset.close()
del transform_dataset

In [None]:
gc.collect()
memory_management()

##  <font style="color:blue">1.3. Create DataLoaders for reading the data</font>

###  <font style="color:orange">Function for plotting data samples</font>

In [None]:
def plot_images(loader, rows=2, columns=3):
    """
    Plot a few images with their bounding boxes.

    Args:
        loader (DataLoader): DataLoader containing the dataset.
        rows (int): Number of rows in the plot.
        columns (int): Number of columns in the plot.
    """
    plt.rcParams["figure.figsize"] = (15, 9)
    plt.figure()

    for images, targets in loader:
        for i in range(min(len(targets), rows * columns)):
            plt.subplot(rows, columns, i + 1)
            img = F.to_pil_image(images[i])
            plt.imshow(img)

            # Get bounding boxes and labels
            boxes = targets[i]['boxes'].numpy()
            labels = targets[i]['labels'].numpy()

            # Create an Axes instance
            ax = plt.gca()

            # Plot each bounding box
            for box, label in zip(boxes, labels):
                xmin, ymin, xmax, ymax = box
                width = xmax - xmin
                height = ymax - ymin
                rect = patches.Rectangle((xmin, ymin), width, height, linewidth=2, edgecolor='r', facecolor='none')
                ax.add_patch(rect)
                plt.text(xmin, ymin, f'Label: {label}', color='red', fontsize=12, backgroundcolor='none')

            plt.axis('off')

        plt.show()
        break


###  <font style="color:orange">1.3.1 Create train dataset</font>

In [None]:
path_hdf5_file = os.path.join('/kaggle','working','datasets.hdf5')

In [None]:
%%time
hdf5_train_dataset = HDF5Dataset(path_hdf5_file, 'train')

In [None]:
%%time
# Calculate mean and std of train dataset
mean_train, std_train = hdf5_train_dataset.calculate_mean_std_manual()
print(f"Calculated Mean: {mean_train}, Std: {std_train} of train dataset.")

In [None]:
# show samples
train_dataloader = torch.utils.data.DataLoader(
    hdf5_train_dataset, 
    batch_size=6, 
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=0,
    pin_memory=True
)

In [None]:
plot_images(train_dataloader, rows=2, columns=3)

In [None]:
del train_dataloader

###  <font style="color:orange">1.3.2 Create valid dataset</font>

In [None]:
%%time
hdf5_valid_dataset = HDF5Dataset(path_hdf5_file, 'valid')

In [None]:
%%time
# Calculate mean and std of valid dataset
mean_valid, std_valid = hdf5_valid_dataset.calculate_mean_std_manual()
print(f"Calculated Mean: {mean_valid}, Std: {std_valid} of valid dataset.")
del mean_valid, std_valid

In [None]:
# show samples
valid_dataloader = torch.utils.data.DataLoader(
    hdf5_valid_dataset, 
    batch_size=6, 
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=0,
    pin_memory=True
)


In [None]:
plot_images(valid_dataloader, rows=2, columns=3)

In [None]:
del valid_dataloader, hdf5_valid_dataset

###  <font style="color:orange">1.3.3 Create test dataset</font>

In [None]:
%%time
hdf5_test_dataset = HDF5Dataset(path_hdf5_file, 'test')

In [None]:
%%time
# Calculate mean and std of test dataset
mean_test, std_test = hdf5_test_dataset.calculate_mean_std_manual()
print(f"Calculated Mean: {mean_test}, Std: {std_test} of test dataset.")
del mean_test, std_test

In [None]:
# show samples
test_dataloader = torch.utils.data.DataLoader(
    hdf5_test_dataset, 
    batch_size=6, 
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=0,
    pin_memory=True
)

In [None]:
plot_images(test_dataloader, rows=2, columns=3)

In [None]:
del test_dataloader, hdf5_test_dataset

###  <font style="color:orange">1.3.4 Show class names</font>

In [None]:
list_class_names = hdf5_train_dataset.names_class

# Access metadata
print(f"Class Number: {hdf5_train_dataset.class_number}")
print(f"Class Names: {list_class_names}")

In [None]:
gc.collect()
memory_management()

#  <font style="color:green">2. Training [25 Points]</font> 



- **Write your training code in this section.**



- **You also have to share ([shared logs example](https://tensorboard.dev/experiment/JRtnsKbwTaq1ow6nPLPGeg)) the loss plot of your training using tensorboard.dev.** 



Instructions for sharing TensorBoard logs with tensorboard.dev are available [here](https://courses.opencv.org/courses/course-v1:OpenCV+OpenCV-106+2019_T1/courseware/b1c43ffe765246658e537109e188addb/d62572ec8bd344db9aeae81235ede618/4?activate_block_id=block-v1%3AOpenCV%2BOpenCV-106%2B2019_T1%2Btype%40vertical%2Bblock%40398b46ddcd5c465fa52cb4d572ba3229).

##  <font style="color:blue">2.1. Create Model</font>

###  <font style="color:orange">2.1.1 Simple model</font>

In [None]:
def faster_rcnn_pretrained_model(num_classes):
    # load an instance detection model pre-trained on COCO
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights='DEFAULT')

    # get the number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    
    return model

###  <font style="color:orange">2.1.2 Model update / proposed model (Fine-tuning)</font>

In [None]:
def faster_rcnn_pretrained_model_alexnet(num_classes):
    
    # Get the backbone of any pretrained network, we'll use AlexNet
    # `weights=` replaces the deprecated `pretrained=True` argument
    alexnet = models.alexnet(weights='DEFAULT')
    new_backbone = alexnet.features
    new_backbone.out_channels = 256
    
    # Configure the anchors. We shall have 12 different anchors.
    new_anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256),),
                                          aspect_ratios=((0.5, 1.0, 2.0),))
    
    # Configure the output size of ROI-Pooling layer.
    # We shall end up with (num_boxes, num_features, 4, 4) after the ROIPooling layer
    new_roi_pooler = MultiScaleRoIAlign(featmap_names=['0'], output_size=4, sampling_ratio=1)
    
    
    # input resize bounds, plus normalization statistics computed on the training set
    min_size = 300
    max_size = 1050
    mean = mean_train
    std = std_train
   
    # Instantiate the Faster-rcnn model with the variables declared above.
    frcnn_model = FasterRCNN(backbone=new_backbone,
                          num_classes=num_classes,
                          min_size=min_size,
                          max_size=max_size,
                          image_mean=mean,
                          image_std=std,
                          rpn_anchor_generator=new_anchor_generator,
                          box_roi_pool=new_roi_pooler)

    return frcnn_model
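
The "12 different anchors" in the anchor configuration come from combining 4 sizes with 3 aspect ratios. The sketch below enumerates the resulting shapes in plain Python. The function `anchor_shapes` is hypothetical, and it assumes that the aspect ratio means height / width and that each anchor keeps an area of `size**2`, which is how torchvision's `AnchorGenerator` is understood to build its base anchors (before rounding).

```python
import math
from itertools import product


def anchor_shapes(sizes, aspect_ratios):
    """List (width, height) for every size / aspect-ratio pair.

    Assumption: ratio = height / width and the anchor area stays size**2.
    """
    shapes = []
    for ratio, size in product(aspect_ratios, sizes):
        height = size * math.sqrt(ratio)
        width = size / math.sqrt(ratio)
        shapes.append((width, height))
    return shapes


shapes = anchor_shapes(sizes=(32, 64, 128, 256), aspect_ratios=(0.5, 1.0, 2.0))
print(len(shapes))  # number of anchors per feature-map location
for width, height in shapes:
    print(f"{width:7.1f} x {height:7.1f}")
```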

##  <font style="color:blue">2.2. Config file</font>

###  <font style="color:orange">2.2.1 Update config file</font>

In [None]:
config_ex_path = os.path.join('/kaggle','working','config.yaml') # path of yaml file

In [None]:
# load yaml file
with open(config_ex_path, 'r') as f:
    config = yaml.safe_load(f)

In [None]:
# set system config values
config['system']['seed'] = 42
config['system']['cudnn_benchmark_enabled'] = False
config['system']['cudnn_deterministic'] = True

# set dataset config values
config['dataset']['root_dir'] = os.path.join('/','kaggle','working')
config['dataset']['hdf5_file'] = "datasets.hdf5"

# set dataloader config values
config['dataloader']['batch_size'] = 250
config['dataloader']['num_workers'] = 0
config['dataloader']['data_augmentation'] = False

# set optimizer config values
config['optimizer']['learning_rate'] = 0.001
config['optimizer']['momentum'] = 0.9
config['optimizer']['weight_decay'] = 0.0001
config['optimizer']['lr_step_milestones'] = [30, 40]
config['optimizer']['lr_gamma'] = 0.1

# set trainer config values
config['trainer']['model_dir'] = os.path.join('/kaggle','working','modelscheckpoints')
config['trainer']['model_save_best'] = True
config['trainer']['model_saving_frequency'] = 1
config['trainer']['device'] = "cuda"
config['trainer']['epoch_num'] = 30
config['trainer']['progress_bar'] = False
config['trainer']['normalization'] = False

In [None]:
with open(config_ex_path, 'w') as f:
    yaml.dump(config, f)
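
To see how `lr_step_milestones` and `lr_gamma` interact with `epoch_num`, the multi-step schedule can be reproduced in a few lines. This is a minimal sketch, not the trainer's code: the function `multistep_lr` is hypothetical, and it assumes the scheduler is stepped once at the end of every epoch.

```python
from bisect import bisect_right


def multistep_lr(base_lr, milestones, gamma, epoch):
    """Learning rate in effect during `epoch` (0-based) under a multi-step schedule.

    Assumption: the rate is multiplied by `gamma` once for every milestone
    that has already been reached.
    """
    return base_lr * gamma ** bisect_right(sorted(milestones), epoch)


for epoch in (0, 29, 30, 40):
    print(epoch, multistep_lr(0.001, [30, 40], 0.1, epoch))
```

With `epoch_num = 30` the training loop covers epochs 0 to 29, so under these assumptions the first milestone at 30 is never reached and the learning rate stays at 0.001 for the whole run.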

###  <font style="color:orange">2.2.2 Load Config data from file</font>

In [None]:
system_config, dataset_config, dataloader_config, optimizer_config, trainer_config = load_config_from_yaml(config_ex_path)

In [None]:
print('system_config\n', system_config)
print('\ndataset_config\n', dataset_config)
print('\ndataloader_config\n', dataloader_config)
print('\noptimizer_config\n', optimizer_config)
print('\ntrainer_config\n', trainer_config)

##  <font style="color:blue">2.3. Create Experiment Class</font>

In [None]:
class Experiment:
    def __init__(
        self,
        system_config: configuration.SystemConfig = system_config,
        dataset_config: configuration.DatasetConfig = dataset_config,  
        dataloader_config: configuration.DataloaderConfig = dataloader_config,
        optimizer_config: configuration.OptimizerConfig = optimizer_config,
    ):
        self.system_config = system_config
        setup_system(system_config)
        self.model_name = 'faster_rcnn_pretrained'
        self.tb_writer = set_writer(os.path.join('faster_rcnn',self.model_name))
        
        # the dataset has one object class (vehicle registration plate); everything else is background
        self.classes = list_class_names
        
        # written custom dataset class of our dataset
        self.dataset_train = HDF5Dataset(
            hdf5_file=path_hdf5_file, 
            dataset_type='train',
            transform=None,
            train=True,
            width_image=1024
        )
          
        self.loader_train = torch.utils.data.DataLoader(
            dataset=self.dataset_train,
            batch_size=dataloader_config.batch_size,
            shuffle=True,
            collate_fn=collate_fn,
            num_workers=dataloader_config.num_workers,
            pin_memory=True
        )
        

        self.dataset_valid = HDF5Dataset(
            hdf5_file=path_hdf5_file, 
            dataset_type='valid',
            transform=None,
            train=False,
        )
        
        
        self.loader_valid = torch.utils.data.DataLoader(
            dataset=self.dataset_valid,
            batch_size=dataloader_config.batch_size,
            shuffle=False,
            collate_fn=collate_fn,
            num_workers=dataloader_config.num_workers,
            pin_memory=True
        )

        self.dataset_test = HDF5Dataset(
            hdf5_file=path_hdf5_file, 
            dataset_type='test',
            transform=None,
            train=False,
        )
        
        
        self.loader_test = torch.utils.data.DataLoader(
            dataset=self.dataset_test,
            batch_size=dataloader_config.batch_size,
            shuffle=False,
            collate_fn=collate_fn,
            num_workers=dataloader_config.num_workers,
            pin_memory=True
        )

        if self.tb_writer is not None:
            add_data_embedings = DataEmbedingVisualizer(dataset=self.dataset_valid, writer=self.tb_writer, number_samples=50)
            add_data_embedings.update_charts()

        
        # get faster rcnn model pretrained on coco
        #self.model = faster_rcnn_pretrained_model_alexnet(len(self.classes))
        self.model = faster_rcnn_pretrained_model(len(self.classes))
        
        self.metric_fn = APEstimator(classes=self.classes)
        
        params = [p for p in self.model.parameters() if p.requires_grad]
        self.optimizer = optim.SGD(
            params,
            lr=optimizer_config.learning_rate,
            weight_decay=optimizer_config.weight_decay,
            momentum=optimizer_config.momentum
        )
        self.lr_scheduler = MultiStepLR(
            self.optimizer, milestones=optimizer_config.lr_step_milestones, gamma=optimizer_config.lr_gamma
        )
        self.visualizer = TensorBoardVisualizer(writer=self.tb_writer)

    def run(self, trainer_config: trainer_config) -> dict:  
        setup_system(self.system_config)
        device = torch.device(trainer_config.device)
        self.model = self.model.to(device)

        # add network graph with inputs info
        graph_loader = torch.utils.data.DataLoader(
            dataset=self.dataset_train,
            batch_size=1,
            shuffle=True,
            collate_fn=collate_fn,
            num_workers=0,
            pin_memory=True
        )
        
        inputs, targets = next(iter(graph_loader))
        inputs = inputs.to(trainer_config.device)
        
        # pass the batch loaded above (`images` was an undefined name)
        add_network_graph_tensorboard = ModelVisualizer(self.model, inputs, self.tb_writer)
        add_network_graph_tensorboard.update_charts()

        model_trainer = RCNNTrainer(
            model=self.model,
            loader_train=self.loader_train,
            loader_test=self.loader_valid,
            metric_fn=self.metric_fn,
            optimizer=self.optimizer,
            lr_scheduler=self.lr_scheduler,
            device=device,
            data_getter=itemgetter("image"),
            target_getter=itemgetter("target"),
            stage_progress=trainer_config.progress_bar,
            get_key_metric=itemgetter("mAP"),
            visualizer=self.visualizer,
            model_save_best=trainer_config.model_save_best,
            model_saving_frequency=trainer_config.model_saving_frequency,
            save_dir=trainer_config.model_dir
        )

        model_trainer.register_hook("train", hooks.train_hook_faster_rcnn)
        model_trainer.register_hook("test", hooks.test_hook_faster_rcnn)
        model_trainer.register_hook("end_epoch", hooks.end_epoch_hook_faster_rcnn)
        self.metrics = model_trainer.fit(trainer_config.epoch_num)
        return self.metrics

    def draw_bboxes(self, rows, columns, trainer_config:  trainer_config):
        # load the best model
        if trainer_config.model_save_best:
            self.model.load_state_dict(
                torch.load(os.path.join(trainer_config.model_dir, self.model.__class__.__name__ + '_best.pth'))
            )
        # or use the last saved
        self.model = self.model.eval()

        fig, ax = plt.subplots(
            nrows=rows, ncols=columns, figsize=(15, 30), gridspec_kw={
                'wspace': 0,
                'hspace': 0.05
            }
        )
        
        colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]

        for axi in ax.flat:
            index = random.randrange(len(self.loader_test.dataset))

            image, targets = self.loader_test.dataset[index]

            device = torch.device(trainer_config.device)
            image = image.to(device).clone()

            # run the forward pass without tracking gradients
            with torch.no_grad():
                detections = self.model(image.unsqueeze(0))
            bboxes = detections[0]['boxes'].cpu().detach().numpy()
            labels = detections[0]['labels'].cpu().detach().numpy()
            scores = detections[0]['scores'].cpu().detach().numpy()

            with torch.no_grad():
                img = image.cpu()
                img = img.numpy().transpose(1, 2, 0)
                img = (img * 255.).astype(np.uint8)
                gt_img = img.copy()
                pred_img = img.copy()

                for i, box in enumerate(targets['boxes']):
                    label = targets['labels'][i]
                    cls = self.classes[label]
                    clr = colors[label-1]
                    gt_img = cv2.rectangle(
                        gt_img, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), clr, thickness=2)
                    gt_img = cv2.putText(gt_img, cls, (int(box[0]), int(box[1])-10), cv2.FONT_HERSHEY_SIMPLEX, 
                                         0.9, clr, 2)
                    
                for i, box in enumerate(bboxes):
                    label = labels[i]
                    score = scores[i]
                    cls = self.classes[label]
                    clr = colors[label-1]
                    cls_score = '{0}:{1:.2}'.format(cls, score)
                    pred_img = cv2.rectangle(
                        pred_img, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), clr, thickness=2)
                    pred_img = cv2.putText(pred_img, cls_score, (int(box[0]), int(box[1])-10), 
                                           cv2.FONT_HERSHEY_SIMPLEX, 0.9, clr, 2)

                merged_img = np.concatenate((gt_img, pred_img), axis=1)
                axi.imshow(merged_img)
                axi.axis('off')
        fig.show()

    
    def infer_video(self, video_path: str, output_path: str, trainer_config: trainer_config, conf_threshold: float = 0.5):
        """
        Perform inference on a video and save the output with detected bounding boxes.
    
        Args:
            video_path (str): Path to the input video.
            output_path (str): Path to save the output video with detections.
            trainer_config: Configuration object for the trainer.
            conf_threshold (float): Confidence threshold to filter predictions.
        """
        # Load the best checkpoint if one was saved during training.
        # map_location lets a GPU-trained checkpoint load on the configured device.
        device = torch.device(trainer_config.device)
        if trainer_config.model_save_best:
            self.model.load_state_dict(
                torch.load(
                    os.path.join(trainer_config.model_dir, self.model.__class__.__name__ + '_best.pth'),
                    map_location=device
                )
            )
        self.model = self.model.to(device).eval()
    
        # Open the video file
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video file {video_path}")
    
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)  # keep fractional rates (e.g. 29.97); int() would truncate them
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
        # Set up video writer for output
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
    
        colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]  # Colors for different classes
    
        print(f"Processing video: {video_path}")
        frame_idx = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
    
            # Convert frame to tensor
            img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(img)
            img_tensor = F.to_tensor(img).unsqueeze(0).to(device)
    
            # Perform inference
            with torch.no_grad():
                detections = self.model(img_tensor)
            boxes = detections[0]['boxes'].cpu().numpy()
            labels = detections[0]['labels'].cpu().numpy()
            scores = detections[0]['scores'].cpu().numpy()
    
            # Draw detections on the frame
            for i, box in enumerate(boxes):
                if scores[i] < conf_threshold:
                    continue
                label = labels[i]
                score = scores[i]
                cls = self.classes[label]
                color = colors[label % len(colors)]
                frame = cv2.rectangle(frame, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), color, 2)
                frame = cv2.putText(frame, f"{cls} {score:.2f}", (int(box[0]), int(box[1]) - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    
            # Write the frame to the output video
            out.write(frame)
            frame_idx += 1
            print(f"Processed frame {frame_idx}/{frame_count}", end='\r')
    
        # Release resources
        cap.release()
        out.release()
        print(f"Video saved to {output_path}")

    
    def evaluate_coco(self, trainer_config: trainer_config):
        """
        Evaluate model performance using COCO Detection metrics.
    
        Args:
            trainer_config: Configuration object for trainer settings.
        """
        # Prepare the model for inference
        device = torch.device(trainer_config.device)
        self.model = self.model.to(device).eval()
        
        # COCO-style structures
        coco_annotations = {
            "images": [],
            "annotations": [],
            "categories": [{"id": i, "name": name} for i, name in enumerate(self.classes, start=1)]
        }
        predictions = []
        
        annotation_id = 1
        
        print("Preparing ground truth and predictions for COCO evaluation...")
        
        for img_id, (img, target) in enumerate(self.loader_valid, start=1):
            img = img.to(device)
            target = {k: v.to(device) for k, v in target.items()}
            
            # Add an image to COCO annotations
            coco_annotations["images"].append({
                "id": img_id,
                "file_name": f"image_{img_id}.jpg",
                "width": img.shape[-1],
                "height": img.shape[-2]
            })
            
            # Add ground truth bounding boxes to COCO annotations
            for box, label in zip(target["boxes"], target["labels"]):
                x_min, y_min, x_max, y_max = box.tolist()
                coco_annotations["annotations"].append({
                    "id": annotation_id,
                    "image_id": img_id,
                    "category_id": label.item(),
                    "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
                    "area": (x_max - x_min) * (y_max - y_min),
                    "iscrowd": 0
                })
                annotation_id += 1
            
            # Model predictions
            with torch.no_grad():
                outputs = self.model(img.unsqueeze(0))
            for box, score, label in zip(
                    outputs[0]['boxes'].cpu(), 
                    outputs[0]['scores'].cpu(), 
                    outputs[0]['labels'].cpu()):
                x_min, y_min, x_max, y_max = box.tolist()
                predictions.append({
                    "image_id": img_id,
                    "category_id": label.item(),
                    "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
                    "score": score.item()
                })
        
        # Write ground truth and predictions to JSON files on disk (pycocotools loads them by path)
        gt_file = os.path.join('/kaggle','working','coco_gt_annotations.json')
        pred_file = os.path.join('/kaggle','working','coco_predictions.json')
        
        with open(gt_file, "w") as f:
            json.dump(coco_annotations, f)
        
        with open(pred_file, "w") as f:
            json.dump(predictions, f)
        
        # Retrieving annotations using the COCO API
        coco_gt = COCO(gt_file)
        coco_dt = coco_gt.loadRes(pred_file)
        
        # Evaluation
        coco_eval = COCOeval(coco_gt, coco_dt, iouType='bbox')
        coco_eval.evaluate()
        coco_eval.accumulate()
        coco_eval.summarize()

    def save_onnx(self, file_name: str, trainer_config: trainer_config):
        """
        Export the trained model to ONNX format.

        Args:
            file_name (str): The output file name for the ONNX model.
            trainer_config: Configuration object for trainer settings.
        """
        # Load the best model if it exists
        if trainer_config.model_save_best:
            model_path = os.path.join(
                trainer_config.model_dir, f"{self.model.__class__.__name__}_best.pth"
            )
            if not os.path.exists(model_path):
                raise FileNotFoundError(f"Best model not found at {model_path}")
            self.model.load_state_dict(torch.load(model_path))
        
        # Prepare the model for export
        device = torch.device(trainer_config.device)
        self.model = self.model.to(device).eval()
        
        # Get a single input sample for export
        loader_input = torch.utils.data.DataLoader(
            dataset=self.dataset_valid,
            batch_size=1,
            shuffle=False,
            collate_fn=collate_fn,
            num_workers=0,
            pin_memory=True
        )
        inputs, _ = next(iter(loader_input))
        inputs = inputs.to(device)
        
        # Define ONNX export parameters.
        # torchvision detection models return boxes, labels, scores (in that order),
        # and axis 0 of each output is the number of detections, not the batch size.
        dynamic_axes = {
            "input": {0: "batch_size"},
            "output_boxes": {0: "num_detections"},
            "output_labels": {0: "num_detections"},
            "output_scores": {0: "num_detections"},
        }
        
        # Export the model
        print(f"Exporting model to ONNX format: {file_name}")
        torch.onnx.export(
            self.model,
            inputs,
            file_name,
            export_params=True,
            opset_version=11,
            do_constant_folding=True,
            input_names=["input"],
            output_names=["output_boxes", "output_labels", "output_scores"],
            dynamic_axes=dynamic_axes
        )
        print(f"Model successfully exported to {file_name}")

##  <font style="color:blue">2.3. Run Experiment</font>

In [None]:
if __name__ == '__main__':
    dataloader_config, trainer_config = patch_configs(epoch_num_to_set=100, batch_size_to_set=2)

    dataset_config = configuration.DatasetConfig(
        root_dir="data",
    )
    
    optimizer_config = configuration.OptimizerConfig(
        learning_rate=5e-3, 
        lr_step_milestones=[50], 
        lr_gamma=0.1, 
        momentum=0.9, 
        weight_decay=1e-5
    )
    
    experiment = Experiment(
        dataset_config=dataset_config, 
        dataloader_config=dataloader_config, 
        optimizer_config=optimizer_config
    )
    
    # Run the experiment / start training
    experiment.run(trainer_config)

#  <font style="color:green">3. Inference [15 Points]</font> 



**You have to make predictions from your trained model on three images from the validation dataset.**



The plotted images should be similar to the following:



<img src='https://www.learnopencv.com/wp-content/uploads/2020/04/c3-w10-p1.png'>







<img src='https://www.learnopencv.com/wp-content/uploads/2020/04/c3-w10-p2.png'>







<img src='https://www.learnopencv.com/wp-content/uploads/2020/04/c3-w10-p3.png'>






In [None]:
if __name__ == '__main__':
    experiment.draw_bboxes(4, 1, trainer_config)

#  <font style="color:green">4. COCO Detection Evaluation [25 Points]</font> 



**You have to evaluate your detection model using the COCO detection evaluation metrics.**

For reference, here is the COCO evaluation metric chart:





---



<img src="https://www.learnopencv.com/wp-content/uploads/2020/03/c3-w9-coco_metric.png">



---
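
The COCO API expects every box as `[x, y, width, height]`, while torchvision detection models return `[x_min, y_min, x_max, y_max]`, so both ground truth and predictions have to be converted before evaluation. The following is a minimal sketch of that conversion and of one entry in the COCO results format; the helper names `xyxy_to_coco_bbox` and `make_prediction_record` are hypothetical and used only for illustration.

```python
def xyxy_to_coco_bbox(box):
    """Convert [x_min, y_min, x_max, y_max] to COCO's [x, y, width, height]."""
    x_min, y_min, x_max, y_max = box
    return [x_min, y_min, x_max - x_min, y_max - y_min]


def make_prediction_record(image_id, category_id, box_xyxy, score):
    """Build one detection entry in the COCO results format (hypothetical helper)."""
    return {
        "image_id": image_id,
        "category_id": category_id,
        "bbox": xyxy_to_coco_bbox(box_xyxy),
        "score": float(score),
    }


record = make_prediction_record(
    image_id=1, category_id=1, box_xyxy=[10.0, 20.0, 110.0, 70.0], score=0.9
)
print(record["bbox"])  # [10.0, 20.0, 100.0, 50.0]
```

The `image_id` and `category_id` values in the predictions must match the ids used in the ground-truth annotations, otherwise no detection can be matched.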



#### <font style="color:red">The expected `AP` (primary challenge metric) is more than `0.5`.</font>



**The expected output should look similar to the following:**



```

 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.550
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets=100 ] = 0.886
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets=100 ] = 0.629
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.256
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.653
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = 0.627
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=  1 ] = 0.504
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 10 ] = 0.629
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.633
 Average Recall     (AR) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.380
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.722
 Average Recall     (AR) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = 0.704

```
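
Each row of the summary is tied to an IoU (intersection over union) threshold: a prediction can only count as a true positive at a given threshold if its IoU with a ground-truth box of the same category reaches that threshold. The sketch below computes IoU for two boxes in `[x_min, y_min, x_max, y_max]` form. It is a simplified illustration with a hypothetical function name `box_iou`, not the implementation used inside pycocotools, which additionally handles crowd annotations.

```python
def box_iou(box_a, box_b):
    """IoU of two boxes given as [x_min, y_min, x_max, y_max]."""
    # Corners of the intersection rectangle
    ix_min = max(box_a[0], box_b[0])
    iy_min = max(box_a[1], box_b[1])
    ix_max = min(box_a[2], box_b[2])
    iy_max = min(box_a[3], box_b[3])

    # Width and height are clamped at zero when the boxes do not overlap
    inter_w = max(0.0, ix_max - ix_min)
    inter_h = max(0.0, iy_max - iy_min)
    inter = inter_w * inter_h

    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


gt_box = [0, 0, 100, 100]
pred_box = [50, 0, 150, 100]    # shifted right by half the box width
iou = box_iou(gt_box, pred_box)
print(f"IoU = {iou:.3f}")                   # IoU = 0.333
print("match at IoU=0.50:", iou >= 0.50)    # match at IoU=0.50: False
```

A box that overlaps the ground truth by half its width therefore already fails the loosest threshold (0.50), which is why the `IoU=0.50:0.95` average is much stricter than the `IoU=0.50` row.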




In [None]:
if __name__ == '__main__':
    experiment.evaluate_coco(trainer_config=trainer_config)

# <font style="color:green">5. Run Inference on a Video [15 Points]</font>



#### [Download the Input Video](https://www.dropbox.com/s/g88o4dx18zpgn8k/projet3-input-video.mp4?dl=1)



**You have to run inference on a video.** 



You can download the video from [here](https://www.dropbox.com/s/g88o4dx18zpgn8k/projet3-input-video.mp4?dl=1).



#### <font style="color:red">Upload the output video to YouTube and share the link. Do not upload the video to the lab.</font>

In [None]:
from IPython.display import YouTubeVideo, display

video = YouTubeVideo("18HWHCevFdU", width=640, height=360)

display(video)

**Your output video should have a bounding box around the vehicle registration plate.**
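
To keep the output video readable, low-confidence detections are usually dropped before drawing, which is what the `conf_threshold` argument of `infer_video` does for each frame. The following is a minimal, framework-free sketch of that filtering step; the function name `filter_detections` and the sample values are assumptions made for illustration only.

```python
def filter_detections(boxes, labels, scores, conf_threshold=0.5):
    """Keep only detections whose score is at least conf_threshold."""
    kept = []
    for box, label, score in zip(boxes, labels, scores):
        if score < conf_threshold:
            continue  # skip low-confidence detections
        kept.append((box, label, score))
    return kept


# Made-up detections for a single frame
boxes = [[10, 10, 50, 30], [60, 40, 120, 80], [5, 5, 15, 15]]
labels = [1, 1, 1]
scores = [0.92, 0.48, 0.50]

kept = filter_detections(boxes, labels, scores, conf_threshold=0.5)
print(len(kept))  # 2
```

Raising the threshold removes more false positives at the cost of missing plates in difficult frames, so it is worth trying a few values on the input video.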

In [None]:
video = YouTubeVideo("5SgCuee7AMs", width=640, height=360)

display(video)

In [None]:
if __name__ == '__main__':
    video_input_path = os.path.join('/kaggle','input','opencv-evalution-alpr-dataset','cars_ALPR_test','videos','projet3-input-video.mp4')
    video_output_path = os.path.join('/kaggle', 'working', 'output_projet3_input_video.mp4')
    
    experiment.infer_video(video_path=video_input_path, output_path=video_output_path, trainer_config=trainer_config, conf_threshold=0.5)

# <font style="color:green">6. Save ONNX file</font>

In [None]:
if __name__ == '__main__':
    path_onnx_file = os.path.join('/kaggle', 'working','ALPR_license_plate_car.onnx')
    experiment.save_onnx(file_name=path_onnx_file, trainer_config=trainer_config)

In [None]:
def zip_folder_with_shutil(source_folder, output_path):
    '''Zip source_folder (e.g. TensorBoard logs) into <output_path>.zip.'''
    shutil.make_archive(output_path, 'zip', source_folder)

In [None]:
zip_folder_with_shutil('/kaggle/working/faster_rcnn', '/kaggle/working/faster_rcnn')

In [None]:
zip_folder_with_shutil('/kaggle/working/modelscheckpoints', '/kaggle/working/modelscheckpoints')
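
Note that `shutil.make_archive` takes the archive name without an extension and appends `.zip` itself, so the calls above write `faster_rcnn.zip` and `modelscheckpoints.zip` into `/kaggle/working`. The sketch below shows this behaviour on a temporary directory; the folder and file names are placeholders chosen for the example.

```python
import os
import shutil
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    # Create a small folder to archive (placeholder content)
    source_folder = os.path.join(tmp_dir, "logs")
    os.makedirs(source_folder)
    with open(os.path.join(source_folder, "events.txt"), "w") as f:
        f.write("dummy log line\n")

    # base_name has no extension; make_archive adds ".zip" and returns the full path
    base_name = os.path.join(tmp_dir, "logs_archive")
    archive_path = shutil.make_archive(base_name, "zip", source_folder)

    archive_name = os.path.basename(archive_path)
    archive_exists = os.path.exists(archive_path)

print(archive_name)    # logs_archive.zip
print(archive_exists)  # True
```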