In [None]:
import pandas as pd
import os
import numpy as np 
import librosa
import matplotlib.pyplot as plt
import librosa.display
import IPython
from IPython.display import Audio
from IPython.display import Image

In [None]:
# Emotion id -> label. The id 8 ("surprise") found in the file names is
# remapped to 0 so that the ids form the contiguous range 0..7.
EMOTIONS = {1:'neutral', 2:'calm', 3:'happy', 4:'sad', 5:'angry', 6:'fear', 7:'disgust', 0:'surprise'}
DATA_PATH = '..//Speech Emotion Recognition//Radvess'
SAMPLE_RATE = 48000


def parse_ravdess_filename(filename):
    """Decode the dash-separated numeric identifiers of a dataset file name.

    The part of ``filename`` before the first '.' is split on '-'. Field 2 is
    the emotion id (8 is remapped to 0), field 3 the intensity code
    (1 -> 'normal', anything else -> 'strong') and field 6 a number whose
    parity selects the gender (even -> 'female', odd -> 'male').

    Returns:
        ``(emotion, emotion_intensity, gender)``, or ``None`` when the name
        does not follow that layout (too few fields, non-numeric fields, or an
        emotion id that is not a key of ``EMOTIONS``).
    """
    identifiers = filename.split('.')[0].split('-')
    if len(identifiers) < 7:
        return None
    try:
        emotion = int(identifiers[2])
        intensity_code = int(identifiers[3])
        actor_code = int(identifiers[6])
    except ValueError:
        return None

    if emotion == 8:  # move "surprise" from 8 to 0
        emotion = 0
    if emotion not in EMOTIONS:
        return None

    emotion_intensity = 'normal' if intensity_code == 1 else 'strong'
    gender = 'female' if actor_code % 2 == 0 else 'male'
    return emotion, emotion_intensity, gender


data_list = []
for dirname, dirnames, filenames in os.walk(DATA_PATH):
    # os.walk honours in-place edits of `dirnames`; sorting both lists makes
    # the row order of the resulting table independent of the filesystem.
    dirnames.sort()
    for filename in sorted(filenames):
        parsed = parse_ravdess_filename(filename)
        if parsed is None:
            # Stray files (hidden files, notes, ...) are skipped instead of
            # aborting the scan with an IndexError/ValueError.
            continue
        emotion, emotion_intensity, gender = parsed
        data_list.append({"Emotion": emotion,
                          "Emotion intensity": emotion_intensity,
                          "Gender": gender,
                          "Path": os.path.join(dirname, filename)})

In [None]:
# Turn the per-file records into a table: one row per audio file, one column
# per record key.
data = pd.DataFrame(data=data_list)
# Preview the first five rows as the cell output.
data.head(n=5)

In [None]:
import soundfile as sf

mel_spectrograms = []
signals = []

n_files = len(data)
for i, file_path in enumerate(data.Path):
    # Decode the file with soundfile as float32 samples.
    audio, sample_rate = sf.read(file_path, dtype='float32')

    # Multi-dimensional reads are collapsed to one channel by averaging axis 1.
    if audio.ndim > 1:
        audio = audio.mean(axis=1)

    # Keep at most 3 s starting 0.5 s into the recording; both quantities are
    # expressed in samples at this file's own sample rate.
    clip_len = 3 * sample_rate
    start = int(0.5 * sample_rate)
    audio = audio[start:start + clip_len]

    # Copy the clip into a zero-filled 3 s buffer so that shorter clips are
    # right-padded with silence.
    signal = np.zeros(clip_len, dtype='float32')
    signal[:audio.shape[0]] = audio
    signals.append(signal)

    print("\rProcessed {}/{} files".format(i + 1, n_files), end='')

# One row per file.
signals = np.stack(signals, axis=0)


In [None]:
import librosa
from sklearn.model_selection import train_test_split

X_train, X_val, X_test = [], [], []
Y_train, Y_val, Y_test = [], [], []

