In [1]:
# Load key/value pairs from a local .env file into the process environment
# before any other cell runs.
# NOTE(review): presumably this supplies the Kaggle credentials used by
# kagglehub below — confirm.
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
import os
import kagglehub
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image
import pandas as pd
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from utils import intersection_over_union
from PIL import Image

In [3]:
# Fetch the dataset through kagglehub; `path` is the value it returns and is
# used below as the root directory that contains `aquarium_pretrain/`.
path = kagglehub.dataset_download("slavkoprytula/aquarium-data-cots")



In [5]:
def build_dataframe(path):
    """Build a one-column DataFrame listing the files found in a directory.

    Parameters
    ----------
    path : str or pathlib.Path
        Directory to scan (not recursive).

    Returns
    -------
    pandas.DataFrame
        A frame with a single ``filename`` column holding the stem (file name
        without its last extension) of every regular file in ``path``, sorted
        alphabetically.
    """
    # iterdir() yields entries in an arbitrary, OS-dependent order; sorting
    # keeps row order (and therefore dataset indices) reproducible. Directories
    # such as ``.ipynb_checkpoints`` are skipped because they are not samples.
    filenames = sorted(
        entry.stem for entry in Path(path).iterdir() if entry.is_file()
    )
    return pd.DataFrame({"filename": filenames})
        

In [6]:
# One DataFrame of label-file stems per dataset split, built in the order
# train -> test -> valid.
train_df, test_df, valid_df = map(
    build_dataframe,
    (
        Path(f"{path}/aquarium_pretrain/train/labels"),
        Path(f"{path}/aquarium_pretrain/test/labels"),
        Path(f"{path}/aquarium_pretrain/valid/labels"),
    ),
)

In [21]:
class UnderWaterDataset(Dataset):
    """Map-style detection dataset that yields ``(image, label_matrix)`` pairs.

    For a row with stem ``name`` the sample is read from
    ``{path}/labels/{name}.txt`` and ``{path}/images/{name}.jpg``. Every
    non-blank label line must contain five whitespace-separated fields:
    ``label x y width height``.
    NOTE(review): x/y/width/height are assumed to be normalised to [0, 1]
    relative to the image (YOLO text format) — confirm against the dataset.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``filename`` column of file stems.
    path : str
        Split directory containing ``labels/`` and ``images/``.
    split_size : int
        Grid size ``S``; the target is an ``S x S`` grid.
    boxes : int
        Number of boxes per cell ``B`` (stored; not used when building targets).
    classes : int
        Number of classes ``C``.
    transform : callable, optional
        If given, called as ``transform(image)`` on the RGB PIL image and its
        result is returned in place of the image. It receives the image only,
        so it must not move or rescale the boxes.
    """

    def __init__(self, df, path, split_size, boxes, classes, transform=None):
        self.df = df
        self.path = path
        self.S = split_size
        self.B = boxes
        self.C = classes
        # Previously accepted but dropped; keep it so it can actually be applied.
        self.transform = transform

    def __len__(self):
        """Return the number of samples (rows of ``df``)."""
        return len(self.df)

    def __getitem__(self, index):
        """Return ``(image, label_matrix)`` for the sample at position ``index``.

        Raises
        ------
        IndexError
            If ``index`` is outside the frame. Python's fallback iteration
            protocol (``iter(dataset)`` / ``for sample in dataset``) relies on
            this to stop; a label-based ``.loc`` lookup would raise ``KeyError``.
        """
        # Positional lookup, independent of whatever index labels `df` carries.
        filename = self.df["filename"].iloc[index]

        boxes = self._read_boxes(f"{self.path}/labels/{filename}.txt")
        label_matrix = self._build_label_matrix(boxes)

        # Image.open is lazy and keeps the file handle open; convert() forces
        # the pixel data to load and the `with` block then releases the handle.
        with Image.open(f"{self.path}/images/{filename}.jpg") as raw_image:
            image = raw_image.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return image, label_matrix

    @staticmethod
    def _read_boxes(label_path):
        """Parse a label file into ``[label, x, y, width, height]`` rows."""
        boxes = []
        with open(label_path) as label_file:
            for line in label_file:
                fields = line.split()
                if not fields:
                    # Tolerate blank / trailing empty lines.
                    continue
                label, x, y, width, height = fields
                boxes.append(
                    [int(label), float(x), float(y), float(width), float(height)]
                )
        return boxes

    def _build_label_matrix(self, boxes):
        """Encode boxes into an ``(S, S, C + 5)`` target tensor.

        Channel layout per cell: ``[0:C]`` one-hot class, ``[C]`` objectness,
        ``[C+1:C+5]`` box as (x, y) offsets inside the cell and (width, height)
        in units of cells. Only the first box that lands in a cell is kept.
        """
        label_matrix = torch.zeros((self.S, self.S, 5 + self.C))
        last_cell = self.S - 1

        for label, x, y, width, height in boxes:
            # A coordinate of exactly 1.0 would map to cell S (out of range);
            # clamp it into the last row/column instead of crashing.
            i = min(max(int(self.S * y), 0), last_cell)
            j = min(max(int(self.S * x), 0), last_cell)
            x_cell, y_cell = self.S * x - j, self.S * y - i
            width_cell, height_cell = width * self.S, height * self.S

            if label_matrix[i, j, self.C] == 0:
                label_matrix[i, j, self.C] = 1
                label_matrix[i, j, self.C + 1:self.C + 5] = torch.tensor(
                    [x_cell, y_cell, width_cell, height_cell]
                )
                label_matrix[i, j, label] = 1

        return label_matrix

