# CNN for FER

By Fatemeh Ghezloo and Michelle Lin



## Usage

The purpose of this notebook is to train a CNN for Facial Expression Recognition Task on FER2013 dataset. For improving the CNN performance, we have enabled the data augmentation to add more image samples to class disgust. This is done to make the dataset more balanced among different classes.

To run this notebook, please make sure to first download the [FER2013 Kaggle dataset](https://www.kaggle.com/c/challenges-in-representation-learning-facial-expression-recognition-challenge). Because this dataset is part of a challenge, we are not allowed to share the data.



## Initial setup

Run the following sections to make sure that you have imported all necessary packages, can read from/write to your Google Drive, and have copied all data to the GPU.

In [1]:
from google.colab import drive

drive.mount('/gdrive/')
!ls /gdrive

import os

BASE_PATH = '/gdrive/My Drive/colab_files/final_project/'
if not os.path.exists(BASE_PATH):
    os.makedirs(BASE_PATH)
    # !wget https://courses.cs.washington.edu/courses/cse599g1/19au/files/homework2.tar
    # !tar -xvf homework2.tar
    # !rm homework2.tar
DATA_PATH = BASE_PATH + 'data/'

os.chdir(BASE_PATH)
!pwd
!ls


import torch
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
import numpy as np
import os
import torch.nn.functional as F
import torch.optim as optim
import h5py
import sys
import pt_util
sys.path.append(BASE_PATH)

Mounted at /gdrive/
MyDrive
/gdrive/My Drive/colab_files/final_project
1		  disgust_training.npy	neutral_training.npy  test
2		  fer2013.csv		pt_util.py	      tiny_imagenet
data		  logs			__pycache__
data.h5		  logs_sad		sad_test.npy
disgust_test.npy  neutral_test.npy	sad_training.npy


## Preprocess Input And Data Augmentation

In this section, data is loaded from a csv file and saved as h5py. 

In order to use data augmentation feature, make sue variable ***augment*** is set to True.

In [23]:
# create data and label for FER2013
# labels: 0=Angry, 1=Disgust, 2=Fear, 3=Happy, 4=Sad, 5=Surprise, 6=Neutral
import csv
import os
import numpy as np
import h5py
from sklearn.model_selection import train_test_split

file = BASE_PATH + 'fer2013.csv'

augment = False

# Creat the list to store the data and label information
y = []
x = []

datapath = os.path.join(BASE_PATH,'data.h5')
if not os.path.exists(os.path.dirname(datapath)):
    os.makedirs(os.path.dirname(datapath))

with open(file,'r') as csvin:
    data=csv.reader(csvin)
    for row in data:
      if row[-1] == 'Training' or row[-1] == "PublicTest" or row[-1] == 'PrivateTest':
        temp_list = []
        for pixel in row[1].split( ):
            temp_list.append(int(pixel))
        I = np.asarray(temp_list)
        y.append(int(row[0]))
        x.append(I.tolist())


#Data Augmentation
if augment:
  augmented_data_path = BASE_PATH + 'logs/epoch_1200/disgust_new.npy'
  new_disgust = np.load(augmented_data_path)
  new_disgust = new_disgust.reshape(len(new_disgust), 2304)
  I = new_disgust.tolist()

  for row in I:
    y.append(1)
    x.append(row)


X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.33, train_size=0.66, shuffle = True, stratify=y)

print(np.shape(X_train))
print(np.shape(X_test))

datafile = h5py.File(datapath, 'w')

datafile.create_dataset("Test_pixel", dtype = 'uint8', data=X_test)
datafile.create_dataset("Test_label", dtype = 'int64', data=y_test)
datafile.create_dataset("Train_pixel", dtype = 'uint8', data=X_train)
datafile.create_dataset("Train_label", dtype = 'int64', data=y_train)
datafile.close()

print("Save data finish!!!")

(23685, 2304)
(11843, 2304)
Save data finish!!!


##FER Dataset

Run this to build FER dataset and it's dataloader.

In [2]:
''' Fer2013 Dataset class'''

from __future__ import print_function
from PIL import Image
import numpy as np
import h5py
import torch.utils.data as data

