In [None]:
import numpy as np

import torch as tr
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import transforms
from torch.optim import AdamW
import torch.nn.init as init 

from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

import os
import sys
sys.path.insert(1, '/kaggle/input/ferdata-set')
from FERData import FERDataset

from glob import glob

import matplotlib.pyplot as plt

%matplotlib inline

# Select the compute device once for the whole notebook:
# the GPU when CUDA is usable, the CPU otherwise.
if tr.cuda.is_available():
    device = tr.device('cuda')
else:
    device = tr.device('cpu')

In [None]:
# Root folder of the Kaggle input dataset.
kaggle_path = '/kaggle/input/fer2013-custom'

# Image preprocessing pipelines.
#   Training set:            resize to 224x224 -> random horizontal flip -> tensor
#   Validation and test set: resize to 224x224 -> tensor
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])

val_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# One dataset per split. The test split reuses the validation pipeline,
# so only the training split is augmented.
train_dataset = FERDataset(
    kaggle_path + '/dataset/train',
    transform=train_transforms,
)
val_dataset = FERDataset(
    kaggle_path + '/dataset/val',
    transform=val_transforms,
)
test_dataset = FERDataset(
    kaggle_path + '/dataset/test',
    transform=val_transforms,
)

In [None]:
# A batch size of 64 was chosen after some testing; larger batches could not
# be tried because of the GPU memory limit.
batch_size = 64

# Only the training split is shuffled. The evaluation loaders keep the dataset
# order so that validation/test runs are reproducible: the evaluation functions
# average per-batch losses, and a shuffled loader changes which samples end up
# in the (possibly shorter) final batch from one run to the next.
train_dataLoader = DataLoader(train_dataset, batch_size, shuffle=True)
test_dataLoader = DataLoader(test_dataset, batch_size, shuffle=False)

val_dataLoader = DataLoader(val_dataset, batch_size, shuffle=False)

In [None]:
class VGG16(nn.Module):
    """VGG-16 style CNN with batch normalisation after every convolution.

    The network is made of thirteen ``Conv2d(3x3) -> BatchNorm2d -> ReLU``
    stages (``layer1`` .. ``layer13``), five of which end with a 2x2 max-pool,
    followed by three fully connected stages (``fc``, ``fc1``, ``fc2``).

    The attribute names and the layout inside every ``nn.Sequential`` are part
    of the state-dict key names, so they must not be changed if saved weights
    are to remain loadable.

    Args:
        num_classes: Number of output logits produced by the last linear layer.
    """

    def __init__(self, num_classes=10):
        super(VGG16, self).__init__()
        # Block 1: 3 -> 64 channels.
        self.layer1 = self._conv_block(3, 64)
        self.layer2 = self._conv_block(64, 64, pool=True)
        # Block 2: 64 -> 128 channels.
        self.layer3 = self._conv_block(64, 128)
        self.layer4 = self._conv_block(128, 128, pool=True)
        # Block 3: 128 -> 256 channels.
        self.layer5 = self._conv_block(128, 256)
        self.layer6 = self._conv_block(256, 256)
        self.layer7 = self._conv_block(256, 256, pool=True)
        # Block 4: 256 -> 512 channels.
        self.layer8 = self._conv_block(256, 512)
        self.layer9 = self._conv_block(512, 512)
        self.layer10 = self._conv_block(512, 512, pool=True)
        # Block 5: 512 -> 512 channels.
        self.layer11 = self._conv_block(512, 512)
        self.layer12 = self._conv_block(512, 512)
        self.layer13 = self._conv_block(512, 512, pool=True)

        # Brings the feature map to the 7x7 grid the classifier expects.
        # For 224x224 inputs the map is already 7x7 and this is an identity;
        # for other input sizes it avoids a shape mismatch in ``fc``.
        # The module has no parameters, so state-dict keys are unaffected.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(7*7*512, 4096),
            nn.ReLU())
        self.fc1 = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU())
        self.fc2 = nn.Sequential(
            nn.Linear(4096, num_classes))

    @staticmethod
    def _conv_block(in_channels, out_channels, pool=False):
        """Build one ``Conv2d(3x3, pad 1) -> BatchNorm2d -> ReLU [-> MaxPool2d(2)]`` stage."""
        stages = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]
        if pool:
            stages.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*stages)

    def initialize_weights(self, m):
        """Initialise a single sub-module; meant to be passed to ``nn.Module.apply``.

        ``Conv2d`` and ``Linear`` modules get Xavier-uniform weights and a zero
        bias (when they have one). Every other module type is left untouched.

        Args:
            m: The sub-module to initialise.
        """
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            init.xavier_uniform_(m.weight)
            if m.bias is not None:
                init.constant_(m.bias, 0.0)

    def forward(self, x):
        """Compute class logits for a batch of 3-channel images.

        Args:
            x: Input tensor whose channel dimension is 3 (required by ``layer1``).

        Returns:
            Tensor with one row per input sample and ``num_classes`` columns.
        """
        out = x
        # Run the thirteen convolutional stages in order.
        for index in range(1, 14):
            out = getattr(self, f'layer{index}')(out)
        out = self.avgpool(out)
        # Flatten everything but the batch dimension (works for any memory layout).
        out = tr.flatten(out, 1)
        out = self.fc(out)
        out = self.fc1(out)
        out = self.fc2(out)
        return out