# The split is done separately for every emotion so that each class is divided
# 80/10/10 between train, validation and test.
for emotion in range(len(EMOTIONS)):
    emotion_data = data[data['Emotion'] == emotion]
    X, Y = [], []

    for file_path in emotion_data['Path']:
        try:
            # Same window as the other loaders in this notebook: 3 s starting
            # 0.5 s in, at the notebook-wide SAMPLE_RATE, so every clip has the
            # same number of samples and matches the rate later passed to the
            # spectrogram functions.
            audio, sample_rate = librosa.load(file_path, sr=SAMPLE_RATE, offset=0.5, duration=3)
            # `size` must be passed by keyword: recent librosa releases reject
            # it as a positional argument, which previously sent every file
            # into the except branch below.
            audio = librosa.util.fix_length(audio, size=3 * SAMPLE_RATE)
            X.append(audio)
            Y.append(emotion)
        except Exception as e:
            print(f"Error processing file {file_path}: {e}")

    # 80% train, then the remaining 20% is halved into validation and test.
    X_train_temp, X_test_temp, Y_train_temp, Y_test_temp = train_test_split(X, Y, test_size=0.2, random_state=42, stratify=Y)
    X_val_temp, X_test_temp, Y_val_temp, Y_test_temp = train_test_split(X_test_temp, Y_test_temp, test_size=0.5, random_state=42, stratify=Y_test_temp)

    # Accumulate the per-emotion pieces.
    X_train.extend(X_train_temp)
    Y_train.extend(Y_train_temp)
    X_val.extend(X_val_temp)
    Y_val.extend(Y_val_temp)
    X_test.extend(X_test_temp)
    Y_test.extend(Y_test_temp)

# Convert lists to numpy arrays
X_train = np.array(X_train)
Y_train = np.array(Y_train)
X_val = np.array(X_val)
Y_val = np.array(Y_val)
X_test = np.array(X_test)
Y_test = np.array(Y_test)

# Display shapes of train, validation, and test sets
print(f'X_train:{X_train.shape}, Y_train:{Y_train.shape}')
print(f'X_val:{X_val.shape}, Y_val:{Y_val.shape}')
print(f'X_test:{X_test.shape}, Y_test:{Y_test.shape}')


In [None]:
def addAWGN(signal, num_bits=16, augmented_num=2, snr_low=15, snr_high=30):
    """Create noisy copies of ``signal`` by adding white Gaussian noise.

    Each copy gets its own signal-to-noise ratio, drawn uniformly from
    ``[snr_low, snr_high]`` dB. ``snr_low == snr_high`` is allowed and yields
    exactly that SNR.

    Args:
        signal: 1-D sequence of samples.
        num_bits: bit depth used for the intermediate normalisation by
            ``2 ** (num_bits - 1)``. The constant is applied to signal and
            noise alike and multiplied back at the end, so it does not change
            the resulting SNR.
        augmented_num: number of noisy copies to produce.
        snr_low: lower SNR bound in dB.
        snr_high: upper SNR bound in dB.

    Returns:
        Array of shape ``(augmented_num, len(signal))`` with the noisy copies.
    """
    signal = np.asarray(signal)
    signal_len = len(signal)

    # One independent standard-normal noise row per requested copy.
    noise = np.random.normal(size=(augmented_num, signal_len))

    # Normalize signal and noise
    norm_constant = 2.0 ** (num_bits - 1)
    signal_norm = signal / norm_constant
    noise_norm = noise / norm_constant

    # Mean power of the signal and of every noise row.
    s_power = np.sum(signal_norm ** 2) / signal_len
    n_power = np.sum(noise_norm ** 2, axis=1) / signal_len

    # One target SNR per copy. A continuous uniform draw covers the whole
    # interval and, unlike randint, does not fail when both bounds are equal.
    target_snr = np.random.uniform(snr_low, snr_high, size=augmented_num)

    # Gain that brings each noise row to signal_power / 10^(snr/10).
    scaling_factors = np.sqrt((s_power / n_power) * 10 ** (-target_snr / 10))
    scaled_noises = noise_norm * scaling_factors[:, np.newaxis]

    # Undo the normalisation and add the noise to the original samples.
    noisy_signals = signal[np.newaxis, :] + scaled_noises * norm_constant

    return noisy_signals

In [None]:
aug_signals = []
aug_labels = []
aug_rows = []  # row label in `data` for every augmented signal

# Recover which row of `data` each training waveform came from. The split cell
# divides every emotion with train_test_split(test_size=0.2, random_state=42,
# stratify=...); repeating that call on the row labels reproduces the same
# selection, provided no file was skipped while loading.
train_rows = []
for emotion in range(len(EMOTIONS)):
    emotion_rows = data.index[data['Emotion'] == emotion].tolist()
    emotion_labels = [emotion] * len(emotion_rows)
    emotion_train_rows, _, _, _ = train_test_split(
        emotion_rows, emotion_labels, test_size=0.2, random_state=42, stratify=emotion_labels)
    train_rows.extend(emotion_train_rows)

