In [1]:
from google.colab import drive
import os
import json

drive.mount('/content/drive')
audio_folder = "/content/drive/MyDrive/multimodal_emotion_recognition/data"

Mounted at /content/drive


In [9]:
import os
import glob
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms

mfcc_dir = "/content/drive/MyDrive/multimodal_emotion_recognition/mfccs"
spec_dir = "/content/drive/MyDrive/multimodal_emotion_recognition/spectrograms"

# MFCC Dataset
class MFCCDataset(Dataset):
    def __init__(self, mfcc_dir):
        self.mfcc_dir = mfcc_dir
        self.files = sorted([f for f in os.listdir(mfcc_dir) if f.endswith('.npy')])
        self.label_map = {
            "neutral": 0, "calm": 1, "happy": 2, "sad": 3,
            "angry": 4, "fearful": 5, "disgust": 6, "surprised": 7
        }

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        file = self.files[idx]
        mfcc = np.load(os.path.join(self.mfcc_dir, file))
        mfcc = torch.tensor(mfcc, dtype=torch.float32).unsqueeze(0)
        label = self.label_map[file.split("_")[0]]
        return mfcc, label

# Spectrogram Dataset (sorted)
class SpectrogramDataset(Dataset):
    def __init__(self, spec_dir, transform=None):
        self.spec_files = sorted(glob.glob(os.path.join(spec_dir, "*.png")))
        self.transform = transform
        self.label_map = {
            "neutral": 0, "calm": 1, "happy": 2, "sad": 3,
            "angry": 4, "fearful": 5, "disgust": 6, "surprised": 7
        }

    def __len__(self):
        return len(self.spec_files)

    def __getitem__(self, idx):
        img_path = self.spec_files[idx]
        img = Image.open(img_path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        label = self.label_map[os.path.basename(img_path).split("_")[0]]
        return img, label

# Combined Ensemble Dataset
class EnsembleDataset(Dataset):
    def __init__(self, mfcc_dataset, spec_dataset):
        self.mfcc_dataset = mfcc_dataset
        self.spec_dataset = spec_dataset
        assert len(mfcc_dataset) == len(spec_dataset)

    def __len__(self):
        return len(self.mfcc_dataset)

    def __getitem__(self, idx):
        mfcc, label1 = self.mfcc_dataset[idx]
        spec, label2 = self.spec_dataset[idx]
        assert label1 == label2
        return mfcc, spec, label1

# Define Models
class MFCCCNN(nn.Module):
    def __init__(self, num_classes=8):
        super(MFCCCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(2, 2)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(2, 2)

        self.fc1 = nn.Linear(32 * 10 * 50, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

class SpectrogramCNN(nn.Module):
    def __init__(self):
        super(SpectrogramCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.4)

        self.fc1 = nn.Linear(64 * 16 * 16, 128)
        self.fc2 = nn.Linear(128, 8)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = x.view(-1, 64 * 16 * 16)
        x = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(x)

# Ensemble Evaluation Logic

def evaluate_ensemble(model_mfcc, model_spec, dataloader, device):
    model_mfcc.eval()
    model_spec.eval()
    correct, total = 0, 0

    with torch.no_grad():
        for mfccs, specs, labels in dataloader:
            mfccs, specs, labels = mfccs.to(device), specs.to(device), labels.to(device)
            out1 = model_mfcc(mfccs)
            out2 = model_spec(specs)
            outputs = (out1 + out2) / 2
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    return 100 * correct / total

# Training Loop for Individual Models

def train_model(model, dataloader, criterion, optimizer, device, num_epochs=10):
    model = model.to(device)
    for epoch in range(num_epochs):
        model.train()
        running_loss, correct, total = 0.0, 0, 0

        for mfccs, specs, labels in dataloader:  # Unpack mfcc, spec, label
            mfccs, specs, labels = mfccs.to(device), specs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(mfccs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * labels.size(0)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        acc = 100 * correct / total
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss / total:.4f}, Accuracy: {acc:.2f}%")


# Main Execution Logic

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

mfcc_dataset = MFCCDataset(mfcc_dir)
spec_dataset = SpectrogramDataset(spec_dir, transform)
ensemble_dataset = EnsembleDataset(mfcc_dataset, spec_dataset)

# Split dataset
train_size = int(0.8 * len(ensemble_dataset))
val_size = len(ensemble_dataset) - train_size
train_dataset, val_dataset = random_split(ensemble_dataset, [train_size, val_size])

train_loader_mfcc = DataLoader([d[0:2:2] for d in train_dataset], batch_size=32, shuffle=True)
train_loader_spec = DataLoader([d[1::2] + (d[2],) for d in train_dataset], batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

model_mfcc = MFCCCNN()
model_spec = SpectrogramCNN()

criterion = nn.CrossEntropyLoss()
optimizer_mfcc = torch.optim.Adam(model_mfcc.parameters(), lr=0.001)
optimizer_spec = torch.optim.Adam(model_spec.parameters(), lr=0.001)

# Train individual models
train_model(model_mfcc, train_loader_mfcc, criterion, optimizer_mfcc, device, num_epochs=10)
train_model(model_spec, train_loader_spec, criterion, optimizer_spec, device, num_epochs=10)

# Evaluate ensemble
ensemble_accuracy = evaluate_ensemble(model_mfcc, model_spec, val_loader, device)
print(f"Ensemble Validation Accuracy: {ensemble_accuracy:.2f}%")


ValueError: not enough values to unpack (expected 3, got 1)