# Manifold Learning for Speech Emotion Recognition
## Efthymios Tzinis

In [None]:
# Load the appropriate modules 
import os, sys, glob
import numpy as np
sys.path.append('../')
import config
sys.path.append(config.BASE_PATH)
from dataloader import fused_features_IEMOCAP as IEMOCAP_loader

sys.path.append(config.PATTERN_SEARCH_MDS_PATH)

In [None]:
# Session Folds Generator
def get_dataset_in_one_array(features_dic,
                             included_sessions=('Ses01', 'Ses02')):
    """Stack the samples of all speakers that belong to the given sessions.

    A speaker's session name is its key in ``features_dic`` without the last
    character; speakers of other sessions are skipped.

    Args:
        features_dic: mapping speaker id -> dict with entries ``'x'`` (array
            of samples, concatenated along axis 0) and ``'y'`` (list of
            labels, one per sample).
        included_sessions: container of session names to keep.

    Returns:
        Tuple ``(X_all, Y_all, speaker_indices, n_sessions)`` where
        ``X_all`` is the stacked array, ``Y_all`` the concatenated label
        list, ``speaker_indices`` maps each kept speaker to its
        ``(start, end)`` row range inside ``X_all`` and ``n_sessions`` is
        ``len(included_sessions)``.

    Raises:
        ValueError: if no speaker of ``features_dic`` belongs to
            ``included_sessions``.
    """
    speaker_indices = {}
    x_all_list = []
    Y_all = []
    prev_ind = 0
    for te_speaker, te_data in features_dic.items():
        ses_name = te_speaker[:-1]
        if ses_name not in included_sessions:
            continue
        x_all_list.append(te_data['x'])
        Y_all += te_data['y']

        next_ind = prev_ind + len(te_data['y'])
        speaker_indices[te_speaker] = (prev_ind, next_ind)
        prev_ind = next_ind

    if not x_all_list:
        raise ValueError('No speaker found for sessions: {}'.format(
            list(included_sessions)))

    # Concatenate once, after the loop, instead of re-copying the whole
    # stack every time a speaker is added.
    X_all = np.concatenate(x_all_list, axis=0)
    return X_all, Y_all, speaker_indices, len(included_sessions)


def generate_session_folds(X_all, Y_all, features_dic, speaker_indices):
    """Yield one train/test split per consecutive pair of sorted speakers.

    The speakers of ``speaker_indices`` are sorted and taken two at a time;
    each pair forms the test set and all remaining speakers form the
    training set. The fold is named after the first speaker of the pair
    without its last character.

    Args:
        X_all: 2-D array holding the samples of all speakers row-wise.
        Y_all: not used; labels are read from ``features_dic``.
        features_dic: mapping speaker id -> dict with a ``'y'`` label list.
        speaker_indices: mapping speaker id -> ``(start, end)`` row range
            of that speaker inside ``X_all``.

    Yields:
        ``(session_name, X_te, Y_te, X_tr, Y_tr)``.
    """
    ordered = sorted(speaker_indices)
    for pos in range(0, len(ordered), 2):
        first = ordered[pos]
        second = ordered[pos + 1]
        held_out = (first, second)

        lo1, hi1 = speaker_indices[first]
        lo2, hi2 = speaker_indices[second]

        # Test split: both speakers of the pair, first speaker's rows first.
        Y_te = features_dic[first]['y'] + features_dic[second]['y']
        X_te = np.concatenate([X_all[lo1:hi1, :], X_all[lo2:hi2, :]], axis=0)

        # Training split: everybody else, in sorted speaker order.
        train_speakers = [sp for sp in ordered if sp not in held_out]
        train_chunks = []
        Y_tr = []
        for sp in train_speakers:
            lo, hi = speaker_indices[sp]
            train_chunks.append(X_all[lo:hi, :])
            Y_tr += features_dic[sp]['y']
        X_tr = np.concatenate(train_chunks, axis=0)

        yield first[:-1], X_te, Y_te, X_tr, Y_tr