In [22]:
# Training split encoded on a 7x7 grid with 2 boxes per cell and 7 classes.
train_dataset = UnderWaterDataset(
    df=train_df,
    path=f"{path}/aquarium_pretrain/train",
    split_size=7,
    boxes=2,
    classes=7,
)

In [23]:
# Sanity check: pull the first sample from the training dataset.
# `element` is whatever UnderWaterDataset.__getitem__ returns for it,
# i.e. an (image, label_matrix) pair.
dataset_iterator = iter(train_dataset)
element = next(dataset_iterator)

In [57]:
# Backbone description consumed by YoloV1._create_conv_layers. Entry kinds:
#   tuple -> one conv block: (kernel size, out channels, stride, padding)
#   "M"   -> a 2x2 max-pool with stride 2
#   list  -> [conv tuple, conv tuple, repeat count]: the pair of conv blocks
#            is stacked `repeat count` times

architecture_config = [
    (7, 64, 2, 3),
    "M",
    (3, 192, 1, 1),
    "M",
    (1, 128, 1, 0),
    (3, 256, 1, 1),
    (1, 256, 1, 0),
    (3, 512, 1, 1),
    "M",
    [(1, 256, 1, 0), (3, 512, 1, 1), 4],  # 1x1 reduce + 3x3 conv, repeated 4 times
    (1, 512, 1, 0),
    (3, 1024, 1, 1),
    "M",
    [(1, 512, 1, 0), (3, 1024, 1, 1), 2],  # 1x1 reduce + 3x3 conv, repeated 2 times
    (3, 1024, 1, 1),
    (3, 1024, 2, 1),  # stride 2: halves the spatial size
    (3, 1024, 1, 1),
    (3, 1024, 1, 1),
]

In [58]:
class CNNBlock(nn.Module):
    """Conv2d -> BatchNorm2d -> LeakyReLU(0.1) building block.

    Parameters
    ----------
    in_channels : int
        Channels of the incoming feature map.
    out_channels : int
        Channels produced by the convolution.
    **kwargs
        Forwarded unchanged to ``nn.Conv2d`` (e.g. ``kernel_size``, ``stride``,
        ``padding``).
    """

    def __init__(self, in_channels, out_channels, **kwargs):
        super().__init__()
        # The convolution is created without a bias term; the batch-norm layer
        # that follows has its own learnable shift.
        self.conv = nn.Conv2d(in_channels, out_channels, bias=False, **kwargs)
        self.batchnorm = nn.BatchNorm2d(out_channels)
        self.leakyrelu = nn.LeakyReLU(0.1)

    def forward(self, x):
        """Apply convolution, batch normalisation and the activation in turn."""
        convolved = self.conv(x)
        normalised = self.batchnorm(convolved)
        return self.leakyrelu(normalised)
        

