# Imports

In [None]:
import librosa
import librosa.display
import os
import glob
import pickle

In [None]:
  !pip install python_speech_features



In [None]:
import wave
import python_speech_features as ps

In [None]:
import itertools
import random
import numpy as np
import pandas as pd
import statistics 
import scipy.stats
import math

In [None]:
import tensorflow as tf
from tensorflow.keras import Input, layers, Model, utils, initializers, losses, optimizers, Sequential, callbacks, backend
from keras.utils import conv_utils

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import recall_score as recall
from sklearn.metrics import confusion_matrix as confusion

In [None]:
# Colab-only: mount Google Drive at /content/drive so the later `%cd` cell can
# switch into the project folder stored there.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%cd '/content/drive/My Drive/BTP - Dev Priya and Kushagra/Speech Emotion Recognition/'

/content/drive/My Drive/BTP - Dev Priya and Kushagra/Speech Emotion Recognition


In [None]:
# Both paths are relative to the working directory selected by the `%cd` cell.
# `dataset_dir` is where retrieve_preprocessed_data looks for the pickled splits.
dataset_dir = 'Datasets/SincNet'
# Not referenced by any cell visible in this part of the notebook.
ser_output_dir = 'Final Outputs/SincNet'

In [None]:
# Speaker codes (presumably session number + gender - confirm against the data).
# retrieve_preprocessed_data takes entry `session - 1` as the validation speaker
# and the following entry (wrapping around) as the test speaker.
speaker_list = ['1F', '1M', '2F', '2M', '3F', '3M', '4F', '4M', '5F', '5M']

# Preprocessing 

## For SincNet raw features

