# Downloads

In [None]:
!pip install torchcodec --quiet
!pip install torchinfo --quiet
!pip install transformers accelerate sentencepiece torchaudio diffusers datasets soundfile pillow --quiet

# Imports

In [None]:
import os
import shutil
import soundfile as sf

import torch
import torchaudio
import torch.nn as nn
from torchinfo import summary
from torchvision import datasets
from torchvision.models import vit_b_16
from torch.utils.data import DataLoader, random_split, TensorDataset, Dataset

from PIL import Image
from diffusers import FluxPipeline
from transformers import WhisperProcessor, WhisperModel, WhisperForConditionalGeneration, pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForCausalLM, GenerationConfig


import gc
import json
import uuid
import time
import random
import librosa
import logging
import kagglehub
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
from pathlib import Path
import matplotlib.pyplot as plt
from dataclasses import dataclass
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

from huggingface_hub import login
login()
os.environ["HF_TOKEN"] = ""

# Dataset Preparation

Download the mixed dataset (which we merged ourselves) from Kaggle, unzip it, and copy it to /content/.

Check for and delete corrupted files.
Set up the dataloaders and preprocess the data so that it is compatible with Whisper.

In [None]:
# Fetch the merged emotion dataset through kagglehub and keep the returned
# location in `path`; the following shell cell moves it under /content.
path = kagglehub.dataset_download("kamilhanna/emotion-dataset", force_download=True)
print("Path to dataset files:", path)

In [None]:
!mv "{path}" "/content"
!mv "/content/1/content/Emotion" "/content/"
!rm -rf "/content/1"

# Dropping this class because it has too little data
!rm -rf "/content/Emotion/calm"

In [None]:
# Delete corrupted files
def clean_corrupted_audio(root, extensions=(".wav", ".m4a")):
    """Delete audio files under ``root`` that ``torchaudio.load`` cannot open.

    Args:
        root: Directory that is walked recursively with ``os.walk``.
        extensions: Filename suffix (or tuple of suffixes) to check; matching
            is case-insensitive. Defaults to the two formats that were
            previously hard-coded.

    Returns:
        int: Number of files that failed to load and were deleted.
    """
    # Accept a single suffix as well as a collection of them.
    if isinstance(extensions, str):
        extensions = (extensions,)
    suffixes = tuple(ext.lower() for ext in extensions)

    removed = 0
    checked = 0

    print(f"Scanning audio under: {root}\n")

    for folder, _, files in os.walk(root):
        for name in files:
            if not name.lower().endswith(suffixes):
                continue

            file_path = os.path.join(folder, name)
            checked += 1

            try:
                # Only whether decoding succeeds matters; the audio is discarded.
                torchaudio.load(file_path)
            except Exception as load_err:
                print(f"[CORRUPTED] Removing: {file_path}   -->   {load_err}")
                try:
                    os.remove(file_path)
                    removed += 1
                except OSError as remove_err:
                    # Best effort: report the failure and keep scanning.
                    print(f"[ERROR] Could not delete {file_path}: {remove_err}")

    print("\n===== SUMMARY =====")
    print(f"Checked:  {checked} files")
    print(f"Removed:  {removed} corrupted files\n")
    return removed

# Remove undecodable files before the dataset folder is indexed.
clean_corrupted_audio("/content/Emotion")

In [None]:
#Setup dataloaders

def audio_loader(path):
    """Load one audio file as a contiguous 1-D waveform sampled at 16 kHz.

    Args:
        path: Location of the file, passed straight to ``torchaudio.load``.

    Returns:
        A 1-D tensor: the channel average when the file has several
        channels, otherwise the single channel, resampled to 16000 Hz when
        the file uses a different rate.
    """
    waveform, sample_rate = torchaudio.load(path)

    # Collapse the channel axis: average multi-channel audio, unwrap mono.
    waveform = waveform.mean(dim=0) if waveform.shape[0] > 1 else waveform[0]

    if sample_rate != 16000:
        waveform = torchaudio.functional.resample(waveform, sample_rate, 16000)

    return waveform.contiguous()

# Index every wav/m4a file under /content/Emotion; DatasetFolder derives the
# class labels from the sub-folder names.
full_dataset = datasets.DatasetFolder(
    root="/content/Emotion",
    loader=audio_loader,
    extensions=("wav", "m4a"),
    transform=None
)