def generate_folds(features_dic,
                   group_by = 'speaker'):
    """Yield leave-one-group-out train/test splits straight from the dict.

    Args:
        features_dic: mapping speaker id -> dict with entries ``'x'`` (array
            of samples, concatenated along axis 0) and ``'y'`` (list of
            labels).
        group_by: ``'speaker'`` holds out one speaker per fold;
            ``'session'`` holds out every speaker that shares a session
            name (speaker id without its last character). Any other value
            yields nothing.

    Yields:
        ``(group_name, X_te, Y_te, X_tr, Y_tr)`` where ``group_name`` is the
        held-out speaker id or session name.
    """
    if group_by == 'speaker':
        for te_speaker, te_data in features_dic.items():
            x_tr_list = []
            Y_tr = []
            for tr_speaker, tr_data in features_dic.items():
                if tr_speaker == te_speaker:
                    continue
                x_tr_list.append(tr_data['x'])
                Y_tr += tr_data['y']

            X_tr = np.concatenate(x_tr_list, axis=0)
            yield te_speaker, te_data['x'], te_data['y'], X_tr, Y_tr

    elif group_by == 'session':
        already_tested = set()
        for te_speaker, te_data in features_dic.items():
            ses_name = te_speaker[:-1]
            if ses_name in already_tested:
                continue
            already_tested.add(ses_name)

            # The test split starts with the first speaker seen for this
            # session; every other speaker of the same session is appended.
            # A session with a single speaker is therefore tested once (not
            # duplicated) and no same-session speaker is silently dropped.
            x_ses_list = [te_data['x']]
            Y_ses = list(te_data['y'])
            x_tr_list = []
            Y_tr = []
            for tr_speaker, tr_data in features_dic.items():
                if tr_speaker == te_speaker:
                    continue
                if tr_speaker[:-1] == ses_name:
                    x_ses_list.append(tr_data['x'])
                    Y_ses += tr_data['y']
                else:
                    x_tr_list.append(tr_data['x'])
                    Y_tr += tr_data['y']

            X_tr = np.concatenate(x_tr_list, axis=0)
            X_ses = np.concatenate(x_ses_list, axis=0)
            yield ses_name, X_ses, Y_ses, X_tr, Y_tr
            
def fuse_excited_happiness(l):
    """Return a copy of ``l`` with 'excited' and 'happy' merged into one label.

    Every element equal to ``'excited'`` or ``'happy'`` is replaced by
    ``'happy + excited'``; all other elements are kept as they are.
    """
    fused_label = 'happy + excited'
    fused = []
    for label in l:
        if label == 'excited' or label == 'happy':
            fused.append(fused_label)
        else:
            fused.append(label)
    return fused


In [None]:
# Initialize all available Manifold Methods
import multidimensional
import multidimensional.common
import multidimensional.mds 
import multidimensional.smacof
from sklearn import manifold, decomposition

class IdentityData(object):
    """Pass-through "reducer" exposing the same ``fit_transform`` entry point
    as the other manifold methods, so the unreduced data can be evaluated
    through the same code path."""

    def fit_transform(self, x):
        """Return ``x`` itself, unchanged."""
        return x

def get_manifold_methods(target_dim):
    """Build a fresh instance of every dimensionality-reduction method.

    Args:
        target_dim: number of output dimensions handed to every method
            (ignored by 'Original Data').

    Returns:
        dict mapping a method name to ``{'results': {}, 'object': obj}``
        where ``obj`` exposes ``fit_transform``. 'Pattern Search MDS' is
        configured with ``dissimilarities='precomputed'``.
    """
    method_n_comp = 66  # passed as n_neighbors to the LLE family below
    radius_barrier = 1e-3
    explore_dim_percent = .9
    starting_radius = 32
    max_turns = 10000
    point_filter = multidimensional.point_filters.FixedStochasticFilter(
        keep_percent=1, recalculate_each=10)
    radius_update = multidimensional.radius_updates.AdaRadiusHalving(
        tolerance=.5*1e-3, burnout_tolerance=100000)

    def lle(method):
        # Keyword arguments: recent scikit-learn releases no longer accept
        # n_neighbors / n_components positionally.
        return manifold.LocallyLinearEmbedding(
            n_neighbors=method_n_comp, n_components=target_dim,
            eigen_solver='auto', method=method, n_jobs=8)

    manifold_methods = {
        'Pattern Search MDS': {
            'results': {},
            'object': multidimensional.mds.MDS(
                target_dim, point_filter, radius_update,
                starting_radius=starting_radius,
                radius_barrier=radius_barrier, max_turns=max_turns,
                keep_history=False,
                dissimilarities='precomputed',
                explore_dim_percent=explore_dim_percent)},
        'MDS SMACOF': {
            'results': {},
            'object': multidimensional.smacof.MDS(
                n_components=target_dim, n_init=1, max_iter=max_turns,
                dissimilarity='euclidean', n_jobs=8)},
        'LTSA': {'results': {}, 'object': lle('ltsa')},
        'Modified LLE': {'results': {}, 'object': lle('modified')},
        'Hessian LLE': {'results': {}, 'object': lle('hessian')},
        'LLE': {'results': {}, 'object': lle('standard')},
        'Truncated SVD': {
            'results': {},
            'object': decomposition.TruncatedSVD(n_components=target_dim)},
        'Spectral Embedding': {
            'results': {},
            'object': manifold.SpectralEmbedding(n_components=target_dim,
                                                 n_jobs=8)},
        'TSNE': {'results': {},
                 'object': manifold.TSNE(n_components=target_dim)},
        'ISOMAP': {'results': {},
                   'object': manifold.Isomap(n_neighbors=12,
                                             n_components=target_dim)},
        'Original Data': {'results': {}, 'object': IdentityData()},
    }
    return manifold_methods

