# **PPE DETECTION WITH MULTIPLE MODELS**
In this notebook, different deep learning models are trained to detect several kinds of personal protective equipment (PPE) as well as some heavy machinery vehicles.

Object detection includes categorising and locating objects from diverse categories or classes in an image.

In this case, the different object classes to distinguish are:
- mask
- helmet
- vest
- boots
- gloves
- glasses
- ear protection
- human or person
- bulldozer
- dump truck
- excavator
- road roller
- wheel loader
- background, empty site or null



## 2. Batching the data

In [None]:
# {   1   }
import os
import torch
import string
from collections import defaultdict
import shutil
import pandas as pd
import albumentations as A
import matplotlib.pyplot as plt
import numpy as np
import random
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch
from torchvision.utils import draw_bounding_boxes


class PPEsDataset(Dataset):
    """Object-detection dataset of PPE / heavy-machinery images described by a CSV file.

    Every CSV row describes one bounding box. Columns are read by position:
    column 0 is the image file name, column 3 the category name and columns
    4-7 the box coordinates.
    NOTE(review): the coordinates are presumably ``xmin, ymin, xmax, ymax`` in
    pixels (the resize transform defined after this class declares the
    ``pascal_voc`` format) -- confirm against the CSV header.
    """

    # Single source of truth for the category name -> integer label mapping
    CATEGORY_MAP = {
        'background': 0,
        'vest': 1,
        'helmet': 2,
        'gloves': 3,
        'glasses': 4,
        'mask': 5,
        'boots': 6,
        'ear_protection': 7,
        'human': 8,
        'bulldozer': 9,
        'dump_truck': 10,
        'excavator': 11,
        'road_roller': 12,
        'wheel_loader': 13
    }

    def __init__(self, csv_file, root_dir, augmentation_method=None, transform=None):
        """Read the annotation CSV and index the annotations per image.

        Args:
            csv_file: Path of the annotation CSV.
            root_dir: Directory that contains the images.
            augmentation_method: Currently unused; kept so that callers which
                pass it keep working.
            transform: Optional callable invoked as
                ``transform(image=..., bboxes=..., category=...)`` that returns
                a mapping with the keys 'image', 'bboxes' and 'category'.
        """
        self.PPE_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

        # Names of all files found anywhere below root_dir; CSV rows whose image is missing are ignored.
        # NOTE(review): files in sub-folders are accepted here, but __getitem__ opens root_dir/<name> directly.
        all_files = set()
        for _, _, files in os.walk(root_dir):
            all_files.update(files)

        # Group the annotations per image once, so __getitem__ never has to scan the CSV.
        # itertuples yields plain tuples, which makes the positional column access explicit
        # (integer keys on a label-indexed Series are deprecated in pandas) and is faster than iterrows.
        self.annotations = defaultdict(list)
        for row in self.PPE_frame.itertuples(index=False, name=None):
            image_name = row[0]
            if image_name in all_files:  # Keep only annotations whose image exists on disk
                bbox = np.asarray(row[4:8], dtype=np.float32)
                label = row[3]
                self.annotations[image_name].append((bbox, label))

        # Unique image names, in CSV order, that have at least one usable annotation
        all_image_names = self.PPE_frame.iloc[:, 0].unique()
        self.image_list = [img for img in all_image_names if img in self.annotations]

        # Category to integer mapping (per-instance copy of the class constant)
        self.category_map = dict(self.CATEGORY_MAP)

    def __len__(self):
        """Return the number of images that have annotations (not the number of CSV rows)."""
        return len(self.image_list)

    def __countCategory__(self):
        """Return a dict mapping each category in the CSV to its number of rows.

        All CSV rows are counted, including rows whose image is not on disk.
        """
        accElements = defaultdict(int)
        for category in self.PPE_frame.iloc[:, 3]:  # Category column, read by position
            accElements[category] += 1
        return dict(accElements)

    def __getitem__(self, idx):
        """Return ``(image_tensor, target)`` for the image at position *idx*.

        ``image_tensor`` is a float tensor in channel-first layout scaled to
        [0, 1]. ``target`` is a dict with 'boxes' (float32, shape (N, 4)),
        'labels' (int64, shape (N,)) and 'image_id' (tensor holding *idx*).
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()  # Convert a tensor index into a plain Python value

        img_name = self.image_list[idx]
        image_path = os.path.join(self.root_dir, img_name)  # Full image path

        # Load the image as RGB and hand it over as a numpy array
        image = Image.open(image_path).convert("RGB")
        image_np = np.array(image)

        annotations = self.annotations[img_name]
        bboxes = np.array([box for box, _ in annotations], dtype=np.float32)
        categories = [label for _, label in annotations]

        if self.transform:  # Apply the optional transform to image, boxes and labels together
            transformed = self.transform(image=image_np, bboxes=bboxes, category=categories)
            image_np = transformed['image']
            bboxes = transformed['bboxes']
            categories = transformed['category']

        # HWC uint8 array -> CHW float tensor scaled to [0, 1]
        image_tensor = torch.tensor(image_np).permute(2, 0, 1).float() / 255.0

        numerical_categories = [self.category_map[cat] for cat in categories]
        categories_tensor = torch.tensor(numerical_categories, dtype=torch.int64)
        # reshape(-1, 4) keeps the (N, 4) shape even when no box is left (N == 0)
        bboxes_tensor = torch.tensor(np.asarray(bboxes, dtype=np.float32)).reshape(-1, 4)

        image_id = torch.tensor([idx])  # Integer id of the image

        target = {
            'boxes': bboxes_tensor,
            'labels': categories_tensor,
            'image_id': image_id,
        }

        return image_tensor, target

    def showDatasetImage(self, image, bboxes, categories):
        """Plot *image* with its bounding boxes and print the category names.

        Args:
            image: Channel-first image tensor scaled to [0, 1].
            bboxes: Boxes to draw, one row of four coordinates per box.
            categories: Integer labels, one per box.
        """
        index_to_category = {index: name for name, index in self.CATEGORY_MAP.items()}
        labels_text = [index_to_category[int(idx)] for idx in categories]
        image_tensor = image.as_subclass(torch.Tensor)  # Plain tensor view of the image
        # as_tensor avoids the copy-construct warning when bboxes already is a tensor
        bboxes_tensor = torch.as_tensor(bboxes, dtype=torch.float32)
        image_to_show = (image_tensor * 255).type(torch.uint8)  # Back to the 0-255 uint8 range for drawing
        image_with_boxes = draw_bounding_boxes(image_to_show, bboxes_tensor, labels=labels_text, colors="red", width=2)
        plt.imshow(image_with_boxes.permute(1, 2, 0))  # CHW -> HWC, the layout imshow expects
        plt.title('Dataset image')
        plt.axis('off')
        plt.show()
        print("Categories:", labels_text)

    def Visualizator(self, index=None):
        """Show the image at *index*, or a randomly chosen one when *index* is None."""
        if index is None:
            index = random.randint(0, len(self.image_list) - 1)
        image, target = self[index]  # Goes through __getitem__
        boxes = target['boxes']
        labels = target['labels']
        self.showDatasetImage(image, boxes, labels)

# Resize pipeline: scales images to 640x640; boxes are declared in 'pascal_voc' format
# and their labels are passed through the 'category' field
transformResize = A.Compose(
    [A.Resize(height=640, width=640)],
    bbox_params=A.BboxParams(format='pascal_voc', label_fields=['category']),
)

def collate_func_train(batch):
    """Collate function of the training DataLoader.

    Splits a batch of ``(image, target)`` pairs into a list of images and a
    list of target dicts that keep only the 'boxes' and 'labels' entries.
    """
    images, targets = zip(*batch)
    kept_keys = ('boxes', 'labels')
    new_targets = []
    for target in targets:
        new_targets.append({key: target[key] for key in kept_keys})
    return [*images], new_targets

def collate_func_test(batch):
    """Collate function of the test DataLoader.

    Splits a batch of ``(image, target)`` pairs into a list of images and a
    list of the unmodified target dicts.
    """
    unzipped = tuple(zip(*batch))
    images, targets = unzipped
    return [*images], [*targets]

# Build the dataset object from the annotation CSV and the image folder (no transform is applied)
FinalDataset = PPEsDataset(
    csv_file="/kaggle/input/ppe-and-heavy-machinery-detection/FinalDataset/final_dataset_normalized.csv",
    root_dir="/kaggle/input/ppe-and-heavy-machinery-detection/FinalDataset",
    transform=None,
)

FinalDataset.Visualizator()  # Draw one randomly chosen image together with its boxes
FinalDataset.__countCategory__()  # Number of annotation rows per class




The dataset can be split into training and validation sets

In [None]:
# {   2   }
from collections import defaultdict
import random
import torch
from torch.utils.data import Subset

def subset_split(dataset, valid_size=1000, seed=42):
    """Randomly split *dataset* into a training and a validation ``Subset``.

    Args:
        dataset: Object exposing ``image_list`` (sequence of image names); the
            returned subsets index into it.
        valid_size: Maximum number of images placed in the validation subset
            (integer). If the dataset has fewer images, all of them go to
            validation.
        seed: Value passed to ``random.seed`` so the split is reproducible.
            This reseeds the global ``random`` generator.

    Returns:
        ``(dataset_train, dataset_test)`` as ``torch.utils.data.Subset``
        objects over *dataset*.

    Note:
        The selection is purely random; it is not stratified by class.
    """
    # Unique image names in their original order, shuffled deterministically
    all_images = list(dict.fromkeys(dataset.image_list))
    random.seed(seed)
    random.shuffle(all_images)

    # The first `valid_size` shuffled images form the validation set
    selected_valid = set(all_images[:max(valid_size, 0)])

    # One pass over the dataset order keeps both index lists sorted
    train_indices = []
    val_indices = []
    for i, img in enumerate(dataset.image_list):
        if img in selected_valid:
            val_indices.append(i)
        else:
            train_indices.append(i)

    dataset_train = Subset(dataset, train_indices)
    dataset_test = Subset(dataset, val_indices)

    return dataset_train, dataset_test

def count_classes_in_subset(dataset, indices):
    """Count the annotations of each category among the images selected by *indices*.

    *indices* are positions in ``dataset.image_list``; the annotations of each
    selected image are read from ``dataset.annotations``. Returns a plain dict
    mapping category to count.
    """
    tally = defaultdict(int)
    selected_names = (dataset.image_list[position] for position in indices)
    for name in selected_names:
        for _bbox, label in dataset.annotations[name]:
            tally[label] += 1
    return dict(tally)

# Split the full dataset and report how many annotations of each class ended up in each subset
dataset_train, dataset_test = subset_split(FinalDataset, valid_size=5000, seed=42)

# File names of each subset. The wrapped dataset is reached through Subset.dataset
# (the previous code referenced an undefined `dataset` variable and raised NameError).
train_files = {dataset_train.dataset.image_list[i] for i in dataset_train.indices}
test_files = {dataset_test.dataset.image_list[i] for i in dataset_test.indices}

counts_train = count_classes_in_subset(dataset_train.dataset, dataset_train.indices)
counts_test = count_classes_in_subset(dataset_test.dataset, dataset_test.indices)
print(counts_train)
print(counts_test)

Now that the dataset is loaded, we can use the PyTorch DataLoader to extract mini-batches from it. The data is also shuffled and collated into batches.

In [None]:
# {   3   }
# Training loader: shuffled mini-batches of 8 images, collated into lists by collate_func_train
dataloader_train = DataLoader(
    dataset_train,
    batch_size=8,
    shuffle=True,
    num_workers=2,
    collate_fn=collate_func_train,
    pin_memory=True,
)

# Fetch a single batch and split it into its images and its targets
batch = next(iter(dataloader_train))
images, targets = batch

# Number of images actually contained in the batch
batch_size = len(images)

# Class index -> class name, used to label the boxes drawn by draw_bounding_boxes
index_to_category = dict(enumerate([
    'background',
    'vest',
    'helmet',
    'gloves',
    'glasses',
    'mask',
    'boots',
    'ear_protection',
    'human',
    'bulldozer',
    'dump_truck',
    'excavator',
    'road_roller',
    'wheel_loader',
]))

# At most nine images are shown because the figure is a 3x3 grid
num_to_display = min(batch_size, 9)

for i in range(num_to_display):
    # Each target is a dictionary holding 'boxes' and 'labels'
    boxes = targets[i]['boxes']
    labels = targets[i]['labels']
    image_to_show = images[i] * 255  # Back to the 0-255 range

    # Substitute correctly shaped empty tensors when the image has no annotations
    if boxes.numel() == 0:
        boxes = torch.empty(0, 4)
    if labels.numel() == 0:
        labels = torch.empty(0, dtype=torch.int64)

    label_texts = [index_to_category[int(label_idx)] for label_idx in labels]

    image_with_boxes = draw_bounding_boxes(
        image_to_show.type(torch.uint8),
        boxes,
        labels=label_texts,
        colors="red",
        width=2,
    )

    print(f'Categories {label_texts}')
    plt.subplot(3, 3, i + 1)
    plt.imshow(image_with_boxes.permute(1, 2, 0))  # CHW -> HWC for imshow
    plt.title(f'Batch image {i}')
    plt.axis('off')

plt.show()


In [None]:
# {   4   }
# Test loader: mini-batches of 8 images, collated into lists by collate_func_test (targets keep all their keys)
dataloader_test = DataLoader(
    dataset_test,
    batch_size=8,
    shuffle=True,
    num_workers=2,
    collate_fn=collate_func_test,
    pin_memory=True,
)

# Take the first batch produced by the loader and unpack it
batch = next(iter(dataloader_test))
images, targets = batch

# How many images the batch holds
batch_size = len(images)

# Integer label -> readable class name for the box captions
index_to_category = dict(enumerate([
    'background',
    'vest',
    'helmet',
    'gloves',
    'glasses',
    'mask',
    'boots',
    'ear_protection',
    'human',
    'bulldozer',
    'dump_truck',
    'excavator',
    'road_roller',
    'wheel_loader',
]))

# The 3x3 subplot grid has room for nine images at most
num_to_display = min(batch_size, 9)

for i in range(num_to_display):
    # Each target is a dictionary that contains 'boxes' and 'labels'
    boxes = targets[i]['boxes']
    labels = targets[i]['labels']
    image_to_show = images[i] * 255  # Rescale to 0-255

    # Images without annotations get empty tensors of the expected shape
    if boxes.numel() == 0:
        boxes = torch.empty(0, 4)
    if labels.numel() == 0:
        labels = torch.empty(0, dtype=torch.int64)

    label_texts = [index_to_category[int(label_idx)] for label_idx in labels]

    image_with_boxes = draw_bounding_boxes(
        image_to_show.type(torch.uint8),
        boxes,
        labels=label_texts,
        colors="red",
        width=2,
    )

    print(f'Categories {label_texts}')
    plt.subplot(3, 3, i + 1)
    plt.imshow(image_with_boxes.permute(1, 2, 0))  # Channel-last layout for imshow
    plt.title(f'Batch image {i}')
    plt.axis('off')

plt.show()

Now that the available data can be accessed in batches, the different models can be trained

# 3. Training the models
For the academic purpose of this project, the models that will be used are:


*    Faster R-CNN
*    FCOS
*    RetinaNet






## 3.1 Faster R-CNN [2]
This kind of approach can be divided into two steps:


1.   Proposing regions where objects might be located
2.   Proposing which class the object from that region is part of



A new backbone has to be added. The backbone of a Faster R-CNN model is the CNN that extracts features from the images and produces a feature map; "*mobilenet_v2*" will be used as the backbone. An anchor generator is then configured to produce the candidate boxes (anchors) at each location of the feature map, and a RoI pooler is configured to crop the regions of interest from the feature map. All of this is put together into the final object detection model.

In [None]:
# {   5   }
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

# Number of classes the detector distinguishes (14, background included)
n_classes = 14

# Backbone: the feature extractor of a MobileNetV2 classifier with its default pre-trained weights
backbone = torchvision.models.mobilenet_v2(weights="DEFAULT").features
# FasterRCNN reads the number of output channels from the backbone;
# for mobilenet_v2 that number is 1280, so it is set by hand
backbone.out_channels = 1280

# RPN anchors: 5 sizes x 3 aspect ratios at every spatial location.
# Both arguments are tuples of tuples because every feature map
# may use its own sizes and aspect ratios; here there is a single map.
anchor_generator = AnchorGenerator(
    sizes=((32, 64, 128, 256, 512),),
    aspect_ratios=((0.5, 1.0, 2.0),),
)

# RoI pooling: which feature maps are cropped and the size of each crop
# after rescaling. A backbone that returns a single Tensor exposes it under
# the name '0'; a backbone that returns an ``OrderedDict[Tensor]`` lets
# ``featmap_names`` select the maps to use.
roi_pooler = torchvision.ops.MultiScaleRoIAlign(
    featmap_names=['0'],
    output_size=7,
    sampling_ratio=2,
)

# Assemble backbone, anchor generator and RoI pooler into the Faster R-CNN model
model = FasterRCNN(
    backbone,
    num_classes=n_classes,
    rpn_anchor_generator=anchor_generator,
    box_roi_pool=roi_pooler,
)

Everything is ready for training the model. But first, some helper functions to simplify training and evaluating detection models will be downloaded to make the work easier

In [None]:
# {   6   }
# Download the torchvision detection reference helpers (engine.py and utils.py) into the working directory
for helper_url in (
    "https://raw.githubusercontent.com/pytorch/vision/main/references/detection/engine.py",
    "https://raw.githubusercontent.com/pytorch/vision/main/references/detection/utils.py",
):
    helper_file = helper_url.rsplit("/", 1)[-1]
    exit_status = os.system(f"wget {helper_url}")
    # Fail here, with a clear message, rather than at the later `import utils`.
    # A copy left by an earlier run is still accepted when the download itself fails.
    if exit_status != 0 and not os.path.exists(helper_file):
        raise RuntimeError(f"Could not download {helper_url} (wget exit status {exit_status})")

We will define a training and evaluation function

In [None]:
# Install torchmetrics; the evaluation code below imports MeanAveragePrecision from it
!pip install torchmetrics


In [None]:
# {   7   }
import math
import sys
import time
import torch
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from sklearn.metrics import f1_score, precision_recall_curve, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import utils

# Training function
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq, scaler=None,
                    loss_dir="/kaggle/working/loss_evo/frcnn"):
    """Train *model* for one epoch and save the per-batch losses to a CSV file.

    Args:
        model: Detection model that returns a dict of losses when called as
            ``model(images, targets)`` in training mode.
        optimizer: Optimizer updating the model parameters.
        data_loader: Iterable yielding ``(images, targets)`` batches, where
            both elements are lists.
        device: Device the images and targets are moved to.
        epoch: Index of the current epoch; a linear learning-rate warm-up is
            applied during epoch 0.
        print_freq: Logging frequency (in iterations) of the metric logger.
        scaler: Optional gradient scaler. Mixed precision (autocast) is enabled
            only when a scaler is given.
        loss_dir: Directory where ``loss_epoch_<epoch>.csv`` is written.

    The CSV holds one row per batch with the columns 'epoch', 'step', 'loss'
    and one column per entry of the model's loss dict, so it is not tied to the
    loss names of a particular detector.
    """
    model.train()  # Train mode
    metric_logger = utils.MetricLogger(delimiter="  ")
    metric_logger.add_meter("lr", utils.SmoothedValue(window_size=1, fmt="{value:.6f}"))
    header = f"Epoch: [{epoch}]"

    # Loss values of every batch
    loss_history = []

    lr_scheduler = None
    if epoch == 0:
        # Linear warm-up: start at 1/1000 of the learning rate and ramp up over the first iterations
        warmup_factor = 1.0 / 1000
        warmup_iters = min(1000, len(data_loader) - 1)

        lr_scheduler = torch.optim.lr_scheduler.LinearLR(
            optimizer, start_factor=warmup_factor, total_iters=warmup_iters
        )

    for step, (images, targets) in enumerate(metric_logger.log_every(data_loader, print_freq, header)):
        images = [image.to(device) for image in images]
        # Only tensor values are moved; any other value in a target dict is kept as it is
        targets = [{k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()} for t in targets]

        with torch.cuda.amp.autocast(enabled=scaler is not None):  # Mixed precision only with a scaler
            loss_dict = model(images, targets)  # Dict of individual losses
            losses = sum(loss for loss in loss_dict.values())  # Total loss used for backpropagation

        # Reduce losses across all processes (relevant for distributed training); used for logging only
        loss_dict_reduced = utils.reduce_dict(loss_dict)
        losses_reduced = sum(loss for loss in loss_dict_reduced.values())

        loss_value = losses_reduced.item()

        if not math.isfinite(loss_value):  # Abort on NaN / infinite loss
            print(f"Loss is {loss_value}, stopping training")
            print(loss_dict_reduced)
            sys.exit(1)

        optimizer.zero_grad()
        if scaler is not None:
            scaler.scale(losses).backward()  # Backpropagation on the scaled loss
            scaler.step(optimizer)  # Optimizer step through the scaler
            scaler.update()  # Update the scale factor
        else:
            losses.backward()
            optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step()  # Advance the warm-up schedule

        metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
        metric_logger.update(lr=optimizer.param_groups[0]["lr"])

        # Record every loss component the model reported for this batch
        record = {"epoch": epoch, "step": step, "loss": loss_value}
        record.update({name: value.item() for name, value in loss_dict_reduced.items()})
        loss_history.append(record)

    # Save the loss history of this epoch to a CSV file
    os.makedirs(loss_dir, exist_ok=True)
    df = pd.DataFrame(loss_history)
    df.to_csv(os.path.join(loss_dir, f"loss_epoch_{epoch}.csv"), index=False)

# Evaluation function
def evaluate(model, dataloader, num_classes, score_threshold=0.5, iou_threshold=0.3):
    """Evaluate a detection model and plot its metrics.

    Computes the mean average precision with torchmetrics, and, from a custom
    prediction/ground-truth matching, the per-class and macro F1 score, a
    precision-recall curve and a confusion matrix (class 0 stands for
    background, i.e. "no object").

    Matching rules (predictions of an image are visited by descending score and
    each one is compared with the ground-truth box it overlaps most):
      * PR curve, all predictions: true positive when IoU >= iou_threshold,
        the class is right and the ground truth is not matched yet; otherwise
        false positive.
      * F1 / confusion matrix, only predictions with score >= score_threshold:
        true positive as above; right location but wrong class on an unmatched
        ground truth is recorded as (true class, predicted class); a duplicate
        detection of the right class or a badly localised box is recorded as
        (background, predicted class); ground truths left unmatched are
        recorded as (true class, background).
        NOTE(review): a prediction whose best ground truth is already matched
        and has a different class is not recorded at all -- confirm this is
        intended.

    Args:
        model: Detection model returning, in eval mode, one dict with 'boxes',
            'scores' and 'labels' per image.
        dataloader: Iterable yielding ``(images, targets)`` batches; each
            target is a dict with at least 'boxes' and 'labels'.
        num_classes: Number of classes (background included) used as label
            range for F1 and the confusion matrix.
        score_threshold: Minimum score of the predictions considered for F1 and
            the confusion matrix.
        iou_threshold: Minimum IoU for a prediction to count as localised on a
            ground-truth box.

    Returns:
        The 'map' value of the torchmetrics result as a Python float.
    """
    model.eval()
    # Run on the device the model already lives on instead of relying on a module-level `device`
    device = next(model.parameters()).device

    metric_map = MeanAveragePrecision(iou_type="bbox")

    all_preds = []
    all_targs = []

    with torch.no_grad():
        for images, targets in dataloader:
            images = [img.to(device) for img in images]

            outputs = model(images)  # Predictions, one dict per image

            batch_preds = []
            batch_targs = []

            # Everything is kept on the CPU; the targets are never needed on the model's device
            for out, tgt in zip(outputs, targets):
                batch_preds.append({
                    "boxes": out["boxes"].detach().cpu(),
                    "scores": out["scores"].detach().cpu(),
                    "labels": out["labels"].detach().cpu()
                })
                batch_targs.append({
                    "boxes": tgt["boxes"].detach().cpu(),
                    "labels": tgt["labels"].detach().cpu()
                })

            metric_map.update(batch_preds, batch_targs)  # Accumulate for mAP

            all_preds.extend(batch_preds)
            all_targs.extend(batch_targs)

    map_result = metric_map.compute()  # mAP over all accumulated predictions and ground truths
    print("mAP results:", map_result)

    # PR curve inputs: one score and one TP(1)/FP(0) flag per prediction
    pr_scores = []
    pr_tp_flags = []

    # F1 / confusion matrix inputs: paired true and predicted class ids
    matched_true_filtered = []
    matched_pred_filtered = []

    for out, tgt in zip(all_preds, all_targs):
        pred_boxes = out["boxes"]
        pred_labels = out["labels"]
        pred_scores = out["scores"]
        gt_boxes = tgt["boxes"]
        gt_labels = tgt["labels"]

        if len(gt_boxes) == 0:
            # Image without ground truth: every prediction is a false positive
            for i in range(len(pred_boxes)):
                pr_tp_flags.append(0)
                pr_scores.append(pred_scores[i].item())
                if pred_scores[i] >= score_threshold:
                    matched_true_filtered.append(0)  # Background class
                    matched_pred_filtered.append(pred_labels[i].item())
            continue

        if len(pred_boxes) == 0:
            # No predictions: every ground truth is a false negative (nothing to add to the PR curve)
            for j in range(len(gt_boxes)):
                matched_true_filtered.append(gt_labels[j].item())
                matched_pred_filtered.append(0)  # Background prediction
            continue

        ious = torchvision.ops.box_iou(pred_boxes, gt_boxes)  # IoU of every prediction / ground-truth pair

        gt_matched_pr = torch.zeros(len(gt_boxes))  # Ground truths already matched for the PR curve
        gt_matched_f1 = torch.zeros(len(gt_boxes))  # Ground truths already matched for F1 / confusion matrix

        # Visit the predictions from the most to the least confident one
        sorted_indices = torch.argsort(pred_scores, descending=True)
        pred_boxes = pred_boxes[sorted_indices]
        pred_labels = pred_labels[sorted_indices]
        pred_scores = pred_scores[sorted_indices]
        ious = ious[sorted_indices]

        for i in range(len(pred_boxes)):
            iou_row = ious[i]  # IoU of this prediction with every ground truth
            max_iou, max_j = iou_row.max(0)  # Best-overlapping ground truth

            # Precision-recall curve (all predictions, no score filtering)
            if max_iou >= iou_threshold:
                if pred_labels[i] == gt_labels[max_j] and gt_matched_pr[max_j] == 0:
                    pr_tp_flags.append(1)  # TP: right class on a still unmatched ground truth
                    gt_matched_pr[max_j] = 1
                else:
                    pr_tp_flags.append(0)  # FP: wrong class or ground truth already matched
            else:
                pr_tp_flags.append(0)  # FP: not localised on any ground truth

            pr_scores.append(pred_scores[i].item())

            # F1 and confusion matrix (only predictions above the score threshold)
            if pred_scores[i] >= score_threshold:
                if max_iou >= iou_threshold:
                    if pred_labels[i] == gt_labels[max_j] and gt_matched_f1[max_j] == 0:
                        # TP
                        matched_true_filtered.append(gt_labels[max_j].item())
                        matched_pred_filtered.append(pred_labels[i].item())
                        gt_matched_f1[max_j] = 1
                    elif gt_matched_f1[max_j] == 0:
                        # Right location, wrong class (classification error)
                        matched_true_filtered.append(gt_labels[max_j].item())
                        matched_pred_filtered.append(pred_labels[i].item())
                    elif pred_labels[i] == gt_labels[max_j]:
                        # Duplicate detection of an already matched ground truth
                        matched_true_filtered.append(0)  # Background
                        matched_pred_filtered.append(pred_labels[i].item())
                else:
                    # Not localised on any ground truth (localisation FP)
                    matched_true_filtered.append(0)  # Background
                    matched_pred_filtered.append(pred_labels[i].item())

        # False negatives: ground truths that no filtered prediction matched
        for j in range(len(gt_boxes)):
            if gt_matched_f1[j] == 0:
                matched_true_filtered.append(gt_labels[j].item())
                matched_pred_filtered.append(0)  # Background predicted

    labels_range = list(range(num_classes))

    f1_per_class = f1_score(matched_true_filtered, matched_pred_filtered, labels=labels_range, average=None, zero_division=0)
    f1_macro = f1_score(matched_true_filtered, matched_pred_filtered, labels=labels_range, average='macro', zero_division=0)
    if pr_scores:
        precision, recall, thresholds = precision_recall_curve(pr_tp_flags, pr_scores)
    else:
        # The model produced no prediction at all: draw an empty curve instead of failing
        precision, recall, thresholds = [], [], []

    print("F1-score per class:", f1_per_class)
    print("Macro F1-score:", f1_macro)

    class_map = {
        'background': 0,
        'vest': 1,
        'helmet': 2,
        'gloves': 3,
        'glasses': 4,
        'mask': 5,
        'boots': 6,
        'ear_protection': 7,
        'human': 8,
        'bulldozer': 9,
        'dump_truck': 10,
        'excavator': 11,
        'road_roller': 12,
        'wheel_loader': 13
    }
    # Class names ordered by their integer id, used as axis labels
    class_names = [k for k, v in sorted(class_map.items(), key=lambda item: item[1])]

    cm = confusion_matrix(matched_true_filtered, matched_pred_filtered, labels=labels_range)

    # Plot F1 score per class
    plt.figure(figsize=(10, 6))
    plt.bar(class_names, f1_per_class, color='steelblue')
    plt.xlabel("Classes")
    plt.ylabel("F1 Score")
    plt.title("F1 Score per Class")
    plt.ylim(0, 1)
    plt.grid(axis='y', linestyle='--', alpha=0.6)
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()

    # Plot Precision-Recall curve
    plt.figure(figsize=(8, 6))
    plt.plot(recall, precision, label="Precision-Recall Curve")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision vs Recall Curve (Object Detection)")
    plt.grid()
    plt.legend()
    plt.show()

    # Plot Confusion Matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names, cmap="Blues")
    plt.xlabel("Predicted Class")
    plt.ylabel("True Class")
    plt.title("Confusion Matrix")
    plt.xticks(rotation=45)
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.show()

    return map_result["map"].item()


A main function is written to train the model and evaluate every now and then

In [None]:
# Set the PYDEVD_DISABLE_FILE_VALIDATION environment variable for this process.
# NOTE(review): presumably meant to silence the debugger's file-validation warnings in the notebook kernel -- confirm.
os.environ['PYDEVD_DISABLE_FILE_VALIDATION'] = '1'

In [None]:
# {   8   }
#from engine import train_one_epoch, evaluate
import torch.nn as nn
from torch.cuda.amp import  GradScaler

# Pick the training device: the GPU when one is visible, the CPU otherwise
if not torch.cuda.is_available():
    print("GPU not available... Using CPU instead")
    device = torch.device('cpu')
else:
    print("GPU available")
    device = torch.device('cuda')

# The model has to live on the chosen device
model.to(device)

# Gradient scaler, required for mixed precision training
scaler = GradScaler()

# Stochastic gradient descent over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# Step schedule: the learning rate is multiplied by 0.1 every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Training length and early-stopping configuration
num_epochs = 10
best_map = None
n_epochs_stop = 1  # Evaluations without improvement tolerated before stopping
evaluate_between = False  # Whether to evaluate on the test set during training

for epoch in range(num_epochs):
    # One pass over the training data, logging every 10 iterations
    train_one_epoch(model, optimizer, dataloader_train, device, epoch, print_freq=10, scaler=scaler)
    # Advance the epoch-level learning rate schedule
    lr_scheduler.step()

    if not evaluate_between:
        continue
    # Evaluate every second epoch (epoch index 1, 3, 5, ...) and after the last one
    if (epoch + 1) % 2 != 0 and epoch != num_epochs - 1:
        continue

    # evaluate() returns the mAP and plots F1, precision-recall curve and confusion matrix
    current_map = evaluate(model, dataloader_test, n_classes)
    if best_map is None or current_map > best_map:  # New best result
        best_map = current_map
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
    if epochs_no_improve >= n_epochs_stop:  # Too many evaluations without improvement
        print(f"Early stopping in epoch {epoch} mAP is not improving")
        break


# Persist the fine-tuned Faster R-CNN weights
models_dir = "/kaggle/working/trained_models"
os.makedirs(models_dir, exist_ok=True)  # Create the folder when it is missing

# Destination file of the checkpoint
model_path = os.path.join(models_dir, "fasterrcnn_model.pth")

# Only the state dictionary (the weights) is stored
torch.save(model.state_dict(), model_path)

print(f"Model saved at {model_path}")

We can observe the loss evolution throughout the training

In [None]:
# {   9   }
import os
import pandas as pd
import matplotlib.pyplot as plt # gegbg


# Folder holding the per-epoch loss CSVs and the sub-folder that receives the plots
loss_dir = "/kaggle/working/loss_evo/frcnn"
output_dir = os.path.join(loss_dir, "loss_plots")

# Make sure the plot folder exists
os.makedirs(output_dir, exist_ok=True)

# Collect the loss_epoch_<n>.csv files ordered by their epoch number <n>
csv_files = [
    entry for entry in os.listdir(loss_dir)
    if entry.startswith("loss_epoch_") and entry.endswith(".csv")
]
csv_files.sort(key=lambda x: int(x.rsplit("_", 1)[-1].split(".")[0]))

# Read every epoch and number its batches continuously across epochs
all_dfs = []
global_step = 0

for file in csv_files:
    df = pd.read_csv(os.path.join(loss_dir, file))
    df["global_step"] = df.index + global_step
    global_step = df["global_step"].iloc[-1] + 1  # First step of the next epoch
    all_dfs.append(df)

# Single DataFrame covering the whole training run
full_df = pd.concat(all_dfs, ignore_index=True)

# Loss columns to plot and the title used for each of them
titles = {
    "loss": "Total Loss",
    "loss_classifier": "Classifier Loss",
    "loss_box_reg": "Box Regression Loss",
    "loss_objectness": "Objectness Loss",
    "loss_rpn_box_reg": "RPN Box Regression Loss"
}
loss_components = list(titles)

# One figure per loss component, saved as PNG and shown
for loss_name in loss_components:
    plt.figure(figsize=(12, 4))
    plt.plot(full_df["global_step"], full_df[loss_name], label=titles[loss_name], linewidth=2)
    plt.xlabel("Global Step (accumulated batch index)")
    plt.ylabel("Loss")
    plt.title(f"Evolution of {titles[loss_name]}")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()

    # Write the figure to disk before displaying it
    output_path = os.path.join(output_dir, f"{loss_name}_evolution.png")
    plt.savefig(output_path, dpi=300)
    plt.show()


Now that the model has been trained, it is time to test its average precision.

In [None]:
# {   10   }
# Evaluation of the Faster R-CNN model
import torch

# Use the GPU when available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the fine-tuned weights. map_location lets a checkpoint saved on a GPU
# be loaded in a CPU-only session instead of raising a deserialization error.
state_dict = torch.load("/kaggle/working/trained_models/fasterrcnn_model.pth", map_location=device)
model.load_state_dict(state_dict)

# Evaluate the model on the test set
mAP_map = evaluate(model, dataloader_test, num_classes=n_classes)


Let's try the model with a real picture

In [None]:
# {   11  }
import os
import random
import cv2
import torch
from PIL import Image
import torchvision.transforms as T
from google.colab.patches import cv2_imshow
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Use the GPU when available

img_dir = "/kaggle/input/ppe-and-heavy-machinery-detection/FinalDataset"
imgs = [f for f in os.listdir(img_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

img_name = random.choice(imgs)  # Pick a random image
img_path = os.path.join(img_dir, img_name)  # Full path to the image

img = Image.open(img_path).convert("RGB")  # Open the image in RGB format

# Resize to 640x640 and convert to a tensor
transform = T.Compose([
    T.Resize((640, 640)),
    T.ToTensor(),
])

img_tensor = transform(img).unsqueeze(0).to(device)  # Add a batch dimension and move to the device

# The input is sent to `device`, so the model must be there too,
# otherwise the forward pass fails with a device mismatch
model.to(device)
model.eval()  # Evaluation mode
with torch.no_grad():
    preds = model(img_tensor)  # Predict the objects in the image

boxes = preds[0]['boxes'].cpu().numpy()
labels = preds[0]['labels'].cpu().numpy()
scores = preds[0]['scores'].cpu().numpy()

class_names = ['background','vest', 'helmet', 'gloves', 'glasses', 'mask', 'boots',
               'ear_protection', 'human', 'bulldozer', 'dump_truck',
               'excavator', 'road_roller', 'wheel_loader']

# Draw on a 640x640 copy so the predicted box coordinates match the resized input
img_cv = cv2.cvtColor(np.array(img.resize((640, 640))), cv2.COLOR_RGB2BGR)

score_threshold = 0.5  # Filter low confidence predictions
detected_classes = set()

for box, label, score in zip(boxes, labels, scores):
    if score < score_threshold:  # Skip predictions with a low score
        continue
    detected_classes.add(class_names[label])
    x1, y1, x2, y2 = box.astype(int)  # Bounding box corners
    cv2.rectangle(img_cv, (x1, y1), (x2, y2), (0, 255, 0), 2)  # Green rectangle around the object
    text = f"{class_names[label]}: {score:.2f}"  # Label text
    # Keep the label inside the image when the box touches the top edge
    text_y = max(y1 - 10, 15)
    cv2.putText(img_cv, text, (x1, text_y), cv2.FONT_HERSHEY_SIMPLEX,
                0.5, (0, 255, 0), 2)

print("Clases detectadas:", ", ".join(sorted(detected_classes)))

cv2_imshow(img_cv)



# YOLO
The next model we will use is YOLO. Its labels must be in a different format from the CSV annotations used so far, so a format conversion is needed.
YOLO expects a YAML file indicating the paths to the directories where the images and the labels (in txt format) are stored. Each txt file contains one line per object with `class_index x_center y_center width height`, and its name must match the corresponding image file name.

In [None]:
# {   12   }
import pandas as pd
import os
from PIL import Image
import random

def convert_csv_to_yolo(csv_path, images_dir, output_dir, train_files, test_files):
    """Convert CSV bounding-box annotations into YOLO label files.

    One ``<image stem>.txt`` file is written per image under
    ``<output_dir>/labels/train`` or ``<output_dir>/labels/test``. Each line has
    the form ``class_id x_center y_center width height``, with the box
    coordinates divided by the image width/height read from the CSV.

    Args:
        csv_path: CSV file with the columns ``filename``, ``class``, ``xmin``,
            ``ymin``, ``xmax``, ``ymax``, ``width`` and ``height``.
        images_dir: Folder containing the images; files missing from it are skipped.
        output_dir: Root folder where the ``labels`` sub-folders are created.
        train_files: Set of image file names belonging to the training split.
        test_files: Set of image file names belonging to the test split.
    """
    # Imported locally so the function does not rely on an import made elsewhere
    from collections import Counter

    # Category name -> YOLO class index
    category_map = {
        'background': 0,
        'vest': 1,
        'helmet': 2,
        'gloves': 3,
        'glasses': 4,
        'mask': 5,
        'boots': 6,
        'ear_protection': 7,
        'human': 8,
        'bulldozer': 9,
        'dump_truck': 10,
        'excavator': 11,
        'road_roller': 12,
        'wheel_loader': 13,
    }

    # Load annotations from CSV
    df = pd.read_csv(csv_path)

    # Group the annotations by image once, instead of filtering the whole
    # DataFrame again for every image
    rows_by_file = {name: group for name, group in df.groupby('filename')}
    no_rows = df.iloc[0:0]

    # Create YOLO output directories
    train_dir = os.path.join(output_dir, "labels/train")
    test_dir = os.path.join(output_dir, "labels/test")
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    # Number of boxes written per class in each split
    train_class_counts = Counter()
    test_class_counts = Counter()

    all_files = train_files.union(test_files)

    for filename in all_files:
        image_path = os.path.join(images_dir, filename)

        if not os.path.exists(image_path):
            print(f"Image not found -> {filename}, ignored.")
            continue

        # A file listed in both splits is treated as a training file
        subset = "train" if filename in train_files else "test"
        yolo_lines = []

        for _, row in rows_by_file.get(filename, no_rows).iterrows():
            xmin, ymin, xmax, ymax = row['xmin'], row['ymin'], row['xmax'], row['ymax']
            category = row['class']
            width, height = row['width'], row['height']

            if category not in category_map:
                print(f"Warning: unknown label '{category}' en {filename}")
                continue

            class_id = category_map[category]

            if subset == "train":
                train_class_counts[category] += 1
            else:
                test_class_counts[category] += 1

            # Box centre and size, normalised by the image dimensions
            x_center = ((xmin + xmax) / 2) / width
            y_center = ((ymin + ymax) / 2) / height
            w = (xmax - xmin) / width
            h = (ymax - ymin) / height

            yolo_lines.append(f"{class_id} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}")

        # Save the .txt file (left empty when the image has no valid annotation)
        txt_filename = os.path.splitext(filename)[0] + ".txt"
        subdir = train_dir if subset == "train" else test_dir
        txt_path = os.path.join(subdir, txt_filename)

        with open(txt_path, 'w') as f:
            f.write("\n".join(yolo_lines))

    # Report the class distribution of each split
    print(f"Train boxes per class: {dict(train_class_counts)}")
    print(f"Test boxes per class: {dict(test_class_counts)}")

In [None]:
# {   13   }
# Write the YOLO label files for the train/test split of the dataset
convert_csv_to_yolo(
    csv_path="/kaggle/input/ppe-and-heavy-machinery-detection/FinalDataset/final_dataset_normalized.csv",
    images_dir="/kaggle/input/ppe-and-heavy-machinery-detection/FinalDataset/",
    output_dir="/kaggle/working/YOLO/",
    train_files=train_files,
    test_files=test_files,
)

Let's also copy the images to the YOLO directory

In [None]:
# {   14   }
import os
import shutil

# Source images and the YOLO label/image folders of each split - change these as needed
original_images_dir = "/kaggle/input/ppe-and-heavy-machinery-detection/FinalDataset"
train_txt_dir = "/kaggle/working/YOLO/labels/train/"
test_txt_dir = "/kaggle/working/YOLO/labels/test/"
target_images_train_dir = "/kaggle/working/YOLO/images/train/"
target_images_test_dir = "/kaggle/working/YOLO/images/test/"

# Make sure both destination folders exist
for target_dir in (target_images_train_dir, target_images_test_dir):
    os.makedirs(target_dir, exist_ok=True)

# Only files with these extensions are treated as images
valid_extensions = {'.jpg', '.jpeg', '.png'}

# (split name, label folder, image destination); train is checked before test
split_targets = (
    ("train", train_txt_dir, target_images_train_dir),
    ("test", test_txt_dir, target_images_test_dir),
)

for img_file in os.listdir(original_images_dir):
    base_name, extension = os.path.splitext(img_file)
    if extension.lower() not in valid_extensions:
        continue  # Not an image

    # An image goes to the first split that holds a label file with the same stem
    for split_name, txt_dir, images_target in split_targets:
        if os.path.exists(os.path.join(txt_dir, base_name + ".txt")):
            shutil.copy(os.path.join(original_images_dir, img_file), images_target)
            print(f"Copied {img_file} to {split_name} images folder.")
            break
    else:
        # No label file in either split: the image is left out
        print(f"No annotation found for {img_file}, skipping.")


A YAML file needs to be created in order to train the YOLO model. Once all the preprocessing is done, we must choose a specific YOLO model to train. YOLOv8 seems to be the most academic approach. Before training, we will install the required modules

In [None]:
pip install ultralytics

In [None]:

pip install clearml

In [None]:
# {   15   }
import torch
from ultralytics import YOLO


class BatchLossLogger:
    """Append the training loss of every batch to a CSV file.

    Ultralytics callbacks are plain callables registered with
    ``model.add_callback(event, fn)`` and invoked as ``fn(trainer)``, so this
    class needs no base class; the extra parameters of ``on_train_batch_end``
    are optional and kept only for backward compatibility.
    """

    def __init__(self, log_file="batch_loss_log.csv"):
        self.log_file = log_file
        self._global_step = 0  # batches seen so far, accumulated across epochs
        # Start a fresh log file with the CSV header
        with open(self.log_file, 'w') as f:
            f.write("epoch,step,loss_total,loss_obj,loss_cls,loss_box\n")

    @staticmethod
    def _as_number(value):
        # Reduce tensors to a plain float so the CSV holds numbers, not "tensor(...)"
        if isinstance(value, torch.Tensor):
            return value.detach().sum().item()
        return value

    def on_train_batch_end(self, trainer, batch=None, outputs=None, batch_loss=None, **kwargs):
        """Log the losses of the batch that has just been processed.

        Args:
            trainer: Object holding the training state; ``epoch`` is read from
                it, and ``loss`` / ``loss_items`` are used when the losses are
                not passed explicitly.
            batch: Unused, accepted for backward compatibility.
            outputs: Optional dict with ``loss_obj``, ``loss_cls`` and ``loss_box``.
            batch_loss: Optional total loss of the batch (tensor or number).
        """
        epoch = trainer.epoch  # current epoch
        step = self._global_step  # accumulated batch index
        self._global_step += 1

        # Total loss: explicit argument first, otherwise the trainer's last loss
        if batch_loss is None:
            batch_loss = getattr(trainer, "loss", 0.0)
        loss_total = self._as_number(batch_loss)

        # Sub-losses
        loss_obj = loss_cls = loss_box = 0.0
        if isinstance(outputs, dict):
            loss_obj = self._as_number(outputs.get("loss_obj", 0.0))
            loss_cls = self._as_number(outputs.get("loss_cls", 0.0))
            loss_box = self._as_number(outputs.get("loss_box", 0.0))
        else:
            # NOTE(review): assumes trainer.loss_items is ordered (box, cls, ...)
            # as in the Ultralytics detection trainer - confirm against the
            # installed version. No objectness term is read, so loss_obj stays 0.0.
            loss_items = getattr(trainer, "loss_items", None)
            if isinstance(loss_items, torch.Tensor) and loss_items.numel() >= 2:
                loss_box, loss_cls = loss_items.detach().flatten()[:2].tolist()

        # Save to CSV file
        with open(self.log_file, 'a') as f:
            f.write(f"{epoch},{step},{loss_total},{loss_obj},{loss_cls},{loss_box}\n")


# Load the model
model = YOLO('yolov8n.pt')

# Callbacks storing the loss throughout the training
callbacks = [BatchLossLogger(log_file="/kaggle/working/batch_loss_log.csv")]
for callback in callbacks:
    # Callbacks are registered on the model; `callbacks=` is not a train() argument
    model.add_callback("on_train_batch_end", callback.on_train_batch_end)

# Train the model
results = model.train(data='/kaggle/input/YOLO_format.yaml', imgsz=640, epochs=10, batch=8, name='yolov8_model_train')

We can plot the training loss evolution

In [None]:
# {   16   }
import pandas as pd
import matplotlib.pyplot as plt 

# Load the CSV log file
log_file = "/kaggle/working/batch_loss_log.csv"  # Make sure this path is the correct one
df = pd.read_csv(log_file)

# (CSV column, legend label, figure title) of each plot, in display order
loss_plots = [
    ("loss_box", "Box Regression Loss", "Evolution of Box Regression Loss"),
    ("loss_cls", "Classification Loss", "Evolution of Classifier Loss"),
    ("loss_total", "Total Loss", "Evolution of Total Loss"),
]

# One figure per loss component
for column, label, title in loss_plots:
    plt.figure(figsize=(10, 4))
    plt.plot(df['step'], df[column], label=label)
    plt.xlabel('Global Step (accumulated batch index)')
    plt.ylabel('Loss')
    plt.title(title)
    plt.grid(True)
    plt.legend()  # the curve label is only displayed when a legend is drawn
    plt.tight_layout()
    plt.show()

Let's try the model with some actual images

In [None]:
# {   17   }
import cv2
from matplotlib import pyplot as plt
from ultralytics import YOLO
import os, random
# Load the trained weights
# NOTE(review): the run folder is 'yolov8_model_20' while the training cell uses
# name='yolov8_model_train' - confirm this path points to the intended run.
model = YOLO('/kaggle/working/runs/detect/yolov8_model_20/weights/best.pt')

# Named test_images_dir rather than `dir` to avoid shadowing the builtin
test_images_dir = '/kaggle/working/YOLO/images/test'

# Run the prediction on one random file of the test folder
random_image = os.path.join(test_images_dir, random.choice(os.listdir(test_images_dir)))
results = model.predict(source=random_image, save=False)

for r in results:
    im_array = r.plot()  # Image with the predicted boxes drawn on it
    im_rgb = cv2.cvtColor(im_array, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB for matplotlib
    plt.imshow(im_rgb)
    plt.axis('off')
    plt.show()

# RetinaNet
This final model has a very special characteristic: Focal Loss, which allows us to counteract class imbalance during training [5]

In [None]:
# {   18   }
from functools import partial
import torchvision
from torchvision.models.detection.retinanet import RetinaNet
from torchvision.models.detection.retinanet import RetinaNetClassificationHead
from torchvision.models.detection.retinanet import RetinaNet_ResNet50_FPN_V2_Weights

# Redefine the number of classes: 14 in this case
n_classes = 14

# Construct the RetinaNet model with a ResNet-50-FPN backbone and pretrained COCO weights
model = torchvision.models.detection.retinanet_resnet50_fpn_v2(weights=RetinaNet_ResNet50_FPN_V2_Weights.COCO_V1)

# Reuse the number of anchors per location of the pretrained classification head
num_anchors = model.head.classification_head.num_anchors

# Replace the classification head so that it predicts our classes
model.head.classification_head = RetinaNetClassificationHead(
    in_channels=model.backbone.out_channels,  # Read from the backbone instead of hard-coding 256
    num_anchors=num_anchors,
    num_classes=n_classes,
    norm_layer=partial(torch.nn.GroupNorm, 32)  # GroupNorm with 32 groups as normalization layer
)

Most of the previous training function can be reused to train the RetinaNet model. The only difference is that there is now no region proposal network, as RetinaNet is a one-stage detection method. Therefore, we must remove the parts where the loss associated with this feature is calculated

In [None]:
# {   19   }
import math
import sys
import time

import utils


def train_one_epoch_RetinaNet(model, optimizer, data_loader, device, epoch, print_freq, scaler=None,
                              loss_dir="/kaggle/working/loss_evo/frcnn"):
    """Train ``model`` for one epoch and save the per-batch losses to a CSV file.

    Args:
        model: Detection model returning a dict of losses when called with
            ``(images, targets)``.
        optimizer: Optimizer that updates the model parameters.
        data_loader: Iterable yielding ``(images, targets)`` batches.
        device: Device the images and the target tensors are moved to.
        epoch: Index of the current epoch; a linear learning-rate warm-up is
            applied when it is 0.
        print_freq: Logging frequency passed to ``MetricLogger.log_every``.
        scaler: Optional gradient scaler; mixed precision is enabled when given.
        loss_dir: Folder where ``loss_epoch_<epoch>.csv`` is written. The
            default is kept for backward compatibility.
            NOTE(review): the default folder is named "frcnn", so RetinaNet
            logs may overwrite Faster R-CNN logs with the same file name -
            consider passing a dedicated folder.
    """
    model.train()  # Train mode
    # NOTE(review): `utils` is presumably the torchvision detection reference
    # helper module (MetricLogger, SmoothedValue, reduce_dict) - confirm.
    metric_logger = utils.MetricLogger(delimiter="  ")
    metric_logger.add_meter("lr", utils.SmoothedValue(window_size=1, fmt="{value:.6f}"))
    header = f"Epoch: [{epoch}]"

    # To store loss values per batch
    loss_history = []

    lr_scheduler = None
    if epoch == 0:
        # Linear learning-rate warm-up during the first epoch only
        warmup_factor = 1.0 / 1000
        warmup_iters = min(1000, len(data_loader) - 1)

        lr_scheduler = torch.optim.lr_scheduler.LinearLR(
            optimizer, start_factor=warmup_factor, total_iters=warmup_iters
        )

    for step, (images, targets) in enumerate(metric_logger.log_every(data_loader, print_freq, header)):
        # Move the images and every tensor of the targets to the device
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()} for t in targets]

        with torch.cuda.amp.autocast(enabled=scaler is not None):  # Mixed precision training
            loss_dict = model(images, targets)  # Losses computed by the model
            losses = sum(loss for loss in loss_dict.values())  # Total loss

        # Reduce losses across all GPUs if using distributed training
        loss_dict_reduced = utils.reduce_dict(loss_dict)
        losses_reduced = sum(loss for loss in loss_dict_reduced.values())

        loss_value = losses_reduced.item()

        if not math.isfinite(loss_value):  # Abort on NaN/inf losses
            print(f"Loss is {loss_value}, stopping training")
            print(loss_dict_reduced)
            sys.exit(1)

        optimizer.zero_grad()  # Clear the gradients of the previous step
        if scaler is not None:
            scaler.scale(losses).backward()  # Backpropagation on the scaled loss
            scaler.step(optimizer)  # Update the weights
            scaler.update()  # Update the scale factor
        else:
            losses.backward()
            optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step()

        metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
        metric_logger.update(lr=optimizer.param_groups[0]["lr"])

        # Save per-batch loss data
        loss_history.append({
            "epoch": epoch,
            "step": step,
            "loss": loss_value,
            "classification": loss_dict_reduced.get("classification", torch.tensor(0.0)).item(),
            "bbox_regression": loss_dict_reduced.get("bbox_regression", torch.tensor(0.0)).item(),
        })

    # Save the loss history of this epoch to a CSV file
    os.makedirs(loss_dir, exist_ok=True)
    df = pd.DataFrame(loss_history)
    df.to_csv(os.path.join(loss_dir, f"loss_epoch_{epoch}.csv"), index=False)


Again, a main loop for the training is coded

In [None]:
# {   20   }
#from engine import train_one_epoch, evaluate
import torch.nn as nn
from torch.cuda.amp import  GradScaler

# Train on the GPU, or on the CPU if a GPU is not available
if torch.cuda.is_available():
    print("GPU available")
    device = torch.device('cuda')
else:
    print("GPU not available... Using CPU instead")
    device = torch.device('cpu')

# Move model to GPU or CPU
model.to(device)

# Use a scaler for mixed precision training
scaler = GradScaler()

# Set the optimizer (stochastic gradient descent)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# Learning rate scheduler: multiplies the lr by 0.25 every 5 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.25)

# Number of training epochs
num_epochs = 20
best_map = None
n_epochs_stop = 1  # If there are no improvements in this amount of evaluations, stop the training
epochs_no_improve = 0  # Initialised up front so the early-stopping check never reads an undefined name
evaluate_between = False

for epoch in range(num_epochs):
    # Train for one epoch, printing every 10 iterations
    train_one_epoch_RetinaNet(model, optimizer, dataloader_train, device, epoch, print_freq=10, scaler=scaler)
    # Update the learning rate
    lr_scheduler.step()
    if evaluate_between:  # Choose whether to evaluate inside the training loop or not
        if (epoch + 1) % 2 == 0 or epoch == num_epochs - 1:  # Evaluate in epochs 1,3,5,7,9... and the last one
            # Evaluate on the test dataset
            current_map = evaluate(model, dataloader_test, n_classes)  # returns mAP, F1 score, recall vs precision curve, confusion matrix
            if best_map is None or current_map > best_map:  # Evaluation results show improvement
                best_map = current_map
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1  # Count evaluations without improvement
            if epochs_no_improve >= n_epochs_stop:  # Too many evaluations without improving
                print(f"Early stopping in epoch {epoch} mAP is not improving")
                break


# Save the fine-tuned RetinaNet model
# Create the directory if it doesn't exist
models_dir = "/kaggle/working/trained_models"
os.makedirs(models_dir, exist_ok=True)

# Full path with filename to save the model
model_path = os.path.join(models_dir, "RetinaNet_model.pth")

# Save the model state dictionary to the specified path
torch.save(model.state_dict(), model_path)

print(f"Model saved at {model_path}")

Let's observe the training evolution

In [None]:
# {   21   }
import os
import pandas as pd
import matplotlib.pyplot as plt 
loss_dir = "/kaggle/working/loss_evo/frcnn"  # Folder where the per-epoch loss CSVs are stored
output_dir = os.path.join(loss_dir, "loss_plots")
os.makedirs(output_dir, exist_ok=True)

# Per-epoch CSV files ("loss_epoch_<N>.csv"), ordered by epoch number
csv_files = [
    name for name in os.listdir(loss_dir)
    if name.startswith("loss_epoch_") and name.endswith(".csv")
]
csv_files.sort(key=lambda name: int(name.split("_")[-1].split(".")[0]))

# Read every epoch and number its batches continuing from the previous epoch
all_dfs = []
global_step = 0
for csv_name in csv_files:
    epoch_df = pd.read_csv(os.path.join(loss_dir, csv_name))
    epoch_df["global_step"] = epoch_df.index + global_step
    global_step = epoch_df["global_step"].iloc[-1] + 1
    all_dfs.append(epoch_df)

full_df = pd.concat(all_dfs, ignore_index=True)  # All epochs in a single DataFrame

# Title of each loss component; the components are plotted in this order
titles = {
    "loss": "Total Loss",
    "classification": "Classification Loss",
    "bbox_regression": "Bounding Box Regression Loss"
}
loss_components = list(titles)

for loss_name in loss_components:
    if loss_name in full_df.columns:
        component_title = titles[loss_name]

        plt.figure(figsize=(12, 4))
        plt.plot(full_df["global_step"], full_df[loss_name], label=component_title, linewidth=2)
        plt.xlabel("Global Step (accumulated batch index)")
        plt.ylabel("Loss")
        plt.title(f"Evolution of {component_title}")
        plt.grid(True)
        plt.legend()
        plt.tight_layout()

        # Save the figure, then display it
        plt.savefig(os.path.join(output_dir, f"{loss_name}_evolution.png"), dpi=300)
        plt.show()
    else:
        # Component missing from the logs: warn and move on
        print(f"Warning: {loss_name} not found in DataFrame columns")



In [None]:
# {   22   }
# Evaluation of the RetinaNet model
import torch

# Use the GPU when available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the fine-tuned weights. map_location lets a checkpoint saved on a GPU
# be loaded in a CPU-only session instead of raising a deserialization error.
state_dict = torch.load("/kaggle/working/trained_models/RetinaNet_model.pth", map_location=device)
model.load_state_dict(state_dict)

# Evaluate the model on the test set
mAP_map = evaluate(model, dataloader_test, num_classes=n_classes)
