In [32]:
import os
import sys

sys.path.append('/work/home/dsu/EngagementRecognition/')
sys.path.append('/work/home/dsu/datatools/')
sys.path.append('/work/home/dsu/simple-HRNet-master/')

from functools import partial
from typing import Dict, Tuple, Callable, List

import torch
import tqdm
import pandas as pd
import numpy as np
import torch
from scipy.stats import stats
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch.utils.data import DataLoader
import torchvision.transforms as T
from torchvision.io import read_image

from pytorch_utils.data_preprocessing import convert_image_to_float_and_scale
from pytorch_utils.models.input_preprocessing import resize_image_saving_aspect_ratio, EfficientNet_image_preprocessor
from pytorch_utils.models.CNN_models import Modified_EfficientNet_B1, Modified_EfficientNet_B4
from pytorch_utils.models.Pose_estimation.HRNet import Modified_HRNet
from visualization.ConfusionMatrixVisualization import plot_and_save_confusion_matrix

In [2]:
def get_preprocessing_functions(model_type):
    if model_type not in ['EfficientNet-B1', 'EfficientNet-B4', 'Modified_HRNet']:
        raise ValueError('The model type should be either "EfficientNet-B1", "EfficientNet-B4" or "Modified_HRNet".')
    # define preprocessing functions
    if model_type == 'EfficientNet-B1':
        preprocessing_functions = [partial(resize_image_saving_aspect_ratio, expected_size=240),
                                   EfficientNet_image_preprocessor()]
    elif model_type == 'EfficientNet-B4':
        preprocessing_functions = [partial(resize_image_saving_aspect_ratio, expected_size=380),
                                   EfficientNet_image_preprocessor()]
    elif model_type == 'Modified_HRNet':
        preprocessing_functions = [partial(resize_image_saving_aspect_ratio, expected_size=256),
                                   convert_image_to_float_and_scale,
                                   T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
                                   ]  # From HRNet
    else:
        raise ValueError(f'The model type should be either "EfficientNet-B1", "EfficientNet-B4", or "Modified_HRNet".'
                         f'Got {model_type} instead.')
    return preprocessing_functions

In [3]:
def get_class_from_range(value):
    if value > 0 and value<=3:
        return 0
    elif value > 3 and value<=7:
        return 1
    elif value > 7 and value<=10:
        return 2
    else:
        raise ValueError(f'The value should be from the range 1 to 10. Got:', value)

# data loading function
def load_data(path_to_data:str)->Dict[str, pd.DataFrame]:
    # load csv file
    data = pd.read_csv(path_to_data)
    data = data[['path_to_frame','timestamp','engagement_hhi']]
    # transform labels. They are now in range 1 to 10, while we need classes (0, 1, 2). We transform them in a way that:
    # 1-3 means disengagement (class 0)
    # 4-7 means neutral (class 1)
    # 8-10 means engagement (class 2)
    data['engagement_hhi'] = data['engagement_hhi'].apply(lambda x: get_class_from_range(x))
    # cleaning the column names
    data = data.rename(columns={"path_to_frame": "path",
                                "engagement_hhi":"label"})
    # transform data to Dict[str, pd.DataFrame], where str is the video name and the pd.DataFrame is paths to images with labels
    separated_videos = {}
    video_names = data['path'].apply(lambda x:x.split("/")[-2])
    video_names = video_names.unique()
    for video_name in video_names:
        separated_videos[video_name] = data[data['path'].str.contains('/' + video_name + '/')]
    return separated_videos

In [4]:
def create_model(model_type:str):
    if model_type == "EfficientNet-B1":
        model = Modified_EfficientNet_B1(embeddings_layer_neurons=256, num_classes=3,
                                     num_regression_neurons=None)
    elif model_type == "EfficientNet-B4":
        model = Modified_EfficientNet_B4(embeddings_layer_neurons=256, num_classes=3,
                                         num_regression_neurons=None)
    elif model_type == "Modified_HRNet":
        model = Modified_HRNet(pretrained=True,
                               path_to_weights=HRNET_WEIGHTS,
                               embeddings_layer_neurons=256, num_classes=3,
                               num_regression_neurons=None,
                               consider_only_upper_body=True)
    else:
        raise ValueError("Unknown model type: %s" % model_type)
    return model