In [None]:
# Find the best performing nonlinear features for KNN classification after dimensionality reduction
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix 
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
import pprint 
import pandas as pd 

def run_IEMOCAP_session_KNN(n_neighbors, target_dims, methods_to_test, data_dic, included_sessions):
    """Evaluate KNN on session folds after each dimensionality reduction.

    All samples of ``included_sessions`` are standardized and embedded once
    per (target dimension, method). The embedded points are then split with
    ``generate_session_folds`` and classified with KNN for every K, after
    merging the 'happy' and 'excited' labels.

    Args:
        n_neighbors: re-iterable collection of K values (list or array).
        target_dims: iterable of embedding dimensions.
        methods_to_test: names of entries of ``get_manifold_methods``.
        data_dic: speaker features as accepted by
            ``get_dataset_in_one_array``.
        included_sessions: session names to include.

    Returns:
        dict mapping each target dimension to a DataFrame with rows
        ``'<method> UA'`` / ``'<method> WA'`` and one column per K (sorted).
        Each cell is the sum over folds of the per-fold accuracy in percent
        (rounded to one decimal) divided by ``len(included_sessions)``.
        A method whose embedding fails keeps 0.0 everywhere; a fold whose
        classification fails contributes 0.

    Raises:
        KeyError: if a name in ``methods_to_test`` is not a known method.
    """
    X_all, Y_all, speaker_indices, number_of_sessions = get_dataset_in_one_array(
        data_dic, included_sessions=included_sessions)
    df_results = {}
    # normalize the input vectors
    X_high = StandardScaler().fit_transform(X_all)

    print(X_high.shape)

    for target_dim in target_dims:
        manifold_methods = get_manifold_methods(target_dim)
        methods_metrics = {}
        for selected_method in methods_to_test:
            metrics_l = {'uw_acc': {k: 0.0 for k in n_neighbors},
                         'w_acc': {k: 0.0 for k in n_neighbors}}
            print('Checking Method: {}'.format(selected_method))
            obj = manifold_methods[selected_method]['object']

            print('Reducing Input from Dimension: {} to a Lower Embedded Manifold with dimensions: {}...'.format(
                X_high.shape[1], target_dim))

            try:
                if selected_method == 'Pattern Search MDS':
                    # This method is built with dissimilarities='precomputed',
                    # so it needs a distance matrix rather than raw features
                    # (same handling as run_speaker_independent_KNN).
                    d_goal = multidimensional.common.DISTANCE_MATRIX(X_high.astype(np.float64))
                    X_low = obj.fit_transform(d_goal)
                else:
                    X_low = obj.fit_transform(X_high)
            except Exception as e:
                # Best effort: report the failure and keep all-zero scores.
                print(e)
                methods_metrics[selected_method+' UA'] = metrics_l['uw_acc']
                methods_metrics[selected_method+' WA'] = metrics_l['w_acc']
                continue

            for k in n_neighbors:
                knn = KNeighborsClassifier(n_neighbors=k, weights='uniform', algorithm='brute', leaf_size=30,
                                           p=2, metric='minkowski', metric_params=None, n_jobs=8)

                session_folds = generate_session_folds(X_low, Y_all, data_dic, speaker_indices)
                for session, X_te, Y_te, X_tr, Y_tr in session_folds:
                    Y_te, Y_tr = fuse_excited_happiness(Y_te), fuse_excited_happiness(Y_tr)

                    try:
                        knn.fit(X_tr, Y_tr)
                        Y_predicted = knn.predict(X_te)

                        # Weighted accuracy: plain fraction of correct predictions.
                        w_acc = accuracy_score(Y_predicted, Y_te)
                        # Unweighted accuracy: mean over the confusion-matrix
                        # rows of diagonal / row sum; 1e-6 guards empty rows.
                        # NOTE(review): a label that only occurs in the
                        # predictions presumably adds a zero row and lowers
                        # this mean - confirm that is intended.
                        cmat = confusion_matrix(Y_te, Y_predicted)
                        with np.errstate(divide='ignore'):
                            uw_acc = (cmat.diagonal() / (1.0 * cmat.sum(axis=1) + 1e-6)).mean()
                            if np.isnan(uw_acc):
                                uw_acc = 0.
                        w_acc = round(w_acc*100, 1)
                        uw_acc = round(uw_acc*100, 1)
                        metrics_l['uw_acc'][k] += uw_acc/number_of_sessions
                        metrics_l['w_acc'][k] += w_acc/number_of_sessions
                    except Exception as e:
                        # A failed fold contributes 0 to both averages, but
                        # the failure is reported instead of being hidden.
                        print('Session {} failed for K={}: {}'.format(session, k, e))

            methods_metrics[selected_method+' UA'] = metrics_l['uw_acc']
            methods_metrics[selected_method+' WA'] = metrics_l['w_acc']

        df = pd.DataFrame.from_dict(methods_metrics, orient="index")
        df_results[target_dim] = df[sorted(df.columns)]

    return df_results

