# Preparation

In [35]:
import os
import random
import shutil

import cv2

import matplotlib.pyplot as plt

import numpy as np

from PIL import Image

import torch
import torch.nn as nn
from torch.optim import SGD, Adam
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter

from torchmetrics import F1Score

from torchvision import  datasets, transforms
import torchvision.models as models

import tqdm


# Make float32 the dtype used for newly created floating-point tensors.
# float32 is already PyTorch's default; it is set explicitly here so the
# notebook does not depend on state left behind by an earlier session.
torch.set_default_dtype(torch.float32)

# Splitting the dataset into training, validation, and test sets

In [36]:
data_dir = './handwritten'
split_dir = './ttv'
train_dir = train_data_path = f'{split_dir}/train'
val_dir = val_data_path = f'{split_dir}/val'
test_dir = test_data_path = f'{split_dir}/test'

# Decide whether the split still has to be produced BEFORE creating any
# directory. Creating the directories first makes `split_dir` exist, so a
# check on `split_dir` would always skip the split. Checking for a non-empty
# train directory also recovers from a previous run that only created the
# empty folders.
split_needed = not (os.path.isdir(train_dir) and os.listdir(train_dir))

for split_path in (train_dir, val_dir, test_dir):
    os.makedirs(split_path, exist_ok=True)

if split_needed:
    # Dedicated, seeded generator so the split is reproducible across runs
    # and does not disturb the global `random` state.
    split_rng = random.Random(42)

    for class_folder in sorted(os.listdir(data_dir)):
        class_path = os.path.join(data_dir, class_folder)
        if not os.path.isdir(class_path):
            continue

        # List the image files of this class in a stable order, then shuffle.
        images = sorted(
            name for name in os.listdir(class_path)
            if os.path.isfile(os.path.join(class_path, name))
        )
        split_rng.shuffle(images)

        # 70 % train, 20 % validation, 10 % test.
        train_split_index = int(0.7 * len(images))
        val_split_index = int(0.9 * len(images))
        subsets = (
            (train_dir, images[:train_split_index]),
            (val_dir, images[train_split_index:val_split_index]),
            (test_dir, images[val_split_index:]),
        )

        # Copy (not move) every image into its split; the source data set in
        # `data_dir` is left untouched. The class folder is created in every
        # split, even when a subset is empty.
        for target_root, subset in subsets:
            target_class_path = os.path.join(target_root, class_folder)
            os.makedirs(target_class_path, exist_ok=True)
            for image in subset:
                src = os.path.join(class_path, image)
                dst = os.path.join(target_class_path, image)
                shutil.copy(src, dst)

# Testing transformations and gaining visual insight into the data

##### Define Transformations

In [37]:
# Augmenting preview pipeline: random rotation and random crop, followed by
# tensor conversion and per-channel normalization.
transform = transforms.Compose([
    transforms.RandomRotation(degrees=25),  # Rotate the image by a random angle in [-25, 25] degrees
    #transforms.RandomHorizontalFlip(p=0.5),  # Randomly flip the image horizontally with a probability of 0.5
    transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0), ratio=(0.75, 1.333)),  # Crop 80-100 % of the area with aspect ratio in [0.75, 1.333], then resize to 224x224
    transforms.ToTensor(),  # Convert the image to a PyTorch tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize each of the three channels with the given mean and standard deviation
])

##### Load datasets

In [38]:
# One ImageFolder per split; all three share the augmenting `transform`
# because this section only previews the data.
train_dataset, val_dataset, test_dataset = (
    datasets.ImageFolder(split_path, transform=transform)
    for split_path in (train_data_path, val_data_path, test_data_path)
)

##### Create DataLoaders

In [39]:
# Only the training loader shuffles; the held-out splits are read in order.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_loader, test_loader = (
    DataLoader(dataset=held_out, batch_size=16, shuffle=False)
    for held_out in (val_dataset, test_dataset)
)

##### Dataset sizes

