In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [1]:
import os
from glob import glob
import pickle
import itertools
import numpy as np
from scipy.stats import zscore
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

### Graph imports ###
import matplotlib.pyplot as plt
from PIL import Image
import pandas as pd

### Audio import ###
import librosa
import IPython
from IPython.display import Audio

### Plot imports ###
from IPython.display import Image
import matplotlib.pyplot as plt

### Time Distributed ConvNet imports ###
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Dense, Dropout, Activation, TimeDistributed, concatenate
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, BatchNormalization, LeakyReLU, Flatten
from tensorflow.keras.layers import LSTM
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras import backend as K
from keras.utils import np_utils
from keras.utils.vis_utils import plot_model
from sklearn.preprocessing import LabelEncoder

### Warning ###
import warnings
warnings.filterwarnings('ignore')

In [2]:
Ravdess = "/content/drive/MyDrive/actors/"
Crema = "/content/drive/MyDrive/AudioWAV/"
Tess = "/content/drive/MyDrive/TESS Toronto emotional speech set data/"
Savee = "/content/drive/MyDrive/ALL/"

In [3]:
ravdess_directory_list = os.listdir(Ravdess)

file_emotion = []
file_path = []
for dir in ravdess_directory_list:
    # as their are 20 different actors in our previous directory we need to extract files for each actor.
    actor = os.listdir(Ravdess + dir)
    for file in actor:
        part = file.split('.')[0]
        part = part.split('-')
        # third part in each file represents the emotion associated to that file.
        try:
          file_emotion.append(int(part[2]))
          file_path.append(Ravdess + dir + '/' + file)
        except IndexError:
          pass
        
# dataframe for emotion of files
emotion_df = pd.DataFrame(file_emotion, columns=['Emotions'])

# dataframe for path of files.
path_df = pd.DataFrame(file_path, columns=['Path'])
Ravdess_df = pd.concat([emotion_df, path_df], axis=1)

# changing integers to actual emotions.
Ravdess_df.Emotions.replace({1:'neutral', 2:'calm', 3:'happy', 4:'sad', 5:'angry', 6:'fear', 7:'disgust', 8:'surprise'}, inplace=True)
Ravdess_df = Ravdess_df[(Ravdess_df["Emotions"] != 'surprise') &(Ravdess_df["Emotions"] != 'calm')] 
Ravdess_df.head()
np.unique(Ravdess_df.Emotions)

array(['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad'],
      dtype=object)

In [None]:
crema_directory_list = os.listdir(Crema)

file_emotion = []
file_path = []

for file in crema_directory_list:
    # storing file paths
    file_path.append(Crema + file)
    # storing file emotions
    part=file.split('_')
    if part[2] == 'SAD':
        file_emotion.append('sad')
    elif part[2] == 'ANG':
        file_emotion.append('angry')
    elif part[2] == 'DIS':
        file_emotion.append('disgust')
    elif part[2] == 'FEA':
        file_emotion.append('fear')
    elif part[2] == 'HAP':
        file_emotion.append('happy')
    elif part[2] == 'NEU':
        file_emotion.append('neutral')
    else:
        file_emotion.append('Unknown')
        
# dataframe for emotion of files
emotion_df = pd.DataFrame(file_emotion, columns=['Emotions'])

# dataframe for path of files.
path_df = pd.DataFrame(file_path, columns=['Path'])
Crema_df = pd.concat([emotion_df, path_df], axis=1)
np.unique(Crema_df.Emotions)

array(['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad'],
      dtype=object)

In [None]:
tess_directory_list = os.listdir(Tess)

file_emotion = []
file_path = []

for dir in tess_directory_list:
    directories = os.listdir(Tess + dir)
    for file in directories:
        part = file.split('.')[0]
        try:
          part = part.split('_')[2]
          if part=='ps':
              # file_emotion.append('surprise')
              continue
          else:
              file_emotion.append(part)
          file_path.append(Tess + dir + '/' + file)
        except IndexError:
          pass
        
# dataframe for emotion of files
emotion_df = pd.DataFrame(file_emotion, columns=['Emotions'])

# dataframe for path of files.
path_df = pd.DataFrame(file_path, columns=['Path'])
Tess_df = pd.concat([emotion_df, path_df], axis=1)
np.unique(Tess_df.Emotions)

