### EMOVOICE: Real-time Speech Emotion Recognition Using Raw Audio Features and Deep Learning

### Importing Data

In [None]:
%pip install kagglehub torchaudio librosa numpy torch noisereduce evaluate transformers[torch]
# %pip install -U flash-attn --no-build-isolation

In [None]:
import kagglehub
import torchaudio as ta
import torchaudio.transforms as T
from torch.utils.data import Dataset, DataLoader
import os
import torch as t
from torch import nn
import numpy as np
import librosa
import noisereduce as nr
from typing import Optional, Callable

In [None]:
# Fetch the latest published version of the RAVDESS emotional speech dataset.
# `path` is whatever kagglehub returns for the download (printed below).
path = kagglehub.dataset_download("uwrfkaggler/ravdess-emotional-speech-audio")
print(f"Path to dataset files: {path}")

In [None]:
class SpeechDataset(Dataset):
    """Audio clips found under a directory tree, labelled from their file names.

    File names are expected to consist of seven dash-separated numeric fields
    (``modality-channel-emotion-intensity-statement-repetition-actor``), as in
    RAVDESS. The third field is the emotion code, which starts at 1; it is
    shifted to a zero-based class index so it can be used directly as a
    classification target.

    Args:
        root_dir: Directory that is searched recursively for ``.wav`` files.
        transform: Optional callable applied to the cropped/padded waveform.
        segment_length: Target clip length in seconds (it is multiplied by the
            file's sampling rate). Longer clips are randomly cropped, shorter
            ones are zero-padded at the end.
            NOTE(review): with the default of 1000 every clip shorter than
            1000 seconds is padded to that length -- confirm this is intended.
    """

    def __init__(self, root_dir, transform=None, segment_length=1000):
        self.root_dir = root_dir
        self.transform = transform
        self.segment_length = segment_length

        # Index only audio files; anything else in the tree (metadata, hidden
        # files) would otherwise fail later when it is loaded or parsed.
        found = []
        for folder, _, names in os.walk(self.root_dir):
            for name in names:
                if name.lower().endswith(".wav"):
                    found.append(os.path.join(folder, name))
        # Sorted so that indices do not depend on filesystem listing order.
        self.filelist = np.array(sorted(found))

    def __len__(self):
        return len(self.filelist)

    def __getitem__(self, idx):
        """Return ``{"input_values": waveform, "label": emotion}`` for one clip.

        Raises:
            ValueError: If the file name does not have seven dash-separated
                fields or the emotion field is not an integer.
        """
        if t.is_tensor(idx):
            idx = idx.tolist()

        # Entries already include root_dir (see __init__); joining it again
        # would duplicate the prefix whenever root_dir is a relative path.
        path = str(self.filelist[idx])

        stem = os.path.splitext(os.path.basename(path))[0]
        fields = stem.split("-")
        if len(fields) != 7:
            raise ValueError(
                f"Unexpected file name {os.path.basename(path)!r}: "
                f"expected 7 dash-separated fields, got {len(fields)}"
            )
        # Emotion codes in the file name start at 1; class ids must start at 0.
        emotion = t.tensor(int(fields[2]) - 1)

        audio, rate = ta.load(path)

        num_samples = int(self.segment_length * rate)
        total = audio.shape[1]
        if total > num_samples:
            # Random crop; the upper bound is exclusive, hence the +1 so the
            # final window can be chosen as well.
            start = np.random.randint(0, total - num_samples + 1)
            audio = audio[:, start:start + num_samples]
        elif total < num_samples:
            # Pad with zeros at the end if too short.
            audio = t.nn.functional.pad(audio, (0, num_samples - total))

        if self.transform is not None:
            audio = self.transform(audio)
            # A feature extractor returns a mapping rather than a tensor. Hand
            # back the flat tensor so that samples can be stacked per key when
            # batching instead of nesting a mapping under "input_values".
            if hasattr(audio, "keys") and "input_values" in audio:
                audio = t.as_tensor(audio["input_values"]).reshape(-1)

        return {"input_values": audio, "label": emotion}