In [None]:
# Report the number of samples in every split.
for split_label, split_dataset in (
    ('Training', train_dataset),
    ('Validation', val_dataset),
    ('Test', test_dataset),
):
    print(f'{split_label} dataset size: ', len(split_dataset))

In [None]:
# Fetch a single batch to inspect tensor shapes and preview a few samples.
images, labels = next(iter(train_loader))
print(images.shape)
print(labels.shape)

# Channel statistics used by `transforms.Normalize` in the preview pipeline.
_preview_mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
_preview_std = np.array([0.229, 0.224, 0.225], dtype=np.float32)


def _to_displayable(image_tensor):
    """Turn a normalized (C, H, W) tensor into an (H, W, C) array in [0, 1].

    Undoes the per-channel normalization so matplotlib shows the real colors
    instead of clipping out-of-range values.
    """
    height_width_channel = image_tensor.numpy().transpose(1, 2, 0)
    return np.clip(height_width_channel * _preview_std + _preview_mean, 0.0, 1.0)


# Display one image
plt.imshow(_to_displayable(images[0]))
plt.title(f'Label: {labels[0].tolist()}')
plt.show()

# Display multiple images: up to 60 samples after the first one, limited by
# the batch size so a short batch does not raise an IndexError.
figure = plt.figure(figsize=(15, 10))
num_of_images = min(60, len(images) - 1)
for index in range(1, num_of_images + 1):
    plt.subplot(6, 10, index)
    plt.axis('off')
    plt.imshow(_to_displayable(images[index]))
    plt.title(f'Label: {labels[index].tolist()}')
plt.show()

# Model Definitions

##### CustomDataset

In [42]:
class custom_dataset(Dataset):
    """Image-folder data set that yields ``(image, one_hot_label)`` pairs.

    The directory ``<root>/<mode>`` must contain one sub-directory per class.
    Class ids follow the alphabetical order of the sub-directory names, which
    are stored in ``class_list``.

    Args:
        mode: Name of the split sub-directory (e.g. ``'train'``, ``'val'``, ``'test'``).
        root: Directory that contains the split sub-directories.
        transforms: Optional callable applied to every loaded PIL image.
    """

    def __init__(self, mode = 'train', root = f'{split_dir}/', transforms = None):
        super().__init__()
        self.mode = mode
        self.root = root
        self.transforms = transforms

        # Select the split
        self.folder = os.path.join(self.root, self.mode)

        # Only directories are classes; stray files (e.g. ``.DS_Store``) would
        # otherwise become bogus classes and crash the listing below.
        self.class_list = sorted(
            entry for entry in os.listdir(self.folder)
            if os.path.isdir(os.path.join(self.folder, entry))
        )

        # Parallel lists: image path and float32 one-hot label per sample.
        self.image_list = []
        self.label_list = []
        num_classes = len(self.class_list)

        for class_id, class_name in enumerate(self.class_list):
            class_dir = os.path.join(self.folder, class_name)
            # Sorted so the sample order does not depend on the file system.
            for image in sorted(os.listdir(class_dir)):
                self.image_list.append(os.path.join(class_dir, image))
                label = np.zeros(num_classes, dtype=np.float32)
                label[class_id] = 1.0
                self.label_list.append(label)

    def __getitem__(self, index):
        """Return the (optionally transformed) image and its one-hot label tensor."""
        image_name = self.image_list[index]
        label = self.label_list[index]

        # Force three channels: grayscale or RGBA files would otherwise not
        # match a three-channel Normalize or the first convolution of the
        # models. The context manager closes the file handle right away.
        with Image.open(image_name) as image_file:
            image = image_file.convert('RGB')

        if self.transforms is not None:
            image = self.transforms(image)

        label = torch.tensor(label)

        return image, label

    def __len__(self):
        """Return the number of samples in the split."""
        return len(self.image_list)

### ResNet18 models

