Two-Way Feature Extraction for Speech Emotion Recognition Using Deep Learning

https://www.ncbi.nlm.nih.gov/pmc/articles/PMC8949356/

In [1]:
import pandas as pd
import numpy as np

import os
import sys

from datetime import datetime
import pickle

import librosa
import librosa.display

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch import optim as opt
from torch.utils.data import Dataset, DataLoader 
import torchvision
from torchvision import transforms as T, datasets  
from torch.utils.tensorboard import SummaryWriter 

import warnings
if not sys.warnoptions:
    warnings.simplefilter("ignore")
warnings.filterwarnings("ignore", category=DeprecationWarning) 

In [2]:
Ravdess = "/kaggle/input/ravdess-emotional-speech-audio/"
Tess = "/kaggle/input/toronto-emotional-speech-set-tess/TESS Toronto emotional speech set data/"

In [3]:
ravdess_directory_list = os.listdir(Ravdess)
ravdess_emotion = ['neutral','calm','happy','sad','angry','fear','disgust','surprise']

file_emotion = []
file_path = []
for dir in ravdess_directory_list:
    if (dir == "audio_speech_actors_01-24"):
        continue
    actor = os.listdir(Ravdess + dir)
    for file in actor:
        # get the emotion of this file
        part = file.split('.')[0]
        part = part.split('-')
        id = int(part[2])
        file_emotion.append(ravdess_emotion[id-1])
        # get file's path
        file_path.append(Ravdess + dir + '/' + file)
        
# convert to dataframe
emotion_df = pd.DataFrame(file_emotion, columns=['Emotions'])
path_df = pd.DataFrame(file_path, columns=['Path'])
Ravdess_df = pd.concat([emotion_df, path_df], axis=1)

Ravdess_df.shape

(1440, 2)

In [4]:
tess_directory_list = os.listdir(Tess)

file_emotion = []
file_path = []

for dir in tess_directory_list:
    directories = os.listdir(Tess + dir)
    for file in directories:
        part = file.split('.')[0]
        part = part.split('_')[2]
        if part=='ps':
            file_emotion.append('surprise')
        else:
            file_emotion.append(part)
        file_path.append(Tess + dir + '/' + file)
        
# convert to dataframe
emotion_df = pd.DataFrame(file_emotion, columns=['Emotions'])
path_df = pd.DataFrame(file_path, columns=['Path'])
Tess_df = pd.concat([emotion_df, path_df], axis=1)

Tess_df.shape

(2800, 2)

In [5]:
data_path = pd.concat([Ravdess_df, Tess_df], axis = 0)
data_path.to_csv("data_path.csv",index=False)
data_path.head()

Unnamed: 0,Emotions,Path
0,surprise,/kaggle/input/ravdess-emotional-speech-audio/A...
1,neutral,/kaggle/input/ravdess-emotional-speech-audio/A...
2,disgust,/kaggle/input/ravdess-emotional-speech-audio/A...
3,disgust,/kaggle/input/ravdess-emotional-speech-audio/A...
4,neutral,/kaggle/input/ravdess-emotional-speech-audio/A...


In [6]:
def extract_features(data,sample_rate):
    result = np.array([])
    
    # MFCC
    mfcc = np.mean(librosa.feature.mfcc(y = data, sr = sample_rate).T, axis = 0)
    result = np.hstack((result, mfcc))
    
    # Log Mel-Spectrogram
    mel = np.mean(librosa.feature.melspectrogram(y = data, sr = sample_rate).T, axis = 0)
    result = np.hstack((result, mel)) 
    
    # Chroma
    chroma_stft = np.mean(librosa.feature.chroma_stft(S = np.abs(librosa.stft(data)), sr = sample_rate).T, axis = 0)
    result = np.hstack((result, chroma_stft))
    
    # Spectral centroid
    spec_centroid = np.mean(librosa.feature.spectral_centroid(y = data, sr = sample_rate).T, axis = 0)
    result = np.hstack((result, spec_centroid))
    
    # Spectral rolloff
    rolloff = np.mean(librosa.feature.spectral_rolloff(y = data, sr = sample_rate).T, axis = 0)
    result = np.hstack((result, rolloff))
    
    return result

In [7]:
def noise(data):
    noise_amp = 0.03 * np.random.uniform() * np.amax(data)
    data = data + noise_amp * np.random.normal(size = data.shape[0])
    return data

def stretch(data, rate = 0.8):
    return librosa.effects.time_stretch(data, rate)

def pitch(data, sampling_rate, pitch_factor = 0.7):
    return librosa.effects.pitch_shift(data, sampling_rate, pitch_factor)

