<a href="https://colab.research.google.com/github/dcafarelli/CMT-ABAW2020-EXPR/blob/main/test/test_competition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import torchvision
from torchvision import transforms, models
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import matplotlib.pyplot as plt
import cv2
import sys
from fastprogress.fastprogress import master_bar, progress_bar
import sklearn.metrics as sm

In [None]:
#CUDA FOR PYTORCH

use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")
torch.backends.cudnn.benchmark = True #This flag allows you to enable the inbuilt cudnn auto-tuner to find the best algorithm to use for your hardware.
print(device)

In [None]:
path_best_model = '/best_performance_model/best_model-senet50-0.43.pt'

#--------- ckp path to load the model ---------
model_base_path_colab = '/content/gdrive/My Drive/TESI/FER/AffWild2/model_checkpoint/pytorch_models/senet50_ft_pytorch.pth'
model_ckp_path = '/content/gdrive/MyDrive/TESI/FER/AffWild2/model_checkpoint/pytorch_models/models_ckp_78561.pth.tar'

In [None]:
test_set_frames_path = '/Test_Set/test_set_only_path.pkl' #dataframe with test frames path

# TEST SET CREATION

In [None]:
class AffWild2TestSet(Dataset):
    def __init__(self, transform=None):

      pkl_path = test_set_frames_path

      self.emotion_frame = pd.read_pickle(pkl_path)
      self.transform = transform

    def __len__(self):
        return len(self.emotion_frame)
    
    def __getitem__(self, index):

      img_path = self.emotion_frame.iloc[index, 0]           
      fp = os.path.join('/content/cropped_aligned_test%s' %img_path) #here the path to test frames
      assert os.path.exists(fp), "Image not found at: {}".format(fp)
        
      test_set_face = Image.fromarray(cv2.imread(fp))
      if self.transform:
        test_set_face = self.transform(test_set_face)
        
      return test_set_face

In [None]:
#DATA TRANSFORMATION
def subtract_mean(x):
    mean_vector = [91.4953, 103.8827, 131.0912]
    x *= 255.
    x[0] -= mean_vector[0]
    x[1] -= mean_vector[1]
    x[2] -= mean_vector[2]
    return x

In [None]:
transformed_test = transforms.Compose([
                      transforms.Resize((224,224)),
                      transforms.ToTensor(),
                      transforms.Lambda(lambda x : subtract_mean(x))
                      ])

In [None]:
test_set = AffWild2TestSet(transform=transformed_test)

In [None]:
test_generator = DataLoader(test_set, batch_size = 64, num_workers = 0, pin_memory=True)

classes = ('Neutral', 'Anger', 'Disgust', 'Fear', 'Happiness', 'Sadness', 'Surprise')

# LOAD BEST MODEL SENET50

In [None]:
sys.path.append('/path/where/MainModel.py/is_located') #append the path where MainModel.py is located
import MainModel

In [None]:
def load_models(model_base_path, device="cpu", model_ckp=None):
    assert os.path.exists(model_base_path), "Base model checkpoint not found at: {}".format(model_base_path)
    model = torch.load(model_base_path)
    if model_ckp is not None:
        assert os.path.exists(model_ckp), f"Model checkpoint not found at: {model_ckp}"
        ckp = torch.load(model_ckp, map_location='cpu')
        [p.data.copy_(torch.from_numpy(ckp['model_state_dict'][n].numpy())) for n, p in model.named_parameters()]
        for n, m in model.named_modules():
            if isinstance(m, nn.BatchNorm2d):
                m.momentum = 0.1
                m.running_var = ckp['model_state_dict'][n + '.running_var']
                m.running_mean = ckp['model_state_dict'][n + '.running_mean']
                m.num_batches_tracked = ckp['model_state_dict'][n + '.num_batches_tracked']
    
    return model.to(device)

In [None]:
model = load_models(model_base_path_colab, device, None)

for k, m in model.named_modules():
  m._non_persistent_buffers_set = set()  # pytorch 1.6.0 compatability

In [None]:
model.classifier_1 = nn.Linear(2048, len(classes))

In [None]:
#FREEZING ALL THE LAYER EXCEPT THE FULLY CONNECTED ONE
for param in model.parameters():
  param.requires_grad = False

In [None]:
model = model.to(device)

In [None]:
#Load state dict
ckp = torch.load(path_best_model)
model.load_state_dict(ckp['state_dict'])

# PREDICTION ON TEST SET

In [None]:
def inference(model):
  pred = []

  model.eval()
  print("Enter Evaluation. Is Training?", model.training)
  with torch.no_grad():
    for j, faces in enumerate(progress_bar(test_generator)):

      faces = faces.to(device)

      _, outputs = model(faces)
      _, predictions = torch.max(outputs.data, 1)

      pred.append(predictions.cpu())
           
  return pred

In [None]:
predictions = inference(model)

In [None]:
pred_array = [t.numpy() for t in predictions]

pred_array = np.concatenate(pred_array, axis=0 )

In [None]:
#store predictions array 
np.save('/Test_Set/test_inference.npy', pred_array)

# WRITE LABEL ON FILES

In [None]:
lab_array = np.load('/Test_Set/test_inference.npy')

In [None]:
def ex_to_str(arr):
  str = "{:d}".format(arr)
  return str

In [None]:
#write array values on a txt file

text_file = open('/Test_Set/test_inference.txt', "w")
# write the values
for i in range(lab_array.shape[0]):
  n = text_file.write(ex_to_str(lab_array[i]))
  n = text_file.write('\n')
text_file.close()  

In [None]:
def create_test_output(filename):
  d = dict()
  with open(filename,'r') as fp:
    lines = fp.readlines()#.sort()
    lines.sort()
    lines = [line.strip().split('/')[1:3] for line in lines]
    print('\ntotal: ', len(lines), lines[:2])#,lines[-20:])
    
    for line in lines:
      key,value = line[0], line[1]
      #print(value)
      
      video_file = '/Test_Set/predictions/'+key+'.txt'
      if not os.path.exists(video_file):
        f = open(video_file,'w')
        f.write('Neutral,Anger,Disgust,Fear,Happiness,Sadness,Surprise')
      else:
        f = open(video_file,'a') 
            
      name, emotion = value.replace("'","").split(';')#v.replace("'","").split(',')[0], v.replace("'","").split(',')[1]
      label  = emotion
             
      f.write('\n'+ str(label))
      #print('\n'+name+' '+str(label))
      f.close()
             
  print('\nTest out created.')

In [None]:
'''
Create a .csv file with frame path/label(taken from 'inference.txt')
crate_test_output will output 223 .txt files with the predictions
'''

create_test_output('/Test_Set/test_set_inference.csv')