# Imports

In [None]:
import time

import torch
import torch.nn as nn
from torch.nn import Sequential
from torch.optim import Adam
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms

import matplotlib.pyplot as plt
import cv2 as cv

from src.constants import (
    COLLAPSED_ANNOTATIONS_PATH,
    POSITIVES_PATH,
    POSITIVES_VALIDATION_PATH,
    VALIDATION_ANNOTATIONS_PATH,
    MODEL_PATH,
    LABELS_MAP,
)
from src.utils.readers import get_images, get_annotations

from src.utils.helpers import check_if_dirs_exist

# Constants

In [None]:
from pathlib import Path

# Re-root the project paths imported from src.constants by prefixing "../",
# so they are resolved one directory above the current working directory.
# NOTE(review): this cell rebinds the imported names from themselves, so
# running it more than once without re-running the imports cell stacks
# another "../" on every path.
MODEL_PATH = Path(f"../{MODEL_PATH}")
POSITIVES_PATH = Path(f"../{POSITIVES_PATH}")
POSITIVES_VALIDATION_PATH = Path(f"../{POSITIVES_VALIDATION_PATH}")
COLLAPSED_ANNOTATIONS_PATH = Path(f"../{COLLAPSED_ANNOTATIONS_PATH}")
VALIDATION_ANNOTATIONS_PATH = Path(f"../{VALIDATION_ANNOTATIONS_PATH}")

# Training hyperparameters.
INIT_LR = 1e-4  # learning rate handed to the Adam optimizer
BATCH_SIZE = 64  # samples per DataLoader batch
EPOCHS = 100  # full passes over the training set

# Initializations

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Data loading

In [None]:
class CustomDataset(Dataset):
    """Pair images with class-index labels for use with a ``DataLoader``.

    ``images`` and ``labels`` are parallel, index-aligned sequences. Each
    label is a key of ``LABELS_MAP``; it is translated to its mapped value
    and returned as a ``torch.long`` tensor. Each image is passed through
    ``transforms.ToTensor()``.

    NOTE(review): images are presumably HxWxC arrays as produced by OpenCV;
    confirm against ``get_images``.
    """

    def __init__(self, images, labels):
        self.images = images
        self.labels = labels
        # Single-step pipeline; kept as a Compose so more steps can be added.
        self.transform = transforms.Compose([transforms.ToTensor()])

    def __len__(self):
        # The dataset size is defined by the images, not by the labels.
        return len(self.images)

    def __getitem__(self, idx):
        raw_image = self.images[idx]
        raw_label = self.labels[idx]

        target = torch.tensor(LABELS_MAP[raw_label], dtype=torch.long)
        tensor_image = self.transform(raw_image)

        return tensor_image, target

In [None]:
# Training split: load the images and convert each from BGR to RGB channel
# order (get_images presumably returns OpenCV-loaded BGR arrays — confirm).
trainImages = [
    cv.cvtColor(bgr_image, cv.COLOR_BGR2RGB)
    for bgr_image in get_images(POSITIVES_PATH)
]
trainAnnotations = get_annotations(COLLAPSED_ANNOTATIONS_PATH)

# Validation split: same treatment.
valImages = [
    cv.cvtColor(bgr_image, cv.COLOR_BGR2RGB)
    for bgr_image in get_images(POSITIVES_VALIDATION_PATH)
]
valAnnotations = get_annotations(VALIDATION_ANNOTATIONS_PATH)

# Flatten the per-image detection lists into flat label lists. Every
# detection contributes its label twice.
# NOTE(review): presumably two images are stored per detection (e.g. an
# original and an augmented copy) — confirm against how the positives are
# generated, otherwise images and labels are misaligned.
trainLabels = []
for detections in trainAnnotations.values():
    for _, char in detections:
        trainLabels.extend((char, char))

valLabels = []
for detections in valAnnotations.values():
    for _, char in detections:
        valLabels.extend((char, char))

trainDataset = CustomDataset(trainImages, trainLabels)
valDataset = CustomDataset(valImages, valLabels)

trainDataLoader = DataLoader(trainDataset, shuffle=True, batch_size=BATCH_SIZE)
valDataLoader = DataLoader(valDataset, shuffle=True, batch_size=BATCH_SIZE)

# Sanity check: image and label counts of each split should match.
for message, collection in (
    ("[INFO] number of training images: {}", trainImages),
    ("[INFO] number of training labels: {}", trainLabels),
    ("[INFO] number of validation images: {}", valImages),
    ("[INFO] number of validation labels: {}", valLabels),
):
    print(message.format(len(collection)))

# Visualize data

In [None]:
# Show one sample image for each of the first `num_preview` distinct class
# indices encountered while iterating the (shuffled) training loader.
num_preview = 5  # one subplot per class index shown; must be > 1 so `axes` is an array
found = set()  # class indices that already have a subplot
fig, axes = plt.subplots(1, num_preview, figsize=(15, 5))

