# Speech Emotion Recognition

## Installing Dependencies

In [None]:
%pip install -r requirements.txt

## Importing Libraries

In [2]:
import os
import gc
import math
import torch
import functools
import numpy as np
import pandas as pd
import torch.nn as nn
import seaborn as sns
import librosa.display
import pytorch_lightning as pl
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from torchmetrics import Accuracy, F1Score
from module.data.thaiser import InitialData
from sklearn.preprocessing import LabelEncoder
from pytorch_lightning import LightningDataModule
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
from pytorch_lightning.loggers import TensorBoardLogger
from module.data.dataset import Wav2VecDataset, wav2vec2_collate_fn
from sklearn.metrics import confusion_matrix, classification_report
from pytorch_lightning.callbacks import (
    ModelCheckpoint, 
    EarlyStopping,
    BatchSizeFinder,
    LearningRateMonitor
)
from transformers import (
    Wav2Vec2Processor, 
    Wav2Vec2Config,
    Wav2Vec2Model
)

## Health Check

In [None]:
# Device Configuration
def list_pytorch_devices():
    """Print every compute device PyTorch can see and pick one for training.

    Returns:
        A ``(accelerator, device_count)`` tuple. ``accelerator`` is ``"gpu"``
        when CUDA is available, otherwise ``"mps"`` when the MPS backend is
        built and available, otherwise ``"cpu"``. The count is always ``1``.
    """
    has_cuda = torch.cuda.is_available()
    has_mps = torch.backends.mps.is_available() and torch.backends.mps.is_built()

    # Collect (type, index, name) rows: CUDA devices first, then MPS, then CPU
    inventory = []
    if has_cuda:
        inventory.extend(
            ("CUDA", idx, torch.cuda.get_device_name(idx))
            for idx in range(torch.cuda.device_count())
        )
    if has_mps:
        inventory.append(("MPS", 0, "Apple Silicon GPU"))
    inventory.append(("CPU", 0, "CPU"))

    print("Available Devices:")
    for kind, idx, label in inventory:
        print(f"  ✅ {kind} | Index: {idx} | Name: {label}")

    # Priority: CUDA -> MPS -> CPU
    if has_cuda:
        return "gpu", 1
    if has_mps:
        return "mps", 1
    return "cpu", 1

# Run the health check once. `accelerator` is later handed to `pl.Trainer`;
# `device` is the device count returned alongside it.
accelerator, device = list_pytorch_devices()
print(f"Selected Accelerator: {accelerator}, Devices: {device}")

### Configuration

In [4]:
# Checkpoint identifier passed to every `from_pretrained` call in this notebook
# (processor, config and base model).
model_name = "facebook/wav2vec2-base-960h"

In [5]:
# NOTE(review): `save_features_mfcc` is not referenced anywhere in the visible
# notebook — confirm whether it is still needed.
save_features_mfcc = True
# Root directory for TensorBoard logs and model checkpoints
experiment_dir = './logs/'
os.makedirs(experiment_dir, exist_ok=True)

In [None]:
# Force a garbage-collection pass to release unreachable objects
gc.collect()

## Data Wrangling

In [None]:
# Prepare the dataset under ./dataset/ via the project's InitialData helper.
# NOTE(review): presumably downloads/extracts the corpus and writes the
# labels.csv read by the next cell — confirm in module.data.thaiser.
InitialData(
    test_fold=0,
    include_zoom=True,
    download_dir="./dataset/",
).extract()

In [None]:
# Load the label table, shuffle all rows with a fixed seed and renumber the index
thaiser_df = pd.read_csv('./dataset/labels.csv').sample(frac=1, random_state=42).reset_index(drop=True)
# Preview the last rows of the shuffled table
thaiser_df.tail()

In [None]:
# Summarise columns, dtypes and non-null counts
thaiser_df.info()

## Data Exploration

In [None]:
# Number of samples per emotion label
thaiser_df["Emotion"].value_counts()

In [None]:
# Share of each emotion label across the whole dataset
emotion_counts = thaiser_df["Emotion"].value_counts()