# 80 / 10 / 10 split; the test set absorbs the rounding remainder.
train_size = int(0.8 * len(full_dataset))
val_size = int(0.1 * len(full_dataset))
test_size = len(full_dataset) - train_size - val_size

# Seed the split so that re-running the notebook yields the same partition.
# An unseeded split changes on every run, so cached embeddings and reported
# test numbers would silently refer to different samples.
SPLIT_SEED = 42
train_dataset, val_dataset, test_dataset = random_split(
    full_dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

num_classes = len(full_dataset.classes)
print(f"number of classes is {num_classes}\nSize of train dataset {len(train_dataset)}\nSize of val dataset {len(val_dataset)}\nSize of test dataset {len(test_dataset)}")

processor = WhisperProcessor.from_pretrained("openai/whisper-large-v3")

# The arrow in this message had been mangled into mojibake by an encoding
# round trip; restored to the intended character.
print("Class → Index mapping:")
for cls_name, idx in full_dataset.class_to_idx.items():
    print(f"{cls_name} --> {idx}")


@dataclass
class WhisperCollate:
    """Collate ``(waveform, label)`` samples into one Whisper-ready batch.

    Calling the instance returns a dict with ``input_features`` and
    ``attention_mask`` taken from the processor output and ``labels`` as a
    ``torch.long`` tensor.
    """

    # Processor that converts the raw waveforms into model inputs.
    processor: WhisperProcessor

    def __call__(self, batch):
        samples = list(batch)

        # The processor receives plain float numpy arrays, one per sample.
        waveforms = [
            waveform.squeeze().float().cpu().numpy() for waveform, _ in samples
        ]
        targets = [target for _, target in samples]

        encoded = self.processor(
            waveforms,
            sampling_rate=16000,
            padding="max_length",
            return_attention_mask=True,
            return_tensors="pt",
        )

        return {
            "input_features": encoded.input_features,
            "attention_mask": encoded.attention_mask,
            "labels": torch.tensor(targets, dtype=torch.long),
        }



# A batch size of 64 together with a high worker count caused problems here,
# hence the conservative values below.
collate_fn = WhisperCollate(processor)
BATCH_SIZE = 16
NUM_WORKERS = 0

# Options common to all three loaders; only the training loader shuffles.
_loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=True,
    collate_fn=collate_fn,
)

train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_kwargs)


# Train MLP on Whisper-Encoded Data

Instead of running the encoder forward pass plus the MLP forward pass on every training step:

Convert all audio files to the encoder's hidden states once and build a new dataset from them.

Train the MLP on that dataset.

In [None]:
def extract_embeddings(model, dataloader, save_path):
    """Run the encoder over ``dataloader`` and save mean-pooled embeddings.

    Every batch is passed through ``model.encoder``; the last hidden state
    is averaged over dimension 1 and moved to the CPU. All pooled batches
    and their labels are concatenated and written to ``save_path`` as an
    ``(X, y)`` tuple with ``torch.save``.

    Batches that are ``None`` or that raise during processing are logged and
    skipped (best effort); both kinds are counted in the printed summary.

    Args:
        model: Object exposing ``.encoder`` and ``.parameters()``.
        dataloader: Iterable of dicts with ``input_features``,
            ``attention_mask`` and ``labels`` entries (or ``None``).
        save_path: Destination handed to ``torch.save``.

    Returns:
        None. Nothing is written when no batch could be processed.
    """
    # The notebook never created a module-level `logger`, so every logging
    # call here used to raise NameError. Obtain one locally instead.
    logger = logging.getLogger(__name__)

    # Run on whichever device the model already lives on instead of assuming
    # CUDA; for a model placed with .cuda() this is the same device.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    all_embs = []
    all_labels = []
    skipped_batches = 0

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Extracting"):
            if batch is None:
                skipped_batches += 1
                logger.warning(f"Skipped batch (total: {skipped_batches})")
                continue

            try:
                feats = batch["input_features"].to(device)
                mask = batch["attention_mask"].to(device)
                labels = batch["labels"]

                enc = model.encoder(
                    input_features=feats,
                    attention_mask=mask,
                    return_dict=True
                ).last_hidden_state
                # NOTE(review): this averages over every encoder position,
                # including any that stem from padded input. Confirm the
                # encoder honours attention_mask, or switch to a masked mean.
                pooled = enc.mean(dim=1).cpu()
            except Exception as e:
                # Deliberately best effort: drop the batch, but count it so
                # the summary below is not misleading.
                skipped_batches += 1
                logger.error(f"Error processing batch: {type(e).__name__} - {str(e)}")
                continue

            # Append only after the whole batch succeeded so that embeddings
            # and labels stay aligned.
            all_embs.append(pooled)
            all_labels.append(labels)

    if len(all_embs) == 0:
        logger.error("No embeddings extracted! Check your data.")
        return

    X = torch.cat(all_embs, dim=0)
    y = torch.cat(all_labels, dim=0)

    torch.save((X, y), save_path)
    print(f"Saved {len(X)} embeddings to {save_path} (Skipped {skipped_batches} batches)")

