# Image Segmentation

##Imports

In [1]:
import os
import numpy as np
import pandas as pd
import torch
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

from pycocotools.coco import COCO
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

from torchvision.utils import draw_bounding_boxes
import torchvision.transforms.functional as F


## unzipping dataset zip

In [None]:
#Unzipping RM_Segmentation_Assignment_dataset.zip
!unzip RM_Segmentation_Assignment_dataset.zip

Archive:  RM_Segmentation_Assignment_dataset.zip
replace RM_Segmentation_Assignment_dataset (1) (2)/test-30/000000001371.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

Fetching training, testing, and validation data

In [2]:
# Define the mapping for our 4 classes Using subset data
CATEGORY_MAPPING = {
    'cake': 1,
    'car': 2,
    'dog': 3,
    'person': 4
}

def create_dataset(image_dir, annotation_file, transforms=None):
    """
    Create a custom COCO dataset for object detection
    """
    coco = COCO(annotation_file)

    # Get the image IDs that contain our target categories
    category_ids = []
    for category_name in CATEGORY_MAPPING.keys():
        cats = coco.loadCats(coco.getCatIds(catNms=[category_name]))
        category_ids.extend([cat['id'] for cat in cats])

    image_ids = []
    for cat_id in category_ids:
        image_ids.extend(coco.getImgIds(catIds=[cat_id]))
    image_ids = list(set(image_ids))  # Remove duplicates

    # Process images and annotations
    processed_data = []
    for img_id in image_ids:
        img_info = coco.loadImgs(img_id)[0]
        img_path = os.path.join(image_dir, img_info['file_name'])

        if not os.path.exists(img_path):
            continue

        ann_ids = coco.getAnnIds(imgIds=img_id, catIds=category_ids)
        annotations = coco.loadAnns(ann_ids)

        # Filter and process annotations
        boxes = []
        labels = []

        for ann in annotations:
            category_name = coco.loadCats([ann['category_id']])[0]['name']
            if category_name in CATEGORY_MAPPING:
                bbox = ann['bbox']  # [x, y, width, height]
                # Convert to [x1, y1, x2, y2] format
                boxes.append([
                    bbox[0],
                    bbox[1],
                    bbox[0] + bbox[2],
                    bbox[1] + bbox[3]
                ])
                labels.append(CATEGORY_MAPPING[category_name])

        if len(boxes) > 0:  # Only include images that have valid annotations
            processed_data.append({
                'img_path': img_path,
                'boxes': torch.tensor(boxes, dtype=torch.float32),
                'labels': torch.tensor(labels, dtype=torch.int64)
            })

    return processed_data


def get_transform():
    """
    Define the transformation pipeline
    """
    transforms_list = [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                           std=[0.229, 0.224, 0.225])
    ]
    return transforms.Compose(transforms_list)

def collate_fn(batch):
    """
    Custom collate function for the DataLoader
    """
    return tuple(zip(*batch))

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Prepare datasets
train_data = create_dataset(
    'RM_Segmentation_Assignment_dataset (1) (2)/train-300/data',
    'RM_Segmentation_Assignment_dataset (1) (2)/train-300/labels.json',
    transforms=get_transform()
)

val_data = create_dataset(
    'RM_Segmentation_Assignment_dataset (1) (2)/validation-300/data',
    'RM_Segmentation_Assignment_dataset (1) (2)/validation-300/labels.json',
    transforms=get_transform()
)







loading annotations into memory...
Done (t=0.09s)
creating index...
index created!
loading annotations into memory...
Done (t=0.13s)
creating index...
index created!


In [3]:

class CocoDataset(Dataset):
    def __init__(self, processed_data, transforms=None):
        self.processed_data = processed_data
        self.transforms = transforms

    def __getitem__(self, idx):
        data = self.processed_data[idx]
        img = Image.open(data['img_path']).convert('RGB')

        target = {
            'boxes': data['boxes'],
            'labels': data['labels']
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        return len(self.processed_data)



In [4]:
# Create datasets and dataloaders
train_dataset = CocoDataset(train_data, transforms=get_transform())
val_dataset = CocoDataset(val_data, transforms=get_transform())

train_loader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=4
)

val_loader = DataLoader(
    val_dataset,
    batch_size=2,
    shuffle=False,
    collate_fn=collate_fn,
    num_workers=4
)

#Loading Model

Using Faster R-CNN model

In [5]:
def load_model(num_classes):
    """
    Load and configure the Faster R-CNN model
    """
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes + 1)  # +1 for background
    return model

## Initialize model

In [6]:
# Initialize model
model = load_model(num_classes=len(CATEGORY_MAPPING))
model.to(device)

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /home/hamza/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|████████████████████████████████████████████████████████████████████████████████| 160M/160M [00:41<00:00, 4.06MB/s]


FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

#Training model

In [7]:
# Training configuration
def train_one_epoch(model, optimizer, data_loader, device):
    model.train()
    total_loss = 0

    for images, targets in data_loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()

    return total_loss / len(data_loader)

## training paramaters

In [8]:
# Set up optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

## Training Loop
Epoch = 10

In [None]:

# Training loop
num_epochs = 3
for epoch in range(num_epochs):

    model.train()
    total_loss = 0

    for images, targets in train_loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()

    loss = total_loss / len(train_loader)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}')

    # Save model checkpoint
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
    }, f'checkpoint_epoch_{epoch+1}.pth')