for images, labels in trainDataLoader:
    for image, label in zip(images, labels):
        class_index = label.item()
        if class_index in found:
            continue
        found.add(class_index)

        # Plot the image in the next available subplot; imshow expects
        # channels last, the dataset yields channels first.
        ax = axes[len(found) - 1]
        ax.imshow(image.permute(1, 2, 0).numpy())
        # Use the plain integer: passing the tensor itself renders the title
        # as "tensor(k)".
        ax.set_title(str(class_index))
        ax.axis("off")

        if len(found) == num_preview:
            break
    if len(found) == num_preview:
        break

# Hide the frames of any subplot left empty because fewer than `num_preview`
# classes were present in the data.
for unused_ax in axes[len(found):]:
    unused_ax.axis("off")

# Show the subplots
plt.tight_layout()
plt.show()

# Data frequency

In [None]:
from collections import Counter

# Pick the split to inspect: trainLabels here, or swap in valLabels to look
# at the validation set instead.
labels = trainLabels

# Tally how many times each label occurs.
label_counts = Counter(labels)

# Draw the tallies as a bar chart (one bar per distinct label).
freq_fig, freq_ax = plt.subplots(figsize=(8, 6))
freq_ax.bar(label_counts.keys(), label_counts.values())
freq_ax.set_title("Label Frequency Distribution")
freq_ax.set_xlabel("Labels")
freq_ax.set_ylabel("Frequency")
plt.show()

# Model

In [None]:
# Small CNN classifier: four conv/ReLU/max-pool stages followed by a
# two-layer fully connected head with dropout.
#
# The network outputs raw logits (no final Softmax). nn.CrossEntropyLoss
# applies log-softmax internally, so a Softmax layer here would normalise
# twice and flatten the gradients. argmax over logits gives the same
# predictions as argmax over probabilities; apply torch.softmax(output, dim=1)
# at inference time if probabilities are needed.
model = Sequential(
    # Input: 3x40x40
    nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),  # -> 16x20x20
    nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),  # -> 32x10x10
    nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),  # -> 64x5x5
    nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),  # -> 64x2x2 (5 // 2 == 2)
    nn.Flatten(),  # -> 256 features (64 * 2 * 2)
    nn.Dropout(0.25),
    nn.Linear(256, 256),
    nn.ReLU(),
    nn.Dropout(0.25),
    nn.Linear(256, 5),  # one logit per class
).to(device)

# Optimizer and loss function

In [None]:
optimizer = Adam(model.parameters(), lr=INIT_LR)
# CrossEntropyLoss takes the model output and integer class-index targets.
loss_function = nn.CrossEntropyLoss()

# Number of batches per epoch. len(loader) counts the final partial batch,
# whereas len(dataset) // BATCH_SIZE drops it (and is 0 for a dataset smaller
# than one batch, which would make the average-loss division fail).
trainSteps = len(trainDataLoader)
valSteps = len(valDataLoader)

# Training

In [None]:
# Train for EPOCHS passes over the training loader, reporting the average
# loss and accuracy after every epoch and the total wall-clock time at the end.
print("[INFO] training the network...")
startTime = time.time()

for epoch in range(1, EPOCHS + 1):
    model.train()  # training mode (dropout active)
    totalTrainLoss = 0
    trainCorrect = 0

    for batch_images, batch_labels in trainDataLoader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(batch_images)
        loss = loss_function(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        # Accumulate plain Python numbers so no graph is kept alive.
        totalTrainLoss += loss.item()
        trainCorrect += (outputs.argmax(dim=1) == batch_labels).sum().item()

    # trainSteps is the per-epoch batch count defined alongside the optimizer.
    avgTrainLoss = totalTrainLoss / trainSteps
    trainAccuracy = trainCorrect / len(trainDataLoader.dataset)

    print("[INFO] EPOCH: {}/{}".format(epoch, EPOCHS))
    print("Train loss: {:.6f}, Train accuracy: {:.4f}".format(avgTrainLoss, trainAccuracy))

endTime = time.time()
print("[INFO] total time taken to train the model: {:.2f}s".format(endTime - startTime))

# Test the model on validation data

In [None]:
# Measure accuracy on the validation loader: evaluation mode, no gradient
# tracking, prediction = index of the largest model output per sample.
model.eval()
valCorrect = 0

with torch.no_grad():
    for batch_images, batch_labels in valDataLoader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        predicted = model(batch_images).argmax(dim=1)
        valCorrect += (predicted == batch_labels).sum().item()

# Fraction of validation samples classified correctly.
valAccuracy = valCorrect / len(valDataLoader.dataset)

print("Validation accuracy: {:.4f}".format(valAccuracy))

# Saving the model

In [None]:
# check_if_dirs_exist is a project helper; presumably it makes sure the
# target directory is present before writing — confirm.
check_if_dirs_exist([MODEL_PATH])

# nn.Module has no .save() method (that is the Keras API), so the weights are
# written with torch.save. Only the state_dict is stored: to reload, rebuild
# the same Sequential architecture and call
# model.load_state_dict(torch.load(path)).
torch.save(model.state_dict(), str(MODEL_PATH / "task2_cnn.pth"))