# Checking the CUDA Environment

In [None]:
import torch
import os
from ghostnet import ghostnet
from moganet import MogaNet
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import DataLoader
from timm.data import create_dataset
from tqdm import tqdm
import copy
import pandas as pd
import numpy as np
from torch.utils import data
from PIL import Image

# Fail fast with a clear message when no CUDA device is usable: later cells
# call `.cuda()` unconditionally.  An explicit raise is used instead of
# `assert` so the check is not stripped when Python runs with `-O`.
if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available; this notebook requires a CUDA-capable GPU.")

# Report the GPU, CUDA and CPU resources visible to this process.
device_name = torch.cuda.get_device_name()
n_gpu = torch.cuda.device_count()
print(f"Found device: {device_name}, n_gpu: {n_gpu}")
print(f'Cuda version: {torch.version.cuda}')
device = torch.device("cuda")
print(f'n_cpu: {os.cpu_count()}')

# Select Network and Dataset for training and test

In [None]:
# Names of the supported backbones and datasets.  The experiment is chosen by
# indexing into these lists, and later cells compare against the same entries
# (e.g. `networks[2]`, `datasets[3]`), so the order of the entries matters.
networks = ['ghostnet', 'ghostnet_prelu', 'moganet']
datasets = ['affectnet', 'affectnet_lr', 'affectnet_lr_noise_free', 'fer2013']

# Select the network and dataset.
network = networks[2]  # 'moganet'
dataset_name = datasets[2]  # 'affectnet_lr_noise_free'
# Head size: 1 -> a single Linear layer; any other value -> the 3-layer MLP
# built by get_classifier_MLP.  Values > 1 also enable two-stage training.
num_mlp = 3 # (1 or 3)

# Loading Pre-trained Network

In [None]:
# Number of output classes: 7 when 'fer2013' (datasets[3]) is selected,
# 8 for every other dataset option.
num_classes = 7 if dataset_name == datasets[3] else 8

def replace_relu_with_prelu(module):
    """Recursively swap every ``nn.ReLU`` submodule of *module* for a new ``nn.PReLU``.

    The replacement happens in place; containers are descended into, and each
    ReLU gets its own freshly constructed PReLU (default arguments).
    """
    for child_name, child in module.named_children():
        if not isinstance(child, nn.ReLU):
            # Not an activation we care about: look inside it instead.
            replace_relu_with_prelu(child)
            continue
        setattr(module, child_name, nn.PReLU())
            
def get_classifier_MLP(in_features, n_classes=None):
    """Build the classification head that replaces a backbone's last layer.

    Args:
        in_features: Width of the feature vector produced by the backbone.
        n_classes: Number of output classes.  Defaults to the module-level
            ``num_classes`` so the head matches the selected dataset (the
            previous hard-coded 8 was wrong for the 7-class dataset option).

    Returns:
        A single ``nn.Linear`` when the module-level ``num_mlp`` is 1,
        otherwise a 3-layer MLP with dropout.  The head outputs raw logits:
        the training and testing code feeds it to ``CrossEntropyLoss``, which
        applies log-softmax itself, so no ``Softmax`` layer is appended.
        (Softmax has no parameters, so state-dict keys are unaffected.)
        The head is moved to the GPU when one is available.
    """
    if n_classes is None:
        n_classes = num_classes
    dropout_prob = 0.5

    if num_mlp == 1:
        head = nn.Linear(in_features, n_classes)
    else:
        head = nn.Sequential(
            nn.Dropout(dropout_prob),
            nn.Linear(in_features, 1000),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout_prob),
            nn.Linear(1000, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, n_classes),
        )

    # Same placement for both head variants; skipped on CPU-only machines.
    return head.cuda() if torch.cuda.is_available() else head

def load_pretrained_netwok(net = network):
    """Create the selected backbone, load its pre-trained weights and swap in a new head.

    Args:
        net: One of the entries of the module-level ``networks`` list.

    Returns:
        The model wrapped in ``torch.nn.DataParallel`` on device 0.

    Raises:
        ValueError: If *net* is not a known network name (previously this
            surfaced as an ``UnboundLocalError`` on ``model``).
    """
    # Checkpoints are mapped to CPU first so loading does not depend on the
    # GPU index they were saved from; DataParallel handles device placement.
    if net in (networks[0], networks[1]):
        model = ghostnet(num_classes=1000, width=1.0, dropout=0.2)
        model.load_state_dict(torch.load('state_dict_73.98.pth', map_location='cpu'))
        model.classifier = get_classifier_MLP(model.classifier.in_features)
        if net == networks[1]:
            # Applied after the head swap, so ReLUs in the new head are replaced too.
            replace_relu_with_prelu(model)
    elif net == networks[2]:
        model = MogaNet()
        checkpoint = torch.load('moganet_tiny_sz224_8xbs128_ep300.pth.tar', map_location='cpu')
        model.load_state_dict(checkpoint['state_dict'])
        model.head = get_classifier_MLP(model.head.in_features)
    else:
        raise ValueError(f"Unknown network {net!r}; expected one of {networks}")

    return torch.nn.DataParallel(model, device_ids=list(range(1)))