In [43]:
class ResNet18(nn.Module):
    """ResNet-18 backbone (randomly initialized) with a new linear classifier.

    Args:
        num_classes: Number of output logits produced by ``classifier``.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.resnet18 = models.resnet18(weights=None)
        # Keep every child module except the last one (the original
        # classification layer) and use the rest as a feature extractor.
        self.resnet18 = torch.nn.Sequential(*(list(self.resnet18.children())[:-1]))
        self.classifier = nn.Linear(512, num_classes)

    def forward(self, image):
        """Return raw class logits of shape ``(batch, num_classes)``."""
        features = self.resnet18(image)
        # Flatten everything but the batch dimension. A bare ``squeeze()``
        # would also drop the batch dimension for a batch of one sample and
        # return a 1-D tensor, which breaks the loss and ``softmax(dim=1)``.
        features = torch.flatten(features, 1)
        return self.classifier(features)

# Pretrained variant: loads pretrained weights and freezes the backbone
class ResNet18_Pre(nn.Module):
    """Pretrained ResNet-18 used as a frozen feature extractor.

    Only the new linear ``classifier`` has trainable parameters.

    Args:
        num_classes: Number of output logits produced by ``classifier``.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.resnet18 = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)  # Load pretrained ResNet18
        # Keep every child module except the last one (the original
        # classification layer).
        self.resnet18 = torch.nn.Sequential(*(list(self.resnet18.children())[:-1]))
        # Freeze the backbone parameters so only the classifier is optimized.
        # NOTE(review): this does not put the backbone in eval mode, so any
        # normalization layers still update running statistics under
        # ``model.train()`` - confirm this is intended.
        for param in self.resnet18.parameters():
            param.requires_grad = False

        self.classifier = nn.Linear(512, num_classes)

    def forward(self, image):
        """Return raw class logits of shape ``(batch, num_classes)``."""
        features = self.resnet18(image)
        # Flatten everything but the batch dimension. A bare ``squeeze()``
        # would also drop the batch dimension for a batch of one sample.
        features = torch.flatten(features, 1)
        return self.classifier(features)

### VGG16 Models

In [44]:


class VGG16(nn.Module):
    """Randomly initialized VGG-16 whose last classifier layer is replaced.

    The final layer of the stock classifier is removed and a new linear layer
    with 4096 inputs and ``num_classes`` outputs is applied on top.
    """

    def __init__(self, num_classes):
        super().__init__()
        backbone = models.vgg16(weights=None)

        # Rebuild the stock classifier without its last layer.
        head_layers = list(backbone.classifier.children())
        head_layers.pop()
        backbone.classifier = torch.nn.Sequential(*head_layers)

        self.vgg16 = backbone
        # New output layer
        self.classifier = nn.Linear(4096, num_classes)

    def forward(self, x):
        """Run the truncated VGG-16, flatten per sample, and classify."""
        features = self.vgg16(x)
        batch_size = features.size(0)
        flat_features = features.view(batch_size, -1)
        return self.classifier(flat_features)

# Pretrained variant: loads pretrained weights and freezes the `features` sub-module
class VGG16_Pre(nn.Module):
    """Pretrained VGG-16 with frozen ``features`` and a replaced output layer.

    The parameters of ``vgg16.features`` are excluded from gradient updates;
    the remaining stock classifier layers and the new linear layer stay
    trainable.
    """

    def __init__(self, num_classes):
        super().__init__()
        backbone = models.vgg16(weights=models.VGG16_Weights.DEFAULT)

        # Freeze the feature-extractor parameters.
        backbone.features.requires_grad_(False)

        # Rebuild the stock classifier without its last layer.
        head_layers = list(backbone.classifier.children())
        head_layers.pop()
        backbone.classifier = torch.nn.Sequential(*head_layers)

        self.vgg16 = backbone
        # New output layer
        self.classifier = nn.Linear(4096, num_classes)

    def forward(self, x):
        """Run the truncated VGG-16, flatten per sample, and classify."""
        features = self.vgg16(x)
        batch_size = features.size(0)
        flat_features = features.view(batch_size, -1)
        return self.classifier(flat_features)

In [45]:
# Directory that train() writes checkpoints into.
save_model_path = "checkpoints/"
# Default checkpoint file name used by train() when no name is passed.
pth_name = "saved_model.pth"

