In [1]:
from google.colab import drive
import os

# Google Colab Patch
use_colab = True
if use_colab:
    from google.colab import drive
    drive.mount('/content/drive')
    import sys
    # ----------------------------------------
    dir = "/content/drive/MyDrive/Colab Notebooks/328/assignment4"
    # ----------------------------------------
    sys.path.append(dir)
from A4_utils import *

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import torch
from torch.utils.data.dataset import Dataset  # For custom data-sets
from torchvision import transforms
import torchvision
from skimage.io import imread
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import math
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.rpn import AnchorGenerator
from tqdm import tqdm

CLASSES = [
    '__background__', '0'
    '1', '2', '3',
    '4', '5', '6',
    '7', '8', '9'
]

Num_classes = 11

data_path = "/content/drive/MyDrive/Colab Notebooks/328/assignment4/mnistdd_rgb_train_valid/"

# training
train_image_path = data_path + 'train_X.npy'
train_label_path = data_path + 'train_Y.npy'
train_bboxes_path = data_path + 'train_bboxes.npy'
train_seg_path = data_path + 'train_seg.npy'

# validation
valid_image_path= data_path + 'valid_X.npy'
valid_label_path= data_path +'valid_Y.npy'
valid_bboxes_path = data_path +'valid_bboxes.npy'
valid_seg_path = data_path +'valid_seg.npy'


class CustomDataset(Dataset):
    def __init__(self, image_paths, label_paths, bounding_box_path, train=True):   # initial logic happens like transform
        self.images = np.load(image_paths)
        self.label =  np.load(label_paths)
        self.bboxes = np.load(bounding_box_path)
        self.transforms_image = torchvision.transforms.Compose([torchvision.transforms.ToTensor(),
                                            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])
    def __getitem__(self, index):

         # Retrieve and preprocess a sample
        image = self.images[index].reshape(64, 64, 3)
        labels = torch.from_numpy(self.label[index]).to(torch.int64)

        bounding_boxes1 = self.bboxes[index][0]
        bounding_boxes2 = self.bboxes[index][1]

        boxes = torch.zeros([2,4], dtype=torch.float32)
        boxes1 = [bounding_boxes1[1], bounding_boxes1[0], bounding_boxes1[3], bounding_boxes1[2]]
        boxes2 = [bounding_boxes2[1], bounding_boxes2[0], bounding_boxes2[3], bounding_boxes2[2]]
        boxes[0] = torch.tensor(boxes1)
        boxes[1] = torch.tensor(boxes2)

        # Apply any transformations if needed
        image = self.transforms_image(image)

        return {"image":image, 'boxes': boxes, 'labels': labels}

    def __len__(self):  # return count of sample we have
        return len(self.images)

train_dataset = CustomDataset(train_image_path, train_label_path, train_bboxes_path, train=True)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True)

valid_dataset = CustomDataset(valid_image_path, valid_label_path, valid_bboxes_path, train=False)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=16, shuffle=False)

In [4]:
import matplotlib.pyplot as plt

"""
dataiter = enumerate(train_loader)
batch_idx, (image, var) = next(dataiter)

print(f"images shape {image.shape}")
print(f"labels shape {var['labels'].shape}")
print(f"bboxes shape {var['boxes'].shape}")

plt.imshow(image[0].permute(1,2,0).data)
plt.show()

# plt.imshow(img_array, cmap='gray')
# plt.show()
"""

'\ndataiter = enumerate(train_loader)\nbatch_idx, (image, var) = next(dataiter)\n\nprint(f"images shape {image.shape}")\nprint(f"labels shape {var[\'labels\'].shape}")\nprint(f"bboxes shape {var[\'boxes\'].shape}")\n\nplt.imshow(image[0].permute(1,2,0).data)\nplt.show()\n\n# plt.imshow(img_array, cmap=\'gray\')\n# plt.show()\n'

load model

In [5]:
import torchvision
import torch.nn.functional as F
import torch

from torch import nn
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

class TwoMLPHead(nn.Module):
    """
    Standard heads for FPN-based models

    Args:
        in_channels (int): number of input channels
        representation_size (int): size of the intermediate representation
    """

    def __init__(self, in_channels, representation_size):
        super().__init__()

        self.fc6 = nn.Linear(in_channels, representation_size)
        self.fc7 = nn.Linear(representation_size, representation_size)

    def forward(self, x):
        x = x.flatten(start_dim=1)

        x = F.relu(self.fc6(x))
        x = F.relu(self.fc7(x))

        return x