In [None]:
# Extract embeddings
# Load Whisper large-v3 onto the GPU; extract_embeddings only calls its
# `.encoder` sub-module.
model = WhisperModel.from_pretrained("openai/whisper-large-v3").cuda()

# The three files are written relative to the current working directory.
# NOTE(review): the MLP cell loads them from
# /content/drive/MyDrive/GenAI/Project instead -- presumably they were copied
# to Drive by hand; confirm, or save directly to that folder.
extract_embeddings(model, train_loader, "train_emb.pt")
extract_embeddings(model, val_loader, "val_emb.pt")
extract_embeddings(model, test_loader, "test_emb.pt")

In [None]:
#Dataset class for whisper encoder embeddings used for MLP training
class EmbDataset(torch.utils.data.Dataset):
    """Map-style dataset over pre-computed embeddings and their labels.

    ``X`` is stored as float32 and ``y`` as int64; indexing returns the
    ``(embedding, label)`` pair at that position.
    """

    def __init__(self, X, y):
        self.X = X.to(torch.float32)
        self.y = y.to(torch.int64)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, i):
        embedding = self.X[i]
        label = self.y[i]
        return embedding, label

# Folder holding the cached (X, y) embedding tuples.
# NOTE(review): the extraction cell saves these files to the working
# directory, not to this Drive folder, and no Drive mount is visible in this
# part of the notebook -- confirm how the files get here.
# This also rebinds `path`, which earlier held the kagglehub download location.
path = "/content/drive/MyDrive/GenAI/Project"
X_train, y_train = torch.load(f"{path}/train_emb.pt")
X_val, y_val = torch.load(f"{path}/val_emb.pt")
X_test, y_test = torch.load(f"{path}/test_emb.pt")
train_ds = EmbDataset(X_train, y_train)
val_ds   = EmbDataset(X_val, y_val)
test_ds  = EmbDataset(X_test, y_test)


# These override the BATCH_SIZE / NUM_WORKERS values used for the raw-audio
# loaders earlier in the notebook.
BATCH_SIZE = 64
NUM_WORKERS = 4
# Only the training loader shuffles; validation and test keep file order.
train_loader_mlp = DataLoader(train_ds,
                           batch_size=BATCH_SIZE,
                           shuffle=True,
                           num_workers=NUM_WORKERS,
                           pin_memory=True)
val_loader_mlp = DataLoader(val_ds,
                           batch_size=BATCH_SIZE,
                           shuffle=False,
                           num_workers=NUM_WORKERS,
                           pin_memory=True)
test_loader_mlp = DataLoader(test_ds,
                           batch_size=BATCH_SIZE,
                           shuffle=False,
                           num_workers=NUM_WORKERS,
                           pin_memory=True)
# Class count is re-derived from the distinct labels present in the training
# split, replacing the value computed from the dataset folder.
num_classes = len(y_train.unique())
print(num_classes)

In [None]:
MODEL_HIDDEN_DIMENSION = 1280


