In [None]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchaudio
from torchaudio.pipelines import HUBERT_BASE
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from sklearn.model_selection import StratifiedKFold, train_test_split
from pytorch_metric_learning.losses import TripletMarginLoss
from pytorch_metric_learning.miners import TripletMarginMiner
from pytorch_metric_learning.distances import CosineSimilarity
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load dataset (the code below reads the `Emotion` and `wav_path` columns).
full_data = pd.read_csv('/home/misbahfarooq/Desktop/111-cross-iemocap/iemocap/IEMOCAP_full_release/iemocap_4class-s+d.csv')

# Map emotions to integers
emotion_mapping = {'neu': 0, 'hap': 1, 'sad': 2, 'ang': 3}
full_data['Emotion_label'] = full_data['Emotion'].map(emotion_mapping)

# `Series.map` yields NaN for any emotion missing from `emotion_mapping`.
# Fail fast with a clear message instead of letting NaN labels reach the
# stratified split and the loss function.
unmapped_rows = full_data['Emotion_label'].isna()
if unmapped_rows.any():
    unknown = sorted(full_data.loc[unmapped_rows, 'Emotion'].astype(str).unique())
    raise ValueError(f"Emotions without an entry in emotion_mapping: {unknown}")
full_data['Emotion_label'] = full_data['Emotion_label'].astype('int64')

# Train-test split (stratified so every class keeps its proportion in both parts)
train_val_data, test_data = train_test_split(
    full_data, test_size=0.2, stratify=full_data['Emotion_label'], random_state=42
)

X_full = train_val_data['wav_path'].tolist()
y_full = train_val_data['Emotion_label'].values
X_test = test_data['wav_path'].tolist()
y_test = test_data['Emotion_label'].values