fig, ax = plt.subplots(figsize=(5, 5))
ax.pie(
    emotion_counts,
    labels=emotion_counts.index,
    autopct='%1.1f%%',
    startangle=140,
)
ax.set_title("Distribution of Emotions", fontweight='bold')
plt.show()

In [None]:
# One pie chart per fold showing that fold's emotion distribution
unique_folds = thaiser_df['FoldID'].unique()
num_folds = len(unique_folds)

# Determine grid size: fix the column count first and derive the row count
# from it, so both always describe the same grid
cols = 4
rows = max(1, math.ceil(num_folds / cols))

plt.figure(figsize=(18, rows * 4))

for idx, fold_id in enumerate(unique_folds):
    df_value_counter = thaiser_df[thaiser_df['FoldID'] == fold_id]['Emotion'].value_counts()

    plt.subplot(rows, cols, idx + 1)
    plt.pie(
        df_value_counter,
        labels=df_value_counter.index,
        autopct='%1.1f%%',
        startangle=140
    )
    plt.title(f"Distribution of Emotions in Fold {fold_id}", fontweight='bold')

plt.tight_layout()
plt.show()

In [None]:
# Draw one random recording per emotion (seeded) and plot its waveform
sample = thaiser_df.groupby('Emotion').sample(1, random_state=42).reset_index(drop=True)
sample_paths = sample['Path'].tolist()

# Two plots per row; the row count follows the number of sampled emotions
wave_cols = 2
wave_rows = max(1, math.ceil(len(sample_paths) / wave_cols))

plt.figure(figsize=(14, 10))
for i, (path, emotion) in enumerate(zip(sample_paths, sample['Emotion'].values)):
    plt.subplot(wave_rows, wave_cols, i + 1)
    # sr=None keeps the file's native sampling rate (no resampling)
    data, sr = librosa.load(path, sr=None)
    # Plot the waveform
    librosa.display.waveshow(data, sr=sr)
    plt.title(f"Waveplot Emotion: {emotion}", fontweight='bold')
    plt.ylabel('Amplitude')
    plt.xlabel('Time (seconds)')

# Adjust the layout so there are no overlapping titles
plt.tight_layout()
plt.show()

In [None]:
# Log-frequency spectrogram (dB relative to the peak) for each sampled recording
spec_cols = 2
# Derive the row count from the number of samples instead of assuming six slots
spec_rows = max(1, math.ceil(len(sample_paths) / spec_cols))

plt.figure(figsize=(14, 10))

for i, (path, emotion) in enumerate(zip(sample_paths, sample['Emotion'].values)):
    # sr=None keeps the file's native sampling rate
    y, sr = librosa.load(path, sr=None)
    S = librosa.stft(y)
    S_db = librosa.amplitude_to_db(np.abs(S), ref=np.max)

    plt.subplot(spec_rows, spec_cols, i + 1)
    librosa.display.specshow(S_db, sr=sr, x_axis='time', y_axis='log', cmap='magma')
    plt.colorbar(format='%+2.0f dB')
    plt.title(emotion, fontweight='bold')

plt.tight_layout()
plt.show()

## Data Preprocessing

### Label Encoding

In [15]:
# Map each emotion string to an integer class id; the fitted encoder is reused
# later to turn predictions back into emotion names.
encoder = LabelEncoder()
thaiser_df["EmotionEncoded"] = encoder.fit_transform(thaiser_df["Emotion"])

In [None]:
# Show the integer id assigned to every emotion label
print("Classes:")
for i, label in enumerate(encoder.classes_):
    print(f"  {i}: {label}")

### Data Splitting

In [17]:
# First split: 80% train / 20% held out (random, seeded)
train_df, valid_df = train_test_split(thaiser_df, test_size=0.2, random_state=42, shuffle=True)
# Second split of the held-out 20%: 90% validation / 10% test,
# i.e. roughly 18% / 2% of the full dataset
valid_df, test_df = train_test_split(valid_df, test_size=0.1, random_state=42, shuffle=True)
# NOTE(review): no `stratify=` is passed, so class proportions may differ between
# splits, and the `FoldID` column / `test_fold` setting are not used here —
# confirm this is intended.

In [None]:
# Compare the emotion distribution of the train and validation splits
train_value_counter = train_df['Emotion'].value_counts()
valid_value_counter = valid_df['Emotion'].value_counts()

