# **Speech to Emotion Recognition - Mamba Testing**
James Knee, Tyler Nguyen, Varsha Singh, Anish Sinha, Nathan Strahs



# Preprocessing Data

In [1]:
import sys
print(sys.executable)

/projectnb/ec523/projects/teamSER/miniconda/envs/mamba-env/bin/python


In [2]:
#imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchaudio
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import os

import random
import torchaudio.transforms as T

from mamba_ssm import Mamba

In [3]:
! nvidia-smi

Wed Apr  9 19:02:39 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 570.124.06             Driver Version: 570.124.06     CUDA Version: 12.8     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla V100-SXM2-16GB           On  |   00000000:18:00.0 Off |                    0 |
| N/A   40C    P0             43W /  300W |       4MiB /  16384MiB |      0%   E. Process |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
|   1  Tesla V100-SXM2-16GB           On  |   00

In [2]:
import mamba_ssm
print(dir(mamba_ssm))

['Mamba', 'Mamba2', 'MambaLMHeadModel', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '__version__', 'distributed', 'mamba_inner_fn', 'models', 'modules', 'ops', 'selective_scan_fn', 'utils']


In [6]:
# Check if a GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("USING: " + device.type)

# Define a tiny Mamba model and move it to the GPU
model = Mamba(
    d_model=16,  # embedding dimension
    d_state=8,   # internal state dimension
    d_conv=4,    # convolution dimension
    expand=2,    # expansion factor
).to(device)

# Generate random input (batch_size, sequence_length, embedding_dim) and move it to the GPU
x = torch.randn(2, 10, 16).to(device)

# Forward pass
output = model(x)

print("Mamba output shape:", output.shape)


USING: cuda
Mamba output shape: torch.Size([2, 10, 16])


In [8]:
#necessary variables, assuming root directory is /projectnb/ec523/projects/teamSER folder
DATA_PATH="AudioWAV/"

training_split=0.8
testing_split=0.2
batch_size = 32

In [9]:
class AudioDataset(Dataset):
    def __init__(self, data_dir, transform=False, target_length=160):
        self.data_dir = data_dir
        self.transform = transform
        self.target_length = target_length

        # enumeration of emotions
        self.emotion_map = {
            "ANG": 0, "DIS": 1, "FEA": 2,
            "HAP": 3, "NEU": 4, "SAD": 5
        }

        # Filter only valid files with known emotion labels
        self.audio_files = [
            f for f in os.listdir(data_dir)
            if f.endswith('.wav') and f.split('_')[2] in self.emotion_map
        ]

        # Extract labels
        self.strlabels = [f.split('_')[2] for f in self.audio_files]
        self.labels = [self.emotion_map[label] for label in self.strlabels]

        # Fixed transforms
        self.sample_rate = 16000
        self.mel_transform = T.MelSpectrogram(
            sample_rate=self.sample_rate,
            n_fft=2048,
            hop_length=512,
            n_mels=128
        )
        self.db_transform = T.AmplitudeToDB()

        # Resampler reused for efficiency
        self.resampler = T.Resample(orig_freq=48000, new_freq=self.sample_rate)  # Assume worst-case

    def __len__(self):
        return len(self.audio_files)

    def __getitem__(self, idx):
        file_path = os.path.join(self.data_dir, self.audio_files[idx])
        waveform, sample_rate = torchaudio.load(file_path)

        # Resample to 16kHz if needed
        if sample_rate != self.sample_rate:
            resample = T.Resample(orig_freq=sample_rate, new_freq=self.sample_rate)
            waveform = resample(waveform)

        # Convert stereo to mono
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Normalize waveform
        waveform = waveform - waveform.mean()

        # Volume augmentation on waveform
        if self.transform and random.random() < 0.5:
            waveform = T.Vol(gain=(0.5, 1.5), gain_type="amplitude")(waveform)

        # Compute Mel spectrogram and convert to dB
        mel_spec = self.mel_transform(waveform)
        mel_spec = self.db_transform(mel_spec)

        # MinMax normalization to [0, 1]
        mel_min = mel_spec.min()
        mel_max = mel_spec.max()
        mel_spec = (mel_spec - mel_min) / (mel_max - mel_min + 1e-6)

        # Spectrogram-level augmentation
        if self.transform:
            if random.random() < 0.5:
                mel_spec = T.FrequencyMasking(freq_mask_param=15)(mel_spec)
            if random.random() < 0.5:
                mel_spec = T.TimeMasking(time_mask_param=35)(mel_spec)

        # Fix time dimension by padding or cropping
        current_length = mel_spec.shape[-1]
        if current_length < self.target_length:
            pad_amount = self.target_length - current_length
            mel_spec = F.pad(mel_spec, (0, pad_amount))
        else:
            mel_spec = mel_spec[:, :, :self.target_length]

        label = torch.tensor(self.labels[idx], dtype=torch.long)

        # Remove channel dimension if needed (1, 128, T) -> (128, T)
        mel_spec = mel_spec.squeeze(0)

        return mel_spec, label


