In [1]:
import sys
sys.path.append('..')
from utils.conversion_helpers import yolo_to_bbox
from utils.evaluation_helper import ObjectDetectionEvaluator
import os
import cv2
import numpy as np
import torch
from torch.amp import autocast, GradScaler
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import Subset
import random
from tqdm import tqdm
# with help of this tutorial: https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html

In [2]:
class LicensePlateDataset(Dataset):
    """Detection dataset pairing ``.jpg`` images with per-image label files.

    Every ``<name>.jpg`` found in ``images_dir`` is matched with
    ``<name>.txt`` in ``annotations_dir``. Items are ``(image, target)`` pairs
    where ``image`` is the RGB image passed through ``transforms.ToTensor()``
    and ``target`` is a dict with:

    * ``"boxes"``  -- float32 tensor of shape ``(N, 4)``
    * ``"labels"`` -- int64 tensor of shape ``(N,)``, all ones (single class)

    Images without any annotation yield ``N == 0`` for both tensors.
    """

    def __init__(self, images_dir, annotations_dir, transform=None):
        """Index all ``.jpg`` files in ``images_dir`` (sorted by file name).

        Args:
            images_dir: Directory containing the ``.jpg`` images.
            annotations_dir: Directory containing the matching ``.txt`` labels.
            transform: Stored on the instance; NOTE(review): it is not applied
                anywhere in this class at the moment.
        """
        self.images_dir = images_dir
        self.annotations_dir = annotations_dir
        self.transform = transform
        self.image_files = sorted(f for f in os.listdir(images_dir) if f.endswith('.jpg'))

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.image_files)

    @staticmethod
    def _build_target(boxes):
        """Pack a sequence of boxes into the ``{"boxes", "labels"}`` target dict.

        ``labels`` always has exactly one entry per box, so an image without
        boxes gets a ``(0, 4)`` box tensor and a ``(0,)`` label tensor.
        """
        if len(boxes) == 0:
            return {
                "boxes": torch.zeros((0, 4), dtype=torch.float32),
                "labels": torch.zeros((0,), dtype=torch.int64),
            }
        return {
            "boxes": torch.tensor(boxes, dtype=torch.float32),
            "labels": torch.ones(len(boxes), dtype=torch.int64),
        }

    def __getitem__(self, idx):
        """Load image ``idx`` and its boxes.

        Returns:
            ``(image, target)`` as described in the class docstring.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        image_name = self.image_files[idx]
        image_path = os.path.join(self.images_dir, image_name)
        # Swap only the extension; str.replace would also rewrite a '.jpg'
        # that happens to appear in the middle of the file name.
        stem, _ext = os.path.splitext(image_name)
        annotation_path = os.path.join(self.annotations_dir, stem + '.txt')

        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        H, W, _ = image.shape
        # presumably absolute-pixel (x_min, y_min, x_max, y_max) boxes -- confirm in yolo_to_bbox
        boxes = yolo_to_bbox(annotation_path, W, H)

        # self.resize_and_pad(image, boxes, (640, 640)) was tried here and gave
        # worse results than the model's internal scaling.
        image = transforms.ToTensor()(image)

        return image, self._build_target(boxes)

    def resize_and_pad(self, image, boxes, target_size):
        """Letterbox ``image`` into ``target_size`` and shift ``boxes`` to match.

        Kept for reference: it gave worse results than the model's internal
        scaling and is not called by ``__getitem__``.

        Args:
            image: ``(H, W, 3)`` array.
            boxes: Iterable of ``(x_min, y_min, x_max, y_max)``.
            target_size: ``(width, height)`` of the padded output.

        Returns:
            ``(tensor_image, adjusted_boxes)``; the image is resized with its
            aspect ratio preserved, centred on a zero-filled canvas and
            converted with ``transforms.ToTensor()``.
        """
        h, w, _ = image.shape
        scale = min(target_size[0] / w, target_size[1] / h)
        new_w, new_h = int(w * scale), int(h * scale)

        resized_image = cv2.resize(image, (new_w, new_h))

        # Split the leftover space evenly so the resized image is centred.
        pad_x = (target_size[0] - new_w) // 2
        pad_y = (target_size[1] - new_h) // 2

        padded_image = np.full((target_size[1], target_size[0], 3), 0, dtype=np.uint8)
        padded_image[pad_y:pad_y + new_h, pad_x:pad_x + new_w] = resized_image

        adjusted_boxes = [
            [
                x_min * scale + pad_x,
                y_min * scale + pad_y,
                x_max * scale + pad_x,
                y_max * scale + pad_y,
            ]
            for x_min, y_min, x_max, y_max in boxes
        ]

        return transforms.ToTensor()(padded_image), adjusted_boxes

In [3]:
# Training function
def train_one_epoch(model, optimizer, data_loader, device, epoch, scaler=None):
    """Train ``model`` for one pass over ``data_loader``.

    During epoch 0 a per-batch linear warm-up is applied to the learning rate.
    When ``scaler`` is given, the forward pass runs under CUDA autocast and the
    backward/step go through the scaler (mixed precision).

    Args:
        model: Detection model that returns a dict of losses when called with
            ``(images, targets)`` in training mode.
        optimizer: Optimizer updating ``model``'s parameters.
        data_loader: Iterable of ``(images, targets)`` batches; must support
            ``len()``.
        device: Device the images and target tensors are moved to.
        epoch: Zero-based epoch index; warm-up is only used when it is 0.
        scaler: Optional gradient scaler; ``None`` disables mixed precision.

    Returns:
        Mean of the summed loss over all batches.

    Raises:
        ValueError: If ``data_loader`` has no batches.
    """
    num_batches = len(data_loader)
    if num_batches == 0:
        # Fail before touching the optimizer; otherwise the warm-up scheduler
        # would shrink the learning rate and the average below would divide by 0.
        raise ValueError("data_loader is empty: cannot train on zero batches")

    model.train()

    lr_scheduler = None
    if epoch == 0:
        warmup_factor = 1.0 / 1000
        warmup_iters = min(1000, num_batches - 1)
        # A single-batch loader gives warmup_iters == 0. LinearLR applies
        # start_factor on construction and, with no iterations to ramp over,
        # would leave the learning rate at 1/1000 of its value, so skip it.
        if warmup_iters > 0:
            lr_scheduler = torch.optim.lr_scheduler.LinearLR(
                optimizer, start_factor=warmup_factor, total_iters=warmup_iters
            )

    use_amp = scaler is not None
    total_loss = 0.0
    for images, targets in tqdm(data_loader, total=num_batches, desc="Processing batches"):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        # Enable mixed precision -> helps with GPU Memory
        with autocast("cuda", enabled=use_amp):
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        if use_amp:
            scaler.scale(losses).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            losses.backward()
            optimizer.step()

        # Warm-up advances once per batch, not once per epoch.
        if lr_scheduler is not None:
            lr_scheduler.step()

        total_loss += losses.item()

    return total_loss / num_batches


@torch.inference_mode()
def fasterrcnn_predict(model, img, conf_threshold = 0.001):
    """Run ``model`` on one image (or a list of images) and collect detections.

    The model is switched to eval mode. Detections from all images are
    flattened into two parallel lists; only entries whose score is strictly
    greater than ``conf_threshold`` are kept.

    Args:
        model: Callable returning one dict per image, each expected to hold
            ``"scores"`` and ``"boxes"`` tensors.
        img: A single image or a list of images.
        conf_threshold: Minimum (exclusive) score for a detection to be kept.

    Returns:
        ``(confidences, boxes)`` as plain Python lists.
    """
    model.eval()
    batch = img if isinstance(img, list) else [img]

    # The decorator already runs this whole call in inference mode.
    predictions = model(batch)

    confidences, boxes = [], []
    for prediction in predictions:
        # Skip outputs that do not carry both fields.
        if "scores" not in prediction or "boxes" not in prediction:
            continue

        score_arr = prediction["scores"].cpu().numpy()
        box_arr = prediction["boxes"].cpu().numpy()

        keep = score_arr > conf_threshold
        confidences += score_arr[keep].tolist()
        boxes += box_arr[keep].tolist()

    return confidences, boxes


@torch.inference_mode()
def evaluate(model, data_loader, device):
    """Compute the evaluator's metric summary for ``model`` over ``data_loader``.

    Every image from the loader is moved to ``device`` and kept in one list
    (so the whole split is held on the device at once), and the ground-truth
    boxes are collected as nested Python lists. Both are handed to
    ``ObjectDetectionEvaluator`` together with ``fasterrcnn_predict``.

    Returns:
        Whatever ``get_metric_summary(verbose=False)`` returns.
    """
    model.eval()

    images_on_device = []
    ground_truth_boxes = []
    for batch_images, batch_targets in data_loader:
        for image in batch_images:
            images_on_device.append(image.to(device))
        for target in batch_targets:
            ground_truth_boxes.append(target["boxes"].cpu().numpy().tolist())

    evaluator = ObjectDetectionEvaluator(
        model, images_on_device, ground_truth_boxes, fasterrcnn_predict
    )
    return evaluator.get_metric_summary(verbose=False)

In [4]:
def collate_fn(batch):
    """Transpose a batch of samples into one tuple per field.

    A batch ``[(img0, tgt0), (img1, tgt1)]`` becomes
    ``((img0, img1), (tgt0, tgt1))``; nothing is stacked.
    """
    per_field = zip(*batch)
    return tuple(per_field)

def get_subset(dataset, fraction=0.2, seed=1234):
    """Return a reproducible random ``Subset`` holding ``fraction`` of ``dataset``.

    Args:
        dataset: Any object supporting ``len()`` and indexing.
        fraction: Share of the dataset to keep; the size is
            ``int(len(dataset) * fraction)``.
        seed: Seed for the sampling. The default reproduces the indices that
            the previously hard-coded seed gave.

    Returns:
        ``torch.utils.data.Subset`` over the sampled indices.
    """
    subset_size = int(len(dataset) * fraction)
    # Use a private generator so the module-level ``random`` state, which
    # other code may rely on, is not reseeded as a side effect.
    rng = random.Random(seed)
    indices = rng.sample(range(len(dataset)), subset_size)
    return Subset(dataset, indices)

# --- Training split: shuffled, 5 images per batch ---
images_dir = "../../data/merged/images/train"
annotations_dir = "../../data/merged/labels/train"
train_dataset = LicensePlateDataset(
    images_dir=images_dir,
    annotations_dir=annotations_dir,
)
#train_subset = get_subset(train_dataset, fraction=0.03) # for testing
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=5,
    shuffle=True,
    collate_fn=collate_fn,
)

# --- Validation split: original order, one image per batch ---
validation_images_dir = "../../data/merged/images/val"
validation_annotation_dir = "../../data/merged/labels/val"
val_dataset = LicensePlateDataset(
    images_dir=validation_images_dir,
    annotations_dir=validation_annotation_dir,
)
#val_subset = get_subset(val_dataset, fraction=0.2) # for testing
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
# Prefer the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Start from the pretrained detector and replace its box predictor with a
# 2-class head (presumably background + license plate)
model = fasterrcnn_resnet50_fpn_v2(weights="DEFAULT")
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes=2)
model.to(device)
print(f"Loaded Model fasterrcnn_resnet50_fpn_v2 to {device}")

# Gradient scaler for mixed-precision training; train_one_epoch pairs it with
# autocast, which is what lets larger batches fit in GPU memory
scaler = GradScaler("cuda")

# Optimise only the parameters that require gradients
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.002,
    momentum=0.9,
    weight_decay=0.0001,
)
# Stepped once per epoch by the training loop
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

Loaded Model fasterrcnn_resnet50_fpn_v2 to cuda


In [7]:
# Best validation scores seen so far and early-stopping bookkeeping
best_mAP50 = 0.0
best_mAP50_95 = 0.0
stopping_counter = 0
patience = 5
num_epochs = 10
start_epoch = 0

continue_training = True
if continue_training:
    # map_location lets a checkpoint written on a GPU be restored on whatever
    # device this session is using (e.g. a CPU-only machine).
    checkpoint = torch.load("../../models/best_fastrcnn.pth", map_location=device)
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    lr_scheduler.load_state_dict(checkpoint["lr_scheduler_state_dict"])
    # The stored epoch is the last completed one, so resume with the next.
    start_epoch = checkpoint["epoch"] + 1
    # Restore the best scores when the checkpoint contains them, so a resumed
    # run does not overwrite a better checkpoint; checkpoints without these
    # keys keep the 0.0 defaults set above.
    best_mAP50 = checkpoint.get("best_mAP50", best_mAP50)
    best_mAP50_95 = checkpoint.get("best_mAP50_95", best_mAP50_95)
    print(f"Loaded Checkpoint! Start epoch = {start_epoch}")

Loaded Checkpoint! Start epoch = 0


In [6]:
# Train, validate and checkpoint once per epoch; stop early when neither
# validation metric has improved for `patience` consecutive epochs.
for epoch in range(start_epoch, num_epochs):
    train_loss = train_one_epoch(model, optimizer, train_loader, device, epoch, scaler)
    print(f"Epoch {epoch}: Train Loss = {train_loss:.4f}")

    # Advance the epoch-level learning-rate schedule
    lr_scheduler.step()

    metrics = evaluate(model, val_loader, device=device)
    mAP50 = metrics['mAP50']
    mAP50_95 = metrics['mAP50-95']
    print(f"Epoch {epoch} Validation: mAP50 = {mAP50:.4f}, mAP50-95 = {mAP50_95:.4f}")
    # An improvement in either metric counts as progress.
    if mAP50 > best_mAP50 or mAP50_95 > best_mAP50_95:
        best_mAP50 = max(mAP50, best_mAP50)
        best_mAP50_95 = max(mAP50_95, best_mAP50_95)
        stopping_counter = 0
        torch.save({
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "lr_scheduler_state_dict": lr_scheduler.state_dict(),
            # Stored as plain floats so the best scores travel with the
            # checkpoint and unpickle without any extra types.
            "best_mAP50": float(best_mAP50),
            "best_mAP50_95": float(best_mAP50_95),
        }, "../../models/best_fastrcnn.pth")
        print(f"Model saved at epoch {epoch}.")
    else:
        stopping_counter += 1

    # >= rather than == so the stop still triggers if the counter ever starts
    # above the patience value.
    if stopping_counter >= patience:
        print("Early stopping triggered. Stopping training.")
        break
 

Processing batches: 100%|██████████| 5167/5167 [1:32:34<00:00,  1.07s/it]


Epoch 0: Train Loss = 0.0894


Processing images: 100%|██████████| 1132/1132 [28:28<00:00,  1.51s/it]


Epoch 0 Validation: mAP50 = 0.9146, mAP50-95 = 0.6440
Model saved at epoch 0.


In [None]:
# Run validation once more on the current model; the metric summary returned by
# evaluate() is the last expression of the cell and is shown as its output.
evaluate(model, val_loader, device=device)