In [None]:
# Experiment grid for the IEMOCAP session experiments:
# K = 1, 5, ..., 37 neighbours and three embedding sizes.
n_neighbors = np.arange(1, 40, 4)
target_dims = [2, 5, 10]

# Feature files: linear (emobase2010) set and nonlinear (RQA) set.
IEMOCAP_data_path = '/home/thymios/all_TRUE_IEMOCAP_feats/'
l_feats_p = os.path.join(IEMOCAP_data_path,
                         'linear/IEMOCAP_linear_emobase2010')
# nl_feats_l = glob.glob( IEMOCAP_data_path + '/utterance/*.dat')
# nl_feats_p = nl_feats_l.pop()
nl_feats_p = IEMOCAP_data_path + (
    'utterance/IEMOCAP-rqa-ad_hoc-tau-7-supremum-recurrence_rate-0.15-dur-0.03-fs-16000.dat')


In [None]:
# Run the IEMOCAP session experiments on the fused (linear + nonlinear)
# feature set, using all five sessions.
included_sessions=['Ses01', 'Ses02', 'Ses03', 'Ses04', 'Ses05']
methods_to_test = ['Truncated SVD', 'Spectral Embedding', 
                   'LLE', 'Hessian LLE', 'Modified LLE', 'LTSA']   
# methods_to_test = ['Truncated SVD', 'Spectral Embedding']
data_dic = IEMOCAP_loader.get_fused_features([l_feats_p, nl_feats_p])
# Baseline on the unreduced data: 'Original Data' maps to IdentityData, which
# takes no target dimension, so 2014 only serves as the key of the result dict.
original_results = run_IEMOCAP_session_KNN(n_neighbors, [2014], ['Original Data'], data_dic, included_sessions)
# Same evaluation after reducing the data with each method and target dimension.
fused_results = run_IEMOCAP_session_KNN(n_neighbors, target_dims, methods_to_test, data_dic, included_sessions)

In [None]:
from IPython.display import display
# Show the IEMOCAP result tables: first the unreduced baseline, then one
# table per target dimension.
# display() renders the frame itself and returns None, so it is called on its
# own; wrapping it in print emitted a stray "None" after every table.
for target_dim in sorted(original_results):
    df = original_results[target_dim]
    print("Using Original Data")
    display(df)

for target_dim in sorted(fused_results):
    df = fused_results[target_dim]
    print("For Target Dimension: {}".format(target_dim))
    display(df)

