In [None]:
import os
import random
import shutil
import xml.etree.ElementTree as ET
from PIL import Image

import torch
import numpy as np
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import ToTensor
from torchvision.models.detection import fasterrcnn_resnet50_fpn

In [None]:
def select_device():
    """Pick the torch device to run on.

    Availability is probed in a fixed order: Apple MPS first, then CUDA,
    and CPU is used when neither reports as available.

    Returns:
        torch.device: a device built from "mps", "cuda" or "cpu".
    """
    if torch.backends.mps.is_available():
        device_name = "mps"
    elif torch.cuda.is_available():
        device_name = "cuda"
    else:
        device_name = "cpu"
    return torch.device(device_name)

In [None]:
import os
import shutil
import random

def split_xml_files(
    xml_folder,
    output_root,
    train_ratio=0.75,
    test_ratio=0.15,
    val_ratio=0.10,
    shuffle=False,
    seed=None
):
    """Copy the ``.xml`` files of ``xml_folder`` into train/test/val sub-folders.

    The file list is sorted by name, optionally shuffled, and then cut into
    three consecutive slices: the first ``train_ratio`` share goes to
    ``<output_root>/train``, the next ``test_ratio`` share to
    ``<output_root>/test`` and every remaining file to ``<output_root>/val``.

    Args:
        xml_folder: directory that is scanned (non-recursively) for ``.xml`` files.
        output_root: directory under which ``train``, ``test`` and ``val`` are created.
        train_ratio: fraction of files placed in the train split.
        test_ratio: fraction of files placed in the test split.
        val_ratio: expected fraction of the val split. The val split always
            receives the files left over after train and test, so this value is
            only used to check that the three ratios are consistent.
        shuffle: when True the sorted file list is shuffled before slicing.
        seed: optional seed for the shuffle. With a seed the split is
            reproducible across runs; with ``None`` the global ``random`` state
            is used, as before.

    Raises:
        ValueError: if any ratio is negative.

    Note:
        Files already present in the output folders are not removed, so a
        re-run with different settings can leave the same file in two splits.
    """
    import warnings

    ratios = (("train_ratio", train_ratio), ("test_ratio", test_ratio), ("val_ratio", val_ratio))
    for ratio_name, ratio_value in ratios:
        if ratio_value < 0:
            raise ValueError(f"{ratio_name} must be non-negative, got {ratio_value}")

    if abs(train_ratio + test_ratio + val_ratio - 1.0) > 1e-6:
        # val_ratio does not size the val split, so inconsistent ratios used to
        # go unnoticed; make the effective behaviour visible to the caller.
        warnings.warn(
            "train_ratio + test_ratio + val_ratio != 1; the val split receives "
            "whatever remains after the train and test splits",
            stacklevel=2,
        )

    # Sort first so the starting order does not depend on os.listdir(), which
    # makes a seeded shuffle reproducible across machines.
    all_xmls = sorted(f for f in os.listdir(xml_folder) if f.endswith('.xml'))

    if shuffle:
        if seed is None:
            random.shuffle(all_xmls)
        else:
            random.Random(seed).shuffle(all_xmls)

    total = len(all_xmls)
    train_end = int(total * train_ratio)
    test_end  = int(total * (train_ratio + test_ratio))

    splits = {
        'train': all_xmls[:train_end],
        'test': all_xmls[train_end:test_end],
        'val': all_xmls[test_end:],
    }

    for split_name, split_files in splits.items():
        split_dir = os.path.join(output_root, split_name)
        os.makedirs(split_dir, exist_ok=True)
        for xml_name in split_files:
            shutil.copy2(os.path.join(xml_folder, xml_name), os.path.join(split_dir, xml_name))

    print(f"Total XML: {total}")
    print(f"Train: {len(splits['train'])}, Test: {len(splits['test'])}, Val: {len(splits['val'])}")


In [None]:
from torchvision.models.detection.faster_rcnn import FasterRCNN_ResNet50_FPN_Weights, FastRCNNPredictor