array(['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'sad (1)'],
      dtype=object)

In [None]:
savee_directory_list = os.listdir(Savee)

file_emotion = []
file_path = []

for file in savee_directory_list:
    file_path.append(Savee + file)
    part = file.split('_')[1]
    ele = part[:-6]
    if ele=='a':
        file_emotion.append('angry')
    elif ele=='d':
        file_emotion.append('disgust')
    elif ele=='f':
        file_emotion.append('fear')
    elif ele=='h':
        file_emotion.append('happy')
    elif ele=='n':
        file_emotion.append('neutral')
    elif ele=='sa':
        file_emotion.append('sad')
    else:
        file_emotion.append('surprise')        
# dataframe for emotion of files
emotion_df = pd.DataFrame(file_emotion, columns=['Emotions'])

# dataframe for path of files.
path_df = pd.DataFrame(file_path, columns=['Path'])

Savee_df = pd.concat([emotion_df, path_df], axis=1)
Savee_df = Savee_df[Savee_df['Emotions'] != 'surprise']
np.unique(Savee_df['Emotions'])

array(['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad'],
      dtype=object)

In [6]:
# data_path = pd.concat([Ravdess_df, Crema_df, Tess_df, Savee_df], axis = 0)
data_path = Ravdess_df
data_path = shuffle(data_path).reset_index(drop=True)
data_path.to_csv("data_path.csv",index=False)
data_path = data_path
RAV_df = data_path.Path
data_path.head(20)

Unnamed: 0,Emotions,Path
0,happy,/content/drive/MyDrive/actors/Actor_03/03-01-0...
1,sad,/content/drive/MyDrive/actors/Actor_17/03-01-0...
2,neutral,/content/drive/MyDrive/actors/Actor_15/03-01-0...
3,angry,/content/drive/MyDrive/actors/Actor_19/03-01-0...
4,fear,/content/drive/MyDrive/actors/Actor_14/03-01-0...
5,fear,/content/drive/MyDrive/actors/Actor_21/03-01-0...
6,sad,/content/drive/MyDrive/actors/Actor_01/03-01-0...
7,happy,/content/drive/MyDrive/actors/Actor_03/03-01-0...
8,angry,/content/drive/MyDrive/actors/Actor_20/03-01-0...
9,disgust,/content/drive/MyDrive/actors/Actor_15/03-01-0...


In [7]:
labels = data_path.Emotions.replace({'neutral':1, 'happy':2, 'sad':3,'sad (1)':3, 'angry':4, 'fear':5, 'disgust':6})
labels = labels.ravel()

In [8]:
unique, counts = np.unique(labels, return_counts=True)

result = np.column_stack((unique, counts)) 
print (result)

[[  1  96]
 [  2 192]
 [  3 192]
 [  4 192]
 [  5 192]
 [  6 192]]


In [9]:
signal = []

# Sample rate (16.0 kHz)
sample_rate = 16000     

# Max pad lenght (3.0 sec)
max_pad_len = 49100

for index,path in enumerate(RAV_df[:1]):
    X, sample_rate = librosa.load(path
                                  ,duration=3
                                  ,offset=0.5
                                 )
    sample_rate = np.array(sample_rate)
    
    y = zscore(X)
        
    # Padding or truncated signal 
    if len(y) < max_pad_len:    
        y_padded = np.zeros(max_pad_len)
        y_padded[:len(y)] = y
        y = y_padded
    elif len(y) > max_pad_len:
        y = np.asarray(y[:max_pad_len])

    # Add to signal list
    signal.append(y)
    

In [12]:
def noisy_signal(signal, snr_low=15, snr_high=30, nb_augmented=2):
    
    # Signal length
    signal_len = len(signal)

    # Generate White noise
    noise = np.random.normal(size=(nb_augmented, signal_len))
    
    # Compute signal and noise power
    s_power = np.sum((signal / (2.0 ** 15)) ** 2) / signal_len
    n_power = np.sum((noise / (2.0 ** 15)) ** 2, axis=1) / signal_len
    
    # Random SNR: Uniform [15, 30]
    snr = np.random.randint(snr_low, snr_high)
    
    # Compute K coeff for each noise
    K = np.sqrt((s_power / n_power) * 10 ** (- snr / 10))
    K = np.ones((signal_len, nb_augmented)) * K
    
    # Generate noisy signal
    return signal + K.T * noise

In [13]:
print("Data Augmentation: START")
augmented_signal = list(map(noisy_signal, signal))
print("Data Augmentation: END!")

Data Augmentation: START
Data Augmentation: END!


In [11]:
def mel_spectrogram(y, sr=16000, n_fft=512, win_length=256, hop_length=128, window='hamming', n_mels=128, fmax=4000):
    
    np.nan_to_num(y, copy = False, nan = 0)
    # Compute spectogram
    mel_spect = np.abs(librosa.stft(y, n_fft=n_fft, window=window, win_length=win_length, hop_length=hop_length)) ** 2
    
    # Compute mel spectrogram
    mel_spect = librosa.feature.melspectrogram(S=mel_spect, sr=sr, n_mels=n_mels, fmax=fmax)
    
    # Compute log-mel spectrogram
    mel_spect = librosa.power_to_db(mel_spect, ref=np.max)
    
    return mel_spect

In [14]:
mel_spect = np.asarray(list(map(mel_spectrogram, signal)))
augmented_mel_spect = [np.asarray(list(map(mel_spectrogram, augmented_signal[i]))) for i in range(len(augmented_signal))]


In [17]:
MEL_SPECT_train, MEL_SPECT_test, AUG_MEL_SPECT_train, AUG_MEL_SPECT_test, label_train, label_test = train_test_split(mel_spect, augmented_mel_spect,labels, test_size=0.2,random_state=1)

# Build augmented labels and train
aug_label_train = np.asarray(list(itertools.chain.from_iterable([[label] * 2 for label in label_train])))
AUG_MEL_SPECT_train = np.asarray(list(itertools.chain.from_iterable(AUG_MEL_SPECT_train)))

# Concatenate original and augmented
X_train = np.concatenate((MEL_SPECT_train, AUG_MEL_SPECT_train))
y_train = np.concatenate((label_train, aug_label_train))

# Build test set
X_test = MEL_SPECT_test
y_test = label_test

ValueError: ignored

In [None]:
MEL_SPECT_train, MEL_SPECT_test, label_train, label_test = train_test_split(mel_spect, labels, test_size=0.2,random_state=1)

# Concatenate original and augmented
X_train = MEL_SPECT_train
y_train = label_train

# Build test set
X_test = MEL_SPECT_test
y_test = label_test

In [16]:
win_ts = 128
hop_ts = 64

# Split spectrogram into frames
def frame(x, win_step=128, win_size=64):
    nb_frames = 1 + int((x.shape[2] - win_size) / win_step)
    frames = np.zeros((x.shape[0], nb_frames, x.shape[1], win_size)).astype(np.float32)
    for t in range(nb_frames):
        frames[:,t,:,:] = np.copy(x[:,:,(t * win_step):(t * win_step + win_size)]).astype(np.float32)
    return frames

# Frame for TimeDistributed model
X_train = frame(X_train, hop_ts, win_ts)
X_test = frame(X_test, hop_ts, win_ts)

NameError: ignored

# Новый раздел

In [None]:
lb = LabelEncoder()
y_train = np_utils.to_categorical(lb.fit_transform(np.ravel(y_train)))
y_test = np_utils.to_categorical(lb.transform(np.ravel(y_test)))

In [None]:
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1] , X_train.shape[2], X_train.shape[3], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1] , X_test.shape[2], X_test.shape[3], 1)