if len(train_rows) != len(Y_train) or not np.array_equal(
        data.loc[train_rows, 'Emotion'].to_numpy(), np.asarray(Y_train)):
    raise RuntimeError(
        "Could not match X_train to rows of `data`: re-run the split cell "
        "(files may have been skipped, or the training arrays were reordered).")

for i in range(X_train.shape[0]):
    augmented_signals = addAWGN(X_train[i, :])
    copies = augmented_signals.shape[0]
    aug_signals.extend(augmented_signals)
    aug_labels.extend([Y_train[i]] * copies)
    aug_rows.extend([train_rows[i]] * copies)

aug_signals = np.stack(aug_signals, axis=0)
X_train_augmented = np.concatenate([X_train, aug_signals], axis=0)

# Labels must cover the originals as well as the noisy copies, in the same
# order as the rows of X_train_augmented.
Y_train_augmented = np.concatenate([Y_train, np.asarray(aug_labels, dtype=Y_train.dtype)])

# Metadata table aligned row-for-row with X_train_augmented: original training
# rows first, then one row per augmented signal.
data_to_concat = [data.loc[train_rows], data.loc[aug_rows]]
data_augmented = pd.concat(data_to_concat, ignore_index=True)

assert len(X_train_augmented) == len(Y_train_augmented) == len(data_augmented)

print('')
print(f'X_train_augmented: {X_train_augmented.shape}, Y_train_augmented: {Y_train_augmented.shape}')

In [None]:
def getMELspectrogram(audio, sample_rate):
    """Return the mel spectrogram of ``audio`` in dB relative to its maximum.

    The spectrogram is computed by librosa with a 1024-point FFT, a 512-sample
    Hamming window, a hop of 256 samples and 128 mel bands reaching up to
    ``sample_rate / 2``.
    """
    mel_settings = dict(
        sr=sample_rate,
        n_fft=1024,
        win_length=512,
        window='hamming',
        hop_length=256,
        n_mels=128,
        fmax=sample_rate/2,
    )
    power_spectrogram = librosa.feature.melspectrogram(y=audio, **mel_settings)
    # Power -> decibels, using the largest value as the reference.
    return librosa.power_to_db(power_spectrogram, ref=np.max)

# Smoke test: mel spectrogram of the first file in the table.
# Load 3 s starting 0.5 s in, resampled to SAMPLE_RATE.
audio, sample_rate = librosa.load(data.loc[0, 'Path'], sr=SAMPLE_RATE, offset=0.5, duration=3)
# Copy the clip into a zero-filled 3 s buffer (shorter clips end in silence).
signal = np.zeros(int(SAMPLE_RATE * 3))
signal[:audio.shape[0]] = audio
mel_spectrogram = getMELspectrogram(signal, SAMPLE_RATE)
librosa.display.specshow(mel_spectrogram, y_axis='mel', x_axis='time')
print('MEL spectrogram shape: ',mel_spectrogram.shape)

In [None]:
def getMFCCs(audio, sample_rate, num_mfcc=13):
    """Return librosa's MFCCs of ``audio``, keeping ``num_mfcc`` coefficients."""
    return librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=num_mfcc)

# Smoke test: MFCCs of the first file in the table.
# Load 3 s starting 0.5 s in, resampled to SAMPLE_RATE.
audio, sample_rate = librosa.load(data.loc[0, 'Path'], sr=SAMPLE_RATE, offset=0.5, duration=3)
# Copy the clip into a zero-filled 3 s buffer (shorter clips end in silence).
signal = np.zeros(int(SAMPLE_RATE * 3))
signal[:audio.shape[0]] = audio
mfccs = getMFCCs(signal, SAMPLE_RATE)

# Draw the coefficients over time on an explicit figure/axes pair.
fig, ax = plt.subplots(figsize=(10, 4))
mfcc_image = librosa.display.specshow(mfccs, x_axis='time', sr=SAMPLE_RATE, ax=ax)
fig.colorbar(mfcc_image, ax=ax)
ax.set_title('MFCCs')
ax.set_xlabel('Time')
ax.set_ylabel('MFCC Coefficients')
fig.tight_layout()
plt.show()

print('MFCCs shape:', mfccs.shape)


In [None]:
from joblib import Parallel, delayed

