In [3]:
from torchvision.datasets import VOCDetection

In [17]:
# Build the PASCAL VOC 2007 "trainval" and "test" detection splits through
# torchvision. With download=True the archive is fetched into ./data when
# missing; the cell output shows it being re-extracted on this run.
# transform=None means no image transform is applied to the samples.
train_dataset = VOCDetection(root="./data",
                             year="2007",
                             download=True,
                             image_set="trainval",
                             transform=None)
test_dataset = VOCDetection(root="./data",
                             year="2007",
                             download=True,
                             image_set="test",
                             transform=None)

Using downloaded and verified file: ./data/VOCtrainval_06-Nov-2007.tar
Extracting ./data/VOCtrainval_06-Nov-2007.tar to ./data
Using downloaded and verified file: ./data/VOCtest_06-Nov-2007.tar
Extracting ./data/VOCtest_06-Nov-2007.tar to ./data


In [18]:
# Fetch the sample at index 1: the image and its annotation (a nested dict, see `target` below).
img, target = train_dataset[1]

In [20]:
target

{'annotation': {'folder': 'VOC2007',
  'filename': '000007.jpg',
  'source': {'database': 'The VOC2007 Database',
   'annotation': 'PASCAL VOC2007',
   'image': 'flickr',
   'flickrid': '194179466'},
  'owner': {'flickrid': 'monsieurrompu', 'name': 'Thom Zemanek'},
  'size': {'width': '500', 'height': '333', 'depth': '3'},
  'segmented': '0',
  'object': [{'name': 'car',
    'pose': 'Unspecified',
    'truncated': '1',
    'difficult': '0',
    'bndbox': {'xmin': '141', 'ymin': '50', 'xmax': '500', 'ymax': '330'}}]}}

In [23]:
len(test_dataset)

4952

In [None]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset
import torch.optim as optim
import torchvision
from torchvision import models
import matplotlib.pyplot as plt
#import selective_search

import cv2

import xml.etree.ElementTree as ET
from PIL import Image
import os

In [None]:
import sys

def ss_config(ss, img, mode):
    """Attach an image to a selective-search object and select its strategy.

    Args:
        ss: Selective-search segmentation object exposing ``setBaseImage`` and
            the ``switchTo...`` strategy methods.
        img: Image handed to ``ss.setBaseImage``.
        mode: ``'s'`` (single strategy), ``'f'`` (fast) or ``'q'`` (quality).

    Raises:
        ValueError: If ``mode`` is not one of ``'s'``, ``'f'`` or ``'q'``.
    """
    strategy_methods = {
        's': 'switchToSingleStrategy',
        'f': 'switchToSelectiveSearchFast',
        'q': 'switchToSelectiveSearchQuality',
    }
    # Validate before touching `ss`, and raise instead of terminating the
    # interpreter: a helper must not shut the kernel down over a bad argument.
    if mode not in strategy_methods:
        raise ValueError(
            f"Unknown selective-search mode {mode!r}. Re-enter the mode. s or f or q"
        )

    ss.setBaseImage(img)
    getattr(ss, strategy_methods[mode])()


def selective_search(img, mode='q'):
    """Run OpenCV selective search on ``img`` and return corner-format boxes.

    Args:
        img: Image passed on to ``ss_config`` as the base image.
        mode: Strategy key forwarded to ``ss_config`` (default ``'q'``).

    Returns:
        The array produced by ``process()``, converted in place from
        ``[x, y, w, h]`` rows to ``[x, y, x + w, y + h]`` rows.
    """
    searcher = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()
    ss_config(searcher, img, mode)

    rects = searcher.process()  # rows of [x, y, w, h]
    # Add the top-left corner to (w, h) so the last two columns become (x2, y2).
    rects[:, 2:] += rects[:, :2]
    return rects

