In [1]:
####

In [None]:
# Install the selective_search package that the import cell below depends on.
! pip install selective_search

In [3]:
import torch
from torch import nn
import selective_search
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
from torchvision import transforms
from torchvision.utils import make_grid
import torchvision.transforms.functional as F
from tqdm.notebook import tqdm

In [4]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [5]:
def crop(image, new_shape):
    """Center-crop the last two dimensions of a 4-D tensor.

    Args:
        image: Tensor indexed as ``image[:, :, rows, cols]``.
        new_shape: Indexable whose entries 2 and 3 give the wanted number of
            rows and columns.

    Returns:
        The slice of ``image`` of the requested size, centred on the middle
        of dimensions 2 and 3.
    """
    target_h, target_w = new_shape[2], new_shape[3]
    # Offset of the crop window: image centre minus half the window size.
    top = image.shape[2] // 2 - target_h // 2
    left = image.shape[3] // 2 - target_w // 2
    return image[:, :, top:top + target_h, left:left + target_w]

In [6]:
def show_tensor_images(image_tensor, num_images=2, size=(3 , 512 , 512)):
    """Display the first ``num_images`` images of a tensor as a single grid.

    The tensor is detached, moved to the CPU and viewed as ``(-1, *size)``;
    the selected images are tiled with ``make_grid`` (five per row) and the
    grid is drawn channels-last with matplotlib.
    """
    batch = image_tensor.detach().cpu().view(-1, *size)
    grid = make_grid(batch[:num_images], nrow=5)
    # imshow expects H x W x C, make_grid returns C x H x W.
    plt.imshow(grid.permute(1, 2, 0).squeeze())
    plt.show()

In [7]:
def intersection_over_union(boxes_preds, boxes_labels, box_format="midpoint"):
    """Compute the intersection-over-union of two sets of boxes.

    Args:
        boxes_preds: Tensor whose last dimension holds the four box values.
        boxes_labels: Tensor with the same last-dimension layout; its leading
            dimensions must broadcast against those of ``boxes_preds``.
        box_format: ``"midpoint"`` when boxes are ``(x_center, y_center, w, h)``
            or ``"corners"`` when they are ``(x1, y1, x2, y2)``.

    Returns:
        Tensor of IoU values with the broadcast leading dimensions and a
        trailing dimension of size 1.

    Raises:
        ValueError: If ``box_format`` is neither ``"midpoint"`` nor ``"corners"``.
    """
    # Slices (0:1 instead of 0) keep the trailing dimension so results broadcast.
    if box_format == "midpoint":
        box1_x1 = boxes_preds[..., 0:1] - boxes_preds[..., 2:3] / 2
        box1_y1 = boxes_preds[..., 1:2] - boxes_preds[..., 3:4] / 2
        box1_x2 = boxes_preds[..., 0:1] + boxes_preds[..., 2:3] / 2
        box1_y2 = boxes_preds[..., 1:2] + boxes_preds[..., 3:4] / 2
        box2_x1 = boxes_labels[..., 0:1] - boxes_labels[..., 2:3] / 2
        box2_y1 = boxes_labels[..., 1:2] - boxes_labels[..., 3:4] / 2
        box2_x2 = boxes_labels[..., 0:1] + boxes_labels[..., 2:3] / 2
        box2_y2 = boxes_labels[..., 1:2] + boxes_labels[..., 3:4] / 2
    elif box_format == "corners":
        box1_x1 = boxes_preds[..., 0:1]
        box1_y1 = boxes_preds[..., 1:2]
        box1_x2 = boxes_preds[..., 2:3]
        box1_y2 = boxes_preds[..., 3:4]
        box2_x1 = boxes_labels[..., 0:1]
        box2_y1 = boxes_labels[..., 1:2]
        box2_x2 = boxes_labels[..., 2:3]
        box2_y2 = boxes_labels[..., 3:4]
    else:
        # Previously an unknown format fell through and crashed later with an
        # UnboundLocalError; fail early with a message that names the problem.
        raise ValueError(
            f"box_format must be 'midpoint' or 'corners', got {box_format!r}"
        )

    # Corners of the overlap rectangle.
    x1 = torch.max(box1_x1, box2_x1)
    y1 = torch.max(box1_y1, box2_y1)
    x2 = torch.min(box1_x2, box2_x2)
    y2 = torch.min(box1_y2, box2_y2)

    # clamp(0) turns a negative extent (no overlap) into an empty intersection.
    intersection = (x2 - x1).clamp(0) * (y2 - y1).clamp(0)
    box1_area = abs((box1_x2 - box1_x1) * (box1_y2 - box1_y1))
    box2_area = abs((box2_x2 - box2_x1) * (box2_y2 - box2_y1))

    # The epsilon avoids a division by zero for two degenerate boxes.
    return intersection / (box1_area + box2_area - intersection + 1e-6)