In [None]:
X, Y = [], []
for path, emotion in zip(data_path.Path, data_path.Emotions):
    # load data
    data, sample_rate = librosa.load(path, duration=3)
    
    # augmentation
    noise_data = noise(data)
    stretch_pitch_data = stretch(data)
    stretch_pitch_data = pitch(stretch_pitch_data, sample_rate)
    
    # original speech
    feature = extract_features(data, sample_rate)
    feature = np.array(feature)
    X.append(feature)
    Y.append(emotion)
    
    # noise speech
    feature_noise = extract_features(noise_data, sample_rate)
    feature_noise = np.array(feature_noise)
    X.append(feature_noise)
    Y.append(emotion)
    
    # stretch and pitch speech
    feature_stretch_pitch = extract_features(stretch_pitch_data, sample_rate)
    feature_stretch_pitch = np.array(feature_stretch_pitch)
    X.append(feature_stretch_pitch)
    Y.append(emotion)

In [None]:
# convert to df and save
Features = pd.DataFrame(X)
Features['labels'] = Y
Features.to_csv('features.csv', index=False)

In [8]:
# Load features from file
Features = pd.read_csv("/kaggle/working/features.csv")
# Features = pd.read_csv("../input/fe2way/features.csv")
X = Features.iloc[: ,:-1].values
Y = Features['labels'].values
len(X), len(Y), data_path.Path.shape

(12720, 12720, (4240,))

In [9]:
# One hot endcoding for Y.
encoder = OneHotEncoder()
Y = encoder.fit_transform(np.array(Y).reshape(-1,1)).toarray()

In [10]:
# splitting data
x_train, x_test, y_train, y_test = train_test_split(X, Y, random_state = 0, shuffle = True, test_size=0.2)
x_train.shape, y_train.shape, x_test.shape, y_test.shape

((10176, 162), (10176, 8), (2544, 162), (2544, 8))

In [11]:
import pickle

# scaling our data and save the scaler
scaler = StandardScaler()
x_train = scaler.fit_transform(x_train)

pickle.dump(scaler, open('scaler_two.pkl','wb'))
scaler = pickle.load(open('scaler_two.pkl','rb'))

x_test = scaler.transform(x_test)
x_train.shape, y_train.shape, x_test.shape, y_test.shape

((10176, 162), (10176, 8), (2544, 162), (2544, 8))

In [12]:
class Dataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        X = torch.tensor(self.X[idx]).type(torch.float)
        y = torch.tensor(self.y[idx]).type(torch.float)

        return X, y

In [14]:
#Convert X to tensor
X_train_2 = torch.from_numpy(x_train)
X_test_2 = torch.from_numpy(x_test)
print(X_train_2.shape)

torch.Size([10176, 162])


In [15]:
BATCH_SIZE = 64
train_data = Dataset(X_train_2, y_train)
test_data = Dataset(X_test_2, y_test)

train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, num_workers=os.cpu_count(), shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, num_workers=os.cpu_count())

In [16]:
class DNN(nn.Module):
    
    def __init__(self, ):
        super(DNN, self).__init__()
        
        # Block #1: 
        self.layer1 = nn.Sequential(
            nn.Linear(in_features=162, out_features=1024),
            nn.ReLU(),
            nn.Dropout(p=0.3)
        )
        
        # Block #2: 
        self.layer2 = nn.Sequential(
            nn.Linear(in_features=1024, out_features=512),
            nn.ReLU(),
            nn.Dropout(p=0.3)
        )
        
        # Block #3: 
        self.layer3 = nn.Sequential(
            nn.Linear(in_features=512, out_features=256),
            nn.ReLU(),
            nn.Dropout(p=0.3)
        )
        
        # Block #4: 
        self.layer4 = nn.Sequential(
            nn.Linear(in_features=256, out_features=128),
            nn.ReLU(),
            nn.Dropout(p=0.3)
        )
        
        # Block #5: 
        self.layer5 = nn.Sequential(
            nn.Linear(in_features=128, out_features=64),
            nn.ReLU(),
            nn.Dropout(p=0.2)
        )
        
        # Block #6: 
        self.layer6 = nn.Sequential(
            nn.Linear(in_features=64, out_features=32),
            nn.ReLU(),
            nn.Dropout(p=0.2)
        )

        # FC 8 → softmax
        self.fc = nn.Linear(in_features=32, out_features=8)
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        
        # Channel x H = 1 x 162
#         x = x.view(-1,80)
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = self.layer6(out)
        
        out = self.fc(out)
        out = self.softmax(out)
        
        return out

In [19]:
# model = DNN()
# print(model)
# model = CNN()
# print(model)
!pip install torch-summary
from torchsummary import summary

