In [None]:
from google.colab import drive

# Mount Google Drive at /content/gdrive/; force_remount=True asks for a fresh
# mount rather than reusing an existing one.
drive.mount('/content/gdrive/', force_remount=True)
!unzip /content/gdrive/MyDrive/Dl/archive.zip

# Importing Libraries

In [None]:
# Imports 
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.optim as optim
import torchvision.transforms.functional as FT

import os
from PIL import Image
import pandas as pd

from collections import Counter

# Device used by the training cells below: CUDA when torch reports it as available, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Architecture

In [None]:
# Backbone layer specification, read in order by YOLO_V1._create_darknet:
#   tuple -> one CNNBlock described as (out_channels, kernel_size, stride, padding)
#   str   -> a 2x2 max-pool with stride 2 ('m' is used here)
#   list  -> [repeats, conv_a, conv_b]: the conv_a -> conv_b pair is stacked `repeats` times
cfg = [
    # Tuple layout: (out_channels / filters, kernel size, stride, padding)
    (64,7,2,3),
    'm',
    (192,3,1,1),
    'm',
    (128,1,1,0),
    (256,3,1,1),
    (256,1,1,0),
    (512,3,1,1),
    'm',
    [4, (256,1,1,0), (512,3,1,1)],
    (512,1,1,0),
    (1024,3,1,1),
    'm',
    [2,(512,1,1,0),(1024,3,1,1)],
    (1024,3,1,1),
    (1024,3,2,1),
    (1024,3,1,1),
    (1024,3,1,1)
]

In [None]:
# Creating each CNN block

class CNNBlock(nn.Module):
  """Bias-free Conv2d followed by BatchNorm2d and LeakyReLU(0.1).

  Args:
    in_channels: channels of the incoming feature map.
    out_channels: channels produced by the convolution.
    kernel: convolution kernel size.
    stride: convolution stride.
    padding: convolution padding.
  """

  def __init__(self, in_channels, out_channels, kernel, stride, padding):
    super().__init__()
    # The convolution carries no bias term; batch norm is applied right after it.
    conv_options = dict(kernel_size=kernel, stride=stride, padding=padding, bias=False)
    self.conv = nn.Conv2d(in_channels, out_channels, **conv_options)
    self.bn = nn.BatchNorm2d(out_channels)
    self.leaky_relu = nn.LeakyReLU(0.1)

  def forward(self, x):
    """Apply convolution, batch norm and the leaky ReLU, in that order."""
    convolved = self.conv(x)
    normalised = self.bn(convolved)
    return self.leaky_relu(normalised)


In [None]:
class YOLO_V1(nn.Module):
  """YOLOv1 detector: a convolutional backbone built from the module-level
  ``cfg`` list followed by a two-layer fully connected head.

  Args:
    in_channels: number of channels of the input images.
    **kwargs: forwarded unchanged to ``_create_fcs``; must provide
      ``split_size``, ``bb_no`` and ``no_classes``.
  """
  def __init__(self, in_channels = 3, **kwargs):
    super().__init__()
    self._in_channels = in_channels
    self._config = cfg
    self._darknet = self._create_darknet(self._config)
    self._fcs = self._create_fcs(**kwargs)

  def forward(self, x):
    """Run backbone then head; returns (batch, S*S*(B*5 + C)) flat predictions."""
    return self._fcs(self._darknet(x))

  # Darknet is the name of the CNN architecture
  def _create_darknet(self, config):
    """Translate ``config`` into an ``nn.Sequential`` of CNNBlock / MaxPool2d layers.

    Entry kinds: a tuple is one conv block ``(out_channels, kernel, stride,
    padding)``, a string is a 2x2/stride-2 max-pool, and a list is
    ``[repeats, conv_a, conv_b]`` stacked ``repeats`` times.
    """
    layers = []
    in_channels = self._in_channels
    for item in config:
      if isinstance(item, tuple):
        out_channels, kernel, stride, padding = item
        layers.append(CNNBlock(in_channels, out_channels, kernel, stride, padding))
        in_channels = out_channels
      elif isinstance(item, str):
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
      elif isinstance(item, list):
        repeats, conv_1, conv_2 = item
        for _ in range(repeats):
          layers.append(CNNBlock(in_channels, conv_1[0], conv_1[1], conv_1[2], conv_1[3]))
          layers.append(CNNBlock(conv_1[0], conv_2[0], conv_2[1], conv_2[2], conv_2[3]))
          # The next layer (or the next repeat) consumes conv_2's output.
          # Without this update the channel count was only right when conv_2
          # happened to emit as many channels as the layer before the list.
          in_channels = conv_2[0]
    # Remember the backbone's output width so the head does not hard-code it.
    self._backbone_channels = in_channels
    return nn.Sequential(*layers)

  def _create_fcs(self, split_size, bb_no, no_classes):
    """Build the detection head.

    Args:
      split_size: grid size S; the head assumes the backbone emits an S x S map.
      bb_no: number of boxes B predicted per cell.
      no_classes: number of classes C.
    """
    self.split_size = split_size
    self.bb_no = bb_no
    self.no_classes = no_classes
    # 1024 for the stock cfg; falls back to that value if the backbone was not built first.
    feature_channels = getattr(self, "_backbone_channels", 1024)
    return nn.Sequential(
        nn.Flatten(),
        nn.Linear(self.split_size*split_size*feature_channels, 4096),
        # Two Linear layers back to back collapse into a single linear map;
        # the leaky ReLU (slope 0.1, as in CNNBlock) makes the hidden layer useful.
        nn.LeakyReLU(0.1),
        nn.Linear(4096, self.split_size*split_size*(self.bb_no*5 + self.no_classes))
    )