def calculate_mel_spectrogram(signal):
    """Mel spectrogram of one waveform at the notebook-wide SAMPLE_RATE."""
    return getMELspectrogram(signal, sample_rate=SAMPLE_RATE)


def _mel_spectrograms_for(waveforms, announcement):
    """Print ``announcement``, then convert every waveform using all CPU cores.

    Returns the spectrograms stacked along a new leading axis.
    """
    print(announcement)
    spectrograms = Parallel(n_jobs=-1)(
        delayed(calculate_mel_spectrogram)(waveform) for waveform in waveforms
    )
    return np.stack(spectrograms, axis=0)


# Each split is replaced by its spectrograms; the waveforms are not kept.
X_train = _mel_spectrograms_for(X_train, "Calculating mel spectrograms for train set")
X_val = _mel_spectrograms_for(X_val, "Calculating mel spectrograms for validation set")
X_test = _mel_spectrograms_for(X_test, "Calculating mel spectrograms for test set")

print(f'X_train:{X_train.shape}, Y_train:{Y_train.shape}')
print(f'X_val:{X_val.shape}, Y_val:{Y_val.shape}')
print(f'X_test:{X_test.shape}, Y_test:{Y_test.shape}')


In [None]:
import torch
import torch.nn as nn

class ParallelModel(nn.Module):
    """Classifier with a convolutional branch and a transformer branch in parallel.

    Both branches read the same input of layout (batch, 1, freq, time). Their
    embeddings are concatenated and mapped to ``num_emotions`` scores by one
    linear layer.

    Size constraints that follow from the layer definitions: the transformer
    works with ``d_model=64`` on the input pooled by 2 along frequency, so the
    frequency axis must have 128 bins; the linear layer expects 320 features,
    i.e. 256 flattened convolutional features plus the 64 transformer features.
    """

    # (in_channels, out_channels, pooling factor) of the four conv stages.
    _CONV_STAGES = ((1, 16, 2), (16, 32, 4), (32, 64, 4), (64, 64, 4))

    def __init__(self,num_emotions):
        super().__init__()
        # Convolutional branch: every stage is
        # Conv3x3 -> BatchNorm -> ReLU -> MaxPool -> Dropout(0.3).
        conv_layers = []
        for in_ch, out_ch, pool in self._CONV_STAGES:
            conv_layers.extend([
                nn.Conv2d(in_channels=in_ch,
                          out_channels=out_ch,
                          kernel_size=3,
                          stride=1,
                          padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=pool, stride=pool),
                nn.Dropout(p=0.3),
            ])
        self.conv2Dblock = nn.Sequential(*conv_layers)

        # Transformer branch: shrink the input first (2x in freq, 4x in time),
        # then run a 4-layer encoder over the time axis.
        self.transf_maxpool = nn.MaxPool2d(kernel_size=[2,4], stride=[2,4])
        transf_layer = nn.TransformerEncoderLayer(d_model=64, nhead=4, dim_feedforward=512, dropout=0.4, activation='relu')
        self.transf_encoder = nn.TransformerEncoder(transf_layer, num_layers=4)

        # Output head. The dropout has p=0, so it leaves the scores unchanged.
        self.out_linear = nn.Linear(320,num_emotions)
        self.dropout_linear = nn.Dropout(p=0)
        self.out_softmax = nn.Softmax(dim=1)

    def forward(self,x):
        """Return ``(output_logits, output_softmax)`` for a batch ``x``."""
        # Conv branch: (b, channel, freq, time) flattened per sample.
        conv_embedding = self.conv2Dblock(x).flatten(start_dim=1)

        # Transformer branch: drop the channel axis and reorder to the
        # (time, batch, embedding) layout the encoder requires, then average
        # the encoder output over time.
        sequence = self.transf_maxpool(x).squeeze(1).permute(2, 0, 1)
        transf_embedding = self.transf_encoder(sequence).mean(dim=0)

        # Joint embedding -> class scores -> probabilities.
        complete_embedding = torch.cat((conv_embedding, transf_embedding), dim=1)
        output_logits = self.dropout_linear(self.out_linear(complete_embedding))
        output_softmax = self.out_softmax(output_logits)
        return output_logits, output_softmax

In [None]:
def loss_fnc(predictions, targets):
    """Cross-entropy loss between class scores and integer class targets.

    Uses PyTorch's default settings, as a freshly constructed
    ``nn.CrossEntropyLoss`` would.
    """
    return nn.functional.cross_entropy(input=predictions, target=targets)