model = DNN()
summary(model, (64, 162))

Layer (type:depth-idx)                   Output Shape              Param #
├─Sequential: 1-1                        [-1, 64, 1024]            --
|    └─Linear: 2-1                       [-1, 64, 1024]            166,912
|    └─ReLU: 2-2                         [-1, 64, 1024]            --
|    └─Dropout: 2-3                      [-1, 64, 1024]            --
├─Sequential: 1-2                        [-1, 64, 512]             --
|    └─Linear: 2-4                       [-1, 64, 512]             524,800
|    └─ReLU: 2-5                         [-1, 64, 512]             --
|    └─Dropout: 2-6                      [-1, 64, 512]             --
├─Sequential: 1-3                        [-1, 64, 256]             --
|    └─Linear: 2-7                       [-1, 64, 256]             131,328
|    └─ReLU: 2-8                         [-1, 64, 256]             --
|    └─Dropout: 2-9                      [-1, 64, 256]             --
├─Sequential: 1-4                        [-1, 64, 128]             --


Layer (type:depth-idx)                   Output Shape              Param #
├─Sequential: 1-1                        [-1, 64, 1024]            --
|    └─Linear: 2-1                       [-1, 64, 1024]            166,912
|    └─ReLU: 2-2                         [-1, 64, 1024]            --
|    └─Dropout: 2-3                      [-1, 64, 1024]            --
├─Sequential: 1-2                        [-1, 64, 512]             --
|    └─Linear: 2-4                       [-1, 64, 512]             524,800
|    └─ReLU: 2-5                         [-1, 64, 512]             --
|    └─Dropout: 2-6                      [-1, 64, 512]             --
├─Sequential: 1-3                        [-1, 64, 256]             --
|    └─Linear: 2-7                       [-1, 64, 256]             131,328
|    └─ReLU: 2-8                         [-1, 64, 256]             --
|    └─Dropout: 2-9                      [-1, 64, 256]             --
├─Sequential: 1-4                        [-1, 64, 128]             --


In [23]:
from sklearn.metrics import accuracy_score
class Trainer:
    def __init__(self, train_dataloader, test_dataloader,
                 model, loss_fn, optimizer, scheduler, logger, device='cpu'):
        self.model = model.to(device)
        self.train_dataloader = train_dataloader
        self.test_dataloader = test_dataloader
        self.logger = logger
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.device = device

    def train_epoch(self):
        # Train data
        n_samples = len(self.train_dataloader.dataset)
        train_loss = 0

        for batch_idx, (X, y) in enumerate(self.train_dataloader):
            X = X.to(self.device)
            y = y.to(self.device)
            # Forward
            pred = self.model(X)
            loss = self.loss_fn(pred, torch.argmax(y, dim=1))
            
            # Backward
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            train_loss += loss
        self.scheduler.step()
        return train_loss / n_samples

    def test_epoch(self):
        # Test data
        n_samples = len(self.test_dataloader.dataset)
        test_loss = 0

        for batch_idx, (X,y) in enumerate(self.test_dataloader):
            X = X.to(self.device)
            y = y.to(self.device)
            with torch.no_grad():
                # Forward
                pred = self.model(X)
                loss = self.loss_fn(pred, torch.argmax(y, dim=1))

            test_loss += loss

        return test_loss / n_samples

    def evaluation(self, dataloader):
        y_true = []
        y_pred = []
        for X, y in dataloader:
            X = X.to(self.device)
            y_true.append(y.detach().cpu())
            y_pred.append(self.model(X).detach().cpu())
        
        y_true = torch.cat(y_true, dim=0)
        y_pred = torch.cat(y_pred, dim=0)

        true_labels = torch.argmax(y_true, dim=1)
        pred_labels = torch.argmax(y_pred, dim=1)
        accuracy = accuracy_score(true_labels.cpu(), pred_labels.cpu())

        return accuracy

    def train(self, epochs=10):
        for i in range(epochs):
            self.current_epoch = i+1
            # Training
            train_loss = self.train_epoch()
            test_loss = self.test_epoch()

            # Evaluation
            train_acc = self.evaluation(self.train_dataloader)
            test_acc = self.evaluation(self.test_dataloader)
            
            # Logging
            self.logger.add_scalar('Loss/train', train_loss.item(), i+1)
            self.logger.add_scalar('Loss/test', test_loss.item(), i+1)
            self.logger.add_scalar('Accuracy/train', train_acc.item(), i+1)
            self.logger.add_scalar('Accuracy/test', test_acc.item(), i+1)

            ## Log histogram
            for name, params in model.named_parameters():
                if 'weight' in name:
                    self.logger.add_histogram(name, params, i+1)

            # if ((i+1) % 10 == 0):
            print(f"Epoch {i+1}: Train Loss = {train_loss.item():.5f}, Test Loss = {test_loss.item():.5f}, "
            f"Train accuracy score = {train_acc.item():.5f}, Test accuracy score = {test_acc.item():.5f}")