In [None]:
def test(S = 7, num_boxes = 2, no_classes = 20 ):
  """Smoke test: push one random 448x448 RGB image through YOLO_V1 and print the output shape."""
  net = YOLO_V1(split_size = S, bb_no = num_boxes, no_classes = no_classes)
  dummy_batch = torch.rand(1, 3, 448, 448)
  prediction = net(dummy_batch)
  print(prediction.shape)

test()


torch.Size([1, 1470])


# Utility functions

## Analytic functions

In [None]:
def _midpoint_to_corners(box):
  """Split (c_x, c_y, w, h) on the last axis into left, top, right, bottom edges."""
  half_w = box[..., 2:3] / 2
  half_h = box[..., 3:4] / 2
  return (
      box[..., 0:1] - half_w,
      box[..., 1:2] - half_h,
      box[..., 0:1] + half_w,
      box[..., 1:2] + half_h,
  )


def intersection_over_union(bb_1, bb_2):
  """
  Get two bounding boxes and calculate their IOU
  i/p format : (..., 4) tensors, e.g. (batch_size, S, S, 4); leading
               dimensions must broadcast against each other
  box_cordinates : (c_x, c_y, w, h), i.e. midpoint format
  Returns a tensor with the broadcast leading dimensions and a trailing
  dimension of size 1. Boxes that do not overlap give 0.
  """
  eps = 1e-6  # keeps the division finite when both boxes are empty

  # Edges are centre -/+ half the extent (not (centre -/+ extent) / 2).
  bb_1_x_left, bb_1_y_top, bb_1_x_right, bb_1_y_bottom = _midpoint_to_corners(bb_1)
  bb_2_x_left, bb_2_y_top, bb_2_x_right, bb_2_y_bottom = _midpoint_to_corners(bb_2)

  x_left = torch.max(bb_1_x_left, bb_2_x_left)
  y_top = torch.max(bb_1_y_top, bb_2_y_top)
  x_right = torch.min(bb_1_x_right, bb_2_x_right)
  y_bottom = torch.min(bb_1_y_bottom, bb_2_y_bottom)

  # Clamp at zero: for disjoint boxes both differences are negative and their
  # product would otherwise look like a positive overlap.
  inter_width = (x_right - x_left).clamp(min=0)
  inter_height = (y_bottom - y_top).clamp(min=0)
  inter_area = inter_width * inter_height

  # abs() guards against negative widths/heights coming from raw network output.
  area_1 = torch.abs(bb_1[..., 2:3] * bb_1[..., 3:4])
  area_2 = torch.abs(bb_2[..., 2:3] * bb_2[..., 3:4])
  union = area_1 + area_2 - inter_area

  return inter_area / (union + eps)

In [None]:
# Sanity check: the IoU of a box with an identical copy of itself.
intersection_over_union(torch.tensor([0,0,2,3]), torch.tensor([0,0,2,3]))

