In [1]:
import os
import sys

import cv2
import pickle
import numpy as np
import pandas as pd
from PIL import Image
from pathlib import Path
from random import shuffle
import matplotlib.pyplot as plt

import torch 
import torchvision
import torch.nn as nn
import torch.utils.data as data
import torch.nn.functional as F
import torchvision.models as models
from torch.autograd import Variable
import torchvision.transforms as transforms

from skimage.transform import resize
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.metrics import accuracy_score, mean_absolute_error, r2_score, f1_score
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.svm import SVR, SVC

%matplotlib inline

In [2]:
torch.device(0)
os.environ["CUDA_VISIBLE_DEVICES"]= "0"

# Data Loader

In [3]:
class Dataset_Loader(data.Dataset):
    def __init__(self, data_path, files, labels,  transform=None):
        self.data_path = data_path
        self.labels = labels
        self.files = files
        self.transform = transform

    def __len__(self):
        return len(self.files)

    def read_image(self, path, selected_file, use_transform):
        filename = path+selected_file+"/frame1.jpg"       
        image = Image.open(filename)      
        if use_transform is not None:
            image = use_transform(image)
        return image

    def __getitem__(self, index):
        X = self.read_image(self.data_path, self.files[index][:-4], self.transform)
        y = torch.FloatTensor(self.labels[index])
        return X, y

# CNN Model

In [4]:
# 2D CNN model using VGG-19 pretrained
class CNNModel(nn.Module):
    def __init__(self, out= 6, fc_hidden1= 512, fc_hidden2=64, pretrained=True):
        super(CNNModel, self).__init__()
        
        self.vgg19 = models.vgg19(pretrained=pretrained)
        num_features = self.vgg19.classifier[6].in_features
        modules = list(self.vgg19.classifier.children())[:-1] # Remove last layer

        modules.extend([nn.Linear(num_features, fc_hidden1)]) 
        modules.extend([nn.ReLU(inplace=True)]) 
        modules.extend([nn.Linear(fc_hidden1, fc_hidden2)]) 
        modules.extend([nn.ReLU(inplace=True)]) 
        modules.extend([nn.Linear(fc_hidden2, out)]) 
        
        self.vgg19.classifier = nn.Sequential(*modules) # Replace the model classifier
        
        self.sigmoid = torch.nn.Sigmoid()
        
    def forward(self, x_inp):
        
        x = self.vgg19(x_inp) 
        x = self.sigmoid(x)
            
        return x

# Setting up the Data Loading

In [5]:
#Set Path
training_data_path = "/home/ramsub/first-impressions/data/image_data/training_data/"    
validation_data_path = "/home/ramsub/first-impressions/data/image_data/validation_data/"
test_data_path = "/home/ramsub/first-impressions/data/image_data/test_data/"
save_model_path = "./saved_models/"

#Read CSV files
aud_train = pd.read_csv('../audio/pickle_files/training_df_all.csv')
aud_test = pd.read_csv('../audio/pickle_files/test_df_all.csv')
aud_val = pd.read_csv('../audio/pickle_files/validation_df_all.csv')

labels = ['interview_score', 'openness', 'conscientiousness', 'extraversion', 'agreeableness', 'neuroticism']

train_label = aud_train[labels].values
train_list = aud_train['video_id'].values

test_label = aud_test[labels].values
test_list = aud_test['video_id'].values

val_label = aud_val[labels].values
val_list = aud_val['video_id'].values

In [6]:
# Train Function
def train(model, device, train_loader, optimizer, epoch, log_interval):
    model.train()

    losses = []
    N_count = 0 
    for batch_idx, (X, y) in enumerate(train_loader):
        X, y = X.to(device), y.to(device)
        
        N_count += X.size(0)

        optimizer.zero_grad()
        output = model(X) 
        
        criterion = nn.MSELoss(reduction = 'sum')
        loss = criterion(output, y)
        losses.append(loss.item())
        
        loss.backward()
        optimizer.step()
        
        if (batch_idx + 1) % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\t\tLoss: {:.6f}'.format(
                epoch + 1, N_count, len(train_loader.dataset), 100. * (batch_idx + 1) / len(train_loader), loss.item()))

    return losses