In [None]:
class PreProcess:
    """Cut emotion-labelled utterances into fixed-length raw-waveform segments.

    Utterances labelled ``hap``, ``ang``, ``neu`` or ``sad`` are indexed from
    ``root_dir`` (or taken from a ready-made ``wav_files`` dict).  ``preprocess``
    then holds out one (session, gender) speaker for validation and one for
    test, segments every utterance, balances and standardises the training set
    and pickles the resulting arrays.
    """

    def __init__(self, root_dir, wav_files=None):
        """
        Args:
            root_dir: Directory that contains the ``S*`` session folders.
            wav_files: Optional pre-built index
                ``{wav_name: {'emotion': str, 'len': int, 'path': str}}``.
                When given, the directory scan is skipped.
        """
        self.eps = 1e-5                 # keeps the z-score division finite
        self.sampling_rate = 16000      # passed to librosa.load as `sr`
        self.segment_length = 4000      # samples per segment fed to the model
        # Training utterances are cut into windows of this many samples
        # (per emotion); one segment is drawn at random from every window.
        self.sample_length = {'hap': 6000, 'ang': 8000, 'neu': 20000, 'sad': 16000}
        self.root_dir = root_dir
        self.valid_session = None
        self.valid_gender = None
        self.test_session = None
        self.test_gender = None
        self.output_file_name = None
        self.num_per_emo = None

        self.train_num = 0
        self.test_utterance_num = 0
        self.valid_utterance_num = 0
        self.test_segment_num = 0
        self.valid_segment_num = 0

        self.train_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}
        self.test_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}
        self.valid_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}

        self.train_data = None
        self.test_data = None
        self.valid_data = None

        self.train_label = None
        self.test_label_utterance = None
        self.test_label_segment = None
        self.valid_label_utterance = None
        self.valid_label_segment = None

        self.test_segments_per_utterance = None
        self.valid_segments_per_utterance = None

        self.mean, self.std = 0, 0

        self.wav_files = wav_files
        self.read_IEMOCAP()

    def read_wav_file(self, wav_filename):
        """Load a wav file resampled to ``self.sampling_rate`` and return its samples."""
        y, _ = librosa.load(wav_filename, sr=self.sampling_rate)
        return y

    def get_audio_length(self, wav_filename):
        """Return the number of samples of a wav file at ``self.sampling_rate``."""
        return self.read_wav_file(wav_filename).shape[0]

    @staticmethod
    def generate_label(emotion):
        """Map an emotion code to its integer class; unknown codes map to 4."""
        return {'ang': 0, 'sad': 1, 'hap': 2, 'neu': 3}.get(emotion, 4)

    @staticmethod
    def parse_emo_file(emo_file_name):
        """Parse an emotion-evaluation text file into ``{utterance_name: emotion}``.

        Only lines starting with ``[`` are used; after whitespace splitting the
        4th token is taken as the utterance name and the 5th as its emotion.
        """
        emo_map = {}
        with open(emo_file_name, 'r') as emo_file:
            for line in emo_file:
                if not line.startswith('['):
                    continue
                tokens = line.split()
                emo_map[tokens[3]] = tokens[4]
        return emo_map

    def read_IEMOCAP(self):
        """Index every usable utterance under ``root_dir`` into ``self.wav_files``.

        Does nothing when an index was already supplied to the constructor.
        Only dialog folders whose 8th character is ``'i'`` are scanned, and only
        utterances labelled hap/ang/neu/sad are kept.
        """
        if self.wav_files is not None:
            return
        self.wav_files = {}
        for session in sorted(os.listdir(self.root_dir)):
            if session[0] != 'S':
                continue
            wav_dir = os.path.join(self.root_dir, session, 'sentences', 'wav')
            emo_labels_dir = os.path.join(self.root_dir, session, 'dialog', 'EmoEvaluation')
            for impro in sorted(os.listdir(wav_dir)):
                # Names shorter than 8 characters cannot carry the 'i' marker.
                if len(impro) <= 7 or impro[7] != 'i':
                    continue
                emo_file_name = os.path.join(emo_labels_dir, impro + '.txt')
                emo_map = self.parse_emo_file(emo_file_name)
                file_dir = os.path.join(wav_dir, impro, '*.wav')
                print(file_dir)
                for filename in sorted(glob.glob(file_dir)):
                    wav_name = os.path.splitext(os.path.basename(filename))[0]
                    emotion = emo_map[wav_name]
                    if emotion not in ['hap', 'ang', 'neu', 'sad']:
                        continue
                    self.wav_files[wav_name] = {
                        'emotion': emotion,
                        'len': self.get_audio_length(filename),
                        'path': filename
                    }

    def update_count(self, emotion, set_type):
        """Count one more segment of ``emotion`` in the given set."""
        if set_type == 'train':
            self.train_emt[emotion] += 1
            self.train_num += 1
        elif set_type == 'test':
            self.test_emt[emotion] += 1
            self.test_segment_num += 1
        else:
            self.valid_emt[emotion] += 1
            self.valid_segment_num += 1

    def initialize(self):
        """Allocate the output arrays from the counters, then reset the counters.

        The counters are reused as write cursors by ``generate_data``.
        """
        self.train_data = np.empty((self.train_num, self.segment_length), dtype=np.float32)
        self.test_data = np.empty((self.test_segment_num, self.segment_length), dtype=np.float32)
        self.valid_data = np.empty((self.valid_segment_num, self.segment_length), dtype=np.float32)

        self.train_label = np.empty((self.train_num, 1), dtype=np.int8)
        self.test_label_segment = np.empty((self.test_segment_num, 1), dtype=np.int8)
        self.valid_label_segment = np.empty((self.valid_segment_num, 1), dtype=np.int8)
        self.test_label_utterance = np.empty((self.test_utterance_num, 1), dtype=np.int8)
        self.valid_label_utterance = np.empty((self.valid_utterance_num, 1), dtype=np.int8)

        # int32 rather than int8: an utterance longer than 127 segments would
        # silently wrap an 8-bit counter.
        self.test_segments_per_utterance = np.zeros((self.test_utterance_num, 1), dtype=np.int32)
        self.valid_segments_per_utterance = np.zeros((self.valid_utterance_num, 1), dtype=np.int32)

        self.train_num = 0
        self.test_segment_num = 0
        self.valid_segment_num = 0
        self.test_utterance_num = 0
        self.valid_utterance_num = 0

    def _window_starts(self, length, emotion, set_type):
        """Return the start offsets of the windows an utterance is split into.

        Shared by ``count_data`` and ``generate_data`` so that the number of
        counted segments always equals the number of generated segments.

        * train: windows of ``sample_length[emotion]`` samples; utterances not
          longer than one window yield nothing; a trailing remainder is kept
          when it can still hold a whole segment.
        * validation/test: back-to-back windows of ``segment_length`` samples;
          a trailing remainder is kept when it is at least half a segment.
        """
        if set_type == 'train':
            window = self.sample_length[emotion]
            if length <= window:
                return []
            min_tail = self.segment_length
        else:
            window = self.segment_length
            min_tail = self.segment_length // 2
        full, rem = divmod(length, window)
        starts = [i * window for i in range(full)]
        if rem >= min_tail:
            starts.append(full * window)
        return starts

    def count_data(self):
        """Count segments and utterances per set for the current speaker split."""
        self.train_num = 0
        self.test_utterance_num = 0
        self.valid_utterance_num = 0
        self.test_segment_num = 0
        self.valid_segment_num = 0

        self.train_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}
        self.test_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}
        self.valid_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}

        for wav_name in sorted(self.wav_files.keys()):
            info = self.wav_files[wav_name]
            emotion = info['emotion']
            set_type = self.find_set(wav_name)  # train/test/validation

            for _ in self._window_starts(info['len'], emotion, set_type):
                self.update_count(emotion, set_type)

            if set_type == 'validation':
                self.valid_utterance_num += 1
            if set_type == 'test':
                self.test_utterance_num += 1

        print(self.train_emt)
        print(self.test_emt)
        print(self.valid_emt)
        # The training set is later balanced down to the rarest emotion.
        self.num_per_emo = min(self.train_emt['hap'], self.train_emt['sad'],
                               self.train_emt['ang'], self.train_emt['neu'])

        print(self.num_per_emo)

    def add_to_set(self, part, emotion, set_type):
        """Write one segment and its integer label at the set's write cursor."""
        if set_type == 'train':
            self.train_data[self.train_num, :] = part.copy()
            self.train_label[self.train_num] = emotion
            self.train_num += 1
        elif set_type == 'test':
            self.test_data[self.test_segment_num, :] = part.copy()
            self.test_label_segment[self.test_segment_num] = emotion
            self.test_segments_per_utterance[self.test_utterance_num] += 1
            self.test_segment_num += 1
        else:
            self.valid_data[self.valid_segment_num, :] = part.copy()
            self.valid_label_segment[self.valid_segment_num] = emotion
            self.valid_segments_per_utterance[self.valid_utterance_num] += 1
            self.valid_segment_num += 1

    def find_set(self, wav_name):
        """Return 'validation', 'test' or 'train' for an utterance name.

        The 5th character of the name is compared with the session and the
        4th-from-last character with the gender of the held-out speakers.
        """
        if wav_name[4] == self.valid_session and wav_name[-4] == self.valid_gender:
            return 'validation'
        if wav_name[4] == self.test_session and wav_name[-4] == self.test_gender:
            return 'test'
        return 'train'

    def data_padding(self, data):
        """Right-pad a short segment with zeros up to ``segment_length``."""
        return np.pad(data, (0, self.segment_length - data.shape[0]), 'constant', constant_values=0)

    def generate_data(self):
        """Fill the train/test/validation arrays (before standardisation).

        Must run after ``count_data`` and ``initialize``.  Training segments are
        drawn at a random offset inside each window; validation/test segments
        are consecutive and a kept trailing remainder is zero-padded.
        """
        for wav_name in sorted(self.wav_files.keys()):
            info = self.wav_files[wav_name]
            length = info['len']
            emotion = info['emotion']
            set_type = self.find_set(wav_name)  # train/test/validation

            starts = self._window_starts(length, emotion, set_type)
            if set_type == 'train' and not starts:
                continue  # too short for training; skip before loading audio

            signal = self.read_wav_file(info['path'])
            label = self.generate_label(emotion)

            for start in starts:
                if set_type == 'train':
                    # The last window may be a remainder shorter than
                    # sample_length, hence the clamp to the utterance length.
                    window_end = min(start + self.sample_length[emotion], length)
                    begin = random.randint(start, window_end - self.segment_length)
                else:
                    begin = start
                segment = signal[begin:begin + self.segment_length]
                if segment.shape[0] < self.segment_length:
                    segment = self.data_padding(segment)
                self.add_to_set(segment, label, set_type)

            if set_type == 'validation':
                self.valid_label_utterance[self.valid_utterance_num] = label
                self.valid_utterance_num += 1
            if set_type == 'test':
                self.test_label_utterance[self.test_utterance_num] = label
                self.test_utterance_num += 1

    def calculate_zscore(self):
        """Compute the global mean and standard deviation of the training samples."""
        flat = self.train_data.reshape(self.train_num * self.segment_length)
        self.mean = np.mean(flat, axis=0)
        self.std = np.std(flat, axis=0)

    def standardize_data(self):
        """Standardise all three sets in place with the training mean/std."""
        scale = self.std + self.eps
        for data, count in ((self.train_data, self.train_num),
                            (self.test_data, self.test_segment_num),
                            (self.valid_data, self.valid_segment_num)):
            rows = data[:count]  # view: the in-place ops below update `data`
            rows -= self.mean
            rows /= scale

    def class_indices(self):
        """Return the training-row indices of each class as (hap, sad, neu, ang)."""
        labels = self.train_label[:self.train_num, 0]
        ang_index = np.flatnonzero(labels == 0)
        sad_index = np.flatnonzero(labels == 1)
        hap_index = np.flatnonzero(labels == 2)
        neu_index = np.flatnonzero(labels == 3)
        return hap_index, sad_index, neu_index, ang_index

    def generate_training_batch(self, hap_index, sad_index, neu_index, ang_index):
        """Balance the training set to ``num_per_emo`` segments per emotion.

        The index arrays are shuffled in place.  Every shuffle is preceded by
        ``np.random.seed(0)`` so the selection is reproducible; note that this
        resets NumPy's global random state.
        """
        for indices in (neu_index, hap_index, sad_index, ang_index):
            np.random.seed(0)
            np.random.shuffle(indices)

        per_class = self.num_per_emo
        self.train_num = 4 * per_class

        # Same block order as before the final shuffle: hap, sad, ang, neu.
        picked = np.concatenate([hap_index[:per_class], sad_index[:per_class],
                                 ang_index[:per_class], neu_index[:per_class]])
        train_data = self.train_data[picked]    # fancy indexing returns a copy
        train_label = self.train_label[picked]

        order = np.arange(self.train_num)
        np.random.seed(0)
        np.random.shuffle(order)
        self.train_data = train_data[order]
        self.train_label = train_label[order]

    def preprocess(self, output_file_name, valid_session, valid_gender,
                   test_session, test_gender):
        """Build the split for the given held-out speakers and pickle it.

        The pickle holds a 10-tuple: train data, train labels, test data, test
        utterance labels, test segment labels, test segments-per-utterance, and
        the same four arrays for validation.
        """
        self.valid_session = valid_session
        self.valid_gender = valid_gender
        self.test_session = test_session
        self.test_gender = test_gender
        self.output_file_name = output_file_name

        self.count_data()
        self.initialize()
        self.generate_data()

        hap_index, sad_index, neu_index, ang_index = self.class_indices()
        self.generate_training_batch(hap_index, sad_index, neu_index, ang_index)

        self.calculate_zscore()
        self.standardize_data()

        with open(self.output_file_name, 'wb') as f:
            pickle.dump((
                self.train_data, self.train_label,
                self.test_data, self.test_label_utterance, self.test_label_segment,
                self.test_segments_per_utterance,
                self.valid_data, self.valid_label_utterance, self.valid_label_segment,
                self.valid_segments_per_utterance),
                f)

