In [None]:
import pickle
pickle.HIGHEST_PROTOCOL = 4

import numpy as np
import pandas as pd

import os
from pathlib import Path
import func_proc_filepath as mFILE
from tqdm import tqdm
from IPython.display import display
import shutil

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

import tensorflow as tf
import keras
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Activation, Flatten, BatchNormalization, GRU, Bidirectional
from keras.optimizers import Adam
from keras_self_attention import SeqSelfAttention
from keras.utils import Sequence
from keras import backend as K

import random

from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score
from keras.callbacks import Callback, EarlyStopping, ModelCheckpoint

from sklearn.ensemble import RandomForestClassifier
# LightGBM
import lightgbm as lgb

import warnings
warnings.filterwarnings('ignore')

In [None]:
def f1(y_true, y_pred):
    def recall(y_true, y_pred):
        """Recall metric.

        Only computes a batch-wise average of recall.

        Computes the recall, a metric for multi-label classification of
        how many relevant items are selected.
        """
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        recall = true_positives / (possible_positives + K.epsilon())
        return recall

    def precision(y_true, y_pred):
        """Precision metric.

        Only computes a batch-wise average of precision.

        Computes the precision, a metric for multi-label classification of
        how many selected items are relevant.
        """
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = true_positives / (predicted_positives + K.epsilon())
        return precision
    precision = precision(y_true, y_pred)
    recall = recall(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

In [None]:
def main():
    # root folder
    dir_root = str(Path(Path().resolve()).parent)

    # input: folder path including original images
    dir_data = dir_root + "\\dataset\\aff2_images\\dataset"
    dir_audio = dir_root + "\\dataset\\aff2_audio\\dataset"

    # input: expr labels
    file_label_test = dir_root + "\\src\\test_set\\test_set_Expr_Challenge.txt"
    file_frame_list = dir_root + "\\parameters\\frame_list.csv"
    
    dir_model_single = dir_root + "\\model_expr\\model_image"
    dir_model_multi = dir_root + "\\model_expr\\model_mix"
    
    # output: folder path
    dir_out = dir_root + "\\model_expr\\submission"
    if os.path.isdir(dir_out) == False:
        os.makedirs(dir_out)
        
    file_model = dir_model_multi + "\\model_modal_multi_norm_best.h5"
    file_pre_model = dir_model_single + "\\model_image_single_pseudo_best.h5"
    
    # read pre_model
    base_model = keras.models.load_model(file_pre_model, custom_objects={'f1':f1})
    layer_name = 'vgg_au'
    pre_model = Model(inputs=base_model.input, outputs=base_model.get_layer(layer_name).output)  
    # *****
    
    # read model
    model = keras.models.load_model(file_model, custom_objects={'f1':f1})
    # *****
    
    # read test list
    df_label = pd.read_csv(file_label_test, header=None)
    df_label.columns = ["name"]
    #df_label.loc[df_label.loc[:,"name"]=="video49_right", "name"] = "video49" ### ***************** ###
    # read frame list 
    df_frames = pd.read_csv(file_frame_list)
    # test set and frames
    df_label = pd.merge(df_label, df_frames, on="name")
    display(df_label)
    
    name_list = df_label.loc[:,"name"].values
    
    length = len(name_list)
    #length =10
    
    # predict and save per file
    for i in tqdm(range(length)):
        name = name_list[i]
        # create dummy data frame
        frame_num = int(df_label.loc[df_label.loc[:, "name"] == name, "video frames"].values.ravel()[0]) + 1
        df_dummy = pd.DataFrame(np.arange(frame_num).reshape(-1,1), columns = ["frame"])
        df_dummy["expr"] = -1
        df_dummy.set_index("frame", drop=True, inplace=True)
        # read data
        file_data = dir_data + "\\" + name + ".h5"
        df_data = pd.read_hdf(file_data)
        df_data.set_index("frame", drop=True, inplace=True)
        file_audio = dir_audio + "\\" + name + ".h5"
        df_audio = pd.read_hdf(file_audio)
        df_audio.set_index("frame", drop=True, inplace=True)
        # merged
        #df_merge = pd.merge(df_dummy, df_data, on="frame", how="outer")
        df_merge = df_dummy.join([df_data, df_audio], how="outer")
        # interpolate 30 frame back
        df_merge.interpolate(method="index", limit=30, limit_direction='backward', inplace=True)
        # normalize per single subject
        # create feture data (predict from all subjects )
        np_x = df_merge.loc[:, df_merge.columns.str.contains("AU|pose_R|gaze|vgg-")].values
        pre_x = pre_model.predict(np_x)
        df = pd.DataFrame(pre_x)
        # create normalized feature data from single subject
        np_mean_sub = np.nanmean(pre_x, axis=0)
        np_std_sub = np.nanstd(pre_x, axis=0, ddof=1)
        np_x_sub = (pre_x - np_mean_sub)/np_std_sub
        df_sub = pd.DataFrame(np_x_sub)
        df_sub.replace(np.inf, 9999, inplace=True)
        df_sub.fillna(0, inplace=True)
        df_sub.mask(df_sub > 5, 5, inplace=True)
        #df_sub.mask(df_sub < -5, -5, inplace=True)
        df_sub.columns = ["sub-" + str(n) for n in df_sub.columns.values]
        
        np_audio = df_merge.loc[:, df_merge.columns.str.contains("audio")].values
        np_audio_mean = np.nanmean(np_audio, axis=0)
        np_audio_std = np.nanstd(np_audio, axis=0, ddof=1)
        np_audio_sub = (np_audio - np_audio_mean)/np_audio_std
        df_audio_sub = pd.DataFrame(np_audio_sub)
        df_audio_sub.replace(np.inf, 9999, inplace=True)
        df_audio_sub.fillna(0, inplace=True)
        df_audio_sub.mask(df_audio_sub > 5, 5, inplace=True)
        df_audio_sub.mask(df_audio_sub < -5, -5, inplace=True)
        df_audio_sub.columns = ["subau-" + str(n) for n in df_audio_sub.columns.values]
        
        # create all merged data
        #df = pd.concat([df, df_sub, df_merge.loc[:, df_merge.columns.str.contains("expr")]], axis=1)
        #df = pd.DataFrame(pre_x)
        df = pd.concat([df, df_sub, 
                        df_merge.loc[:, df_merge.columns.str.contains("audio")], df_audio_sub, 
                        df_merge.loc[:, df_merge.columns.str.contains("expr")]], axis=1)
        
        
        # create GRU features
        batch_length = 90
        feat_size = 1200
        np_x_list = []
        # create line * batch_length * feat_size data
        for i in range(len(df)):
            np_tmp = np.zeros((batch_length, feat_size))
            if i-batch_length+1 < 0:
                np_tmp = np.zeros((batch_length-i-1, feat_size))
                np_tmp2 = df.iloc[0:i+1, 0:feat_size].values
                np_tmp = np.append(np_tmp, np_tmp2, axis=0)
            else:
                np_tmp = df.iloc[i-batch_length+1:i+1, 0:feat_size].values

            np_tmp = np_tmp.astype(np.float32)
            np_tmp = np_tmp[np.newaxis, ::6, :]
            np_tmp = np.nan_to_num(np_tmp)
            np_x_list.append(np_tmp)
            
        np_test_x = np.concatenate([x for x in np_x_list], 0)
        np_x_list = None
        
        # predict
        pred_nn = model.predict(np_test_x)
        pred_one = np.argmax(pred_nn, axis=1).ravel()
        df["expr"] = pred_one
        list_out = df["expr"].values.tolist()
        
        # save result
        file_out = dir_out + "\\" + name + ".txt"
        f = open(file_out, 'w')
        f.write("Neutral,Anger,Disgust,Fear,Happiness,Sadness,Surprise\n")
        for x in list_out:
            f.write(str(x) + "\n")
        f.close()

    
    #df.to_csv("C:\\Users\\sense\\Desktop\\for_move\\sample.csv")
    display([np_test_x.shape, np_test_x.nbytes])
    display(df)
    #display(list_out)
    
    
    # Neutral,Anger,Disgust,Fear,Happiness,Sadness,Surprise
    
    

In [None]:
if __name__ == "__main__":
    main()