In [None]:
np.save('/content/drive/MyDrive/X_train', X_train)
np.save('/content/drive/MyDrive/X_test', X_test)
np.save('/content/drive/MyDrive/y_test', y_test)
np.save('/content/drive/MyDrive/y_train', y_train)

NameError: ignored

In [None]:
X_train = np.load('/content/drive/MyDrive/X_train.npy')
X_test = np.load('/content/drive/MyDrive/X_test.npy')
y_test = np.load('/content/drive/MyDrive/y_test.npy')
y_train = np.load('/content/drive/MyDrive/y_train.npy')

In [None]:
X_train.shape

(2532, 5, 128, 128, 1)

In [None]:
K.clear_session()

# Define two sets of inputs: MFCC and FBANK
input_y = Input(shape=X_train.shape[1:], name='InputMELSPECT')

## First LFLB (local feature learning block)
y = TimeDistributed(Conv2D(64, kernel_size=(3, 3), strides=(1, 1), padding='same'), name='Conv1MELSPECT')(input_y)
y = TimeDistributed(BatchNormalization(), name='BatchNorm1MELSPECT')(y)
y = TimeDistributed(Activation('elu'), name='Activ1MELSPECT')(y)
y = TimeDistributed(MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same'), name='MaxPool1MELSPECT')(y)
y = TimeDistributed(Dropout(0.2), name='Drop1MELSPECT')(y)     

## Second LFLB (local feature learning block)
y = TimeDistributed(Conv2D(64, kernel_size=(3, 3), strides=(1, 1), padding='same'), name='Conv2MELSPECT')(y)
y = TimeDistributed(BatchNormalization(), name='BatchNorm2MELSPECT')(y)
y = TimeDistributed(Activation('elu'), name='Activ2MELSPECT')(y)
y = TimeDistributed(MaxPooling2D(pool_size=(4, 4), strides=(4, 4), padding='same'), name='MaxPool2MELSPECT')(y)
y = TimeDistributed(Dropout(0.2), name='Drop2MELSPECT')(y)

## Second LFLB (local feature learning block)
y = TimeDistributed(Conv2D(128, kernel_size=(3, 3), strides=(1, 1), padding='same'), name='Conv3MELSPECT')(y)
y = TimeDistributed(BatchNormalization(), name='BatchNorm3MELSPECT')(y)
y = TimeDistributed(Activation('elu'), name='Activ3MELSPECT')(y)
y = TimeDistributed(MaxPooling2D(pool_size=(4, 4), strides=(4, 4), padding='same'), name='MaxPool3MELSPECT')(y)
y = TimeDistributed(Dropout(0.2), name='Drop3MELSPECT')(y)

## Second LFLB (local feature learning block)
y = TimeDistributed(Conv2D(128, kernel_size=(3, 3), strides=(1, 1), padding='same'), name='Conv4MELSPECT')(y)
y = TimeDistributed(BatchNormalization(), name='BatchNorm4MELSPECT')(y)
y = TimeDistributed(Activation('elu'), name='Activ4MELSPECT')(y)
y = TimeDistributed(MaxPooling2D(pool_size=(4, 4), strides=(4, 4), padding='same'), name='MaxPool4MELSPECT')(y)
y = TimeDistributed(Dropout(0.2), name='Drop4MELSPECT')(y)  

## Flat
y = TimeDistributed(Flatten(), name='FlatMELSPECT')(y)                      
                               
# Apply 2 LSTM layer and one FC
y = LSTM(256, return_sequences=False, dropout=0.2, name='LSTM1')(y)
y = Dense(y_train.shape[1], activation='softmax', name='FC')(y)

# Build final model
model = Model(inputs=input_y, outputs=y)

# Early stopping
early_stopping = EarlyStopping(monitor='val_accuracy', patience=30, verbose=1, mode='max')



In [None]:
model.compile(optimizer=SGD(lr=0.01, decay=1e-6, momentum=0.8), loss='categorical_crossentropy', metrics=['accuracy'])

# Fit model
history = model.fit(X_train, y_train, batch_size=64, epochs=5, validation_data=(X_test, y_test), callbacks=[early_stopping])
model.save('/content/drive/MyDrive/my_model_1')

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5




INFO:tensorflow:Assets written to: /content/drive/MyDrive/my_model_1/assets


INFO:tensorflow:Assets written to: /content/drive/MyDrive/my_model_1/assets


In [None]:
# Early stopping
early_stopping = EarlyStopping(monitor='val_accuracy', patience=30, verbose=1, mode='max')

In [None]:
from tensorflow import keras

for i in range(19): 
  model = keras.models.load_model('/content/drive/MyDrive/my_model_1')
  history = model.fit(X_train, y_train, batch_size=64, epochs=5, validation_data=(X_test, y_test), callbacks=[early_stopping])
  model.save('/content/drive/MyDrive/my_model_1')

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5




INFO:tensorflow:Assets written to: /content/drive/MyDrive/my_model_2/assets


INFO:tensorflow:Assets written to: /content/drive/MyDrive/my_model_2/assets


In [None]:
+model = keras.models.load_model('/content/drive/MyDrive/my_model_1')
score = model.evaluate(X_test, y_test, verbose=0)
score


[0.8821555376052856, 0.7358490824699402]