In [5]:
def load_and_preprocess_image(path:str, preprocessing_functions:List[Callable])->torch.Tensor:
    image = read_image(path)
    for function in preprocessing_functions:
        image = function(image)
    return image

In [84]:
def _evaluate_model_one_video(video:pd.DataFrame, *, model, preprocessing_functions, batch_size,
                             device:torch.device)->Tuple[int, int]:
    labels = []
    predictions = []
    images = []
    for idx in range(video.shape[0]):
        path_to_image = video.iloc[idx,0]
        label = video.iloc[idx,-1]
        image = load_and_preprocess_image(path_to_image, preprocessing_functions)
        images.append(image)
        labels.append(label)
    # convert images and labels into numpy array
    images = [np.array(image)[np.newaxis,...] for image in images]
    labels = [np.array(label)[np.newaxis,...] for label in labels]
    images = np.concatenate(images, axis=0)
    labels = np.concatenate(labels, axis=0)
    # if there is only one image for the entire video, skip it
    if images.shape[0]==1:
        return np.NaN, np.NaN
    # predicting batch_wise, because it is faster
    with torch.no_grad():
        for i in range(0,images.shape[0],batch_size):
            start = i
            end = i + batch_size
            # check if we have only one image in the last batch. If so, the BatchNorm will arise error.
            # therefore, we include that last image in the previous batch and then just do continue on the last iteration
            if end == images.shape[0]-1:
                end+=1
            data = images[start:end]
            if data.shape[0]==1:
                continue
            data = torch.from_numpy(data)
            data = data.to(device)
            pred = model(data)
            # reallocate to CPU
            pred = pred.cpu().detach()
            predictions.append(pred)
    # take mode for labels
    labels = stats.mode(labels, axis=None)[0][0]
    # sum up all prediction and take a softmax (since we have class probabilities, it is even smarter to do so to get the most probable label for entire video)
    predictions = torch.cat(predictions, dim=0)
    predictions = torch.sum(predictions, dim = 0).argmax().cpu().numpy().squeeze()

    # return two values
    return labels, predictions

def evaluate_model(videos:Dict[str,pd.DataFrame], model, preprocessing_functions, batch_size,
                  device:torch.device)->None:
    evaluation_metrics_classification = {'accuracy_classification': accuracy_score,
                                     'precision_classification': partial(precision_score, average='macro'),
                                     'recall_classification': partial(recall_score, average='macro'),
                                     'f1_classification': partial(f1_score, average='macro'),
                                     }
    labels = []
    predictions = []
    for video_name, video in tqdm.tqdm(videos.items()):
        #print("processing video:", video_name)
        l, p = _evaluate_model_one_video(video, model=model, preprocessing_functions=preprocessing_functions, 
                                                      batch_size=batch_size, device=device)
        predictions.append(p)
        labels.append(l)
    

    #[print(item) for item in predictions]
    predictions = np.array(predictions).squeeze()
    labels = np.array(labels).squeeze()
    return predictions, labels


In [85]:
# main for facial model evaluation
# params
model_type = "EfficientNet-B1"
model_weights = "/work/home/dsu/tmp/deep-capybara-42.pth" # TODO: complete it
path_to_data = "/media/external_hdd_1/MHHRI/mhhri/prepared_data/HHI_Ego_Recordings/faces/MHHRI_facial_labels.csv"
batch_size = 64
preprocessing_functions = get_preprocessing_functions(model_type)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# create and load model
model = create_model(model_type)
model.load_state_dict(torch.load(model_weights))
model = model.to(device)
# load data
data = load_data(path_to_data) # Dict[str,pd.DataFrame]
#data = {k: data[k] for k in list(data)[:10]}
# evaluate model
predictions, labels = evaluate_model(videos=data, model=model, preprocessing_functions=preprocessing_functions, batch_size=batch_size,
                        device=device)



  labels = stats.mode(labels, axis=None)[0][0]
  labels = stats.mode(labels, axis=None)[0][0]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 290/290 [02:32<00:00,  1.90it/s]


