In [89]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import torch as torch
import torchvision as tv
from functools import partial
import os
import tempfile
from pathlib import Path
import fsspec
from sklearn.metrics import confusion_matrix, classification_report

In [78]:
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split
import torchvision.transforms as transforms
import pyarrow.fs
import ray
from ray import tune
import ray.train as train
from ray.train import Checkpoint, get_checkpoint
from ray.tune.schedulers import ASHAScheduler
import ray.cloudpickle as pickle
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm

In [79]:
class EmotionDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.images = []
        self.labels = []
        self.load_data()

    def load_data(self):
        # Assuming a structure where each class has its own subdirectory
        for label, class_dir in enumerate(os.listdir(self.data_dir)):
            class_path = os.path.join(self.data_dir, class_dir)
            for count, img_name in enumerate(os.listdir(class_path)):
                if count > 700:
                    break
                img_path = os.path.join(class_path, img_name)
                self.images.append(img_path)
                self.labels.append(label)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        image = Image.open(img_path)
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

In [80]:
def load_custom_data(data_dir="./data", batch_size=32):
    transform = transforms.Compose([
        transforms.RandomRotation(15),
        transforms.RandomAffine(
            degrees=0,
            translate=(0.01, 0.12),
            shear=(0.01, 0.03),
        ),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5), (0.5))
    ])

    train_dir = f"{data_dir}/train"
    test_dir = f"{data_dir}/test"
    
    trainset = EmotionDataset(data_dir=train_dir, transform=transform)
    testset = EmotionDataset(data_dir=test_dir, transform=transform)

    return trainset, testset

In [81]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))

In [82]:
# Can be used for any Image Classification task

class ImageClassificationBase(nn.Module):
    def training_step(self, batch):
        inputs, labels = batch
        outputs = self(inputs)
        loss = F.cross_entropy(outputs, labels)
        # loss = F.nll_loss(outputs, labels) # Convert for problem at hand
        acc = accuracy(outputs, labels)
        return {'loss': loss, 'acc': acc.detach()}
    
    def validation_step(self, batch):
        images, labels = batch 
        out = self(images)                    # Generate predictions
        loss = F.cross_entropy(out, labels)   # Calculate loss
        acc = accuracy(out, labels)           # Calculate accuracy
        return {'val_loss': loss.detach(), 'val_acc': acc.detach()}
        
    def get_metrics_epoch_end(self, outputs, validation=True):
        if validation:
            loss_ = 'val_loss'
            acc_ = 'val_acc'
        else:
            loss_ = 'loss'
            acc_ = 'acc'

        batch_losses = [x[f'{loss_}'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   
        batch_accs = [x[f'{acc_}'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()      
        return {f'{loss_}': epoch_loss.detach().item(), f'{acc_}': epoch_acc.detach().item()}
    
    def epoch_end(self, epoch, result, num_epochs):
        print(f"Epoch: {epoch+1}/{num_epochs} -> lr: {result['lrs'][-1]:.5f} loss: {result['loss']:.4f}, acc: {result['acc']:.4f}, val_loss: {result['val_loss']:.4f}, val_acc: {result['val_acc']:.4f}\n")

In [96]:
class EmotionRecognition(ImageClassificationBase):
    def __init__(self):
        super().__init__()
        self.network = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2, 2), # output: 16 x 24 x 24

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2, 2), # output: 64 x 12 x 12

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2, 2), # output: 128 x 6 x 6

            nn.Flatten(), 
            nn.Linear(128*6*6, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 7))
        
    def forward(self, xb):
        return self.network(xb)

    def __repr__(self):
        return f"{self.network}"
    
    def __str__(self):
        summary(self.network, (1, 48, 48))

In [97]:
@torch.no_grad()
def evaluate(model: object, val_loader: object) -> dict:
    '''
        Evaluate model on the validation set
        Input:
            model: training model object
            val_loder: validation data loader object
        Output:
            validation metrics
    '''

    model.eval()
    outputs = [model.validation_step(batch) for batch in val_loader]
    return model.get_metrics_epoch_end(outputs=outputs, validation=True)


def get_lr(optimizer: object) -> float:
    ''' Returns current learning rate'''

    for param_group in optimizer.param_groups:
        return param_group['lr']


