In [None]:
# Emotion Classifier Training with RAVDESS Dataset

# This notebook fine-tunes the HuBERT model (`superb/hubert-base-superb-er`) for emotion classification using the RAVDESS dataset.

# **Dataset**: RAVDESS (Ryerson Audio-Visual Database of Emotional Speech and Song)
# - 8 emotions: neutral, calm, happy, sad, angry, fearful, surprise, disgust
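# - ~7,350 files in the full release (audio + video, speech + song) from 24 actors; the audio-only speech subset used here has 1,440 clips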

# **Training Strategy**:
# - Freeze all HuBERT layers (keep pre-trained features)
# - Train only the classification head (8 classes)
# - Speaker-independent train/val/test split

In [None]:
import os
import re
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import librosa
import numpy as np
import soundfile as sf
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from transformers import (
    AutoModelForAudioClassification,
    AutoFeatureExtractor,
    AutoConfig
)
warnings.filterwarnings('ignore')

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Class names for the 8-way classifier; a name's position in the list is its class index.
# NOTE: positions 6/7 are surprise/disgust here, whereas RAVDESS filename codes are
# 07=disgust, 08=surprised - any code -> index mapping has to account for that.
RAVDESS_EMOTIONS = [
    "neutral", "calm", "happy", "sad",
    "angry", "fearful", "surprise", "disgust",
]

# Lookup tables in both directions: name -> index and index -> name
EMOTION_TO_IDX = dict(zip(RAVDESS_EMOTIONS, range(len(RAVDESS_EMOTIONS))))
IDX_TO_EMOTION = dict(enumerate(RAVDESS_EMOTIONS))

print(f"Emotion classes: {RAVDESS_EMOTIONS}")
print(f"Number of classes: {len(RAVDESS_EMOTIONS)}")


In [None]:
## Step 1: Load and Preprocess RAVDESS Dataset

# RAVDESS files are named with pattern: `[Modality]-[Vocal]-[Emotion]-[Intensity]-[Statement]-[Repetition]-[Actor].wav`

# - Modality: 01=full AV, 02=video-only, 03=audio-only
# - Emotion: 01=neutral, 02=calm, 03=happy, 04=sad, 05=angry, 06=fearful, 07=disgust, 08=surprised
# - Actor: 01-24 (12 male, 12 female)


In [None]:
class RAVDESSDataset(Dataset):
    """Torch dataset that turns RAVDESS audio files into fixed-length model inputs.

    Each item is a dict with:
        'input_values': the feature extractor's output for one clip, with the
            leading batch axis squeezed away
        'labels': the clip's emotion index as a scalar ``torch.long`` tensor

    If a file cannot be loaded, a message is printed and an all-zero clip is
    substituted. The file keeps its own label, so ground-truth labels used for
    evaluation are never silently rewritten.
    """

    def __init__(self, file_paths: List[str], labels: List[int], feature_extractor, max_length: int = 16000 * 4):
        """
        Args:
            file_paths: List of audio file paths
            labels: List of emotion labels (indices), aligned with file_paths
            feature_extractor: HuggingFace feature extractor
            max_length: Maximum audio length in samples (default: 4 seconds at 16kHz)
        """
        self.file_paths = file_paths
        self.labels = labels
        self.feature_extractor = feature_extractor
        self.max_length = max_length
        # Target rate for the model input. RAVDESS audio is distributed at 48 kHz;
        # librosa.load resamples every file to this rate while loading.
        self.sample_rate = 16000

    def __len__(self):
        return len(self.file_paths)

    def _load_waveform(self, file_path):
        """Load one file at self.sample_rate and pad/truncate it to exactly max_length samples."""
        audio, _ = librosa.load(file_path, sr=self.sample_rate)
        if len(audio) > self.max_length:
            return audio[:self.max_length]
        # Right-pad short clips with silence
        return np.pad(audio, (0, self.max_length - len(audio)), mode='constant')

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        label = self.labels[idx]

        try:
            audio = self._load_waveform(file_path)
        except Exception as e:
            # Best effort: keep the batch shape intact with a silent clip rather than
            # aborting the whole epoch because of one unreadable file.
            print(f"Error loading {file_path}: {e}")
            audio = np.zeros(self.max_length, dtype=np.float32)

        inputs = self.feature_extractor(audio, sampling_rate=self.sample_rate, return_tensors="pt", padding=True)

        return {
            'input_values': inputs['input_values'].squeeze(0),
            'labels': torch.tensor(label, dtype=torch.long)
        }


def parse_ravdess_filename(filename: str) -> Optional[Dict[str, int]]:
    """
    Parse RAVDESS filename to extract metadata.

    Format: [Modality]-[Vocal]-[Emotion]-[Intensity]-[Statement]-[Repetition]-[Actor].wav

    Args:
        filename: File name or path; only the stem (name without directory and
            extension) is inspected.

    Returns:
        Dictionary with the keys 'modality', 'vocal', 'emotion', 'intensity',
        'statement', 'repetition' and 'actor' mapped to their integer values,
        or None when the stem does not consist of exactly seven dash-separated
        integers.
    """
    field_names = ('modality', 'vocal', 'emotion', 'intensity', 'statement', 'repetition', 'actor')
    parts = Path(filename).stem.split('-')

    if len(parts) != len(field_names):
        return None

    try:
        values = [int(part) for part in parts]
    except ValueError:
        # Seven dash-separated chunks, but not a RAVDESS name (e.g. "a-b-c-d-e-f-g.wav")
        return None

    return dict(zip(field_names, values))


def load_ravdess_dataset(data_dir: str) -> Tuple[List[str], List[int], List[int]]:
    """
    Load RAVDESS dataset and extract file paths, labels, and actor IDs.

    The directory is searched recursively for ``*.wav`` files. Files whose name
    cannot be parsed, whose modality is not audio-only (03), or whose emotion
    code is unknown are skipped.

    Args:
        data_dir: Directory containing RAVDESS audio files

    Returns:
        Tuple of (file_paths, labels, actors); the three lists are aligned
        element by element. Labels are class indices into RAVDESS_EMOTIONS.
    """
    data_dir = Path(data_dir)
    file_paths = []
    labels = []
    actors = []

    # RAVDESS filename emotion code -> class index in RAVDESS_EMOTIONS.
    # RAVDESS defines 07=disgust and 08=surprised, while RAVDESS_EMOTIONS lists
    # "surprise" at index 6 and "disgust" at index 7, so the last two entries cross over.
    emotion_mapping = {
        1: 0,  # neutral
        2: 1,  # calm
        3: 2,  # happy
        4: 3,  # sad
        5: 4,  # angry
        6: 5,  # fearful
        7: 7,  # disgust
        8: 6   # surprised
    }

    # Find all WAV files; sorted so the result does not depend on filesystem listing order
    audio_files = sorted(data_dir.rglob("*.wav"))

    print(f"Found {len(audio_files)} audio files")

    for audio_file in audio_files:
        parsed = parse_ravdess_filename(audio_file.name)
        if parsed is None:
            continue

        # Only use audio-only files (modality 03)
        if parsed['modality'] != 3:
            continue

        emotion_code = parsed['emotion']
        if emotion_code in emotion_mapping:
            file_paths.append(str(audio_file))
            labels.append(emotion_mapping[emotion_code])
            actors.append(parsed['actor'])

    print(f"Loaded {len(file_paths)} audio-only files")
    # Explicit integer dtype + minlength: works when nothing was found (e.g. wrong path)
    # and always prints one count per class.
    class_counts = np.bincount(np.asarray(labels, dtype=np.int64), minlength=len(emotion_mapping))
    print(f"Emotion distribution: {class_counts}")

    return file_paths, labels, actors


# Load dataset
# Placeholder location - point this at the folder that holds the RAVDESS .wav files.
# load_ravdess_dataset searches the folder recursively, so per-actor subfolders are fine.
RAVDESS_DATA_DIR = "path/to/ravdess/audio_speech_actors_01-24" 

# Uncomment when dataset is downloaded:
# file_paths, labels, actors = load_ravdess_dataset(RAVDESS_DATA_DIR)


In [None]:
## Step 2: Train/Validation/Test Split (Speaker-Independent)

In [None]:
def create_speaker_independent_split(file_paths: List[str], labels: List[int], actors: List[int],
                                     train_ratio: float = 0.7, val_ratio: float = 0.15):
    """
    Partition files into train/validation/test so that no actor appears in more than one split.

    Actors are ordered by ID; the first ``int(n * train_ratio)`` go to training,
    the next ``int(n * val_ratio)`` to validation and all remaining actors to test.

    Args:
        file_paths: List of file paths
        labels: List of labels, aligned with file_paths
        actors: List of actor IDs, aligned with file_paths
        train_ratio: Proportion of actors for training
        val_ratio: Proportion of actors for validation

    Returns:
        Three (files, labels) tuples: train, validation, test
    """
    # The split is made over actors, not over individual files
    roster = sorted(set(actors))
    train_cut = int(len(roster) * train_ratio)
    val_cut = train_cut + int(len(roster) * val_ratio)

    actor_groups = {
        "train": set(roster[:train_cut]),
        "val": set(roster[train_cut:val_cut]),
        "test": set(roster[val_cut:]),
    }

    # Actor -> split lookup. Filled lowest priority first so that train wins over
    # val, and val over test, should an actor ever land in more than one group.
    split_of = {}
    for split_name in ("test", "val", "train"):
        for actor_id in actor_groups[split_name]:
            split_of[actor_id] = split_name

    # Route every file to the bucket that owns its actor
    buckets = {split_name: ([], []) for split_name in actor_groups}
    for path, label, actor_id in zip(file_paths, labels, actors):
        owner = split_of.get(actor_id)
        if owner is None:
            continue
        kept_files, kept_labels = buckets[owner]
        kept_files.append(path)
        kept_labels.append(label)

    print(f"Train: {len(buckets['train'][0])} files from {len(actor_groups['train'])} actors")
    print(f"Validation: {len(buckets['val'][0])} files from {len(actor_groups['val'])} actors")
    print(f"Test: {len(buckets['test'][0])} files from {len(actor_groups['test'])} actors")

    return buckets["train"], buckets["val"], buckets["test"]


# Uncomment when dataset is loaded:
# train_data, val_data, test_data = create_speaker_independent_split(file_paths, labels, actors)
# train_files, train_labels = train_data
# val_files, val_labels = val_data
# test_files, test_labels = test_data


In [None]:
# Pre-trained checkpoint to start from and the size of the new label space
model_name = "superb/hubert-base-superb-er"
num_classes = len(RAVDESS_EMOTIONS)

print(f"Loading model: {model_name}")
print(f"Number of classes: {num_classes}")

# NOTE(review): trust_remote_code=True allows Python code shipped in the Hub repo to run
# during loading. It is presumably unnecessary for this checkpoint - confirm and drop if so.
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name, trust_remote_code=True)

# Load model config and update for 8 classes
config = AutoConfig.from_pretrained(model_name, trust_remote_code=True)
config.num_labels = num_classes

# Load model; ignore_mismatched_sizes lets checkpoint weights whose shape no longer
# matches the 8-class config be skipped instead of raising.
model = AutoModelForAudioClassification.from_pretrained(
    model_name,
    config=config,
    trust_remote_code=True,
    ignore_mismatched_sizes=True
)

# Input width for the new head: start from the config value (768 if the config has none) ...
hidden_size = getattr(config, 'hidden_size', 768)

# ... but prefer the width the existing classifier actually consumes
if hasattr(model, 'classifier'):
    if isinstance(model.classifier, nn.Linear):
        hidden_size = model.classifier.in_features
    elif isinstance(model.classifier, nn.Sequential):
        # The whole Sequential is replaced below, so the new head must accept what the
        # Sequential receives: the in_features of its FIRST Linear layer (the last
        # Linear's in_features is only an intermediate width).
        for layer in model.classifier:
            if isinstance(layer, nn.Linear):
                hidden_size = layer.in_features
                break

print(f"Hidden size: {hidden_size}")

# Replace classification head with new 8-class classifier
new_classifier = nn.Linear(hidden_size, num_classes)

if hasattr(model, 'classification_head'):
    model.classification_head = new_classifier
else:
    model.classifier = new_classifier

# Freeze all layers except the classifier (matched by parameter name)
for name, param in model.named_parameters():
    if 'classifier' in name or 'classification_head' in name:
        param.requires_grad = True
        print(f"Training: {name}")
    else:
        param.requires_grad = False

# Count trainable parameters
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
total_params = sum(p.numel() for p in model.parameters())
print(f"\nTrainable parameters: {trainable_params:,} / {total_params:,} ({100 * trainable_params / total_params:.2f}%)")

model = model.to(device)
print(f"Model moved to {device}")


In [None]:
# Create datasets
# Each split is wrapped in a RAVDESSDataset, which loads and featurises clips on access.
# Uncomment when data is loaded:
# train_dataset = RAVDESSDataset(train_files, train_labels, feature_extractor)
# val_dataset = RAVDESSDataset(val_files, val_labels, feature_extractor)
# test_dataset = RAVDESSDataset(test_files, test_labels, feature_extractor)

# Create data loaders
# Only the training loader shuffles; validation and test keep file order.
batch_size = 8  # Adjust based on GPU memory
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
# val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

# NOTE: this message is printed unconditionally - no loader exists until the lines above are uncommented.
print("Data loaders created (uncomment when dataset is loaded)")


In [None]:
# Training hyperparameters
learning_rate = 1e-4
num_epochs = 15
patience = 3  # Early stopping patience: epochs without a better val loss before training stops
# ReduceLROnPlateau only lowers the LR once the number of stagnant epochs EXCEEDS its
# patience. With the same value as early stopping, training would stop on the 3rd
# stagnant epoch while the first LR cut would come on the 4th, so the scheduler would
# practically never act. Keep this strictly below `patience`.
scheduler_patience = 1

# Loss and optimizer (only parameters left trainable are handed to the optimizer)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    [p for p in model.parameters() if p.requires_grad],
    lr=learning_rate,
    weight_decay=0.01
)

# Learning rate scheduler: halve the LR when the monitored value (validation loss) stalls.
# The deprecated `verbose` argument is not passed; read the current LR from
# optimizer.param_groups[0]['lr'] if it needs to be logged.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=scheduler_patience
)

print("Training setup complete")
print(f"Learning rate: {learning_rate}")
print(f"Epochs: {num_epochs}")
print(f"Optimizer: AdamW")


In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train for one epoch.

    Args:
        model: Model whose forward pass accepts ``input_values=...`` and returns an
            object with a ``logits`` attribute
        train_loader: Iterable of batches, each a dict with 'input_values' and 'labels'
        criterion: Loss called as ``criterion(logits, labels)``
        optimizer: Optimizer stepped once per batch
        device: Device the batch tensors are moved to

    Returns:
        (avg_loss, accuracy): mean of the per-batch losses and accuracy in percent

    Raises:
        ValueError: If train_loader yields no samples.
    """
    # NOTE(review): train mode applies to the whole model, including any frozen layers
    # (e.g. their dropout) - confirm that is intended.
    model.train()
    running_loss = 0.0
    num_batches = 0
    correct = 0
    total = 0

    progress_bar = tqdm(train_loader, desc="Training")
    for batch in progress_bar:
        input_values = batch['input_values'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()

        logits = model(input_values=input_values).logits

        loss = criterion(logits, labels)
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        num_batches += 1  # counted here so the loader does not need to support len()

        # detach() instead of the legacy .data attribute
        predicted = logits.detach().argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        progress_bar.set_postfix({
            'loss': batch_loss,
            'acc': 100 * correct / total
        })

    if total == 0:
        # Fail with a clear message instead of a bare ZeroDivisionError below
        raise ValueError("train_loader produced no samples; check that the training split is not empty")

    avg_loss = running_loss / num_batches
    accuracy = 100 * correct / total
    return avg_loss, accuracy


def validate(model, val_loader, criterion, device):
    """Evaluate the model on a loader without updating any weights.

    Args:
        model: Model whose forward pass accepts ``input_values=...`` and returns an
            object with a ``logits`` attribute
        val_loader: Iterable of batches, each a dict with 'input_values' and 'labels'
        criterion: Loss called as ``criterion(logits, labels)``
        device: Device the batch tensors are moved to

    Returns:
        (avg_loss, accuracy, all_preds, all_labels): mean of the per-batch losses,
        accuracy in percent, and the predicted / true class indices for every
        sample in loader order

    Raises:
        ValueError: If val_loader yields no samples.
    """
    model.eval()
    running_loss = 0.0
    num_batches = 0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(val_loader, desc="Validating"):
            input_values = batch['input_values'].to(device)
            labels = batch['labels'].to(device)

            logits = model(input_values=input_values).logits

            running_loss += criterion(logits, labels).item()
            num_batches += 1  # counted here so the loader does not need to support len()

            # argmax replaces torch.max(logits.data, 1): .data is a legacy attribute
            # and gradients are already disabled in this block
            predicted = logits.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    if total == 0:
        # Fail with a clear message instead of a bare ZeroDivisionError below
        raise ValueError("val_loader produced no samples; check that the evaluation split is not empty")

    avg_loss = running_loss / num_batches
    accuracy = 100 * correct / total
    return avg_loss, accuracy, all_preds, all_labels


# Training loop
# Disabled by being wrapped in a string literal; remove the surrounding triple quotes
# once train_loader and val_loader exist.
"""
best_val_loss = float('inf')
best_val_acc = 0
patience_counter = 0
train_losses = []
val_losses = []
train_accs = []
val_accs = []

for epoch in range(num_epochs):
    print(f"\nEpoch {epoch + 1}/{num_epochs}")
    print("-" * 50)
    
    # Train
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    
    # Validate (named val_targets so the split's own val_labels list is not overwritten)
    val_loss, val_acc, val_preds, val_targets = validate(model, val_loader, criterion, device)
    val_losses.append(val_loss)
    val_accs.append(val_acc)
    
    # Learning rate scheduling
    scheduler.step(val_loss)
    
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
    
    # Save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_val_acc = val_acc
        patience_counter = 0
        
        # Save model
        os.makedirs("checkpoints", exist_ok=True)
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': val_loss,
            'val_acc': val_acc,
        }, "checkpoints/best_model.pt")
        print("✓ Saved best model")
    else:
        patience_counter += 1
    
    # Early stopping
    if patience_counter >= patience:
        print(f"Early stopping at epoch {epoch + 1}")
        break

print(f"\nBest validation accuracy: {best_val_acc:.2f}%")
"""


In [None]:
# Load best model and evaluate on test set
# Disabled by being wrapped in a string literal; remove the surrounding triple quotes
# when training is complete.
"""
# Load best model (map_location lets a checkpoint written on a GPU load on a CPU-only machine)
checkpoint = torch.load("checkpoints/best_model.pt", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
print("Loaded best model from checkpoint")

# Evaluate on test set (named test_targets so the split's own test_labels list is not overwritten)
test_loss, test_acc, test_preds, test_targets = validate(model, test_loader, criterion, device)
print(f"\nTest Results:")
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.2f}%")

# Classification report
print("\nClassification Report:")
print(classification_report(
    test_targets,
    test_preds,
    target_names=RAVDESS_EMOTIONS,
    digits=4
))

# Confusion matrix
print("\nConfusion Matrix:")
cm = confusion_matrix(test_targets, test_preds)
print(cm)
"""


In [None]:
# Save the final model
# Disabled by being wrapped in a string literal; remove the surrounding triple quotes
# when training is complete.
"""
save_dir = "../model/checkpoints/emotion_classifier_ravdess"
os.makedirs(save_dir, exist_ok=True)

# Store the emotion names in the config so whoever loads the model can decode
# predicted indices (otherwise the saved config only carries placeholder label names)
model.config.id2label = dict(IDX_TO_EMOTION)
model.config.label2id = dict(EMOTION_TO_IDX)

# Save model and feature extractor
model.save_pretrained(save_dir)
feature_extractor.save_pretrained(save_dir)

# Also save PyTorch state dict
torch.save(model.state_dict(), os.path.join(save_dir, "pytorch_model.bin"))

print(f"Model saved to {save_dir}")
print("Model is ready to use in worker.py!")
"""
