 # Question 3
 Train a YOLOv3 model on the dataset mentioned above for object localization and evaluate its performance. Provide model output from data point used in Question 2. <br>
 [Note: Marks will be given based on different experiments and discussion]








In [1]:
# %%

import torch
import torch.nn as nn
import torchvision.models as models
import pandas as pd
import torch.optim as optim
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc, precision_recall_curve
from matplotlib import pyplot as plt
import seaborn as sns
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split

In [2]:
# %%

import torch
import torch.nn as nn

class ConvBlock(nn.Module):
    """
    Convolutional Block: Conv -> BatchNorm -> LeakyReLU
    """
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(ConvBlock, self).__init__()
        self.conv = nn.Conv2d(
            in_channels, 
            out_channels, 
            kernel_size=kernel_size, 
            stride=stride, 
            padding=padding, 
            bias=False
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.leaky = nn.LeakyReLU(0.1, inplace=True)
        
    def forward(self, x):
        return self.leaky(self.bn(self.conv(x)))

In [3]:
# %%

class ResidualBlock(nn.Module):
    """
    Residual Block: (Conv -> Conv) + Skip Connection
    """
    def __init__(self, in_channels, out_channels):
        super(ResidualBlock, self).__init__()
        # First 1x1 convolution to reduce channels
        self.conv1 = ConvBlock(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        # Then 3x3 convolution to restore channels
        self.conv2 = ConvBlock(out_channels, in_channels, kernel_size=3, stride=1, padding=1)
        
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        return out + residual

In [4]:
# %%

class Upsample(nn.Module):
    def __init__(self, scale_factor=2, mode = 'nearest'):
        super(Upsample, self).__init__()
        self.upsample = nn.Upsample(scale_factor=scale_factor, mode = mode)
        
    def forward(self, x):
        return self.upsample(x)

In [5]:
# %%

class Darknet53(nn.Module):
    def __init__(self):
        super(Darknet53,  self).__init__()
        self.layers = self.__create_layers__()
    
    def __create_layers__(self):
        layers = []
        in_chan = 3
        layers.append(ConvBlock(in_chan, 32, 3, 1, 1))

        in_chan = 32

        res_blocks = [1,2,8,8,4]

        conv_blocks_filters = [64, 128, 256, 512, 1024]

        self.pause = []
        for i, chn_sz in enumerate(conv_blocks_filters):
            layers.append(ConvBlock(in_chan, chn_sz, 3, 2, 1))
            in_chan = chn_sz
            for b in range(res_blocks[i]):
                layers.append(ResidualBlock(in_chan, in_chan//2))
            
            if i == 2 or i==3 or i==4:
                self.pause.append(len(layers)-1)
            
            
        
        return nn.Sequential(*layers)
    
    def forward(self,x):
        outputs = []
        for layer_num, layer in enumerate(self.layers):
            x = layer(x)
            if layer_num in self.pause:
                outputs.append(x)
        return outputs

In [6]:
# %%

class DetectorHead(nn.Module):

    def __init__(self, in_channels, out_channels, num_classes):
        super(DetectorHead, self).__init__()
        self.layers = nn.Sequential(
            ConvBlock(in_channels, out_channels, kernel_size=1, stride=1, padding=0),
            ConvBlock(out_channels, in_channels * 2, kernel_size=3, stride=1, padding=1),
            ConvBlock(in_channels * 2, out_channels, kernel_size=1, stride=1, padding=0),
            ConvBlock(out_channels, in_channels * 2, kernel_size=3, stride=1, padding=1),
            ConvBlock(in_channels * 2, out_channels, kernel_size=1, stride=1, padding=0),
            ConvBlock(out_channels, in_channels * 2, kernel_size=3, stride=1, padding=1),
            ConvBlock(in_channels * 2, out_channels, kernel_size=1, stride=1, padding=0),
            # Final convolution for predictions
            nn.Conv2d(out_channels, 3 * (5 + num_classes), kernel_size=1, stride=1, padding=0)
        )
        
    def forward(self, x):
        return self.layers(x)

In [7]:
# %%

class YOLOv3(nn.Module):
    def __init__(self, num_classes=80):
        super(YOLOv3, self).__init__()
        self.num_classes = num_classes
        
        # Backbone
        self.backbone = Darknet53()
        
        # Detection Heads
        # For different scales: 13x13, 26x26, 52x52
        self.head_13 = DetectorHead(in_channels=  1024, out_channels=512, num_classes=num_classes)
        self.head_26 = DetectorHead(in_channels=  768, out_channels=256, num_classes=num_classes)
        self.head_52 = DetectorHead(in_channels=  384, out_channels=128, num_classes=num_classes)
        
        # Additional layers for upsampling and concatenation
        self.upsample = Upsample()
        self.conv_set_26 = nn.Sequential(
            ConvBlock(1024, 256, kernel_size=1, stride=1, padding=0),
            ConvBlock(256, 512, kernel_size=3, stride=1, padding=1),
            ConvBlock(512, 256, kernel_size=1, stride=1, padding=0),
            ConvBlock(256, 512, kernel_size=3, stride=1, padding=1),
            ConvBlock(512, 256, kernel_size=1, stride=1, padding=0)
        )
        
        self.conv_set_52 = nn.Sequential(
            ConvBlock( 768, 128, kernel_size=1, stride=1, padding=0),
            ConvBlock( 128, 256, kernel_size=3, stride=1, padding=1),
            ConvBlock( 256, 128, kernel_size=1, stride=1, padding=0),
            ConvBlock( 128, 256, kernel_size=3, stride=1, padding=1),
            ConvBlock( 256, 128, kernel_size=1, stride=1, padding=0)
        )
        
    def forward(self, x):
        # Backbone
        features = self.backbone(x)  # [52x52, 26x26, 13x13]
        feat_52, feat_26, feat_13 = features
        
        # Detection Head at 13x13
        out_13 = self.head_13(feat_13)
        
        # Processing for 26x26
        feat_26_processed = self.conv_set_26(feat_13)
        upsampled_26 = self.upsample(feat_26_processed)
        # Concatenate with feat_26 from backbone
        concat_26 = torch.cat([upsampled_26, feat_26], dim=1)
        # Detection Head at 26x26
        out_26 = self.head_26(concat_26)
        
        # Processing for 52x52
        feat_52_processed = self.conv_set_52(concat_26)
        upsampled_52 = self.upsample(feat_52_processed)

        # Concatenate with feat_52 from backbone
        concat_52 = torch.cat([upsampled_52, feat_52], dim=1)
        # Detection Head at 52x52
        out_52 = self.head_52(concat_52)
        
        return out_52, out_26, out_13

In [8]:
# %%

import torch
import torch.nn as nn
import torch.nn.functional as F

class YOLOv3Loss(nn.Module):
    def __init__(self, anchors, num_classes=1, ignore_thresh=0.5, lambda_coord=1, lambda_noobj=0.5):
        super(YOLOv3Loss, self).__init__()
        self.num_classes = num_classes
        self.anchors = anchors
        self.ignore_thresh = ignore_thresh
        self.lambda_coord = lambda_coord
        self.lambda_noobj = lambda_noobj

        self.mse_loss = nn.MSELoss(reduction='sum')  # For localization
        self.bce_loss = nn.BCELoss(reduction='sum')  # For objectness and class


    def forward(self, predictions, targets):
        out_52, out_26, out_13 = predictions
        loss = 0

        # Define the scales and corresponding anchors
        scales = [
            {'output': out_52, 'anchors': self.anchors[0], 'stride': 8},
            {'output': out_26, 'anchors': self.anchors[1], 'stride': 16},
            {'output': out_13, 'anchors': self.anchors[2], 'stride': 32},
        ]

        for scale in scales:
            output = scale['output']
            anchors = scale['anchors']
            stride = scale['stride']
            grid_size = output.size(2)
            batch_size = output.size(0)

            # Reshape output
            prediction = output.view(batch_size, len(anchors), 5 + self.num_classes, grid_size, grid_size)
            prediction = prediction.permute(0, 1, 3, 4, 2).contiguous()

            # Sigmoid the center_x, center_y, and objectness score
            x = torch.sigmoid(prediction[..., 0])  # Center x
            y = torch.sigmoid(prediction[..., 1])  # Center y
            w = prediction[..., 2]  # Width
            h = prediction[..., 3]  # Height
            objectness = torch.sigmoid(prediction[..., 4])
            class_probs = torch.sigmoid(prediction[..., 5:])  # Since num_classes=1

            # Create grid for calculating offsets
            grid_x = torch.arange(grid_size, device=x.device).repeat(grid_size, 1).view([1, 1, grid_size, grid_size])
            grid_y = torch.arange(grid_size, device=y.device).repeat(grid_size, 1).t().view([1, 1, grid_size, grid_size])

            # Calculate the actual center positions
            pred_boxes = torch.zeros_like(prediction[..., :4])
            pred_boxes[..., 0] = (x + grid_x) * stride
            pred_boxes[..., 1] = (y + grid_y) * stride

            anchors_tensor = torch.tensor(anchors).to(pred_boxes.device)  # Convert anchors to a tensor
            pred_boxes[..., 2] = torch.exp(w) * anchors_tensor[..., 0].view(1, -1, 1, 1)  # Width
            pred_boxes[..., 3] = torch.exp(h) * anchors_tensor[..., 1].view(1, -1, 1, 1)  # Height

            # Initialize no-object mask
            noobj_mask = torch.ones_like(objectness)
            
            # Process targets for each batch
            for b in range(batch_size):
                if len(targets[b]) == 0:
                    continue

                target = targets[b]
                tx, ty, tw, th = target  # Assuming the target format [x, y, w, h]
                
                # Calculate grid cell
                gx = int(tx // stride)
                gy = int(ty // stride)
                
                # Calculate offsets and ground truth
                gx_offset = (tx / stride) - gx
                gy_offset = (ty / stride) - gy
                gt_w = torch.log(tw / anchors_tensor[..., 0] + 1e-16)
                gt_h = torch.log(th / anchors_tensor[..., 1] + 1e-16)

                # Localization loss
                loss += self.lambda_coord * (
                    self.mse_loss(x[b, :, gy, gx], torch.tensor(gx_offset, device=x.device)) +
                    self.mse_loss(y[b, :, gy, gx], torch.tensor(gy_offset, device=y.device)) +
                    self.mse_loss(w[b, :, gy, gx], torch.tensor(gt_w, device=w.device)) +
                    self.mse_loss(h[b, :, gy, gx], torch.tensor(gt_h, device=h.device))
                )

                # Objectness loss

                target_objectness = torch.ones_like(objectness[b, :, gy, gx], device=objectness.device)

                loss += self.bce_loss(objectness[b, :, gy, gx], target_objectness)

                # Class prediction loss
                target_prob = torch.ones_like(class_probs[b, :, gy, gx], device=class_probs.device)
                loss += self.bce_loss(class_probs[b, :, gy, gx], target_prob)

                # Mark the grid cell as responsible for the object
                noobj_mask[b, :, gy, gx] = 0

            # No-object loss: For grid cells that don't contain objects
            loss += self.lambda_noobj * self.bce_loss(objectness * noobj_mask, torch.zeros_like(objectness))

        return loss / (batch_size + 1e-16)  # Normalize by batch size to avoid division by zero

In [9]:
# %%

import numpy as np

In [10]:
# %%

anchors = np.array([
    [(10,13), (16,30), (33,23)],  # 52x52
    [(30,61), (62,45), (59,119)],  # 26x26
    [(116,90), (156,198), (373,326)]  # 13x13
])

In [11]:
# %%

criterion = YOLOv3Loss(anchors=anchors, num_classes=1)

In [12]:
# %%

import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np

class YOLODataset(Dataset):
    def __init__(self, csv_file, img_dir, part=0, target_size=(416, 416), transform=None):
        """
        Args:
            csv_file (str): Path to the CSV file with annotations.
            img_dir (str): Directory with all the images.
            target_size (tuple): Desired output size of the images (width, height).
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.annotations = pd.read_csv(csv_file)
        train, validate, test = \
              np.split(self.annotations.sample(frac=1, random_state=22188), 
                       [int(.6*len(self.annotations)), int(.8*len(self.annotations))])
        if type == 0:
            self.annotations = train
        elif type == 1:
            self.annotations = validate
        else:
            self.annotations = test
        
        self.img_dir = img_dir
        self.target_size = target_size
        self.transform = transform

    def __len__(self):
        return len(self.annotations)

    def resize_image_and_bbox(self, image, bbox):
        """
        Resize image and adjust bounding box according to the new image size.
        """
        original_height, original_width = image.shape[:2]
        resized_image = cv2.resize(image, self.target_size)

        scale_x = self.target_size[0] / original_width
        scale_y = self.target_size[1] / original_height

        x0, y0, x1, y1 = bbox
        x0 = int(x0 * scale_x)
        y0 = int(y0 * scale_y)
        x1 = int(x1 * scale_x)
        y1 = int(y1 * scale_y)

        return resized_image, (x0, y0, x1, y1)

    def __getitem__(self, idx):
        # Get image file name and bounding box from the annotations
        img_name = os.path.join(self.img_dir, self.annotations.iloc[idx, 0])
        image = cv2.imread(img_name)

        # Get original bounding box
        bbox = self.annotations.iloc[idx, 3:7].values
        bbox = list(map(int, bbox))  # Convert to integers

        # Resize image and adjust the bounding box
        image, bbox = self.resize_image_and_bbox(image, bbox)

        # Apply any transforms (if provided)
        if self.transform:
            image = self.transform(image)

        # Convert image and bbox to PyTorch tensors
        image = torch.from_numpy(image).permute(2, 0, 1).float()  # Convert to (C, H, W) format
        bbox = torch.tensor(bbox).float()

        return image, bbox

# Usage example:

# Create dataset
csv_file = 'yolo_p3/faces.csv'
img_dir = 'yolo_p3/images/'
train = YOLODataset(csv_file=csv_file, img_dir=img_dir, part =0 )
valid = 
# Create dataloader
dataloader = DataLoader(dataset, batch_size=1, shuffle=True, num_workers=4)

  return bound(*args, **kwds)


In [None]:
# %%

import matplotlib.pyplot as plt

def draw_bbox(image, bbox, color=(0, 255, 0), thickness=2):
    """
    Draws a bounding box on the image.
    
    Args:
    - image: The image on which to draw (in numpy format).
    - bbox: The bounding box coordinates (x0, y0, x1, y1).
    - color: The color of the bounding box (default is green).
    - thickness: The thickness of the bounding box lines.
    """
    x0, y0, x1, y1 = list(map(int, bbox))
    image_with_bbox = cv2.rectangle(image.copy(), (x0, y0), (x1, y1), color, thickness)
    return image_with_bbox

In [None]:
# %%

device = "cuda"
model = YOLOv3(num_classes=1).to(device)  # Single class case
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = YOLOv3Loss(anchors)  # Your custom loss function

# Create DataLoader
dataloader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=4)

In [None]:
# %%

import torch
import torch.optim as optim
from tqdm import tqdm

# Assume these are already defined:
# - YOLOv3: Your YOLO model class
# - YOLOLoss: The loss function class
# - dataset: Your custom dataset
# - DataLoader: To load batches of data

# Set up model, optimizer, and loss function

# Training loop
num_epochs = 10  # Set your desired number of epochs

for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    
    # Loop over data
    for images, bboxes in tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        images = images.to(device)  # Move images to the GPU/CPU
        bboxes = bboxes.to(device)  # Move bounding boxes to the GPU/CPU
        
        # Forward pass
        outputs = model(images)
        # Compute the loss (assuming loss function handles multiple scales: 13x13, 26x26, 52x52)
        loss = criterion(outputs, bboxes)

        # Backpropagation and optimization
        optimizer.zero_grad()  # Zero the gradients
        loss.backward()  # Backpropagate
        optimizer.step()  # Update model weights
        
        # Update running loss
        running_loss += loss.item()

    # Print average loss per epoch
    avg_loss = running_loss / len(dataloader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")
    
    # Save the model checkpoint (optional)
    torch.save(model.state_dict(), f"yolov3_epoch_{epoch+1}.pth")

print("Training complete!")

  self.mse_loss(x[b, :, gy, gx], torch.tensor(gx_offset, device=x.device)) +
  return F.mse_loss(input, target, reduction=self.reduction)
  self.mse_loss(y[b, :, gy, gx], torch.tensor(gy_offset, device=y.device)) +
  self.mse_loss(w[b, :, gy, gx], torch.tensor(gt_w, device=w.device)) +
  self.mse_loss(h[b, :, gy, gx], torch.tensor(gt_h, device=h.device))
Epoch 1/10:   5%|▌         | 11/210 [00:04<01:27,  2.29it/s]


KeyboardInterrupt: 