In [11]:
#this function pads per batch so that every spectogram is the same dimension per batch

def collate_fn(batch):
    spectrograms, labels = zip(*batch)
    
    max_length = max(spec.shape[1] for spec in spectrograms)

    #pad spectrograms to match longest
    spectrograms_padded = [torch.nn.functional.pad(spec, (0, max_length - spec.shape[1])) for spec in spectrograms]

    # Convert list to tensor
    spectrograms_padded = torch.stack(spectrograms_padded)

    labels = torch.tensor(labels, dtype=torch.long)
    return spectrograms_padded, labels

In [13]:
#declaring dataset
dataset = AudioDataset(DATA_PATH)

#calculate training size and testing size
train_size = int(dataset.__len__()*training_split)
test_size = dataset.__len__()-train_size

train_set, test_set = torch.utils.data.random_split(dataset, [train_size, test_size])

train_set.dataset.transform = True
test_set.dataset.transform = False

#dataloaders
train_loader = DataLoader(train_set, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_loader = DataLoader(test_set, batch_size=batch_size, collate_fn=collate_fn, shuffle=False)

#FINAL DIMENSIONS OF SPECS: BatchSize x 128 x MaxTimeLength

# Mamba

In [12]:
class PureAudioMamba(nn.Module):  
    def __init__(self, num_classes=6, d_model=256):
        super().__init__()
        self.input_proj = nn.Linear(128, d_model)
        
        self.mamba1 = Mamba(
            d_model=d_model,
            d_state=16,
            d_conv=4,
            expand=2
        )
        self.mamba2 = Mamba(
            d_model=d_model,
            d_state=16,
            d_conv=4,
            expand=2
        )
        
        self.attention_pool = nn.Sequential(
            nn.Linear(d_model, 1),
            nn.Softmax(dim=1)
        )
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x):
        x = x.permute(0, 2, 1)  
        x = self.input_proj(x)  
        
        x = self.mamba1(x)
        x = self.mamba2(x)
        
        attn_weights = self.attention_pool(x)
        x = torch.sum(x * attn_weights, dim=1)
        return self.classifier(x)

