# cw_MLPv1

In [2]:
#update opencv; restart runtime
!pip install opencv-python==4.5.5.64 --quiet
!pip install wandb --quiet


[K     |████████████████████████████████| 60.5 MB 74 kB/s 
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
albumentations 0.1.12 requires imgaug<0.2.7,>=0.2.5, but you have imgaug 0.2.9 which is incompatible.[0m
[K     |████████████████████████████████| 1.8 MB 20.6 MB/s 
[K     |████████████████████████████████| 181 kB 54.6 MB/s 
[K     |████████████████████████████████| 145 kB 67.6 MB/s 
[K     |████████████████████████████████| 63 kB 1.8 MB/s 
[?25h  Building wheel for pathtools (setup.py) ... [?25l[?25hdone


In [3]:
#import libraries
import os
import re
import cv2
import time
import random
import warnings
import wandb
import timeit
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import trange, tqdm
from random import randint
from google.colab import drive
from skimage.feature import hog
from sklearn import svm, metrics
from skimage import io, transform, img_as_ubyte, color
from torch.utils.data import Dataset, DataLoader
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler 
from sklearn.model_selection import train_test_split
from sklearn.cluster import MiniBatchKMeans
from sklearn.neural_network import MLPClassifier
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
sns.set_style("whitegrid")

# 🔍 Import Images and Extracting Features

## 🖼 Create dataset

The steps are similar to loading on the SVM except I'll be using the Pytorch module to iterate over the data.

In [6]:
#Mount Google Drive
drive.mount('/content/drive')
GOOGLE_DRIVE_PATH_AFTER_MYDRIVE = 'Computer Vision Coursework/CW_Folder_PG' 
GOOGLE_DRIVE_PATH = os.path.join('drive', 'My Drive', GOOGLE_DRIVE_PATH_AFTER_MYDRIVE)
print(os.listdir(GOOGLE_DRIVE_PATH))

Mounted at /content/drive
['.DS_Store', 'Models', 'CW_Dataset', 'Code', 'Video', 'test_functions_scratch.ipynb']


In [7]:
# Identify path to zipped dataset
zip_path = os.path.join(GOOGLE_DRIVE_PATH, 'CW_Dataset/CW_Dataset.zip')

# Copy it to Colab
!cp '{zip_path}' .

# Unzip it
!yes|unzip -q CW_Dataset.zip

# Delete zipped version from Colab (not from Drive)
!rm CW_Dataset.zip

In [8]:
def load_images(root_dir, label_path):
    """Return an array of images and array of labels.

    Args:
        root_dir (string): Folder containing all the images with sequential numbering.
        label_path (string): Path to text file listing image filenames (sequential) and labels in this order.

    Returns:
        A tuple containing an array of images and an array of labels
    """
    # import filenames and respective labels
    df = pd.read_csv(label_path,
                     delimiter=' ',
                     header=None,
                     names=['filename', 'label'])
    df.sort_values('filename', inplace=True)
    # import images
    images = []
    # !sort image folder
    image_folder = sorted(os.listdir(root_dir))
    # generator to extract each image path
    images_in_folder = (file for file in image_folder if file.endswith('.jpg'))
    count = 0  # index
    for image_path, image_label in zip(images_in_folder, (df['filename'])):
        count += 1  # check if filenames and filenames in labels list are equal
        if re.findall(r'\d+', image_path) == re.findall(r'\d+', image_label):
            image = io.imread(os.path.join(root_dir, image_path))
            images.append(image)
        else:
            print(f'Found unmatched image file {image_path} and label {image_label} \
                at index {count}')
            pass

    return np.array(images), np.array(df['label'])


def plot_classification(true_label,
                        predicted_labels,
                        classifier: str = 'SVM-BoVW',
                        des: str = 'train'):
    """
    Return classification report and plot matrix.
    """
    with warnings.catch_warnings():
        label_axis = ['Surprise', 'Fear', 'Disgust', 'Happiness', 'Sadness', 'Anger', 'Neutral']
        warnings.simplefilter('ignore')
        print(f"""Classification report for classifier {classifier} on {str(des)} set:
          {metrics.classification_report(true_label, predicted_labels)}\n""")
        cm = metrics.confusion_matrix(true_label, predicted_labels)
        sns.heatmap(cm/np.sum(cm), cmap='YlGnBu', annot=True, fmt='.2%', xticklabels=label_axis, yticklabels=label_axis)

In [9]:
#Create training and validation sets
#load images
X, y = load_images('/content/train',
           '/content/labels/list_label_train.txt')

#split X, y
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, shuffle=True, stratify=y)

## 📊 Create HOG feature descriptors

I'll be using `feature_vector = True` which flattens the HOG descriptors to vector.

In [10]:
# Extract HOG features for training set
hog_list_xtrain = []
for image in X_train:
    hos_descriptor =  hog(image,
                          orientations=9,
                          pixels_per_cell=(8, 8),
                          cells_per_block=(1, 1),
                          feature_vector = True,
                          visualize=False,
                          transform_sqrt=True,
                          multichannel=True)
    hog_list_xtrain.append(hos_descriptor) 

In [11]:
# Doing the same for the Test set
hog_list_xval = []
for image in X_val:
    hos_descriptor =  hog(image,
                          orientations=9,
                          pixels_per_cell=(8, 8),
                          cells_per_block=(1, 1),
                          feature_vector = True,
                          visualize=False,
                          transform_sqrt=True,
                          multichannel=True)
    hog_list_xval.append(hos_descriptor) 

## 📦 Set-up Dataset for Pytorch

In [12]:
class DatasetHog(Dataset):
    #adapted from https://pytorch.org/tutorials/beginner/data_loading_tutorial.html
    """Images dataset for pytorch."""

    def __init__(self, descriptors, labels, transform=None):
        """
        Args:
            labels : Array of labels.
            descriptors (string): Array of HOG descriptors.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        #self.labels = pd.read_csv(
                                  #label_path,
                                  #delimiter=' ',
                                  #header=None,
                                  #names=['filename', 'label']
                                #)
        #self.root_dir = root_dir
        self.labels = (labels - 1) #convert labels to 0-6
        self.descriptors = descriptors
        self.transform = transform
        #self.image_path = sorted(os.listdir(root_dir))
        #self.transform = transforms.Compose(
                                #[transforms.Normalize(),
                                #transforms.ToTensor()]
                                #)
    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        #img_name = os.path.join(self.root_dir,
                                #self.labels.iloc[idx, 0])
        #image = io.imread(img_name)
        descriptors = self.descriptors[idx]
        #label = self.labels.iloc[idx, 1]
        label = self.labels[idx]
        

        if self.transform:
            image = self.transform(descriptors)
        
        return descriptors, label

In [43]:
# Initialize the training data iterator
dataset_hog = DatasetHog(hog_list_xtrain, y_train)

trainloader = DataLoader(dataset_hog, batch_size=100,
                        shuffle=True, num_workers=0)

# And the test one
dataset_hog_val = DatasetHog(hog_list_xval, y_val)
testloader = DataLoader(dataset_hog_val, batch_size=100,
                        shuffle=False, num_workers=0)

In [14]:
# This cell's just for testing the outputs
# of the above cells are alright
count = 0
for image_batch, label in trainloader:
    count += 1
    if count > 10:
      x5 = image_batch
      break

# 🧠 MLP - HOG

## ⏯ Create and initialize model

I'll initially be creating a MLP with two hidden layers with 256, 128 sizes respectively. The activation functions are ReLU and I'll be using a CrossEntropyLoss function with an Adam optimizer.

The model will be evaluated on the validation set after every epoch and I'll set up an early stopping condition.

In [15]:
#adapted from https://github.com/christianversloot/machine-learning-articles/blob/main/creating-a-multilayer-perceptron-with-pytorch-and-lightning.md

class MLP(nn.Module):
  '''
    Multilayer Perceptron.
  '''
  def __init__(self, input_dim, output_dim):
    super().__init__()
    self.layers = nn.Sequential(
      nn.Linear(input_dim, 256),
      nn.ReLU(),
      nn.Linear(256, 128),
      nn.ReLU(),
      nn.Dropout(0.3)
      nn.Linear(128, output_dim)
    )

  def forward(self, x):
    '''Forward pass'''
    #batch_size = x.shape[0]
    #x = x.view(batch_size, -1)
    return self.layers(x)

Next I'll initialize the model and create the Loss function and the optimizer

In [16]:
# Initialize model
model = MLP(1296, 7)

# Create the Loss function and the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) #momentum=0.9)

## ⏩ Set-up Training loop

Create training loop
adapted from https://colab.research.google.com/github/bentrevett/pytorch-image-classification/blob/master/1_mlp.ipynb#scrollTo=p-gtfzafREwc

In [18]:
def flattens(results_list):
    y_label_list, y_pred_list = [], []
    for i in range(len(results_list) - 1):
      label_ = results_list[i][0]
      pred_ = results_list[i][1]
      y_label_list.append(label_)
      y_pred_list.append(pred_)
    return [item.cpu().numpy() for sublist in y_label_list for item in sublist], [item.cpu().numpy() for sublist in y_pred_list for item in sublist]

# Helper function to calculate accuracy
def calculate_accuracy(y_pred, y):
    top_pred = y_pred.argmax(1, keepdim=True)
    #predss = torch.max(y_pred.data, 1)
    correct = top_pred.eq(y.view_as(top_pred)).sum()

    acc = correct.float() / y.shape[0]
    return acc, #balance_accuracy, f1_score

In [34]:
def train(model, iterator, optimizer, criterion, device):

    epoch_loss = 0
    epoch_acc = 0

    model.to(device)
    model.train()

    for (x, y) in tqdm(iterator, desc="Training", leave=False):

        x = x.float().to(device)
        y = y.to(device)

        optimizer.zero_grad()

        y_pred = model(x)

        loss = criterion(y_pred, y)
        

        acc = calculate_accuracy(y_pred, y)

        loss.backward()

        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [35]:
def evaluate(model, iterator, criterion, device):
    results_list = []

    epoch_loss = 0
    epoch_acc = 0

    model.eval()

    with torch.no_grad():

        for (x, y) in tqdm(iterator, desc="Evaluating", leave=False):

            x = x.float().to(device)
            y = y.to(device)

            y_pred = model(x)
            pred = torch.argmax(y_pred, dim=1)
            loss = criterion(y_pred, y)

            acc  = calculate_accuracy(y_pred, y)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
            results_list.append([y, pred])


    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [36]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

## 🔥 Start training and calculate metrics

In [47]:
# Define evaluation function

def test(model, test_loader, logging=False):
    model.eval()

    predictions = []
    targets = []
    
    # Run the model on some test examples
    with torch.no_grad():
        correct, total, cumu_loss = 0, 0, 0
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            # Get logits
            outputs = model(images.float())
            outputs = outputs.float()
            # Calculate loss
            loss = criterion(outputs, labels)
            cumu_loss += loss.item()
            #Get predictions
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # Save predictions
            pred = predicted.detach().cpu().detach().numpy()
            labels_list = labels.detach().cpu().detach().numpy()

            #Flatten
            for i in range(len(pred)):
              predictions.append(pred[i])
              targets.append(labels_list[i])
            

        return predictions, targets


In [56]:
max_balance = -1
patience = 5

accuracy_score_epoch = []


# Initialize model
net = MLP(1296, 7)

# transfer network to GPU
net.to(device)

# define the loss and the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.001)

t0 = time.time()

for epoch in range(100):

    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # transfer data to GPU
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward pass ->
        outputs = net(inputs.float())
        loss = criterion(outputs, labels)

        # backward pass <-
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(outputs, 1)
        #print(metrics.classification_report(labels.cpu(), predicted.cpu()))
        # print statistics (loss.item() returns the mean loss in the mini-batch)
        running_loss += loss.item()
    #print('[%d, %5d] loss: %.3f' %
    #(epoch + 1, i + 1, running_loss / 2000))
    running_loss = 0.0

    val_loss, targets = test(net, testloader, False)
    balance_accuracy = metrics.balanced_accuracy_score(targets,
                                                      val_loss)
    accuracy_score_epoch.append(balance_accuracy)                                                                 #val_loss += loss                                                                #val_loss = val_loss / len(trainloader)
    if balance_accuracy > max_balance:
    #Saving the model
      print(f'The Balanced accuracy is {balance_accuracy}')
      max_balance = balance_accuracy
      best_model = torch.save(net.state_dict(), 'MLP_model.pt')
      #print('Min loss %0.2f' % min_val_loss)
      f1_score = metrics.f1_score(targets,
                                  val_loss,
                                  average=None)
      trigger_times = 0
      min_val_loss = val_loss
    else: 
      trigger_times += 1
      print(f'The Balanced accuracy is {balance_accuracy}')
    if trigger_times >= patience:
        print('Early stopping!\nStart to test process.')
        break

print('Finished Training on GPU: total time in seconds =', time.time() - t0)

The Balanced accuracy is 0.29111861874778444
The Balanced accuracy is 0.382692028733154
The Balanced accuracy is 0.4103341831375032
The Balanced accuracy is 0.4250384758981398
The Balanced accuracy is 0.44198212263204173
The Balanced accuracy is 0.4696509262015763
The Balanced accuracy is 0.4644777831604171
The Balanced accuracy is 0.4858257797519738
The Balanced accuracy is 0.460840146855696
The Balanced accuracy is 0.4647040583775947
The Balanced accuracy is 0.48020271772079587
The Balanced accuracy is 0.5216588778778158
The Balanced accuracy is 0.49988521813305553
The Balanced accuracy is 0.522858696437995
The Balanced accuracy is 0.5139242159714559
The Balanced accuracy is 0.5328918564254664
The Balanced accuracy is 0.4850242257102316
The Balanced accuracy is 0.5236011547762791
The Balanced accuracy is 0.5082823502487654
The Balanced accuracy is 0.5344474344127391
The Balanced accuracy is 0.5110570969162636
The Balanced accuracy is 0.5290392528759075
The Balanced accuracy is 0.5108

In [37]:
EPOCHS = 100
count = 0
best_valid_loss = float('inf')
best_balance_accuracy = float(-1)
model = MLP(1296, 7)

# Create the Loss function and the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in trange(EPOCHS):

    start_time = time.monotonic()

    train_loss, train_acc = train(model, trainloader, optimizer, criterion, device)
    valid_loss, valid_acc, results_list, balance_accuracy, f1_score = evaluate(model, testloader, criterion, device)

    y_, pred_ = flattens(results_list)

    balance_accuracy = metrics.balanced_accuracy_score(y_,
                                                      pred_)
    f1_score = metrics.f1_score(y_,
                                pred_,
                                  average=None)


    if valid_loss < best_valid_loss:
        print(f'BEST Balance Accuracy {balance_accuracy}!')
        best_balance_accuracy = balance_accuracy
        #best_valid_loss = valid_loss
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'mlp-hog.pt')
        count = 0
    else:
      count += 1

    end_time = time.monotonic()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')
    #(f'\t Balanced accuracy: {balance_accuracy:.3f} | ')
    #print(f'F1 scores are {f1_score}')

    if count >= 5:
      break
    

  0%|          | 0/100 [00:00<?, ?it/s]

Training:   0%|          | 0/99 [00:00<?, ?it/s]

NameError: ignored

The initial Run had best balance accuracy of 0.5503950940828651 with accuracy of 66.29. The  macro F1 scores were 0.60508083 0.4109589  0.12121212 0.8328968  0.51612903 0.55555556
 0.625     ] with 17 epochs with a patience of 5


 I tried using a weighted cross entropy loss. The balanced accuracy was BEST Balance Accuracy 0.5629784192331861! and the F1 scores were [0.56886228 0.39726027 0.26031746 0.77208153 0.4953271  0.49180328
 0.5862069 ] with a Validation accuracy of 59%



In [127]:
# Adapted from https://discuss.pytorch.org/t/how-to-handle-imbalanced-classes/11264/5
class_sample_count = np.unique(y_train, return_counts=True)[1]
y = y_train - 1
weight = 1 / class_sample_count
