<a href="https://colab.research.google.com/github/guo1428397137-wq/guo-9517-ass/blob/Faster-R-CNN/Faster-R-CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q torchmetrics opencv-python tqdm

import os
from pathlib import Path
from typing import List, Dict, Any
import numpy as np
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torchvision.transforms.functional as TF
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from tqdm import tqdm
import matplotlib.pyplot as plt
import random


# Dataset layout on Google Drive: <DATA_ROOT>/<split>/{images,labels}
DATA_ROOT = Path("/content/drive/MyDrive/archive")

TRAIN_IMG_DIR = DATA_ROOT.joinpath("train", "images")
TRAIN_LABEL_DIR = DATA_ROOT.joinpath("train", "labels")
VAL_IMG_DIR = DATA_ROOT.joinpath("valid", "images")
VAL_LABEL_DIR = DATA_ROOT.joinpath("valid", "labels")
TEST_IMG_DIR = DATA_ROOT.joinpath("test", "images")
TEST_LABEL_DIR = DATA_ROOT.joinpath("test", "labels")

# Category names indexed by model label id; index 0 is the background class.
CLASS_NAMES = [
    'background',
    'Ants',
    'Bees',
    'Beetle',
    'Caterpillar',
    'Earthworms',
    'Earwig',
    'Grasshopper',
    'Moth',
    'Slug',
    'Snail',
    'Wasp',
    'Weevil',
]

# Training hyper-parameters.
NUM_CLASSES = len(CLASS_NAMES) - 1  # foreground classes only (12)
BATCH_SIZE = 4
IMG_MAX_SIZE = 800
EPOCHS = 20
LR = 1e-3

# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

# YOLO label loading
def load_yolo_labels(label_path: Path, img_w: int, img_h: int):
    """Read a YOLO-format label file and convert it to pixel-space boxes.

    Each row is read as ``class cx cy w h`` where the four geometry values are
    multiplied by the image width/height to obtain pixel coordinates. Extra
    trailing columns are ignored.

    Args:
        label_path: Path of the ``.txt`` label file.
        img_w: Image width in pixels.
        img_h: Image height in pixels.

    Returns:
        A ``(boxes, labels)`` tuple. ``boxes`` is a float32 tensor of shape
        ``(N, 4)`` holding ``[x1, y1, x2, y2]`` clipped to the image, and
        ``labels`` is an int64 tensor of shape ``(N,)`` holding the file's
        class id plus one (0 is left free for background). Both are empty
        when the file is missing, unreadable, empty, malformed, or contains
        no box with positive width and height.
    """

    def _empty():
        # Fresh tensors on every call so callers can never share state.
        return (torch.zeros((0, 4), dtype=torch.float32),
                torch.zeros((0,), dtype=torch.int64))

    if not label_path.exists():
        return _empty()

    try:
        data = np.loadtxt(str(label_path), ndmin=2)
    except (OSError, ValueError):
        # Unreadable file, non-numeric content or ragged rows: treat the image
        # as unlabelled, but let KeyboardInterrupt/SystemExit propagate.
        return _empty()

    # Empty file, or rows too short to hold "class cx cy w h".
    if data.size == 0 or data.shape[1] < 5:
        return _empty()

    cls = data[:, 0].astype(np.int64)
    cx = data[:, 1] * img_w
    cy = data[:, 2] * img_h
    bw = data[:, 3] * img_w
    bh = data[:, 4] * img_h

    x1 = np.clip(cx - bw / 2, 0, img_w)
    y1 = np.clip(cy - bh / 2, 0, img_h)
    x2 = np.clip(cx + bw / 2, 0, img_w)
    y2 = np.clip(cy + bh / 2, 0, img_h)

    # Drop boxes that are degenerate after clipping.
    valid = (x2 > x1) & (y2 > y1)
    if not valid.any():
        return _empty()

    boxes = np.stack([x1[valid], y1[valid], x2[valid], y2[valid]], axis=1).astype(np.float32)
    labels = cls[valid] + 1  # shift so that 0 stays reserved for background

    return torch.as_tensor(boxes, dtype=torch.float32), torch.as_tensor(labels, dtype=torch.int64)