In [None]:
def train(dataLoader, model, optimizer, lossf):
    """Train ``model`` for one epoch and report the epoch metrics.

    Args:
        dataLoader: Iterable yielding ``(images, labels)`` batches.
        model: Network to optimise; it is switched to training mode.
        optimizer: Optimiser that updates the model parameters.
        lossf: Loss called as ``lossf(prediction, labels)``.

    Returns:
        tuple: ``(train_loss, train_acc)`` where ``train_loss`` is the plain
        average of the per-batch losses and ``train_acc`` is the accuracy in
        percent. Both are ``nan`` when the loader yields no batches.
    """
    model.train()

    # Move batches to wherever the model lives instead of relying only on the
    # notebook-level `device`; that global is the fallback for parameter-less models.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    losses = []
    total = 0
    correct = 0

    for img, label in dataLoader:
        t_imgs = img.to(target, dtype=tr.float)
        t_labels = label.to(target, dtype=tr.int64)

        optimizer.zero_grad()

        prediction = model(t_imgs)
        loss = lossf(prediction, t_labels)
        losses.append(loss.item())

        loss.backward()
        optimizer.step()

        # Predicted class = index of the largest logit.
        predicted = prediction.argmax(dim=1)
        total += t_labels.size(0)
        correct += (predicted == t_labels).sum().item()

    # Guard the empty-loader case, which would otherwise divide by zero.
    train_loss = np.average(losses) if losses else float('nan')
    train_acc = 100.0 * correct / total if total else float('nan')

    print(f'Train loss is {train_loss:.4f}')
    print(f'Training acc is {train_acc:.4f}%')

    return train_loss, train_acc


In [None]:
def test(dataLoader, model, lossf):
  model.eval()
  losses, predictions, labels = np.array([]), np.array([]), np.array([])
  result = []

  for img, label in dataLoader:

    t_imgs = img.to(device, dtype=tr.float)
    t_labels = label.to(device, dtype=tr.int64)
    labels = np.append(labels, t_labels.cpu())

    output = model(t_imgs)
    _, prediction = tr.max(output, 1)
    predictions = np.append(predictions, prediction.cpu())

    #create result
    result.append(list(zip(img, label, prediction.cpu())))

    loss = lossf(output, t_labels)
    losses = np.append(losses, loss.item())

  return np.average(losses), predictions, labels, result

In [None]:
def val(dataLoader, model, lossf):
    """Evaluate ``model`` on a validation loader and print loss and accuracy.

    Args:
        dataLoader: Iterable yielding ``(images, labels)`` batches.
        model: Network to evaluate; it is switched to evaluation mode.
        lossf: Loss called as ``lossf(output, labels)``.

    Returns:
        tuple: ``(val_loss, predictions, result)`` where ``val_loss`` is the
        plain average of the per-batch losses (``nan`` for an empty loader),
        ``predictions`` is a flat list of predicted class indices and
        ``result`` is a flat list of ``(image, label, prediction)`` triples,
        one per sample. The ground-truth labels are only available through
        ``result``.
    """
    model.eval()

    # Move batches to wherever the model lives instead of relying only on the
    # notebook-level `device`; that global is the fallback for parameter-less models.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    losses, predictions, result = [], [], []
    total = 0
    correct = 0

    # Forward pass and loss both run without gradient tracking.
    with tr.no_grad():
        for img, label in dataLoader:
            t_imgs = img.to(target, dtype=tr.float)
            t_labels = label.to(target, dtype=tr.int64)

            output = model(t_imgs)
            # Predicted class = index of the largest logit.
            prediction = output.argmax(dim=1)
            predictions.extend(prediction.cpu().tolist())

            # Per-sample (image, label, prediction) triples.
            result.extend(list(zip(img, label, prediction.cpu().numpy())))

            losses.append(lossf(output, t_labels).item())

            total += t_labels.size(0)
            correct += (prediction == t_labels).sum().item()

    # Guard the empty-loader case, which would otherwise divide by zero.
    val_loss = np.average(losses) if losses else float('nan')
    val_acc = 100.0 * correct / total if total else float('nan')

    print(f'Val loss is {val_loss:.4f}')
    print(f'Val acc is {val_acc:.4f}%')

    return val_loss, predictions, result