In [24]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
LEARNING_RATE = 1e-3
LOG_DIR = "./logs/"
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') 
print(DEVICE)

cuda


In [25]:
from datetime import datetime

model = DNN()
loss_fn = nn.CrossEntropyLoss() 
optimizer = opt.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = opt.lr_scheduler.StepLR(optimizer, step_size=20, gamma=1, last_epoch=- 1, verbose=False)
logger = SummaryWriter(os.path.join(LOG_DIR, datetime.now().strftime("%d_%m_%Y_%H_%M_%S"))) # Logger

# Trainer
trainer = Trainer(
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    model=model,
    loss_fn=loss_fn,
    optimizer=optimizer,
    scheduler=scheduler,
    logger=logger,
    device=DEVICE
)

In [26]:
import warnings
warnings.filterwarnings('ignore')
trainer.train(epochs=20)

Epoch 1: Train Loss = 0.02919, Test Loss = 0.02671, Train accuracy score = 0.54953, Test accuracy score = 0.55936
Epoch 2: Train Loss = 0.02566, Test Loss = 0.02472, Train accuracy score = 0.70224, Test accuracy score = 0.69890
Epoch 3: Train Loss = 0.02452, Test Loss = 0.02457, Train accuracy score = 0.70666, Test accuracy score = 0.70322
Epoch 4: Train Loss = 0.02435, Test Loss = 0.02475, Train accuracy score = 0.71010, Test accuracy score = 0.69890
Epoch 5: Train Loss = 0.02419, Test Loss = 0.02411, Train accuracy score = 0.74636, Test accuracy score = 0.74057
Epoch 6: Train Loss = 0.02402, Test Loss = 0.02401, Train accuracy score = 0.75393, Test accuracy score = 0.75354
Epoch 7: Train Loss = 0.02412, Test Loss = 0.02468, Train accuracy score = 0.69684, Test accuracy score = 0.70283
Epoch 8: Train Loss = 0.02398, Test Loss = 0.02421, Train accuracy score = 0.74184, Test accuracy score = 0.73310
Epoch 9: Train Loss = 0.02411, Test Loss = 0.02399, Train accuracy score = 0.74941, Test

In [None]:
import pickle
pickle.dump(model, open('model_two.pkl', 'wb'))

In [None]:
import librosa
import pickle

Emo = ['neutral','calm','happy','sad','angry','fear','disgust','surprise']

def extract_features(data,sample_rate):
    result = np.array([])
    
    # MFCC
    mfcc = np.mean(librosa.feature.mfcc(y = data, sr = sample_rate).T, axis = 0)
    result = np.hstack((result, mfcc))
    
    # Log Mel-Spectrogram
    mel = np.mean(librosa.feature.melspectrogram(y = data, sr = sample_rate).T, axis = 0)
    result = np.hstack((result, mel)) 
    
    # Chroma
    chroma_stft = np.mean(librosa.feature.chroma_stft(S = np.abs(librosa.stft(data)), sr = sample_rate).T, axis = 0)
    result = np.hstack((result, chroma_stft))
    
    # Spectral centroid
    spec_centroid = np.mean(librosa.feature.spectral_centroid(y = data, sr = sample_rate).T, axis = 0)
    result = np.hstack((result, spec_centroid))
    
    # Spectral rolloff
    rolloff = np.mean(librosa.feature.spectral_rolloff(y = data, sr = sample_rate).T, axis = 0)
    result = np.hstack((result, rolloff))
    
    return result

def emotion_recognition(audio_file):
    trained_model = pickle.load(open('model_two.pkl', 'rb'))
    scaler = pickle.load(open('scaler_two.pkl','rb'))
    
    # load audio files with librosa
    data, sample_rate = librosa.load(audio_file)
    feat = extract_features(data,sample_rate)
    feat = np.array(feat)
    feat = feat[None,:]
    sc_feat = scaler.transform(feat)
    sc_feat = torch.from_numpy(sc_feat.astype('float32'))
    prediction = trained_model(sc_feat.cuda())
    pred = torch.argmax(prediction, dim=1)
    return Emo[pred]

In [None]:
audio_file = "/kaggle/input/ravdess-emotional-speech-audio/Actor_02/03-01-02-02-01-02-02.wav"
label = emotion_recognition(audio_file)
print(label)