def build_model(model_type, num_classes=6):
    """Create a detection model of the requested type.

    Only "fasterrcnn" (matched case-insensitively) is supported. The model is
    created with the COCO_V1 weights and its box predictor is replaced by a
    fresh ``FastRCNNPredictor`` with ``num_classes`` outputs.

    Args:
        model_type: name of the architecture to build.
        num_classes: number of outputs of the new box predictor.

    Returns:
        The Faster R-CNN model with the replaced prediction head.

    Raises:
        ValueError: if ``model_type`` is not a supported name.
    """
    if model_type.lower() != "fasterrcnn":
        raise ValueError(f"Unknown model_type: {model_type}")

    detector = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.COCO_V1)
    # Reuse the input width of the existing classification layer for the new head.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector

In [None]:
# Class names; the position of a name in this list is its integer label.
CLASSES = [
    'background', 'car', 'big-truck', 'motorbike', 'truck', 'van',
]

class TrackingDataset(Dataset):
    """Detection dataset that pairs XML annotation files with their images.

    Every ``.xml`` file in ``xml_folder`` is one sample. The XML names the
    image through its ``<filename>`` tag and lists ``<object>`` elements, each
    with an optional ``<class>`` and a ``<bndbox>`` holding
    ``xmin``/``ymin``/``xmax``/``ymax``.

    ``__getitem__`` returns ``(image, target)`` where ``target`` is a dict with
    ``'boxes'`` (float32, shape ``(N, 4)``) and ``'labels'`` (int64, shape
    ``(N,)``).
    """

    # Class assigned to an <object> that has no usable <class> tag.
    DEFAULT_CLASS = 'car'

    def __init__(self, xml_folder, images_folder, transform=None):
        """Index the annotation files.

        Args:
            xml_folder: directory containing the ``.xml`` annotation files.
            images_folder: directory the ``<filename>`` entries are resolved against.
            transform: optional callable applied to the PIL image; when omitted
                the image is converted with ``ToTensor()``.
        """
        super().__init__()
        self.xml_folder = xml_folder
        self.images_folder = images_folder
        self.transform = transform

        # Sorted so that sample indices are stable between runs.
        self.xml_files = sorted([f for f in os.listdir(xml_folder) if f.endswith('.xml')])

    def __len__(self):
        """Return the number of annotation files found."""
        return len(self.xml_files)

    @staticmethod
    def _parse_objects(root, xml_name):
        """Extract box coordinates and label ids from a parsed annotation.

        Args:
            root: root element of the annotation XML.
            xml_name: file name, used only in error messages.

        Returns:
            ``(boxes, labels)``: a list of ``[xmin, ymin, xmax, ymax]`` floats and
            the list of matching indices into ``CLASSES``. Objects without a
            ``<bndbox>`` are skipped.

        Raises:
            ValueError: if an object names a class that is not in ``CLASSES``.
        """
        boxes = []
        labels = []

        for obj in root.findall('object'):
            cls_node = obj.find('class')
            class_name = cls_node.text.strip() if cls_node is not None and cls_node.text else ''
            if not class_name:
                # Missing/empty <class>: fall back to the default class instead of
                # reusing whatever name the previous object happened to have.
                class_name = TrackingDataset.DEFAULT_CLASS

            if class_name not in CLASSES:
                raise ValueError(f"Unknown class '{class_name}' in XML {xml_name}")
            label_id = CLASSES.index(class_name)

            bndbox = obj.find('bndbox')
            if bndbox is None:
                continue

            xmin = float(bndbox.find('xmin').text)
            ymin = float(bndbox.find('ymin').text)
            xmax = float(bndbox.find('xmax').text)
            ymax = float(bndbox.find('ymax').text)
            boxes.append([xmin, ymin, xmax, ymax])
            labels.append(label_id)

        return boxes, labels

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(img, target)`` with ``target = {'boxes': ..., 'labels': ...}``.

        Raises:
            ValueError: if the XML has no (or an empty) ``<filename>`` tag, or
                names an unknown class.
        """
        xml_name = self.xml_files[idx]
        xml_path = os.path.join(self.xml_folder, xml_name)

        root = ET.parse(xml_path).getroot()

        # Check the node itself: a missing tag makes find() return None, and
        # reading .text from it would raise AttributeError before this check.
        filename_node = root.find('filename')
        filename = filename_node.text if filename_node is not None else None
        if not filename:
            raise ValueError(f"No <filename> tag in XML {xml_name}")

        image_path = os.path.join(self.images_folder, filename)

        # convert() returns a new, fully loaded image, so the file handle can be
        # released right away.
        with Image.open(image_path) as raw_img:
            img = raw_img.convert('RGB')

        boxes, labels = self._parse_objects(root, xml_name)

        target = {
            # reshape keeps the (N, 4) layout even when the image has no
            # objects; a bare empty list would produce shape (0,).
            'boxes': torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
        }

        if self.transform:
            img = self.transform(img)
        else:
            img = ToTensor()(img)

        return img, target

In [None]:
import torch
from tqdm import tqdm

def train_one_epoch(model, optimizer, dataloader, device):
    """Run one training pass over ``dataloader`` and return the mean batch loss.

    The model is put in training mode and called as ``model(images, targets)``;
    the values of the returned loss dict are summed into the loss that is
    back-propagated.

    Args:
        model: detection model returning a dict of losses in training mode.
        optimizer: optimizer stepped once per batch.
        dataloader: yields ``(images, targets)`` batches, where ``targets`` is a
            sequence of dicts of tensors.
        device: device the images and target tensors are moved to.

    Returns:
        float: sum of the per-batch losses divided by the number of batches.

    Raises:
        ValueError: if ``dataloader`` has no batches (previously this surfaced
            as a ZeroDivisionError after the loop).
    """
    model.train()
    epoch_loss = 0.0
    total_batches = len(dataloader)
    if total_batches == 0:
        raise ValueError("train_one_epoch received an empty dataloader; nothing to train on")

    progress_bar = tqdm(dataloader, desc="Training", unit="batch")

    for batch_idx, (images, targets) in enumerate(progress_bar):
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        # Read the scalar once; every .item() call forces a device sync.
        loss_value = losses.item()
        epoch_loss += loss_value

        progress_bar.set_postfix({
            "loss": f"{loss_value:.4f}",
            "batch": f"{batch_idx + 1}/{total_batches}"
        })

    avg_loss = epoch_loss / total_batches
    print(f"Finished epoch with average loss {avg_loss:.4f}")
    return avg_loss


In [None]:
from collections import defaultdict
from tqdm import tqdm

def evaluate_model(model, data_loader, device, iou_threshold=0.5):
    """Compute per-class average precision and mAP for a detection model.

    The model is switched to eval mode, run over every batch of
    ``data_loader`` without gradients, and its detections are matched against
    the ground-truth boxes class by class.

    Args:
        model: detection model; ``model(images)`` must return one dict per
            image with ``'boxes'``, ``'scores'`` and ``'labels'`` tensors.
        data_loader: yields ``(images, targets)`` batches where each target is
            a dict with ``"boxes"`` and ``"labels"`` tensors.
        device: device the model, images and targets are moved to.
        iou_threshold: minimum IoU for a detection to count as a true positive.

    Returns:
        dict: one ``'AP_class_<label>'`` entry for every label that occurs in
        the predictions or the ground truth, plus ``'mAP'``, the mean AP over
        the labels that have at least one ground-truth box (0.0 if none do).

    Note:
        The model is left in eval mode, and the tensors inside the target
        dicts are replaced in place by their on-device copies.
    """
    model.eval()
    model.to(device)

    # label -> list of (image_id, box, score) / (image_id, box) over the whole loader
    all_predictions = defaultdict(list)
    all_gts = defaultdict(list)

    # Running counter used as a unique id for every image seen.
    image_id = 0

    progress_bar = tqdm(data_loader, desc="Evaluating", unit="batch")

    with torch.no_grad():
        for batch_idx, (images, targets) in enumerate(progress_bar):
            images = [img.to(device) for img in images]
            for t in targets:
                t["boxes"]  = t["boxes"].to(device)
                t["labels"] = t["labels"].to(device)

            outputs = model(images)

            for out, tgt in zip(outputs, targets):
                pred_boxes  = out['boxes'].cpu().numpy()
                pred_scores = out['scores'].cpu().numpy()
                pred_labels = out['labels'].cpu().numpy()

                gt_boxes  = tgt['boxes'].cpu().numpy()
                gt_labels = tgt['labels'].cpu().numpy()

                for box, score, label in zip(pred_boxes, pred_scores, pred_labels):
                    all_predictions[label].append((image_id, box, score))

                for box, label in zip(gt_boxes, gt_labels):
                    all_gts[label].append((image_id, box))

                image_id += 1

    ap_per_class = {}
    all_classes = sorted(set(list(all_predictions.keys()) + list(all_gts.keys())))

    for cls in all_classes:
        predictions = all_predictions[cls]
        gts = all_gts[cls]

        # NOTE(review): appears unreachable - every label in all_classes was
        # added by an append above, so it has at least one prediction or GT box.
        if len(gts) == 0 and len(predictions) == 0:
            ap_per_class[cls] = 0.0
            continue

        # Highest-scoring detections are matched first.
        predictions.sort(key=lambda x: x[2], reverse=True)

        gt_boxes_by_image = defaultdict(list)
        for (img_id, box) in gts:
            gt_boxes_by_image[img_id].append(box)
        total_gts = len(gts)

        # Per image, one flag per GT box recording whether it is already claimed.
        matched = {}
        for img_id, boxes in gt_boxes_by_image.items():
            matched[img_id] = np.zeros(len(boxes), dtype=bool)

        tp = np.zeros(len(predictions), dtype=float)
        fp = np.zeros(len(predictions), dtype=float)

        for i, (img_id, pred_box, score) in enumerate(predictions):
            # No GT box of this class in the image: the detection is a false positive.
            if img_id not in gt_boxes_by_image:
                fp[i] = 1.0
                continue

            gt_box_list = np.array(gt_boxes_by_image[img_id])
            ious = box_iou_numpy(pred_box, gt_box_list)
            max_iou_idx = np.argmax(ious)
            max_iou = ious[max_iou_idx]

            # Only the best-overlapping GT box is considered; if it was already
            # claimed by a higher-scoring detection this one is a false positive.
            if max_iou >= iou_threshold and not matched[img_id][max_iou_idx]:
                tp[i] = 1.0
                matched[img_id][max_iou_idx] = True
            else:
                fp[i] = 1.0

        tp_cum = np.cumsum(tp)
        fp_cum = np.cumsum(fp)

        # The 1e-6 terms keep the divisions defined when a denominator is zero.
        recalls = tp_cum / (total_gts + 1e-6)
        precisions = tp_cum / (tp_cum + fp_cum + 1e-6)

        ap_per_class[cls] = voc_ap(recalls, precisions)

    # Labels that were predicted but never annotated do not count towards mAP.
    valid_classes = [c for c in all_classes if len(all_gts[c]) > 0]
    if len(valid_classes) > 0:
        mAP = np.mean([ap_per_class[c] for c in valid_classes])
    else:
        mAP = 0.0

    results = {}
    for c in all_classes:
        results[f'AP_class_{c}'] = ap_per_class[c]
    results['mAP'] = mAP

    return results


def box_iou_numpy(box, boxes):
    """Intersection-over-union of one box against an array of boxes.

    Widths and heights are measured as ``max - min + 1``, and ``1e-6`` is added
    to the union so the division is always defined.

    Args:
        box: sequence of four numbers ``(x1, y1, x2, y2)``.
        boxes: 2-D array whose first four columns are ``x1, y1, x2, y2``.

    Returns:
        1-D array with one IoU value per row of ``boxes``.
    """
    bx1, by1, bx2, by2 = box
    gx1, gy1 = boxes[:, 0], boxes[:, 1]
    gx2, gy2 = boxes[:, 2], boxes[:, 3]

    # Extent of the overlap rectangle, clamped at zero for disjoint boxes.
    overlap_w = np.maximum(0, np.minimum(bx2, gx2) - np.maximum(bx1, gx1) + 1)
    overlap_h = np.maximum(0, np.minimum(by2, gy2) - np.maximum(by1, gy1) + 1)
    overlap = overlap_w * overlap_h

    single_area = (bx2 - bx1 + 1) * (by2 - by1 + 1)
    many_area = (gx2 - gx1 + 1) * (gy2 - gy1 + 1)

    return overlap / (single_area + many_area - overlap + 1e-6)


def voc_ap(recalls, precisions):
    """Area under the precision/recall curve after making precision monotone.

    The curve is padded with a (0, 0) start point and a (1, 0) end point, each
    precision is raised to the largest precision found at any later position,
    and the area is summed over the positions where recall changes.

    Args:
        recalls: 1-D sequence of recall values.
        precisions: 1-D sequence of precision values, same length as ``recalls``.

    Returns:
        The average precision as a float.
    """
    mrec = np.concatenate(([0.], recalls, [1.]))
    envelope = np.concatenate(([0.], precisions, [0.]))

    # Sweep right to left so each entry ends up as the max of everything after it.
    for right in reversed(range(1, len(envelope))):
        if envelope[right] > envelope[right - 1]:
            envelope[right - 1] = envelope[right]

    # Positions where recall steps to a new value.
    step_positions = np.flatnonzero(mrec[1:] != mrec[:-1])

    ap = 0.0
    for k in step_positions:
        ap += (mrec[k + 1] - mrec[k]) * envelope[k + 1]

    return ap


In [None]:
import numpy as np
import torch

def train_model_with_early_stopping(
    model,
    optimizer,
    train_loader,
    test_loader,
    device,
    num_epochs=10,
    patience=3
):
    """Train with early stopping on mAP and return the best-scoring weights.

    After every training epoch the model is evaluated on ``test_loader``. When
    the mAP improves, a snapshot of the weights is stored; training stops once
    ``patience`` consecutive epochs bring no improvement. The best snapshot is
    loaded back into ``model`` before returning.

    Args:
        model: detection model to train (moved to ``device``).
        optimizer: optimizer passed to ``train_one_epoch``.
        train_loader: loader for the training batches.
        test_loader: loader used for the per-epoch evaluation that drives
            model selection and early stopping.
        device: device to train and evaluate on.
        num_epochs: maximum number of epochs.
        patience: number of non-improving epochs tolerated before stopping.

    Returns:
        tuple: ``(model, best_metric)`` where ``best_metric`` is the highest
        mAP observed (``-inf`` if no epoch ran).
    """
    import copy

    best_metric = -np.inf
    best_model_state = None
    epochs_no_improve = 0

    model.to(device)

    for epoch in range(num_epochs):
        train_loss = train_one_epoch(model, optimizer, train_loader, device)

        val_results = evaluate_model(model, test_loader, device)
        val_mAP = val_results.get('mAP', 0.0)

        print(f"\n[Epoch {epoch+1}/{num_epochs}]")
        print(f"Train loss: {train_loss:.4f}")
        class_keys = [k for k in val_results.keys() if k.startswith("AP_class_")]
        if class_keys:
            print("Validation per-class AP:")
            for cls_key in sorted(class_keys):
                print(f"  {cls_key}: {val_results[cls_key]:.4f}")
        print(f"Validation mAP: {val_mAP:.4f}")

        if val_mAP > best_metric:
            best_metric = val_mAP
            # state_dict() hands out references to the live parameter tensors,
            # which later epochs keep updating in place. Deep-copy so the
            # snapshot really holds this epoch's weights.
            best_model_state = copy.deepcopy(model.state_dict())
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print("Early stopping triggered.")
            break

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, best_metric


In [None]:
# --- Paths -------------------------------------------------------------------
labels_folder = "filtered_labels"
output_split_folder = "labels_split"
images_folder = "images"
os.makedirs(output_split_folder, exist_ok=True)

# --- Split the annotation files into train / test / val ----------------------
split_xml_files(
    xml_folder=labels_folder,
    output_root=output_split_folder,
    train_ratio=0.75,
    test_ratio=0.15,
    val_ratio=0.10,
    shuffle=False
)

train_xml_dir = os.path.join(output_split_folder, 'train')
test_xml_dir  = os.path.join(output_split_folder, 'test')
val_xml_dir   = os.path.join(output_split_folder, 'val')

train_dataset = TrackingDataset(train_xml_dir, images_folder)
test_dataset  = TrackingDataset(test_xml_dir,  images_folder)
val_dataset   = TrackingDataset(val_xml_dir,   images_folder)


def collate_detection_batch(batch):
    """Regroup a list of ``(image, target)`` samples into ``[images, targets]``.

    Samples are kept as tuples instead of being stacked, so images of different
    sizes and per-image box counts can share a batch. A named function (rather
    than a lambda) can also be pickled if ``num_workers`` is raised later.
    """
    return list(zip(*batch))


# The dataset lists its files in sorted order, so without shuffling every
# training batch would hold neighbouring files in the same order each epoch.
# Evaluation loaders stay in order so results are comparable across runs.
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_detection_batch, num_workers=0)
test_loader  = DataLoader(test_dataset,  batch_size=4, shuffle=False, collate_fn=collate_detection_batch, num_workers=0)
val_loader   = DataLoader(val_dataset,   batch_size=1, shuffle=False, collate_fn=collate_detection_batch)

device = select_device()
print(f"Using device: {device}")

In [None]:
# Search space: every combination of model, optimizer and learning rate is trained.
model_types    = ["fasterrcnn"]
optim_types    = ["adam"]
learning_rates = [1e-4]

num_epochs = 10
patience   = 3
num_classes = len(CLASSES)

best_overall_metric = -np.inf
best_model_path = None

# Optimizer constructors keyed by lower-cased optimizer name.
optimizer_factories = {
    "sgd": lambda params, rate: torch.optim.SGD(params, lr=rate, momentum=0.9),
    "adam": lambda params, rate: torch.optim.Adam(params, lr=rate),
}

for m_type in model_types:
    for opt_type in optim_types:
        for lr in learning_rates:
            model = build_model(m_type, num_classes=num_classes)
            model.to(device)

            make_optimizer = optimizer_factories.get(opt_type.lower())
            if make_optimizer is None:
                raise ValueError(f"Unknown optimizer: {opt_type}")
            optimizer = make_optimizer(model.parameters(), lr)

            print(f"\n===== Training {m_type} with {opt_type}, LR={lr} =====")
            trained_model, best_val_metric = train_model_with_early_stopping(
                model,
                optimizer,
                train_loader,
                test_loader,
                device,
                num_epochs=num_epochs,
                patience=patience
            )

            # One checkpoint per configuration; dots in the learning rate are
            # replaced so the file name has a single extension.
            model_folder = "trained_models"
            os.makedirs(model_folder, exist_ok=True)
            model_name = f"{m_type}_{opt_type}_{lr}".replace(".", "_") + ".pth"
            save_path = os.path.join(model_folder, model_name)
            torch.save(trained_model.state_dict(), save_path)
            print(f"Model saved to {save_path}, best validation metric = {best_val_metric}")

            # Remember which configuration scored best overall.
            if best_val_metric > best_overall_metric:
                best_overall_metric, best_model_path = best_val_metric, save_path

print("\nAll training done.")
print(f"Best overall metric = {best_overall_metric}, from model {best_model_path}")

In [None]:
import os
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# CLASSES is defined once, next to TrackingDataset; it is reused here rather
# than redefined so the label names cannot drift apart.

output_dir = "performance"
os.makedirs(output_dir, exist_ok=True)

checkpoint_path = 'fasterrcnn_checkpoint.pth'

m = build_model("fasterrcnn", num_classes=len(CLASSES))
# Map the checkpoint onto whichever device was selected instead of assuming
# 'mps', so the cell also runs on CUDA or CPU-only machines.
m.load_state_dict(torch.load(checkpoint_path, map_location=device))
m.to(device)
m.eval()

# Only predictions with at least this score are drawn.
score_threshold = 0.9


def _class_name(cls_id):
    """Return the name for ``cls_id``, or ``label_<id>`` if it is outside CLASSES."""
    return CLASSES[cls_id] if 0 <= cls_id < len(CLASSES) else f"label_{cls_id}"


def _draw_boxes(ax, boxes, captions, color, align):
    """Draw each box as an outlined rectangle with its caption above it.

    ``align`` is 'left' (caption starts at the box's left edge) or 'right'
    (caption ends at the box's right edge), which keeps ground-truth and
    predicted captions from overlapping when the boxes coincide.
    """
    for box, caption in zip(boxes, captions):
        x_min, y_min, x_max, y_max = box
        ax.add_patch(patches.Rectangle(
            (x_min, y_min), x_max - x_min, y_max - y_min,
            linewidth=1, edgecolor=color, facecolor='none'
        ))
        ax.text(
            x_min if align == 'left' else x_max, y_min - 10, caption,
            fontsize=6, color=color,
            horizontalalignment=align, verticalalignment='bottom',
            bbox=dict(facecolor='none', edgecolor='none')
        )


for batch_idx, (images, targets) in enumerate(val_loader):
    images = [img.to(device) for img in images]

    with torch.no_grad():
        outputs = m(images)

    # Only the first sample of each batch is drawn (val_loader is created with
    # batch_size=1).
    img_tensor = images[0]
    target = targets[0]
    output = outputs[0]

    # imshow expects the channel axis last.
    img_np = img_tensor.permute(1, 2, 0).cpu().numpy()

    fig, ax = plt.subplots(1, figsize=(8, 6))
    ax.imshow(img_np)

    # Ground truth in red, captions left-aligned.
    gt_boxes = target['boxes'].cpu()
    gt_labels = target['labels'].cpu()
    _draw_boxes(
        ax,
        gt_boxes.numpy(),
        [_class_name(label.item()) for label in gt_labels],
        color='red',
        align='left',
    )

    pred_boxes = output['boxes'].cpu()
    pred_labels = output['labels'].cpu()
    pred_scores = output['scores'].cpu()

    num_pred_total = len(pred_boxes)
    high_conf_mask = pred_scores >= score_threshold
    num_pred_above_thresh = high_conf_mask.sum().item()

    pred_boxes_thresh = pred_boxes[high_conf_mask]
    pred_labels_thresh = pred_labels[high_conf_mask]
    pred_scores_thresh = pred_scores[high_conf_mask]

    # Confident predictions in blue, captions right-aligned.
    _draw_boxes(
        ax,
        pred_boxes_thresh.numpy(),
        [
            f"{_class_name(label.item())} {float(score.item()):.2f}"
            for label, score in zip(pred_labels_thresh, pred_scores_thresh)
        ],
        color='blue',
        align='right',
    )

    num_gt = len(gt_boxes)
    ax.set_title(
        f"Validation Sample {batch_idx}\n"
        f"#GT={num_gt}, #Predicted (all)={num_pred_total}, #Predicted (score>={score_threshold})={num_pred_above_thresh}",
        fontsize=10
    )

    output_path = os.path.join(output_dir, f"sample_{batch_idx}.png")
    fig.savefig(output_path, dpi=300, bbox_inches="tight")
    # Close explicitly: one figure per sample would otherwise pile up in memory.
    plt.close(fig)

    print(
        f"Image {batch_idx}: "
        f"#GT={num_gt}, #Predicted (all)={num_pred_total}, "
        f"#Predicted (score>={score_threshold})={num_pred_above_thresh}"
    )


In [None]:
from ultralytics import YOLO

# Settings forwarded unchanged to YOLO.train().
# NOTE(review): the device is fixed to "mps" here, independent of the `device`
# chosen by select_device() - confirm that is intended.
yolo_train_args = dict(
    data='./yolo_config.yaml',
    epochs=100,
    imgsz=640,
    batch=8,
    save_period=3,
    device="mps",
)

# Note: this rebinds `model`, which the hyper-parameter loop above also assigns
# (there it holds the Faster R-CNN instance).
model = YOLO('./yolov8x.pt')

results = model.train(**yolo_train_args)