In [None]:
def parse_xml_boxes(root):
    """Extract ground-truth boxes and class indices from a VOC annotation tree.

    Args:
        root: Root element of a parsed annotation; each ``object`` child must
            contain a ``name`` and a ``bndbox`` with ``xmin``/``ymin``/``xmax``/``ymax``.

    Returns:
        A list with one ``[xmin, ymin, xmax, ymax, cls_idx]`` entry per object,
        where ``cls_idx`` is the position of the object's name in ``VOC_CLASSES``.
        The list is empty when the annotation has no ``object`` element.

    Raises:
        ValueError: If an object's name is not in ``VOC_CLASSES``.
    """
    bboxes = []
    for obj in root.findall('object'):
        cls_idx = VOC_CLASSES.index(obj.find('name').text)
        bndbox = obj.find('bndbox')
        # int(float(...)) also accepts coordinates written as "141.0".
        xmin, ymin, xmax, ymax = (
            int(float(bndbox.find(tag).text))
            for tag in ('xmin', 'ymin', 'xmax', 'ymax')
        )
        # Third element is xmax (the previous code stored ymin here by mistake).
        bboxes.append([xmin, ymin, xmax, ymax, cls_idx])
    return bboxes

In [None]:
def compute_iou(boxA, boxB):
    """Intersection over Union of two ``[xmin, ymin, xmax, ymax]`` boxes.

    Widths and heights are measured as ``max - min + 1``, i.e. both corner
    coordinates are treated as belonging to the box.

    Returns:
        The IoU, or ``0.0`` when the boxes do not overlap.
    """
    def _area(box):
        return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)

    # Corners of the overlap rectangle.
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    overlap = max(0, right - left + 1) * max(0, bottom - top + 1)
    if not overlap > 0:
        return 0.0

    # Union = sum of both areas minus the part counted twice.
    return overlap / float(_area(boxA) + _area(boxB) - overlap)

In [None]:
# PASCAL VOC object categories; a name's position in this list is the integer
# class index produced by parse_xml_boxes.
VOC_CLASSES = [
    'aeroplane', 'bicycle', 'bird', 'boat', 'bottle',
    'bus', 'car', 'cat', 'chair', 'cow',
    'diningtable', 'dog', 'horse', 'motorbike', 'person',
    'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor',
]

In [None]:
class CustomDataset(Dataset):
    """PASCAL VOC 2007 detection samples read from an extracted devkit folder.

    The files are expected under
    ``<root>/VOC<img_set>_06-Nov-2007/VOCdevkit/VOC2007/{Annotations,JPEGImages}``.

    Args:
        root: Directory containing the ``VOC<img_set>_06-Nov-2007`` folder.
        img_set: Split name used to build the folder name (default ``'trainval'``).
        transform: Optional callable applied to the RGB image array.

    Each item is ``(image, bboxes, labels)``:
        * ``image`` - the RGB array returned by OpenCV, or whatever ``transform``
          returns when one is given (no tensor conversion is done here);
        * ``bboxes`` - float tensor of shape ``(num_objects, 4)``;
        * ``labels`` - long tensor of shape ``(num_objects,)``.
    """

    def __init__(self, root, img_set='trainval', transform=None):
        self.root = root
        self.transform = transform
        self.img_set = img_set

        voc_dir = os.path.join(self.root, f'VOC{self.img_set}_06-Nov-2007', 'VOCdevkit', 'VOC2007')
        self.annotation_path = os.path.join(voc_dir, 'Annotations')
        self.img_path = os.path.join(voc_dir, 'JPEGImages')

        # Pair files by name stem instead of by position in two independent
        # listings, so a stray or missing file cannot shift every later pair.
        annotation_by_stem = self._index_by_stem(self.annotation_path)
        image_by_stem = self._index_by_stem(self.img_path)
        paired_stems = sorted(annotation_by_stem.keys() & image_by_stem.keys())
        self.annotations = [annotation_by_stem[stem] for stem in paired_stems]
        self.images = [image_by_stem[stem] for stem in paired_stems]

    @staticmethod
    def _index_by_stem(directory):
        """Map file stem -> full path for the non-hidden entries of ``directory``."""
        return {
            os.path.splitext(name)[0]: os.path.join(directory, name)
            for name in sorted(os.listdir(directory))
            if not name.startswith('.')
        }

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        # Load image (OpenCV reads BGR; convert to RGB)
        image = cv2.imread(self.images[idx])
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read image: {self.images[idx]}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Load annotation; reshape keeps a (0, 5) layout when there are no objects
        tree = ET.parse(self.annotations[idx])
        gt_bboxes_labels = np.asarray(parse_xml_boxes(tree.getroot()), dtype=np.int64).reshape(-1, 5)
        bboxes, labels = gt_bboxes_labels[:, :4], gt_bboxes_labels[:, -1]

        # Apply transformation if any
        if self.transform:
            image = self.transform(image)

        # Convert annotations to tensors (the image is returned as-is)
        bboxes = torch.tensor(bboxes).float()
        labels = torch.tensor(labels).long()

        return image, bboxes, labels