In [8]:
class Conv(nn.Module):
    """2-D convolution optionally followed by batch norm, ReLU and 2x2 max pooling.

    Args:
        in_channels: Number of input channels of the convolution.
        out_channels: Number of output channels of the convolution.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Convolution padding.
        use_norm: Apply ``nn.BatchNorm2d(out_channels)`` after the convolution.
        use_activation: Apply ``nn.ReLU`` after the (optional) normalisation.
        use_pool: Apply a 2x2, stride-2 ``nn.MaxPool2d`` as the last stage.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size=(3, 3),
                 stride=(1, 1),
                 padding=1,
                 use_norm=True,
                 use_activation=True,
                 use_pool=False):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels,
                               out_channels,
                               kernel_size=kernel_size,
                               stride=stride,
                               padding=padding)

        self.use_norm = use_norm
        self.use_activation = use_activation
        self.use_pool = use_pool

        # Optional stages are only created (and registered) when enabled.
        if use_norm:
            self.norm = nn.BatchNorm2d(out_channels)
        if use_activation:
            self.activation = nn.ReLU()
        if use_pool:
            self.maxpool = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

    def forward(self, x):
        """Run conv -> [norm] -> [activation] -> [pool] and return the result."""
        out = self.conv1(x)
        optional_stages = (
            (self.use_norm, "norm"),
            (self.use_activation, "activation"),
            (self.use_pool, "maxpool"),
        )
        for enabled, stage_name in optional_stages:
            if enabled:
                out = getattr(self, stage_name)(out)
        return out

In [9]:
class Linear(nn.Module):
    """Fully connected layer optionally followed by batch norm and ReLU.

    Args:
        in_channels: Number of input features.
        out_channels: Number of output features.
        use_norm: Apply ``nn.BatchNorm1d(out_channels)`` after the linear layer.
        use_activation: Apply ``nn.ReLU`` after the (optional) normalisation.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 use_norm=True,
                 use_activation=True):
        super().__init__()

        self.linear1 = nn.Linear(in_channels, out_channels)

        self.use_norm = use_norm
        self.use_activation = use_activation

        # Optional stages are only created (and registered) when enabled.
        if use_norm:
            self.norm = nn.BatchNorm1d(out_channels)
        if use_activation:
            self.activation = nn.ReLU()

    def forward(self, x):
        """Run linear -> [norm] -> [activation] and return the result."""
        out = self.linear1(x)
        out = self.norm(out) if self.use_norm else out
        out = self.activation(out) if self.use_activation else out
        return out

In [10]:
# Layer specification consumed by VGG:
#   * a list  -> one Conv block: [out_channels, kernel_size, stride, padding]
#   * "M"     -> a 2x2, stride-2 max-pooling layer
#   * 4096    -> Flatten followed by the 25088 -> 4096 Linear head
config = [
    [64, 3, 1, 1],
    [128, 3, 1, 1],
    "M",
    [128, 3, 1, 1],
    [256, 3, 1, 1],
    "M",
    [256, 3, 1, 1],
    [512, 3, 1, 1],
    "M",
    [512, 3, 1, 1],
    [512, 3, 1, 1],
    [512, 3, 1, 1],
    "M",
    [512, 3, 1, 1],
    [512, 3, 1, 1],
    [512, 3, 1, 1],
    "M",
    4096,
]