In [None]:
from transformers import AutoFeatureExtractor

class Pipeline(nn.Module):
    """Waveform preprocessing: resample, downmix, denoise, extract features.

    Args:
        input_rate: Sampling rate of the waveforms passed to ``forward``.
        noise_reduce: If true, apply stationary noise reduction.
        normalize: Forwarded to the feature extractor as ``do_normalize``.
    """

    def __init__(self, input_rate, noise_reduce=True, normalize=True):
        super().__init__()
        self.noise_reduce = noise_reduce
        self.input_rate = input_rate
        self.feature_extractor = AutoFeatureExtractor.from_pretrained("facebook/wav2vec2-base", do_normalize=normalize)

        # Rate the feature extractor expects; everything after the resampling
        # step operates at this rate.
        self.target_rate = self.feature_extractor.sampling_rate

        if self.input_rate != self.target_rate:
            self.resampler = T.Resample(self.input_rate, self.target_rate)
        else:
            self.resampler = None

    def forward(self, waveform):
        """Preprocess one waveform and return the feature extractor's output.

        Args:
            waveform: Tensor indexed as (channels, samples) -- the channel axis
                is averaged away below.

        Returns:
            The object returned by the feature extractor called with
            ``return_tensors="pt"``.
        """
        if self.resampler is not None:
            waveform = self.resampler(waveform)

        # Downmix multi-channel audio to mono.
        if waveform.shape[0] > 1:
            waveform = t.mean(waveform, dim=0, keepdim=True)

        if self.noise_reduce:
            waveform_np = waveform.numpy()[0]
            reduced_noise = nr.reduce_noise(
                y=waveform_np,
                # The signal has already been resampled at this point, so the
                # target rate (not the input rate) describes it.
                sr=self.target_rate,
                stationary=True
            )
            waveform = t.from_numpy(reduced_noise).unsqueeze(0)

        waveform = self.feature_extractor(
            waveform, sampling_rate=self.target_rate, return_tensors="pt")

        return waveform

In [None]:
# Build the dataset without a transform first so one file can be read to find
# the recordings' sampling rate, then attach the preprocessing pipeline.
Data = SpeechDataset(path, transform=None)
_, rate = ta.load(Data.filelist[0])

pipe = Pipeline(rate)
Data.transform = pipe

# 85 % / 15 % random split into training and held-out subsets.
trainset, testset = t.utils.data.random_split(Data, lengths=[0.85, 0.15])

### Training

In [None]:
# Shuffle only the training batches; a fixed order for the held-out loader
# keeps evaluation runs comparable.
trainloader = DataLoader(trainset, batch_size=10, shuffle=True)
testloader = DataLoader(testset, batch_size=10, shuffle=False)

In [None]:
from transformers import AutoModelForAudioClassification, TrainingArguments, Trainer, DataCollatorWithPadding
import evaluate

device = t.device("cuda" if t.cuda.is_available() else "cpu")

# The classification head needs one output per emotion class. The emotion maps
# used in this notebook list 8 classes (ids 0..7), so size the head accordingly
# instead of relying on the library's default label count.
NUM_EMOTIONS = 8
model = AutoModelForAudioClassification.from_pretrained(
    "facebook/wav2vec2-base", num_labels=NUM_EMOTIONS
).to(device)

In [None]:
accuracy = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    """Return the accuracy metric for one evaluation pass.

    The predicted class of each row is the arg-max of ``eval_pred.predictions``
    along axis 1; it is compared with ``eval_pred.label_ids``.
    """
    logits, references = eval_pred.predictions, eval_pred.label_ids
    predicted_ids = np.argmax(logits, axis=1)
    return accuracy.compute(predictions=predicted_ids, references=references)