def resize_keep_ratio(img: np.ndarray, target: Dict[str, Any], max_size: int):
    """Downscale an image so its longer side is at most ``max_size``.

    The aspect ratio is preserved and images are never upscaled. When the
    image is resized, ``target["boxes"]`` is rescaled and clamped to the new
    image bounds, and ``target["area"]`` (if present) is recomputed so it
    stays consistent with the rescaled boxes.

    Args:
        img: Image array whose first two dimensions are height and width.
        target: Annotation dict containing at least a ``"boxes"`` tensor of
            ``[x1, y1, x2, y2]`` rows. The dict itself is updated and returned.
        max_size: Maximum allowed length of the longer image side.

    Returns:
        ``(img, target)``; ``target["size"]`` holds the final ``[height, width]``.
    """
    h, w = img.shape[:2]
    scale = min(max_size / max(h, w), 1.0)
    # Guard against extremely thin images collapsing to a zero-sized side.
    nh, nw = max(1, int(h * scale)), max(1, int(w * scale))

    if (nh, nw) != (h, w):
        img = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
        if target["boxes"].numel() > 0:
            # Work on a copy so the tensor passed in by the caller is not
            # modified behind its back.
            boxes = target["boxes"].clone()
            boxes[:, [0, 2]] *= float(nw) / w
            boxes[:, [1, 3]] *= float(nh) / h
            boxes[:, [0, 2]] = boxes[:, [0, 2]].clamp(0, nw)
            boxes[:, [1, 3]] = boxes[:, [1, 3]].clamp(0, nh)
            target["boxes"] = boxes
            if "area" in target:
                # Areas computed before resizing no longer match the boxes.
                target["area"] = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

    target["size"] = torch.tensor([nh, nw], dtype=torch.int64)
    return img, target



# Dataset

