In [None]:
# !pip install yolov5


In [None]:
# !git clone https://github.com/ultralytics/yolov5.git
# !pip install -r yolov5/requirements.txt


In [None]:
# Include all packages
import os
import cv2
from time import time
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from yolov5.models.yolo import Model
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split


In [None]:
# from google.colab import drive
# drive.mount('/content/drive')
# import zipfile
# with zipfile.ZipFile('/content/drive/MyDrive/DL/FinalProject/signDatabasePublicFramesOnly.zip', 'r') as zip_ref:
#     zip_ref.extractall('./DataSet')


In [None]:

def ResizeImage(image: np.ndarray, x1: int, y1: int, x2: int, y2: int, newWidth: int, newHeight: int) -> tuple:
    originalHeight, originalWidth = image.shape[:2]
    widthScale = newWidth / originalWidth
    heightScale = newHeight / originalHeight
    resizedImage = cv2.resize(
        image, (newWidth, newHeight), interpolation=cv2.INTER_LINEAR)
    x1New, y1New = int(x1 * widthScale), int(y1 * heightScale)
    x2New, y2New = int(x2 * widthScale), int(y2 * heightScale)
    return resizedImage, x1New, y1New, x2New, y2New


In [None]:
def LoadDataSet(dataSetFolderPath: str) -> tuple:
    images = []
    annotations = []
    annotationsFilePath = dataSetFolderPath+"/allAnnotations.csv"
    annotationsDataFrame = pd.read_csv(annotationsFilePath, sep=";")
    uniqueSigns = annotationsDataFrame['Annotation tag'].unique().tolist()
    for index, row in annotationsDataFrame[1:].iterrows():
        image = cv2.imread(dataSetFolderPath+"/"+row[0])
        images.append(image)
        annotations.append(
            [uniqueSigns.index(row[1]), row[2], row[3], row[4], row[5]])

    del annotationsDataFrame

    return images, annotations, len(uniqueSigns)


In [None]:
def PreProcessDataSet(images: list, annotations: list, batchSize: int, resize: tuple) -> tuple:
    resizedImages = []
    newAnnotations = []
    for i, image in enumerate(images):
        [label, x1, y1, x2, y2] = annotations[i]
        resizedImage, x1New, y1New, x2New, y2New = ResizeImage(
            image, x1, y1, x2, y2, resize[0], resize[1])
        resizedImages.append(resizedImage)
        newAnnotations.append(
            [(i % batchSize), label, x1New, y1New, x2New, y2New])

    X_train, X_val, y_train, y_val = train_test_split(
        resizedImages, newAnnotations, test_size=0.3, random_state=42)

    return X_train, X_val, y_train, y_val


In [None]:
class CustomDataset(Dataset):
    def __init__(self, data, transform=None):
        self.data = data
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        inputData, label = self.data[idx]

        if self.transform:
            inputData = self.transform(inputData)
        inputData = torch.from_numpy(inputData).float()
        label = torch.tensor(label).float()
        return inputData, label


In [None]:
def CreateDataLoaders(X_train, X_val, y_train, y_val, batchSize):
    trainDataSet = []
    valDataSet = []
    for i in range(len(X_train)):
        trainDataSet.append((X_train[i], y_train[i]))

    for i in range(len(X_val)):
        valDataSet.append((X_val[i], y_val[i]))

    trainDataSet = CustomDataset(trainDataSet)
    valDataSet = CustomDataset(valDataSet)
    trainDataLoader = DataLoader(
        trainDataSet, batchSize=batchSize, shuffle=True, num_workers=4)
    valDataLoader = DataLoader(
        valDataSet, batchSize=batchSize, shuffle=False, num_workers=4)

    return trainDataLoader, valDataLoader


In [None]:
def TargetstoTensors(targets, batchSize, numAnchors, gridSizes, numClasses):
    targetObject = []
    targetClass = []
    targetBox = []
    for grid_size in gridSizes:
        targetObject.append(torch.zeros(
            (batchSize, numAnchors, grid_size, grid_size, 1)))
        targetClass.append(torch.zeros(
            (batchSize, numAnchors, grid_size, grid_size, numClasses)))
        targetBox.append(torch.zeros(
            (batchSize, numAnchors, grid_size, grid_size, 4)))

    for target in targets:
        batch_index, cls, x_center, y_center, width, height = target.long()

        for i, grid_size in enumerate(gridSizes):

            x_cell, y_cell = int(
                x_center * grid_size), int(y_center * grid_size)
            anchor = 0
            try:
                targetObject[i][batch_index, anchor, y_cell, x_cell, 0] = 1
                targetClass[i][batch_index, anchor, y_cell, x_cell, cls] = 1
                targetBox[i][batch_index, anchor, y_cell, x_cell] = torch.tensor(
                    [x_center, y_center, width, height])
            except Exception as e:
                # print(e)
                pass
    return targetObject, targetClass, targetBox