In [None]:
from torchvision.ops import roi_pool

# Build a VGG16 (batch-norm variant) and fill it with weights loaded from a
# local checkpoint file. This module-level `vgg` is read by FastRCNN.__init__.
# NOTE(review): the checkpoint path is an absolute Kaggle input path - confirm it
# exists in the environment where this notebook is run.
vgg = models.vgg16_bn()
vgg.load_state_dict(torch.load('/kaggle/input/vgg16/pytorch/default/1/vgg16_bn.pth'))

class FastRCNN(nn.Module):
    """Fast R-CNN detection head on top of the module-level VGG16-BN backbone.

    Args:
        num_classes: Number of object classes (background excluded).

    ``forward`` returns ``(cls_score, bbox_pred)`` with shapes
    ``(K, num_classes + 1)`` and ``(K, num_classes * 4)`` for ``K`` ROIs.
    """

    def __init__(self, num_classes):
        super().__init__()
        # Convolutional layers from the pre-loaded module-level `vgg`.
        # The final max-pool is dropped: with it the feature stride would be 32,
        # which contradicts the spatial_scale of 1/16 used for ROI pooling below.
        self.features = vgg.features[:-1]
        # ROI Pooling
        self.roi = roi_pool
        # Classifier (two fully connected layers shared by both output heads)
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
        )
        # Class-score head; the extra output is the background class.
        # It produces raw scores (no softmax is applied inside the model).
        self.softmax = nn.Linear(4096, num_classes + 1)
        # Box-regression head: four values per object class.
        self.bbox = nn.Linear(4096, num_classes * 4)

    def forward(self, images, ROIs):
        """Score and regress every ROI.

        Args:
            images: Image batch fed to the convolutional backbone.
            ROIs: Boxes in any format accepted by ``torchvision.ops.roi_pool``
                (``Tensor[K, 5]`` with a leading batch index, or a list of
                ``Tensor[L, 4]``). A plain ``Tensor[K, 4]`` is also accepted
                when ``images`` holds exactly one image.

        Raises:
            ValueError: If a ``Tensor[K, 4]`` is given for a multi-image batch.
        """
        # Compute feature maps
        feature_maps = self.features(images)

        # A bare (K, 4) tensor carries no batch index; it is only unambiguous
        # for a single image, where it can be wrapped as a one-element list.
        if torch.is_tensor(ROIs) and ROIs.dim() == 2 and ROIs.size(1) == 4:
            if images.size(0) != 1:
                raise ValueError(
                    "ROIs of shape (K, 4) are only supported for a single image; "
                    "pass Tensor[K, 5] or a list of Tensor[L, 4] instead"
                )
            ROIs = [ROIs]

        # Apply ROI pooling
        pooled_features = self.roi(feature_maps, ROIs, output_size=(7, 7), spatial_scale=1.0/16.0)
        # Flatten pooled features; .size(0) is the number of ROIs (K)
        x = pooled_features.view(pooled_features.size(0), -1)
        # Classifier
        x = self.classifier(x)

        # Outputs
        cls_score = self.softmax(x)  # (K, num_classes + 1)
        bbox_pred = self.bbox(x)     # (K, num_classes * 4)

        return cls_score, bbox_pred

In [None]:
import torch.nn.functional as F