panels = (
    (train_value_counter, "Distribution of Emotions in Train Data"),
    (valid_value_counter, "Distribution of Emotions in Valid Data"),
)

plt.figure(figsize=(10, 5))
for slot, (counts, heading) in enumerate(panels, start=1):
    plt.subplot(1, 2, slot)
    plt.pie(counts, labels=counts.index, autopct='%1.1f%%', startangle=140)
    plt.title(heading, fontweight='bold')

plt.tight_layout()
plt.show()

### Data Loader

In [19]:
class SpeechEmotionDataModule(LightningDataModule):
    """Lightning data module serving train/validation/test loaders.

    Each DataFrame must provide a ``Path`` and an ``EmotionEncoded`` column;
    both are handed to ``Wav2VecDataset`` together with ``processor``.

    Args:
        train_df: Rows used for training.
        val_df: Rows used for validation.
        test_df: Rows used for testing.
        processor: Object forwarded to ``Wav2VecDataset`` as ``processor``.
        batch_size: Training batch size. Validation and test loaders use half
            of it, but never less than 1.
        num_workers: Worker processes per ``DataLoader``.
        persistent_workers: Keep workers alive between epochs. Ignored when
            ``num_workers`` is 0, because ``DataLoader`` rejects that pairing.
        collate_fn: Optional batch collation function.
    """

    def __init__(self, 
                 train_df, 
                 val_df, 
                 test_df, 
                 processor, 
                 batch_size=16, 
                 num_workers=0,
                 persistent_workers=False,
                 collate_fn=None
                 ):
        super().__init__()
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df
        self.processor = processor
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.persistent_workers = persistent_workers
        self.collate_fn = collate_fn

    def _build_dataset(self, frame):
        """Wrap the paths and encoded labels of ``frame`` in a ``Wav2VecDataset``."""
        return Wav2VecDataset(
            frame["Path"].tolist(),
            frame["EmotionEncoded"].tolist(),
            processor=self.processor
        )

    def _make_loader(self, dataset, batch_size, shuffle):
        """Create a ``DataLoader`` with the module's shared worker/collate settings."""
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
            # DataLoader raises ValueError for persistent workers without workers
            persistent_workers=bool(self.persistent_workers) and self.num_workers > 0,
            collate_fn=self.collate_fn,
        )

    def _eval_batch_size(self):
        """Half the training batch size, clamped so it never drops to 0."""
        return max(1, self.batch_size // 2)

    def setup(self, stage=None):
        """Build all three datasets; ``stage`` is accepted but not used."""
        self.train_dataset = self._build_dataset(self.train_df)
        self.val_dataset = self._build_dataset(self.val_df)
        self.test_dataset = self._build_dataset(self.test_df)

    def train_dataloader(self):
        """Return the shuffled training loader."""
        return self._make_loader(self.train_dataset, self.batch_size, shuffle=True)

    def val_dataloader(self):
        """Return the validation loader (fixed order)."""
        return self._make_loader(self.val_dataset, self._eval_batch_size(), shuffle=False)

    def test_dataloader(self):
        """Return the test loader (fixed order)."""
        return self._make_loader(self.test_dataset, self._eval_batch_size(), shuffle=False)

In [20]:
# Initialize the processor from the same checkpoint as the model and bind it
# to the project's collate function so DataLoader can call it with one argument
processor = Wav2Vec2Processor.from_pretrained(model_name)
collate_fn = functools.partial(wav2vec2_collate_fn, processor=processor)

# Initialize DataModule with the three splits created above
data_module = SpeechEmotionDataModule(
    train_df=train_df,
    val_df=valid_df,
    test_df=test_df,
    processor=processor,
    batch_size=16,
    num_workers=2,
    persistent_workers=True,
    collate_fn=collate_fn
)

# Setup DataModule eagerly so the datasets exist before the Trainer is created
data_module.setup()

## Modelling

### Classifier Head

In [23]:
class ClassificationHead(nn.Module):
    """Mean-pool a sequence of hidden states and project it to class logits.

    Args:
        config: Object exposing ``hidden_size`` and ``num_labels``.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        width = config.hidden_size
        # Order matters for state-dict keys: dropout, linear, ReLU, dropout, linear
        stages = [
            nn.Dropout(0.1),
            nn.Linear(width, width),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(width, config.num_labels),
        ]
        self.classifier = nn.Sequential(*stages)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Average ``hidden_states`` over dimension 1 and return the classifier output."""
        averaged = hidden_states.contiguous().mean(dim=1)
        return self.classifier(averaged)

### Model Definition

In [24]:
class Wav2Vec2SERModule(pl.LightningModule):
    """Wav2Vec2 encoder followed by a classification head for emotion recognition.

    Args:
        model_name: Checkpoint identifier passed to ``Wav2Vec2Model.from_pretrained``.
        config: Wav2Vec2 configuration; its ``num_labels`` sets the class count.
        layers_to_unfreeze: Encoder parameter names, or dotted prefixes such as
            ``"encoder.layers.11"``, that stay trainable. All other encoder
            parameters are frozen; an empty list freezes the whole encoder so
            only the classification head is trained.
        lr_base: Learning rate for trainable encoder parameters.
        lr_head: Learning rate for the classification head.
    """

    def __init__(self, 
                 model_name: str,
                 config,
                 layers_to_unfreeze: list,
                 lr_base: float = 1e-5,
                 lr_head: float = 1e-4
                 ):
        
        super().__init__()
        self.save_hyperparameters()
        self.config = config
        self.layers_to_unfreeze = layers_to_unfreeze
        
        # Base Wav2Vec2 model
        self.wav2vec2 = Wav2Vec2Model.from_pretrained(
            model_name,
            config=config
        )
        
        # Classifier head
        self.classifier = ClassificationHead(config)
        
        # Loss function
        self.loss_fn = nn.CrossEntropyLoss()
        
        # Hyperparameters
        self.lr_base = lr_base
        self.lr_head = lr_head
        self.num_classes = config.num_labels
        
        # Metrics (one instance per stage so their states never mix)
        self.train_accuracy = Accuracy(task="multiclass", num_classes=self.num_classes)
        self.val_accuracy = Accuracy(task="multiclass", num_classes=self.num_classes)
        self.test_accuracy = Accuracy(task="multiclass", num_classes=self.num_classes)
        
        # Freeze the encoder except for the explicitly listed layers
        self.freeze_wav2vec2_layers()

    def freeze_wav2vec2_layers(self):
        """Freeze the encoder, keeping only ``layers_to_unfreeze`` trainable."""
        targets = tuple(self.layers_to_unfreeze or ())
        for name, param in self.wav2vec2.named_parameters():
            # Trainable only on an exact name match or a dotted-prefix match
            param.requires_grad = any(
                name == target or name.startswith(f"{target}.")
                for target in targets
            )
        
    def forward(self, input_values, attention_mask=None):
        """Return class logits for a batch of raw waveforms.

        Args:
            input_values: Input batch, shape ``[batch_size, seq_len]``.
            attention_mask: Optional mask forwarded to the encoder.
        """
        outputs = self.wav2vec2(
            input_values=input_values, 
            attention_mask=attention_mask
        )
        hidden_states = outputs.last_hidden_state 
        logits = self.classifier(hidden_states)
        
        return logits

    def _shared_step(self, batch):
        """Run the forward pass and return ``(loss, predictions, labels)``."""
        labels = batch["labels"]
        logits = self(batch["input_values"], batch["attention_mask"])
        loss = self.loss_fn(logits, labels)
        preds = torch.argmax(logits, dim=-1)
        return loss, preds, labels

    def training_step(self, batch, batch_idx):
        """Compute the training loss and log loss/accuracy per step and per epoch."""
        loss, preds, labels = self._shared_step(batch)
        acc = self.train_accuracy(preds, labels)
        # The batch is a dict, so state the batch size explicitly for epoch averaging
        batch_size = labels.size(0)

        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, batch_size=batch_size)
        self.log("train_accuracy", acc, on_step=True, on_epoch=True, prog_bar=True, batch_size=batch_size)
        # Weight histograms are written once per epoch in on_train_epoch_end;
        # writing them on every step is very slow and mixes two step axes
        # under the same tags.

        return loss

    def validation_step(self, batch, batch_idx):
        """Compute the validation loss and log epoch-level loss/accuracy."""
        loss, preds, labels = self._shared_step(batch)
        acc = self.val_accuracy(preds, labels)
        batch_size = labels.size(0)

        self.log("val_loss", loss, on_step=False, on_epoch=True, prog_bar=True, batch_size=batch_size)
        self.log("val_accuracy", acc, on_step=False, on_epoch=True, prog_bar=True, batch_size=batch_size)

        return loss

    def test_step(self, batch, batch_idx):
        """Compute the test loss and log epoch-level loss/accuracy."""
        loss, preds, labels = self._shared_step(batch)
        acc = self.test_accuracy(preds, labels)
        batch_size = labels.size(0)

        self.log("test_loss", loss, on_step=False, on_epoch=True, prog_bar=True, batch_size=batch_size)
        self.log("test_accuracy", acc, on_step=False, on_epoch=True, prog_bar=True, batch_size=batch_size)

        return loss  
    
    def on_train_epoch_end(self):
        """Write a histogram of every parameter, indexed by epoch."""
        # No logger, or a logger without histogram support: nothing to write
        experiment = getattr(self.logger, "experiment", None)
        if experiment is None or not hasattr(experiment, "add_histogram"):
            return
        for name, param in self.named_parameters():
            experiment.add_histogram(name, param, global_step=self.current_epoch)
    
    def configure_optimizers(self):
        """Build AdamW with separate encoder/head learning rates and an LR-on-plateau scheduler."""
        param_groups = []
        for params, lr in (
            (self.wav2vec2.parameters(), self.lr_base),
            (self.classifier.parameters(), self.lr_head),
        ):
            trainable = [p for p in params if p.requires_grad]
            # Skip groups that are entirely frozen
            if trainable:
                param_groups.append({"params": trainable, "lr": lr})

        optimizer = torch.optim.AdamW(param_groups, weight_decay=1e-5)
        scheduler = {
            # Halve the learning rates when validation accuracy stops improving
            'scheduler': ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2),
            'monitor': 'val_accuracy',
            'interval': 'epoch',
            'frequency': 1
        }
        
        return [optimizer], [scheduler]

