## Preprocessing

In [26]:
import pandas as pd
import numpy as np

In [27]:
df = pd.read_csv('image_emotion3.csv')
df

Unnamed: 0,filename,emotion
0,image_06637.jpg,angry
1,image_06638.jpg,angry
2,image_06639.jpg,disgust
3,image_06641.jpg,neutral
4,image_06642.jpg,sad
...,...,...
5831,image_08036.jpg,angry
5832,image_08035.jpg,neutral
5833,image_08038.jpg,sad
5834,image_08028.jpg,fear


In [28]:
def categorize(row):
    if row['emotion'] == 'angry':
        return 1
    elif row['emotion'] == 'sad':
        return 2
    elif row['emotion'] == 'happy':
        return 3
    elif row['emotion'] == 'fear':
        return 4
    elif row['emotion'] == 'surprise':
        return 5
    elif row['emotion'] == 'disgust':
        return 6
    else:
        return 7

In [29]:
df['label'] = df.apply(lambda x: categorize(x), axis=1)
df

Unnamed: 0,filename,emotion,label
0,image_06637.jpg,angry,1
1,image_06638.jpg,angry,1
2,image_06639.jpg,disgust,6
3,image_06641.jpg,neutral,7
4,image_06642.jpg,sad,2
...,...,...,...
5831,image_08036.jpg,angry,1
5832,image_08035.jpg,neutral,7
5833,image_08038.jpg,sad,2
5834,image_08028.jpg,fear,4


In [30]:
df.to_csv('train_emotions2.csv', index=False)

## Setup

### Import packages

In [31]:
import cv2
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torchvision
from PIL import Image
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models

from torch.utils.data import Dataset, DataLoader
import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim
from torchvision.io import read_image
from torchvision.transforms import ToTensor, ToPILImage, Normalize, Compose

### Set device

In [32]:
device = None

if device == None:
    if torch.cuda.is_available():
        print("CUDA used")
        device = torch.device('cuda')

        # Should be > 0
        print(torch.cuda.device_count())

        # Index of device used (can be 0)
        print(torch.cuda.current_device())

        # GPU location
        print(torch.cuda.device(0))

        # Name of GPU
        print(torch.cuda.get_device_name(0))

    else:
        print("CPU used")
        device = torch.device('cpu')

# Force CPU use
# device = torch.device('cpu')

CPU used


### Load data

In [33]:
data = []

with open('train_emotions2.csv') as file_obj:
    reader_obj = csv.reader(file_obj)
    next(reader_obj)
    
    for row in reader_obj:
        image, emotion, label = row[0], row[1], row[2]
        path = 'portrait_faces/' + image
        new_row = [path, int(label)]
        data.append(new_row)

In [34]:
print(len(data))

5836


In [35]:
train_set = data[0:int(len(data) * 0.8)]
test_set = data[int(len(data) * 0.8):]
print(len(train_set) + len(test_set))

5836


In [36]:
# Create a map for the labels and their codes
labels = ['angry', 'sad', 'happy', 'fear', 'surprise', 'disgust', 'neutral']
class_map = {}
i = 1
    
for label in labels:
    class_map[i] = label
    i += 1

In [37]:
print(class_map)

{1: 'angry', 2: 'sad', 3: 'happy', 4: 'fear', 5: 'surprise', 6: 'disgust', 7: 'neutral'}


In [38]:
# Class for loading of the data
class own_data(Dataset):
    def __init__(self, train=0, transform=None):
        if train == 0:
            self.data = train_set
        else:
            self.data = test_set
            
        self.transform = transform
        self.to_pil = ToPILImage()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path, class_id = self.data[idx]
        image = read_image(img_path)
        
        if self.transform:
            image = self.transform(self.to_pil(image))
        
        return image, class_id