class MultiTaskLoss(nn.Module):
    """Fast R-CNN multi-task loss: ``L_cls + lambda_reg * L_loc``.

    ``L_cls`` is the cross entropy over all ROIs. ``L_loc`` is the smooth-L1
    loss between the predicted box of the *true* class and the regression
    target, summed over foreground ROIs only and divided by the total number
    of ROIs.

    Args:
        lambda_reg: Weight of the box-regression term.
        background_label: Label index that marks background ROIs. When ``None``
            the last class index of ``cls_preds`` is used.
    """

    def __init__(self, lambda_reg=1, background_label=None):
        super().__init__()
        self.lambda_reg = lambda_reg
        self.background_label = background_label
        # Keep a reference to the function; it is called in forward().
        self.cls_loss = F.cross_entropy

    def forward(self, cls_preds, bbox_preds, labels, bbox_targets):
        """Compute the combined loss.

        Args:
            cls_preds: ``(N, C)`` raw class scores.
            bbox_preds: ``(N, 4 * num_box_classes)`` box predictions, four
                consecutive values per class.
            labels: ``(N,)`` integer class labels.
            bbox_targets: ``(N, 4)`` regression targets; rows belonging to
                background ROIs are ignored.

        Returns:
            Scalar loss tensor.
        """
        # Cross entropy loss for classification
        cls = self.cls_loss(cls_preds, labels)

        # Bounding box loss: only foreground ROIs contribute
        background = self.background_label
        if background is None:
            background = cls_preds.size(1) - 1
        foreground = torch.nonzero(labels != background, as_tuple=False).squeeze(1)

        # Pick, for every foreground ROI, the 4 values predicted for its true class.
        per_class_preds = bbox_preds.reshape(bbox_preds.size(0), -1, 4)
        selected_preds = per_class_preds[foreground, labels[foreground]]
        reg = F.smooth_l1_loss(selected_preds, bbox_targets[foreground], reduction='sum')
        reg = reg / max(labels.numel(), 1)

        return cls + self.lambda_reg * reg

In [None]:
from torch.utils.data import DataLoader

# Root folder holding the extracted VOC 2007 archives.
# NOTE(review): absolute Kaggle input path - confirm it exists where this runs.
root_dir = '/kaggle/input/pascal-voc-2007'
# Create dataset and dataloader
# These assignments rebind `train_dataset` / `test_dataset`, which earlier in the
# notebook referred to the torchvision VOCDetection objects.
train_dataset = CustomDataset(root_dir, img_set="trainval", transform=None)
test_dataset = CustomDataset(root_dir, img_set="test", transform=None)
# One image per batch; both loaders (including the test loader) shuffle.
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=True)

In [None]:
train_dataset[0]

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Instantiate the detector with 20 object classes and move it to DEVICE.
fastrcnn = FastRCNN(20).to(DEVICE)

In [None]:
import random

def sample_proposals(proposals, gt_boxes, gt_labels):
    """Sample a mini-batch of labelled ROIs for one image.

    A proposal is *positive* when its best IoU with any ground-truth box is
    above 0.5 (it takes that box's label), *negative* (background, label 20)
    when the best IoU is in ``(0.1, 0.5]``, and is discarded otherwise. Up to
    16 positives are drawn; negatives fill the batch up to 64 ROIs, limited by
    how many negatives are actually available.

    Args:
        proposals: Iterable of ``[x1, y1, x2, y2]`` boxes.
        gt_boxes: Iterable of ground-truth ``[x1, y1, x2, y2]`` boxes.
        gt_labels: Iterable of integer labels, aligned with ``gt_boxes``.

    Returns:
        ``(total_proposals, total_labels)``: a float32 tensor of shape
        ``(K, 4)`` with positives first, and an int64 tensor of shape ``(K,)``.
        ``K`` is 0 when nothing qualifies.
    """
    threshold_up = 0.5
    threshold_low = 0.1
    POS_NUM = 16
    NEG_NUM = 48
    BACKGROUND_LABEL = 20  # Background class label as 20

    # Work on plain Python numbers so the IoU comparisons behave the same for
    # lists, NumPy arrays and tensors.
    proposal_rows = [[float(v) for v in proposal[:4]] for proposal in proposals]
    gt_rows = [[float(v) for v in gt_box[:4]] for gt_box in gt_boxes]
    gt_label_values = [int(gt_label) for gt_label in gt_labels]

    positive_proposals, positive_labels, negative_proposals = [], [], []
    for proposal in proposal_rows:
        best_iou = 0.0
        best_label = None
        for gt_box, gt_label in zip(gt_rows, gt_label_values):
            iou = compute_iou(proposal, gt_box)
            if iou > best_iou:
                best_iou = iou
                best_label = gt_label

        if best_iou > threshold_up:
            positive_proposals.append(proposal)
            positive_labels.append(best_label)
        elif best_iou > threshold_low:
            # No label kept here: every negative gets the background label.
            negative_proposals.append(proposal)

    # Choose ~16 positives; more negatives when there are fewer positives,
    # but never more than actually exist (random.sample would raise).
    num_pos = min(POS_NUM, len(positive_proposals))
    num_neg = min(POS_NUM + NEG_NUM - num_pos, len(negative_proposals))

    pos_indices = random.sample(range(len(positive_proposals)), num_pos)
    neg_indices = random.sample(range(len(negative_proposals)), num_neg)

    sampled_boxes = [positive_proposals[i] for i in pos_indices]
    sampled_boxes += [negative_proposals[i] for i in neg_indices]
    sampled_labels = [positive_labels[i] for i in pos_indices]
    sampled_labels += [BACKGROUND_LABEL] * num_neg

    total_proposals = torch.tensor(sampled_boxes, dtype=torch.float32).reshape(-1, 4)
    total_labels = torch.tensor(sampled_labels, dtype=torch.int64)
    return total_proposals, total_labels
    