In [None]:
# Load the checkpoint's configuration and size the output layer to the number
# of emotion classes found by the label encoder
config = Wav2Vec2Config.from_pretrained(
    model_name,
    num_labels=len(encoder.classes_)
)

# Build the Lightning module with default learning rates and no layers listed
# in `layers_to_unfreeze`
model = Wav2Vec2SERModule(
    model_name = model_name,
    config=config,
    layers_to_unfreeze=[],
)

### Monitoring

In [23]:
# TensorBoard logger writing under <experiment_dir>/training
tensorboard_logger = TensorBoardLogger(save_dir=experiment_dir, name="training", log_graph=True, default_hp_metric=True)
# Record the head learning rate and the training batch size as hyperparameters
tensorboard_logger.log_hyperparams({
    "learning_rate": model.lr_head, 
    "batch_size": data_module.batch_size
    })

In [None]:
%load_ext tensorboard
%tensorboard --logdir ./logs/training

### Model Training

#### Callbacks

In [None]:
# Keep only the checkpoint with the highest validation accuracy
checkpoint_callback = ModelCheckpoint(
    # os.path.join avoids a doubled separator when experiment_dir ends with "/"
    dirpath=os.path.join(experiment_dir, 'checkpoints'),
    filename="best-checkpoint",
    save_top_k=1,
    verbose=True,
    monitor="val_accuracy",
    mode="max"
)