class FER2013(data.Dataset):

    def __init__(self, split='Training', transform=None):
        self.transform = transform
        self.split = split  # training set or test set
        self.data = h5py.File(BASE_PATH + 'data.h5', 'r', driver='core')
        # now load the picked numpy arrays
        if self.split == 'Training':
            self.train_data = self.data['Train_pixel']
            self.train_labels = self.data['Train_label']
            self.train_data = np.asarray(self.train_data)
            self.train_data = self.train_data.reshape((23685, 48, 48))

        elif self.split == 'Test':
            self.PublicTest_data = self.data['Test_pixel']
            self.PublicTest_labels = self.data['Test_label']
            self.PublicTest_data = np.asarray(self.PublicTest_data)
            self.PublicTest_data = self.PublicTest_data.reshape((11843, 48, 48))

        else:
            self.PrivateTest_data = self.data['PrivateTest_pixel']
            self.PrivateTest_labels = self.data['PrivateTest_label']
            self.PrivateTest_data = np.asarray(self.PrivateTest_data)
            self.PrivateTest_data = self.PrivateTest_data.reshape((3589, 48, 48))

    def __getitem__(self, index):
        """
        Args:
            index (int): Index
        Returns:
            tuple: (image, target) where target is index of the target class.
        """
        if self.split == 'Training':
            img, target = self.train_data[index], self.train_labels[index]
        elif self.split == 'Test':
            img, target = self.PublicTest_data[index], self.PublicTest_labels[index]
        else:
            img, target = self.PrivateTest_data[index], self.PrivateTest_labels[index]

        # doing this so that it is consistent with all other datasets
        # to return a PIL Image
        img = img[:, :, np.newaxis]
        img = np.concatenate((img, img, img), axis=2)
        img = Image.fromarray(img)
        if self.transform is not None:
            img = self.transform(img)
        return img, target

    def __len__(self):
        if self.split == 'Training':
            return len(self.train_data)
        elif self.split == 'Test':
            return len(self.PublicTest_data)
        else:
            return len(self.PrivateTest_data)

##CNN Structures

###CNN - Paper Structure

Here we implemented the CNN structure from this [paper](https://arxiv.org/abs/1711.00648) that we replicated. 



In [3]:
class FERCNN(nn.Module):
    
    def __init__(self):
        super(FERCNN, self).__init__()
        
        self.conv_layer = nn.Sequential(
            
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1, stride=1),                      
            nn.MaxPool2d(3, stride=2),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True),

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1, stride=1),
            nn.MaxPool2d(3, stride=2),
            nn.InstanceNorm2d(128),
            nn.ReLU(inplace=True),
     
        )


        self.fc_layer = nn.Sequential(
            nn.Dropout(p=0.1),
            nn.Linear(6400, 256),
            nn.ReLU(inplace=True), 
            nn.Dropout(p=0.1),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True), 
            nn.Linear(256, 7)
        )
 

        self.accuracy = None

    def forward(self, x):
        # conv layers
        x = self.conv_layer(x)
        
        x = x.view(x.size(0), -1)
        print(x.shape)

        # fc layer
        x = self.fc_layer(x)

        return x

    def loss(self, prediction, label, reduction='mean'):
        loss_val = F.cross_entropy(prediction, label.squeeze(), reduction=reduction)
        return loss_val

    def save_model(self, file_path, num_to_keep=1):
        pt_util.save(self, file_path, num_to_keep)
        
    def save_best_model(self, accuracy, file_path, num_to_keep=1):
        if self.accuracy == None or accuracy > self.accuracy:
            self.accuracy = accuracy
            self.save_model(file_path, num_to_keep)

    def load_model(self, file_path):
        pt_util.restore(self, file_path)

    def load_last_model(self, dir_path):
        return pt_util.restore_latest(self, dir_path)

###CNN - VGG19

We chose VGG19 model as a deeper network and In this section we implemented it.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable


cfg = {
    'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}


class VGG(nn.Module):
    def __init__(self, vgg_name):
        super(VGG, self).__init__()
        self.features = self._make_layers(cfg[vgg_name])
        self.classifier = nn.Linear(512, 7)
        self.accuracy = None

    def forward(self, x):
        out = self.features(x)
        out = out.view(out.size(0), -1)
        out = F.dropout(out, p=0.5, training=self.training)
        out = self.classifier(out)
        return out

    def _make_layers(self, cfg):
        layers = []
        in_channels = 3
        for x in cfg:
            if x == 'M':
                layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
            else:
                layers += [nn.Conv2d(in_channels, x, kernel_size=3, padding=1),
                           nn.BatchNorm2d(x),
                           nn.ReLU(inplace=True)]
                in_channels = x
        layers += [nn.AvgPool2d(kernel_size=1, stride=1)]
        return nn.Sequential(*layers)


    def loss(self, prediction, label, reduction='mean'):
        loss_val = F.cross_entropy(prediction, label.squeeze(), reduction=reduction)
        return loss_val

    def save_model(self, file_path, num_to_keep=1):
        pt_util.save(self, file_path, num_to_keep)
        
    def save_best_model(self, accuracy, file_path, num_to_keep=1):
        if self.accuracy == None or accuracy > self.accuracy:
            self.accuracy = accuracy
            self.save_model(file_path, num_to_keep)

    def load_model(self, file_path):
        pt_util.restore(self, file_path)

    def load_last_model(self, dir_path):
        return pt_util.restore_latest(self, dir_path)

###CNN - Modified Paper Structure 

In this section we modified paper's structure by adding one more convolution layer to it. Also, to prevent overfitting, we added dropouts at the fully connection part.
 



In [4]:
class FERCNNModified(nn.Module):
    
    def __init__(self):
        super(FERCNNModified, self).__init__()
        
        self.conv_layer = nn.Sequential(
            
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1, stride=1),                      
            nn.MaxPool2d(3, stride=2),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True),

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1, stride=1),
            nn.MaxPool2d(3, stride=2),
            nn.InstanceNorm2d(128),
            nn.ReLU(inplace=True),

            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1, stride=1),
            nn.MaxPool2d(3, stride=2),
            nn.InstanceNorm2d(256),
            nn.ReLU(inplace=True),
     
        )


        self.fc_layer = nn.Sequential(
            nn.Dropout(p=0.1),
            nn.Linear(6400, 256),
            nn.ReLU(inplace=True), 
            nn.Dropout(p=0.1),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True), 
            nn.Linear(256, 7)
        )
 

        self.accuracy = None

    def forward(self, x):
        # conv layers
        x = self.conv_layer(x)
        
        x = x.view(x.size(0), -1)
        # print(x.shape)

        # fc layer
        x = self.fc_layer(x)

        return x

    def loss(self, prediction, label, reduction='mean'):
        loss_val = F.cross_entropy(prediction, label.squeeze(), reduction=reduction)
        return loss_val

    def save_model(self, file_path, num_to_keep=1):
        pt_util.save(self, file_path, num_to_keep)
        
    def save_best_model(self, accuracy, file_path, num_to_keep=1):
        if self.accuracy == None or accuracy > self.accuracy:
            self.accuracy = accuracy
            self.save_model(file_path, num_to_keep)

    def load_model(self, file_path):
        pt_util.restore(self, file_path)

    def load_last_model(self, dir_path):
        return pt_util.restore_latest(self, dir_path)

##Train and Test

Implementation of Train and Test functions. Also, this section provides total accuracy per epoch, accuracy per classes and loss values.

In [5]:
import time
def train(model, device, train_loader, optimizer, epoch, log_interval):
    model.train()
    losses = []
    class_correct = list(0. for i in range(7))
    class_total = list(0. for i in range(7))
    for batch_idx, (data, label) in enumerate(train_loader):
        data, label = data.to(device), label.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = model.loss(output, label)
        losses.append(loss.item())
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(output.data, 1)
        c = (predicted == label.data).squeeze()
        listofints = [int(x) for x in label.shape]
        for i in range(listofints[0]):
            l = label.data[i]
            class_correct[l] += c[i]
            class_total[l] += 1

        if batch_idx % log_interval == 0:
            print('{} Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                time.ctime(time.time()),
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
        #     for i in range(7):
        #       if class_total[i] == 0:
        #             print('Accuracy of {} : {} / {} = {:.4f} %'.format(i, class_correct[i], class_total[i], 0))
        #       else :
        #             print('Accuracy of {} : {} / {} = {:.4f} %'.format(i, class_correct[i], class_total[i], 100 * class_correct[i].item() / class_total[i]))
    
    return np.mean(losses)

def test(model, device, test_loader, log_interval=None):
    model.eval()
    test_loss = 0
    correct = 0
    class_correct = list(0. for i in range(7))
    class_total = list(0. for i in range(7))
    with torch.no_grad():
        for batch_idx, (data, label) in enumerate(test_loader):
            data, label = data.to(device), label.to(device)
            
            output = model(data)
            test_loss_on = model.loss(output, label, reduction='sum').item()
            test_loss += test_loss_on
            pred = output.max(1)[1]
            correct_mask = pred.eq(label.view_as(pred))
            num_correct = correct_mask.sum().item()
            correct += num_correct
            _, predicted = torch.max(output.data, 1)
            c = (predicted == label.data).squeeze()

            listofints = [int(x) for x in label.shape]
            for i in range(listofints[0]):
              l = label.data[i]
              class_correct[l] += c[i]
              class_total[l] += 1

            if log_interval is not None and batch_idx % log_interval == 0:
                print('{} Test: [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    time.ctime(time.time()),
                    batch_idx * len(data), len(test_loader.dataset),
                    100. * batch_idx / len(test_loader), test_loss_on))


    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)


    print("Test acc:")
    for i in range(7):
      
      if class_total[i] == 0:
         print('Accuracy of class {} : {} / {} = {:.4f} %'.format(i, class_correct[i], class_total[i], 0))
      else :
         print('Accuracy of class {} : {} / {} = {:.4f} %'.format(i, class_correct[i], class_total[i], 100 * class_correct[i].item() / class_total[i]))
    

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset), test_accuracy))
    return test_loss, test_accuracy