In [None]:
# create the experiment for EmoDB Speaker independent Experiments
def get_dataset_for_all_speakers(features_dic):
    """Stack the samples of every speaker into one array.

    Args:
        features_dic: mapping speaker id -> dict with entries ``'x'`` (array
            of samples, concatenated along axis 0) and ``'y'`` (list of
            labels, one per sample).

    Returns:
        Tuple ``(X_all, Y_all, speaker_indices, number_of_speakers)`` where
        ``speaker_indices`` maps each speaker to its ``(start, end)`` row
        range inside ``X_all``.

    Raises:
        ValueError: if ``features_dic`` is empty.
    """
    if not features_dic:
        raise ValueError('features_dic contains no speakers')

    speaker_indices = {}
    x_all_list = []
    Y_all = []
    prev_ind = 0
    for te_speaker, te_data in features_dic.items():
        x_all_list.append(te_data['x'])
        Y_all += te_data['y']

        next_ind = prev_ind + len(te_data['y'])
        speaker_indices[te_speaker] = (prev_ind, next_ind)
        prev_ind = next_ind

    # Concatenate once, after the loop, instead of re-copying the whole
    # stack every time a speaker is added.
    X_all = np.concatenate(x_all_list, axis=0)
    number_of_speakers = len(features_dic)
    return X_all, Y_all, speaker_indices, number_of_speakers

def generate_speaker_independent_folds(X_all, Y_all, features_dic, speaker_indices):
    """Yield one leave-one-speaker-out split per entry of ``speaker_indices``.

    Args:
        X_all: 2-D array holding the samples of all speakers row-wise.
        Y_all: list of labels aligned with the rows of ``X_all``.
        features_dic: not used.
        speaker_indices: mapping speaker id -> ``(start, end)`` row range
            of that speaker inside ``X_all`` / ``Y_all``.

    Yields:
        ``(te_speaker, X_te, Y_te, X_tr, Y_tr)``; the training part lists
        the remaining speakers in sorted order.
    """
    ordered = sorted(speaker_indices.keys())
    for te_speaker, bounds in speaker_indices.items():
        lo, hi = bounds
        Y_te = Y_all[lo:hi]
        X_te = X_all[lo:hi, :]

        train_chunks = []
        Y_tr = []
        for other in (sp for sp in ordered if sp != te_speaker):
            start, stop = speaker_indices[other]
            train_chunks.append(X_all[start:stop, :])
            Y_tr += Y_all[start:stop]
        X_tr = np.concatenate(train_chunks, axis=0)

        yield te_speaker, X_te, Y_te, X_tr, Y_tr