# Hyper-parameters gathered in one mapping and unpacked into TrainingArguments.
# Evaluation and checkpointing both happen once per epoch, and the checkpoint
# with the best "accuracy" is reloaded when training ends.
_training_config = {
    "output_dir": "model_data",
    "eval_strategy": "epoch",
    "save_strategy": "epoch",
    "learning_rate": 3e-5,
    "per_device_train_batch_size": 32,
    "gradient_accumulation_steps": 4,
    "per_device_eval_batch_size": 32,
    "num_train_epochs": 10,
    "warmup_ratio": 0.1,
    "logging_steps": 10,
    "load_best_model_at_end": True,
    "metric_for_best_model": "accuracy",
    "push_to_hub": False,
}
training_args = TrainingArguments(**_training_config)

# Wire the model, arguments, data splits and metric function together, then
# start fine-tuning.
trainer = Trainer(
    model,
    training_args,
    train_dataset=trainset,
    eval_dataset=testset,
    compute_metrics=compute_metrics,
)

trainer.train()

### Visualizing

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import pandas as pd

In [None]:
def visualize_samples(dataset, num_samples=5, sample_rate=16000):
    """Visualize random audio samples and their mel spectrograms.

    Args:
        dataset: Indexable collection of sample dicts. The waveform is read
            from ``'waveform'`` or, failing that, ``'input_values'`` (which may
            itself be a mapping holding ``'input_values'``). The label is read
            from ``'emotion'`` or ``'label'``. A ``'spectrogram'`` entry is
            plotted as-is when present; otherwise one is computed here.
        num_samples: Number of rows to draw. Indices are drawn with
            replacement, so a sample can appear more than once.
        sample_rate: Sampling rate assumed when a spectrogram has to be
            computed. Defaults to 16 kHz -- TODO confirm it matches the rate
            of the waveforms stored in ``dataset``.
    """
    # Assumes zero-based class ids in this order; other ids are shown as-is.
    emotion_map = {
        0: 'neutral', 1: 'calm', 2: 'happy', 3: 'sad',
        4: 'angry', 5: 'fearful', 6: 'disgust', 7: 'surprised'
    }

    def _waveform_of(sample):
        """Return the sample's audio as a 1-D float tensor."""
        wav = sample['waveform'] if 'waveform' in sample else sample['input_values']
        if hasattr(wav, 'keys'):
            # Feature-extractor output: the tensor sits one level deeper.
            wav = wav['input_values']
        wav = t.as_tensor(wav).detach().float()
        if wav.dim() > 1:
            # Collapse any leading batch/channel axes by averaging them.
            wav = wav.reshape(-1, wav.shape[-1]).mean(dim=0)
        return wav

    def _label_of(sample):
        """Return the sample's class id as a plain int."""
        raw = sample['emotion'] if 'emotion' in sample else sample['label']
        return int(t.as_tensor(raw).item())

    # Built once and reused for every sample that lacks a stored spectrogram.
    to_mel = T.MelSpectrogram(sample_rate=sample_rate, n_mels=64)
    to_db = T.AmplitudeToDB()

    indices = np.random.choice(len(dataset), num_samples)

    plt.figure(figsize=(15, 3*num_samples))
    for i, idx in enumerate(indices):
        sample = dataset[idx]
        waveform = _waveform_of(sample)
        label = _label_of(sample)
        emotion_name = emotion_map.get(label, f"unknown ({label})")

        if 'spectrogram' in sample:
            spectrogram = t.as_tensor(sample['spectrogram']).detach().squeeze()
        else:
            spectrogram = to_db(to_mel(waveform))

        # Plot waveform
        plt.subplot(num_samples, 2, 2*i+1)
        plt.plot(waveform.numpy())
        plt.title(f"Waveform - Emotion: {emotion_name}")
        plt.xlabel('Time')
        plt.ylabel('Amplitude')

        # Plot spectrogram
        plt.subplot(num_samples, 2, 2*i+2)
        plt.imshow(spectrogram.numpy(),
                  aspect='auto', origin='lower')
        plt.title("Mel Spectrogram")
        plt.colorbar()
        plt.xlabel('Time frames')
        plt.ylabel('Frequency bins')

    plt.tight_layout()
    plt.show()

visualize_samples(Data)