class EmotionClassifier(nn.Module):
    """MLP head mapping a pooled embedding to emotion-class logits.

    The input width is ``MODEL_HIDDEN_DIMENSION``. Seven hidden stages of
    ``Linear -> GELU -> Dropout(0.15) -> BatchNorm1d`` (widths 2560, 1024,
    512, 256, 256, 128, 64) are followed by a final ``Linear`` producing
    ``num_classes`` logits.

    History: the author annotated this architecture "83 + 84% test". The
    commented-out experiments that used to sit here (shallower ReLU/Dropout
    stacks and narrower GELU/BatchNorm stacks, one annotated "0.77") were
    removed; the meaning of those numbers is not recorded in the notebook.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Feature width entering the network, then the width after each
        # hidden stage.
        widths = [
            MODEL_HIDDEN_DIMENSION,
            MODEL_HIDDEN_DIMENSION * 2,
            1024,
            512,
            256,
            256,
            128,
            64,
        ]

        # Modules are created in the same order as the former hand-written
        # Sequential, so state_dict keys ("net.0.weight", ...) are unchanged.
        stages = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stages.extend(
                [
                    nn.Linear(fan_in, fan_out),
                    nn.GELU(),
                    nn.Dropout(0.15),
                    nn.BatchNorm1d(fan_out),
                ]
            )
        stages.append(nn.Linear(widths[-1], num_classes))

        self.net = nn.Sequential(*stages)

    def forward(self, x):
        return self.net(x)


# Build the classifier on the GPU (this rebinds `model`, which previously
# referred to the Whisper model) and print a torchinfo summary for an input
# of shape (1, MODEL_HIDDEN_DIMENSION).
model = EmotionClassifier(num_classes).cuda()

summary(model, input_size=(1, MODEL_HIDDEN_DIMENSION))

In [None]:
def accuracy_from_logits(logits, labels):
    """Return the fraction of rows whose arg-max logit equals the label.

    Args:
        logits: Tensor whose last dimension indexes the classes.
        labels: Tensor of class indices compared element-wise with the
            arg-max over that last dimension.

    Returns:
        float: Mean of the element-wise matches as a Python float.
    """
    hits = torch.argmax(logits, dim=-1).eq(labels)
    return hits.float().mean().item()

def train_model(model, train_loader_mlp, val_loader_mlp, num_epochs=20,
                checkpoint_path="best_model.pt"):
    """Train ``model`` on pre-computed embeddings and checkpoint the best epoch.

    Uses cross-entropy with label smoothing 0.05, Adam (lr 1e-4, weight
    decay 1e-4) and a cosine-annealed learning rate stepped once per epoch.
    After every epoch the model is evaluated on ``val_loader_mlp``; whenever
    validation accuracy improves, the ``state_dict`` is saved to
    ``checkpoint_path``. A summary is printed every tenth epoch.

    Args:
        model: Classifier to train; moved to the GPU with ``.cuda()``.
        train_loader_mlp: Iterable of ``(X, y)`` batches used for optimisation.
        val_loader_mlp: Iterable of ``(X, y)`` batches used for model selection.
        num_epochs: Number of passes over ``train_loader_mlp``; also the
            length of the cosine schedule.
        checkpoint_path: File that receives the best ``state_dict``. Defaults
            to the previously hard-coded ``"best_model.pt"``.

    Returns:
        float: The highest validation accuracy observed.
    """
    model = model.cuda()
    criterion = nn.CrossEntropyLoss(label_smoothing=0.05)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)

    # Tie the schedule length to the run length. The former hard-coded
    # T_max=300 only annealed fully when num_epochs happened to be 300; with
    # the default of 20 the learning rate barely moved.
    eta_min = 1e-6
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer,
        T_max=max(num_epochs, 1),
        eta_min=eta_min
    )

    best_val_acc = 0.0

    for epoch in range(num_epochs):
        report = (epoch + 1) % 10 == 0
        if report:
            print(f"\n===== Epoch {epoch+1}/{num_epochs} =====")

        # ---- training pass ----
        model.train()
        train_loss = 0.0
        train_acc = 0.0
        train_seen = 0

        pbar = tqdm(train_loader_mlp, desc="Training", leave=False)
        for X, y in pbar:
            X, y = X.cuda(), y.cuda()

            optimizer.zero_grad()
            out = model(X)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()

            acc = accuracy_from_logits(out, y)

            # Weight by batch size: averaging per-batch means would give a
            # short final batch the same influence as a full one.
            n = y.size(0)
            train_loss += loss.item() * n
            train_acc += acc * n
            train_seen += n

            pbar.set_postfix(loss=loss.item(), acc=acc)

        train_loss /= max(train_seen, 1)
        train_acc /= max(train_seen, 1)

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        val_acc = 0.0
        val_seen = 0

        with torch.no_grad():
            pbar = tqdm(val_loader_mlp, desc="Validation", leave=False)
            for X, y in pbar:
                X, y = X.cuda(), y.cuda()

                out = model(X)
                loss = criterion(out, y)
                acc = accuracy_from_logits(out, y)

                n = y.size(0)
                val_loss += loss.item() * n
                val_acc += acc * n
                val_seen += n

                pbar.set_postfix(loss=loss.item(), acc=acc)

        val_loss /= max(val_seen, 1)
        val_acc /= max(val_seen, 1)

        scheduler.step()

        # Keep only the weights of the best validation epoch.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), checkpoint_path)

        if report:
            print(
                f"Epoch {epoch+1} Summary:\n"
                f"  Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}\n"
                f"  Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc:.4f}\n"
                f"  LR:         {optimizer.param_groups[0]['lr']:.6f}\n"
                f"  Best Val Acc So Far: {best_val_acc:.4f}"
            )

    print("\nTraining finished.")
    return best_val_acc
# Start from freshly initialised weights (this rebinds `model`) and train for
# 300 epochs; train_model moves the module to the GPU itself.
model = EmotionClassifier(num_classes)
train_model(model, train_loader_mlp, val_loader_mlp, num_epochs=300)

# Evaluating the Trained MLP

In [None]:
def evaluate_test(model, test_loader_mlp):
    """Compute accuracy and the confusion matrix of ``model`` on a loader.

    Args:
        model: Module mapping a batch ``X`` to logits whose last dimension
            indexes the classes.
        test_loader_mlp: Iterable of ``(X, y)`` tensor batches.

    Returns:
        tuple: ``(test_acc, cm)`` as produced by ``accuracy_score`` and
        ``confusion_matrix`` called with ``(labels, predictions)``.

    Raises:
        ValueError: If the loader yields no batches.
    """
    model.eval()

    # Follow the model's own device instead of assuming CUDA; for a model
    # placed with .cuda() this is the same device as before.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for X, y in test_loader_mlp:
            logits = model(X.to(device))
            preds = logits.argmax(dim=-1)

            all_preds.append(preds.cpu().numpy())
            # Labels are only compared on the host, so they never need to be
            # copied to the accelerator and back.
            all_labels.append(y.cpu().numpy())

    if not all_preds:
        raise ValueError("test_loader_mlp yielded no batches; nothing to evaluate")

    all_preds = np.concatenate(all_preds)
    all_labels = np.concatenate(all_labels)

    test_acc = accuracy_score(all_labels, all_preds)
    cm = confusion_matrix(all_labels, all_preds)

    return test_acc, cm



# Restore the checkpoint written by train_model (best validation accuracy)
# and score it on the held-out test loader.
model.load_state_dict(torch.load("best_model.pt"))
model = model.cuda()
model.eval()
test_acc, cm = evaluate_test(model, test_loader_mlp)

print("Test Accuracy:", test_acc)


# Heat-map of the confusion matrix; rows are true labels and columns are
# predictions, matching the axis labels set below.
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.tight_layout()
plt.show()


In [None]:
# Class names
# NOTE(review): hard-coded; presumably this matches the index order of the
# dataset folder classes -- verify against the printed class mapping.
class_names = [
    "angry", "contempt", "disgust", "fear",
    "happy", "neutral", "sad", "surprised"
]

# Work in floating point so the divisions below are not integer divisions.
cm_float = cm.astype(float)

# One-vs-rest counts per class, derived from the diagonal and the
# column (axis=0) and row (axis=1) totals.
TP = np.diag(cm_float)
FP = cm_float.sum(axis=0) - TP
FN = cm_float.sum(axis=1) - TP
TN = cm_float.sum() - (TP + FP + FN)

# The 1e-9 terms guard against division by zero for classes without samples.
precision = TP / (TP + FP + 1e-9)
recall = TP / (TP + FN + 1e-9)
f1 = 2 * (precision * recall) / (precision + recall + 1e-9)

# Same formula as recall; reported under the "True Accuracy" column.
true_accuracy = TP / (TP + FN + 1e-9)

df_metrics = pd.DataFrame(
    {
        "Class": class_names,
        "True Accuracy": true_accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1,
    }
)

print(df_metrics)

# Overall accuracy: correctly classified samples over all samples.
overall_accuracy = TP.sum() / cm.sum()
print("\nOverall True Accuracy:", overall_accuracy)

# Try an SVM instead of the MLP (to check whether the problem is simpler; it did not work well)

In [None]:
def loader_to_numpy(data_loader):
    """Drain a loader of ``(X, y)`` tensor batches into two numpy arrays.

    Args:
        data_loader: Iterable yielding ``(X, y)`` pairs of CPU tensors.

    Returns:
        tuple: ``(features, labels)`` where the feature batches are stacked
        with ``np.vstack`` and the label batches joined with ``np.hstack``,
        both in iteration order.
    """
    converted = [
        (features.numpy(), labels.numpy()) for features, labels in data_loader
    ]
    stacked_features = np.vstack([pair[0] for pair in converted])
    stacked_labels = np.hstack([pair[1] for pair in converted])
    return stacked_features, stacked_labels


# Materialise each split as numpy arrays for scikit-learn. The training
# loader shuffles, so the training rows come out in shuffled order (features
# and labels stay paired).
X_train_np, y_train_np = loader_to_numpy(train_loader_mlp)
X_val_np,   y_val_np   = loader_to_numpy(val_loader_mlp)
X_test_np,  y_test_np  = loader_to_numpy(test_loader_mlp)

print(X_train_np.shape, y_train_np.shape)
print(X_test_np.shape, y_test_np.shape)


In [None]:
# Linear SVM baseline on the embeddings, with class weights balanced.
svm_model = LinearSVC(
    C=0.7,
    class_weight="balanced",
    max_iter=5000
)

svm_model.fit(X_train_np, y_train_np)
print("SVM training complete.")

# Alternative: RBF-kernel SVM (disabled).
# NOTE(review): `SVC` is not among the imports visible in this notebook; add
# `from sklearn.svm import SVC` before enabling this.

# svm_rbf = SVC(
#     kernel='rbf',
#     C=4,
#     gamma='scale',
#     class_weight="balanced"
# )

# svm_rbf.fit(X_train_np, y_train_np)

# svm_model = svm_rbf
# y_pred_rbf = svm_model.predict(X_test_np)
# print("RBF Test Accuracy:", accuracy_score(y_test_np, y_pred_rbf))



# Evaluate the SVM (to check whether the problem is simpler; it did not work well)

In [None]:
# Score the SVM on the test embeddings.
y_pred = svm_model.predict(X_test_np)

# Note: `test_acc` and `cm` are rebound here, replacing the MLP results that
# earlier cells stored under the same names.
test_acc = accuracy_score(y_test_np, y_pred)
cm = confusion_matrix(y_test_np, y_pred)

print("Test Accuracy:", test_acc)
print("\nClassification Report:")
print(classification_report(y_test_np, y_pred))


In [None]:
# Heat-map of the SVM confusion matrix held in `cm`; rows are true labels
# and columns are predictions, matching the axis labels set below.
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("SVM Confusion Matrix")
plt.tight_layout()
plt.show()


In [None]:
# NOTE(review): hard-coded; presumably this matches the label index order
# used when the embeddings were created -- verify.
class_names = [
    "angry", "contempt", "disgust", "fear",
    "happy", "neutral", "sad", "surprised"
]

# Floating-point copy of the SVM confusion matrix held in `cm`.
cm_float = cm.astype(float)

# One-vs-rest counts per class, derived from the diagonal and the
# column (axis=0) and row (axis=1) totals.
TP = np.diag(cm_float)
FP = cm_float.sum(axis=0) - TP
FN = cm_float.sum(axis=1) - TP
TN = cm_float.sum() - (TP + FP + FN)

# The 1e-9 terms guard against division by zero for classes without samples.
precision = TP / (TP + FP + 1e-9)
recall = TP / (TP + FN + 1e-9)
f1 = 2 * (precision * recall) / (precision + recall + 1e-9)
# Same formula as recall; reported under the "True Accuracy" column.
true_acc = TP / (TP + FN + 1e-9)

df_metrics = pd.DataFrame(
    {
        "Class": class_names,
        "True Accuracy": true_acc,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1,
    }
)

print(df_metrics)

# Overall accuracy: correctly classified samples over all samples.
overall_accuracy = TP.sum() / cm.sum()
print("\nOverall True Accuracy:", overall_accuracy)
