In [None]:

import os
import sys
import time
import shutil
import yaml
import glob
import random
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import torch
import torchvision

# Install the extra dependencies into the running kernel's environment if missing
try:
    import albumentations as A
    from albumentations.pytorch import ToTensorV2
    from torchmetrics.detection.mean_ap import MeanAveragePrecision
except ImportError:
    import importlib
    import subprocess

    print("Installing dependencies...")
    # Run pip through the interpreter executing this notebook so the packages
    # are installed where the imports below will look for them. check_call
    # raises CalledProcessError when the installation itself fails.
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "-q", "albumentations==1.4.0", "torchmetrics"]
    )
    # Let the import system notice packages installed after interpreter start-up.
    importlib.invalidate_caches()
    import albumentations as A
    from albumentations.pytorch import ToTensorV2
    from torchmetrics.detection.mean_ap import MeanAveragePrecision

from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection import fasterrcnn_mobilenet_v3_large_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN_MobileNet_V3_Large_FPN_Weights
from google.colab import drive

# Mount Google Drive
# The bare mount-point directory may still exist when nothing is mounted on it,
# so test for the "MyDrive" folder that the Drive paths in this notebook use.
if not os.path.isdir('/content/drive/MyDrive'):
    drive.mount('/content/drive')

# Ensure Deterministic Behavior (Reproducibility)
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch random number generators.

    When CUDA is available, every CUDA device is seeded as well and cuDNN is
    switched to deterministic mode with benchmarking disabled.

    Args:
        seed: Integer seed applied to all generators.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed_all(seed)
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

set_seed(42)

# Configuration
# Drive locations hold the trained models and the source dataset; the local
# directory is where the dataset is read from during training.
BASE_SAVE_DIR = "/content/drive/MyDrive/Model/FasterRCNN_Large"
DRIVE_YAML_PATH = "/content/drive/MyDrive/Dataset/FINAL_YOLO_SPLIT/dataset.yaml"
LOCAL_DATA_DIR = "/content/local_dataset"
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"Computation Device: {DEVICE}")
os.makedirs(BASE_SAVE_DIR, exist_ok=True)

In [None]:

# Copy the dataset next to the runtime once; later runs reuse the local copy.
if not os.path.exists(LOCAL_DATA_DIR):
    print(f"Copying dataset from Drive to {LOCAL_DATA_DIR}...")
    try:
        # The dataset is the directory that contains the YAML file.
        drive_data_dir = os.path.dirname(DRIVE_YAML_PATH)
        shutil.copytree(drive_data_dir, LOCAL_DATA_DIR)
        print("Dataset copy complete.")
    except Exception as e:
        print(f"Error copying dataset: {e}")
        # A copy that fails part-way can leave a partially populated directory
        # behind. Remove it so the existence check above does not mistake it
        # for a complete dataset on the next run.
        shutil.rmtree(LOCAL_DATA_DIR, ignore_errors=True)
else:
    print(f"Local dataset found at {LOCAL_DATA_DIR}. Skipping copy.")

# Class Configuration
# The label files use YOLO class ids; the detector reserves id 0 for background.
#   YOLO ids : 0 = Brain (ignored), 1 = CSP, 2 = LV
#   Model ids: 0 = Background,      1 = CSP, 2 = LV
CLASS_NAMES = ['CSP', 'LV']
NUM_CLASSES = 1 + len(CLASS_NAMES)
# YOLO id -> model id; ids that are not listed here are dropped by the dataset.
TARGET_MAPPING = {yolo_id: yolo_id for yolo_id in (1, 2)}

print(f"Classes: {CLASS_NAMES}")