In [None]:
def plot_emotion_distribution(dataloader):
    """Plot a histogram of the emotion labels produced by ``dataloader``.

    Args:
        dataloader: Iterable of dicts. Labels are read from ``'label'`` or,
            failing that, ``'emotion'``. Each entry may hold a single label
            (one sample) or a batch of labels.
    """
    # Assumes zero-based class ids in this order.
    emotion_map = {
        0: 'neutral', 1: 'calm', 2: 'happy', 3: 'sad',
        4: 'angry', 5: 'fearful', 6: 'disgust', 7: 'surprised'
    }

    all_emotions = []
    for batch in dataloader:
        labels = batch['label'] if 'label' in batch else batch['emotion']
        # reshape(-1) turns a scalar label into a one-element list, so single
        # samples and batches are handled the same way.
        all_emotions.extend(t.as_tensor(labels).reshape(-1).tolist())

    class_ids = list(emotion_map)

    plt.figure(figsize=(10, 5))
    # A fixed order keeps every class on the axis (even with zero count), so
    # the tick labels below always line up with the bars.
    sns.countplot(x=all_emotions, order=class_ids)
    plt.xticks(ticks=range(len(class_ids)),
               labels=[emotion_map[i] for i in class_ids], rotation=45)
    plt.title('Emotion Class Distribution')
    plt.xlabel('Emotion')
    plt.ylabel('Count')
    plt.show()

plot_emotion_distribution(Data)

In [None]:
def compare_augmentations(dataset, idx=0, sample_rate=16000):
    """Compare one sample with and without the dataset's transform.

    The "augmented" sample is ``dataset[idx]`` as-is. The "original" sample is
    read again with ``dataset.transform`` temporarily set to ``None``; if the
    dataset has no ``transform`` attribute (or it is ``None``) both panels show
    the same sample.

    Args:
        dataset: Indexable collection of sample dicts. The waveform is read
            from ``'waveform'`` or ``'input_values'``; a ``'spectrogram'``
            entry is plotted as-is when present, otherwise one is computed.
        idx: Index of the sample to show.
        sample_rate: Sampling rate assumed for computed spectrograms. It is
            applied to both versions -- TODO confirm; an untransformed clip
            may be stored at a different rate, which shifts its mel axis.
    """
    def _waveform_of(sample):
        """Return the sample's audio as a 1-D float tensor."""
        wav = sample['waveform'] if 'waveform' in sample else sample['input_values']
        if hasattr(wav, 'keys'):
            # Feature-extractor output: the tensor sits one level deeper.
            wav = wav['input_values']
        wav = t.as_tensor(wav).detach().float()
        if wav.dim() > 1:
            # Collapse any leading batch/channel axes by averaging them.
            wav = wav.reshape(-1, wav.shape[-1]).mean(dim=0)
        return wav

    to_mel = T.MelSpectrogram(sample_rate=sample_rate, n_mels=64)
    to_db = T.AmplitudeToDB()

    def _spectrogram_of(sample, wav):
        """Return the stored spectrogram, or a dB-scaled mel spectrogram."""
        if 'spectrogram' in sample:
            return t.as_tensor(sample['spectrogram']).detach().squeeze()
        return to_db(to_mel(wav))

    augmented = dataset[idx]

    saved_transform = getattr(dataset, 'transform', None)
    if saved_transform is not None:
        dataset.transform = None
        try:
            original = dataset[idx]
        finally:
            # Always restore the transform, even if loading fails.
            dataset.transform = saved_transform
    else:
        original = augmented

    original_wav = _waveform_of(original)
    augmented_wav = _waveform_of(augmented)

    plt.figure(figsize=(12, 6))

    # Original waveform
    plt.subplot(2, 2, 1)
    plt.plot(original_wav.numpy())
    plt.title('Original Waveform')

    # Original spectrogram
    plt.subplot(2, 2, 2)
    plt.imshow(_spectrogram_of(original, original_wav).numpy(),
              aspect='auto', origin='lower')
    plt.title('Original Spectrogram')

    # Augmented waveform
    plt.subplot(2, 2, 3)
    plt.plot(augmented_wav.numpy())
    plt.title('Augmented Waveform')

    # Augmented spectrogram
    plt.subplot(2, 2, 4)
    plt.imshow(_spectrogram_of(augmented, augmented_wav).numpy(),
              aspect='auto', origin='lower')
    plt.title('Augmented Spectrogram')

    plt.tight_layout()
    plt.show()