def fit_model(model: object, 
              epochs: int, 
              lr: float, 
              train_loader: object, 
              val_loader: object,
              weight_decay: float=0, 
              grad_clip: float=None, 
              opt_func: object=torch.optim.SGD):
    '''
        This function is responsible for training our model.
        We use a One Cycle learning rate policy to update our learning rate 
        with each epoch.
        The best model is saved during each epoch.
        Input:
            model_name: str 
            model: object
            epochs: int -> Max epochs
            max_lr: float -> Maximum allowed learning rate during learning
            train_loader: training set data loader
            val_loader: validation set data loader
            weight_decay: float -> value to decrease weights during training of each batch
            grad_clip: float -> maximum allowed gradient value
            opt_func: optimzer object
        Output:
            history: list of metrics
    '''

    model_name = "test_model"
    BEST_VAL_SCORE = 0.0 # for keeping track of best model score
    history = []

    optimizer = opt_func(model.parameters(), lr, weight_decay=weight_decay)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer=optimizer, max_lr=lr,
                                                    epochs=epochs, 
                                                    steps_per_epoch=len(train_loader))

    for epoch in range(epochs):
        train_history = []
        lrs = []

        # Training Phase 
        model.train()
        for batch in tqdm(train_loader, ascii=True, desc=f'Epoch: {epoch+1}/{epochs}'):
            info = model.training_step(batch)
            loss = info['loss']
            # contains batch loss and acc for training phase
            train_history.append(info)
            loss.backward()

            # Gradient clipping
            if grad_clip:
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)

            optimizer.step()
            optimizer.zero_grad()

            lrs.append(get_lr(optimizer))
            scheduler.step()


        train_result = model.get_metrics_epoch_end(train_history, validation=False)
        val_result = evaluate(model, val_loader)
        result = {**train_result, **val_result}
        result['lrs'] = lrs
        model.epoch_end(epoch, result, epochs)

        # Save the best model
        if result['val_acc'] > BEST_VAL_SCORE:
            BEST_VAL_SCORE = result['val_acc']
            save_name = f"{model_name}_epoch-{epoch+1}_score-{round(result['val_acc'], 4)}.pth"
            !rm -f '{model_name}'_*
            torch.save(model.state_dict(), save_name)

        history.append(result)
    return history

@torch.no_grad()
def generate_prediction(model, data) -> None:
    '''Generate prediction on the test set'''

    # load test dataset
    test_dl = DataLoader(data, batch_size=32, shuffle=True)

    # generate prediction using the validation step method defined in Base class
    with torch.no_grad():
        model.eval()
        outputs = [model.validation_step(batch) for batch in test_dl]
        metrics = model.get_metrics_epoch_end(outputs=outputs, validation=True)

    print(f"Test Scores:\n Loss: {round(metrics['val_loss'], 3)}, Accuracy: {round(metrics['val_acc'], 3)}")

In [98]:
def end_to_end(trainset, testset, parameters: dict=None) -> dict:
    '''
        A simple function end-to-end training and testing on the selected model.
        Inputs:
            model_name: str -> chosen model name
            parameters: dict -> dictionary of hyperparameters for the model
        Outputs:
            history: dict -> dictionary containing model metrics(loss, score, lr)

    '''
    torch.cuda.empty_cache()

    # hyperparameters
    BATCH_SIZE = parameters["batch_size"]
    epochs = parameters["epochs"]
    max_lr = parameters["max_lr"]
    weight_decay = parameters["weight_decay"]
    grad_clip = parameters["grad_clip"]
    opt_func = parameters["opt_func"]

    # get transformed dataset
    train_dl = DataLoader(trainset, BATCH_SIZE, shuffle=True)
    valid_dl = DataLoader(testset, BATCH_SIZE, shuffle=True)

    # # Print batch img shape, label and plot image:
    # img, label = next(iter(train_dl))
    # print(f"Batch Shape: {img.shape}")
    # print(f"Label: {label}")
    # plt.imshow(img[0].numpy().squeeze(), cmap='gray')
    # plt.show()

    # get model
    model = EmotionRecognition()

    # move model to GPU
    # model = to_device(model, device)
    
    # train model
    history = fit_model(
                model, 
                epochs, 
                max_lr, 
                train_dl, 
                valid_dl,
                weight_decay, 
                grad_clip, 
                opt_func
            )

    # cleaning
    torch.cuda.empty_cache()

    # generate predictions
    print("Generating predictions on the Test set")
    generate_prediction(model, testset)
    return history

