<a href="https://colab.research.google.com/github/NadiaHolmlund/Semester_Project/blob/main/Semester_Project_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Facial Emotion Recognition (FER)

This notebook documents the fine-tuning of three pre-trained vision models on the [FER2013](https://www.kaggle.com/datasets/deadskull7/fer2013) dataset. The dataset is a collection of 35,887 grayscale face images of 48x48 pixels, divided into 7 classes (Angry, Disgust, Fear, Happy, Sad, Surprise, Neutral). The training set consists of 28,709 images, while the validation and test sets consist of 3,589 images each.
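As a quick sanity check on the figures above, the split sizes can be summed and the number of values per image derived. The variable names are illustrative and not part of the notebook.

```python
# Split sizes as stated in the dataset description
split_sizes = {"train": 28709, "validation": 3589, "test": 3589}
total_images = sum(split_sizes.values())

# Each image is 48x48 grayscale, i.e. one intensity value per pixel
image_side = 48
values_per_image = image_side * image_side

print(total_images)      # 35887
print(values_per_image)  # 2304
```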

The vision models used for training are [ViT](https://huggingface.co/docs/transformers/model_doc/vit) from Google, [BeiT](https://huggingface.co/docs/transformers/model_doc/beit) from Microsoft and [DeiT](https://huggingface.co/docs/transformers/model_doc/deit) from Facebook.

Please be aware that the notebook includes code for all three models in the sections ***Defining the processor*** and ***Defining the model***. When running the code, run only the processor and model cells for the model you wish to train.

Additionally, the section ***Defining the training arguments*** contains the training arguments for all three runs conducted in each experiment. When running the code, run only the cell with the training arguments you wish to use.

The process is inspired by a tutorial by Niels Rogge, ML engineer at 🤗 [HuggingFace](https://huggingface.co), who fine-tuned ViT on the CIFAR-10 dataset using the 🤗 [Trainer](https://huggingface.co/transformers/main_classes/trainer.html). The tutorial can be found [here](https://github.com/NielsRogge/Transformers-Tutorials/tree/master/VisionTransformer).

# Imports

In [None]:
# Pip installs
!pip install -q transformers==4.28.0 # Installing version 4.28.0 to circumvent an issue with Accelerator and the introduction of PartialState in later versions
!pip install -q datasets # transformers is already installed (and pinned) by the line above
!pip install -q mlflow
!pip install -q pyngrok

In [2]:
# Libraries
from datasets import *
from transformers import ViTImageProcessor, ViTConfig, ViTModel
from transformers import BeitImageProcessor, BeitConfig, BeitModel
from transformers import DeiTImageProcessor, DeiTConfig, DeiTModel
from transformers import PreTrainedModel
from transformers import TrainingArguments, Trainer
from transformers.modeling_outputs import SequenceClassifierOutput
import numpy as np
import pandas as pd 
import torch.nn as nn
from matplotlib import pyplot as plt
%matplotlib inline 
import seaborn as sns
from sklearn.metrics import confusion_matrix
import mlflow
from pyngrok import ngrok
from getpass import getpass

# Connecting to Google Drive

At 301 MB, the dataset exceeds the file-size limit on GitHub, so it is loaded from Google Drive (requires folder access).

In [None]:
# Connecting to Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Changing the working directory to the project folder on Google Drive
%cd /content/gdrive/MyDrive/Semester_Project

# Setting up MLflow for experiment tracking

## Setting up MLflow UI

In [None]:
# Running tracking UI in the background
get_ipython().system_raw("mlflow ui --port 5000 &")

# Terminating open tunnels if any exist
ngrok.kill()

In [None]:
# Defining where mlrun files are stored
import os

google_drive_path = "/content/gdrive/MyDrive/Semester_Project/mlruns"
mlflow_tracking_uri = f"file://{google_drive_path}"

os.environ["MLFLOW_TRACKING_URI"] = mlflow_tracking_uri

In [None]:
# Login on ngrok.com and get your authtoken from https://dashboard.ngrok.com/auth
# Enter your auth token when the code is running
NGROK_AUTH_TOKEN = getpass('Enter the ngrok authtoken: ')
ngrok.set_auth_token(NGROK_AUTH_TOKEN)
ngrok_tunnel = ngrok.connect(addr="5000", proto="http", bind_tls=True)
print("MLflow Tracking UI:", ngrok_tunnel.public_url)

## Setting up a new experiment and/or new run

Run this section only if setting up new experiments and/or new runs.

In [None]:
# Defining experiment name and run name to be logged in MLflow
experiment_name = "DeiT_mlruns"
run_name = "Run_4"

In [None]:
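# Setting up a new experiment
# Skip this cell if an experiment with this name already exists; create_experiment raises an error in that case
mlflow.create_experiment(experiment_name)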

In [None]:
# Getting the experiment ID
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id

In [None]:
# Starting a new MLflow run within the selected experiment
mlflow.start_run(run_name=run_name, nested=True, experiment_id=experiment_id)

# Loading the data



In [5]:
# Loading the full dataset
fer_df = pd.read_csv("/content/gdrive/MyDrive/Semester_Project/FER2013.csv")

In [6]:
# Examining the dataset
fer_df.head()

| | emotion | pixels | Usage |
|---|---|---|---|
| 0 | 0 | 70 80 82 72 58 58 60 63 54 58 60 48 89 115 121... | Training |
| 1 | 0 | 151 150 147 155 148 133 111 140 170 174 182 15... | Training |
| 2 | 2 | 231 212 156 164 174 138 161 173 182 200 106 38... | Training |
| 3 | 4 | 24 32 36 30 32 23 19 20 30 41 21 22 32 34 21 1... | Training |
| 4 | 6 | 4 0 0 0 0 0 0 0 0 0 0 0 3 15 23 28 48 50 58 84... | Training |
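Each row stores its image as one space-separated string in the `pixels` column. The sketch below shows, in plain Python and on synthetic data, how such a string maps onto a 48x48 grid. The helper `decode_pixels` is hypothetical; the notebook itself uses NumPy for this step.

```python
def decode_pixels(pixel_string, side=48):
    """Turn a space-separated pixel string into a side x side grid of ints."""
    values = [int(v) for v in pixel_string.split()]
    if len(values) != side * side:
        raise ValueError(f"expected {side * side} values, got {len(values)}")
    # Slice the flat list into rows of `side` pixels each
    return [values[r * side:(r + 1) * side] for r in range(side)]

# Synthetic example (not real FER2013 data): the values 0..255 repeated
fake_pixels = " ".join(str(i % 256) for i in range(48 * 48))
grid = decode_pixels(fake_pixels)
print(len(grid), len(grid[0]))  # 48 48
```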


# Defining the processor

Run only the processor corresponding to the model you wish to train.

## ViT Processor

In [None]:
# Defining the image processor
# The image processor resizes every image to the resolution that the model expects, i.e. 224x224, and normalizes the channels
processor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224')

## BeiT Processor

In [None]:
# Defining the image processor
# The image processor resizes every image to the resolution that the model expects, i.e. 224x224, and normalizes the channels
processor = BeitImageProcessor.from_pretrained('microsoft/beit-base-patch16-224')

## DeiT Processor

In [None]:
# Defining the image processor
# The image processor resizes every image to the resolution that the model expects, i.e. 224x224, and normalizes the channels
processor = DeiTImageProcessor.from_pretrained('facebook/deit-base-distilled-patch16-224')
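The comments in the processor cells say that each image is resized and its channels normalized. A rough sketch of the normalization part for a single pixel value follows. The mean and standard deviation of 0.5 are an assumption for illustration only; the values actually used are stored in each checkpoint's preprocessor configuration (typically exposed as `processor.image_mean` and `processor.image_std`) and may differ between the three models.

```python
def normalize_pixel(value, mean=0.5, std=0.5, max_value=255):
    """Rescale an 8-bit intensity to [0, 1], then standardize it."""
    scaled = value / max_value
    return (scaled - mean) / std

# With the assumed mean/std of 0.5, the 8-bit range maps onto [-1, 1]
print(normalize_pixel(0))    # -1.0
print(normalize_pixel(255))  # 1.0
```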

# Preprocessing the data

In [8]:
# Creating a dictionary connecting the emotion_id and emotion_label
emotion_id = np.sort(fer_df['emotion'].unique())
emotion_label = ['Anger', 'Disgust', 'Fear', 'Happiness', 'Sadness', 'Surprise', 'Neutral']

# Using `idx` instead of `id` avoids shadowing the Python built-in
id2label = {int(idx): label for idx, label in zip(emotion_id, emotion_label)}
label2id = {label: idx for idx, label in id2label.items()}

First preprocessing step

In [9]:
# Defining a function to prepare the FER2013 dataset to be loaded into a HuggingFace dataset
def prepare_fer_data(data):
    """ Prepare FER2013 for vision transformers
        input: FER2013 dataset loaded from csv
        output: dataframe that can be loaded into a Hugging Face dataset """

    # outputs
    image_list = []
    image_labels = list(map(int, data['emotion']))

    # go over all images
    for pixels in data['pixels']:
        # parse the space-separated pixel string into a flat integer array
        image = np.fromstring(pixels, dtype=int, sep=' ')
        image = np.reshape(image, (48, 48))
        # adapt grayscale to rgb format (change single values to triplets of the same value)
        image = image[..., np.newaxis]
        image = np.repeat(image, 3, axis=2)
        # convert to list format used by the later functions
        image = image.astype(int).tolist()
        # save to output
        image_list.append(image)

    output_df = pd.DataFrame(list(zip(image_list, image_labels)),
               columns =['img', 'label'])
        
    return output_df

In [10]:
# Applying the function to prepare the FER2013 dataset to be loaded into a Hugging Face dataset
# Here, the dataset is divided into train, test, and validation sets
# Due to limited computational resources, the size of the datasets has been limited to 10,000/1,500/1,500
fer_train_df = prepare_fer_data(fer_df[fer_df['Usage']=='Training'].sample(n = 10000, random_state = 42))
fer_test_df = prepare_fer_data(fer_df[fer_df['Usage']=='PrivateTest'].sample(n = 1500, random_state = 42))
fer_val_df = prepare_fer_data(fer_df[fer_df['Usage']=='PublicTest'].sample(n = 1500, random_state = 42))

In [11]:
# Examining the prepared training dataset
fer_train_df.head()

| | img | label |
|---|---|---|
| 0 | [[[138, 138, 138], [147, 147, 147], [144, 144,... | 6 |
| 1 | [[[17, 17, 17], [20, 20, 20], [25, 25, 25], [3... | 4 |
| 2 | [[[109, 109, 109], [97, 97, 97], [125, 125, 12... | 6 |
| 3 | [[[165, 165, 165], [69, 69, 69], [58, 58, 58],... | 2 |
| 4 | [[[85, 85, 85], [86, 86, 86], [85, 85, 85], [8... | 0 |


In [12]:
# Loading the datasets into HuggingFace datasets
train_ds = Dataset.from_pandas(fer_train_df)
val_ds = Dataset.from_pandas(fer_val_df)
test_ds = Dataset.from_pandas(fer_test_df)

print(train_ds)
print(val_ds)
print(test_ds)

Dataset({
    features: ['img', 'label'],
    num_rows: 10000
})
Dataset({
    features: ['img', 'label'],
    num_rows: 1500
})
Dataset({
    features: ['img', 'label'],
    num_rows: 1500
})


In [13]:
# Examining the size of the images
np.array(train_ds[0]["img"]).shape

(48, 48, 3)

Second preprocessing step

In [14]:
# Defining a function that preprocesses the images using the model-specific image processor
def preprocess_images(examples):
    """ Prepare datasets for vision transformers
    input: dataset with images in their original size
    output: dataset with pixel values computed by the image processor added """
    # get batch of images
    images = examples['img']
    # convert to list of NumPy arrays of shape (C, H, W)
    images = [np.array(image, dtype=np.uint8) for image in images]
    images = [np.moveaxis(image, source=-1, destination=0) for image in images]
    # preprocess and add pixel_values
    inputs = processor(images=images)
    examples['pixel_values'] = inputs['pixel_values']

    return examples
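The two layout changes in the preprocessing steps (grayscale to RGB triplets, then channel-last to channel-first) can be illustrated on a tiny image without NumPy. This is only a sketch: the helper names are hypothetical, and the notebook performs the same steps with `np.repeat` and `np.moveaxis`.

```python
def gray_to_rgb(gray):
    """Replicate each grayscale value into an [R, G, B] triplet: (H, W) -> (H, W, 3)."""
    return [[[v, v, v] for v in row] for row in gray]

def channels_first(image_hwc):
    """Rearrange a nested (H, W, C) list into (C, H, W)."""
    num_channels = len(image_hwc[0][0])
    return [[[pixel[c] for pixel in row] for row in image_hwc]
            for c in range(num_channels)]

tiny_gray = [[10, 20], [30, 40]]  # a 2x2 stand-in for a 48x48 face
hwc = gray_to_rgb(tiny_gray)      # shape (2, 2, 3)
chw = channels_first(hwc)         # shape (3, 2, 2)
print(chw[0])  # [[10, 20], [30, 40]]
```

Because the three channels are copies of the same grayscale plane, each entry of `chw` equals the original 2x2 grid.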

In [15]:
# Adding features to the new datasets with an additional column for the preprocessed 224x224x3 images 
features = Features({
    'label': ClassLabel(names=['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']),
    'img': Array3D(dtype="int64", shape=(3,48,48)),
    'pixel_values': Array3D(dtype="float32", shape=(3, 224, 224)),
})

preprocessed_train_ds = train_ds.map(preprocess_images, batched=True, batch_size=1, features=features)
preprocessed_val_ds = val_ds.map(preprocess_images, batched=True, features=features)
preprocessed_test_ds = test_ds.map(preprocess_images, batched=True, features=features)

preprocessed_train_ds

Dataset({
    features: ['label', 'img', 'pixel_values'],
    num_rows: 10000
})

In [16]:
# Examining the final image size after preprocessing
print(len(preprocessed_train_ds[0]["pixel_values"]))       
print(len(preprocessed_train_ds[0]["pixel_values"][0]))     
print(len(preprocessed_train_ds[0]["pixel_values"][0][0]))  

3
224
224
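The 224x224 resolution and the `patch16` part of the checkpoint names together determine how many tokens the transformer sees per image. The arithmetic below is a sketch; the variable names are illustrative.

```python
image_size = 224  # output resolution of the image processors
patch_size = 16   # from the "patch16" checkpoint names

patches_per_side = image_size // patch_size
num_patches = patches_per_side ** 2

# ViT and BEiT prepend one CLS token; the distilled DeiT checkpoint
# additionally prepends a distillation token.
vit_sequence_length = num_patches + 1
deit_distilled_sequence_length = num_patches + 2

print(num_patches, vit_sequence_length, deit_distilled_sequence_length)  # 196 197 198
```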


# Preparing for fine-tuning

## Defining the model


Run only the model corresponding to the model you wish to train.

The model architecture is defined in PyTorch: a dropout layer and a linear classification layer are added on top of the base model's output for the special CLS token, which represents the whole input image.
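To make the role of the classification head concrete, the following is a minimal pure-Python sketch of a linear layer applied to a CLS embedding, followed by the cross-entropy loss. It is not the notebook's PyTorch code: the 4-dimensional embedding and the all-zero weights are hypothetical toy values, and dropout is omitted because it acts as the identity at evaluation time.

```python
import math

def linear(features, weights, bias):
    """Apply a linear layer: one output logit per row of `weights`."""
    return [sum(w * x for w, x in zip(row, features)) + b
            for row, b in zip(weights, bias)]

def cross_entropy(logits, target):
    """Negative log-likelihood of `target` under softmax(logits)."""
    max_logit = max(logits)  # subtract the max for numerical stability
    log_sum_exp = max_logit + math.log(sum(math.exp(z - max_logit) for z in logits))
    return log_sum_exp - logits[target]

# Toy CLS embedding with 4 features and 7 emotion classes (hypothetical numbers)
cls_embedding = [0.5, -1.0, 0.25, 2.0]
weights = [[0.0] * 4 for _ in range(7)]  # all-zero weights give uniform logits
bias = [0.0] * 7

logits = linear(cls_embedding, weights, bias)
loss = cross_entropy(logits, target=3)
print(round(loss, 4))  # 1.9459
```

With uniform logits the loss equals ln(7), which is the value to expect from an untrained head on a 7-class problem.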

### ViT Model

In [None]:
# Defining the model and the additional layer
class ViTForImageClassification(PreTrainedModel):
    # Define architecture
    def __init__(self, config, num_labels=len(emotion_label)):
        super(ViTForImageClassification, self).__init__(config)
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224')
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.vit.config.hidden_size, num_labels)
        self.num_labels = num_labels

    # Define a forward pass through that architecture + loss computation
    def forward(self, pixel_values, labels=None):
        outputs = self.vit(pixel_values=pixel_values)
        output = self.dropout(outputs.last_hidden_state[:, 0])
        logits = self.classifier(output)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

In [None]:
# Defining the model config file from Hugging Face and adding it to the model
config = ViTConfig.from_pretrained('google/vit-base-patch16-224')
model = ViTForImageClassification(config)

### BeiT Model

In [None]:
# Defining the model and the additional layer
class BeitForImageClassification(PreTrainedModel):
    # Define architecture
    def __init__(self, config, num_labels=len(emotion_label)):
        super(BeitForImageClassification, self).__init__(config)
        self.beit = BeitModel.from_pretrained('microsoft/beit-base-patch16-224')
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.beit.config.hidden_size, num_labels)
        self.num_labels = num_labels

    # Define a forward pass through that architecture + loss computation
    def forward(self, pixel_values, labels=None):
        outputs = self.beit(pixel_values=pixel_values)
        output = self.dropout(outputs.last_hidden_state[:, 0])
        logits = self.classifier(output)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

In [None]:
# Defining the model config file from Hugging Face and adding it to the model
config = BeitConfig.from_pretrained('microsoft/beit-base-patch16-224')
model = BeitForImageClassification(config)

### DeiT Model

In [None]:
# Defining the model and the additional layer
class DeiTForImageClassification(PreTrainedModel):
    # Define architecture
    def __init__(self, config, num_labels=len(emotion_label)):
        super(DeiTForImageClassification, self).__init__(config)
        self.deit = DeiTModel.from_pretrained('facebook/deit-base-distilled-patch16-224')
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.deit.config.hidden_size, num_labels)
        self.num_labels = num_labels

    # Define a forward pass through that architecture + loss computation
    def forward(self, pixel_values, labels=None):
        outputs = self.deit(pixel_values=pixel_values)
        output = self.dropout(outputs.last_hidden_state[:, 0])
        logits = self.classifier(output)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

In [None]:
# Defining the model config file from Hugging Face and adding it to the model
config = DeiTConfig.from_pretrained('facebook/deit-base-distilled-patch16-224')
model = DeiTForImageClassification(config)

## Defining the training arguments

Training arguments are defined with `TrainingArguments` and later passed to the 🤗 Hugging Face [Trainer class](https://huggingface.co/docs/transformers/v4.29.1/en/main_classes/trainer).

### Run 1

In [None]:
# Defining TrainingArguments for Run 1
metric_name = "accuracy"

args = TrainingArguments(
    "HF_Training",
    evaluation_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=5e-5,
    num_train_epochs=3,
    weight_decay=0,
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model=metric_name,
    logging_dir="HF_Training",
)

### Run 2

In [None]:
# Defining TrainingArguments for Run 2
metric_name = "accuracy"

args = TrainingArguments(
    "HF_Training",
    evaluation_strategy="epoch",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    learning_rate=4e-5,
    num_train_epochs=1,
    weight_decay=0.0001,
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model=metric_name,
    logging_dir="HF_Training",
)

### Run 3

In [None]:
# Defining TrainingArguments for Run 3
metric_name = "accuracy"

args = TrainingArguments(
    "HF_Training",
    evaluation_strategy="epoch",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    learning_rate=6e-5,
    num_train_epochs=5,
    weight_decay=0.01,
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model=metric_name,
    logging_dir="HF_Training",
)
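The three runs differ in batch size and number of epochs, which changes how many optimizer steps each run performs. The sketch below estimates this for the 10,000-image training sample, assuming a single device and no gradient accumulation; the helper `total_optimizer_steps` is hypothetical.

```python
import math

def total_optimizer_steps(num_samples, batch_size, num_epochs):
    """Steps = ceil(samples / batch size) per epoch, times the number of epochs."""
    return math.ceil(num_samples / batch_size) * num_epochs

num_train_samples = 10_000  # size of the sampled training set in this notebook

runs = {
    "Run 1": {"batch_size": 8, "num_epochs": 3},
    "Run 2": {"batch_size": 4, "num_epochs": 1},
    "Run 3": {"batch_size": 16, "num_epochs": 5},
}
steps = {name: total_optimizer_steps(num_train_samples, **cfg)
         for name, cfg in runs.items()}
print(steps)  # {'Run 1': 3750, 'Run 2': 2500, 'Run 3': 3125}
```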

## Defining metrics and the trainer

In [None]:
# Defining metrics to evaluate the training process
# Accuracy is computed directly with NumPy, since `load_metric` was deprecated in favour of
# the `evaluate` library and is absent from recent `datasets` releases
def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    return {"accuracy": float((predictions == labels).mean())}
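The accuracy metric reduces each row of logits to its highest-scoring class and compares the result with the label. A minimal pure-Python sketch on made-up logits (the helper names are hypothetical):

```python
def argmax(values):
    """Index of the largest value (the first one wins on ties)."""
    return max(range(len(values)), key=lambda i: values[i])

def accuracy_from_logits(logits, labels):
    """Fraction of rows whose highest-scoring class equals the label."""
    predictions = [argmax(row) for row in logits]
    correct = sum(int(p == y) for p, y in zip(predictions, labels))
    return correct / len(labels)

# Made-up logits for four images over three classes
toy_logits = [[2.0, 0.1, -1.0], [0.0, 0.5, 0.2], [0.3, 0.2, 0.9], [1.5, 1.4, 0.0]]
toy_labels = [0, 1, 1, 0]
print(accuracy_from_logits(toy_logits, toy_labels))  # 0.75
```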

In [None]:
# Defining the Trainer to fine-tune the model
trainer = Trainer(
    model = model,
    args = args,
    train_dataset = preprocessed_train_ds,
    eval_dataset = preprocessed_val_ds,
    compute_metrics = compute_metrics,
)

# Fine-tuning the model


The model is fine-tuned by calling the `train()` method.

In [None]:
# Fine-tuning the model
trainer.train()

# Evaluating the model

Evaluating the model on the test set

In [None]:
# Applying the fine-tuned model on the test set
outputs = trainer.predict(preprocessed_test_ds)
print(outputs.metrics)

In [None]:
# Displaying the results on the test set as a confusion matrix
y_true = outputs.label_ids
y_pred = outputs.predictions.argmax(1)

cm = confusion_matrix(y_true, y_pred)

fig, ax = plt.subplots(figsize=(8,6))  
ax = sns.heatmap(cm, annot=True, fmt="d", linewidths=.5, xticklabels=emotion_label, yticklabels=emotion_label)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()
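In the heatmap, rows are true classes and columns are predicted classes, so the diagonal holds the correct predictions. The sketch below builds such a matrix by hand on toy labels and derives per-class recall from it; the function names are hypothetical and the labels are not results from this notebook.

```python
def confusion_counts(y_true, y_pred, num_classes):
    """matrix[t][p] counts samples of true class t predicted as class p."""
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for t, p in zip(y_true, y_pred):
        matrix[t][p] += 1
    return matrix

def per_class_recall(matrix):
    """Diagonal count divided by the row total (None for empty rows)."""
    return [row[i] / sum(row) if sum(row) else None
            for i, row in enumerate(matrix)]

toy_true = [0, 0, 1, 1, 2, 2]
toy_pred = [0, 1, 1, 1, 2, 0]
cm_toy = confusion_counts(toy_true, toy_pred, num_classes=3)
print(cm_toy)                    # [[1, 1, 0], [0, 2, 0], [1, 0, 1]]
print(per_class_recall(cm_toy))  # [0.5, 1.0, 0.5]
```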

# Saving metrics and artifacts in MLflow

In [None]:
# Building the model filename from the run name and retrieving the experiment and run IDs
model_filename = f"Model_{run_name}"
experiment_id = mlflow.active_run().info.experiment_id
run_id = mlflow.active_run().info.run_id

print('Model filename: ' + model_filename)
print('Experiment id: ' + experiment_id)
print('Run id: ' + run_id)

In [None]:
# Saving the fine-tuned model
model.save_pretrained(f"/content/gdrive/MyDrive/Semester_Project/mlruns/{experiment_id}/{run_id}/artifacts/{model_filename}")

In [None]:
# Extracting test metrics
test_loss = outputs.metrics['test_loss']
test_accuracy = outputs.metrics['test_accuracy']
test_runtime = outputs.metrics['test_runtime']
test_samples_per_second = outputs.metrics['test_samples_per_second']
test_steps_per_second = outputs.metrics['test_steps_per_second']

In [None]:
# Saving test metrics
mlflow.log_metric("test_loss", test_loss)
mlflow.log_metric("test_accuracy", test_accuracy)
mlflow.log_metric("test_runtime", test_runtime)
mlflow.log_metric("test_samples_per_second", test_samples_per_second)
mlflow.log_metric("test_steps_per_second", test_steps_per_second)
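The extract-then-log pattern above can also be written as a loop over the metrics dictionary. The sketch below uses a stand-in logging function so that it runs without MLflow; in the notebook, `mlflow.log_metric` could be passed as `log_fn`, and `mlflow.log_metrics` accepts a whole dictionary at once. The helper name and the metric values are made up for illustration.

```python
def log_test_metrics(metrics, log_fn):
    """Send every entry whose key starts with 'test_' to `log_fn(key, value)`."""
    logged = {}
    for key, value in metrics.items():
        if key.startswith("test_"):
            log_fn(key, value)
            logged[key] = value
    return logged

# Stand-in logger and made-up values (not results from this notebook)
recorded = []
example_metrics = {"test_loss": 1.0, "test_accuracy": 0.5, "epoch": 3.0}
log_test_metrics(example_metrics, lambda k, v: recorded.append((k, v)))
print(recorded)  # [('test_loss', 1.0), ('test_accuracy', 0.5)]
```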

In [None]:
# Ending the MLflow run
mlflow.end_run()