# Build the network selected above (default argument: `network`); `model` is
# used as a module-level global by the training and testing cells below.
model = load_pretrained_netwok()

# Load Dataset

In [None]:
class AffectNet(data.Dataset):
    """Image dataset whose file paths and string emotion labels come from a CSV.

    The CSV is expected to have a ``pth`` column (image path relative to
    ``aff_path``) and a ``label`` column (emotion name).  Items are returned
    as ``(image, class_index)``.
    """

    # Display names, indexed by class id.
    EMOTION_LABELS = ['Neutral', 'Happiness', 'Sadness', 'Surprise', 'Fear', 'Disgust', 'Anger', 'Contempt']
    # Lower-cased CSV label -> class id.
    LABEL_TO_IDX = {'neutral': 0, 'happy': 1, 'sad': 2, 'surprise': 3, 'fear': 4, 'disgust': 5, 'anger': 6, 'contempt': 7}
    # Label file read for each supported mode.
    LABEL_FILES = {
        'train': 'DownScaledAffectNet/new_train_labels.csv',
        'test': 'DownScaledAffectNet/new_test_labels.csv',
    }

    def __init__(self, aff_path, mode, use_cache=True, transforms=None, force=False):
        """Load the label table for *mode* and print a per-class summary.

        Args:
            aff_path: Directory that the CSV's ``pth`` entries are relative to.
            mode: ``"train"`` or ``"test"``.
            use_cache: Unused; kept for interface compatibility.
            transforms: Optional callable applied to each PIL image.
            force: Unused; kept for interface compatibility.

        Raises:
            ValueError: If *mode* is not ``"train"`` or ``"test"``.
        """
        # Validate up front; an unknown mode used to fail later with an
        # UnboundLocalError on the data frame.
        if mode not in self.LABEL_FILES:
            raise ValueError(f"mode must be one of {sorted(self.LABEL_FILES)}, got {mode!r}")

        self.mode = mode
        self.transforms = transforms
        self.aff_path = aff_path
        self.base_path = aff_path

        self.data = pd.read_csv(self.LABEL_FILES[mode])
        self.file_paths = self.data.loc[:, 'pth'].values
        self.label = self.data.loc[:, 'label'].values
        self.emotion_labels = list(self.EMOTION_LABELS)
        # Attribute name kept as-is for backward compatibility with existing callers.
        self.dick = dict(self.LABEL_TO_IDX)

        self._print_class_counts()
        print(f'\n{len(self)} images')

    def _print_class_counts(self):
        """Print ``<emotion>: <count>`` for each label present; return ``(labels, counts)``."""
        sample_label, sample_counts = np.unique(self.label, return_counts=True)
        for raw_label, count in zip(sample_label, sample_counts):
            print(f'{self.emotion_labels[self.dick[raw_label.lower()]]}: {count} ', end='')
        return sample_label, sample_counts

    def get_weight(self):
        """Return inverse-frequency class weights, scaled so the smallest weight is 1.

        Also (re)builds ``class_to_idx`` / ``idx_to_class`` and prints the
        class counts and the resulting weights.

        Returns:
            dict mapping class index -> weight, for the classes present in the data.
        """
        self.emotion_labels = list(self.EMOTION_LABELS)
        self.class_to_idx = {emotion: i for i, emotion in enumerate(self.emotion_labels)}
        self.idx_to_class = dict(enumerate(self.emotion_labels))

        sample_label, sample_counts = self._print_class_counts()
        print('')

        cw = 1 / sample_counts
        cw /= cw.min()
        class_weights = {self.dick[raw_label.lower()]: weight for raw_label, weight in zip(sample_label, cw)}
        print(class_weights)
        return class_weights

    def __len__(self):
        """Number of samples listed in the label file."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Return ``(image, class_index)`` for sample *idx*; the image is RGB and transformed if configured."""
        path = os.path.join(self.base_path, self.file_paths[idx])
        image = Image.open(path).convert('RGB')

        if self.transforms is not None:
            image = self.transforms(image)

        # Lower-case the label like every other lookup in this class does, so
        # capitalised CSV labels do not raise KeyError here.
        return image, self.dick[self.label[idx].lower()]