tensor([1.])

In [None]:
def mean_average_precision(preds, labels, iou_threshold= 0.5, NUM_CLASSES= 20):
    '''
    Calculates the mean average precision scores
    pred =[[idx,class,obj,x,y,w,h],...]
    label =[[idx,class,obj,x,y,w,h],...]

    idx is the image index, obj the confidence/objectness score.
    A prediction is a true positive when its best-overlapping ground truth of
    the same image and class has IoU > iou_threshold and has not been claimed
    by a higher-scoring prediction; otherwise it is a false positive.
    Classes without any ground-truth box are skipped. Returns a 0-dim tensor;
    it is 0 when no class has ground truth.
    '''
    eps = 1e-6
    avg_prec = []
    # For each class
    for c in range(NUM_CLASSES):
        # Filter out predictions and true labels for the class
        predictions = [pred for pred in preds if pred[1] == c]
        true_label = [label for label in labels if label[1] == c]

        if len(true_label) == 0:
            continue

        # Ground truths grouped per image, plus a "claimed" flag for each of them
        gt_by_image = {}
        for label in true_label:
            gt_by_image.setdefault(label[0], []).append(label)
        claimed = {image: [False] * len(boxes) for image, boxes in gt_by_image.items()}

        # AP is defined over predictions ranked by score; without this sort the
        # precision/recall curve depends on the arbitrary input order.
        predictions.sort(key=lambda x: x[2], reverse=True)

        # variables for calculating precision and recall
        true_pos = torch.zeros(len(predictions))
        false_pos = torch.zeros(len(predictions))

        for id_pred, pred in enumerate(predictions):
            image_boxes = gt_by_image.get(pred[0], [])
            best_iou = 0.0
            best_gt_idx = -1
            for gt_idx, gt in enumerate(image_boxes):
                iou = float(intersection_over_union(
                    torch.tensor(pred[3:]),
                    torch.tensor(gt[3:])
                    ))
                if best_iou < iou:
                    best_iou = iou
                    best_gt_idx = gt_idx

            if (
                best_gt_idx >= 0
                and best_iou > iou_threshold
                and not claimed[pred[0]][best_gt_idx]
            ):
                true_pos[id_pred] = 1
                claimed[pred[0]][best_gt_idx] = True
            else:
                # Too little overlap, or the ground truth was already matched
                false_pos[id_pred] = 1

        tp_cumsum = torch.cumsum(true_pos, dim = 0)
        fp_cumsum = torch.cumsum(false_pos, dim = 0)

        precision = tp_cumsum/(tp_cumsum+fp_cumsum + eps)
        recall = tp_cumsum/(len(true_label)+eps)

        # Anchor the curve at (recall=0, precision=1)
        precision = torch.cat((torch.tensor([1.0]), precision))
        recall = torch.cat((torch.tensor([0.0]), recall))

        # Using trapezoidal sum approximation
        avg_prec.append(torch.trapz(precision,recall))

    if not avg_prec:
        # No ground truth at all: avoid dividing by zero
        return torch.tensor(0.0)
    return sum(avg_prec)/len(avg_prec)

In [None]:
def non_max_suppression(predictions, iou_threshold = 0.5):
    '''
    Algorithm to suppress multiple boxes for the same detection

    predictions : iterable of boxes [class, confidence, x, y, w, h]
    iou_threshold : a box is dropped when it has the same class as an
                    already kept, higher-confidence box and their IoU is not
                    below this value
    Returns the kept boxes ordered by descending confidence. The input is not
    modified.
    '''
    best_boxes = []
    remaining = sorted(predictions, key = lambda x:x[1], reverse=True)

    while remaining:
        best_box = remaining.pop(0)
        best_coords = torch.tensor(best_box[2:])  # built once per kept box
        remaining = [
            box for box in remaining
            # The class test comes first so boxes of another class skip the IoU computation.
            if best_box[0] != box[0]
            or intersection_over_union(torch.tensor(box[2:]), best_coords) < iou_threshold
        ]
        best_boxes.append(best_box)

    return best_boxes

## Convenience Functions