# Evaluation Function
def evaluation(model, device, loader):
    model.eval()
    
    loss = 0
    all_y = []
    all_y_pred = []
    score = []
    with torch.no_grad():
        for (X, y) in loader:
            X, y = X.to(device), y.to(device)

            output = model(X)
            
            criterion = nn.MSELoss(reduction = 'sum')
            loss = criterion(output, y)
            loss += loss.item() 
            
            # collect all y and y_pred in all batches
            all_y.extend(y.cpu().detach().numpy())
            all_y_pred.extend(output.cpu().detach().numpy())

    loss /= len(loader.dataset)
    
    all_y = np.asarray(all_y)
    all_y_pred = np.asarray(all_y_pred)
    
    for i in range(all_y.shape[1]):
        score.extend([1 - mean_absolute_error(all_y[:,i], all_y_pred[:,i])])

    return loss.cpu().detach().numpy(), np.asarray(score), all_y, all_y_pred

# Setting up the CNN Model params

In [7]:
img_size = 224
epochs = 1
batch_size = 64
learning_rate = 1e-4
l_decay = 5e-4
log_interval = 10

use_cuda = torch.cuda.is_available()                   # check if GPU exists
device = torch.device("cuda" if use_cuda else "cpu")   # use CPU or GPU
print(device)

train_params = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 0, 'pin_memory': True} if use_cuda else {}
test_params = {'batch_size': batch_size, 'shuffle': False, 'num_workers': 0, 'pin_memory': True} if use_cuda else {}

transform = transforms.Compose([transforms.Resize([img_size, img_size]),
                                transforms.RandomHorizontalFlip(),
                                transforms.ToTensor(),
                                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                               ])

cuda


In [8]:
train_set = Dataset_Loader(training_data_path, train_list, train_label, transform=transform)
train_loader = data.DataLoader(train_set, **train_params)

val_set = Dataset_Loader(validation_data_path, val_list, val_label, transform=transform)
val_loader = data.DataLoader(val_set, **test_params)

test_set = Dataset_Loader(test_data_path, test_list, test_label, transform=transform)
test_loader = data.DataLoader(test_set, **test_params)

model = CNNModel().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

val_losses = []
val_scores = []
train_losses = []
train_scores = []

epoch_train_loss, epoch_train_score, train_annot, train_annot_pred = evaluation(model, device, train_loader)
epoch_val_loss, epoch_val_score, test_annot, test_annot_pred = evaluation(model, device, val_loader)
    
train_losses.append(epoch_train_loss.tolist())    
train_scores.append(epoch_train_score.tolist())
val_losses.append(epoch_val_loss.tolist())    
val_scores.append(epoch_val_score.tolist())

# Training

In [None]:
for epoch in range(epochs):
    train(model, device, train_loader, optimizer, epoch, log_interval)    
    
    epoch_train_loss, epoch_train_score, train_annot, train_annot_pred = evaluation(model, device, train_loader)
    epoch_val_loss, epoch_val_score, test_annot, test_annot_pred = evaluation(model, device, val_loader)
    
    train_losses.append(epoch_train_loss.tolist())    
    train_scores.append(epoch_train_score.tolist())
    val_losses.append(epoch_val_loss.tolist())    
    val_scores.append(epoch_val_score.tolist())

    if min(val_losses) == epoch_val_loss:
        torch.save(model.state_dict(), os.path.join(save_model_path, 'vgg_model_trained.pth'))

np.save(save_model_path+'/train_score',train_scores)
np.save(save_model_path+'/train_loss',train_losses)
np.save(save_model_path+'/val_score',val_scores)
np.save(save_model_path+'/val_loss',val_losses)

# Evaluation

In [9]:
saved_model_t = CNNModel(out= 6, fc_hidden1= 512, fc_hidden2=64, pretrained=False).to(device)
saved_model_t.load_state_dict(torch.load(os.path.join(save_model_path, 'vgg_model_trained.pth')))

test_loss, test_score, test_annot, test_annot_pred = evaluation(saved_model_t, device, test_loader)
print('\nTesting loss: {:.7f}\n Interview: {:.4f}\n openness: {:.4f}\n conscientiousness: {:.4f}\n extraversion: {:.4f}\n agreeableness: {:.4f}\n neuroticism: {:.4f}'.format(test_loss, test_score[0],test_score[1],test_score[2],test_score[3],test_score[4],test_score[5]))


Testing loss: 0.0014462
 Interview: 0.9106
 openness: 0.9048
 conscientiousness: 0.9070
 extraversion: 0.9044
 agreeableness: 0.9078
 neuroticism: 0.9019