In [None]:
class YOLOv5Loss(nn.Module):
    def __init__(self, numClasses, numAnchors=3):
        super(YOLOv5Loss, self).__init__()
        self.numClasses = numClasses
        self.numAnchors = numAnchors

    def forward(self, preds, targets):
        objectLoss = torch.tensor(0.0, device=preds[0].device)
        classLoss = torch.tensor(0.0, device=preds[0].device)
        boxLoss = torch.tensor(0.0, device=preds[0].device)
        batchSize = preds[0].size(0)
        gridSizes = [pred.size(2) for pred in preds]
        targetObjectList, targetClassList, targetBoxList = TargetstoTensors(
            targets, batchSize, self.numAnchors, gridSizes, self.numClasses)
        for i, pred in enumerate(preds):
            # pred_obj = pred[..., 4].sigmoid()
            # pred_cls = pred[..., 5:].sigmoid()
            # pred_box = pred[..., :4]

            targetObject = targetObjectList[i].to(pred.device)
            targetClass = targetClassList[i].to(pred.device)
            targetBox = targetBoxList[i].to(pred.device)

            objectLoss += nn.BCEWithLogitsLoss()(pred[..., 4:5], targetObject)
            classLoss += nn.BCEWithLogitsLoss()(pred[..., 5:], targetClass)
            boxLoss += nn.MSELoss()(pred[..., :4], targetBox)

        totalLoss = objectLoss + classLoss + boxLoss
        return totalLoss


In [None]:
def CreateYolov5Model(numClasses: int, version="s"):
    congfigFile = "yolov5/models/yolov5{}.yaml".format(version)
    model = Model(congfigFile, ch=3, nc=numClasses)
    return model


In [None]:
def ValidateEpoch(model, dataLoader, optimizer, lossFunction, device):
    totalLoss = 0
    model.eval()
    dataLoaderLen = len(dataLoader)
    
    for i, (inputs, targets )in enumerate(dataLoader):
        inputs = inputs.permute(0, 3, 1, 2)
        inputs = inputs.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        with torch.set_grad_enabled(False):
            outputs = model(inputs)
            loss = lossFunction(outputs, targets)
            
        totalLoss += loss.item() * inputs.size(0)
        if(((i*100)//dataLoaderLen) % 10 == 0):
            print((i*100//dataLoaderLen), end="%,")
    print()
    epochLoss = totalLoss / dataLoaderLen
    return epochLoss

In [None]:
def TrainEpoch(model, dataLoader, optimizer, lossFunction, device):
    totalLoss = 0
    model.train()
    dataLoaderLen = len(dataLoader)
    for i, (inputs, targets )in enumerate(dataLoader):
        inputs = inputs.permute(0, 3, 1, 2)
        inputs = inputs.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        with torch.set_grad_enabled(True):
            outputs = model(inputs)
            loss = lossFunction(outputs, targets)
            loss.backward()
            optimizer.step()
        totalLoss += loss.item() * inputs.size(0)
        if(((i*100)//dataLoaderLen) % 10 == 0):
            print((i*100//dataLoaderLen), end="%,")
    print()
    epochLoss = totalLoss / dataLoaderLen
    return epochLoss

In [None]:
def TrainModel(model, dataLoader, epochs, optimizer, lossFunction, device):
    for epoch in range(epochs):
        print("Epoch {}/{}:".format(epoch+1, epochs))
        startTime = time()
        print("Train Epoch:")
        TrainingEpochLoss = TrainEpoch(model, dataLoader, optimizer, lossFunction, device)
        # print("Validate Epoch:")
        # ValidateingEpochLoss = ValidateEpoch(model, dataLoader, optimizer, lossFunction, device)
        endTime = time()
        timeTaken = endTime-startTime
        print("Training Loss: {:.4f}".format(TrainingEpochLoss))
        # print("Validateing Loss: {:.4f}".format(ValidateingEpochLoss))
        print("Time taken: {}min, {}, secs".format(timeTaken//60, timeTaken % 60))

    print("Training complete.")
    return model


In [None]:
batchSize = 32
inputShape = (416, 416)
epochs = 100
numAnchors = 3
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
print("Using {} device".format(device))

In [None]:
images, annotations, numClasses = LoadDataSet("./DataSet")


In [None]:
X_train, X_val, y_train, y_val = PreProcessDataSet(
    images, annotations, batchSize, inputShape)
del images
del annotations


In [None]:
trainDataLoader, valDataLoader = CreateDataLoaders(
    X_train, X_val, y_train, y_val, batchSize)
del X_train
del y_train
del X_val
del y_val


In [None]:
yolov5Model = CreateYolov5Model(numClasses)
optimizer = optim.Adam(yolov5Model.parameters(), lr=0.001)
yolov5LossFunction= YOLOv5Loss(numClasses=numClasses)


In [None]:
trainedModel = TrainModel(yolov5Model, trainDataLoader, epochs, optimizer, yolov5LossFunction, device)

In [None]:
torch.save(yolov5Model.state_dict(), 'Trained Model/trained_yolov5Modelv3.pth')