In [1]:
import keras
import numpy as np
import pandas as pd
import os
import pickle
from collections import Counter

from keras_vggface.vggface import VGGFace
from keras_vggface import utils
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [2]:
def fill_dropped_frame(name_folder, dropped_frames, df_old):
    # fill in the probabilities for dropped frames
    name_labels = ['prob_Neutral','prob_Anger','prob_Disgust','prob_Fear','prob_Happiness','prob_Sadness','prob_Surprise']
    df_new__ = pd.DataFrame(columns=['name_file','emotion','frame'] + name_labels)

    for i in name_folder:
        df_n = dropped_frames.loc[dropped_frames.name_file==i].copy()
        all_frame_n = df_n.frame.tolist()
        all_prob_n = pd.DataFrame(df_n.drop(['name_file', 'emotion', 'frame'], axis=1)).values
        for j in all_frame_n:
            if j - 1 not in all_frame_n:
                if j - 1 == 0:
                    c_df = df_old.loc[df_old.name_file == i].copy()
                    c_p = c_df.iloc[0,:].drop(['name_file', 'emotion', 'frame']).values
                    all_prob_n[all_frame_n.index(j)] = c_p
                else:
                    c_df = df_old[(df_old.name_file == i) & (df_old.frame <= j)]
                    if len(c_df) == 0:
                        c_df = df_old[(df_old.name_file == i) & (df_old.frame > j)]
                        c_p = c_df.iloc[0,:].drop(['name_file', 'emotion', 'frame']).values
                    else:
                        c_p = c_df.iloc[-1,:].drop(['name_file', 'emotion', 'frame']).values
                    all_prob_n[all_frame_n.index(j)]=c_p
            else:
                all_prob_n[all_frame_n.index(j)] = all_prob_n[all_frame_n.index(j-1)]

        for j in range(len(name_labels)):
            df_n[name_labels[j]]=all_prob_n[:,j].tolist()

        df_new__ = pd.concat([df_new__, df_n], ignore_index=True)
    return df_new__

def df_dropprd_frames(dict_not_frame, name_folder):
    # create df from dropprd frames
    name_file = []
    name_frame = []
    for k, v in dict_not_frame.items():
        if k in name_folder:
            name_file += [k]*len(v)
            name_frame += v

    zero = np.zeros((len(name_file),7))
    name_labels = ['prob_Neutral','prob_Anger','prob_Disgust','prob_Fear','prob_Happiness','prob_Sadness','prob_Surprise']

    dropped_frames = pd.DataFrame(columns=['name_file','emotion', 'frame'] + name_labels)

    dropped_frames.name_file = name_file
    dropped_frames.frame = name_frame

    for i in range(len(name_labels)):
        dropped_frames[name_labels[i]]=zero[:,i].tolist()

    dropped_frames[['frame']] = dropped_frames[['frame']].astype(int)
    return dropped_frames

In [3]:
# preparing data for a end-to-end model
# path to the folder with images grouped by video name
images = 'C:/Users/ryumi/Desktop/aff_wild/images_autors/'
filename = 'path_images_test.csv'
df_test = pd.read_csv('data_resnet_50/' + filename)

In [4]:
datagen_test = ImageDataGenerator(preprocessing_function=utils.preprocess_input)

size_img = (224, 224)
bs = 32

test_generator = datagen_test.flow_from_dataframe(dataframe=df_test,
                                                    directory=images,
                                                    x_col='path_images',
                                                    y_col='emotion',
                                                    target_size=size_img,
                                                    batch_size=bs,
                                                    class_mode=None,
                                                    shuffle=False)

In [5]:
# building the end-to-end model
resnet50 = VGGFace(model='resnet50', include_top=False, input_shape=(224, 224, 3), pooling='avg')

x = keras.layers.Dense(units = 1024,activation = 'relu')(resnet50.output)

x = keras.layers.Dropout(0.5)(x) 

x = keras.layers.Dense(7, activation = 'softmax')(x)         

model_resnet50 = keras.models.Model(resnet50.input, x)

In [6]:
# get predictions
path_model_img = 'models/resnet50/'
model_resnet50.load_weights(path_model_img + "/weights.h5")

test_prob=model_resnet50.predict_generator(test_generator, verbose=1)

Please note that the order of labels is set as follows: Anger, Disgust, Fear, Happiness, Neutral, Sadness, Surprise

In [7]:
name_test = test_generator.filenames
name_video_test, frame_test = [], []
name_labels = ['prob_Anger','prob_Disgust','prob_Fear','prob_Happiness','prob_Neutral','prob_Sadness','prob_Surprise']

for i in name_test:
    name_video_test.append(i.split('/')[0])
    frame_test.append(i.split('/')[1].split('.')[0]) 
    
df_ = pd.DataFrame(columns=['name_file','emotion','frame'] + name_labels)

df_.name_file=name_video_test
df_.frame=frame_test
df_[['name_file']] = df_[['name_file']].astype(str)
df_[['frame']] = df_[['frame']].astype(int)

for i in range(len(name_labels)):
    df_[name_labels[i]]=test_prob[:,i].tolist()
    
df_sort = df_[['name_file','emotion','frame','prob_Neutral','prob_Anger','prob_Disgust','prob_Fear','prob_Happiness','prob_Sadness','prob_Surprise']]

In [8]:
# create a dataframe with dropped frames
name_folder = Counter(df_.name_file.tolist()).keys()
path_counter_frame = 'files_needed_to_get_final_probabilities/counter_frame.csv'
counter_frame = pd.read_csv(path_counter_frame)
name_counter_frame = [i.split('.')[0] for i in counter_frame.name_video.tolist()]
total_counter_frame = counter_frame.total_frame.tolist()

dict_not_frame = pickle.load(open('files_needed_to_get_final_probabilities/frame_with_face_not_detected.pickle', 'rb'))
dropped_frames = df_dropprd_frames(dict_not_frame, name_folder)

In [9]:
# fill in the probabilities for dropped frames
df_new__ = fill_dropped_frame(name_folder, dropped_frames, df_)

# merge two dataframes
df_new_union = pd.concat([df_sort, df_new__], ignore_index=True)
df_new_union_sort = df_new_union.sort_values(by=['name_file', 'frame'])

In [10]:
# save probability
for i in name_folder:
    if len(i.split('_')) > 1 and len(i.split('_')[1]) > 1:
        counter_i = total_counter_frame[name_counter_frame.index(i.split('_')[0])]
    else:
        counter_i = total_counter_frame[name_counter_frame.index(i)]
    c_df = df_new_union_sort.loc[df_new_union_sort.name_file == i].copy()
    c_p = pd.DataFrame(c_df.drop(['name_file','emotion'], axis=1)).values
    df_recording = pd.DataFrame(data=c_p[:counter_i],columns=['frame_id','neutral_probability', 'anger_probability','disgust_probability','fear_probability','happiness_probability','sadness_probability','surprise_probability'])
    filename = '{}.txt'.format(i)
    df_recording.to_csv('probability/resnet50/test/' + filename, index=False)