class YoloDetectionDataset(Dataset):
    """Detection dataset backed by an image folder and YOLO ``.txt`` labels.

    Each item is an ``(image, target)`` pair: the image is a normalised float
    tensor and the target is a dict with ``boxes``, ``labels``, ``image_id``,
    ``area``, ``iscrowd`` and ``size`` entries.
    """

    def __init__(self, img_dir: Path, label_dir: Path, training: bool = True, img_max_size: int = 800):
        """Index every ``.jpg``/``.jpeg``/``.png`` file found under ``img_dir``.

        Raises:
            RuntimeError: If no image is found.
        """
        self.img_dir = Path(img_dir)
        self.label_dir = Path(label_dir)
        self.training = training
        self.img_max_size = img_max_size

        # Recursive search, case-insensitive on the suffix, sorted for a
        # stable index -> file mapping.
        image_suffixes = (".jpg", ".jpeg", ".png")
        self.img_paths = sorted(
            path for path in self.img_dir.rglob("*")
            if path.suffix.lower() in image_suffixes
        )

        if not self.img_paths:
            raise RuntimeError(f"No images found in {self.img_dir}")

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.img_paths)

    def __getitem__(self, idx: int):
        """Load, optionally augment, resize and normalise sample ``idx``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        image_file = self.img_paths[idx]
        # The label file shares the image's stem and sits directly in label_dir.
        label_file = self.label_dir / f"{image_file.stem}.txt"

        bgr = cv2.imread(str(image_file))
        if bgr is None:
            raise FileNotFoundError(str(image_file))
        img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        height, width = img.shape[:2]

        boxes, labels = load_yolo_labels(label_file, width, height)
        has_boxes = boxes.numel() > 0

        if has_boxes:
            area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        else:
            area = torch.zeros((0,), dtype=torch.float32)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([idx]),
            "area": area,
            "iscrowd": torch.zeros((labels.shape[0],), dtype=torch.int64),
        }

        # Augmentation is applied only to training samples that have boxes.
        if self.training and has_boxes:
            # Horizontal flip with probability 0.5: mirror the pixels and the
            # x-coordinates of every box.
            if np.random.rand() < 0.5:
                img = np.ascontiguousarray(img[:, ::-1])
                mirrored = boxes.clone()
                mirrored[:, 0] = width - boxes[:, 2]
                mirrored[:, 2] = width - boxes[:, 0]
                target["boxes"] = mirrored

            # Contrast/brightness jitter with probability 0.3.
            if np.random.rand() < 0.3:
                gain = np.random.uniform(0.8, 1.2)
                bias = np.random.uniform(-10, 10)
                img = cv2.convertScaleAbs(img, alpha=gain, beta=bias)

        img, target = resize_keep_ratio(img, target, self.img_max_size)

        # HWC uint8 array -> CHW float tensor, then mean/std normalisation.
        tensor = TF.to_tensor(img)
        tensor = TF.normalize(tensor, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

        return tensor, target


def collate_fn(batch):
    """Regroup ``(image, target)`` samples into two parallel lists.

    Images are kept in a list rather than stacked into one tensor.
    """
    images, targets = map(list, zip(*batch))
    return images, targets


# Build the train/validation datasets and their data loaders
train_dataset = YoloDetectionDataset(
    img_dir=TRAIN_IMG_DIR,
    label_dir=TRAIN_LABEL_DIR,
    training=True,
    img_max_size=IMG_MAX_SIZE,
)
val_dataset = YoloDetectionDataset(
    img_dir=VAL_IMG_DIR,
    label_dir=VAL_LABEL_DIR,
    training=False,
    img_max_size=IMG_MAX_SIZE,
)

# Settings shared by both loaders; only shuffling differs between the splits.
loader_options = dict(
    batch_size=BATCH_SIZE,
    num_workers=2,
    collate_fn=collate_fn,
    pin_memory=True,
)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)

print(f"Train samples: {len(train_dataset)}, Val samples: {len(val_dataset)}")

# Sanity check: fetch one training sample and print its shapes and labels.
print("\ncheck first sample:")
img, target = train_dataset[0]
print(f"Image shape: {img.shape}")
print(f"Boxes shape: {target['boxes'].shape}")
print(f"Labels: {target['labels']}")
print(f"Boxes: {target['boxes'][:5] if len(target['boxes']) > 0 else 'No boxes'}")

# model
# The dataset already applies ImageNet mean/std normalisation to every image
# (and denormalize_image() inverts it for plotting). torchvision's detection
# models normalise their inputs again inside their own transform, so that
# built-in step is neutralised here to avoid normalising each image twice.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
    weights="DEFAULT",
    image_mean=[0.0, 0.0, 0.0],
    image_std=[1.0, 1.0, 1.0],
)

# Swap the pretrained box head for one sized for this dataset:
# NUM_CLASSES foreground classes plus one background class.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, NUM_CLASSES + 1)

model.to(device)

print(f"\nModel parameter count: {sum(p.numel() for p in model.parameters()):,}")


# Optimizer, learning-rate schedule and AMP gradient scaler
# Only parameters that still require gradients are handed to SGD.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=LR,
    momentum=0.9,
    weight_decay=1e-4,
)
# Multiply the learning rate by 0.1 every 8 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, gamma=0.1, step_size=8)
# Loss scaling is enabled only when running on CUDA.
scaler = torch.cuda.amp.GradScaler(enabled=device.type == "cuda")


# Training and validation
def train_one_epoch(epoch: int):
    """Run one pass over ``train_loader`` and step the LR scheduler once.

    Uses the module-level ``model``, ``optimizer``, ``scaler``,
    ``lr_scheduler`` and ``device``.

    Args:
        epoch: Epoch number, used only for the progress-bar label.

    Returns:
        ``(mean_loss, mean_components)`` where ``mean_loss`` is the summed loss
        averaged over batches and ``mean_components`` maps ``'cls'``,
        ``'box'``, ``'obj'`` and ``'rpn'`` to their per-batch averages.
    """
    model.train()
    pbar = tqdm(train_loader, desc=f"[Train] Epoch {epoch}")
    running_loss = 0.0
    # Short display name -> key in the loss dict returned by the model.
    component_keys = {
        'cls': 'loss_classifier',
        'box': 'loss_box_reg',
        'obj': 'loss_objectness',
        'rpn': 'loss_rpn_box_reg',
    }
    loss_components = {name: 0.0 for name in component_keys}
    use_amp = device.type == "cuda"

    for images, targets in pbar:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        optimizer.zero_grad(set_to_none=True)

        with torch.cuda.amp.autocast(enabled=use_amp):
            loss_dict = model(images, targets)
            loss = sum(loss_dict.values())

        scaler.scale(loss).backward()
        # The gradients are still multiplied by the AMP loss scale at this
        # point. Unscale them first so max_norm is compared against the true
        # gradient norm; scaler.step() will not unscale a second time.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
        scaler.step(optimizer)
        scaler.update()

        # Read each scalar once per batch.
        batch_loss = loss.item()
        running_loss += batch_loss
        batch_parts = {
            name: loss_dict[key].item() if key in loss_dict else 0.0
            for name, key in component_keys.items()
        }
        for name, value in batch_parts.items():
            loss_components[name] += value

        pbar.set_postfix({
            "loss": f"{batch_loss:.3f}",
            "cls":  f"{batch_parts['cls']:.3f}",
            "box":  f"{batch_parts['box']:.3f}",
        })

    lr_scheduler.step()

    n = len(train_loader)
    return running_loss / n, {k: v / n for k, v in loss_components.items()}


@torch.no_grad()
def evaluate_map(iou_thresholds=None):
    """Compute detection mAP of the module-level ``model`` on ``val_loader``.

    Args:
        iou_thresholds: Optional list of IoU thresholds forwarded to
            ``MeanAveragePrecision``. ``None`` keeps the metric's default
            thresholds, so ``map_50`` and ``map_75`` are real values. A custom
            list that omits 0.5 or 0.75 makes the metric report ``-1`` for the
            corresponding entry.

    Returns:
        A dict with whichever of the keys ``"map"``, ``"map_50"`` and
        ``"map_75"`` the metric produced, as Python floats.
    """
    model.eval()
    metric = MeanAveragePrecision(
        iou_type="bbox",
        iou_thresholds=iou_thresholds,
    )
    pbar = tqdm(val_loader, desc="[Eval]")

    for images, targets in pbar:
        images = [img.to(device) for img in images]
        outputs = model(images)

        # The metric is updated with CPU tensors, one dict per image.
        preds, gts = [], []
        for out, tgt in zip(outputs, targets):
            preds.append({
                "boxes":  out["boxes"].detach().cpu(),
                "scores": out["scores"].detach().cpu(),
                "labels": out["labels"].detach().cpu(),
            })
            gts.append({
                "boxes":   tgt["boxes"].detach().cpu(),
                "labels":  tgt["labels"].detach().cpu(),
                "iscrowd": tgt.get("iscrowd", torch.zeros((tgt["labels"].shape[0],), dtype=torch.int64)).detach().cpu()
            })
        metric.update(preds, gts)

    res = metric.compute()
    # Keep only the summary values the training loop reports.
    return {key: res[key].item() for key in ("map", "map_50", "map_75") if key in res}



# Training loop
# After every epoch the model is evaluated on the validation set, and a
# checkpoint is written whenever the validation mAP improves on the best so far.
best_map = 0.0
history = {'train_loss': [], 'map': [], 'map_50': []}

print("\n" + "="*70)
print("Start training")
print("="*70)

for epoch in range(1, EPOCHS + 1):
    train_loss, loss_comp = train_one_epoch(epoch)
    metrics = evaluate_map()
    map_all = metrics["map"]
    map_50  = metrics["map_50"]

    history['train_loss'].append(train_loss)
    history['map'].append(map_all)
    history['map_50'].append(map_50)

    print(f"[Epoch {epoch:02d}] "
          f"train_loss={train_loss:.4f}  "
          f"mAP={map_all:.4f}  "
          f"mAP50={map_50:.4f}")
    print(f"  Loss components: cls={loss_comp['cls']:.4f}, box={loss_comp['box']:.4f}, "
          f"obj={loss_comp['obj']:.4f}, rpn={loss_comp['rpn']:.4f}")

    if map_all > best_map:
        best_map = map_all
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'map': map_all,
        }, "/content/fasterrcnn_best.pth")
        # The original message contained a mis-decoded emoji; restored here.
        print("  👉 Saved new best model.")
    print("-" * 70)

print(f"\nTraining completed Best mAP = {best_map:.4f}")

# Plotting training curves
# Epochs are numbered from 1 in the training log, so the x-axis uses the same
# numbering instead of the 0-based list index.
epoch_axis = list(range(1, len(history['train_loss']) + 1))

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Left panel: mean training loss per epoch.
axes[0].plot(epoch_axis, history['train_loss'], marker='o', label='Train Loss')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Training Loss')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Right panel: validation metrics per epoch.
axes[1].plot(epoch_axis, history['map'], marker='o', label='mAP')
axes[1].plot(epoch_axis, history['map_50'], marker='s', label='mAP@50')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('mAP')
axes[1].set_title('Validation mAP')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('/content/training_curves.png', dpi=300, bbox_inches='tight')
plt.show()


# Visualization Functions
def denormalize_image(img_tensor):
    """Undo mean/std normalisation and return an HWC NumPy image in [0, 1].

    Args:
        img_tensor: Normalised image tensor with the 3 colour channels first.

    Returns:
        A NumPy array with the channel axis moved last and values clamped to
        the range [0, 1].
    """
    channel_mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    channel_std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
    restored = (img_tensor * channel_std + channel_mean).clamp(0, 1)
    # (C, H, W) -> (H, W, C)
    return restored.permute(1, 2, 0).numpy()

# Visualize the prediction results
def visualize_predictions(dataset: Dataset, num_images: int = 6, score_thresh: float = 0.05):
    """Plot model predictions next to the ground truth for random samples.

    Predictions are drawn in red with a ``class: score`` caption and
    ground-truth boxes in green. The figure is saved to
    ``/content/predictions.png`` and then shown.

    Args:
        dataset: Dataset yielding ``(normalised image tensor, target)`` pairs.
        num_images: Number of random samples to draw (capped at the dataset size).
        score_thresh: Predictions scoring below this value are not drawn.
    """
    model.eval()

    idxs = random.sample(range(len(dataset)), k=min(num_images, len(dataset)))

    # Size the grid from the number of samples instead of a fixed 2x3 layout,
    # so asking for more than six images no longer runs out of axes.
    n_cols = 3
    n_rows = max(1, (len(idxs) + n_cols - 1) // n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(6 * n_cols, 6 * n_rows), squeeze=False)
    axes = axes.flatten()

    for i, idx in enumerate(idxs):
        img, target = dataset[idx]

        img_np = denormalize_image(img.cpu())

        # predict
        with torch.no_grad():
            out = model([img.to(device)])[0]

        boxes  = out["boxes"].cpu().numpy()
        scores = out["scores"].cpu().numpy()
        labels = out["labels"].cpu().numpy()

        keep = scores >= score_thresh
        boxes, scores, labels = boxes[keep], scores[keep], labels[keep]

        vis = (img_np * 255).astype(np.uint8).copy()

        # Draw the predicted boxes (red in RGB) with a filled caption above each.
        for (x1, y1, x2, y2), s, lb in zip(boxes, scores, labels):
            x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
            cv2.rectangle(vis, (x1, y1), (x2, y2), (255, 0, 0), 2)

            class_name = CLASS_NAMES[int(lb)] if int(lb) < len(CLASS_NAMES) else str(int(lb))
            text = f"{class_name}: {s:.2f}"

            (text_w, text_h), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
            cv2.rectangle(vis, (x1, max(0, y1-text_h-5)), (x1+text_w, y1), (255, 0, 0), -1)
            cv2.putText(vis, text, (x1, max(text_h, y1-5)),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Draw the ground-truth boxes (green).
        gt_boxes = target["boxes"].numpy()
        gt_labels = target["labels"].numpy()
        for (x1, y1, x2, y2), lb in zip(gt_boxes, gt_labels):
            x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
            cv2.rectangle(vis, (x1, y1), (x2, y2), (0, 255, 0), 2)

        axes[i].imshow(vis)
        axes[i].axis('off')
        axes[i].set_title(f'Sample {idx} | Pred: {len(boxes)} boxes', fontsize=10, fontweight='bold')

    # Hide grid cells that did not receive an image.
    for unused_ax in axes[len(idxs):]:
        unused_ax.axis('off')

    from matplotlib.patches import Patch
    legend_elements = [
        Patch(facecolor='red', edgecolor='red', label='Prediction'),
        Patch(facecolor='green', edgecolor='green', label='Ground Truth')
    ]
    fig.legend(handles=legend_elements, loc='upper right', fontsize=12)

    plt.tight_layout()
    plt.savefig('/content/predictions.png', dpi=300, bbox_inches='tight')
    plt.show()


# Draw predictions and ground truth for six randomly chosen validation images.
print("\nVisualize the prediction results:")
visualize_predictions(val_dataset, num_images=6, score_thresh=0.05)

# Evaluate

def _pairwise_iou(boxes_a, boxes_b):
    """Return the ``(len(boxes_a), len(boxes_b))`` IoU matrix for xyxy boxes."""
    top_left = np.maximum(boxes_a[:, None, :2], boxes_b[None, :, :2])
    bottom_right = np.minimum(boxes_a[:, None, 2:], boxes_b[None, :, 2:])
    wh = np.clip(bottom_right - top_left, 0, None)
    inter = wh[..., 0] * wh[..., 1]
    area_a = (boxes_a[:, 2] - boxes_a[:, 0]) * (boxes_a[:, 3] - boxes_a[:, 1])
    area_b = (boxes_b[:, 2] - boxes_b[:, 0]) * (boxes_b[:, 3] - boxes_b[:, 1])
    union = area_a[:, None] + area_b[None, :] - inter
    # Guard against division by zero for degenerate boxes.
    return inter / np.maximum(union, 1e-9)


@torch.no_grad()
def detailed_evaluation(dataset, score_thresh=0.5, iou_thresh=0.5):
    """Print per-class TP/FP/FN, precision, recall and F1 over ``dataset``.

    A prediction counts as a true positive only when it overlaps a
    not-yet-matched ground-truth box of the same class with an IoU of at
    least ``iou_thresh``. Predictions are matched greedily in descending
    score order, and each ground-truth box can be matched once. Unmatched
    predictions are false positives and unmatched ground-truth boxes are
    false negatives.

    Args:
        dataset: Dataset yielding ``(image tensor, target)`` pairs, where
            ``target`` holds ``"boxes"`` and ``"labels"`` tensors.
        score_thresh: Predictions scoring below this value are ignored.
        iou_thresh: Minimum IoU for a prediction to match a ground-truth box.
    """
    model.eval()

    class_tp = {i: 0 for i in range(1, NUM_CLASSES+1)}
    class_fp = {i: 0 for i in range(1, NUM_CLASSES+1)}
    class_fn = {i: 0 for i in range(1, NUM_CLASSES+1)}

    for idx in tqdm(range(len(dataset)), desc='Detailed Eval'):
        img, target = dataset[idx]

        out = model([img.to(device)])[0]

        pred_boxes = out["boxes"].cpu().numpy()
        pred_scores = out["scores"].cpu().numpy()
        pred_labels = out["labels"].cpu().numpy()

        keep = pred_scores >= score_thresh
        pred_boxes = pred_boxes[keep]
        pred_scores = pred_scores[keep]
        pred_labels = pred_labels[keep]

        gt_boxes = target["boxes"].numpy()
        gt_labels = target["labels"].numpy()

        for label in range(1, NUM_CLASSES+1):
            pred_mask = pred_labels == label
            cls_pred_boxes = pred_boxes[pred_mask]
            cls_pred_scores = pred_scores[pred_mask]
            cls_gt_boxes = gt_boxes[gt_labels == label]
            n_pred = len(cls_pred_boxes)
            n_gt = len(cls_gt_boxes)

            matched = 0
            if n_pred > 0 and n_gt > 0:
                ious = _pairwise_iou(cls_pred_boxes, cls_gt_boxes)
                gt_taken = np.zeros(n_gt, dtype=bool)
                # Highest-scoring predictions get first pick of the GT boxes.
                for p in np.argsort(-cls_pred_scores):
                    candidates = np.where(gt_taken, -1.0, ious[p])
                    best_gt = int(candidates.argmax())
                    if candidates[best_gt] >= iou_thresh:
                        gt_taken[best_gt] = True
                        matched += 1

            class_tp[label] += matched
            class_fp[label] += n_pred - matched
            class_fn[label] += n_gt - matched

    print("\n" + "="*70)
    print("Performance of each category:")
    print("="*70)
    print(f"{'Class':<15} {'TP':>6} {'FP':>6} {'FN':>6} {'Precision':>10} {'Recall':>10} {'F1':>10}")
    print("-"*70)

    for label in range(1, NUM_CLASSES+1):
        tp = class_tp[label]
        fp = class_fp[label]
        fn = class_fn[label]

        # Ratios fall back to 0 when their denominator is empty.
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        class_name = CLASS_NAMES[label] if label < len(CLASS_NAMES) else f"Class_{label}"
        print(f"{class_name:<15} {tp:6d} {fp:6d} {fn:6d} {precision:10.4f} {recall:10.4f} {f1:10.4f}")

    print("="*70)

# Print the per-class TP/FP/FN/precision/recall/F1 table for the validation
# set, counting every prediction with a score of at least 0.05.
detailed_evaluation(val_dataset, score_thresh=0.05)