class FastRCNNPredictor(nn.Module):
    """
    Standard classification + bounding box regression layers
    for Fast R-CNN.

    Args:
        in_channels (int): number of input channels
        num_classes (int): number of output classes (including background)
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.cls_score = nn.Linear(in_channels, num_classes)
        self.bbox_pred = nn.Linear(in_channels, num_classes * 4)

    def forward(self, x):
        if x.dim() == 4:
            torch._assert(
                list(x.shape[2:]) == [1, 1],
                f"x has the wrong shape, expecting the last two dimensions to be [1,1] instead of {list(x.shape[2:])}",
            )
        x = x.flatten(start_dim=1)
        scores = self.cls_score(x)
        bbox_deltas = self.bbox_pred(x)

        return scores, bbox_deltas

# A Nano backbone.
class NanoBackbone(nn.Module):
    def __init__(self, initialize_weights=True, num_classes=1000):
        super(NanoBackbone, self).__init__()

        self.num_classes = num_classes
        self.features = self._create_conv_layers()

        if initialize_weights:
            # Random initialization of the weights
            # just like the original paper.
            self._initialize_weights()

    def _create_conv_layers(self):
        conv_layers = nn.Sequential(
            nn.Conv2d(3, 64, 7, stride=2, padding=3),
            nn.LeakyReLU(0.1, inplace=True),
            nn.MaxPool2d(2),

            nn.Conv2d(64, 128, 3, padding=1),
            nn.LeakyReLU(0.1, inplace=True),
            nn.MaxPool2d(2),

            nn.Conv2d(128, 256, 1),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(256, 256, 3, padding=1),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(256, 256, 1),
            nn.LeakyReLU(0.1, inplace=True),
        )
        return conv_layers

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal(m.weight, mode='fan_in',
                    nonlinearity='leaky_relu'
                )
                if m.bias is not None:
                        nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

def create_model(num_classes, pretrained=True, coco_model=False):
    # Load the backbone features.
    backbone = NanoBackbone(num_classes=11).features

    # We need the output channels of the last convolutional layers from
    # the features for the Faster RCNN model.
    backbone.out_channels = 256

    # Generate anchors using the RPN. Here, we are using 5x3 anchors.
    # Meaning, anchors with 5 different sizes and 3 different aspect
    # ratios.
    anchor_generator = AnchorGenerator(
        sizes=((32, 64, 128, 256, 512),),
        aspect_ratios=((0.5, 1.0, 2.0),)
    )

    # Feature maps to perform RoI cropping.
    # If backbone returns a Tensor, `featmap_names` is expected to
    # be [0]. We can choose which feature maps to use.
    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=['0'],
        output_size=7,
        sampling_ratio=2
    )

    representation_size = 128

    # Box head.
    box_head = TwoMLPHead(
        in_channels=backbone.out_channels * roi_pooler.output_size[0] ** 2,
        representation_size=representation_size
    )

    # Box predictor.
    box_predictor = FastRCNNPredictor(representation_size, num_classes)

    # Final Faster RCNN model.
    model = FasterRCNN(
        backbone=backbone,
        num_classes=None, # Num classes shoule be None when `box_predictor` is provided.
        rpn_anchor_generator=anchor_generator,
        box_roi_pool=roi_pooler,
        box_head=box_head,
        box_predictor=box_predictor
    )
    return model

In [7]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = create_model(num_classes=11, pretrained=True, coco_model=False)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
num_epochs = 2
batchsize = 50
MODEL_PATH = 'object_detection_rcnn.pth'
"""
checkpoint = torch.load(MODEL_PATH)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optim_state_dict'])
"""

train_dataset = CustomDataset(train_image_path, train_label_path, train_bboxes_path, train=True)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batchsize, shuffle=True)

valid_dataset = CustomDataset(valid_image_path, valid_label_path, valid_bboxes_path, train=False)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=batchsize, shuffle=False)

for eprochs in range(num_epochs):
    model.train()
    epoch_loss = 0
    train_loader = tqdm(train_loader)
    for batch_idx, data in enumerate(train_loader):
        # print(batch_idx)
        images = []
        targets = []
        # print(data)
        for i in range(batchsize):

            images.append(data["image"][i].to(device))
            target = {}
            target['boxes'] = data['boxes'][i].to(device)
            target['labels'] = data['labels'][i].to(device)
            targets.append(target)
            # print(data['labels'][i])

        loss_dict = model(images, targets)
        loss = sum(loss for loss in loss_dict.values())
        epoch_loss += loss.cpu().detach().numpy()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"epoch {eprochs} loss {epoch_loss}")
    torch.save({'model_state_dict': model.state_dict(),
        'optim_state_dict': optimizer.state_dict()}, MODEL_PATH)

  nn.init.kaiming_normal(m.weight, mode='fan_in',
100%|██████████| 1100/1100 [27:05<00:00,  1.48s/it]


epoch 0 loss 973.9763357937336


100%|██████████| 1100/1100 [26:30<00:00,  1.45s/it]


epoch 1 loss 508.4716504216194


In [None]:
# testing model
dataiter = enumerate(valid_loader)
batch_idx, (var) = next(dataiter)

print(f"images shape {var['image'].shape}")
print(f"labels shape {var['labels'].shape}")
print(f"bboxes shape {var['boxes'].shape}")

plt.imshow(var['image'][0].permute(1,2,0).data)
plt.show()

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = create_model(num_classes=81, pretrained=True, coco_model=False)
model.eval()
MODEL_PATH = 'object_detection_rcnn.pth'
checkpoint = torch.load(MODEL_PATH)
model.load_state_dict(checkpoint['model_state_dict'])

image = [var['image'][0]]
# print(f"images shape {image.shape}")
output = model(image)
print(output["boxes"])
# plt.imshow(img_array, cmap='gray')
# plt.show()