In [None]:
pip install torchvision --upgrade

In [None]:
from torchvision import datasets, transforms
from torchvision.transforms.transforms import GaussianBlur
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch

import os
import cv2
import shutil
from PIL import Image

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from skimage.io import imread, imsave
from tqdm import tqdm
from skimage.transform import rotate
from skimage.util import random_noise
from skimage.filters import gaussian
from scipy import ndimage

In [None]:
# Get device name
if torch.cuda.is_available():
  print(torch.cuda.get_device_name(0))

# Assign cuda to device
_DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
_DEVICE

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
dev2 = '/content/drive/MyDrive/Coin Project 1/Development_2/'
pictures_cropped_4_01 = '/content/drive/MyDrive/Coin Project 1/Development_2/Pictures_Cropped_4/01'
pictures_cropped_4_02 = '/content/drive/MyDrive/Coin Project 1/Development_2/Pictures_Cropped_4/02'
pictures_cropped_3_01 = '/content/drive/MyDrive/Coin Project 1/Development_2/Pictures_Cropped_3/01'
pictures_cropped_3_02 = '/content/drive/MyDrive/Coin Project 1/Development_2/Pictures_Cropped_3/02'
pictures_cropped_3_03 = '/content/drive/MyDrive/Coin Project 1/Development_2/Pictures_Cropped_3/03'


pictures = '/content/drive/MyDrive/Coin Project 1/Development_2/Pictures'
pictures_cropped = '/content/drive/MyDrive/Coin Project 1/Development_2/Pictures_Cropped'
pictures_test = '/content/drive/MyDrive/Coin Project 1/Development_2/CNN/Test_Data'
pictures_train = '/content/drive/MyDrive/Coin Project 1/Development_2/CNN/Train_Data'

# **Delete Train and Test Folder to be able to use them now**

In [None]:
import time

# Test Data
os.chdir(pictures_test)
!ls
!rm -rf Alert
!rm -rf Anger
!rm -rf Anxious
!rm -rf Fear
!rm -rf Neutral
!rm -rf Relaxed
!rm -rf Anger_Fear
!rm -rf Neutral_Relaxed
!rm -rf Anger_Fear_Anxious
!ls
time.sleep(60) # wait 60 seconds 

Anger_Fear  Neutral  Relaxed


In [None]:
import time

# Train Data
os.chdir(pictures_train)
!ls
!rm -rf Alert
!rm -rf Anger
!rm -rf Anxious
!rm -rf Neutral_Relaxed
!rm -rf Fear
!rm -rf Neutral
!rm -rf Relaxed
!rm -rf Anger_Fear
!rm -rf Anger_Fear_Anxious
!ls
time.sleep(60) # wait 60 seconds 

# **Methods to Create new Folders**

In [None]:
def create_folders_6_emotions(where):
  os.mkdir(os.path.join(where, 'Alert'))
  os.mkdir(os.path.join(where, 'Anger'))
  os.mkdir(os.path.join(where, 'Anxious'))
  os.mkdir(os.path.join(where, 'Fear'))
  os.mkdir(os.path.join(where, 'Neutral'))
  os.mkdir(os.path.join(where, 'Relaxed'))

In [None]:
def create_folders_4_emotions_01(where):
  # Test
  os.mkdir(os.path.join(where, 'Alert'))
  os.mkdir(os.path.join(where, 'Neutral'))
  os.mkdir(os.path.join(where, 'Relaxed'))
  os.mkdir(os.path.join(where, 'Anger_Fear'))

In [None]:
def create_folders_4_emotions_02(where):
  os.mkdir(os.path.join(where, 'Alert'))
  os.mkdir(os.path.join(where, 'Neutral'))
  os.mkdir(os.path.join(where, 'Relaxed'))
  os.mkdir(os.path.join(where, 'Anger_Fear_Anxious'))

In [None]:
def create_folders_3_emotions_01(where):
  os.mkdir(os.path.join(where, 'Neutral'))
  os.mkdir(os.path.join(where, 'Relaxed'))
  os.mkdir(os.path.join(where, 'Anger_Fear'))

In [None]:
def create_folders_3_emotions_02(where):
  os.mkdir(os.path.join(where, 'Alert'))
  os.mkdir(os.path.join(where, 'Neutral_Relaxed'))
  os.mkdir(os.path.join(where, 'Anger_Fear_Anxious'))

In [None]:
def create_folders_3_emotions_03(where):
  os.mkdir(os.path.join(where, 'Neutral'))
  os.mkdir(os.path.join(where, 'Relaxed'))
  os.mkdir(os.path.join(where, 'Anger_Fear_Anxious'))