In [None]:
class YOLODataset(Dataset):
    """Detection dataset that reads images with YOLO-format label files.

    Images are taken from ``<root_dir>/<split>/images`` (``*.jpg`` and ``*.png``)
    and labels from ``<root_dir>/<split>/labels``, one ``.txt`` file per image
    sharing the image's base name. Each label line is
    ``class x_center y_center width height`` with coordinates normalized to the
    image size. An image without a label file yields an empty target.

    Args:
        root_dir: Dataset root directory.
        split: Sub-directory of ``root_dir`` to read (e.g. ``'train'``).
        transforms: Optional callable invoked as
            ``transforms(image=..., bboxes=..., labels=...)`` and returning a
            mapping with the same three keys.
        mapping: Dict from label-file class id to model class id. Classes that
            are not in the mapping are ignored. Defaults to ``{1: 1, 2: 2}``.
    """

    def __init__(self, root_dir, split='train', transforms=None, mapping=None):
        self.root_dir = root_dir
        self.split = split
        self.transforms = transforms
        self.target_mapping = mapping if mapping else {1: 1, 2: 2}

        self.img_dir = os.path.join(root_dir, split, 'images')
        self.label_dir = os.path.join(root_dir, split, 'labels')

        # Sorted so that an index always refers to the same image.
        self.img_files = sorted(glob.glob(os.path.join(self.img_dir, "*.jpg")) +
                                glob.glob(os.path.join(self.img_dir, "*.png")))

    def __len__(self):
        return len(self.img_files)

    def _parse_label_file(self, label_path, w, h):
        """Read one YOLO label file and return ``(boxes, labels)`` as lists.

        Boxes are ``[x_min, y_min, x_max, y_max]`` in pixels, clipped to the
        ``w`` x ``h`` image; boxes that are empty after clipping are dropped, as
        are classes missing from ``self.target_mapping``. Blank lines are
        skipped. A missing file gives two empty lists.

        Raises:
            ValueError: If a non-blank line has fewer than five fields or a
                field that is not a number.
        """
        boxes = []
        labels = []
        if not os.path.exists(label_path):
            return boxes, labels

        with open(label_path, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                fields = line.split()
                if not fields:
                    # Blank or whitespace-only line (commonly a trailing newline).
                    continue
                try:
                    cls_value, x_c, y_c, bw, bh = (float(v) for v in fields[:5])
                except ValueError as exc:
                    raise ValueError(
                        f"Malformed YOLO label in {label_path} (line {line_no}): {line!r}"
                    ) from exc

                cls_id_raw = int(cls_value)
                if cls_id_raw not in self.target_mapping:
                    continue

                # Convert xywh (normalized) to xyxy (absolute), clipped to the image
                x_min = max(0, (x_c - bw / 2) * w)
                y_min = max(0, (y_c - bh / 2) * h)
                x_max = min(w, (x_c + bw / 2) * w)
                y_max = min(h, (y_c + bh / 2) * h)

                if x_max > x_min and y_max > y_min:
                    boxes.append([x_min, y_min, x_max, y_max])
                    labels.append(self.target_mapping[cls_id_raw])

        return boxes, labels

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` is a dict with ``boxes`` (N x 4 float32, xyxy), ``labels``
        (int64), ``image_id``, ``area`` and ``iscrowd``. A ``uint8`` image
        tensor is scaled to floats in [0, 1].

        Raises:
            IndexError: If ``idx`` is out of range.
            RuntimeError: If no image of the split can be read.
            ValueError: If the label file contains a malformed line.
        """
        img_path = self.img_files[idx]
        image = cv2.imread(img_path)
        if image is None:
            # Unreadable file: substitute the next readable image. The search is
            # bounded by the dataset size so it cannot loop forever.
            num_files = len(self.img_files)
            for offset in range(1, num_files):
                candidate = (idx + offset) % num_files
                image = cv2.imread(self.img_files[candidate])
                if image is not None:
                    idx = candidate
                    img_path = self.img_files[candidate]
                    break
            else:
                raise RuntimeError(f"No readable image found in {self.img_dir}")

        file_name = os.path.basename(img_path)
        label_file = os.path.splitext(file_name)[0] + ".txt"
        label_path = os.path.join(self.label_dir, label_file)

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        h, w, _ = image.shape

        box_list, label_list = self._parse_label_file(label_path, w, h)

        # Apply Transforms. Boxes and labels are handed over as plain Python
        # lists, and the results are only adopted once the whole call succeeded,
        # so image and boxes always stay consistent with each other.
        if self.transforms:
            try:
                transformed = self.transforms(image=image, bboxes=box_list, labels=label_list)
                new_image = transformed['image']
                new_boxes = [[float(v) for v in box[:4]] for box in transformed['bboxes']]
                new_labels = [int(label) for label in transformed['labels']]
            except Exception as exc:
                import warnings

                # Best effort: keep the sample, but report that it was not augmented.
                warnings.warn(
                    f"Augmentation failed for {img_path}: {exc}; using the untransformed image."
                )
                image = ToTensorV2()(image=image)["image"]
            else:
                image, box_list, label_list = new_image, new_boxes, new_labels
        else:
            image = ToTensorV2()(image=image)["image"]

        # Convert to Tensor
        if box_list:
            boxes = torch.as_tensor(box_list, dtype=torch.float32)
            labels = torch.as_tensor(label_list, dtype=torch.int64)
            # Geometric transforms can collapse a box to zero width or height;
            # drop those so only boxes with positive size reach the model.
            keep = (boxes[:, 2] > boxes[:, 0]) & (boxes[:, 3] > boxes[:, 1])
            boxes = boxes[keep]
            labels = labels[keep]
        else:
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros((0,), dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = torch.tensor([idx])
        # Area (required for COCO evaluation); both entries are empty for N == 0
        target["area"] = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        target["iscrowd"] = torch.zeros((len(boxes),), dtype=torch.int64)

        # Normalize Image (0-255 to 0-1)
        if isinstance(image, torch.Tensor) and image.dtype == torch.uint8:
            image = image.float() / 255.0

        return image, target

def collate_fn(batch):
    """Regroup a list of samples into one tuple per sample field.

    A batch ``[(img0, tgt0), (img1, tgt1)]`` becomes
    ``((img0, img1), (tgt0, tgt1))``; nothing is stacked, so the items may
    differ in size.
    """
    per_field = zip(*batch)
    return tuple(per_field)

def get_transforms(condition='Raw'):
    """Build the Albumentations pipeline for one experimental condition.

    Args:
        condition: ``'Tuned'`` prepends geometric augmentations (affine,
            horizontal flip, perspective); any other value gives the plain
            pipeline.

    Returns:
        An ``A.Compose`` that ends with a 640x640 resize and tensor conversion.
        Boxes are handled in ``pascal_voc`` format with their classes in the
        ``labels`` field and ``min_visibility=0.1``.
    """
    pipeline = []

    if condition == 'Tuned':
        pipeline.extend([
            A.Affine(scale=(0.6, 1.4), translate_percent=(0, 0.2), rotate=(-45, 45), shear=(-5, 5), p=1.0),
            A.HorizontalFlip(p=0.5),
            A.Perspective(scale=(0.05, 0.1), p=0.5),
        ])

    # Steps shared by both conditions always come last.
    pipeline.append(A.Resize(height=640, width=640))
    pipeline.append(ToTensorV2())

    return A.Compose(
        pipeline,
        bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.1),
    )

In [None]:
def get_model_mobilenetv3_large(num_classes):
    """Create a Faster R-CNN MobileNetV3-Large FPN detector with a new box head.

    The network is built with the torchvision ``DEFAULT`` weights, then its box
    predictor is replaced by a fresh ``FastRCNNPredictor`` sized for
    ``num_classes`` outputs.

    Args:
        num_classes: Number of outputs of the new box predictor.

    Returns:
        The modified detection model.
    """
    detector = fasterrcnn_mobilenet_v3_large_fpn(
        weights=FasterRCNN_MobileNet_V3_Large_FPN_Weights.DEFAULT
    )

    # The new head must accept the feature size the old head was fed with.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector

if __name__ == "__main__":
    # Smoke test: building the model once confirms the architecture is available.
    model_test = get_model_mobilenetv3_large(num_classes=NUM_CLASSES)
    print("Model Architecture Ready: Faster R-CNN MobileNetV3-Large")

# Evaluate

In [None]:
def evaluate_map_complete(model, dataloader, device):
    """Compute mean average precision of ``model`` over ``dataloader``.

    Two metrics are accumulated over the same predictions: one with the default
    IoU thresholds and one restricted to IoU 0.5, both with per-class results.
    The model is put in eval mode and run without gradients.

    Returns:
        Dict with ``map`` and ``map_50`` (floats, from the default-threshold
        metric), ``map_per_class`` (default-threshold metric) and
        ``map_50_per_class`` (IoU-0.5 metric).
    """
    model.eval()
    metric_global = MeanAveragePrecision(class_metrics=True).to(device)
    metric_50 = MeanAveragePrecision(class_metrics=True, iou_thresholds=[0.5]).to(device)
    kept_keys = ['boxes', 'labels']

    with torch.no_grad():
        for batch_images, batch_targets in dataloader:
            batch_images = [img.to(device) for img in batch_images]

            # The metrics only need boxes and labels from each target.
            ground_truth = []
            for tgt in batch_targets:
                ground_truth.append(
                    {key: value.to(device) for key, value in tgt.items() if key in kept_keys}
                )

            predictions = model(batch_images)
            for metric in (metric_global, metric_50):
                metric.update(predictions, ground_truth)

    summary_all = metric_global.compute()
    summary_50 = metric_50.compute()

    report = {}
    report['map'] = summary_all['map'].item()
    report['map_50'] = summary_all['map_50'].item()
    report['map_per_class'] = summary_all['map_per_class']
    report['map_50_per_class'] = summary_50['map_per_class']
    return report

def evaluate_best_f1(model, dataloader, device, num_classes):
    """Compute, per class, the precision/recall pair with the highest F1 score.

    Predictions of each image are processed in descending score order. A
    prediction is a true positive when its best-overlapping ground-truth box
    *of the same class that has not been matched yet* has IoU > 0.5; that box
    is then marked as used. Restricting the candidates to the prediction's own
    class keeps an overlapping box of another class (or an already matched box)
    from turning a correct detection into a false positive.

    Args:
        model: Detection model; called with a list of images and expected to
            return one dict with ``boxes``, ``scores`` and ``labels`` per image.
        dataloader: Iterable of ``(images, targets)`` batches.
        device: Device the images and ground truth are moved to.
        num_classes: Number of classes including background; classes
            ``1 .. num_classes - 1`` are evaluated.

    Returns:
        Dict ``{class_id: {'p': precision, 'r': recall, 'f1': f1}}`` taken at
        the score cut-off that maximizes F1. A class without predictions gets
        zeros.
    """
    model.eval()
    class_preds = {i: [] for i in range(1, num_classes)}
    class_gt_counts = {i: 0 for i in range(1, num_classes)}

    with torch.no_grad():
        for images, targets in dataloader:
            images = [img.to(device) for img in images]
            outputs = model(images)

            for i, output in enumerate(outputs):
                gt_boxes = targets[i]['boxes'].to(device)
                gt_labels = targets[i]['labels'].to(device)

                for cls_id in class_gt_counts:
                    class_gt_counts[cls_id] += (gt_labels == cls_id).sum().item()

                pred_scores = output['scores']
                if len(pred_scores) == 0:
                    continue

                # Sort predictions by confidence
                order = torch.argsort(pred_scores, descending=True)
                pred_boxes = output['boxes'][order]
                pred_scores = pred_scores[order]
                pred_labels = output['labels'][order]

                has_gt = len(gt_boxes) > 0
                if has_gt:
                    iou_matrix = torchvision.ops.box_iou(pred_boxes, gt_boxes)
                    # True while a ground-truth box has not been claimed yet.
                    gt_free = torch.ones(len(gt_boxes), dtype=torch.bool, device=gt_boxes.device)

                for p_idx in range(len(pred_boxes)):
                    p_label = pred_labels[p_idx].item()
                    if p_label not in class_preds:
                        # Label outside the evaluated range (e.g. background).
                        continue

                    is_tp = False
                    if has_gt:
                        eligible = gt_free & (gt_labels == p_label)
                        if eligible.any():
                            row = iou_matrix[p_idx]
                            # Mask out other classes and used boxes before the max.
                            masked = torch.where(eligible, row, torch.full_like(row, -1.0))
                            max_iou, max_gt_idx = torch.max(masked, dim=0)
                            if max_iou > 0.5:
                                is_tp = True
                                gt_free[max_gt_idx] = False

                    class_preds[p_label].append((pred_scores[p_idx].item(), is_tp))

    results = {}
    for cls_id in range(1, num_classes):
        preds = class_preds[cls_id]
        total_gt = class_gt_counts[cls_id]
        if not preds:
            results[cls_id] = {'p': 0.0, 'r': 0.0, 'f1': 0.0}
            continue

        # Rank all predictions of the class across images, then sweep the cut-off.
        preds.sort(key=lambda x: x[0], reverse=True)
        preds_np = np.array(preds)
        tp_cumsum = np.cumsum(preds_np[:, 1])
        fp_cumsum = np.cumsum(1 - preds_np[:, 1])

        # The small epsilon guards against division by zero.
        precisions = tp_cumsum / (tp_cumsum + fp_cumsum + 1e-16)
        recalls = tp_cumsum / (total_gt + 1e-16)
        f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-16)

        best_idx = np.argmax(f1_scores)
        results[cls_id] = {
            'p': float(precisions[best_idx]),
            'r': float(recalls[best_idx]),
            'f1': float(f1_scores[best_idx])
        }
    return results

In [None]:
def train_one_epoch(model, optimizer, data_loader, device):
    """Run one optimization pass over ``data_loader``.

    For every batch the model's loss terms are summed, back-propagated with the
    gradient norm clipped to 10.0, and applied by ``optimizer``.

    Returns:
        The summed loss averaged over the batches (0 when there were none).
    """
    model.train()
    running_loss = 0.0
    num_batches = 0

    for num_batches, (batch_images, batch_targets) in enumerate(data_loader, start=1):
        batch_images = [img.to(device) for img in batch_images]
        batch_targets = [
            {key: value.to(device) for key, value in tgt.items()}
            for tgt in batch_targets
        ]

        # In training mode the model returns a dict of loss terms.
        loss_terms = model(batch_images, batch_targets)
        total_loss = sum(loss_terms.values())

        optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=10.0)
        optimizer.step()

        running_loss += total_loss.item()

    return running_loss / max(num_batches, 1)

def run_experiment(condition, epochs=100, patience=15, batch_size=4, lr=0.0001,
                   weight_decay=0.0005, num_workers=2):
    """Train and validate the detector for one augmentation condition.

    Each epoch trains on the ``train`` split, evaluates mAP and best-F1 metrics
    on the ``val`` split, appends a row to ``metrics.csv`` and saves
    ``best_model.pth`` whenever validation mAP@50 improves. Both files go to
    ``<BASE_SAVE_DIR>/<condition>``. Training stops early after ``patience``
    consecutive epochs without improvement.

    Args:
        condition: Passed to ``get_transforms`` for the training pipeline and
            used as the output sub-directory name. Validation always uses the
            ``'Raw'`` pipeline.
        epochs: Maximum number of epochs; also the cosine schedule length.
        patience: Epochs without mAP@50 improvement tolerated before stopping.
        batch_size: Batch size of both data loaders.
        lr: Initial AdamW learning rate.
        weight_decay: AdamW weight decay.
        num_workers: Worker processes of each data loader.

    Returns:
        The best validation mAP@50 seen (0.0 if it never rose above zero).

    Raises:
        FileNotFoundError: If the train or val split contains no images.
    """
    print(f"\nStarting Experiment: {condition} | Patience: {patience}")

    train_transform = get_transforms(condition)
    val_transform = get_transforms('Raw')

    train_ds = YOLODataset(LOCAL_DATA_DIR, split='train', transforms=train_transform, mapping=TARGET_MAPPING)
    val_ds = YOLODataset(LOCAL_DATA_DIR, split='val', transforms=val_transform, mapping=TARGET_MAPPING)

    # Fail early with a clear message: an empty split would otherwise surface
    # as an obscure sampler error or as meaningless validation metrics.
    for split_name, split_ds in (('train', train_ds), ('val', val_ds)):
        if len(split_ds) == 0:
            raise FileNotFoundError(
                f"No .jpg/.png images found for split '{split_name}' in {split_ds.img_dir}"
            )

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,
                              collate_fn=collate_fn, num_workers=num_workers)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False,
                            collate_fn=collate_fn, num_workers=num_workers)

    model = get_model_mobilenetv3_large(NUM_CLASSES).to(DEVICE)
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.AdamW(params, lr=lr, weight_decay=weight_decay)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=1e-6)

    save_path = os.path.join(BASE_SAVE_DIR, condition)
    os.makedirs(save_path, exist_ok=True)
    history = []
    best_map = 0.0
    patience_counter = 0

    for epoch in range(epochs):
        start_time = time.time()

        train_loss = train_one_epoch(model, optimizer, train_loader, DEVICE)
        scheduler.step()

        map_metrics = evaluate_map_complete(model, val_loader, DEVICE)
        f1_metrics = evaluate_best_f1(model, val_loader, DEVICE, NUM_CLASSES)

        # mAP@50 is the model-selection criterion.
        current_map = map_metrics['map_50']
        duration = time.time() - start_time

        # Macro average over the evaluated classes.
        avg_p = np.mean([f1_metrics[c]['p'] for c in f1_metrics])
        avg_r = np.mean([f1_metrics[c]['r'] for c in f1_metrics])

        log_entry = {
            'epoch': epoch + 1,
            'train_loss': train_loss,
            'val_map_50': current_map,
            'val_map_50_95': map_metrics['map'],
            'avg_precision': avg_p,
            'avg_recall': avg_r,
            'time': duration
        }
        history.append(log_entry)
        # Rewritten every epoch so the log survives an interrupted run.
        pd.DataFrame(history).to_csv(os.path.join(save_path, "metrics.csv"), index=False)

        print(f"Epoch {epoch+1}/{epochs} | Loss: {train_loss:.4f} | mAP@50: {current_map:.4f} | Time: {duration:.1f}s")

        if current_map > best_map:
            best_map = current_map
            patience_counter = 0
            torch.save(model.state_dict(), os.path.join(save_path, "best_model.pth"))
            print("  > Model Saved (Best mAP)")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"  > Early stopping triggered at epoch {epoch+1}")
                break

    return best_map

In [None]:
if __name__ == "__main__":
    # Train the baseline (Raw) first, then the augmented (Tuned) condition.
    map_raw = run_experiment("Raw", epochs=100)
    map_tuned = run_experiment("Tuned", epochs=100)

    # Final Visualization: one bar per condition, labelled with its best score
    results = pd.DataFrame({
        'Condition': ['Raw', 'Tuned'],
        'mAP': [map_raw, map_tuned],
    })

    fig, ax = plt.subplots(figsize=(8, 6))
    bars = ax.bar(results['Condition'], results['mAP'], color=['gray', '#d62728'])
    ax.set_title("Faster R-CNN Performance: Raw vs. Augmented")
    ax.set_ylabel("mAP@50")
    ax.set_ylim(0, 1.0)
    ax.grid(axis='y', alpha=0.3)

    for bar in bars:
        height = bar.get_height()
        x_center = bar.get_x() + bar.get_width()/2
        # Print the value just above the top of the bar.
        ax.text(x_center, height + 0.01, f'{height:.4f}', ha='center', va='bottom', fontweight='bold')

    fig.savefig(os.path.join(BASE_SAVE_DIR, "final_comparison.png"))
    plt.show()
    print("Experiment Complete. Results saved.")