In [None]:
def cell_to_image(pred_boxes, S=7):
    '''
    Converts the predictions from cell cordinates to Image cordinates

    pred_boxes : tensor reshapeable to (batch, S, S, 30) with, on the last
                 axis, 0:20 class scores, 20 confidence of box 1, 21:25 box 1
                 (x, y, w, h), 25 confidence of box 2, 26:30 box 2
    Returns a (batch, S, S, 6) tensor [class, confidence, x, y, w, h] holding,
    for every cell, the more confident of the two boxes with x, y, w, h
    divided by S so they are relative to the whole image.
    '''
    pred_boxes = pred_boxes.reshape(-1, S, S, 30)
    boxes_1 = pred_boxes[..., 21:25]
    boxes_2 = pred_boxes[..., 26:30]
    scores_1 = pred_boxes[..., 20:21]
    scores_2 = pred_boxes[..., 25:26]
    best_confidence = torch.max(scores_1, scores_2)
    # 0 where box 1 is the more confident box, 1 where box 2 is
    best_score = torch.cat((scores_1, scores_2), dim = -1).argmax(-1).unsqueeze(-1)
    best_class = pred_boxes[..., :20].argmax(-1).unsqueeze(-1)

    best_box = (1 - best_score) * boxes_1 + (best_score * boxes_2)

    # Every cell needs its own offset: axis 1 indexes grid rows (y) and axis 2
    # grid columns (x). Using one scalar offset for the whole tensor would shift
    # all cells by the same amount.
    row_idx = torch.arange(S, device=pred_boxes.device).reshape(1, S, 1, 1)
    col_idx = torch.arange(S, device=pred_boxes.device).reshape(1, 1, S, 1)
    x = (best_box[..., 0:1] + col_idx) / S
    y = (best_box[..., 1:2] + row_idx) / S
    w_h = best_box[..., 2:4] / S

    corrected_box = torch.cat(
        (best_class.to(best_box.dtype), best_confidence, x, y, w_h), dim = -1
    )
    return corrected_box