In [11]:
class VGG(nn.Module):
    """Sequential backbone assembled from a layer specification list.

    Each entry of ``config`` adds layers to ``self.layers``:

    * ``[out_channels, kernel_size, stride, padding]`` -> one ``Conv`` block,
    * any string (``"M"`` in the stock config) -> a 2x2, stride-2 max pool,
    * ``4096`` -> ``nn.Flatten`` + ``Linear(25088, 4096)``,
    * ``1000`` -> ``Linear(4096, 1000)`` without activation + softmax.

    Any other non-list, non-string entry adds nothing.

    Args:
        in_channels: Number of channels of the input images.
        config: Layer specification as described above.
    """

    def __init__(self,
                 in_channels=3,
                 config=config):
        super(VGG, self).__init__()

        self.layers = nn.ModuleList()

        for layer in config:
            if isinstance(layer, list):
                out_channels, kernel_size, stride, padding = layer
                self.layers.append(Conv(
                    in_channels,
                    out_channels,
                    kernel_size,
                    stride,
                    padding
                ))
                # The next conv block consumes what this one produced.
                in_channels = out_channels
            elif isinstance(layer, str):
                self.layers.append(nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)))
            elif layer == 4096:
                self.layers.append(nn.Flatten())
                # 25088 = 512 * 7 * 7: the stock config ends with 512 channels and
                # five 2x2 pools, which reduce a 244 px (or 224 px) side to 7.
                self.layers.append(Linear(25088, 4096))
            elif layer == 1000:
                self.layers.append(Linear(4096, 1000, use_activation=False))
                # The input here is (batch, 1000); name the class dimension
                # explicitly instead of relying on the deprecated implicit dim.
                self.layers.append(nn.Softmax(dim=1))

    def forward(self, x):
        """Apply every layer of ``self.layers`` in order and return the result."""
        for layer in self.layers:
            x = layer(x)
        return x

In [12]:
# Smoke test: push a random batch of two 3 x 244 x 244 images through the
# backbone; the recorded output below shows the resulting shape (2, 4096).
x = torch.randn(2 , 3 , 244 , 244).to(device)
vgg = VGG().to(device)
z = vgg(x)
z.shape

  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


torch.Size([2, 4096])