# Uncomment once augmentation transforms have been added to the dataset:

# compare_augmentations(dataset)

In [None]:
def calculate_batch_stats(dataloader):
    """Calculate the element-wise mean and std of waveforms and spectrograms.

    Statistics are taken over every individual value seen, so they do not
    depend on how the data is batched. The standard deviation is the
    population form, ``sqrt(E[x^2] - E[x]^2)``.

    Args:
        dataloader: Iterable of dicts. Waveforms are read from ``'waveform'``
            or, failing that, ``'input_values'`` (which may itself be a
            mapping holding ``'input_values'``). Spectrograms are read from
            ``'spectrogram'`` when that key is present.

    Returns:
        ``(waveform_mean, waveform_std, spectrogram_mean, spectrogram_std)``
        as 0-dim float64 tensors. A pair is NaN when no values of that kind
        were seen.
    """
    def _new_totals():
        return {
            "sum": t.zeros((), dtype=t.float64),
            "sq_sum": t.zeros((), dtype=t.float64),
            "count": 0,
        }

    def _accumulate(totals, values):
        # float64 keeps the running sums accurate over many values.
        values = t.as_tensor(values).detach().to(t.float64)
        totals["sum"] += values.sum()
        totals["sq_sum"] += (values ** 2).sum()
        totals["count"] += values.numel()

    def _mean_std(totals):
        if totals["count"] == 0:
            nan = t.tensor(float("nan"), dtype=t.float64)
            return nan, nan.clone()
        mean = totals["sum"] / totals["count"]
        # Rounding can push the variance slightly below zero; clamp before
        # taking the square root so the result is never NaN for real data.
        variance = (totals["sq_sum"] / totals["count"] - mean ** 2).clamp(min=0.0)
        return mean, variance.sqrt()

    waveform_totals = _new_totals()
    spectrogram_totals = _new_totals()

    for batch in dataloader:
        waveforms = batch['waveform'] if 'waveform' in batch else batch['input_values']
        if hasattr(waveforms, 'keys'):
            # Feature-extractor output: the tensor sits one level deeper.
            waveforms = waveforms['input_values']
        _accumulate(waveform_totals, waveforms)

        if 'spectrogram' in batch:
            _accumulate(spectrogram_totals, batch['spectrogram'])

    waveform_mean, waveform_std = _mean_std(waveform_totals)
    spectrogram_mean, spectrogram_std = _mean_std(spectrogram_totals)

    print(f"Waveform - Mean: {waveform_mean:.4f}, Std: {waveform_std:.4f}")
    print(f"Spectrogram - Mean: {spectrogram_mean:.4f}, Std: {spectrogram_std:.4f}")

    return waveform_mean, waveform_std, spectrogram_mean, spectrogram_std

# Compute the statistics over `Data`. Note that the dataset object itself is
# passed here, not a DataLoader, so each "batch" the function iterates over is
# what indexing the dataset yields.
wav_mean, wav_std, spec_mean, spec_std = calculate_batch_stats(Data)

In [None]:
# Bundle the computed statistics under descriptive keys so they can be reused
# (for example, for input normalization in the model).
stats = dict(
    waveform_mean=wav_mean,
    waveform_std=wav_std,
    spectrogram_mean=spec_mean,
    spectrogram_std=spec_std,
)

# You can save them to a file
# import pickle
# with open('dataset_stats.pkl', 'wb') as f:
#     pickle.dump(stats, f)

In [None]:
# sample = dataset[0]
# waveform = sample['waveform']
# print("Waveform stats:")
# print(f"Min: {waveform.min().item():.4f}")
# print(f"Max: {waveform.max().item():.4f}")
# print(f"Mean: {waveform.mean().item():.4f}")
# print(f"Std: {waveform.std().item():.4f}")