In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image, ImageFile
import os

## Data

In [None]:
# Tell Pillow to load image files whose data is truncated instead of raising an error.
ImageFile.LOAD_TRUNCATED_IMAGES = True

def check_image(path):
    """Return True if Pillow can open the file at ``path`` as an image.

    Args:
        path: Filesystem path (or file-like object) handed to ``Image.open``.

    Returns:
        bool: True when ``Image.open`` succeeds, False when it raises.
    """
    try:
        # The context manager closes the underlying file handle right away,
        # so scanning a large dataset does not leak open descriptors.
        with Image.open(path):
            return True
    except Exception:
        # Best-effort check: any failure to open means "not a usable image".
        # ``Exception`` (rather than a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return False

In [None]:
# Root folder of the dataset export; each split is a sub-directory of it.
Data_path = "./Pascal VOC 2012.v1-raw.coco/"

train_path, val_path = (
    os.path.join(Data_path, split_name) for split_name in ("train", "val")
)


## Model

In [None]:
class MyModel(nn.Module):
    """Small CNN that turns a batch of 3-channel images into 24 values per image.

    Three conv -> batch-norm -> ReLU -> max-pool stages (64, 128 and 256
    channels) are followed by global average pooling and a three-layer
    fully connected head with dropout.
    """

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Return the four layers of one feature stage, in execution order."""
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]

    def __init__(self):
        super().__init__()

        stage_layers = []
        for c_in, c_out in ((3, 64), (64, 128), (128, 256)):
            stage_layers.extend(self._conv_stage(c_in, c_out))
        # One flat Sequential keeps the layers addressable as features.0 .. features.11.
        self.features = nn.Sequential(*stage_layers)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        self.classifier = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(128, 64),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(64, 24),
        )

    def forward(self, x):
        """Run the feature extractor, pool to 1x1, flatten and apply the head."""
        feature_maps = self.features(x)
        pooled = self.avgpool(feature_maps)
        return self.classifier(pooled.flatten(start_dim=1))

In [None]:
# Build a fresh instance of the network; nothing on this line moves it to another device.
model = MyModel()

In [None]:
# Adam over every parameter of the model, starting at a learning rate of 1e-3.
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
class ObjectDetectionLoss(nn.Module):
    """Combined classification + box-regression loss for a single-box detector.

    Both ``output`` and ``target`` are 2-D tensors whose first ``num_classes``
    columns are the class part and whose remaining columns are the box part.
    The result is ``cross_entropy(class part) + lambda_coord * smooth_l1(box part)``.

    Args:
        lambda_coord: Weight applied to the box-regression term.
        num_classes: Number of leading columns that belong to the class part.
            Defaults to 20, the split that was previously hard-coded.
    """

    def __init__(self, lambda_coord = 5.0, num_classes = 20):
        super().__init__()
        self.cross_entropy = nn.CrossEntropyLoss()
        self.smooth_l1 = nn.SmoothL1Loss()
        self.num_classes = num_classes
        # Attribute spelling is kept so existing code that reads `lamdba` keeps working.
        self.lamdba = lambda_coord

    def forward(self, output, target):
        """Return the scalar total loss for one batch.

        Args:
            output: Model predictions, class logits followed by box values.
            target: Ground truth laid out like ``output``. The class columns are
                passed to ``CrossEntropyLoss`` unchanged, i.e. as per-class
                targets (presumably one-hot or probabilities; confirm against
                the dataset code).
        """
        split = self.num_classes

        prediction_prob, prediction_box = output[:, :split], output[:, split:]
        target_prob, target_box = target[:, :split], target[:, split:]

        classification_loss = self.cross_entropy(prediction_prob, target_prob)
        box_loss = self.smooth_l1(prediction_box, target_box)

        return classification_loss + self.lamdba * box_loss

In [None]:
# Loss instance built with the constructor defaults (lambda_coord = 5.0).
criterion = ObjectDetectionLoss()

## Training

In [None]:
def find_lr(model, loss_fn, optimizer, train_loader, init_value=1e-8, final_value=10.0, device="cpu",
            restore_state=True):
    """Learning-rate range test: sweep the LR geometrically over one pass of the loader.

    The learning rate starts at ``init_value`` and is multiplied by a constant
    factor after every batch so that it reaches ``final_value`` on the last
    batch. The sweep stops early once the loss exceeds four times the best
    loss seen so far, or becomes NaN/inf.

    Args:
        model: Network to probe; it is moved to ``device``.
        loss_fn: Callable ``loss_fn(outputs, targets)`` returning a scalar tensor.
        optimizer: Optimizer whose learning rate is swept (all parameter groups).
        train_loader: Sized iterable yielding ``(inputs, targets)`` batches.
        init_value: Learning rate used for the first batch.
        final_value: Learning rate targeted for the last batch.
        device: Device the model and every batch are moved to.
        restore_state: When True (default) the model weights and the optimizer
            state, including its learning rate, are put back to their
            pre-sweep values, so the sweep does not leak into later training.

    Returns:
        tuple[list[float], list[float]]: ``(lrs, losses)``. Despite the historic
        ``log_lrs`` name these are raw learning rates, not logarithms. When more
        than 20 points were recorded, the first 10 and the last 5 are dropped.
    """
    import copy
    import math

    def _trim(lrs, loss_values):
        # Drop the noisy warm-up points and the diverging tail of long sweeps.
        if len(lrs) > 20:
            return lrs[10:-5], loss_values[10:-5]
        return lrs, loss_values

    def _set_lr(value):
        for group in optimizer.param_groups:
            group["lr"] = value

    # max(..., 1) keeps the exponent finite for a single-batch loader.
    num_steps = max(len(train_loader) - 1, 1)
    update_step = (final_value / init_value) ** (1 / num_steps)

    # Inputs are moved to `device` below, so the model has to live there too.
    model.to(device)

    if restore_state:
        saved_model_state = copy.deepcopy(model.state_dict())
        saved_optimizer_state = copy.deepcopy(optimizer.state_dict())

    lr = init_value
    _set_lr(lr)
    best_loss = float("inf")
    losses = []
    log_lrs = []

    try:
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(inputs), targets)
            # Work with a plain float so no autograd graph is kept alive in best_loss.
            loss_value = loss.item()

            # Stop on NaN/inf, or (from the second batch on) once the loss blows up.
            if not math.isfinite(loss_value) or (losses and loss_value > 4 * best_loss):
                break

            best_loss = min(best_loss, loss_value)
            losses.append(loss_value)
            log_lrs.append(lr)

            loss.backward()
            optimizer.step()

            lr *= update_step
            _set_lr(lr)
    finally:
        if restore_state:
            model.load_state_dict(saved_model_state)
            optimizer.load_state_dict(saved_optimizer_state)

    return _trim(log_lrs, losses)

In [None]:
# Run the learning-rate sweep on the GPU, then draw loss against learning rate.
log_lrs, losses = find_lr(model, criterion, optimizer, train_loader, device="cuda")

# semilogx draws the curve on a log-scaled x axis in one call.
plt.semilogx(log_lrs, losses)
plt.xlabel("Learning rate")
plt.ylabel("Loss")
plt.show()

In [None]:
# Learning rate used for the actual training run.
lr = 1e-4
# Rebind `optimizer`, the name that is later passed to train(). The previous
# spelling `omtimizer` created an unused object, so training silently kept the
# optimizer (and learning rate) left over from the sweep.
optimizer = optim.Adam(model.parameters(), lr=lr)

In [4]:
def train(model, optimizer, criterion, train_loader, val_loader, num_epochs, device = "cpu"):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    Args:
        model: Network to optimise; it is moved to ``device``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        criterion: Callable ``criterion(output, targets)`` returning a scalar tensor.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Iterable of ``(inputs, targets)`` validation batches.
        num_epochs: Number of passes over ``train_loader``.
        device: Device the model and every batch are moved to.

    Returns:
        list[tuple[float, float]]: One ``(training_loss, validation_loss)`` pair
        per epoch. Each value is the per-sample mean over the samples actually
        seen in that pass (NaN if the loader yielded nothing).
    """
    def _mean(total, count):
        return total / count if count else float("nan")

    # Batches are moved to `device` below, so the model must be there as well.
    model.to(device)
    history = []

    for epoch in range(num_epochs):
        model.train()
        train_total, train_seen = 0.0, 0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            inputs = inputs.to(device)
            targets = targets.to(device)
            loss = criterion(model(inputs), targets)
            loss.backward()
            optimizer.step()
            batch_size = inputs.size(0)
            train_total += loss.item() * batch_size
            train_seen += batch_size
        # Divide by samples seen, not len(dataset): the two differ with
        # drop_last or a sampler, and plain iterables have no .dataset.
        training_loss = _mean(train_total, train_seen)

        model.eval()
        valid_total, valid_seen = 0.0, 0
        # No gradients are needed for validation; skipping them saves memory and time.
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                loss = criterion(model(inputs), targets)
                batch_size = inputs.size(0)
                valid_total += loss.item() * batch_size
                valid_seen += batch_size
        valid_loss = _mean(valid_total, valid_seen)

        print('Epoch: {}, Training Loss: {:.2f}, Validation Loss: {:.2f}'.format(epoch, training_loss, valid_loss))
        history.append((training_loss, valid_loss))

    return history