In [None]:
# Preprocessing shared by every split (train, val and test): resize, centre
# crop to imsize x imsize, then convert to a tensor.  No augmentation or
# normalisation is applied here.
imsize = 224
loader = transforms.Compose([
    transforms.Resize(imsize),  # scale imported image
    transforms.CenterCrop(imsize),
    transforms.ToTensor()])  # transform it into a torch tensor
# Mini-batch size used by all DataLoaders created in load_dataset.
batch_size = 64

def _split_train_val(dataset, train_fraction=0.9):
    """Randomly split *dataset* into (train, val); val receives the remainder."""
    train_size = int(train_fraction * len(dataset))
    val_size = len(dataset) - train_size
    return torch.utils.data.random_split(dataset, [train_size, val_size])


def load_dataset(data_name=dataset_name):
    """Create train/validation/test DataLoaders for the selected dataset.

    Args:
        data_name: One of the entries of the module-level ``datasets`` list.

    Returns:
        ``(train_loader, val_loader, test_loader)``.  Only the training loader
        shuffles.  Splits are drawn with ``random_split`` and therefore depend
        on the global torch RNG state.

    Raises:
        ValueError: If *data_name* is not a known dataset name (previously
            this surfaced as an ``UnboundLocalError``).
    """
    if data_name == datasets[0]:
        # Separate train / test folders; validation is carved out of train.
        dataset = create_dataset(name='', root='AffectNet/affectnet/train', transform=loader)
        test_dataset = create_dataset(name='', root='AffectNet/affectnet/val_class', transform=loader)
        train_dataset, val_dataset = _split_train_val(dataset)
    elif data_name == datasets[1]:
        # Single folder: 80% train, 10% validation, remainder test.
        dataset = create_dataset(name='', root='DownScaledAffectNet', transform=loader)
        train_size = int(0.8 * len(dataset))
        val_size = int(0.1 * len(dataset))
        test_size = len(dataset) - train_size - val_size
        train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
            dataset, [train_size, val_size, test_size])
    elif data_name == datasets[2]:
        # CSV-labelled variant handled by the AffectNet dataset class.
        affectnet_dir = 'DownScaledAffectNet'
        dataset = AffectNet(affectnet_dir, 'train', transforms=loader, force=False)
        train_dataset, val_dataset = _split_train_val(dataset)
        test_dataset = AffectNet(affectnet_dir, 'test', transforms=loader)
    elif data_name == datasets[3]:
        dataset = create_dataset(name='', root='FER-2013/train', transform=loader)
        test_dataset = create_dataset(name='', root='FER-2013/test', transform=loader)
        train_dataset, val_dataset = _split_train_val(dataset)
    else:
        raise ValueError(f"Unknown dataset {data_name!r}; expected one of {datasets}")

    num_workers = 8
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=num_workers)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=num_workers)
    return train_loader, val_loader, test_loader

# Build the loaders for the dataset selected above (default argument:
# `dataset_name`); they are used as module-level globals by the cells below.
train_loader, val_loader, test_loader = load_dataset()

# Training the model