In [15]:
class Dataset_(torch.utils.data.Dataset):
    """Detection dataset yielding an image, its label boxes and region proposals.

    ``csv_file`` is read with pandas; column 0 of each row is the image file
    name (relative to ``img_dir``) and column 1 the label file name (relative
    to ``label_dir``). Every label line is ``class x y width height``.

    Each item is the tuple ``(img, target_boxes, regions, crops)``:

    * ``img``: the (optionally transformed) image, channels first,
    * ``target_boxes``: tensor with one ``[x, y, width, height, class]`` row per label line,
    * ``regions``: tensor of the proposals returned by selective search,
    * ``crops``: list with one 244 x 244 crop of ``img`` per proposal.

    Args:
        csv_file: Path of the CSV index file.
        img_dir: Directory containing the images.
        label_dir: Directory containing the label text files.
        transform: Optional callable applied to the channels-first image tensor.
    """

    def __init__(self,
                 csv_file,
                 img_dir,
                 label_dir,
                 transform=None):
        super(Dataset_, self).__init__()

        self.csv_file = csv_file
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.transform = transform
        self.df = pd.read_csv(self.csv_file)

    def __len__(self):
        """Return the number of rows of the CSV index."""
        return len(self.df)

    @staticmethod
    def _parse_label_line(line):
        """Parse one ``class x y width height`` line into ``[x, y, width, height, class]``.

        Whole numbers are returned as ``int`` and everything else as ``float``.
        Values are parsed as floats first, so a whole number written with a
        decimal point (``"1.0"``) no longer raises ``ValueError`` the way
        ``int("1.0")`` did.
        """
        values = [float(token) for token in line.split()]
        class_label, x, y, width, height = [
            int(value) if value.is_integer() else value for value in values
        ]
        return [x, y, width, height, class_label]

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(img, target_boxes, regions, crops)``."""
        label_path = os.path.join(self.label_dir, self.df.iloc[idx, 1])
        with open(label_path) as f:
            # Blank lines (e.g. a trailing newline at the end of the file) are skipped.
            boxes = [self._parse_label_line(line) for line in f if line.strip()]
        target_boxes = torch.tensor(boxes)

        img_path = os.path.join(self.img_dir, self.df.iloc[idx, 0])
        img = np.asarray(plt.imread(img_path))
        # H x W x C -> C x H x W
        img = torch.from_numpy(img).permute(2, 0, 1)

        if self.transform:
            img = self.transform(img)

        regions = torch.tensor(self._get_regions(img))

        stacked_croped_images = []
        # selective_search reports corner boxes (x1, y1, x2, y2), while
        # torchvision's crop takes (top, left, height, width): rows first,
        # then a size rather than a second corner.
        for x1, y1, x2, y2 in regions.tolist():
            patch = F.crop(img, top=y1, left=x1, height=y2 - y1, width=x2 - x1)
            patch = F.resize(patch, (244, 244))
            # Detached copy, so the crop never aliases ``img``.
            stacked_croped_images.append(patch.detach().clone())
        return img, target_boxes, regions, stacked_croped_images

    def _get_regions(self, image, topN=2):
        """Return the ``topN`` selective-search proposals of a C x H x W image tensor.

        Proposals smaller than 20 px are dropped by ``box_filter``.
        """
        # selective_search works on an H x W x C NumPy array, not a torch tensor.
        image = image.permute(1, 2, 0).detach().cpu().numpy()
        boxes = selective_search.selective_search(image, mode='fast')
        return selective_search.box_filter(boxes, min_size=20, topN=topN)
    

In [16]:
# Preprocessing applied to every image tensor produced by Dataset_:
# convert to PIL, resize to 512 x 512, convert back to a tensor.
transform = transforms.Compose([
                                transforms.ToPILImage() , 
                                transforms.Resize((512 , 512)) , 
                                transforms.ToTensor()
])
# NOTE(review): the paths below are absolute Google Drive paths; the notebook
# only runs where the drive is mounted at /content/drive.
dataset = Dataset_(
    csv_file = "/content/drive/MyDrive/Yolo_Dataset/train.csv" , 
    img_dir = "/content/drive/MyDrive/Yolo_Dataset/images/" , 
    label_dir = "/content/drive/MyDrive/Yolo_Dataset/labels/" , 
    transform = transform
)
# One image per batch, visited in random order.
dataloader = torch.utils.data.DataLoader(dataset , batch_size=1 , shuffle=True)

In [None]:
# Fetch a single batch to inspect it: show the image and print the shapes of
# the label boxes and of the region proposals, then stop.
for img , trg , regions , imgs in dataloader:
    show_tensor_images(img)
    print(trg.shape)
    print(regions.shape)
    break

In [18]:
class RCNN(nn.Module):
    """VGG backbone followed by one linear head and a softmax.

    Args:
        in_channels: Number of channels of the input images; forwarded to the
            backbone.
        out_channels: Size of the output vector produced for each input image.
    """

    def __init__(self,
                 in_channels=3,
                 out_channels=25):
        super(RCNN, self).__init__()

        # Forward in_channels: it used to be accepted but silently ignored, so
        # the backbone was always built for 3-channel input.
        self.backbone = VGG(in_channels=in_channels)
        self.linear1 = nn.Linear(4096, out_channels)
        # The head output is (batch, out_channels); name the normalised
        # dimension explicitly instead of relying on the deprecated implicit dim.
        # NOTE(review): train() reads columns 20:24 as box coordinates and
        # column 24 as objectness, yet the softmax couples all columns so that
        # they sum to 1 -- confirm this is intended.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return the softmax-normalised head output for a batch of images."""
        features = self.backbone(x)
        return self.softmax(self.linear1(features))


In [None]:
# Smoke test: run a random batch of two 3 x 244 x 244 images through RCNN and
# display the shape of the result.
x = torch.randn(2 , 3 , 244 , 244).to(device)
rcnn = RCNN().to(device)
z = rcnn(x)
z.shape

In [43]:
# Loss functions. train() uses L1_criterion; BCE_criterion is not referenced
# by the cells shown in this notebook.
BCE_criterion = nn.BCEWithLogitsLoss()
L1_criterion = nn.L1Loss()
# NOTE(review): lambda_recon is not referenced by the cells shown in this notebook.
lambda_recon = 200
# Adam beta coefficients, passed to the optimizer.
betas = (0.5 , 0.999)


# Number of passes over the dataloader made by train().
n_epochs = 200
# NOTE(review): in_channels, out_channels and target_shape are not referenced
# by the cells shown in this notebook.
in_channels = 3
out_channels = 3
# train() prints the running loss every display_steps optimisation steps.
display_steps = 1
# Adam learning rate.
lr = 0.0002
target_shape = 512

In [44]:
# The network trained by train() and the Adam optimizer that updates its parameters.
model = RCNN().to(device)
opt = torch.optim.Adam(model.parameters() , lr=lr , betas=betas)

