In [None]:
import os
import json
from collections import Counter
import soundfile as sf                                                     
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import librosa
import random
from sklearn.metrics import confusion_matrix, f1_score
import seaborn as sns
import tensorflow as tf
import pickle
from tensorflow import keras

In [None]:
# Hold out 25 of the 158 actors so that train and test share no speaker.
# NOTE(review): `x` is not defined in this cell or in any cell shown here; it has to
# exist in the kernel already (records with at least 'root' and 'emotion' fields).
df = pd.DataFrame(x)

# Metadata is parsed out of the file path stored in 'root':
#   actor      -> third '_'-separated token
#   microphone -> second '_'-separated token
#   condition  -> 'zoom' when the fourth '/'-separated component starts with 'z', else 'studio'
df['actor'] = df['root'].apply(lambda path: path.split('_')[2])
df['condition'] = df['root'].apply(
    lambda path: 'zoom' if path.split('/')[3][0] == 'z' else 'studio'
)
df['microphone'] = df['root'].apply(lambda path: path.split('_')[1])

# NOTE(review): no random seed is set in the cells shown, so the held-out actors
# change on every run; the CSVs written below are the record of a given split.
samples = random.sample(list(df['actor'].unique()), k=25)

test = df[df['actor'].isin(samples)].reset_index(drop=True)
print(test['emotion'].value_counts())
train = df[~df['actor'].isin(samples)].reset_index(drop=True)

test.to_csv('test_actor.csv', index=False)
train.to_csv('train_actor.csv', index=False)

In [None]:
def majority_eval(pred_sequences):
    """Collapse each sequence of predictions to its most frequent value.

    Args:
        pred_sequences: iterable of sequences of hashable predictions.

    Returns:
        A list with one entry per input sequence: the value that occurs most
        often in it. When several values tie, the one seen first wins.

    Raises:
        IndexError: if any of the sequences is empty.
    """
    # most_common(1) yields [(value, count)]; ties keep first-seen order.
    return [Counter(seq).most_common(1)[0][0] for seq in pred_sequences]

def read_feature(fname, offset, sr=8000, frame_size=0.6, hop=0.3, n_fft=200, hop_length=80):
    """Load one fixed-length window of an audio file and return its mel spectrogram.

    Args:
        fname: path of the audio file, passed to ``librosa.load``.
        offset: where the window starts, passed to ``librosa.load(offset=...)`` (seconds).
        sr: sample rate the audio is resampled to.
        frame_size: window length in seconds; the window holds ``int(frame_size * sr)`` samples.
        hop: unused here; kept so all the feature helpers share one signature.
        n_fft: FFT size forwarded to ``librosa.feature.melspectrogram``.
        hop_length: STFT hop (in samples) forwarded to ``librosa.feature.melspectrogram``.

    Returns:
        The array produced by ``librosa.feature.melspectrogram`` for the
        pre-emphasised window. Because the window is always forced to exactly
        ``int(frame_size * sr)`` samples, every call with the same settings
        returns the same shape.
    """
    tmp_frame_size = int(frame_size * sr)
    wav_data, _ = librosa.load(fname, sr=sr, res_type='kaiser_fast', duration=frame_size, offset=offset)
    wav_data = librosa.effects.preemphasis(wav_data)

    # Force the window to exactly tmp_frame_size samples. The original only padded,
    # so a clip that came back even one sample too long (resampling round-off)
    # handed np.pad a negative width and raised ValueError.
    n_missing = tmp_frame_size - wav_data.shape[0]
    if n_missing > 0:
        # Window ran past the end of the file: zero-pad the tail.
        wav_data = np.pad(wav_data, (0, n_missing), 'constant', constant_values=(0, 0))
    elif n_missing < 0:
        wav_data = wav_data[:tmp_frame_size]

    return librosa.feature.melspectrogram(y=wav_data, sr=sr, n_fft=n_fft, hop_length=hop_length)