# **Other Methods**

In [None]:
def get_length_of_minimum_emotion(path):
  sizes = []
  for folder in os.listdir(path):
    emotion_path = path + '/' + str(folder)
    sizes.append(len(os.listdir(emotion_path)))
    print('Number of Samples for folder ', folder, ': ', len(os.listdir(emotion_path)))
  sizes = [i for i in sizes if i != 0]
  print('Sizes: ', sizes)
  return min(sizes)

In [None]:
for folder in os.listdir(pictures_cropped_3_01):
  print(folder)

# **Copy Data to Train and Test Folders**

In [None]:
# Input Parameters:
# emotion_category: '401', '402', '301', '302', '303'
# path: path of pictures from where to copy
def copy_images_to_train_test_data(emotion_category, path):
  if emotion_category is '401':
    # create folders
    try:
      create_folders_4_emotions_01(pictures_train)
    except Exception as e:
      print(e)
    try:
      create_folders_4_emotions_01(pictures_test)
    except Exception as e:
      print(e)
  elif emotion_category is '402':
    # create folders
    try:
      create_folders_4_emotions_02(pictures_train)
    except Exception as e:
      print(e)
    try:
      create_folders_4_emotions_02(pictures_test)
    except Exception as e:
      print(e)
  elif emotion_category is '301':
    # create folders
    try:
      create_folders_3_emotions_01(pictures_train)
    except Exception as e:
      print(e)
    try:
      create_folders_3_emotions_01(pictures_test)
    except Exception as e:
      print(e)
  elif emotion_category is '302':
    # create folders
    try:
      create_folders_3_emotions_02(pictures_train)
    except Exception as e:
      print(e)
    try:
      create_folders_3_emotions_02(pictures_test)
    except Exception as e:
      print(e)
  elif emotion_category is '303':
    # create folders
    try:
      create_folders_3_emotions_03(pictures_train)
    except Exception as e:
      print(e)
    try:
      create_folders_3_emotions_03(pictures_test)
    except Exception as e:
      print(e)
  else:
    print('Emotion Category Unknown!')


  # for each emotion folder
  for folder in os.listdir(path):

    emotion_path = path + '/' + str(folder)
    emotion = str(folder)

    # get all images inside that emotion folder
    all_images = os.listdir(emotion_path)

    smallest_amount = get_length_of_minimum_emotion(path)

    all_images = all_images[:smallest_amount]
    # split into train and test
    train_images, test_images = train_test_split(all_images, test_size = 0.2)

    print(emotion)
    print(len(train_images)), print(len(test_images))
    print("----------")

    output_folder_test = pictures_test + "/" + emotion
    output_folder_train = pictures_train + "/" + emotion
    
    for image in train_images:
      image_name = os.path.join(emotion_path, image)
      shutil.copy(image_name, output_folder_train)

    for image in test_images:
      image_name = os.path.join(emotion_path, image)
      shutil.copy(image_name, output_folder_test)

In [None]:
copy_images_to_train_test_data('303', pictures_cropped_3_03)

# **Neural Network**

In [None]:
def get_data():
    data_CNN = '/content/drive/MyDrive/Coin Project 1/Development_2/CNN'

    '''
    Image Augmentation:
    - Rotate
    - Flip (horizontally & vertically)
    - Grayscale
    - Blur
    '''
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
    ])
    train_set = datasets.ImageFolder(data_CNN + '/Train_Data', transform = transform)
    test_set = datasets.ImageFolder(data_CNN + '/Test_Data', transform = transform)

    return train_set, test_set

In [None]:
train_set, val_set = get_data()
image_show_dl = torch.utils.data.DataLoader(train_set, batch_size=8, shuffle=True, num_workers=1)

In [None]:
_CLASSES = {
    0: 'Anger/Fear/Anxious',
    1: 'Neutral',
    2: 'Relaxed'
}

In [None]:
def train_imshow():
  dataiter = iter(image_show_dl)
  images, labels = dataiter.next()
  print(labels)
  fig, axes = plt.subplots(figsize=(48, 20), ncols=8)
  for i in range(8):
      ax = axes[i]
      ax.imshow(images[i].permute(1, 2, 0)) 
      ax.title.set_text(' '.join('%5s' % _CLASSES[labels[i].item()]))
  plt.show()

In [None]:
train_imshow()

In [None]:
from sklearn.metrics import f1_score
def accuracy(out, yb):
  preds = torch.argmax(out, dim=1)
  return (preds == yb).float().mean(), f1_score(yb.cpu(), preds.cpu(), average='weighted')