In [99]:
training_parameters = {
    "epochs": 10,
    "max_lr": 0.005,
    "weight_decay": 0.1,
    "grad_clip": 1e-4,
    "opt_func": torch.optim.SGD,
    "batch_size": 32

}

In [100]:
trainset, testset = load_custom_data(data_dir="C:/Users/joey5/OneDrive/KU/Fourth Year/AppML/Final_Project", batch_size=32)


In [101]:
# train the model
history = end_to_end(trainset, testset, training_parameters)

Epoch: 1/10:   0%|          | 0/898 [00:00<?, ?it/s]

Epoch: 1/10: 100%|##########| 898/898 [02:43<00:00,  5.49it/s]


Epoch: 1/10 -> lr: 0.00140 loss: 1.9187, acc: 0.2370, val_loss: 1.9277, val_acc: 0.2470



'rm' is not recognized as an internal or external command,
operable program or batch file.
Epoch: 2/10: 100%|##########| 898/898 [02:30<00:00,  5.98it/s]
Epoch: 3/10:   0%|          | 0/898 [00:00<?, ?it/s]

Epoch: 2/10 -> lr: 0.00380 loss: 1.9415, acc: 0.2516, val_loss: 1.9456, val_acc: 0.2467



Epoch: 3/10: 100%|##########| 898/898 [02:30<00:00,  5.97it/s]
Epoch: 4/10:   0%|          | 1/898 [00:00<02:02,  7.30it/s]

Epoch: 3/10 -> lr: 0.00500 loss: 1.9456, acc: 0.2511, val_loss: 1.9456, val_acc: 0.2467



Epoch: 4/10: 100%|##########| 898/898 [02:35<00:00,  5.77it/s]


Epoch: 4/10 -> lr: 0.00475 loss: 1.9456, acc: 0.2515, val_loss: 1.9456, val_acc: 0.2473



'rm' is not recognized as an internal or external command,
operable program or batch file.
Epoch: 5/10: 100%|##########| 898/898 [02:20<00:00,  6.38it/s]
Epoch: 6/10:   0%|          | 1/898 [00:00<02:11,  6.80it/s]

Epoch: 5/10 -> lr: 0.00406 loss: 1.9456, acc: 0.2513, val_loss: 1.9456, val_acc: 0.2467



Epoch: 6/10: 100%|##########| 898/898 [02:11<00:00,  6.81it/s]
Epoch: 7/10:   0%|          | 1/898 [00:00<02:10,  6.85it/s]

Epoch: 6/10 -> lr: 0.00306 loss: 1.9456, acc: 0.2511, val_loss: 1.9456, val_acc: 0.2473



Epoch: 7/10: 100%|##########| 898/898 [02:03<00:00,  7.28it/s]
Epoch: 8/10:   0%|          | 1/898 [00:00<01:59,  7.51it/s]

Epoch: 7/10 -> lr: 0.00194 loss: 1.9456, acc: 0.2513, val_loss: 1.9456, val_acc: 0.2470



Epoch: 8/10: 100%|##########| 898/898 [01:57<00:00,  7.61it/s]
Epoch: 9/10:   0%|          | 1/898 [00:00<01:46,  8.40it/s]

Epoch: 8/10 -> lr: 0.00094 loss: 1.9456, acc: 0.2513, val_loss: 1.9456, val_acc: 0.2473



Epoch: 9/10: 100%|##########| 898/898 [01:59<00:00,  7.49it/s]
Epoch: 10/10:   0%|          | 1/898 [00:00<01:51,  8.07it/s]

Epoch: 9/10 -> lr: 0.00025 loss: 1.9456, acc: 0.2511, val_loss: 1.9456, val_acc: 0.2470



Epoch: 10/10: 100%|##########| 898/898 [02:14<00:00,  6.68it/s]


Epoch: 10/10 -> lr: 0.00000 loss: 1.9456, acc: 0.2513, val_loss: 1.9456, val_acc: 0.2470

Generating predictions on the Test set
Test Scores:
 Loss: 1.946, Accuracy: 0.248


In [None]:
# Generate the confusion matrix