def prepare_test_set(filename, min_duration=0.3, sr=8000, frame_size=0.6, hop=0.3, n_fft=200, hop_length=80):
    """Cut every clip listed in a CSV into overlapping windows and featurise each window.

    Args:
        filename: CSV path; rows must provide ``root`` (audio path), ``duration``
            and ``emotion`` columns.
        min_duration: clips shorter than this are reported and skipped
            (same unit as ``duration`` -- presumably seconds, since it is
            combined with ``hop`` to build the loader offset).
        sr, frame_size, hop, n_fft, hop_length: forwarded to ``read_feature``;
            ``hop`` is also the step between consecutive window starts.

    Returns:
        dict of parallel lists with keys ``root``, ``mel_spec``, ``start``,
        ``end`` and ``emotion``: one entry per extracted window. ``end`` is
        clipped to the clip duration.
    """
    test_mapper = {'root': [], 'mel_spec': [], 'start': [], 'end': [], 'emotion': []}
    clips = pd.read_csv(filename)
    for row in tqdm(clips.itertuples(index=False), total=len(clips)):
        if row.duration >= min_duration:
            # One window at offset 0, plus one for every further whole hop that
            # still leaves at least min_duration of audio.
            num_iter = int(1 + (row.duration - min_duration) // hop)
            for i in range(num_iter):
                start = hop * i
                tmp_mel = read_feature(row.root, start, sr=sr, frame_size=frame_size,
                                       hop=hop, n_fft=n_fft, hop_length=hop_length)
                test_mapper['root'].append(row.root)
                test_mapper['mel_spec'].append(tmp_mel)
                test_mapper['start'].append(start)
                test_mapper['end'].append(min(start + frame_size, row.duration))
                test_mapper['emotion'].append(row.emotion)
        else:
            # Fixed: this used to read `root.duration`, an undefined name, so the
            # first short clip aborted the whole run with NameError.
            print('very short file', row.root, row.duration)
    return test_mapper

def prepare_train_set(df, min_duration=0.3, sr=8000, frame_size=0.6,hop=0.3, n_fft=200, hop_length=80, 
                      labels={'Neutral':0, 'Frustrated':1, 'Angry':2, 'Happy':3, 'Sad':4}):
    """Draw one randomly positioned window per clip and build the training arrays.

    Args:
        df: DataFrame with ``root`` (audio path), ``duration`` and ``emotion`` columns.
        min_duration: the random window start is drawn from
            ``[0, duration - min_duration)`` so that at least this much audio
            follows it.
        sr, frame_size, hop, n_fft, hop_length: forwarded to ``read_feature``.
        labels: mapping from emotion name to integer class id. It is only read,
            never modified, so the shared default dict is safe.

    Returns:
        ``(x, y)`` where ``x`` is the stacked features with a singleton axis
        inserted at position 1 (``(n_clips, 1, *feature_shape)``) and ``y`` is
        the list of integer class ids in the same order.

    Raises:
        KeyError: if a clip's emotion is not a key of ``labels``.
        ValueError: from ``np.stack`` if ``df`` has no rows.
    """
    x = []
    y = []
    rows = zip(df['root'], df['duration'], df['emotion'])
    for root, duration, emotion in tqdm(rows, total=len(df)):
        # Clamp the offset range at zero: for a clip shorter than min_duration the
        # original produced a negative offset for the loader. Such clips now start
        # at 0 and are zero-padded by read_feature. random.random() is still called
        # exactly once per clip, so results for normal-length clips are unchanged.
        max_offset = max(0.0, duration - min_duration)
        offset = random.random() * max_offset
        tmp_mel = read_feature(root, offset, sr=sr, frame_size=frame_size,
                               hop=hop, n_fft=n_fft, hop_length=hop_length)
        x.append(tmp_mel)
        y.append(labels[emotion])
    # (n, H, W) -> (n, 1, H, W)
    x = np.expand_dims(np.stack(x), axis=1)
    return x, y
def _print_banner(banner, verbose):
    """Print a section banner surrounded by blank lines, only when verbose."""
    if verbose:
        print()
        print(banner)
        print()


def _confusion_report(frame, labels, verbose, *caption):
    """Compute the confusion matrix of `frame`; when verbose, print `caption` and plot it."""
    conf_mat = confusion_matrix(frame['emotion'], frame['pred_emotion'], labels=labels)
    if verbose:
        print(*caption)
        sns.heatmap(conf_mat, xticklabels=labels, yticklabels=labels, annot=True)
        plt.show()
    return conf_mat


def eval_result(test_mapper, 
                emo_pred, 
                label_filename, 
                labels=['Neutral', 'Frustrated', 'Angry', 'Happy', 'Sad'],
                verbose=0):
    """Aggregate window-level predictions per clip and score them against the labels.

    Args:
        test_mapper: dict of parallel lists with keys ``root``, ``start``,
            ``end`` and ``emotion`` (as built by ``prepare_test_set``).
        emo_pred: one predicted label per window, aligned with ``test_mapper``.
            Presumably label strings from ``labels``, since they are compared
            directly with the ``emotion`` column -- confirm against the caller.
        label_filename: CSV with one row per clip; must provide ``root``,
            ``emotion``, ``duration``, ``microphone`` and ``condition``.
        labels: class names; fixes the row/column order of every confusion
            matrix and the order of the returned F1 scores. Only read, never
            modified.
        verbose: when truthy, print section banners and show a heatmap for the
            overall result and for every duration / microphone / condition slice.

    Returns:
        ``(df, f1s)``: the label table with an added ``pred_emotion`` column
        (the most frequent window prediction of each clip), and the per-class
        F1 scores ordered like ``labels``.

    Notes:
        * The merge is an inner join, so clips with no window in
          ``test_mapper`` are left out of the evaluation.
        * Fixed: ``f1_score`` was called without ``labels=``, which returns the
          scores in sorted-label order while they were printed next to
          ``labels``. They now follow ``labels``; a class that never occurs is
          reported as 0 instead of being silently omitted.
    """
    df = pd.read_csv(label_filename)
    tmp_df = pd.DataFrame({'root': test_mapper['root'],
                           'start': test_mapper['start'],
                           'end': test_mapper['end'],
                           'emotion': test_mapper['emotion'],
                           'emo_pred': emo_pred})

    # Majority vote per clip: count (root, emo_pred) pairs, keep the pair with
    # the highest count for each root, then pull the label out of the index tuple.
    y_pred = (
        tmp_df[['root', 'emo_pred']]
        .groupby(['root', 'emo_pred']).size()
        .groupby(level=0).idxmax()
        .apply(lambda key: key[1])
        .reset_index(name='pred_emotion')
        .rename(columns={'root': 'root2'})
    )
    df = df.merge(y_pred, left_on='root', right_on='root2').drop('root2', axis=1)

    _print_banner('################## Error analysis #################################', verbose)
    conf_mat = confusion_matrix(df['emotion'], df['pred_emotion'], labels=labels)
    f1s = f1_score(df['emotion'], df['pred_emotion'], labels=labels, average=None)
    if verbose:
        print(labels, f1s, 'avg:', np.mean(f1s))
        sns.heatmap(conf_mat, xticklabels=labels, yticklabels=labels, annot=True)
        plt.show()

    _print_banner('################## Duration analysis #################################', verbose)
    pair = ['emotion', 'pred_emotion']
    # Split the clips into three duration bands at the 33rd and 66th percentiles.
    thres1, thres2 = np.percentile(df['duration'], [33, 66])
    _confusion_report(df[df['duration'] <= thres1][pair], labels, verbose,
                      'for duration <=', thres1)
    _confusion_report(df[(df['duration'] <= thres2) & (df['duration'] > thres1)][pair], labels, verbose,
                      'for', thres1, '< duration <=', thres2)
    _confusion_report(df[df['duration'] > thres2][pair], labels, verbose,
                      'for duration >', thres2)

    _print_banner('################## Microphone analysis #################################', verbose)
    for mic in df['microphone'].unique():
        _confusion_report(df[df['microphone'] == mic][pair], labels, verbose,
                          'for microphone:', mic)

    _print_banner('################## Condition analysis #################################', verbose)
    for cond in df['condition'].unique():
        _confusion_report(df[df['condition'] == cond][pair], labels, verbose,
                          'for condition:', cond)

    return df, f1s



In [None]:
def build_model(input_shape, num_classes=5):
    """Build the small CNN emotion classifier.

    Architecture: two 3x3 'same'-padded ReLU convolutions (64 then 32 filters),
    global average pooling, a 32-unit ReLU dense layer and a softmax output.

    Args:
        input_shape: shape of one sample, without the batch axis.
        num_classes: number of softmax outputs.

    Returns:
        An uncompiled ``keras.models.Model`` whose input layer is named
        "input" and whose output layer is named "output".
    """
    # NOTE(review): Conv2D is created without data_format, so Keras's configured
    # default applies. If that is channels_last, a (1, 128, 61) input is read as
    # height=1, width=128, channels=61 -- confirm this is intended.
    inputs = keras.layers.Input(shape=input_shape, name="input")

    features = inputs
    for n_filters in (64, 32):
        features = keras.layers.Conv2D(
            n_filters,
            kernel_size=3,
            padding="same",
            activation='relu',
            use_bias=True,
            kernel_regularizer=None,
            bias_regularizer=None,
            activity_regularizer=None,
        )(features)

    pooled = keras.layers.GlobalAveragePooling2D()(features)
    hidden = keras.layers.Dense(32, activation="relu")(pooled)
    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(hidden)
    return keras.models.Model(inputs=inputs, outputs=outputs)

In [None]:
# Reset Keras' global state first so re-running this cell starts from a clean slate.
keras.backend.clear_session()

# One sample is (1, 128, 61); presumably a leading singleton axis plus the mel
# spectrogram produced by read_feature with its default settings -- TODO confirm.
model = build_model(input_shape=(1, 128, 61), num_classes=5)
model.summary()