##Data Augmentation using Transforms

Adding transforms to train set to prevent our model from simply memorizing the input data.

In [6]:
transform_train = transforms.Compose([
    transforms.ColorJitter(brightness=10, contrast=5, saturation=0, hue=0),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
])

trainset = FER2013(split = 'Training', transform=transform_train)
train_loader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True, num_workers=1)
PublicTestset = FER2013(split = 'Test', transform=transform_test)
test_loader = torch.utils.data.DataLoader(PublicTestset, batch_size=32, shuffle=False, num_workers=1)

##Evaluation

Training model for 500 epochs. Plotting train loss, test loss and test accuracy.

Also you can choose among the three different network structure implemented in this notebook. Simply uncomment your desired model and comment the other two in the *choose model* section. We reported our results based on the modified version of the paper structue(i.e. model = FERCNNModified().to(device))

In [8]:
BATCH_SIZE = 32
TEST_BATCH_SIZE = 32
EPOCHS = 500
LEARNING_RATE = 0.001
MOMENTUM = 0.9
USE_CUDA = True
SEED = 0
PRINT_INTERVAL = 100
WEIGHT_DECAY = 0.0005

EXPERIMENT_VERSION = "0.41" # increment this to start a new experiment
LOG_PATH = DATA_PATH + 'logs/' + EXPERIMENT_VERSION + '/'

# Now the actual training code
use_cuda = USE_CUDA and torch.cuda.is_available()


device = torch.device("cuda" if use_cuda else "cpu")
print('Using device', device)
import multiprocessing
print('num cpus:', multiprocessing.cpu_count())

kwargs = {'num_workers': multiprocessing.cpu_count(),
          'pin_memory': True} if use_cuda else {}

class_names = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']


# Choose model
# model = VGG('VGG19').to(device)
# model = FERCNN().to(device)
model = FERCNNModified().to(device)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY, betas=(0.5,0.999))
start_epoch = model.load_last_model(LOG_PATH)


train_losses, test_losses, test_accuracies = pt_util.read_log(LOG_PATH + 'log.pkl', ([], [], []))
test_loss, test_accuracy = test(model, device, test_loader)

test_losses.append((start_epoch, test_loss))
test_accuracies.append((start_epoch, test_accuracy))

try:
    for epoch in range(start_epoch, EPOCHS + 1):
        train_loss = train(model, device, train_loader, optimizer, epoch, PRINT_INTERVAL)
        test_loss, test_accuracy = test(model, device, test_loader)
        train_losses.append((epoch, train_loss))
        test_losses.append((epoch, test_loss))
        test_accuracies.append((epoch, test_accuracy))
        pt_util.write_log(LOG_PATH + 'log.pkl', (train_losses, test_losses, test_accuracies))
        model.save_best_model(test_accuracy, LOG_PATH + '%03d.pt' % epoch)


except KeyboardInterrupt as ke:
    print('Interrupted')
except:
    import traceback
    traceback.print_exc()
finally:
    model.save_model(LOG_PATH + '%03d.pt' % epoch, 0)
    ep, val = zip(*train_losses)
    pt_util.plot(ep, val, 'Train loss', 'Epoch', 'Error')
    ep, val = zip(*test_losses)
    pt_util.plot(ep, val, 'Test loss', 'Epoch', 'Error')
    ep, val = zip(*test_accuracies)
    pt_util.plot(ep, val, 'Test accuracy', 'Epoch', 'Error')

Using device cuda
num cpus: 2
Test acc:
Accuracy of class 0 : 0.0 / 1634.0 = 0.0000 %
Accuracy of class 1 : 18.0 / 181.0 = 9.9448 %
Accuracy of class 2 : 838.0 / 1690.0 = 49.5858 %
Accuracy of class 3 : 0.0 / 2966.0 = 0.0000 %
Accuracy of class 4 : 8.0 / 2005.0 = 0.3990 %
Accuracy of class 5 : 541.0 / 1321.0 = 40.9538 %
Accuracy of class 6 : 1.0 / 2046.0 = 0.0489 %

Test set: Average loss: 1.9730, Accuracy: 1406/11843 (12%)

Interrupted
Saved /gdrive/My Drive/colab_files/final_project/data/logs/0.41/000.pt



ValueError: ignored