In [None]:
# Function to plot
import matplotlib.pyplot as plt
def plot(loss_train, loss_val, labely, labelx, plotLabel1,plotLabel2):  
  # Plot loss
  fig, ax1 = plt.subplots()

  ax1.plot(loss_train, color='red', label=plotLabel1)
  ax1.plot(loss_val, color='blue', label=plotLabel2)
  ax1.set_xlabel(labelx)
  ax1.set_ylabel(labely)
  ax1.legend()
  fig.tight_layout()

In [None]:
# Training Loop
from sklearn.metrics import f1_score
import copy

def train_model(name, model, train_dl, val_dl , epochs, optimizer, loss_func, batch_size, lr):
    print("epoch | train loss | train acc | val loss | val acc | test f1 | val f1")
    df = pd.DataFrame({})
    loss_train_list = list()
    loss_val_list = list()
    accuracy_epoch_train = list()
    accuracy_epoch_val = list()

    for epoch in range(epochs):

      model.train()
      total_acc = 0
      total_loss = 0
      total_f1_score = 0

      for xb_train, yb_train in train_dl:
        # Move to gpu
        xb_train = xb_train.to(_DEVICE)
        yb_train = yb_train.to(_DEVICE)
        # Clear old gradients from last step
        optimizer.zero_grad()
        # Forward pass 
        pred = model(xb_train)
        # Calculate loss
        loss = loss_func(pred, yb_train)
        
        # Calculate gradients
        loss.backward()
        # Update weights
        optimizer.step()
        # Add loss and accs
        total_loss += loss
        accs, score = accuracy(pred, yb_train)
        total_acc += accs
        total_f1_score += score
        
      total_loss /= len(train_dl)
      total_acc /= len(train_dl)
      total_f1_score /= len(train_dl)

      total_acc_val = 0
      total_loss_val = 0
      total_f1_score_val = 0
      model.eval()

      with torch.no_grad():
        for xb_val, yb_val in val_dl:
          # Move to gpu
          xb_val = xb_val.to(_DEVICE)
          yb_val = yb_val.to(_DEVICE)
          # Forward pass
          pred_val = model(xb_val)
          # Calcualte loss
          loss_val = loss_func(pred_val, yb_val)

          # Add losses and accs
          total_loss_val += loss_val
          accs_val, score_val = accuracy(pred_val, yb_val)
          total_acc_val += accs_val
          total_f1_score_val += score_val
      
      total_acc_val /= len(val_dl)
      total_loss_val /= len(val_dl)
      total_f1_score_val /= len(val_dl)

      #if (epoch+1) % 2 == 0: # save every x iteration
      save_model(model.cpu(), name, epoch + 1, batch_size, total_loss.item(), lr)
      confusion_matrix_custom(model, val_dl, name, epoch)
        #pass

      # Debug print
      print("-------------------------------------------------------------------------")
      print(f"  {epoch}   |    {total_loss.item():.4f}  |  {total_acc.item():.4f}   |  {total_loss_val.item():.4f}  |   {total_acc_val.item():.4f} | {total_f1_score:.4f} | {total_f1_score_val:.4f} | ")

      # Add losses to list 
      loss_train_list.append(total_loss.item())
      loss_val_list.append(total_loss_val.item())
      accuracy_epoch_train.append(total_acc.item())
      accuracy_epoch_val.append(total_acc_val.item())

      performance = {
        'epoch': epoch + 1,
        'train_batch_loss': total_loss.item(),
        'val_batch_loss': total_loss_val.item(),
        'train_acc': total_acc.item(),
        'val_acc': total_acc_val.item(),
        'train_f1-score': total_f1_score,
        'val_f1-score': total_f1_score_val,
      }
      df = df.append(performance, ignore_index=True)


      model = model.to(_DEVICE)

    # return loss_train_list, loss_val_list, accuracy_epoch_train, accuracy_epoch_val
    save_csv(df, name)
    return model

In [None]:
import torchvision.models as models