# Stop training after 10 epochs without a validation-accuracy improvement
early_stop_callback = EarlyStopping(
    monitor="val_accuracy",
    patience=10,
    mode="max"
)

# NOTE(review): this callback is created here but is not in the Trainer's
# callback list in this notebook — confirm whether it should be.
batch_size_finder = BatchSizeFinder(
    mode='power', 
    steps_per_trial=3, 
    init_val=4, 
    max_trials=5, 
    batch_arg_name='batch_size'
)

# Log the current learning rate at every optimisation step
lr_monitor = LearningRateMonitor(
    logging_interval="step"
)

#### Training

In [None]:
# Train for at most 200 epochs; early stopping usually ends the run sooner
trainer = pl.Trainer(
    max_epochs=200,
    callbacks=[checkpoint_callback, early_stop_callback, lr_monitor],
    logger=tensorboard_logger,
    accelerator=accelerator,
    # Use the device count chosen by the health check instead of the
    # Trainer default
    devices=device,
    precision=32,
)

In [None]:
# Run the training/validation loop on the data module's loaders
trainer.fit(model, data_module)

## Evaluation

In [None]:
# Metrics exposed to callbacks by the trainer after fitting
print("Callback Metrics:", trainer.callback_metrics)

### Prediction

In [None]:
def get_predictions(model, dataloader, device='cpu'):
    """Run ``model`` over ``dataloader`` and collect predicted and true labels.

    Args:
        model: Module called as ``model(input_values, attention_mask=...)``
            that returns per-class logits.
        dataloader: Iterable of batches containing ``"input_values"``,
            ``"attention_mask"`` and ``"labels"`` tensors.
        device: Torch device to run on. The Lightning accelerator name
            ``"gpu"`` is also accepted and mapped to ``"cuda"``.

    Returns:
        ``(predictions, labels)`` as two NumPy arrays in loader order.
    """
    # Lightning calls NVIDIA devices "gpu"; torch only understands "cuda"
    if isinstance(device, str) and device == "gpu":
        device = "cuda"

    model.eval()
    model.to(device)
    
    all_preds = []
    all_labels = []
    
    # Inference only: no gradients needed
    with torch.no_grad():
        for batch in dataloader:
            input_values = batch["input_values"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            
            logits = model(input_values, attention_mask=attention_mask)
            preds = torch.argmax(logits, dim=-1)
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    return np.array(all_preds), np.array(all_labels)

In [None]:
# `accelerator` holds a Lightning name; torch needs "cuda" where Lightning says "gpu"
torch_device = "cuda" if accelerator == "gpu" else accelerator
y_pred, y_true = get_predictions(model, data_module.test_dataloader(), device=torch_device)

In [None]:
# Convert integer class ids back to emotion names with the fitted encoder
y_pred_labels = encoder.inverse_transform(y_pred)
y_true_labels = encoder.inverse_transform(y_true)

In [None]:
# Side-by-side table of predicted and actual emotion names
df_result = pd.DataFrame({
    'Predicted Labels': y_pred_labels,
    'Actual Labels': y_true_labels
})

# Preview the first ten rows
df_result.head(10)

### Confusion Matrix

In [None]:
# Confusion matrix with rows = actual labels and columns = predicted labels,
# both ordered like encoder.classes_
cm = confusion_matrix(y_true_labels, y_pred_labels, labels=encoder.classes_)
cm_df = pd.DataFrame(cm, index=encoder.classes_, columns=encoder.classes_)

plt.figure(figsize=(10, 8))
heatmap_ax = sns.heatmap(cm_df, annot=True, cmap='viridis', fmt='d', square=True)
heatmap_ax.set_title('Confusion Matrix', size=20)
heatmap_ax.set_xlabel('Predicted Labels', size=14)
heatmap_ax.set_ylabel('Actual Labels', size=14)
plt.show()

### Classification Report

In [None]:
# Per-class precision/recall/F1 on the test split.
# NOTE(review): zero_division=True is presumably treated like 1, i.e. metrics
# with a zero denominator are reported as 1.0 — confirm against scikit-learn docs.
print(classification_report(y_true_labels, y_pred_labels, zero_division=True))

## References
- https://github.com/vistec-AI/vistec-ser
- https://github.com/SuperKogito/SER-datasets
- https://datascrutineer.com/speech-emotion-recognition-cnns-tensorflow/
- https://medium.com/airesearch-in-th/thai-ser-ชุดข้อมูลวิเคราะห์อารมณ์จากเสียงชุดแรกในประเทศไทย-aa8a38b63963
- https://colab.research.google.com/drive/1kF5xBYe7d48JRaz3KfIK65A4N5dZMqWQ?usp=sharing#scrollTo=ZxGj77nfsYf0
- https://www.geeksforgeeks.org/transformer-model-from-scratch-using-tensorflow/