In [10]:
import torch
import torchvision
from torch import optim
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_V2_Weights, \
    fasterrcnn_resnet50_fpn_v2
from torchvision.datasets import VOCDetection
from torchvision.transforms import functional as F
from torch.utils.data import DataLoader
from tqdm import tqdm
from matplotlib import patches
import matplotlib.pyplot as plt


In [11]:
import numpy as np
from torchvision.datasets import OxfordIIITPet


class PetDataset(torch.utils.data.Dataset):
    def __init__(self, root, split="trainval"):
        self.dataset = OxfordIIITPet(
            root=root,
            split=split,
            target_types="segmentation",
            download=True
        )

    def __getitem__(self, idx):
        img, mask = self.dataset[idx]
        img = F.to_tensor(img)

        mask = np.array(mask)
        binary_mask = mask > 0

        if binary_mask.sum() == 0:
            # αν δεν υπάρχει object, παραλείπουμε
            return self[(idx+1) % len(self)]

        binary_mask = torch.as_tensor(binary_mask, dtype=torch.float32)

        pos = torch.where(binary_mask)
        xmin = torch.min(pos[1])
        xmax = torch.max(pos[1])
        ymin = torch.min(pos[0])
        ymax = torch.max(pos[0])

        boxes = torch.tensor([[xmin, ymin, xmax, ymax]], dtype=torch.float32)
        labels = torch.ones((1,), dtype=torch.int64)
        masks = binary_mask.unsqueeze(0).float()  # float tensor

        target = {
            "boxes": boxes,
            "labels": labels,
            "masks": masks
        }

        return img, target


    def __len__(self):
        return len(self.dataset)

In [12]:
def collate_fn(batch):
    return tuple(zip(*batch))


train_dataset = PetDataset("./data", split="trainval")
val_dataset = PetDataset("./data", split="test")

from torch.utils.data import Subset
train_dataset = Subset(train_dataset, range(100))
val_dataset = Subset(val_dataset, range(50))


train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn, num_workers=0, pin_memory=True, persistent_workers=False)
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

  0%|          | 1.21M/792M [00:05<55:21, 238kB/s]   


KeyboardInterrupt: 

In [None]:
def train_one_epoch(model, loader, optimizer, device, epoch, epochs):
    model.train()
    running_loss = 0

    pbar = tqdm(loader, desc=f"Epoch {epoch+1}/{epochs} [Train]", leave=False)

    for images, targets in pbar:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        # σε περίπτωση που επιστραφεί list αντί για dict
        if isinstance(loss_dict, list):
        # παίρνουμε το πρώτο στοιχείο
            loss_dict = loss_dict[0]
        loss = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        pbar.set_postfix(loss=loss.item())

    return running_loss / len(loader)


In [None]:
def train_model(model, train_loader, optimizer, device, epochs):
    history = {"train_loss": []}

    for epoch in range(epochs):
        train_loss = train_one_epoch(
            model, train_loader, optimizer, device, epoch, epochs
        )

        history["train_loss"].append(train_loss)

        print(
            f"Epoch {epoch+1}/{epochs} | "
            f"Train Loss: {train_loss:.4f}"
        )

    return history


In [None]:
def plot_curves(results):
    plt.figure(figsize=(8,5))

    for name, hist in results.items():
        plt.plot(hist["train_loss"], label=f"{name} Train")
        plt.plot(hist["val_loss"], linestyle="--", label=f"{name} Val")

    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training & Validation Loss")
    plt.legend()
    plt.grid(True)
    plt.show()

In [None]:
def show_detection(model, dataset, device, n, score_thr=0.5):
    model.eval()

    import random
    idxs = random.sample(range(len(dataset)), n)

    for idx in idxs:
        img, target = dataset[idx]

        with torch.no_grad():
            pred = model([img.to(device)])[0]

        img_np = img.permute(1,2,0).cpu().numpy()

        plt.figure(figsize=(6,6))
        plt.imshow(img_np)
        ax = plt.gca()

        # GT boxes (green)
        for box in target["boxes"]:
            x1,y1,x2,y2 = box
            rect = patches.Rectangle((x1,y1), x2-x1, y2-y1,
                                     linewidth=2, edgecolor='g', facecolor='none')
            ax.add_patch(rect)

        # Pred boxes (red)
        for box, score in zip(pred["boxes"], pred["scores"]):
            if score < score_thr:
                continue
            x1,y1,x2,y2 = box.cpu()
            rect = patches.Rectangle((x1,y1), x2-x1, y2-y1,
                                     linewidth=2, edgecolor='r', facecolor='none')
            ax.add_patch(rect)

        plt.title("Green=GT, Red=Prediction")
        plt.axis("off")
        plt.show()


In [None]:
from torchvision.models.detection import maskrcnn_resnet50_fpn
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

def create_model(num_classes, device):
    # φορτώνουμε προεκπαιδευμένο Mask R-CNN
    model = maskrcnn_resnet50_fpn(weights="DEFAULT")

    # αντικαθιστούμε τον predictor για το class+box
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(
        in_features, num_classes
    )

    # αντικαθιστούμε και τον mask predictor για τον αριθμό των classes
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = MaskRCNNPredictor(
        in_features_mask, hidden_layer, num_classes
    )

    return model.to(device)


In [None]:
from torchmetrics.detection.mean_ap import MeanAveragePrecision

def evaluate_map(model, loader, device):
    model.eval()
    metric = MeanAveragePrecision(iou_type="bbox")

    with torch.no_grad():
        for images, targets in loader:
            images = [img.to(device) for img in images]
            preds = model(images)

            preds = [{k: v.cpu() for k, v in p.items()} for p in preds]
            targets = [{k: v.cpu() for k, v in t.items()} for t in targets]

            metric.update(preds, targets)

    return metric.compute()


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
results = {}
lr = 1e-3

for epochs in [5, 10, 15]:
    print(f"Training with epochs={epochs}")

    model = create_model(num_classes=2, device=device)

    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    history = train_model(model, train_loader, optimizer, device, epochs)
    map_results = evaluate_map(model, val_loader, device)

    print(f"Epochs: {epochs}, LR: {lr}")
    print("mAP:", map_results["map"].item())
    print("mAP@0.5:", map_results["map_50"].item())

# 3️⃣ VISUAL RESULTS
    show_detection(model, val_dataset, device, n=6)

    results[f"epochs={epochs}"] = history

plot_curves(results)

In [None]:
show_detection(model, val_dataset, device, n=8)