In [45]:
def train():
    """Train the notebook-level ``model`` on ``dataloader`` for ``n_epochs`` epochs.

    For every image the model scores each proposal crop. Column 24 of the
    prediction is trained (L1) towards an objectness label that is 1 when the
    proposal overlaps some label box with IoU >= 0.5 and 0 otherwise; columns
    20:24 are trained (L1) towards the label box coordinates. The total loss is
    the mean of both terms.

    Uses the notebook globals ``model``, ``opt``, ``dataloader``, ``device``,
    ``n_epochs``, ``display_steps``, ``L1_criterion`` and
    ``intersection_over_union``. Prints the running loss every
    ``display_steps`` optimisation steps and returns ``None``.
    """
    iou_threshold = 0.5
    mean_rcnn_loss = 0
    cur_step = 0
    model.train()
    for epoch in range(n_epochs):
        for img, target_boxes, regions, croped_imgs in tqdm(dataloader):
            # Skip samples that cannot be trained on: images without label
            # boxes, and images with fewer than two proposals (the model's
            # BatchNorm1d layer rejects a single-sample batch in training mode).
            if (len(croped_imgs) < 2
                    or target_boxes.dim() != 3
                    or target_boxes.shape[1] == 0):
                continue
            opt.zero_grad()

            # With batch_size=1 the loader yields a list of R crops shaped
            # [1, 3, 244, 244]; stack and drop the batch axis -> [R, 3, 244, 244].
            croped_imgs = torch.stack(croped_imgs).squeeze(1).to(device)
            target_boxes = target_boxes.to(device).float()  # [1, B, 5]
            regions = regions.to(device).float()  # [1, R, 4]

            # Bring the proposals into the coordinate system of the labels
            # before comparing them.
            # Assumes proposals are pixel corner boxes (x1, y1, x2, y2) of the
            # transformed image and labels are image-normalised midpoint boxes
            # (x, y, w, h) -- TODO confirm against the dataset.
            img_h, img_w = img.shape[-2:]
            x1, y1, x2, y2 = regions.unbind(dim=-1)
            region_boxes = torch.stack(
                [
                    (x1 + x2) / (2 * img_w),
                    (y1 + y2) / (2 * img_h),
                    (x2 - x1) / img_w,
                    (y2 - y1) / img_h,
                ],
                dim=-1,
            )  # [1, R, 4]

            gt_boxes = target_boxes[..., 0:4]  # [1, B, 4]
            # IoU of every proposal with every label box -> [1, R, B, 1].
            iou = intersection_over_union(
                region_boxes.unsqueeze(2), gt_boxes.unsqueeze(1)
            )
            # A proposal is positive when its best overlap reaches the
            # threshold (the previous ``iou < threshold`` marked the
            # background proposals as positives).
            best_iou = iou.max(dim=2).values  # [1, R, 1]
            obj_target = (best_iou >= iou_threshold).float().squeeze(0)  # [R, 1]

            model_pred = model(croped_imgs)  # [R, 25]

            obj_loss = L1_criterion(model_pred[..., 24:25], obj_target)

            # Coordinate term: the sum over proposals of the mean absolute
            # difference between the predicted box and all label boxes. The
            # mean over [R, B, 4] times R equals that per-proposal sum.
            # NOTE(review): every proposal is regressed towards every label
            # box; matching each proposal to its best box may be intended.
            n_regions, n_boxes = model_pred.shape[0], gt_boxes.shape[1]
            pred_coords = model_pred[:, 20:24].unsqueeze(1).expand(-1, n_boxes, -1)
            gt_coords = gt_boxes.expand(n_regions, -1, -1)
            co_loss = L1_criterion(pred_coords, gt_coords) * n_regions

            loss = (obj_loss + co_loss) / 2
            loss.backward()
            opt.step()

            # Running mean over the current display window; reset after each
            # report so the printed value is a mean rather than an ever-growing sum.
            mean_rcnn_loss += loss.item() / display_steps
            if (cur_step + 1) % display_steps == 0:
                print(f'Epoch {epoch} , Step {cur_step} , Mean RCNN Loss {mean_rcnn_loss}')
                mean_rcnn_loss = 0
            cur_step += 1


In [None]:
# Start training; progress is printed every display_steps steps.
train()