def train_all(train_set, val_set, resnet18):
  PATH = '/content/drive/MyDrive/Coin Project 1/Development_2/Notebooks/Models'
  counter = 0
  learning_rates = [0.001, 0.005, 0.01]
  batch_sizes = [16, 32]

  for batch_size in batch_sizes:   

    # dataloader
    dataloader_train = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=1)
    dataloader_valid = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=True, num_workers=1)

    for lr in learning_rates:
      if resnet18:
        mod = models.resnet18(pretrained=True).to(_DEVICE)
      else:
        mod = models.resnet152(pretrained=True).to(_DEVICE)

      # freeze parameters
      for param in mod.parameters():
        param.requires_grad = False

      # change last output layer
      mod.fc.requires_grad = True

      if resnet18:
        mod.fc = nn.Linear(in_features=512, out_features=4).to(_DEVICE)
      else:
        mod.fc = nn.Linear(in_features=2048, out_features=4).to(_DEVICE)

      # ml configurations
      loss_function = nn.CrossEntropyLoss()
      epochs = 10
      optimizer = torch.optim.Adam(mod.parameters(), lr=lr)

      # train model
      print('########################################################################################')
      print('3.03 ResNet152; Counter ', str(counter), '; Batch-Size: ', str(batch_size), ';LR: ', str(lr))
      naming = '3.03 ResNet152; Counter ' + str(counter) + 'Batch-Size: ' + str(batch_size) + 'Learning Rate' + str(lr)
      trained_model = train_model(naming, mod, dataloader_train, dataloader_valid, epochs, optimizer, loss_function, batch_size, lr)

      # confusion matrix
      
      trained_model = trained_model.to(device='cpu')
      print('########################################################################################')
      counter = counter + 1

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sn
import pandas as pd

def confusion_matrix_custom(model, dataloader_valid, name, epoch):
  y_pred = [] 
  y_true = [] 

  # iterate over test data
  for inputs, labels in dataloader_valid:
    labels = labels.to(_DEVICE)
    inputs = inputs.to(_DEVICE)
    model = model.to(_DEVICE)
    output = model(inputs) # Feed Network

    output = (torch.max(torch.exp(output), 1)[1]).data.cpu().numpy().tolist()
    y_pred = y_pred + output # Save Prediction
    
    labels = labels.data.cpu().numpy().tolist()
    y_true = y_true + labels # Save Truth

  classes = ('Angry/Scared/Anxious', 'Neutral', 'Relaxed')

  # Build confusion matrix
  cf_matrix = confusion_matrix(y_true, y_pred)
  df_cm = pd.DataFrame(cf_matrix, index = [i for i in classes], columns = [i for i in classes])
  plt.figure(figsize = (12,7))
  sn.heatmap(df_cm, annot=True)
  plt.savefig('/content/drive/MyDrive/Coin Project 1/Development_2/Notebooks/Models/' + name + '/' + name + '_' + str(epoch) + '_cm.png')
  plt.close()

# **Loading and Storing ResNet18 Model**

In [None]:
def load_resnet_18(name, path='/content/drive/MyDrive/Coin Project 1/Development_2/Notebooks/Models'):
  model = models.resnet18(pretrained=True)
  model.fc = nn.Linear(in_features=512, out_features=4)
  model.load_state_dict(torch.load(path + '/' + name))
  model.eval()

In [None]:
def load_resnet_152(name, path='/content/drive/MyDrive/Coin Project 1/Development_2/Notebooks/Models'):
  model = models.resnet152(pretrained=True)
  model.fc = nn.Linear(in_features=2048, out_features=4)
  model.load_state_dict(torch.load(path + '/' + name))
  model.eval()

In [None]:
import time
import os
def save_model(model, name, epoch, batch_size, loss, lr, path='/content/drive/MyDrive/Coin Project 1/Development_2/Notebooks/Models'):
  output_dir = path + '/' +  name
  try:
    os.mkdir(output_dir)
  except Exception as e:
    # already created directory
    pass
  torch.save(model.state_dict(), output_dir + '/' + name  + '_lr_' + str(lr) + '_epoch_' + str(epoch) + '_loss_' + str(loss))

In [None]:
def save_csv(df, name, path='/content/drive/MyDrive/Coin Project 1/Development_2/Notebooks/Models'):
  output_dir = path + '/' +  name
  df.to_csv(output_dir +  '/' +  'performance.csv', index=False, encoding='utf-8')

# **Training**

In [None]:
train_all(train_set, val_set, False) # ResNet18 architecture if True

# **ResNet18**

In [None]:
import torchvision.models as models

resnet18_pretrained = models.resnet18(pretrained=True).to(_DEVICE)

# freeze parameters
for param in resnet18_pretrained.parameters():
  param.requires_grad = False

# change last output layer
resnet18_pretrained.fc.requires_grad = True
resnet18_pretrained.fc = nn.Linear(in_features=512, out_features=4).to(_DEVICE)

# Define the number of epochs (the number of training iterations over the entire data set)
n_epochs_resnet18_pretrained = 8

# Define the learning rate 
lr_resnet18_pretrained = 0.01

# Define the optimizer. Here we will use Stochastic Gradient Descent
optimizer_resnet18_pretrained = torch.optim.Adam(resnet18_pretrained.parameters(), lr=lr_resnet18_pretrained)

# We will use MSE/NLLLoss as our loss function
loss_func_resnet18_pretrained = nn.CrossEntropyLoss()