In [None]:
def run(train_dataLoader, val_dataLoader, model, optimizer, lossf, epochs=50):
    """Train with per-epoch validation, checkpointing and early stopping.

    After every epoch the validation loss is compared with the best one seen
    so far. On improvement, a checkpoint file ``checkpoint_<loss>.pth`` is
    written to the current working directory (the previous one is removed
    afterwards) and a copy of the weights is kept in memory. Training stops
    early once more than ``es_limit`` (20) consecutive epochs bring no
    improvement.

    Before returning, the best weights are loaded back into ``model``, so the
    returned model really corresponds to the best epoch and not to the last
    epoch that was trained.

    Args:
        train_dataLoader: Loader passed to ``train``.
        val_dataLoader: Loader passed to ``val``.
        model: Network to train (modified in place).
        optimizer: Optimiser for ``model``.
        lossf: Loss function passed to ``train`` and ``val``.
        epochs: Maximum number of epochs.

    Returns:
        tuple: ``(best_model, min_avgLosses)`` - ``model`` carrying the weights
        of the best epoch, and the lowest validation loss observed
        (``inf`` if no epoch improved on the initial value).
    """
    es_counter = 0  # consecutive epochs without a validation-loss improvement
    es_limit = 20   # training stops once es_counter exceeds this value

    best_state = None  # CPU copy of the weights of the best epoch
    best_epoch = None
    min_avgLosses = float('inf')

    path_model = None  # path of the checkpoint currently on disk

    for epoch in range(epochs):
        print(f'Epoch {epoch} -', end = ' ' )

        train(train_dataLoader, model, optimizer, lossf)

        val_avgLosses, _, _ = val(val_dataLoader, model, lossf)

        if val_avgLosses < min_avgLosses:
            min_avgLosses = val_avgLosses
            best_epoch = epoch

            # `model` keeps training after this point, so holding a reference
            # to it would not preserve the best weights: take a detached copy.
            best_state = {
                key: value.detach().cpu().clone()
                for key, value in model.state_dict().items()
            }

            # NOTE(review): the 'model' entry stores a freshly constructed
            # VGG16(7), i.e. the architecture is hard-coded here. It is kept
            # as-is because code reading these checkpoints may rely on the key.
            checkpoint = {'model': VGG16(7),
                          'state_dict': model.state_dict(),
                          'optimizer': optimizer.state_dict()}
            previous_path = path_model
            path_model = f'checkpoint_{min_avgLosses}.pth'

            # Write the new checkpoint first; only then drop the old one, so a
            # failed save never leaves us without any checkpoint on disk.
            tr.save(checkpoint, path_model)
            print('-------MODELLO SALVATO-------')

            if previous_path and previous_path != path_model and os.path.exists(previous_path):
                os.remove(previous_path)

            es_counter = 0
        else:
            es_counter += 1

        if es_counter > es_limit:
            print('---- EARLY STOPPING -----')
            break

    print(f'------> Best epoch: {best_epoch}, minAvgLosses: {min_avgLosses:.4f}')

    # Restore the weights of the best epoch before handing the model back.
    if best_state is not None:
        model.load_state_dict(best_state)
    best_model = model
    return best_model, min_avgLosses


In [None]:
# Build the 7-class network on the selected device and apply the custom
# weight initialisation to every sub-module.
model = VGG16(7).to(device)
model.apply(model.initialize_weights)

# Set to True to start from a previously saved model instead of fresh weights.
best_model_flag = False
if best_model_flag:
    models_list = glob('/models/*')
    if models_list:
        # sorted()[0] picks the lexicographically first path.
        best_last_model = sorted(models_list)[0]
        loaded_state = tr.load(best_last_model, map_location=device)
        # Accept both a bare state dict and a checkpoint dict of the form
        # written by run() ({'model', 'state_dict', 'optimizer'}).
        # NOTE(review): run() checkpoints also pickle a model object; depending
        # on the installed torch version tr.load may refuse to unpickle it
        # with its default arguments - confirm before relying on this path.
        if isinstance(loaded_state, dict) and 'state_dict' in loaded_state:
            loaded_state = loaded_state['state_dict']
        model.load_state_dict(loaded_state)
        print(f'{best_last_model} caricato!')
    else:
        print('Errore nel recupero del modello')

else:
    print('Caricato modello Vanilla!')

optimizer = AdamW(model.parameters(),lr = 0.0001,weight_decay=5e-4)
lossf = nn.CrossEntropyLoss()

epochs = 50

best_model, min_avgLosses = run(train_dataLoader, val_dataLoader, model, optimizer, lossf, epochs=epochs)

In [None]:
# Use the best model returned by training as the current model and evaluate
# it on the validation split.
model = best_model
val_avgLosess, val_predictions, val_result = val(val_dataLoader, model, lossf)

# val() does not return the ground-truth labels, so recover them from the
# per-sample (image, label, prediction) triples it collects. They come out in
# the same order as val_predictions.
val_labels = [int(label) for _, label, _ in val_result]
print(classification_report(val_labels, val_predictions, digits=4))

In [None]:
# Use the best model returned by training as the current model and evaluate
# it on the test split.
model = best_model

(test_avgLosess,
 test_predictions,
 test_labels,
 test_result) = test(test_dataLoader, model, lossf)

# Per-class precision / recall / F1 on the test split.
print(classification_report(y_true=test_labels, y_pred=test_predictions, digits=4))