def run_speaker_independent_KNN(n_neighbors, target_dims, methods_to_test, data_dic):
    """Evaluate KNN with leave-one-speaker-out folds after each reduction.

    All samples are standardized and embedded once per (target dimension,
    method). The embedded points are then split with
    ``generate_speaker_independent_folds`` and classified with KNN for
    every K.

    Args:
        n_neighbors: re-iterable collection of K values (list or array).
        target_dims: iterable of embedding dimensions.
        methods_to_test: names of entries of ``get_manifold_methods``.
        data_dic: speaker features as accepted by
            ``get_dataset_for_all_speakers``.

    Returns:
        dict mapping each target dimension to a DataFrame with rows
        ``'<method> UA'`` / ``'<method> WA'`` and one column per K (sorted).
        Each cell is the sum over folds of the per-fold accuracy in percent
        (rounded to an integer value) divided by the number of speakers.
        A method that is unknown or whose embedding fails keeps 0.0
        everywhere; a fold whose classification fails contributes 0.
    """
    X_all, Y_all, speaker_indices, number_of_speakers = get_dataset_for_all_speakers(data_dic)
    df_results = {}
    # normalize the input vectors
    X_high = StandardScaler().fit_transform(X_all)

    print(X_high.shape)

    for target_dim in target_dims:
        print("Running for Target Dimensions={}".format(target_dim))
        manifold_methods = get_manifold_methods(target_dim)
        methods_metrics = {}
        for selected_method in methods_to_test:
            metrics_l = {'uw_acc': {k: 0.0 for k in n_neighbors},
                         'w_acc': {k: 0.0 for k in n_neighbors}}
            print('Checking Method: {}'.format(selected_method))
            try:
                print('Reducing Input from Dimension: {} to a Lower Embedded Manifold with dimensions: {}...'.format(
                    X_high.shape[1], target_dim))
                obj = manifold_methods[selected_method]['object']
                if selected_method == 'Pattern Search MDS':
                    # Built with dissimilarities='precomputed': feed it the
                    # pairwise distance matrix instead of the raw features.
                    d_goal = multidimensional.common.DISTANCE_MATRIX(X_high.astype(np.float64))
                    X_low = obj.fit_transform(d_goal)
                else:
                    X_low = obj.fit_transform(X_high)

            except Exception as e:
                # Best effort: report the failure (it was silently dropped
                # before) and keep all-zero scores for this method.
                print(e)
                methods_metrics[selected_method+' UA'] = metrics_l['uw_acc']
                methods_metrics[selected_method+' WA'] = metrics_l['w_acc']
                continue

            for k in n_neighbors:
                knn = KNeighborsClassifier(n_neighbors=k, weights='uniform', algorithm='brute', leaf_size=30,
                                           p=2, metric='minkowski', metric_params=None, n_jobs=8)

                speaker_folds = generate_speaker_independent_folds(X_low, Y_all, data_dic, speaker_indices)
                for te_speaker, X_te, Y_te, X_tr, Y_tr in speaker_folds:
                    try:
                        knn.fit(X_tr, Y_tr)
                        Y_predicted = knn.predict(X_te)

                        # Weighted accuracy: plain fraction of correct predictions.
                        w_acc = accuracy_score(Y_predicted, Y_te)
                        # Unweighted accuracy: mean over the confusion-matrix
                        # rows of diagonal / row sum; 1e-6 guards empty rows.
                        cmat = confusion_matrix(Y_te, Y_predicted)
                        with np.errstate(divide='ignore'):
                            uw_acc = (cmat.diagonal() / (1.0 * cmat.sum(axis=1) + 1e-6)).mean()
                            if np.isnan(uw_acc):
                                uw_acc = 0.
                        # Same explicit precision for both scores.
                        w_acc = round(w_acc*100, 0)
                        uw_acc = round(uw_acc*100, 0)
                        metrics_l['uw_acc'][k] += uw_acc/number_of_speakers
                        metrics_l['w_acc'][k] += w_acc/number_of_speakers
                    except Exception as e:
                        # A failed fold contributes 0 to both averages, but
                        # the failure is reported instead of being hidden.
                        print('Speaker {} failed for K={}: {}'.format(te_speaker, k, e))

            methods_metrics[selected_method+' UA'] = metrics_l['uw_acc']
            methods_metrics[selected_method+' WA'] = metrics_l['w_acc']

        df = pd.DataFrame.from_dict(methods_metrics, orient="index")
        df_results[target_dim] = df[sorted(df.columns)]

    return df_results

In [None]:
# Experiment grid for the speaker-independent experiments:
# K = 1, 5, ..., 37 neighbours and four embedding sizes.
n_neighbors = np.arange(1, 40, 4)
target_dims = [2, 5, 10, 25]

# Feature files: linear (emobase2010) set and nonlinear (RQA) set.
data_path = '/home/thymios/all_BERLIN_features/'
berlin_l_feats_p = os.path.join(data_path,
                                'linear/BERLIN_linear_emobase2010')
# nl_feats_l = glob.glob( IEMOCAP_data_path + '/utterance/*.dat')
# nl_feats_p = nl_feats_l.pop()
berlin_nl_feats_p = data_path + (
    'rqa/utterance/BERLIN-rqa-ad_hoc-tau-7-manhattan-recurrence_rate-0.15-dur-0.02-fs-16000.dat')


In [None]:
# Run the speaker-independent experiments on three feature sets:
# nonlinear only, fused (linear + nonlinear) and linear only.
methods_to_test = ['Pattern Search MDS', 'MDS SMACOF','Truncated SVD', 'Spectral Embedding', 'LLE', 
                   'Hessian LLE', 'Modified LLE', 'LTSA', 'ISOMAP']   
# methods_to_test = ['Pattern Search MDS']
# methods_to_test = ['Truncated SVD']

# For every feature set: first a baseline on the unreduced data ('Original
# Data' ignores the target dimension, so 2014 is only the key of the result
# dict), then every reduction method for every target dimension.
# The loader imported as IEMOCAP_loader is reused here for the Berlin files.

