In [None]:
# ! cp drive/MyDrive/Challenge1.zip ./

In [None]:
# ! mkdir -p Challenge1
# ! mv Challenge1.zip Challenge1

In [None]:
# ! cd Challenge1 && unzip Challenge1.zip

In [None]:
# ! ls ./Challenge1/coin-dataset/ | grep ^_anno

In [None]:
import os
import torch
import numpy as np
import random
from torch.utils.data import DataLoader, Subset
from torchvision.models.detection import maskrcnn_resnet50_fpn_v2, MaskRCNN_ResNet50_FPN_V2_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from torchvision.transforms import functional as F
from pycocotools.coco import COCO
from PIL import Image

# -------- Custom COCO Dataset --------
class COCODataset(torch.utils.data.Dataset):
    """COCO-format dataset yielding ``(image, target)`` pairs for Mask R-CNN.

    Each target is a dict with:
      * ``'boxes'``    - float32 tensor of shape ``(N, 4)`` in
        ``[x1, y1, x2, y2]`` form (converted from COCO ``[x, y, w, h]``),
      * ``'labels'``   - int64 tensor of shape ``(N,)`` holding the raw COCO
        ``category_id`` of every annotation,
      * ``'masks'``    - uint8 tensor of shape ``(N, H, W)``,
      * ``'image_id'`` - tensor containing the COCO image id.

    Args:
        img_dir: Directory that contains the image files.
        ann_path: Path to the COCO JSON annotation file.
        transforms: Optional callable applied to the PIL image only (the
            target is not transformed).
    """

    def __init__(self, img_dir, ann_path, transforms=None):
        self.coco = COCO(ann_path)
        self.img_dir = img_dir
        # Sorted so that a given index always maps to the same image.
        self.ids = sorted(self.coco.imgs.keys())
        self.transforms = transforms

    def _build_target(self, anns, height, width, img_id):
        """Convert COCO annotation dicts into a torchvision detection target.

        Args:
            anns: Iterable of COCO annotation dicts for one image.
            height: Image height, used for placeholder/empty masks.
            width: Image width, used for placeholder/empty masks.
            img_id: COCO id of the image the annotations belong to.

        Returns:
            The target dict described in the class docstring. The tensors
            keep their ``(0, 4)`` / ``(0,)`` / ``(0, H, W)`` shapes when no
            annotation survives, so image-without-object samples stay valid.
        """
        boxes, labels, masks = [], [], []

        for ann in anns:
            x, y, w, h = ann['bbox']
            # Zero/negative-size boxes are rejected by torchvision detection
            # models at training time, so such annotations are skipped.
            if w <= 0 or h <= 0:
                continue

            boxes.append([x, y, x + w, y + h])
            labels.append(ann['category_id'])

            if ann.get('segmentation'):
                masks.append(self.coco.annToMask(ann))
            else:
                # No polygon/RLE available: fall back to an all-zero mask.
                masks.append(np.zeros((height, width), dtype=np.uint8))

        if masks:
            mask_tensor = torch.as_tensor(np.array(masks), dtype=torch.uint8)
        else:
            # np.array([]) would collapse to shape (0,); keep the (N, H, W) rank.
            mask_tensor = torch.zeros((0, height, width), dtype=torch.uint8)

        return {
            # reshape keeps the (N, 4) layout even when the list is empty.
            'boxes': torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
            'masks': mask_tensor,
            'image_id': torch.tensor([img_id]),
        }

    def __getitem__(self, index):
        """Load the image at ``index`` and build its detection target."""
        img_id = self.ids[index]
        anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))
        file_name = self.coco.loadImgs(img_id)[0]['file_name']

        img = Image.open(os.path.join(self.img_dir, file_name)).convert('RGB')
        target = self._build_target(anns, img.height, img.width, img_id)

        if self.transforms:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.ids)

# -------- Split dataset into train and val --------
def split_dataset(dataset, split_ratio=0.7):
    """Randomly partition ``dataset`` into two ``Subset`` views.

    The indices are shuffled with the global ``random`` module state, so
    seed it beforehand for a reproducible split.

    Args:
        dataset: Any object supporting ``len()`` and integer indexing.
        split_ratio: Fraction of samples assigned to the first subset.

    Returns:
        A ``(first, second)`` tuple of ``Subset`` objects; the first holds
        ``int(split_ratio * len(dataset))`` samples, the second the rest.
    """
    total = len(dataset)
    order = list(range(total))
    random.shuffle(order)

    cut = int(split_ratio * total)
    first_idx, second_idx = order[:cut], order[cut:]
    return Subset(dataset, first_idx), Subset(dataset, second_idx)

def collate_fn(batch):
    """Transpose a batch of samples into per-field tuples.

    A list of ``(image, target)`` pairs becomes ``(images, targets)``, which
    keeps images as separate items instead of stacking them into one tensor.
    """
    columns = zip(*batch)
    return tuple(columns)

# -------- Training Function --------
def train(
    img_dir='/content/Challenge1/coin-dataset',
    ann_path='/content/Challenge1/coin-dataset/_annotations.coco.json',
    num_epochs=20,
    output_path="maskrcnn_finetuned_v2.pth",
):
    """Fine-tune a COCO-pretrained Mask R-CNN (ResNet50-FPN v2) and save it.

    Args:
        img_dir: Folder containing the images.
        ann_path: COCO JSON annotation file.
        num_epochs: Number of passes over the training split.
        output_path: File the trained ``state_dict`` is written to.

    Side effects:
        Prints the device, the summed training loss of every epoch, and a
        completion message; writes the model weights to ``output_path``.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Training on device: {device}")

    # Dataset and DataLoader
    full_dataset = COCODataset(img_dir, ann_path, transforms=F.to_tensor)
    # The second split is held out; no evaluation is run on it here.
    train_dataset, _val_dataset = split_dataset(full_dataset)

    train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

    # -------- Load Pretrained Model --------
    weights = MaskRCNN_ResNet50_FPN_V2_Weights.COCO_V1
    model = maskrcnn_resnet50_fpn_v2(weights=weights)

    # -------- Update Number of Classes --------
    # The dataset uses raw COCO category ids as labels, so the heads must
    # cover the largest id (ids may be sparse), and never fewer outputs than
    # "number of categories + background".
    cat_ids = full_dataset.coco.getCatIds()
    num_classes = max(len(cat_ids), max(cat_ids, default=0)) + 1

    # Replace box predictor head
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    # Replace mask predictor head
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask, hidden_layer, num_classes)

    model.to(device)

    # -------- Optimizer and LR Scheduler --------
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for images, targets in train_loader:
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # In training mode the model returns a dict of loss terms.
            loss_dict = model(images, targets)
            losses = sum(loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            total_loss += losses.item()

        print(f'Epoch [{epoch+1}/{num_epochs}] - Loss: {total_loss:.4f}')
        lr_scheduler.step()

    # Saving happens here because `model` only exists inside this function.
    torch.save(model.state_dict(), output_path)
    print(f"Training complete. Model saved as {output_path}")


if __name__ == "__main__":
    train()