In [39]:
# Define the transform function
test_transform = transforms.Compose([
    transforms.Resize(232), 
    transforms.ToTensor(), 
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_transform = transforms.Compose([
    transforms.Resize(256), 
    transforms.CenterCrop(size=224), 
    transforms.ToTensor(), 
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [40]:
# Initialize the batch size
batch_size = 64

In [41]:
# Load the train and test data
train_data = own_data(0, train_transform)
test_data = own_data(1, test_transform)

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

## Model

### Initializing Resnet50 pretrained model

In [42]:
weights = models.ResNet50_Weights.IMAGENET1K_V2
resnet50 = models.resnet50(weights=weights)
print(resnet50)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [43]:
# Freeze model parameters
for param in resnet50.parameters():
    param.requires_grad = False

for param in resnet50.layer4.parameters():
    param.requires_grad = True
    
# Change the final layer of ResNet50 Model for Transfer Learning
fc_inputs = resnet50.fc.in_features
resnet50.fc = nn.Sequential(
    nn.Linear(fc_inputs, 256),
    nn.Dropout(0.2, inplace=False),
    nn.BatchNorm1d(num_features=256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.Linear(256, 7),
    nn.LogSoftmax(dim=1) # For using NLLLoss()/kldiv
)

print(resnet50)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

### Set model to train

In [44]:
model = resnet50
model.train()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [45]:
## Some loss functions
nll = nn.NLLLoss()

cross = nn.CrossEntropyLoss()

kldiv = nn.KLDivLoss(reduction="batchmean")

In [46]:
# Some optimizers
sgd = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

adam = optim.Adam(model.parameters(), lr=3e-4)

RMSProp = optim.RMSprop(model.parameters(), lr=1e-3, momentum=0.9)

adamw = optim.AdamW(model.parameters(), lr=3e-4)

### Define training & test functions for model

In [47]:
# Define the train model function
def train_model(train_loader, model, loss_function, optimizer, batch_size, device=None):
    for batch, (image, label) in enumerate(train_loader):
        # Move labels and images to GPU
        image = image.to(device)
        label = label.to(device)

        optimizer.zero_grad()
        output = model(image)
        label = label - 1
        loss_func = loss_function(output, label)
        loss_func.backward()
        optimizer.step()
        
        if batch % 64 == 0:
            print('[%5d] loss: %.3f' % (batch + 1, loss_func.item()))

In [48]:
# Test function
def test_model(test_loader, model, loss_function, batch_size, device=None):
    size = len(test_loader.dataset)
    test_loss, correct = 0, 0
    
    with torch.no_grad():
        for image, label in test_loader:
            # Move labels and images to GPU
            image = image.to(device)
            label = label.to(device)
            
            output = model(image)
            label = label - 1
            test_loss += loss_function(output, label).item()
            correct += (output.argmax(1) == label).type(torch.float).sum().item()
    
    test_loss /= batch_size
    correct /= size
    print(f"Test error: \n Accuracy: {(100 * correct):>0.01f}%, Avg loss: {test_loss:>8f} \n")

### Run model for epochs

In [49]:
epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    model.train()
    train_model(train_loader, model, nll, adam, batch_size, device=device)
    model.eval()
    test_model(test_loader, model, nll, batch_size, device=device)
print("Done!")

Epoch 1
-------------------------------
[    1] loss: 1.966
[   65] loss: 0.765
Test error: 
 Accuracy: 56.1%, Avg loss: 0.342376 

Epoch 2
-------------------------------
[    1] loss: 0.674
[   65] loss: 0.475
Test error: 
 Accuracy: 56.0%, Avg loss: 0.378502 

Epoch 3
-------------------------------
[    1] loss: 0.148
[   65] loss: 0.107
Test error: 
 Accuracy: 55.4%, Avg loss: 0.416510 

Epoch 4
-------------------------------
[    1] loss: 0.045
[   65] loss: 0.018
Test error: 
 Accuracy: 56.6%, Avg loss: 0.411251 

Epoch 5
-------------------------------
[    1] loss: 0.017
[   65] loss: 0.013
Test error: 
 Accuracy: 56.3%, Avg loss: 0.410610 

Done!