In [None]:
num_epochs = 10
def _match_gt_boxes(rois, gt_boxes):
    """For every ROI return the ground-truth box it overlaps most (by compute_iou)."""
    gt_rows = gt_boxes.tolist()
    best_indices = [
        max(range(len(gt_rows)), key=lambda j: compute_iou(roi, gt_rows[j]))
        for roi in rois.tolist()
    ]
    return gt_boxes[torch.tensor(best_indices, dtype=torch.long)]


def _encode_bbox_targets(rois, matched_gt):
    """Encode matched boxes as (tx, ty, tw, th) offsets relative to their ROIs."""
    roi_w = (rois[:, 2] - rois[:, 0]).clamp(min=1.0)
    roi_h = (rois[:, 3] - rois[:, 1]).clamp(min=1.0)
    roi_cx = rois[:, 0] + 0.5 * roi_w
    roi_cy = rois[:, 1] + 0.5 * roi_h

    gt_w = (matched_gt[:, 2] - matched_gt[:, 0]).clamp(min=1.0)
    gt_h = (matched_gt[:, 3] - matched_gt[:, 1]).clamp(min=1.0)
    gt_cx = matched_gt[:, 0] + 0.5 * gt_w
    gt_cy = matched_gt[:, 1] + 0.5 * gt_h

    return torch.stack(
        [
            (gt_cx - roi_cx) / roi_w,
            (gt_cy - roi_cy) / roi_h,
            torch.log(gt_w / roi_w),
            torch.log(gt_h / roi_h),
        ],
        dim=1,
    )