In [61]:
class YoloV1(nn.Module):
    """YOLOv1-style network: convolutional backbone plus fully connected head.

    The backbone is built from the module-level ``architecture_config``; the
    head maps the flattened backbone output to ``S * S * (B * 5 + C)`` values
    per sample.

    Parameters
    ----------
    in_channels : int, default 3
        Channels of the input image tensor.
    **kwargs
        Forwarded to ``_create_fcs``: ``split_size`` (S), ``num_boxes`` (B) and
        ``num_classes`` (C).
    """

    def __init__(self, in_channels=3, **kwargs):
        super().__init__()
        self.in_channels = in_channels
        self.architecture = architecture_config

        self.darknet = self._create_conv_layers(self.architecture)
        self.fcs = self._create_fcs(**kwargs)

    def forward(self, x):
        """Return a ``(batch, S * S * (B * 5 + C))`` tensor of raw predictions."""
        x = self.darknet(x)
        x = self.fcs(torch.flatten(x, start_dim=1))
        return x

    def _create_conv_layers(self, architecture):
        """Translate an architecture description into an ``nn.Sequential``.

        Entries may be a ``(kernel_size, out_channels, stride, padding)`` tuple
        (one ``CNNBlock``), a string (a 2x2/stride-2 max-pool) or a list
        ``[conv_tuple, conv_tuple, repeats]`` (the pair stacked ``repeats``
        times).

        Raises
        ------
        TypeError
            If an entry is none of the above. Such entries used to be skipped
            silently, which would build a different network than described.
        """
        layers = []
        in_channels = self.in_channels

        for entry in architecture:
            if isinstance(entry, tuple):
                kernel_size, out_channels, stride, padding = entry
                layers.append(
                    CNNBlock(
                        in_channels,
                        out_channels=out_channels,
                        kernel_size=kernel_size,
                        stride=stride,
                        padding=padding,
                    )
                )
                in_channels = out_channels
            elif isinstance(entry, str):
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            elif isinstance(entry, list):
                conv_1, conv_2, num_repeats = entry
                for _ in range(num_repeats):
                    layers.append(
                        CNNBlock(
                            in_channels,
                            out_channels=conv_1[1],
                            kernel_size=conv_1[0],
                            stride=conv_1[2],
                            padding=conv_1[3],
                        )
                    )
                    layers.append(
                        CNNBlock(
                            conv_1[1],
                            out_channels=conv_2[1],
                            kernel_size=conv_2[0],
                            stride=conv_2[2],
                            padding=conv_2[3],
                        )
                    )
                    # The next repeat (or next entry) consumes conv_2's output.
                    in_channels = conv_2[1]
            else:
                raise TypeError(
                    f"Unsupported architecture entry {entry!r} "
                    f"of type {type(entry).__name__}"
                )

        return nn.Sequential(*layers)

    def _create_fcs(self, split_size, num_boxes, num_classes):
        """Build the fully connected head.

        The first linear layer expects ``1024 * S * S`` inputs, i.e. the
        backbone must end with 1024 channels on an ``S x S`` feature map.
        """
        S, B, C = split_size, num_boxes, num_classes
        return nn.Sequential(
            # forward() already flattens; this layer is kept so the indices of
            # the following layers (and hence state_dict keys) stay unchanged.
            nn.Flatten(),
            nn.Linear(1024 * S * S, 496),
            nn.Dropout(0.0),  # p=0.0: currently a no-op
            nn.LeakyReLU(0.1),
            nn.Linear(496, S * S * (B * 5 + C))
        )

In [62]:
# Smoke test: run a random batch of two 3x448x448 tensors through the network
# and print the output shape; with S=7, B=2, C=20 each sample should have
# S * S * (B * 5 + C) = 7 * 7 * 30 = 1470 values.
# NOTE(review): num_classes=20 here, while train_dataset above is built with
# 7 classes — confirm which value is intended before training.
model = YoloV1(split_size=7, num_boxes=2, num_classes=20)
x = torch.randn((2, 3, 448, 448))
logits = model(x)
print(logits.shape)

torch.Size([2, 1470])