In [None]:
import torch

class Trainer:
    """Runs single optimisation and evaluation passes for a model.

    The model is expected to return ``(logits, softmax)`` when called.
    ``device``, ``epochs``, ``dataset_size`` and ``batch_size`` are stored on
    the instance but are not read by the methods of this class.
    """

    def __init__(self, model, loss_fn, optimizer, device, epochs, dataset_size, batch_size):
        self.model, self.loss_fn, self.optimizer = model, loss_fn, optimizer
        self.device = device
        self.epochs, self.dataset_size, self.batch_size = epochs, dataset_size, batch_size

    def _forward_and_score(self, X, Y):
        """Forward pass; returns (logits, predicted classes, accuracy in [0, 1])."""
        logits, probabilities = self.model(X)
        predicted = probabilities.argmax(dim=1)
        hit_ratio = (Y == predicted).sum().item() / len(Y)
        return logits, predicted, hit_ratio

    def train_step(self, X, Y):
        """Do one optimisation step on the batch ``(X, Y)``.

        Returns:
            ``(loss, accuracy)`` measured before the update; the accuracy is a
            percentage.
        """
        self.model.train()
        self.optimizer.zero_grad()

        logits, _, hit_ratio = self._forward_and_score(X, Y)

        batch_loss = self.loss_fn(logits, Y)
        batch_loss.backward()
        self.optimizer.step()

        return batch_loss.item(), hit_ratio * 100

    def validate(self, X, Y):
        """Evaluate ``(X, Y)`` in eval mode without tracking gradients.

        Returns:
            ``(loss, accuracy, predictions)``; the accuracy is a percentage and
            ``predictions`` holds the predicted class index of every sample.
        """
        self.model.eval()
        with torch.no_grad():
            logits, predictions, hit_ratio = self._forward_and_score(X, Y)
            eval_loss = self.loss_fn(logits, Y)

        return eval_loss.item(), hit_ratio * 100, predictions

In [None]:
from sklearn.preprocessing import StandardScaler

def _standardize(features, fitted_scaler, fit=False):
    """Scale a (b, c, h, w) array with ``fitted_scaler``.

    The scaler operates on 2-D data, so every sample is flattened to one row,
    scaled, and put back into its original shape. With ``fit=True`` the scaler
    statistics are estimated from ``features`` first.
    """
    b, c, h, w = features.shape
    flat = features.reshape(b, -1)
    scaled = fitted_scaler.fit_transform(flat) if fit else fitted_scaler.transform(flat)
    return scaled.reshape(b, c, h, w)


# Insert a singleton channel axis at position 1.
X_train = np.expand_dims(X_train,1)
X_val = np.expand_dims(X_val,1)
X_test = np.expand_dims(X_test,1)

scaler = StandardScaler()

# Statistics come from the training split only and are reused unchanged for
# the test and validation splits.
X_train = _standardize(X_train, scaler, fit=True)
X_test = _standardize(X_test, scaler)
X_val = _standardize(X_val, scaler)

In [None]:
EPOCHS=1500
DATASET_SIZE = X_train.shape[0]
BATCH_SIZE = 32
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Selected device is {}'.format(device))
model = ParallelModel(num_emotions=len(EMOTIONS)).to(device)
print('Number of trainable params: ',sum(p.numel() for p in model.parameters()) )
OPTIMIZER = torch.optim.SGD(model.parameters(),lr=0.01, weight_decay=1e-3, momentum=0.8)

# Create an instance of the Trainer class
trainer = Trainer(model=model,
                  loss_fn=loss_fnc,
                  optimizer=OPTIMIZER,
                  device=device,
                  epochs=EPOCHS,
                  dataset_size=DATASET_SIZE,
                  batch_size=BATCH_SIZE)

# The validation set does not change between epochs: build its tensors once.
X_val_tensor = torch.tensor(X_val, dtype=torch.float32, device=device)
Y_val_tensor = torch.tensor(Y_val, dtype=torch.long, device=device)

# Ceiling division so that the final, smaller batch is trained on as well.
# Truncating here would skip up to BATCH_SIZE - 1 samples per epoch and make
# the DATASET_SIZE-weighted averages below sum to less than the true mean.
iters = (DATASET_SIZE + BATCH_SIZE - 1) // BATCH_SIZE