# Dataset class
class HuBERTEmotionDataset(Dataset):
    """Dataset of fixed-length mono waveforms paired with integer labels.

    Args:
        wav_paths: Sequence of audio file paths, one per sample.
        labels: Integer class ids aligned with ``wav_paths``.
        target_length: Number of samples each clip is zero-padded or
            truncated to (48000 samples is 3 s at 16 kHz).
        sample_rate: Rate in Hz every clip is resampled to before
            padding/truncation. Defaults to 16 kHz, the rate documented for
            torchaudio's ``HUBERT_BASE`` bundle.
    """

    def __init__(self, wav_paths, labels, target_length=48000, sample_rate=16000):
        self.wav_paths = wav_paths
        self.labels = torch.tensor(labels)
        self.target_length = target_length  # target length for padding/truncation
        self.sample_rate = sample_rate

    def __len__(self):
        return len(self.labels)

    def _prepare(self, waveform, sr):
        """Turn a ``(channels, time)`` waveform into a 1-D clip of ``target_length`` samples."""
        # Down-mix multi-channel audio; otherwise ``squeeze(0)`` below would
        # leave a 2-D tensor and batches could not be fed to the model.
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Resample from the file's actual rate. The previous fixed
        # 48000 -> 48000 resampler never changed the signal.
        if sr != self.sample_rate:
            waveform = torchaudio.functional.resample(waveform, sr, self.sample_rate)

        # Padding or truncating to target_length
        num_samples = waveform.size(1)
        if num_samples < self.target_length:
            waveform = torch.nn.functional.pad(waveform, (0, self.target_length - num_samples))
        elif num_samples > self.target_length:
            waveform = waveform[:, :self.target_length]

        return waveform.squeeze(0)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(waveform, label)``."""
        waveform, sr = torchaudio.load(self.wav_paths[idx])
        return self._prepare(waveform, sr), self.labels[idx]


# Model definition
class HuBERTClassifierWithEmbeddings(nn.Module):
    """Frozen HuBERT feature extractor with a trainable projection and 4-class head.

    ``forward`` returns ``(logits, embeddings)``; the embeddings are the
    projected, dropout-regularised mean of the HuBERT features over dim 1.
    """

    def __init__(self):
        super().__init__()
        self.hubert = HUBERT_BASE.get_model()
        # The backbone is only ever run under ``torch.no_grad()``, so mark it
        # as frozen explicitly and keep it out of gradient bookkeeping.
        self.hubert.requires_grad_(False)
        self.hubert.eval()
        self.project = nn.Linear(768, 768)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(768, 4)

    def train(self, mode=True):
        """Switch the trainable head between train/eval; the backbone stays in eval mode.

        Without this override ``model.train()`` would re-enable any
        dropout/layer-drop inside the frozen encoder, making the extracted
        features stochastic during training only.
        """
        super().train(mode)
        self.hubert.eval()
        return self

    def forward(self, waveforms):
        # No gradients flow into the backbone.
        with torch.no_grad():
            features, _ = self.hubert(waveforms)

        # Pool over dim 1 (presumably the frame axis of the encoder output — confirm).
        pooled = features.mean(dim=1)
        embeddings = self.dropout(self.project(pooled))
        logits = self.classifier(embeddings)
        return logits, embeddings

# Evaluation function
def evaluate(model, loader, return_preds=False):
    """Run ``model`` over ``loader`` and compute weighted classification metrics.

    Args:
        model: Module whose forward returns ``(logits, embeddings)``.
        loader: Iterable yielding ``(waveforms, labels)`` batches.
        return_preds: When true, also return the collected labels and predictions.

    Returns:
        ``(acc, prec, rec, f1)``, or ``(acc, prec, rec, f1, y_true, y_pred)``
        when ``return_preds`` is true.
    """
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for waveforms, labels in loader:
            logits, _ = model(waveforms.to(device))
            preds = torch.argmax(logits, dim=1)
            # Labels are only compared on the host, so they are not moved to `device`.
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())
    acc = accuracy_score(y_true, y_pred)
    # zero_division=0 keeps the numeric result of the default ("warn") but
    # without emitting a warning when a class is never predicted.
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='weighted', zero_division=0
    )
    if return_preds:
        return acc, prec, rec, f1, y_true, y_pred
    return acc, prec, rec, f1

# Confusion matrix plot
def plot_confusion_matrix(y_true, y_pred, labels):
    """Plot a count-based confusion matrix as a heatmap.

    Args:
        y_true: True class ids.
        y_pred: Predicted class ids.
        labels: Display names; position ``i`` is used for class id ``i``.
    """
    # Pin the class ids so the matrix always has one row/column per display
    # label, even when a class is absent from both y_true and y_pred.
    class_ids = list(range(len(labels)))
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix")
    plt.tight_layout()
    plt.show()

# Training function
def train_and_evaluate(X_train, y_train, X_val, y_val, num_epochs=10, batch_size=64):
    """Train a classifier with cross-entropy plus mined triplet loss.

    After every epoch the model is scored on the validation split; the weights
    from the epoch with the highest validation F1 are restored before returning.

    Args:
        X_train, y_train: Training wav paths and integer labels.
        X_val, y_val: Validation wav paths and integer labels.
        num_epochs: Number of passes over the training data.
        batch_size: Batch size for both loaders.

    Returns:
        The trained model carrying the best-validation-F1 weights.
    """
    train_loader = DataLoader(HuBERTEmotionDataset(X_train, y_train), batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
    val_loader = DataLoader(HuBERTEmotionDataset(X_val, y_val), batch_size=batch_size, num_workers=4, pin_memory=True)

    model = HuBERTClassifierWithEmbeddings().to(device)
    clf_criterion = nn.CrossEntropyLoss()
    distance = CosineSimilarity()
    triplet_loss_fn = TripletMarginLoss(margin=0.5, distance=distance)
    miner = TripletMarginMiner(margin=0.5, distance=distance, type_of_triplets="hard")

    optimizer = optim.Adam(model.parameters(), lr=1e-3, betas=(0.9, 0.99))
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

    triplet_loss_weight = 0.5
    # Start below any attainable F1 so the first epoch is always snapshotted.
    best_val_f1 = -1.0
    best_model_state = None

    for epoch in range(num_epochs):
        model.train()
        total_clf_loss, total_triplet_loss = 0.0, 0.0
        for waveforms, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            waveforms = waveforms.to(device)
            labels = labels.to(device)
            logits, embeddings = model(waveforms)
            embeddings = nn.functional.normalize(embeddings, p=2, dim=1)

            clf_loss = clf_criterion(logits, labels)
            hard_triplets = miner(embeddings, labels)
            if len(hard_triplets[0]) > 0:
                triplet_loss = triplet_loss_fn(embeddings, labels, hard_triplets)
            else:
                # No hard triplets in this batch: the triplet term contributes nothing.
                triplet_loss = torch.tensor(0.0).to(device)

            total_loss = clf_loss + triplet_loss_weight * triplet_loss
            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()

            total_clf_loss += clf_loss.item()
            total_triplet_loss += triplet_loss.item()

        scheduler.step()

        _, _, _, val_f1 = evaluate(model, val_loader)
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            # state_dict() returns references to the live parameters; clone
            # them so later epochs cannot overwrite the snapshot.
            best_model_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }

        print(f"[Epoch {epoch+1}] Clf Loss: {total_clf_loss/len(train_loader):.4f}, "
              f"Triplet Loss: {total_triplet_loss/len(train_loader):.4f}, Val F1: {val_f1:.4f}")

    # Nothing to restore when no epoch ran (num_epochs == 0).
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    return model

# 5-Fold Cross-Validation
skf = StratifiedKFold(n_splits=5)
fold_results = []
best_model_state_dict = None
# Start below any attainable F1 so a state dict is always recorded,
# even if every fold scores 0.
best_f1_score = -1.0
best_fold = -1

# The held-out test set is the same for every fold, so build its loader once.
test_loader = DataLoader(HuBERTEmotionDataset(X_test, y_test), batch_size=64, num_workers=4, pin_memory=True)

for fold, (train_idx, val_idx) in enumerate(skf.split(X_full, y_full)):
    print(f"\n===== Fold {fold+1} =====")
    X_tr = [X_full[i] for i in train_idx]
    y_tr = y_full[train_idx]
    X_va = [X_full[i] for i in val_idx]
    y_va = y_full[val_idx]

    model = train_and_evaluate(X_tr, y_tr, X_va, y_va)

    test_acc, test_prec, test_rec, test_f1 = evaluate(model, test_loader)

    print(f"\u2192 Fold {fold+1} Test Accuracy: {test_acc*100:.2f}%, F1: {test_f1:.4f}, "
          f"Precision: {test_prec:.4f}, Recall: {test_rec:.4f}")
    fold_results.append((test_acc, test_prec, test_rec, test_f1))

    # NOTE(review): the "best" fold is picked by its score on the test set, so
    # the "Best Fold" numbers printed below are optimistically biased. The
    # per-fold average is the unbiased figure; consider selecting on validation F1.
    if test_f1 > best_f1_score:
        best_f1_score = test_f1
        best_model_state_dict = model.state_dict()
        best_fold = fold

# Summary: one row per fold, columns are (accuracy, precision, recall, F1).
fold_results = np.array(fold_results)
print("\n===== Final Cross-Validated Test Results =====")
print(f"Avg Accuracy: {fold_results[:,0].mean()*100:.2f}%")
print(f"Avg Precision: {fold_results[:,1].mean():.4f}")
print(f"Avg Recall: {fold_results[:,2].mean():.4f}")
print(f"Avg F1 Score: {fold_results[:,3].mean():.4f}")

# Final evaluation: reload the best fold's weights into a fresh model.
print(f"\nBest Model Found at Fold {best_fold+1} with F1 Score: {best_f1_score:.4f}")
final_model = HuBERTClassifierWithEmbeddings().to(device)
final_model.load_state_dict(best_model_state_dict)

test_acc, test_prec, test_rec, test_f1, y_true, y_pred = evaluate(final_model, test_loader, return_preds=True)

print(f"\n===== Best Fold Final Evaluation on Test Set =====")
print(f"Accuracy: {test_acc*100:.2f}%")
print(f"Precision: {test_prec:.4f}")
print(f"Recall: {test_rec:.4f}")
print(f"F1 Score: {test_f1:.4f}")

plot_confusion_matrix(y_true, y_pred, labels=list(emotion_mapping.keys()))

In [None]:
def plot_confusion_matrix(y_true, y_pred, labels):
    """Plot a row-normalised confusion matrix, with each cell as a percentage.

    Args:
        y_true: True class ids.
        y_pred: Predicted class ids.
        labels: Display names; position ``i`` is used for class id ``i``.
    """
    # Pin the class ids so the matrix always matches the display labels.
    class_ids = list(range(len(labels)))
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    # Convert to percentage of each true class. Rows with no true samples
    # stay at 0 instead of becoming NaN through a division by zero.
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm_percent = np.divide(
        cm.astype('float'),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    ) * 100

    plt.figure(figsize=(6, 5))
    sns.heatmap(cm_percent, annot=True, fmt='.2f', cmap='Blues',
                xticklabels=labels, yticklabels=labels)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    # Removed the stray closing parenthesis from the title.
    plt.title("Confusion Matrix-Audio Modality")
    plt.tight_layout()
    plt.show()

plot_confusion_matrix(y_true, y_pred, labels=list(emotion_mapping.keys()))

In [None]:
from sklearn.manifold import TSNE

def extract_embeddings(model, loader):
    """Collect L2-normalised embeddings and their labels for every batch in ``loader``.

    Args:
        model: Module whose forward returns ``(logits, embeddings)``.
        loader: Iterable yielding ``(waveforms, labels)`` batches.

    Returns:
        ``(all_embeddings, all_labels)`` as NumPy arrays: the per-batch
        embeddings stacked row-wise and the per-batch labels concatenated.
    """
    model.eval()
    all_embeddings = []
    all_labels = []
    with torch.no_grad():
        for waveforms, labels in tqdm(loader, desc="Extracting Embeddings"):
            _, embeddings = model(waveforms.to(device))
            embeddings = nn.functional.normalize(embeddings, p=2, dim=1)
            all_embeddings.append(embeddings.cpu().numpy())
            # Labels are only used on the host, so they are not moved to `device`.
            all_labels.append(labels.cpu().numpy())
    all_embeddings = np.vstack(all_embeddings)
    all_labels = np.concatenate(all_labels)
    return all_embeddings, all_labels


# Extract embeddings from the test set using the best-fold model, the same
# model whose predictions feed the confusion matrix (`model` would be
# whichever fold happened to train last).
test_embeddings, test_labels = extract_embeddings(final_model, test_loader)

# Apply t-SNE. The iteration count is left at the library default of 1000:
# the `n_iter` keyword was renamed to `max_iter` in newer scikit-learn, so
# omitting it keeps this cell working on both old and new versions.
tsne = TSNE(n_components=2, perplexity=50, random_state=42)
tsne_results = tsne.fit_transform(test_embeddings)

# Plot t-SNE, one colour per emotion class
plt.figure(figsize=(8, 6))
palette = sns.color_palette("bright", len(emotion_mapping))
for color, (emotion, class_id) in zip(palette, emotion_mapping.items()):
    mask = test_labels == class_id
    plt.scatter(tsne_results[mask, 0], tsne_results[mask, 1], label=emotion, alpha=0.7, s=40, color=color)
plt.legend()
plt.title("t-SNE-Audio-Cross-entropy loss ")
plt.xlabel("Dimension 1")
plt.ylabel("Dimension 2")
plt.tight_layout()
plt.show()


In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm

def extract_embeddings(model, loader):
    """Collect L2-normalised embeddings and their labels for every batch in ``loader``.

    Args:
        model: Module whose forward returns ``(logits, embeddings)``.
        loader: Iterable yielding ``(waveforms, labels)`` batches.

    Returns:
        ``(all_embeddings, all_labels)`` as NumPy arrays: the per-batch
        embeddings stacked row-wise and the per-batch labels concatenated.
    """
    model.eval()
    all_embeddings = []
    all_labels = []
    with torch.no_grad():
        for waveforms, labels in tqdm(loader, desc="Extracting Embeddings"):
            _, embeddings = model(waveforms.to(device))
            embeddings = nn.functional.normalize(embeddings, p=2, dim=1)
            all_embeddings.append(embeddings.cpu().numpy())
            # Labels are only used on the host, so they are not moved to `device`.
            all_labels.append(labels.cpu().numpy())
    all_embeddings = np.vstack(all_embeddings)
    all_labels = np.concatenate(all_labels)
    return all_embeddings, all_labels


# Extract embeddings from the test set using the best-fold model, the same
# model whose predictions feed the confusion matrix (`model` would be
# whichever fold happened to train last).
test_embeddings, test_labels = extract_embeddings(final_model, test_loader)

# Apply t-SNE. The iteration count is left at the library default of 1000:
# the `n_iter` keyword was renamed to `max_iter` in newer scikit-learn, so
# omitting it keeps this cell working on both old and new versions.
tsne = TSNE(n_components=2, perplexity=50, random_state=42)
tsne_results = tsne.fit_transform(test_embeddings)

# Plot t-SNE, one colour per emotion class
plt.figure(figsize=(8, 6))
palette = sns.color_palette("bright", len(emotion_mapping))
for color, (emotion, class_id) in zip(palette, emotion_mapping.items()):
    mask = test_labels == class_id
    plt.scatter(tsne_results[mask, 0], tsne_results[mask, 1], label=emotion, alpha=0.7, s=40, color=color)

# Set labels and title with increased font size
plt.legend(fontsize=14)
plt.title("t-SNE-Audio-Cross-entropy Loss", fontsize=16)
plt.xlabel("Dimension 1", fontsize=14)
plt.ylabel("Dimension 2", fontsize=14)
plt.tight_layout()
plt.show()