# Training 
lossAndAccsList_pretrained = train(resnet18_pretrained, train_dl, val_dl , n_epochs_resnet18_pretrained, optimizer_resnet18_pretrained, loss_func_resnet18_pretrained)
plot(lossAndAccsList_pretrained[0], lossAndAccsList_pretrained[1], "Loss", "Epoch", "Loss Train", "Loss Validation")
plot(lossAndAccsList_pretrained[2], lossAndAccsList_pretrained[3], "Accuracy", "Epoch", "Accuracy Train", "Accuracy Validation")

# **ResNet152**

In [None]:
import torchvision.models as models

resnet152_pretrained = models.resnet152(pretrained=True).to(_DEVICE)

# freeze parameters
for param in resnet152_pretrained.parameters():
  param.requires_grad = False

# change last output layer
resnet152_pretrained.fc.requires_grad = True
resnet152_pretrained.fc = nn.Linear(in_features=2048, out_features=4).to(_DEVICE)

# check if freezed
for param in resnet152_pretrained.parameters():
  pass
  #print(param.requires_grad)

# Define the number of epochs (the number of training iterations over the entire data set)
n_epochs_resnet152_pretrained = 8

# Define the learning rate 
lr_resnet152_pretrained = 0.005

# Define the optimizer. Here we will use Stochastic Gradient Descent
optimizer_resnet152_pretrained = torch.optim.Adam(resnet152_pretrained.parameters(), lr=lr_resnet152_pretrained)

# We will use MSE/NLLLoss as our loss function
loss_func_resnet152_pretrained = nn.CrossEntropyLoss()


# Training 
lossAndAccsList_pretrained = train(resnet152_pretrained, train_dl, val_dl , n_epochs_resnet152_pretrained, optimizer_resnet152_pretrained, loss_func_resnet152_pretrained)
plot(lossAndAccsList_pretrained[0], lossAndAccsList_pretrained[1], "Loss", "Epoch", "Loss Train", "Loss Validation")
plot(lossAndAccsList_pretrained[2], lossAndAccsList_pretrained[3], "Accuracy", "Epoch", "Accuracy Train", "Accuracy Validation")

# **For Individual Predictions**

In [None]:
def preprocessImageFromPath(imagePath):
  img = Image.open(imagePath)

  transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
    ])
  
  img_tensor = transform(img).float()
  return img_tensor

In [None]:
def check_result(y_hat):
  tmp = y_hat.squeeze().tolist()
  max_val = max(tmp)
  predicted_class = tmp.index(max_val)
  print('Predicted Class: ', predicted_class)

In [None]:
def classify_image(model, picture_path):
  image = preprocessImageFromPath(picture_path)
  image = image.unsqueeze(0)
  # Predict image
  image = image.to(device=_DEVICE)

  with torch.no_grad():
    prediction = model(image)
    prediction_class = _CLASSES[np.argmax(prediction.to(device='cpu')).item()]
    print('Cat Emotion Prediction: ', prediction_class)

In [None]:
fear_picture = '/content/drive/MyDrive/Coin Project 1/Development_2/Pictures_to_predict/alert_100.jpg'
classify_image(resnet18_pretrained, fear_picture)

In [None]:
def predict_picture(pic):
  pic = pic.to(device=device)
  with torch.no_grad():
    #pred_1 = resnet18_pretrained(tensor_angry)
    pred_2 = resnet18_pretrained(pic)
    print(np.argmax(pred_2.to(device='cpu')).item())

In [None]:
predict_picture(fear_picture)

In [None]:
# Save model
save_path = "/content/drive/MyDrive/Coin Project 1/Development_2/resnet_model_coin"
torch.save(resnet18_pretrained.state_dict(), save_path)

In [None]:
# Load trained model
resnet18_pretrained_loaded = models.resnet18(pretrained=True)
resnet18_pretrained_loaded.fc = nn.Linear(in_features=512, out_features=6).to(device)

resnet18_pretrained_loaded.load_state_dict(torch.load(save_path))
resnet18_pretrained_loaded.eval()

In [None]:
from torch.autograd import Variable

def image_loader(image_name):
    """load image, returns cuda tensor"""
    image = Image.open(image_name)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((128, 128)),
    ])

    image = transform(image).float()
    image = Variable(image, requires_grad=True)
    image = image.unsqueeze(0)  # this is for VGG, may not be needed for ResNet
    return image.cuda()  # assumes that you're using GPU

image = image_loader("/content/drive/MyDrive/Coin Project 1/Development_2/Pictures_to_predict/angry.png")

resnet18_pretrained(image)