def train_model(train_loader, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PureAudioMamba(num_classes=6).to(device)  # Fixed class name
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, 
                                              max_lr=1e-3,
                                              total_steps=len(train_loader)*100)

    best_acc = 0
    for epoch in range(20):
        model.train()
        train_loss = 0
        for specs, labels in train_loader:
            specs, labels = specs.to(device), labels.to(device)
            
            outputs = model(specs)
            loss = criterion(outputs, labels)
            
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            
            train_loss += loss.item()

        # Validation
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for specs, labels in test_loader:
                specs, labels = specs.to(device), labels.to(device)
                outputs = model(specs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Metrics
        train_loss /= len(train_loader)
        val_loss /= len(test_loader)
        val_acc = 100 * correct / total
        
        scheduler.step(val_loss)
        
        print(f"Epoch {epoch+1}: "
              f"Train Loss: {train_loss:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"Val Acc: {val_acc:.2f}%")



train_model(train_loader, test_loader)

# NOTE: mamba uses roughly 3 * expand * d_modeel^2 parameters
# NOTE: AdamW is stochastic optimization that modifes the
# Adam optimizer by decoupling weight decay from gradient update
# NOTE: could also try switching scheduler for cosine with warmup; or lower max_lr

Epoch 1: Train Loss: 1.7834 | Val Loss: 1.7523 | Val Acc: 17.86%
Epoch 2: Train Loss: 1.7337 | Val Loss: 1.7076 | Val Acc: 29.35%
Epoch 3: Train Loss: 1.6784 | Val Loss: 1.6335 | Val Acc: 29.21%
Epoch 4: Train Loss: 1.5606 | Val Loss: 1.5326 | Val Acc: 36.74%
Epoch 5: Train Loss: 1.5035 | Val Loss: 1.4644 | Val Acc: 38.68%
Epoch 6: Train Loss: 1.4868 | Val Loss: 1.4381 | Val Acc: 38.48%
Epoch 7: Train Loss: 1.4705 | Val Loss: 1.4229 | Val Acc: 41.10%
Epoch 8: Train Loss: 1.4516 | Val Loss: 1.4133 | Val Acc: 41.71%
Epoch 9: Train Loss: 1.4399 | Val Loss: 1.4246 | Val Acc: 43.92%
Epoch 10: Train Loss: 1.4149 | Val Loss: 1.4282 | Val Acc: 43.32%
Epoch 11: Train Loss: 1.3965 | Val Loss: 1.3816 | Val Acc: 45.40%
Epoch 12: Train Loss: 1.3914 | Val Loss: 1.4011 | Val Acc: 46.34%
Epoch 13: Train Loss: 1.3815 | Val Loss: 1.4063 | Val Acc: 44.66%
Epoch 14: Train Loss: 1.3729 | Val Loss: 1.3805 | Val Acc: 47.01%
Epoch 15: Train Loss: 1.3577 | Val Loss: 1.3751 | Val Acc: 46.88%
Epoch 16: Train Los

## Test how many Epochs is best

In [13]:
class PureAudioMamba(nn.Module):  
    def __init__(self, num_classes=6, d_model=256):
        super().__init__()
        self.input_proj = nn.Linear(128, d_model)
        self.mamba1 = Mamba(d_model=d_model, d_state=16, d_conv=4, expand=2)
        self.mamba2 = Mamba(d_model=d_model, d_state=16, d_conv=4, expand=2)
        self.attention_pool = nn.Sequential(
            nn.Linear(d_model, 1),
            nn.Softmax(dim=1)
        )
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.input_proj(x)
        x = self.mamba1(x)
        x = self.mamba2(x)
        attn_weights = self.attention_pool(x)
        x = torch.sum(x * attn_weights, dim=1)
        return self.classifier(x)

def train_model(train_loader, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PureAudioMamba(num_classes=6).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    
    best_val_acc = 0
    best_epoch = 0
    patience = 5  # Stop after 5 epochs without improvement
    early_stop = False
    
    for epoch in range(100):  # Max epochs set high for early stopping
        if early_stop:
            print(f"Early stopping at epoch {epoch+1}")
            break
            
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        
        for specs, labels in train_loader:
            specs, labels = specs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(specs)
            loss = criterion(outputs, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            
            # Track training metrics
            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()
        
        # Calculate training metrics
        train_loss /= len(train_loader)
        train_acc = 100 * train_correct / train_total
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for specs, labels in test_loader:
                specs, labels = specs.to(device), labels.to(device)
                outputs = model(specs)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
        
        # Calculate validation metrics
        val_loss /= len(test_loader)
        val_acc = 100 * val_correct / val_total
        
        # Print metrics
        print(f"Epoch {epoch+1:03d}: "
              f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%")
        
        # Early stopping check
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_epoch = epoch + 1
        elif (epoch + 1 - best_epoch) >= patience:
            print(f"No improvement for {patience} epochs. Early stopping...")
            early_stop = True
            
    print(f"\nTraining complete. Best validation accuracy: {best_val_acc:.2f}% at epoch {best_epoch}")

# Start training
train_model(train_loader, test_loader)


Epoch 001: Train Loss: 1.7303 | Train Acc: 22.12% | Val Loss: 1.6675 | Val Acc: 26.33%
Epoch 002: Train Loss: 1.5496 | Train Acc: 33.93% | Val Loss: 1.4819 | Val Acc: 34.45%
Epoch 003: Train Loss: 1.5505 | Train Acc: 35.71% | Val Loss: 1.4555 | Val Acc: 37.61%
Epoch 004: Train Loss: 1.4787 | Train Acc: 37.44% | Val Loss: 1.4295 | Val Acc: 39.76%
Epoch 005: Train Loss: 1.4563 | Train Acc: 39.58% | Val Loss: 1.4077 | Val Acc: 41.97%
Epoch 006: Train Loss: 1.4305 | Train Acc: 40.92% | Val Loss: 1.3753 | Val Acc: 43.72%
Epoch 007: Train Loss: 1.4165 | Train Acc: 41.95% | Val Loss: 1.3702 | Val Acc: 42.78%
Epoch 008: Train Loss: 1.3881 | Train Acc: 43.66% | Val Loss: 1.3334 | Val Acc: 46.54%
Epoch 009: Train Loss: 1.3653 | Train Acc: 44.73% | Val Loss: 1.3124 | Val Acc: 47.08%
Epoch 010: Train Loss: 1.3493 | Train Acc: 45.39% | Val Loss: 1.3073 | Val Acc: 46.68%
Epoch 011: Train Loss: 1.3311 | Train Acc: 46.14% | Val Loss: 1.3354 | Val Acc: 45.94%
Epoch 012: Train Loss: 1.3279 | Train Acc: 