def train(model, dataloader, num_epochs, loss_fn, optimizer):
    """Train ``model`` on sampled region proposals.

    For every image: run selective search, sample labelled ROIs with
    ``sample_proposals``, build box-regression targets against the best
    matching ground-truth box, then take one optimisation step.

    Args:
        model: Detector called as ``model(images, [rois])`` and returning
            ``(cls_scores, bbox_preds)``.
        dataloader: Yields ``(image, boxes, labels)`` batches of size 1.
            Assumes ``image`` is a ``(1, H, W, 3)`` uint8 RGB tensor, which is
            what CustomDataset yields with ``transform=None`` - adjust the
            layout handling below if a transform changes it.
        num_epochs: Number of passes over ``dataloader``.
        loss_fn: Called as ``loss_fn(cls_scores, bbox_preds, roi_labels, bbox_targets)``.
        optimizer: Optimizer over the model parameters.

    Returns:
        ``(train_loss, train_acc)``: mean loss per optimisation step and the
        ROI classification accuracy in percent.

    Raises:
        ValueError: If a batch holds more than one image.
    """
    model.train()
    # Follow the model's own device instead of relying on a notebook global.
    device = next(model.parameters()).device
    train_loss, correct = 0.0, 0
    total_proposals, num_steps = 0, 0

    for _ in range(num_epochs):
        for image, boxes, labels in dataloader:
            if image.size(0) != 1:
                raise ValueError("train() expects a dataloader with batch_size=1")
            gt_boxes = boxes[0].float().cpu()
            gt_labels = labels[0].cpu()
            if gt_boxes.numel() == 0:
                continue  # nothing to learn from an image without objects

            # Selective search works on the HxWx3 uint8 array, not on a batched tensor.
            proposals = selective_search(image[0].cpu().numpy())
            # Generate ROIs and choose up to 64 proposals (positives + negatives)
            ROIs, ROI_labels = sample_proposals(proposals, gt_boxes, gt_labels)
            ROIs = torch.as_tensor(ROIs, dtype=torch.float32)
            if ROIs.size(0) == 0:
                continue
            bbox_targets = _encode_bbox_targets(ROIs, _match_gt_boxes(ROIs, gt_boxes))

            # The network expects a float (N, 3, H, W) batch.
            # NOTE(review): no input normalisation is applied here - confirm
            # whether the backbone weights expect it.
            image_batch = image.permute(0, 3, 1, 2).float().to(device)
            ROIs = ROIs.to(device)
            ROI_labels = ROI_labels.to(device)
            bbox_targets = bbox_targets.to(device)

            # Forward pass; a list of (K, 4) tensors holds one entry per image.
            cls_scores, bbox_preds = model(image_batch, [ROIs])

            # Compute loss against the ROI labels (not the image's object labels)
            loss = loss_fn(cls_scores, bbox_preds, ROI_labels, bbox_targets)
            train_loss += loss.item()
            num_steps += 1

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            pred = torch.argmax(cls_scores, dim=1)
            correct += pred.eq(ROI_labels.view_as(pred)).sum().item()
            total_proposals += ROI_labels.numel()

    # Normalise by what was actually processed, across all epochs.
    train_loss /= max(num_steps, 1)
    train_acc = 100. * correct / max(total_proposals, 1)
    return train_loss, train_acc

In [None]:
import time
import copy

def fine_tune(model: torch.nn.Module, 
              train_dataloader: torch.utils.data.DataLoader, 
              optimizer: torch.optim.Optimizer,
              loss_fn: torch.nn.Module=nn.CrossEntropyLoss(),
              num_epochs: int=10):
    """Train for ``num_epochs`` epochs and keep the best-scoring weights.

    ``train`` is invoked once per epoch; the weights with the highest training
    accuracy seen so far are snapshotted and restored before returning.

    Args:
        model: Network to fine-tune (modified in place).
        train_dataloader: Loader handed to ``train``.
        optimizer: Optimizer handed to ``train``.
        loss_fn: Loss handed to ``train``.
            NOTE(review): ``train`` calls the loss with four arguments
            (class scores, box predictions, labels, box targets), which the
            default ``nn.CrossEntropyLoss`` does not accept - pass a
            multi-task loss explicitly.
        num_epochs: Number of epochs to run.

    Returns:
        ``model`` loaded with the best weights found.
    """
    best_acc = 0.0
    best_model_wts = copy.deepcopy(model.state_dict())
    
    for epoch in range(1, num_epochs + 1):
        start_time = time.time()
        # Train for exactly one epoch per iteration; the epoch loop lives here.
        # (`train` requires num_epochs, which was previously not passed.)
        train_loss, train_acc = train(model=model,
                                      dataloader=train_dataloader,
                                      num_epochs=1,
                                      optimizer=optimizer,
                                      loss_fn=loss_fn)
        
        if train_acc > best_acc:
            best_acc = train_acc
            best_model_wts = copy.deepcopy(model.state_dict())
            
        time_elapsed = time.time() - start_time
        # Whole minutes plus remaining seconds (90 s -> "1min 30s").
        minutes, seconds = divmod(int(round(time_elapsed)), 60)
        print(f"------------ epoch {epoch} ------------")
        print(f"Train loss: {train_loss:.4f} | Train acc: {train_acc:.2f}%")
        print(f"Time taken: {minutes}min {seconds}s")
        
    model.load_state_dict(best_model_wts)
    return model