In [None]:
def train_model(alpha=1, num_epochs = 50):
    """Train the global ``model`` and keep the weights with the best validation accuracy.

    Args:
        alpha: Multiplier applied to the base learning rate of the optimizer.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        ``(best_model, best_acc)``: a deep copy of the best state dict (or
        ``None`` if validation accuracy never exceeded 0) and that accuracy.

    Side effects:
        * Saves the best state dict to ``./checkpoint_model_1.pth``.
        * Publishes ``best_model`` and ``best_acc`` as module-level globals,
          because the testing cell reads them after training.  Both are reset
          at the start of every call.
    """
    global best_model, best_acc

    loss_fn = torch.nn.CrossEntropyLoss().cuda()

    # Only parameters that are currently unfrozen are handed to the optimizer.
    trainable = filter(lambda p: p.requires_grad, model.parameters())
    if network == networks[2]:
        optimizer = torch.optim.AdamW(trainable, lr=alpha*0.001, weight_decay=0.04)
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[2,4,8,16,32], gamma=0.5)
    else:
        optimizer = torch.optim.RMSprop(trainable, lr=alpha*0.0001, weight_decay=0.01, momentum=0.9)
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[5,15,25], gamma=0.1)

    best_acc = 0
    best_model = None

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        total_samples = 0
        correct_predictions = 0
        total_loss = 0

        loop = tqdm(train_loader)
        for images, labels in loop:
            labels = labels.cuda()  # transfer once per batch
            optimizer.zero_grad()

            outputs = model(images)
            _, predicted = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, labels)

            n_batch = labels.size(0)
            n_correct = (predicted == labels).sum().item()
            total_samples += n_batch
            correct_predictions += n_correct
            total_loss += loss.item()

            loss.backward()
            optimizer.step()

            loop.set_description(f"Epoch [{epoch}/{num_epochs}]")
            loop.set_postfix(loss=loss.item(), acc=n_correct/n_batch)

        scheduler.step()
        # Loss is averaged over batches, accuracy over samples.
        avg_loss = total_loss / len(train_loader)
        avg_acc = correct_predictions / total_samples
        print("TRAINING")
        print("loss=", avg_loss, ", accuracy=", avg_acc)

        # ---- validation pass ----
        total_samples = 0
        correct_predictions = 0
        total_loss = 0
        model.eval()
        with torch.no_grad():
            for images, labels in val_loader:
                labels = labels.cuda()
                outputs = model(images)
                _, predicted = torch.max(outputs, dim=1)
                loss = loss_fn(outputs, labels)

                total_samples += labels.size(0)
                correct_predictions += (predicted == labels).sum().item()
                total_loss += loss.item()

        avg_loss = total_loss / len(val_loader)
        avg_acc = correct_predictions / total_samples
        print("VALIDATION")
        print("loss=", avg_loss, ", accuracy=", avg_acc)

        # Keep (and checkpoint) the weights whenever validation accuracy improves.
        if best_acc < avg_acc:
            best_acc = avg_acc
            best_model=copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), './checkpoint_model_1.pth')

    return best_model, best_acc
        
def FreezeParams(exception='head'):
    """Freeze every parameter of the wrapped global model except those whose name contains *exception*.

    Parameters whose name contains the substring are made trainable; all
    others have ``requires_grad`` switched off.
    """
    for param_name, weight in model.module.named_parameters():
        # Trainable exactly when the name mentions the excepted layer.
        weight.requires_grad = exception in param_name

# Training schedule.  With the MLP head, train in two stages: first only the
# new head (backbone frozen), then the whole network at a 10x smaller
# learning rate.  With the single-layer head, train everything in one go.
if num_mlp > 1:
    # The replaced layer is called 'head' on MogaNet and 'classifier' on GhostNet.
    head_name = 'head' if network == networks[2] else 'classifier'
    FreezeParams(head_name)
    train_model(num_epochs=10)

    # Unfreeze everything for the fine-tuning stage.
    for weight in model.parameters():
        weight.requires_grad = True
    train_model(alpha=0.1, num_epochs=20)
else:
    train_model(num_epochs=20)

# Testing Model

In [None]:
# Evaluate the best validation checkpoint on the held-out test set.
#
# `best_model` / `best_acc` are looked up defensively: when training did not
# leave them at module level, fall back to the checkpoint file written by
# train_model instead of failing with a NameError.
best_model = globals().get('best_model')
best_acc = globals().get('best_acc')
if best_model is None and os.path.exists('./checkpoint_model_1.pth'):
    best_model = torch.load('./checkpoint_model_1.pth')

if best_model is not None:
    model.load_state_dict(best_model)
    if best_acc is not None:
        print(f"Best Validation acc:{best_acc}")

    # The criterion used during training is local to train_model, so build
    # an equivalent one here.
    loss_fn = torch.nn.CrossEntropyLoss().cuda()

    total_samples = 0
    correct_predictions = 0
    total_loss = 0
    model.eval()
    with torch.no_grad():
        for images, labels in test_loader:
            labels = labels.cuda()  # transfer once per batch
            outputs = model(images)
            _, predicted = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, labels)

            total_samples += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()
            total_loss += loss.item()

    # Loss is averaged over batches, accuracy over samples.
    avg_loss = total_loss / len(test_loader)
    avg_acc = correct_predictions / total_samples
    print("TESTING")
    print("loss=", avg_loss, ", accuracy=", avg_acc)
else:
    print(f"No best model Best acc:{best_acc}")