In [None]:
def get_bboxes(
    loader,
    model,
    iou_threshold,
    threshold,
    pred_format="cells",
    box_format="midpoint",
    device=None,
):
    """Run ``model`` over ``loader`` and collect predicted and ground-truth boxes.

    Args:
        loader: iterable of ``(images, labels)`` batches.
        model: network to evaluate; switched to eval mode while it runs.
        iou_threshold: IoU threshold handed to ``non_max_suppression``.
        threshold: minimum confidence (``box[1]``) for a predicted or
            ground-truth box to be kept.
        pred_format: accepted for interface compatibility; not used.
        box_format: accepted for interface compatibility; not used.
        device: device the batches are moved to. ``None`` (default) uses the
            device of the model's first parameter, so CPU-only runs work
            without passing anything.

    Returns:
        ``(all_pred_boxes, all_true_boxes)``; every entry is the running image
        index followed by the box values from ``cellboxes_to_boxes``.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    all_pred_boxes = []
    all_true_boxes = []

    # make sure model is in eval before get bboxes, and remember the previous mode
    was_training = model.training
    model.eval()
    train_idx = 0

    for x, labels in loader:
        x = x.to(device)
        labels = labels.to(device)

        with torch.no_grad():
            predictions = model(x)

        batch_size = x.shape[0]
        true_bboxes = cellboxes_to_boxes(labels)
        bboxes = cellboxes_to_boxes(predictions)

        for idx in range(batch_size):
            # Apply the confidence threshold to predictions as well; before,
            # it was only applied to the ground truth below.
            candidates = [box for box in bboxes[idx] if box[1] > threshold]
            nms_boxes = non_max_suppression(
                candidates,
                iou_threshold=iou_threshold,
            )

            for nms_box in nms_boxes:
                all_pred_boxes.append([train_idx] + nms_box)

            for box in true_bboxes[idx]:
                # cells without an object have confidence 0 and are dropped here
                if box[1] > threshold:
                    all_true_boxes.append([train_idx] + box)

            train_idx += 1

    # Restore the mode the caller had instead of forcing train mode.
    model.train(was_training)
    return all_pred_boxes, all_true_boxes


def cellboxes_to_boxes(out, S=7):
    """Turn a batch of grid outputs into nested Python lists of boxes.

    ``out`` (network predictions or label tensors; first dimension is the
    batch) is first passed through ``cell_to_image`` so that every cell is
    reduced to one ``[class, confidence, x, y, w, h]`` box. Without that step
    the raw per-cell vectors would be handed on, and the consumers in this
    notebook index ``box[1]`` as confidence and ``box[2:]`` as coordinates.

    Returns:
        ``boxes[example][cell]`` -> list of floats, with ``S * S`` cells per
        example.
    """
    converted_pred = cell_to_image(out, S).reshape(out.shape[0], S * S, -1)
    # tolist() yields the same nested lists of Python floats as per-element
    # .item() calls. No in-place write is done, so ``out`` is never modified
    # through a reshape view.
    return converted_pred.tolist()

def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    """Print a notice, then write ``state`` to ``filename`` with ``torch.save``."""
    print("=> Saving checkpoint")
    torch.save(state, filename)


def load_checkpoint(checkpoint, model, optimizer):
    """Print a notice, then restore ``model`` and ``optimizer`` in place.

    ``checkpoint`` must be indexable with the keys ``"state_dict"`` (passed to
    ``model.load_state_dict``) and ``"optimizer"`` (passed to
    ``optimizer.load_state_dict``).
    """
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])

## Plotting Function

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches

def plot_image(image, boxes):
    """Show ``image`` with one red rectangle outline per entry of ``boxes``.

    Each entry's first two values are skipped; the remaining four are read as
    x midpoint, y midpoint, width and height, and are scaled by the image
    width/height before drawing.
    """
    pixels = np.array(image)
    img_height, img_width, _ = pixels.shape

    # One figure with a single axes showing the picture
    figure, axes = plt.subplots(1)
    axes.imshow(pixels)

    for entry in boxes:
        coords = entry[2:]
        assert len(coords) == 4, "Got more values than in x, y, w, h, in a box!"
        mid_x, mid_y = coords[0], coords[1]
        box_w, box_h = coords[2], coords[3]
        # Rectangle patches are anchored at a corner, so shift from the midpoint
        corner_x = mid_x - box_w / 2
        corner_y = mid_y - box_h / 2
        outline = patches.Rectangle(
            (corner_x * img_width, corner_y * img_height),
            box_w * img_width,
            box_h * img_height,
            linewidth=1,
            edgecolor="r",
            facecolor="none",
        )
        axes.add_patch(outline)

    plt.show()

# YOLO Loss function

In [None]:
class YOLO_LOSS(nn.Module):
  """
  The yolo loss function :
  Hyperparameter : Lambda coord, lambda no obj
  1. A squared-error loss over box cordinates (x, y, w, h). Uses, per cell, the
     predicted box with the higher IOU against the target and ignores cells
     without objects. Width and height are compared after a square root.
  2. A squared-error loss over the objectness score of that box
  3. A squared-error loss over the confidences of BOTH predicted boxes in
     cells without an object
  4. A squared-error loss over the class probabilities

  All terms are summed (not averaged) over the batch. The layout of the last
  axis is: 0:C class scores, C confidence of box 1, C+1:C+5 box 1,
  C+5 confidence of box 2, C+6:C+10 box 2. The implementation handles exactly
  two predicted boxes per cell; targets only carry box 1.
  """
  def __init__(self, S=7, B=2, C=20):
    super().__init__()
    self.S = S
    self.B = B
    self.C = C
    self.mse = nn.MSELoss(reduction='sum')
    self.lambda_noobj = 0.5
    self.lambda_coord = 5

  def forward(self, predictions, target):
    """Return the scalar loss for ``predictions`` (reshapeable to
    (batch, S, S, C + B*5)) against ``target`` (batch, S, S, >= C + 5)."""
    C = self.C
    predictions = predictions.reshape(-1, self.S, self.S, C + self.B*5)
    # Offsets derived from C instead of hard-coded 20/21/25/26
    conf_1, box_1 = slice(C, C + 1), slice(C + 1, C + 5)
    conf_2, box_2 = slice(C + 5, C + 6), slice(C + 6, C + 10)

    iou_b1 = intersection_over_union(predictions[..., box_1], target[..., box_1])
    iou_b2 = intersection_over_union(predictions[..., box_2], target[..., box_1])
    ious = torch.cat([iou_b1.unsqueeze(0), iou_b2.unsqueeze(0)], dim = 0)
    _, best_box = torch.max(ious, dim=0)  # best box = 0 or 1, shape (batch, S, S, 1)
    exists_box = target[..., C].unsqueeze(3)  # 1 where the cell holds an object

    ###                           ###
    # Box cordinates : ( x, y, w, h)#
    ###                           ###
    box_predictions = exists_box * (
        best_box * predictions[..., box_2] + (1 - best_box) * predictions[..., box_1]
    )
    box_targets = exists_box * target[..., box_1]
    # Width and height go through a signed square root; the epsilon keeps the
    # gradient of sqrt finite at zero. Built with cat rather than in-place writes.
    pred_wh = torch.sign(box_predictions[..., 2:4]) * \
        torch.sqrt(torch.abs(box_predictions[..., 2:4] + 1e-6))
    box_predictions = torch.cat([box_predictions[..., 0:2], pred_wh], dim=-1)
    box_targets = torch.cat(
        [box_targets[..., 0:2], torch.sqrt(box_targets[..., 2:4])], dim=-1
    )

    box_loss = self.mse(
        torch.flatten(box_predictions, end_dim=-2),
        torch.flatten(box_targets, end_dim = -2)
    )

    ###              ##
    # For object loss #
    ###              ##
    pred_box = (
        best_box * predictions[..., conf_2] + (1 - best_box) * predictions[..., conf_1]
    )

    object_loss = self.mse(
        torch.flatten(exists_box * pred_box),
        torch.flatten(exists_box * target[..., conf_1])
    )

    # For no object: penalise the confidence of box 1 ...
    no_object_loss = self.mse(
        torch.flatten((1 - exists_box) * predictions[..., conf_1], start_dim =1),
        torch.flatten((1 - exists_box) * target[..., conf_1], start_dim =1)
    )
    # ... and of box 2. This term previously repeated box 1's confidence, so
    # box 2 was never pushed towards 0 in empty cells.
    no_object_loss += self.mse(
        torch.flatten((1 - exists_box) * predictions[..., conf_2], start_dim =1),
        torch.flatten((1 - exists_box) * target[..., conf_1], start_dim =1)
    )

    ###
    # Class loss
    ###
    class_loss = self.mse(
        torch.flatten( exists_box * predictions[..., :C], end_dim = -2),
        torch.flatten( exists_box * target[..., :C], end_dim = -2)
    )

    # From paper
    loss = (
        self.lambda_coord * box_loss +
        object_loss +
        self.lambda_noobj * no_object_loss
        +class_loss
    )

    return loss

# Creating the dataset

In [None]:
class YOLO_VOC(torch.utils.data.Dataset):
  """Dataset pairing images with YOLO grid targets.

  Args:
    csv_file: CSV read with ``pd.read_csv``; column 0 holds image file names,
      column 1 the matching label file names.
    img_dir: directory containing the images.
    label_dir: directory containing the label files. Every non-blank line is
      ``class x y w h`` separated by whitespace, with x, y, w, h relative to
      the image size.
    S, B, C: grid size, boxes per cell and number of classes; the target has
      shape (S, S, C + 5*B).
    transform: optional callable applied to the image only.
  """
  def __init__(self, csv_file, img_dir, label_dir, S=7, B=2, C=20, transform= None):
    self.annotations = pd.read_csv(csv_file)
    self.img_dir = img_dir
    self.label_dir = label_dir
    self.transform = transform
    self.S = S
    self.B = B
    self.C = C

  def __len__(self):
    return len(self.annotations)

  def _read_boxes(self, label_path):
    """Parse a label file into a list of [class, x, y, w, h] rows."""
    boxes = []
    with open(label_path) as f:
      for line in f:
        fields = line.split()
        if not fields:
          continue  # tolerate blank / trailing empty lines
        class_label, x, y, w, h = fields
        # Go through float() first: int("1.0") raises, and coordinates that
        # happen to be integral are still coordinates.
        boxes.append([int(float(class_label)), float(x), float(y), float(w), float(h)])
    return boxes

  def __getitem__(self, index):
    """Return ``(image, label_matrix)`` for row ``index`` of the CSV."""
    label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
    boxes = self._read_boxes(label_path)

    img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
    # Force three channels so grayscale / palette files match the RGB input
    image = Image.open(img_path).convert("RGB")
    if self.transform:
      image = self.transform(image)

    label_matrix = torch.zeros((self.S, self.S, self.C + 5*self.B))
    for class_label, x, y, w, h in boxes:
      # Row i comes from y, column j from x. Clamp so a coordinate of exactly
      # 1.0 lands in the last cell instead of indexing past the grid.
      i = min(int(self.S * y), self.S - 1)
      j = min(int(self.S * x), self.S - 1)
      x_cell, y_cell = self.S * x - j, self.S * y - i
      width_cell, height_cell = w * self.S, h * self.S
      # Only the first object that falls into a cell is recorded
      if label_matrix[i, j, self.C] == 0:
        label_matrix[i, j, self.C] = 1
        label_matrix[i, j, self.C + 1:self.C + 5] = torch.tensor(
            [x_cell, y_cell, width_cell, height_cell]
        )
        label_matrix[i, j, class_label] = 1

    return image, label_matrix

In [None]:

# Locations of the images, the label files and the CSV index handed to YOLO_VOC.
img_dir = "/content/images"
label_dir = "/content/labels"
csv_file = "/content/100examples.csv"

# Image preprocessing: resize to 448x448, then convert to a tensor.
transform = transforms.Compose([
    # Further image transforms can be added to this list. YOLO_VOC applies the
    # transform to the image only, not to the boxes.
    transforms.Resize((448,448)),
    transforms.ToTensor()
])


# Shuffled batches of 32 (image, label_matrix) pairs.
train_loader = DataLoader(
    dataset = YOLO_VOC(csv_file, img_dir, label_dir, transform = transform),
    batch_size = 32,
    shuffle = True
)

# Training 

In [None]:
def train(train_loader, model, optimizer, loss_fn):
    """Train ``model`` for one pass over ``train_loader``.

    Each batch is moved to ``DEVICE``, the loss is computed with ``loss_fn``,
    back-propagated, and ``optimizer`` takes one step. A tqdm progress bar
    shows the latest batch loss and the mean batch loss is printed at the end.
    """
    # Imported here because the notebook's import cell does not import tqdm.
    from tqdm.auto import tqdm

    loop = tqdm(train_loader, leave=True)
    mean_loss = []
    model.train()
    # Iterate over the progress bar itself; iterating over train_loader
    # directly leaves the bar stuck at 0.
    for x, y in loop:
        x, y = x.to(DEVICE), y.to(DEVICE)
        out = model(x)
        loss = loss_fn(out, y)
        mean_loss.append(loss.item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loop.set_postfix(loss = loss.item())

    if mean_loss:
        print(f"Mean loss was {sum(mean_loss)/len(mean_loss)}")
    else:
        # An empty loader would otherwise end in a ZeroDivisionError
        print("Mean loss was undefined: train_loader yielded no batches")

In [None]:
# Detector with a 7x7 grid, 2 boxes per cell and 20 classes, placed on DEVICE.
model = YOLO_V1(split_size = 7, bb_no =2, no_classes = 20).to(DEVICE)
# Adam over all model parameters with a learning rate of 2e-6.
optimizer = torch.optim.Adam(model.parameters(), lr = 2e-6)
# YOLO_LOSS with its default S, B and C, which match the model configuration above.
loss_fn = YOLO_LOSS()


In [None]:
# Set to True to resume from the checkpoint written by the loop below.
LOAD_MODEL = False
if LOAD_MODEL:
    # map_location lets a checkpoint saved on a GPU be restored on whatever
    # DEVICE this runtime actually has.
    load_checkpoint(
        torch.load("/content/output.pth.tar", map_location=DEVICE), model, optimizer
    )

EPOCHS = 1000
for epoch in range(EPOCHS):
    # Evaluate on the training set before each epoch. The device is passed
    # explicitly so the batches end up where the model lives.
    pred_boxes, target_boxes = get_bboxes(
        train_loader, model, iou_threshold=0.5, threshold=0.4, device=DEVICE
    )

    mean_avg_prec = mean_average_precision(pred_boxes, target_boxes, iou_threshold=0.5)
    print(f"Train mAP: {mean_avg_prec}")

    # Persist model and optimizer state once the training mAP exceeds 0.9
    if mean_avg_prec > 0.9:
        checkpoint = {
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
        }
        save_checkpoint(checkpoint, filename="/content/output.pth.tar")

    train(train_loader, model, optimizer, loss_fn)