In [82]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from scipy.io import wavfile
import os.path
import IPython.display
import seaborn as sns
import librosa
import librosa.display
import soundfile
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix

import tensorflow as tf
from tensorflow.keras import utils
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Conv1D, MaxPooling1D, Flatten, BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from keras import optimizers

from keras import optimizers,regularizers
from tensorflow.python.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.decomposition import PCA
import warnings

In [2]:
!pip install pydub
!pip install audiomentations

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [16]:
import re
import os
import wave
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pydub import AudioSegment
from IPython.display import Audio, display
import librosa as lib
import librosa.display

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [17]:
PATH = "/content/drive/MyDrive/Colab Notebooks/Speech_Emotion/Crema"
AUDIO_PATH = "/content/drive/MyDrive/Colab Notebooks/Speech_Emotion/Crema/1001_DFA_ANG_XX.wav"

In [18]:
def zeroCrossingRate(audio):
  return lib.feature.zero_crossing_rate(audio)

In [19]:
def energy(audio):
  # using a spectrogram will give a more accurate representation
  # of energy over time because its frames can be windowed
  S, phase = lib.magphase(lib.stft(audio))
  return lib.feature.rms(S=S).mean()

In [20]:
def chromaStft(audio, sr):
  stft = np.abs(librosa.stft(audio))
  return librosa.feature.chroma_stft(S=stft, sr=sr, n_fft=200).mean()

In [21]:
def mfcc(data, sr):
  return librosa.feature.mfcc(y=data, sr=sr, n_fft=200).mean()

In [22]:
def melSpectrogram(audio, sr):
  mel_spectrogram = lib.feature.melspectrogram(y=audio, sr=sr, n_fft=200, n_mels=64)
  log_mel_spectrogram = librosa.power_to_db(mel_spectrogram)
  return log_mel_spectrogram

In [23]:
def playAudio(audio_file):
  audio = AudioSegment.from_wav(audio_file)
  # Play the audio
  audio.export('temp_audio.wav', format='wav')
  audio_data = open('temp_audio.wav', 'rb').read()
  display(Audio(audio_data))
  # Delete the temporary audio file
  os.remove('temp_audio.wav')

In [24]:
def visualize_waveform(audio, sr):
    plt.figure(figsize=(12, 4))
    plt.plot(audio)
    plt.xlabel('Time')
    plt.ylabel('Amplitude')
    plt.title('Waveform')
    plt.show()

In [26]:
def splitData(dataset_path):
  audio_files = glob.glob(os.path.join(dataset_path, "*.wav"))
  audio_train, audio_test = train_test_split(audio_files, test_size=0.1, random_state=42, shuffle=True)
  return audio_train, audio_test

In [27]:
D_train, D_test = splitData(PATH)

In [28]:
print(len(D_train), len(D_test))

6697 745


In [29]:
D_test[0]

'/content/drive/MyDrive/Colab Notebooks/Speech_Emotion/Crema/1026_TIE_HAP_XX.wav'

In [30]:
def getMaxLen(audio_files):
  max_length = 0
  for audio_file in audio_files:
    audio, _ = librosa.load(audio_file)
    length = len(audio)
    max_length = max(max_length, length)
  return max_length

In [154]:
def loadAudioTime(audio_files):
  classes = ["SAD", "ANG", "DIS", "FEA", "HAP", "NEU"]
  # for data augmentation
  augmentations = Compose([
    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.01, p=0.5),
    TimeStretch(min_rate=0.8, max_rate=1.2, p=0.5),
    PitchShift(min_semitones=-4, max_semitones=4, p=0.5)
])
  # max_length = getMaxLen(audio_files)
  max_length = 200
  D, Y = [], []
  for audio_file in audio_files: 
    # load the audio file
    audio, sr = lib.load(audio_file, sr=4444)
    # extract zero crossing rate
    zcr = zeroCrossingRate(audio)
    # extract energy
    rms = energy(audio)
    # chroma stft
    cs = chromaStft(audio, sr)
    # mfcc
    mfc = mfcc(audio, sr)
    # extract mel spectrogram
    mel_spec = melSpectrogram(audio, sr).mean()
    combined_features = np.concatenate(([rms, cs, mfc, mel_spec], (np.pad(zcr[0], (0, 300 - len(zcr[0]))))))
    D.append(combined_features)
    for cls in classes:
      if re.search(cls, audio_file): Y.append(cls)
    # don't augment every example
    if np.random.rand() >= 0.5:
      augmented_audio = augmentations(samples=audio, sample_rate=sr)
      # extract zero crossing rate
      zcr = zeroCrossingRate(augmented_audio)
      # extract energy
      rms = energy(augmented_audio)
      # chroma stft
      cs = chromaStft(augmented_audio, sr)
      # mfcc
      mfc = mfcc(augmented_audio, sr)
      # extract mel spectrogram
      mel_spec = melSpectrogram(augmented_audio, sr).mean()
      combined_features = np.concatenate(([rms, cs, mfc, mel_spec], (np.pad(zcr[0], (0, 300 - len(zcr[0]))))))
      D.append(combined_features)
      for cls in classes:
        if re.search(cls, audio_file): Y.append(cls)
  return D, Y