# Nonlinear features only.
data_dic = IEMOCAP_loader.get_fused_features([berlin_nl_feats_p])
berlin_original_nl_results = run_speaker_independent_KNN(n_neighbors, [2014], ['Original Data'], data_dic)
berlin_nl_results = run_speaker_independent_KNN(n_neighbors, target_dims, methods_to_test, data_dic)

# Linear + nonlinear features fused.
data_dic = IEMOCAP_loader.get_fused_features([berlin_l_feats_p, berlin_nl_feats_p])
berlin_original_fused_results = run_speaker_independent_KNN(n_neighbors, [2014], ['Original Data'], data_dic)
berlin_fused_results = run_speaker_independent_KNN(n_neighbors, target_dims, methods_to_test, data_dic)

# Linear features only.
data_dic = IEMOCAP_loader.get_fused_features([berlin_l_feats_p])
berlin_original_l_results = run_speaker_independent_KNN(n_neighbors, [2014], ['Original Data'], data_dic)
berlin_l_results = run_speaker_independent_KNN(n_neighbors, target_dims, methods_to_test, data_dic)

In [None]:
from IPython.display import display
# Banner for the result tables of the nonlinear (RQA) feature set.
print("Using RQA Feature Set and Dimensionality Reduction...")

def latex_preformat_print(df, columns=(1, 5, 9, 13, 17, 21)):
    """Print a result table as LaTeX with one row per method.

    The rows of ``df`` are expected to be labelled ``'<method> WA'`` and
    ``'<method> UA'``. For every method the selected columns of its WA row
    are followed by the same columns of its UA row, and the resulting table
    is printed with ``DataFrame.to_latex``.

    Args:
        df: result frame whose column labels include ``columns``.
        columns: column labels (K values) to export; defaults to the six
            values that were previously hard-coded.

    Raises:
        KeyError: if a ``'<method> UA'`` row has no matching WA row, or a
            requested column is missing.
    """
    columns = list(columns)
    methods = {}
    # First pass: start each method's row with its WA values ...
    for ind in df.index.values:
        if ind[-2:] == 'WA' and ind[:-3] not in methods:
            methods[ind[:-3]] = list(df[columns].loc[ind])
    # ... second pass: append the UA values of the same method.
    for ind in df.index.values:
        if ind[-2:] == 'UA':
            methods[ind[:-3]] += list(df[columns].loc[ind])

    df = pd.DataFrame.from_dict(methods, orient="index")
    print(df.to_latex())

# Nonlinear (RQA) features: baseline table first, then one table per target
# dimension, each followed by its LaTeX source.
# display() renders the frame itself and returns None, so it is called on its
# own; wrapping it in print emitted a stray "None" after every table.
for target_dim in sorted(berlin_original_nl_results):
    df = berlin_original_nl_results[target_dim]
    print("Using Original Data")
    display(df)
    latex_preformat_print(df)

for target_dim in sorted(berlin_nl_results):
    df = berlin_nl_results[target_dim]
    print("For Target Dimension: {}".format(target_dim))
    display(df)
    latex_preformat_print(df)

In [None]:
from IPython.display import display
# Linear (emobase) features: baseline table first, then one table per target
# dimension, each followed by its LaTeX source.
print("Using Emobase Features and Dimensionality Reduction...")

# display() renders the frame itself and returns None, so it is called on its
# own; wrapping it in print emitted a stray "None" after every table.
for target_dim in sorted(berlin_original_l_results):
    df = berlin_original_l_results[target_dim]
    print("Using Original Data")
    display(df)
    latex_preformat_print(df)

for target_dim in sorted(berlin_l_results):
    df = berlin_l_results[target_dim]
    print("For Target Dimension: {}".format(target_dim))
    display(df)
    latex_preformat_print(df)

In [None]:
from IPython.display import display
# Fused (linear + nonlinear) features: baseline table first, then one table
# per target dimension, each followed by its LaTeX source.
print("Using Fused Features and Dimensionality Reduction...")

# display() renders the frame itself and returns None, so it is called on its
# own; wrapping it in print emitted a stray "None" after every table.
for target_dim in sorted(berlin_original_fused_results):
    df = berlin_original_fused_results[target_dim]
    print("Using Original Data")
    display(df)
    latex_preformat_print(df)

for target_dim in sorted(berlin_fused_results):
    df = berlin_fused_results[target_dim]
    print("For Target Dimension: {}".format(target_dim))
    display(df)
    latex_preformat_print(df)