# Custom Functions

### Validate

In [46]:
def validate(model, data_val, loss_function, writer, epoch, device):
    """Evaluate ``model`` on ``data_val`` and log loss and F1 to TensorBoard.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        data_val: DataLoader yielding ``(image, one_hot_label)`` batches whose
            dataset exposes ``class_list``.
        loss_function: Callable ``loss_function(pred, target)`` returning a scalar tensor.
        writer: Object with ``add_scalar(tag, value, step)`` (e.g. SummaryWriter).
        epoch: Step index used for the logged scalars.
        device: Device the batches are moved to.

    Returns:
        None. The metrics are printed and written to ``writer``.

    Raises:
        ValueError: If ``data_val`` yields no batches (for example because
            ``drop_last=True`` discarded the only, incomplete batch).
    """
    num_batches = len(data_val)
    if num_batches == 0:
        raise ValueError("validate() received a data loader without any batches")

    # NOTE(review): no ``average`` is passed here while evaluate() uses
    # ``average='macro'``, so the two F1 values may not be directly
    # comparable - confirm which one is intended.
    f1 = F1Score(num_classes=len(data_val.dataset.class_list), task = 'multiclass')
    predicted_ids = []
    target_ids = []
    total_loss = 0

    model.eval()  # switch model to evaluation mode
    # The context manager closes the progress bar even if a batch fails.
    with torch.no_grad(), tqdm.tqdm(total=num_batches) as tq:
        tq.set_description('Validation:')

        for image, label in data_val:
            # forward propagation
            image = image.to(device)
            label = label.to(device)
            pred = model(image)

            loss = loss_function(pred, label.float())

            # argmax of the logits equals argmax of their softmax, so no
            # softmax is needed to obtain the predicted class ids.
            predicted_ids.extend(torch.argmax(pred, dim =1).tolist())
            target_ids.extend(torch.argmax(label, dim =1).tolist())

            total_loss += loss.item()
            tq.update(1)

    f1score = f1(torch.tensor(predicted_ids), torch.tensor(target_ids))
    writer.add_scalar("Validation F1", f1score, epoch)
    writer.add_scalar("Validation Loss", total_loss/num_batches, epoch)
    print("F1 score: ", f1score)
    return None


### Train

In [47]:
def train(model, train_loader, val_loader, optimizer, loss_fn, n_epochs, device, writer,pth_n=pth_name):
    """Train ``model`` for ``n_epochs``, validating and checkpointing every epoch.

    Args:
        model: Network to optimize; it is moved to ``device``.
        train_loader: DataLoader yielding ``(images, labels)`` training batches.
        val_loader: DataLoader passed to ``validate`` after every epoch.
        optimizer: Optimizer that updates the parameters of ``model``.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        n_epochs: Number of passes over ``train_loader``.
        device: Device used for the model and the batches.
        writer: Object with ``add_scalar(tag, value, step)`` (e.g. SummaryWriter).
        pth_n: Checkpoint file name inside ``save_model_path``; the file is
            overwritten after every epoch.
    """
    # torch.save does not create missing directories, so make sure the
    # checkpoint directory exists before any training time is spent.
    os.makedirs(save_model_path, exist_ok=True)
    checkpoint_path = os.path.join(save_model_path, pth_n)

    model.to(device)  # Move the model to the specified device (e.g., GPU or CPU)
    # Guard against an empty loader so the average loss cannot divide by zero.
    num_batches = max(len(train_loader), 1)

    for epoch in range(n_epochs):
        # validate() leaves the model in eval mode, so re-enable training
        # mode at the start of every epoch.
        model.train()
        running_loss = 0.0

        # The context manager closes the progress bar even if a batch fails.
        with tqdm.tqdm(total=len(train_loader)) as tq:
            tq.set_description('epoch %d' % (epoch))

            for images, labels in train_loader:
                images = images.to(device)  # Move the batch of images to the specified device
                labels = labels.to(device)  # Move the batch of labels to the specified device

                optimizer.zero_grad()  # Reset the gradients of the optimizer
                outputs = model(images)  # Forward pass
                loss = loss_fn(outputs, labels)  # Compute loss
                loss.backward()  # Backward pass
                optimizer.step()  # Update model parameters

                running_loss += loss.item()
                tq.set_postfix(loss_st='%.6f' % loss.item())
                tq.update(1)

        epoch_loss = running_loss / num_batches
        print("Training Loss", epoch_loss)
        writer.add_scalar("Training Loss", epoch_loss, epoch)
        print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, n_epochs, epoch_loss))

        # check the performance of the model on unseen dataset
        validate(model, val_loader, loss_fn, writer, epoch, device)

        # save the model in pth format
        checkpoint = {
            'epoch': epoch + 1,
            'state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict()
        }

        torch.save(checkpoint, checkpoint_path)
        print("saved the model " + save_model_path)