In [137]:
def loadAudioFreq(audio_files):
  classes = ["SAD", "ANG", "DIS", "FEA", "HAP", "NEU"]
  # for data augmentation
  augmentations = Compose([
    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.01, p=0.5),
    TimeStretch(min_rate=0.8, max_rate=1.2, p=0.5),
    PitchShift(min_semitones=-4, max_semitones=4, p=0.5)
])
  # max_length = getMaxLen(audio_files)
  max_length = 200
  D, Y = [], []
  for audio_file in audio_files: 
    # load the audio file
    audio, sr = lib.load(audio_file, sr=4444)
    # extract mel spectrogram
    mel_spec = melSpectrogram(audio, sr)
    D.append(mel_spec)
    for cls in classes:
      if re.search(cls, audio_file): Y.append(cls)
    # don't augment every example
    if np.random.rand() >= 0.5:
      augmented_audio = augmentations(samples=audio, sample_rate=sr)
      mel_spec_aug = melSpectrogram(augmented_audio, sr)
      D.append(mel_spec_aug)
      for cls in classes:
        if re.search(cls, audio_file): Y.append(cls)
  return D, Y

## Time Domain Model

In [189]:
D_time, Y_time = loadAudioTime(D_train)

KeyboardInterrupt: ignored

In [156]:
D_time = np.array(D_time)
D_time.shape

(1114, 304)

In [157]:
unicheck = np.unique(Y_time, axis=0)
print(f"Check on time domain:\n{unicheck}")

Check on time domain:
['ANG' 'DIS' 'FEA' 'HAP' 'NEU' 'SAD']


In [158]:
df_time = pd.DataFrame(Y_time)
df_time.head()

Unnamed: 0,0
0,HAP
1,SAD
2,HAP
3,SAD
4,NEU


In [159]:
# Use the get_dummies() method to one-hot encode the labels
one_hot_labels = pd.get_dummies(df_time)

# Convert the DataFrame to a NumPy array
one_hot_labels = one_hot_labels.to_numpy()

print(one_hot_labels.shape)

(1114, 6)


In [160]:
Y_time = one_hot_labels
Y_time.shape

(1114, 6)

In [169]:
# Apply PCA to the data
pca = PCA(n_components=200)
D_pca = pca.fit_transform(D_time)

In [190]:
# Split the data into train and test sets
x_train, x_test, y_train, y_test = train_test_split(np.array(D_pca), np.array(Y_time), test_size=0.05, random_state=69, shuffle=True, stratify=Y_time)
print((x_train.shape, y_train.shape, x_test.shape, y_test.shape))

x_train = x_train.reshape((x_train.shape[0], x_train.shape[1], 1))
x_test = x_test.reshape((x_test.shape[0], x_test.shape[1], 1))