losses = []
val_losses = []
for epoch in range(EPOCHS):
    # New sample order every epoch. Batches are taken through the permutation
    # so X_train / Y_train themselves keep their original order.
    ind = np.random.permutation(DATASET_SIZE)

    epoch_acc = 0
    epoch_loss = 0

    for i in range(iters):
        batch_ind = ind[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
        actual_batch_size = len(batch_ind)
        X = X_train[batch_ind]
        Y = Y_train[batch_ind]
        X_tensor = torch.tensor(X, dtype=torch.float32, device=device)
        Y_tensor = torch.tensor(Y, dtype=torch.long, device=device)
        loss, acc = trainer.train_step(X_tensor, Y_tensor)

        # Weight every batch by its share of the dataset.
        epoch_acc += acc * actual_batch_size / DATASET_SIZE
        epoch_loss += loss * actual_batch_size / DATASET_SIZE
        print(f"\r Epoch {epoch}: iteration {i}/{iters}", end='')

    val_loss, val_acc, predictions = trainer.validate(X_val_tensor, Y_val_tensor)

    losses.append(epoch_loss)
    val_losses.append(val_loss)

    print('')
    print(f"Epoch {epoch} --> loss:{epoch_loss:.4f}, acc:{epoch_acc:.2f}%, val_loss:{val_loss:.4f}, val_acc:{val_acc:.2f}%")

# Use the number of recorded epochs so the curves can still be drawn when the
# loop above was stopped early.
epochs_range = range(1, len(losses) + 1)

# Plotting losses
plt.figure(figsize=(10, 5))
plt.plot(epochs_range, losses, label='Training Loss')
plt.plot(epochs_range, val_losses, label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Write the model parameters to disk, then read the file back and load it into
# the same model object.
MODEL_STATE_PATH = 'trained_model_state.pth'
torch.save(model.state_dict(), MODEL_STATE_PATH)
restored_state = torch.load(MODEL_STATE_PATH)
model.load_state_dict(restored_state)  # last expression: shown as the cell output

In [None]:
# Move the held-out split to the selected device: float features, long labels.
X_test_tensor = torch.tensor(X_test, device=device).to(torch.float32)
Y_test_tensor = torch.tensor(Y_test, device=device, dtype=torch.long)

# One evaluation pass over the whole test split.
test_metrics = trainer.validate(X_test_tensor, Y_test_tensor)
test_loss, test_acc, predictions = test_metrics

print(f'Test loss is {test_loss:.3f}')
print(f'Test accuracy is {test_acc:.2f}%')

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sn

predicted_emotions = predictions.cpu().numpy()
emotions_groundtruth = Y_test

# Fix the class order explicitly and derive the tick labels from that same
# order. EMOTIONS is declared with 'surprise' (id 0) last, whereas the matrix
# rows follow the sorted ids, so taking EMOTIONS.values() as labels would shift
# every name by one class.
emotion_ids = sorted(EMOTIONS)
emotion_names = [EMOTIONS[emotion_id] for emotion_id in emotion_ids]

# build confusion matrix and normalized confusion matrix; passing `labels`
# also keeps both matrices square when a class is missing from the test data
conf_matrix = confusion_matrix(emotions_groundtruth, predicted_emotions, labels=emotion_ids)
conf_matrix_norm = confusion_matrix(emotions_groundtruth, predicted_emotions, labels=emotion_ids, normalize='true')

confmatrix_df = pd.DataFrame(conf_matrix, index=emotion_names, columns=emotion_names)
confmatrix_df_norm = pd.DataFrame(conf_matrix_norm, index=emotion_names, columns=emotion_names)

# plot confusion matrices: raw counts on the left, row-normalized on the right
plt.figure(figsize=(16,6))
sn.set(font_scale=1.8)
plt.subplot(1,2,1)
plt.title('Confusion Matrix')
sn.heatmap(confmatrix_df, annot=True, fmt='d', annot_kws={"size": 18})
plt.subplot(1,2,2)
plt.title('Normalized Confusion Matrix')
sn.heatmap(confmatrix_df_norm, annot=True, fmt='.2f', annot_kws={"size": 18})
plt.show()

In [None]:
# Find the row of `data` behind every test sample. The test split is not the
# first len(X_test) rows of `data`: the split cell divides every emotion with
# train_test_split(test_size=0.2, random_state=42) and then halves the held-out
# part with test_size=0.5. Repeating both calls on the row labels reproduces
# the same selection, provided no file was skipped while loading.
test_rows = []
for emotion in range(len(EMOTIONS)):
    emotion_rows = data.index[data['Emotion'] == emotion].tolist()
    emotion_labels = [emotion] * len(emotion_rows)
    _, holdout_rows, _, holdout_labels = train_test_split(
        emotion_rows, emotion_labels, test_size=0.2, random_state=42, stratify=emotion_labels)
    _, emotion_test_rows, _, _ = train_test_split(
        holdout_rows, holdout_labels, test_size=0.5, random_state=42, stratify=holdout_labels)
    test_rows.extend(emotion_test_rows)

if len(test_rows) != len(Y_test) or not np.array_equal(
        data.loc[test_rows, 'Emotion'].to_numpy(), np.asarray(Y_test)):
    raise RuntimeError(
        "Could not match the test samples to rows of `data`: re-run the split cell "
        "(files may have been skipped while loading).")

# Compare on the CPU as numpy arrays; `predictions` may live on the GPU.
predicted_labels = predictions.cpu().numpy()
if len(predicted_labels) != len(Y_test):
    raise RuntimeError("`predictions` does not belong to the test set: re-run the test evaluation cell.")

is_correct = np.asarray(Y_test) == predicted_labels
is_normal = data.loc[test_rows, 'Emotion intensity'].to_numpy() == 'normal'

correct_normal = int(np.sum(is_correct & is_normal))
correct_strong = int(np.sum(is_correct & ~is_normal))
wrong_normal = int(np.sum(~is_correct & is_normal))
wrong_strong = int(np.sum(~is_correct & ~is_normal))

# Rows: prediction outcome; columns: emotion intensity.
array = np.array([[wrong_normal,wrong_strong],[correct_normal,correct_strong]])
df = pd.DataFrame(array,['wrong','correct'],['normal','strong'])
sn.set(font_scale=1.4)
sn.heatmap(df, annot=True, annot_kws={"size": 16})
plt.show()

In [None]:
# Find the row of `data` behind every test sample. The test split is not the
# first len(X_test) rows of `data`: the split cell divides every emotion with
# train_test_split(test_size=0.2, random_state=42) and then halves the held-out
# part with test_size=0.5. Repeating both calls on the row labels reproduces
# the same selection, provided no file was skipped while loading.
test_rows = []
for emotion in range(len(EMOTIONS)):
    emotion_rows = data.index[data['Emotion'] == emotion].tolist()
    emotion_labels = [emotion] * len(emotion_rows)
    _, holdout_rows, _, holdout_labels = train_test_split(
        emotion_rows, emotion_labels, test_size=0.2, random_state=42, stratify=emotion_labels)
    _, emotion_test_rows, _, _ = train_test_split(
        holdout_rows, holdout_labels, test_size=0.5, random_state=42, stratify=holdout_labels)
    test_rows.extend(emotion_test_rows)

if len(test_rows) != len(Y_test) or not np.array_equal(
        data.loc[test_rows, 'Emotion'].to_numpy(), np.asarray(Y_test)):
    raise RuntimeError(
        "Could not match the test samples to rows of `data`: re-run the split cell "
        "(files may have been skipped while loading).")

# Compare on the CPU as numpy arrays; `predictions` may live on the GPU.
predicted_labels = predictions.cpu().numpy()
if len(predicted_labels) != len(Y_test):
    raise RuntimeError("`predictions` does not belong to the test set: re-run the test evaluation cell.")

is_correct = np.asarray(Y_test) == predicted_labels
is_female = data.loc[test_rows, 'Gender'].to_numpy() == 'female'

correct_female = int(np.sum(is_correct & is_female))
correct_male = int(np.sum(is_correct & ~is_female))
wrong_female = int(np.sum(~is_correct & is_female))
wrong_male = int(np.sum(~is_correct & ~is_female))

# Create a 2x2 array with the counts (rows: outcome, columns: gender)
array = np.array([[wrong_female, wrong_male], [correct_female, correct_male]])

# Create a DataFrame for plotting
df = pd.DataFrame(array, ['wrong', 'correct'], ['female', 'male'])

# Plot the counts as a heatmap
plt.figure(figsize=(8, 6))
sn.set(font_scale=1.4)
sn.heatmap(df, annot=True, annot_kws={"size": 16})
plt.xlabel('Gender')
plt.ylabel('Correctness')
plt.title('Correlation Matrix: Correctness vs Gender')
plt.show()