In [None]:
# Train for 10 epochs on the GPU; train() prints the training and validation loss after each epoch.
train(model, optimizer, criterion, train_loader, val_loader, 10, device="cuda")

## Making Predictions

In [None]:
image = Image.open(os.path.join(val_path, "2007_000033.jpg"))
image = img_transform(image)

# Send the input to whatever device the model lives on instead of assuming "cuda".
model_device = next(model.parameters()).device
model.eval()  # inference mode for dropout / batch-norm
with torch.no_grad():
    logits = model(image.unsqueeze(0).to(model_device))[0].detach().cpu()
output = logits.numpy()

# Everything after the 20 class scores is box data, grouped four values per box.
# Scaling by 224 presumably maps normalised coordinates to a 224-pixel input; confirm against img_transform.
boxes = output[20:].reshape(-1, 4)
boxes = boxes * 224
boxes = boxes.astype(np.int32)
boxes = boxes.tolist()

# Softmax must run on the tensor (not the NumPy copy) and over dim 0,
# because the class scores form a 1-D vector here.
predictions = F.softmax(logits[:20], dim=0)
class_idx = torch.argmax(predictions).item()
class_label = labels[class_idx]

# Display the transformed image. permute(1, 2, 0) moves the first axis to the end,
# presumably channels-first -> channels-last for imshow (confirm img_transform's output layout).
fig, ax = plt.subplots()
ax.imshow(image.permute(1, 2, 0))