model = Sequential()
model.add(Conv1D(128, 3, input_shape=(x_train.shape[1], 1), kernel_regularizer=regularizers.l2(0.001)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(MaxPooling1D(pool_size=2))

model.add(Conv1D(256, 3, kernel_regularizer=regularizers.l2(0.001)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(MaxPooling1D(pool_size=2))

model.add(Conv1D(512, 3, kernel_regularizer=regularizers.l2(0.001)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(MaxPooling1D(pool_size=2))

model.add(Conv1D(1024, 3,  kernel_regularizer=regularizers.l2(0.001)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(MaxPooling1D(pool_size=2))

model.add(Flatten())
model.add(Dense(512, kernel_regularizer=regularizers.l2(0.001)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Dropout(0.3))

model.add(Dense(256, kernel_regularizer=regularizers.l2(0.001)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Dropout(0.2))

model.add(Dense(128, kernel_regularizer=regularizers.l2(0.001)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Dropout(0.2))

model.add(Dense(6, activation='softmax'))

model.compile(loss='categorical_crossentropy',
             optimizer=optimizers.Adam(lr=0.0005),
             metrics=['accuracy'])

# Specify thecallbacks for the model
callbacks = [EarlyStopping(monitor='val_loss', patience=10)]

model.summary()

# Train the model with early stopping
history = model.fit(x_train, y_train,
                    validation_data=(x_train, y_train),
                    epochs=50,
                    batch_size=32,
                    callbacks=callbacks)

((1058, 200), (1058, 6), (56, 200), (56, 6))
Model: "sequential_20"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_40 (Conv1D)          (None, 198, 128)          512       
                                                                 
 batch_normalization_70 (Bat  (None, 198, 128)         512       
 chNormalization)                                                
                                                                 
 activation_70 (Activation)  (None, 198, 128)          0         
                                                                 
 max_pooling1d_40 (MaxPoolin  (None, 99, 128)          0         
 g1D)                                                            
                                                                 
 conv1d_41 (Conv1D)          (None, 97, 256)           98560     
                                                                 
 batch_n

  super().__init__(name, **kwargs)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


## Frequency Domain Model

In [188]:
D_freq, Y_freq = loadAudioFreq(D_train)

KeyboardInterrupt: ignored

In [174]:
unicheck = np.unique(Y_freq, axis=0)
print(f"Check on freq domain:\n{unicheck}")

Check on freq domain:
['ANG' 'DIS' 'FEA' 'HAP' 'NEU' 'SAD']


In [175]:
df_freq = pd.DataFrame(Y_freq)
df_freq.head()

Unnamed: 0,0
0,HAP
1,SAD
2,SAD
3,HAP
4,HAP


In [176]:
# Use the get_dummies() method to one-hot encode the labels
one_hot_labels = pd.get_dummies(df_freq)

# Convert the DataFrame to a NumPy array
one_hot_labels = one_hot_labels.to_numpy()

print(one_hot_labels.shape)

(1106, 6)


In [177]:
Y_freq = one_hot_labels
Y_freq.shape

(1106, 6)

In [178]:
for i, array in enumerate(D_freq):
    D_freq[i] = np.pad(array, ((0, 0), (0, 256 - array.shape[1])))
D_freq = np.array(D_freq)

In [179]:
D_freq.shape

(1106, 64, 256)

In [180]:
x_train, x_test, y_train, y_test = train_test_split(np.array(D_freq), np.array(Y_freq), test_size=0.05)
print((x_train.shape, y_train.shape, x_test.shape, y_test.shape))

((1050, 64, 256), (1050, 6), (56, 64, 256), (56, 6))


In [181]:
x_train = x_train.reshape((x_train.shape[0], x_train.shape[1], x_train.shape[2], 1))
x_test = x_test.reshape((x_test.shape[0], x_test.shape[1], x_test.shape[2], 1))
print(x_train.shape, x_test.shape)

(1050, 64, 256, 1) (56, 64, 256, 1)


In [185]:
model = Sequential()

# Layer 1: Convolutional Layer
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(x_train.shape[1], x_train.shape[2], 1)))
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 2: Convolutional Layer
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 3: Convolutional Layer
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 4: Flatten Layer
model.add(Flatten())

# Layer 5: Fully Connected Layer
model.add(Dense(256, activation='relu'))
model.add(Dropout(0.5))

# Layer 6: Fully Connected Layer
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))

# Layer 7: Output Layer
model.add(Dense(6, activation='sigmoid'))

model.compile(loss='categorical_crossentropy',
             optimizer=optimizers.Adam(lr=0.0005),
             metrics=['accuracy'])

model.summary()

Model: "sequential_19"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_23 (Conv2D)          (None, 62, 254, 32)       320       
                                                                 
 max_pooling2d_21 (MaxPoolin  (None, 31, 127, 32)      0         
 g2D)                                                            
                                                                 
 conv2d_24 (Conv2D)          (None, 29, 125, 64)       18496     
                                                                 
 max_pooling2d_22 (MaxPoolin  (None, 14, 62, 64)       0         
 g2D)                                                            
                                                                 
 conv2d_25 (Conv2D)          (None, 12, 60, 128)       73856     
                                                                 
 max_pooling2d_23 (MaxPoolin  (None, 6, 30, 128)     

In [187]:
# Specify thecallbacks for the model
callbacks = [EarlyStopping(monitor='val_loss', patience=10)]

history = model.fit(x_train, y_train,
                    epochs=50,
                    batch_size=128,
                    validation_data=(x_test, y_test),
                    callbacks=callbacks)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