In [97]:
path_to_data = "/media/external_hdd_1/MHHRI/mhhri/prepared_data/HHI_Ego_Recordings/faces/MHHRI_facial_labels.csv"
data = pd.read_csv(path_to_data)
data = data[['path_to_frame','timestamp','engagement_hhi']]
# transform labels. They are now in range 1 to 10, while we need classes (0, 1, 2). We transform them in a way that:
# 1-3 means disengagement (class 0)
# 4-7 means neutral (class 1)
# 8-10 means engagement (class 2)
#data['engagement_hhi'] = data['engagement_hhi'].apply(lambda x: get_class_from_range(x))
# cleaning the column names
data = data.rename(columns={"path_to_frame": "path",
                            "engagement_hhi":"label"})

In [100]:
data

Unnamed: 0,path,timestamp,label
0,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,0.00,5
1,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,0.20,5
2,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,0.40,5
3,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,0.60,5
4,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,0.80,5
...,...,...,...
77691,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,31.23,8
77692,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,31.43,8
77693,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,31.63,8
77694,/media/external_hdd_2/MHHRI/mhhri/prepared_dat...,31.83,8


In [91]:
evaluation_metrics_classification = {'accuracy_classification': accuracy_score,
                                     'precision_classification': partial(precision_score, average='macro'),
                                     'recall_classification': partial(recall_score, average='macro'),
                                     'f1_classification': partial(f1_score, average='macro'),
                                     }

# filter out NaNs
labels, predictions = labels[~np.isnan(labels)], predictions[~np.isnan(labels)]
# calculate evaluation metrics
evaluation_metrics = {
    metric: evaluation_metrics_classification[metric](labels, predictions)
    for metric in evaluation_metrics_classification
}
# print evaluation metrics
for metric_name, metric_value in evaluation_metrics.items():
    print("%s: %.4f" % (metric_name, metric_value))
# plot confusion matrix
plot_and_save_confusion_matrix(y_true=labels, y_pred=predictions, name_labels=['disengaged', 'neutral', 'engaged'],
                               path_to_save='/work/home/dsu/tmp/', name_filename='MHHRI_f2f_cm.png')
return evaluation_metrics

accuracy_classification: 0.3737
precision_classification: 0.1869
recall_classification: 0.5000
f1_classification: 0.2720


  _warn_prf(average, modifier, msg_start, len(result))


ValueError: Shape of passed values is (2, 2), indices imply (3, 3)

In [62]:
# main for facial model evaluation
# params
model_type = "EfficientNet-B1"
model_weights = "/work/home/dsu/tmp/deep-capybara-42.pth" # TODO: complete it
path_to_data = "/media/external_hdd_1/MHHRI/mhhri/prepared_data/HHI_Ego_Recordings/faces/MHHRI_facial_labels.csv"
batch_size = 64
preprocessing_functions = get_preprocessing_functions(model_type)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# create and load model
model = create_model(model_type)
model.load_state_dict(torch.load(model_weights))
model = model.to(device)
# load data
data = load_data(path_to_data) # Dict[str,pd.DataFrame]
#data = {k: data[k] for k in list(data)[:10]}
# evaluate model
facial_metrics = evaluate_model(videos=data, model=model, preprocessing_functions=preprocessing_functions, batch_size=batch_size,
                        device=device)

  labels = stats.mode(labels, axis=None)[0]
  labels = stats.mode(labels, axis=None)[0]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 290/290 [02:32<00:00,  1.90it/s]
  labels = np.array(labels).squeeze()


TypeError: ufunc 'isnan' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

In [None]:
# main for pose model evaluation
# params
HRNET_WEIGHTS = "/work/home/dsu/simple-HRNet-master/pose_hrnet_w32_256x192.pth"
model_type = "Modified_HRNet"
model_weights = "/work/home/dsu/tmp/fresh-bush-43.pth" # TODO: complete it
path_to_data = "/media/external_hdd_1/MHHRI/mhhri/prepared_data/HHI_Ego_Recordings/poses/MHHRI_pose_labels.csv"
batch_size = 64
preprocessing_functions = get_preprocessing_functions(model_type)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# create and load model
model = create_model(model_type)
model.load_state_dict(torch.load(model_weights))
model = model.to(device)
# load data
data = load_data(path_to_data) # Dict[str,pd.DataFrame]
# evaluate model
pose_metrics = evaluate_model(videos=data, model=model, preprocessing_functions=preprocessing_functions, batch_size=batch_size,
                        device=device)