### Evaluate

In [48]:
def evaluate(model_name, Model, test_loader):
    """Load a checkpoint and report accuracy and macro F1 on ``test_loader``.

    Args:
        model_name: Path of the checkpoint written by ``train``; it must
            contain a ``'state_dict'`` entry.
        Model: Model class, called as ``Model(num_classes=...)``.
        test_loader: DataLoader yielding ``(images, one_hot_labels)`` batches
            whose dataset exposes ``class_list``.

    Returns:
        Tuple ``(accuracy, f1_score)``: accuracy in percent as a float and
        the value returned by ``F1Score.compute()``.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    # Split outside the f-string: reusing the enclosing quote type inside the
    # braces is a SyntaxError before Python 3.12.
    checkpoint_file = model_name.split('/')[-1]
    print(f'Model - {checkpoint_file}:')
    print('Testing...')

    # Use the GPU when present, otherwise fall back to the CPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
    checkpoint = torch.load(model_name, map_location=device, weights_only=True)

    # Extract only the model state_dict from the checkpoint
    model_state_dict = checkpoint['state_dict']

    # Take the class count from the loader that was passed in instead of a
    # notebook-level global.
    num_classes = len(test_loader.dataset.class_list)
    model = Model(num_classes=num_classes)
    model.load_state_dict(model_state_dict)
    model.to(device)

    # Set the model to evaluation mode
    model.eval()

    correct = 0
    total = 0
    f1 = F1Score(num_classes=num_classes, average='macro', task='multiclass').to(device)
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            predicted = outputs.argmax(dim=1)
            # Labels are one-hot encoded; recover the class ids.
            targets = labels.argmax(dim=1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

            # Update F1 score
            f1.update(predicted, targets)

    if total == 0:
        raise ValueError("evaluate() received a data loader without any samples")

    accuracy = 100 * correct / total
    print('Overall accuracy of the network on the test images: %.3f %%' % accuracy)

    f1_score = f1.compute()
    print('F1 score of the network on the test images: %.3f\n' % f1_score)

    return accuracy, f1_score


In [49]:
# Preprocessing shared by the pipelines below
IMAGE_SIZE = [224, 224]
CHANNEL_MEAN = [0.485, 0.456, 0.406]
CHANNEL_STD = [0.229, 0.224, 0.225]

# Deterministic pipeline for validation and test
val_test_tr = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=CHANNEL_MEAN, std=CHANNEL_STD),
])
# Training pipeline: same steps preceded by a random rotation
train_tr = transforms.Compose([
    transforms.RandomRotation(degrees=25),
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=CHANNEL_MEAN, std=CHANNEL_STD),
])

# One custom_dataset per split
train_data, val_data, test_data = (
    custom_dataset(mode=split_name, transforms=pipeline)
    for split_name, pipeline in (
        ("train", train_tr),
        ("val", val_test_tr),
        ("test", val_test_tr),
    )
)

# Data loaders. These rebind the `train_loader`, `val_loader` and
# `test_loader` names created in the preview section.
# NOTE(review): drop_last=True discards the final incomplete batch, so a few
# validation/test samples are never scored.
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
val_loader, test_loader = (
    DataLoader(held_out, batch_size=16, drop_last=True)
    for held_out in (val_data, test_data)
)

# Training

In [50]:
def train_model(model_name, Model, train_data, Optimizer, epochs, writer_name, lr, momentum=0):
    """Build a model, optimizer and TensorBoard writer, then run ``train``.

    Uses the notebook-level ``train_loader`` and ``val_loader``.

    Args:
        model_name: Checkpoint file name passed to ``train`` as ``pth_n``.
        Model: Model class, called as ``Model(num_classes=...)``.
        train_data: Dataset exposing ``class_list``; only the class count is read.
        Optimizer: Optimizer class, e.g. ``SGD`` or ``Adam``.
        epochs: Number of training epochs.
        writer_name: Sub-directory of ``runs`` for the TensorBoard logs.
        lr: Learning rate.
        momentum: Momentum forwarded to the optimizer when non-zero. Leave it
            at 0 for optimizers without a ``momentum`` argument (e.g. Adam).
    """
    model = Model(num_classes=len(train_data.class_list))

    # Forward momentum only when requested. It was previously accepted but
    # never handed to the optimizer, so SGD silently ran without momentum.
    optimizer_kwargs = {'lr': lr}
    if momentum:
        optimizer_kwargs['momentum'] = momentum
    optimizer = Optimizer(model.parameters(), **optimizer_kwargs)

    # Use the GPU when present, otherwise fall back to the CPU.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # Setup TensorBoard
    writer = SummaryWriter(os.path.join('runs', writer_name))
    loss_fn = nn.CrossEntropyLoss()
    try:
        train(model, train_loader, val_loader, optimizer, loss_fn, n_epochs=epochs, device=device, writer=writer, pth_n=model_name)
    finally:
        # Flush pending events and release the log file even if training fails.
        writer.close()

##### ResNet18 Not Pretrained SGD

In [None]:
# ResNet18 from scratch, SGD
train_model(
    model_name='ResNet18_sgd_not_pre',
    Model=ResNet18,
    train_data=train_data,
    Optimizer=SGD,
    epochs=20,
    writer_name='resnet-not-pre-sgd',
    lr=0.001,
    momentum=0.9,
)

##### ResNet18 Pretrained SGD

In [None]:
# Pretrained ResNet18, SGD
train_model(
    model_name='ResNet18_sgd_pre',
    Model=ResNet18_Pre,
    train_data=train_data,
    Optimizer=SGD,
    epochs=20,
    writer_name='resnet-pre-sgd',
    lr=0.0001,
    momentum=0.9,
)

##### ResNet18 Not Pretrained Adam

In [None]:
# ResNet18 from scratch, Adam
train_model(
    model_name='ResNet18_adam_not_pre',
    Model=ResNet18,
    train_data=train_data,
    Optimizer=Adam,
    epochs=20,
    writer_name='resnet-not-pre-adam',
    lr=0.00001,
)

##### ResNet18 Pretrained Adam

In [None]:
# Pretrained ResNet18, Adam
train_model(
    model_name='ResNet18_adam_pre',
    Model=ResNet18_Pre,
    train_data=train_data,
    Optimizer=Adam,
    epochs=20,
    writer_name='resnet-pre-adam',
    lr=0.00075,
)

##### VGG16 Not Pretrained SGD

In [None]:
# VGG16 from scratch, SGD. A comma was missing between the `writer_name` and
# `lr` arguments, which made this cell a SyntaxError.
train_model('VGG16_sgd_not_pre', VGG16, train_data, SGD, 10, writer_name='vgg-not-pre-sgd', lr=0.001, momentum=0.9)

##### VGG16 Pretrained SGD

In [None]:
# Pretrained VGG16, SGD
train_model(
    model_name='VGG16_sgd_pre',
    Model=VGG16_Pre,
    train_data=train_data,
    Optimizer=SGD,
    epochs=10,
    writer_name='vgg-pre-sgd',
    lr=0.00025,
    momentum=0.9,
)

##### VGG16 Not Pretrained Adam

In [None]:
# VGG16 from scratch, Adam
train_model(
    model_name='VGG16_adam_not_pre',
    Model=VGG16,
    train_data=train_data,
    Optimizer=Adam,
    epochs=10,
    writer_name='vgg-not-pre-adam',
    lr=0.0001,
)

##### VGG16 Pretrained Adam

In [None]:
# Pretrained VGG16, Adam
train_model(
    model_name='VGG16_adam_pre',
    Model=VGG16_Pre,
    train_data=train_data,
    Optimizer=Adam,
    epochs=10,
    writer_name='vgg-pre-adam',
    lr=0.00001,
)

# Test

In [None]:
# Score every trained checkpoint on the test split. The calls stay plain
# expression statements so the notebook still displays the last result.
evaluate(model_name='./checkpoints/ResNet18_sgd_not_pre', Model=ResNet18, test_loader=test_loader)
evaluate(model_name='./checkpoints/ResNet18_sgd_pre', Model=ResNet18_Pre, test_loader=test_loader)
evaluate(model_name='./checkpoints/ResNet18_adam_not_pre', Model=ResNet18, test_loader=test_loader)
evaluate(model_name='./checkpoints/ResNet18_adam_pre', Model=ResNet18_Pre, test_loader=test_loader)
evaluate(model_name='./checkpoints/VGG16_sgd_not_pre', Model=VGG16, test_loader=test_loader)
evaluate(model_name='./checkpoints/VGG16_sgd_pre', Model=VGG16_Pre, test_loader=test_loader)
evaluate(model_name='./checkpoints/VGG16_adam_not_pre', Model=VGG16, test_loader=test_loader)
evaluate(model_name='./checkpoints/VGG16_adam_pre', Model=VGG16_Pre, test_loader=test_loader)

# Real-Time Inference

In [None]:
model_name, Model = './checkpoints/VGG16_sgd_not_pre', VGG16

# Use the GPU when present, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the trained checkpoint; map_location lets a checkpoint saved on a GPU
# load on a CPU-only machine.
checkpoint = torch.load(model_name, map_location=device, weights_only=True)

# Extract only the model state_dict from the checkpoint
model_state_dict = checkpoint['state_dict']

# Initialize the model and load the trained weights
model = Model(num_classes=len(train_data.class_list))
model.load_state_dict(model_state_dict)
model.to(device)

# Set the model to evaluation mode
model.eval()

# Initialize the webcam
cap = cv2.VideoCapture(1)  # Use 0 for the default camera

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Convert BGR (OpenCV default) to RGB for PIL compatibility
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_image = Image.fromarray(frame_rgb)  # Convert NumPy array to PIL Image

        # Preprocess with the deterministic validation/test pipeline (which
        # already resizes to 224x224). The augmenting `transform` rotates and
        # crops at random, which would make the predictions jitter.
        tensor_frame = val_test_tr(pil_image).unsqueeze(0).to(device)  # Add batch dim

        # Predict using the model
        with torch.no_grad():
            output = model(tensor_frame)
            if output.dim() == 1:  # Case 1: Single instance with no batch dimension
                predicted_class = output.argmax(dim=0).item()
            elif output.dim() == 2:  # Case 2: Batch dimension present
                predicted_class = output.argmax(dim=1).item()
            else:
                raise ValueError(f'Unexpected output shape: {output.shape}')

        # Class ids index the sorted class folder names, so show the name
        # rather than the bare index.
        predicted_label = train_data.class_list[predicted_class]

        # Display the result
        cv2.putText(frame, f'Prediction: {predicted_label}', (10, 90),
                    cv2.FONT_HERSHEY_SIMPLEX, 3, (255, 0, 255), 6, cv2.LINE_AA)
        cv2.imshow('Number Recognition', frame)

        # Break the loop on 'q' key press
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the camera and close the window even if a frame fails.
    cap.release()
    cv2.destroyAllWindows()