# Model

## SincNet: Convolution Layer

In [None]:
def sinc(band, t_right):
    """Build a symmetric sinc kernel ``sin(2*pi*band*t) / (2*pi*band*t)``.

    Only the right half is evaluated (on ``t_right``); it is mirrored for the
    left half and a single 1 is inserted in the middle as the value at t = 0.
    """
    phase = 2 * math.pi * band * t_right
    right_half = tf.math.sin(phase) / phase
    left_half = tf.reverse(right_half, [0])
    return tf.concat([left_half, tf.ones(1), right_half], 0)

In [None]:
class SincConv1D(tf.keras.layers.Layer):
    """1-D convolution whose kernels are parametrised band-pass (sinc) filters.

    Each of the ``N_filt`` filters is described by two trainable scalars: a low
    cut-off (``filt_b1``) and a band width (``filt_band``), both stored divided
    by the sampling rate.  The kernels are rebuilt from these scalars on every
    call.

    Keyword Args:
        N_filt: Number of filters (output channels).
        Filt_dim: Kernel length in samples; must be odd.
        fs: Sampling rate used to scale frequencies.
        hidden_size: Not used by the layer; only stored and echoed by
            ``get_config``.  Optional.
    """

    def __init__(self, *args, **kwargs):
        self.N_filt = kwargs.pop('N_filt')
        self.Filt_dim = kwargs.pop('Filt_dim')
        self.fs = kwargs.pop('fs')
        self.hidden_size = kwargs.pop('hidden_size', None)

        # The kernel is built as left half + centre tap + right half, so an even
        # length cannot be produced; fail early with a clear message.
        if self.Filt_dim % 2 == 0:
            raise ValueError(
                'Filt_dim must be odd, got {}'.format(self.Filt_dim))

        super(SincConv1D, self).__init__(*args, **kwargs)

        # Mel initialization of the filterbanks.
        low_freq_mel = 80
        high_freq_mel = (2595 * np.log10(1 + (self.fs / 2) / 700))  # Convert Hz to Mel
        mel_points = np.linspace(low_freq_mel, high_freq_mel, self.N_filt)  # Equally spaced in Mel scale
        f_cos = (700 * (10**(mel_points / 2595) - 1))  # Convert Mel to Hz
        b1 = np.roll(f_cos, 1)
        b2 = np.roll(f_cos, -1)
        b1[0] = 30
        b2[-1] = (self.fs / 2) - 100
        self.freq_scale = self.fs * 1.0

        # The filters are trainable parameters, initialised directly from the
        # mel-spaced band edges computed above.
        self.filt_b1 = tf.Variable(
            tf.convert_to_tensor(b1 / self.freq_scale, dtype=tf.float32),
            dtype=tf.float32,
            trainable=True,
            name='filt_b1')
        self.filt_band = tf.Variable(
            tf.convert_to_tensor((b2 - b1) / self.freq_scale, dtype=tf.float32),
            dtype=tf.float32,
            trainable=True,
            name='filt_band')

    def call(self, x, **kwargs):
        """Convolve ``x`` with the current band-pass filters.

        ``x`` is passed straight to ``tf.nn.conv1d`` with single-channel
        kernels, so it is expected to be (batch, samples, 1).
        """
        # Get beginning and end frequencies of the filters; abs() plus a floor
        # keeps both the cut-off and the band width strictly positive.
        min_freq = 50.0
        min_band = 50.0
        filt_beg_freq = tf.math.abs(self.filt_b1) + min_freq / self.freq_scale
        filt_end_freq = filt_beg_freq + (tf.math.abs(self.filt_band) + min_band / self.freq_scale)

        # Filter window (hamming).
        n = np.linspace(0, self.Filt_dim, self.Filt_dim)
        window = 0.54 - 0.46 * np.cos(2 * math.pi * n / self.Filt_dim)
        window = tf.constant(window, dtype=tf.float32)

        # Time axis (in seconds) of the right half of the kernel, excluding the
        # centre tap at t = 0, which `sinc` inserts itself.
        half_taps = int((self.Filt_dim - 1) / 2)
        t_right_linspace = np.linspace(1, (self.Filt_dim - 1) / 2, half_taps)
        t_right = tf.constant(t_right_linspace / self.fs, dtype=tf.float32)

        # Compute the filters: a band-pass is the difference of two low-passes.
        output_list = []
        for i in range(self.N_filt):
            low_pass1 = 2 * filt_beg_freq[i] * sinc(filt_beg_freq[i] * self.freq_scale, t_right)
            low_pass2 = 2 * filt_end_freq[i] * sinc(filt_end_freq[i] * self.freq_scale, t_right)
            band_pass = (low_pass2 - low_pass1)
            band_pass = band_pass / tf.reduce_max(band_pass)
            output_list.append(band_pass * window)

        filters = tf.stack(output_list)  # (N_filt, Filt_dim)

        # tf.nn.conv1d expects (filter_width, in_channels, out_channels).  The
        # stacked tensor is filter-major, so it has to be transposed first; a
        # plain reshape would scramble the taps across filters and produce
        # Filt_dim output channels instead of N_filt.
        filters = tf.transpose(filters)  # (Filt_dim, N_filt)
        filters = tf.reshape(filters, (self.Filt_dim, 1, self.N_filt))

        # Do the convolution.
        out = tf.nn.conv1d(
            input=x,
            filters=filters,
            stride=1,
            padding='SAME',
        )
        return out

    def get_config(self):
        """Return the layer config including the constructor keyword arguments."""
        config = ({
            'N_filt': self.N_filt,
            'Filt_dim': self.Filt_dim,
            'fs': self.fs,
            'hidden_size': self.hidden_size,
        })
        base_config = super(SincConv1D, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

## Self Attention Layer

In [None]:
class CustomAttention(tf.keras.layers.Layer):
    """Attention pooling: a softmax-weighted sum over axis 1 of the input.

    Keyword Args:
        hidden_size: Size of the last input axis (first dimension of ``W_omega``).
    """

    def __init__(self, *args, **kwargs):
        self.hidden_size = kwargs.pop('hidden_size')
        super(CustomAttention, self).__init__(*args, **kwargs)

        def glorot_variable(shape, name):
            # Every variable gets its own seeded Glorot-normal initialiser.
            return tf.Variable(
                initializers.GlorotNormal(seed=0)(shape=shape),
                dtype=tf.float32,
                trainable=True,
                name=name)

        self.W_omega = glorot_variable([self.hidden_size, 1], "W_omega")
        self.b_omega = glorot_variable([1], "b_omega")
        self.u_omega = glorot_variable([1], "u_omega")

    def call(self, inputs, **kwargs):
        """Score every step, softmax the scores and return the weighted sum.

        Assumes ``inputs`` is (batch, steps, hidden_size) - TODO confirm with callers.
        """
        projected = tf.tensordot(inputs, self.W_omega, axes=1) + self.b_omega
        scores = tf.tensordot(tf.sigmoid(projected), self.u_omega, axes=1)
        weights = tf.nn.softmax(scores, axis=-1)
        weighted_inputs = inputs * tf.expand_dims(weights, -1)
        return tf.reduce_sum(weighted_inputs, 1)

    def get_config(self):
        """Return the base layer config extended with ``hidden_size``."""
        merged = dict(super(CustomAttention, self).get_config())
        merged['hidden_size'] = self.hidden_size
        return merged

## Build Model

In [None]:
# Hyper-parameters read by the 2-D convolutional `build_model` defined below.
default_params = {
    # First convolution + pooling
    'FILTER_CONV1': 128,
    'KERNEL_CONV1': (3, 3),
    'STRIDE_CONV1': (1, 1),
    'BIAS_INIT': 'ones',
    'KERNEL_INIT': 'glorot_normal',
    'PADDING_CONV1': 'SAME',
    'BN_MOMENTUM': 0.9,
    'LEAKY_ALPHA': 0.01,
    'SIZE_POOL_CONV1': (2, 4),
    'PADDING_POOL_CONV1': 'VALID',
    # Dilated convolution stack
    'NUM_DIL_LAYERS': 3,
    'FILTER_CONV_UFLB': 128,
    'KERNEL_CONV_UFLB': (3, 3),
    'STRIDE_CONV_UFLB': (1, 1),
    'DIL_RATE_CONV_UFLB': (2, 2),
    'PADDING_CONV_UFLB': 'SAME',
    # Recurrent layer, dense head and optimiser
    'BILSTM_UNITS_SPEC': 256,
    'UNITS_FCN': [512, 256, 128, 64],
    'LR': 0.00001,
}

In [None]:
# Per-example input shape handed to `Input(shape=...)` by the 2-D convolutional
# build_model below (presumably frames x filterbanks x channels - confirm).
# NOTE: a later cell rebinds `input_shape` for the raw-waveform model.
input_shape = (200, 40, 3)

In [None]:
def build_model(hparams=default_params):
    """Build and compile the 2-D convolutional / BiLSTM / attention classifier.

    Architecture: Conv2D + max-pool, a stack of dilated Conv2D blocks with a
    parallel dilated-conv skip branch, BiLSTM, attention pooling, a dense head
    and a 4-way softmax.

    NOTE: a later cell of this notebook defines another ``build_model`` (and
    rebinds ``default_params`` / ``input_shape``); whichever cell ran last is
    the one that ``build_model()`` resolves to.

    Args:
        hparams: Hyper-parameter dict; defaults to the ``default_params`` that
            was in scope when this cell was executed.

    Returns:
        The compiled Keras model.
    """
    model_input_spec = Input(shape=input_shape, name='spec_features')

    x = layers.Conv2D(filters=hparams['FILTER_CONV1'], kernel_size=hparams['KERNEL_CONV1'], 
                      strides=hparams['STRIDE_CONV1'], bias_initializer=hparams['BIAS_INIT'], 
                      kernel_initializer=hparams['KERNEL_INIT'],
                      padding=hparams['PADDING_CONV1'], name='CONV1')(model_input_spec)

    x = layers.LeakyReLU(alpha=hparams['LEAKY_ALPHA'],
                         name='LEAKY_CONV1')(x)
    
    x = layers.MaxPooling2D(pool_size=hparams['SIZE_POOL_CONV1'], 
                            strides=hparams['SIZE_POOL_CONV1'], 
                            padding=hparams['PADDING_POOL_CONV1'], 
                            name='POOL_CONV1')(x)

    # Kept as the input of the skip branch further down.
    conv_layer_output = x

    for i in range(hparams['NUM_DIL_LAYERS']):
        x = layers.Conv2D(filters=hparams['FILTER_CONV_UFLB'], 
                          kernel_size=hparams['KERNEL_CONV_UFLB'], 
                          strides=hparams['STRIDE_CONV_UFLB'],
                          bias_initializer=hparams['BIAS_INIT'],
                          kernel_initializer=hparams['KERNEL_INIT'],
                          dilation_rate=hparams['DIL_RATE_CONV_UFLB'], 
                          padding=hparams['PADDING_CONV_UFLB'], 
                          name='CONV_UFLB_'+str(i+1))(x)
        x = layers.BatchNormalization(momentum=hparams['BN_MOMENTUM'], 
                                      name='BN_CONV_UFLB_'+str(i+1))(x)
        x = layers.LeakyReLU(alpha=hparams['LEAKY_ALPHA'],
                             name='LEAKY_CONV_UFLB_'+str(i+1))(x)

    # Skip branch: a single dilated conv + batch norm applied to the pooled
    # output of CONV1, added to the output of the dilated stack.
    skip_layer_output = layers.Conv2D(filters=hparams['FILTER_CONV_UFLB'], 
                                      kernel_size=hparams['KERNEL_CONV_UFLB'], 
                                      strides=hparams['STRIDE_CONV_UFLB'],
                                      bias_initializer=hparams['BIAS_INIT'],
                                      kernel_initializer=hparams['KERNEL_INIT'],
                                      dilation_rate=hparams['DIL_RATE_CONV_UFLB'], 
                                      padding=hparams['PADDING_CONV_UFLB'], 
                                      name='CONV_ALT')(conv_layer_output)

    skip_layer_output = layers.BatchNormalization(momentum=hparams['BN_MOMENTUM'], 
                                                  name='BN_CONV_ALT')(skip_layer_output)
    
    x = layers.Add(name='skip_connection')([x, skip_layer_output])
    
    # Flatten the last two axes so the BiLSTM receives one vector per step of
    # axis 1.
    time_step = x.shape[1]
    linear_units = x.shape[2]*x.shape[3]
    x = tf.reshape(x,[-1,time_step,linear_units])

    x = layers.LeakyReLU(alpha=hparams['LEAKY_ALPHA'],
                             name='LEAKY_LINEAR')(x)
    x = layers.Bidirectional(layers.LSTM(units=hparams['BILSTM_UNITS_SPEC'],
                                         bias_initializer=hparams['BIAS_INIT'],
                                         kernel_initializer=hparams['KERNEL_INIT'], 
                                         return_sequences=True))(x)
    x = CustomAttention(hidden_size=x.shape[2])(x)

    for i, n in enumerate(hparams['UNITS_FCN']):
        x = layers.Dense(units=n, activation="linear", 
                         name='fcn_dense'+str(i+1))(x)
        x = layers.LeakyReLU(alpha=hparams['LEAKY_ALPHA'], 
                             name='fcn_leaky'+str(i+1))(x)
    
    x = layers.Dense(units=4, activation="softmax", name='Softmax')(x)

    model = Model(inputs=model_input_spec, outputs=x, name="model_adrnn") 

    model.compile(
        loss=losses.CategoricalCrossentropy(from_logits=False),
        optimizer=optimizers.Adam(
            learning_rate=hparams['LR']
        ),
        metrics=['categorical_accuracy'],
    )
    # summary() prints by itself and returns None; wrapping it in print()
    # only appended a stray "None" line to the output.
    model.summary()

    return model


In [None]:
# Raw-waveform input shape for the SincNet build_model below.  It must equal
# the segment width produced by PreProcess (`segment_length = 4000`), which is
# also what the recorded model summary shows.  Written as a 1-tuple:
# `(8000)` was just a parenthesised int, not a shape tuple.
input_shape = (4000,)

In [None]:
# Hyper-parameters read by the SincNet `build_model` defined below.  List-valued
# conv/pool entries are indexed per convolution block; block 0 is the SincConv1D
# layer.
default_params = {
    # Convolution blocks
    'FILTER_CONV': [80, 60, 60],
    'KERNEL_CONV': [251, 5, 5],
    'SAMPLING_RATE': 16000,
    'SIZE_POOL': [3, 3, 3],
    'PADDING_POOL': 'VALID',
    'BIAS_INIT': 'ones',
    'KERNEL_INIT': 'glorot_normal',
    'PADDING_CONV': 'SAME',
    'BN_MOMENTUM': 0.05,
    'LEAKY_ALPHA': 0.2,
    # Recurrent layer, dense head (last entry = number of classes) and optimiser
    'BILSTM_UNITS': 256,
    'UNITS_FCN': [512, 128, 64, 4],
    'LR': 0.00001,
}

In [None]:
def build_model(hparams=default_params):
    """Build and compile the raw-waveform SincNet / BiLSTM / attention classifier.

    Architecture: SincConv1D front end, then one Conv1D block per remaining
    entry of ``FILTER_CONV`` (each block is conv/pool/batch-norm/LeakyReLU),
    BiLSTM, attention pooling and a dense head whose last layer has one unit
    per class.  The model is compiled with a from-logits cross-entropy loss.

    Args:
        hparams: Hyper-parameter dict; defaults to the ``default_params`` that
            was in scope when this cell was executed.

    Returns:
        The compiled Keras model.
    """
    model_input = Input(shape=input_shape, name='raw_waveform_data')

    # SincConv1D convolves single-channel kernels, so add a channel axis.
    x = tf.expand_dims(model_input, -1)

    x = SincConv1D(N_filt=hparams['FILTER_CONV'][0], 
                   Filt_dim=hparams['KERNEL_CONV'][0], 
                   fs=hparams['SAMPLING_RATE'],
                   hidden_size=x.shape[1])(x)

    x = layers.MaxPooling1D(pool_size=hparams['SIZE_POOL'][0], 
                            padding=hparams['PADDING_POOL'], 
                            name='pool_conv1')(x)

    x = layers.BatchNormalization(momentum=hparams['BN_MOMENTUM'], 
                                  name='bn_conv1')(x)

    x = layers.LeakyReLU(alpha=hparams['LEAKY_ALPHA'],
                         name='leaky_conv1')(x)

    for i in range(1, len(hparams['FILTER_CONV'])):

        x = layers.Conv1D(filters=hparams['FILTER_CONV'][i], 
                          kernel_size=hparams['KERNEL_CONV'][i], 
                          bias_initializer=hparams['BIAS_INIT'],
                          kernel_initializer=hparams['KERNEL_INIT'],
                          padding=hparams['PADDING_CONV'], 
                          name='conv' + str(i+1))(x)

        x = layers.MaxPooling1D(pool_size=hparams['SIZE_POOL'][i], 
                                padding=hparams['PADDING_POOL'], 
                                name='pool_conv' + str(i+1))(x)
        
        x = layers.BatchNormalization(momentum=hparams['BN_MOMENTUM'], 
                                        name='bn_conv' + str(i+1))(x)

        # Name follows the conv/pool/bn scheme of the same block; the former
        # prefix 'leaky_conv1' produced 'leaky_conv12', 'leaky_conv13', ...
        x = layers.LeakyReLU(alpha=hparams['LEAKY_ALPHA'],
                            name='leaky_conv' + str(i+1))(x)
    
    x = layers.Bidirectional(layers.LSTM(units=hparams['BILSTM_UNITS'],
                                         bias_initializer=hparams['BIAS_INIT'],
                                         kernel_initializer=hparams['KERNEL_INIT'], 
                                         return_sequences=True))(x)
    x = CustomAttention(hidden_size=x.shape[2])(x)

    # NOTE(review): the LeakyReLU is also applied after the last Dense layer,
    # i.e. to the values the from_logits=True loss treats as logits.  Confirm
    # this is intended before changing it (it alters training).
    for i, n in enumerate(hparams['UNITS_FCN']):
        x = layers.Dense(units=n, activation="linear", 
                         name='fcn_dense'+str(i+1))(x)
        x = layers.LeakyReLU(alpha=hparams['LEAKY_ALPHA'], 
                             name='fcn_leaky'+str(i+1))(x)

    model = Model(inputs=model_input, outputs=x, name="model_sincnet") 

    model.compile(
        loss=losses.CategoricalCrossentropy(from_logits=True),
        optimizer=optimizers.Adam(
            learning_rate=hparams['LR']
        ),
        metrics=['categorical_accuracy'],
    )
    # summary() prints by itself and returns None; wrapping it in print()
    # only appended a stray "None" line to the output.
    model.summary()

    return model


# Retrieve Data

In [None]:
def retrieve_preprocessed_data(session):
    """Load one pickled train/validation/test split as TensorFlow tensors.

    The validation speaker is ``speaker_list[session - 1]`` and the test
    speaker is the next entry (wrapping around at the end of the list).

    Args:
        session: 1-based index into ``speaker_list``.

    Returns:
        Tuple ``(train_features, train_labels, valid_features, valid_labels,
        valid_segments_per_utterance, test_features, test_labels,
        test_segments_per_utterance)``.  Labels are one-hot with 4 classes;
        the validation/test labels are the utterance-level ones.
    """
    idx = session - 1
    speaker = speaker_list[idx]
    next_speaker = speaker_list[(idx + 1) % len(speaker_list)]
    file_code = 'Valid_' + speaker + 'Test_' + next_speaker
    data_file_name = dataset_dir + '/data' + file_code + '.pkl'
    print(data_file_name)

    # SECURITY: pickle.load can execute arbitrary code; only open files that
    # were produced by this project's own preprocessing.
    with open(data_file_name, 'rb') as f:
        output = pickle.load(f)

    # Tuple layout as written by PreProcess.preprocess; the segment-level
    # labels at positions 4 and 8 are not needed here.
    train_features, train_labels = output[0], output[1]
    test_features, test_labels, test_segments_per_utterance = output[2], output[3], output[5]
    valid_features, valid_labels, valid_segments_per_utterance = output[6], output[7], output[9]

    train_features = tf.convert_to_tensor(train_features, dtype=tf.float32)
    valid_features = tf.convert_to_tensor(valid_features, dtype=tf.float32)
    test_features = tf.convert_to_tensor(test_features, dtype=tf.float32)

    # one_hot appends a class axis to the stored labels; reshape to (rows, 4).
    train_labels = tf.reshape(tf.one_hot(train_labels, 4, dtype=tf.float32), [-1, 4])
    valid_labels = tf.reshape(tf.one_hot(valid_labels, 4, dtype=tf.float32), [-1, 4])
    test_labels = tf.reshape(tf.one_hot(test_labels, 4, dtype=tf.float32), [-1, 4])

    return train_features, train_labels, valid_features, valid_labels, valid_segments_per_utterance, \
    test_features, test_labels, test_segments_per_utterance

# Train and Evaluate

In [None]:
def train_and_evaluate(model, num_epochs=1200, batch_size=60, eval_every=5):
    """Train ``model`` batch by batch and report the best validation result.

    Reads ``train_features``, ``train_labels``, ``valid_features``,
    ``valid_labels``, ``valid_segments_per_utterance``, ``test_features``,
    ``test_labels`` and ``test_segments_per_utterance`` from the notebook's
    global namespace; they must be defined before this function is called.

    Args:
        model: Compiled Keras model.
        num_epochs: Number of ``train_on_batch`` steps (mini-batches, not full
            passes over the training data).
        batch_size: Rows per training step.
        eval_every: Evaluate on the validation set every this many steps; the
            test set is evaluated only when the validation score improves.
    """
    best_valid_accuracy = 0
    best_epoch = 0
    # Defined up front so the final report cannot hit an unbound name when the
    # validation score never improves (e.g. num_epochs < eval_every).
    test_accuracy, test_conf = None, None

    num_train = train_features.shape[0]

    for i in range(num_epochs):
        # Walk through the training rows in order, wrapping around at the end;
        # the batch just before the wrap may be shorter than batch_size.
        start = (i * batch_size) % num_train
        end = min(start + batch_size, num_train)
        train_batch = train_features[start:end, :]
        train_batch_label = train_labels[start:end, :]

        loss = model.train_on_batch(train_batch, train_batch_label, return_dict=True)

        if (i + 1) % eval_every != 0:
            continue

        valid_acc_uw, valid_conf = evaluate(model, valid_features,
                                            valid_segments_per_utterance,
                                            valid_labels)

        if valid_acc_uw > best_valid_accuracy:
            best_epoch = i + 1
            best_valid_accuracy = valid_acc_uw
            test_accuracy, test_conf = evaluate(model, test_features,
                                                test_segments_per_utterance,
                                                test_labels)
            print('*'*30)
            print("Epoch: %05d" %(i+1))
            print("Training accuracy: " + str(loss['categorical_accuracy']))
            print("Valid_UA: " + str(valid_acc_uw))
            print("Test UA: " + str(test_accuracy))

    print('*'*30)
    print("Best Epoch: %05d" %(best_epoch))
    print("Best Valid Accuracy: " + str(best_valid_accuracy))
    print("Test_UA: " + str(test_accuracy))
    print('Test Confusion Matrix:["ang","sad","hap","neu"]')
    print(test_conf)

    return

In [None]:
def evaluate(model, features, segments_per_utterance, labels, pooling='avg'):
    """Pool segment predictions per utterance and score them.

    Args:
        model: Object with a ``predict(features)`` method returning one row of
            class scores per segment.
        features: Segment features, ordered utterance by utterance.
        segments_per_utterance: Sequence whose entry ``[j][0]`` is the number
            of consecutive segments that belong to utterance ``j``.
        labels: One-hot utterance labels, one row per utterance.
        pooling: ``'max'`` takes the element-wise maximum over an utterance's
            segments; any other value sums them (same arg-max as averaging).

    Returns:
        ``(macro-averaged recall, confusion matrix)`` at utterance level.
    """
    y_pred_segments = model.predict(features)
    num_utterances = len(segments_per_utterance)
    y_pred = np.empty((num_utterances, y_pred_segments.shape[1]), dtype=np.float32)

    index = 0
    for j in range(num_utterances):
        # Cast to a Python int: the counts may be stored as a narrow NumPy
        # integer (e.g. int8), and accumulating the running offset in that
        # dtype can overflow once it exceeds the dtype's range.
        count = int(segments_per_utterance[j][0])
        chunk = y_pred_segments[index:index + count, :]
        if pooling == 'max':
            y_pred[j, :] = np.max(chunk, 0)
        else:
            y_pred[j, :] = np.sum(chunk, 0)
        index += count

    true_classes = np.argmax(labels, 1)
    pred_classes = np.argmax(y_pred, 1)
    acc_uw = recall(true_classes, pred_classes, average='macro')
    conf = confusion(true_classes, pred_classes)
    return acc_uw, conf

# SincNet

In [None]:
# `build_model` resolves to whichever definition was executed last (two cells in
# this notebook define it).  `train_and_evaluate` reads the train/validation/test
# tensors from notebook globals, so they must already be in scope - presumably
# unpacked from `retrieve_preprocessed_data(...)` in a cell not shown here.
model = build_model()
train_and_evaluate(model)

Model: "model_sincnet"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
raw_waveform_data (InputLaye [(None, 4000)]            0         
_________________________________________________________________
tf.expand_dims_7 (TFOpLambda (None, 4000, 1)           0         
_________________________________________________________________
sinc_conv1d_7 (SincConv1D)   (None, 4000, 251)         160       
_________________________________________________________________
pool_conv1 (MaxPooling1D)    (None, 1333, 251)         0         
_________________________________________________________________
bn_conv1 (BatchNormalization (None, 1333, 251)         1004      
_________________________________________________________________
leaky_conv1 (LeakyReLU)      (None, 1333, 251)         0         
_________________________________________________________________
conv2 (Conv1D)               (None, 1333, 60)        