In [None]:
class YoloLoss(nn.Module):
    """Sum-squared-error YOLOv1 loss.

    Per-cell channel layout, derived from ``S``, ``B`` and ``C``:

    * predictions: ``[0:C]`` class scores, then ``B`` groups of five values
      ``(confidence, x, y, w, h)`` starting at channel ``C``;
    * targets: ``[0:C]`` one-hot class, ``[C]`` objectness (1 if the cell holds
      an object), ``[C+1:C+5]`` the ground-truth box. Extra trailing channels
      in ``targets`` are ignored.

    Parameters
    ----------
    S : int
        Grid size.
    B : int
        Number of predicted boxes per cell.
    C : int
        Number of classes.
    """

    def __init__(self, S, B, C):
        super().__init__()
        self.mse = nn.MSELoss(reduction="sum")
        self.B = B
        self.S = S
        self.C = C

        # Weights of the no-object confidence term and of the box-coordinate term.
        self.lambda_noobj = 0.5
        self.lambda_coord = 5

    def forward(self, predictions, targets):
        """Compute the total loss for a batch.

        Parameters
        ----------
        predictions : torch.Tensor
            Any shape that reshapes to ``(N, S, S, C + 5 * B)`` (e.g. the flat
            ``(N, S * S * (C + 5 * B))`` network output).
        targets : torch.Tensor
            ``(N, S, S, >= C + 5)`` tensor in the layout described on the class.

        Returns
        -------
        torch.Tensor
            Scalar: ``lambda_coord * box + object + lambda_noobj * no_object + class``.
        """
        num_classes, num_boxes = self.C, self.B
        predictions = predictions.reshape(
            -1, self.S, self.S, num_classes + num_boxes * 5
        )

        # Channel offsets come from C and B instead of being hard-coded for
        # C=20, B=2.
        conf_starts = [num_classes + 5 * b for b in range(num_boxes)]
        pred_confs = [predictions[..., c:c + 1] for c in conf_starts]
        pred_boxes = [predictions[..., c + 1:c + 5] for c in conf_starts]

        target_conf = targets[..., num_classes:num_classes + 1]
        target_boxes = targets[..., num_classes + 1:num_classes + 5]
        exists_box = target_conf  # indicator: 1 where the cell contains an object

        # IoU of every predicted box with the ground truth, stacked on a new
        # leading axis so the best predictor per cell can be picked.
        # NOTE(review): assumes intersection_over_union returns a tensor with a
        # trailing singleton dim, i.e. (N, S, S, 1) — confirm in utils.
        ious = torch.stack(
            [intersection_over_union(box, target_boxes) for box in pred_boxes],
            dim=0,
        )
        _, best_box = torch.max(ious, dim=0)

        # One 0/1 mask per predictor: 1 where that predictor is the responsible one.
        responsible = [
            (best_box == b).to(predictions.dtype) for b in range(num_boxes)
        ]

        # ---- box coordinate loss ----
        box_predictions = exists_box * sum(
            mask * box for mask, box in zip(responsible, pred_boxes)
        )
        box_targets = exists_box * target_boxes

        # Square-root the width/height. New tensors are built with torch.cat
        # rather than writing into slices in place, which would invalidate
        # values autograd has saved for the backward pass.
        pred_wh = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(
            torch.abs(box_predictions[..., 2:4]) + 1e-6
        )
        box_predictions = torch.cat([box_predictions[..., 0:2], pred_wh], dim=-1)
        box_targets = torch.cat(
            [box_targets[..., 0:2], torch.sqrt(box_targets[..., 2:4])], dim=-1
        )

        box_loss = self.mse(
            torch.flatten(box_predictions, end_dim=-2),
            torch.flatten(box_targets, end_dim=-2),
        )

        # ---- object loss: confidence of the responsible predictor ----
        pred_conf = sum(
            mask * conf for mask, conf in zip(responsible, pred_confs)
        )
        object_loss = self.mse(
            torch.flatten(exists_box * pred_conf),
            torch.flatten(exists_box * target_conf),
        )

        # ---- no-object loss: every predictor's confidence in empty cells ----
        no_object_loss = 0
        for conf in pred_confs:
            no_object_loss = no_object_loss + self.mse(
                torch.flatten((1 - exists_box) * conf, start_dim=1),
                torch.flatten((1 - exists_box) * target_conf, start_dim=1),
            )

        # ---- class loss ----
        class_loss = self.mse(
            torch.flatten(exists_box * predictions[..., :num_classes], end_dim=-2),
            torch.flatten(exists_box * targets[..., :num_classes], end_dim=-2),
        )

        general_loss = (
            self.lambda_coord * box_loss
            + object_loss
            + self.lambda_noobj * no_object_loss
            + class_loss
        )

        return general_loss