In [None]:
# --- 1. INSTALLS AND IMPORTS ---
!pip install -q scikit-learn pandas numpy Pillow

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
import pandas as pd
from PIL import Image
import os
from google.colab import drive
import sys
import warnings
warnings.filterwarnings("ignore")

# --- 2. DRIVE MOUNT ---
# Mount Google Drive only when the Colab runtime is loaded, and only claim
# success when a mount was actually attempted (previously "Drive mounted."
# was printed unconditionally, even if the mount step was skipped).
if 'google.colab' in sys.modules:
    print("Mounting Google Drive...")
    drive.mount('/content/drive')
    print("Drive mounted.")
else:
    print("Not running inside Google Colab; skipping Drive mount.")

# --- 3. CONFIGURATION (FACIAL MODEL) ---
# >>>>>> CRITICAL STEP: REPLACE THIS PATH with the EXACT path to your fer2013.csv file on Drive <<<<<<
CSV_FILE_PATH = '/content/drive/MyDrive/multimodel-fusion/fer2013.csv'
MODEL_SAVE_PATH_FACE = '/content/drive/MyDrive/best_fer2013_cnn.pth'

# Integer label -> human-readable emotion name.
EMOTION_MAP_FACE = {0: 'Angry', 1: 'Disgust', 2: 'Fear', 3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'}

# Derived from the label map so the two can never drift apart (currently 7).
NUM_CLASSES = len(EMOTION_MAP_FACE)
BATCH_SIZE = 64
EPOCHS = 20
LEARNING_RATE = 0.001
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed the NumPy and PyTorch generators so weight initialisation, shuffling and
# random augmentation start from the same state on every "Restart & Run All".
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

print(f"\nUsing device: {DEVICE}")
print(f"Facial Model will be saved to: {MODEL_SAVE_PATH_FACE}")

Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Drive mounted.

Using device: cuda
Facial Model will be saved to: /content/drive/MyDrive/best_fer2013_cnn.pth


In [None]:
# --- 4. CUSTOM PYTORCH DATASET CLASS ---
class FER2013Dataset(Dataset):
    """Map-style dataset over rows of a FER-2013 style DataFrame.

    Each row must expose a ``'pixels'`` entry (space-separated intensity
    values that reshape to 48x48) and an ``'emotion'`` entry (the label).

    Args:
        data_frame: DataFrame holding the rows to serve; indexed positionally.
        transform: optional callable applied to the PIL image of each sample.
    """

    def __init__(self, data_frame, transform=None):
        self.data_frame = data_frame
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the wrapped DataFrame."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``.

        The image is a grayscale PIL image (or whatever ``transform`` turns it
        into); the label is a 0-dim ``torch.long`` tensor.
        """
        sample_row = self.data_frame.iloc[idx]

        # Parse the pixel string into a flat integer vector, then shape it
        # into the 48x48 uint8 grid PIL expects for a grayscale image.
        flat_pixels = np.fromstring(sample_row['pixels'], dtype=int, sep=' ')
        face_grid = flat_pixels.reshape(48, 48).astype(np.uint8)
        face = Image.fromarray(face_grid).convert('L')  # 'L' for grayscale

        if self.transform:
            face = self.transform(face)

        return face, torch.tensor(sample_row['emotion'], dtype=torch.long)

# --- 5. DATA LOADING AND PREPROCESSING ---
# Raise a chained exception instead of calling sys.exit(): inside a notebook
# this yields an ordinary traceback that points at the missing file.
try:
    df = pd.read_csv(CSV_FILE_PATH)
except FileNotFoundError as err:
    raise FileNotFoundError(
        f"❌ FATAL ERROR: CSV file not found at {CSV_FILE_PATH}. Please check your path and Cell 1."
    ) from err
print("✅ Facial DataFrame loaded successfully from Drive.")

# Fail fast with a readable message if the CSV lacks a column used below or by
# FER2013Dataset, rather than failing later inside a DataLoader worker.
required_columns = {'emotion', 'pixels', 'Usage'}
missing_columns = required_columns - set(df.columns)
if missing_columns:
    raise KeyError(f"{CSV_FILE_PATH} is missing required column(s): {sorted(missing_columns)}")

print(f"Unique 'Usage' values found in CSV: {df['Usage'].unique()}")

# Split on the 'Usage' column: 'Training' rows are used for fitting,
# 'PrivateTest' rows for validation / checkpoint selection and 'PublicTest'
# rows for the final report.
# NOTE(review): confirm that this PrivateTest/PublicTest role assignment is the
# one intended when comparing against published FER-2013 numbers.
df_train = df[df['Usage'] == 'Training']
df_val = df[df['Usage'] == 'PrivateTest']
df_test = df[df['Usage'] == 'PublicTest']

# An empty split would otherwise surface as an opaque DataLoader/sampler error.
for split_name, split_frame in (('Training', df_train), ('PrivateTest', df_val), ('PublicTest', df_test)):
    if split_frame.empty:
        raise ValueError(f"No rows with Usage == '{split_name}' found in {CSV_FILE_PATH}.")

# Data Transformations (Augmentation is used on training set only)
train_transform = transforms.Compose([
    transforms.RandomCrop(48, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

# Evaluation data is only converted to a tensor and normalised.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

# Create Dataset and DataLoader instances
train_dataset = FER2013Dataset(df_train, transform=train_transform)
val_dataset = FER2013Dataset(df_val, transform=test_transform)
test_dataset = FER2013Dataset(df_test, transform=test_transform)

# Only the training loader shuffles; evaluation order is kept fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print(f"Train samples: {len(train_dataset)}, Validation samples: {len(val_dataset)}, Test samples: {len(test_dataset)}")

✅ Facial DataFrame loaded successfully from Drive.
Unique 'Usage' values found in CSV: ['Training' 'PublicTest' 'PrivateTest']
Train samples: 28709, Validation samples: 3589, Test samples: 3589


In [None]:
# --- 6. CNN MODEL ARCHITECTURE (Facial) ---
class EmotionCNN(nn.Module):
    """Three-stage convolutional classifier for single-channel face crops.

    Each stage is Conv(3x3, padding 1) -> ReLU -> BatchNorm -> MaxPool(2);
    stages two and three additionally end with Dropout(0.3). The classifier
    head expects the flattened feature maps to have 128 * 6 * 6 elements,
    which is what a 48x48 input yields after three 2x poolings.

    Args:
        num_classes: number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()

        def conv_stage(in_channels, out_channels, dropout=None):
            """Build the layer list for one convolutional stage, in order."""
            stage = [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.BatchNorm2d(out_channels),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ]
            if dropout is not None:
                stage.append(nn.Dropout(dropout))
            return stage

        # The stages are unpacked into one flat Sequential so parameter names
        # stay "features.<index>.*", matching previously saved checkpoints.
        self.features = nn.Sequential(
            *conv_stage(1, 32),
            *conv_stage(32, 64, dropout=0.3),
            *conv_stage(64, 128, dropout=0.3),
        )

        flattened_size = 128 * 6 * 6
        self.classifier = nn.Sequential(
            nn.Linear(flattened_size, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for input ``x``."""
        feature_maps = self.features(x)
        flattened = feature_maps.view(feature_maps.size(0), -1)
        return self.classifier(flattened)

# Build the facial-emotion network, then move its parameters and buffers to
# the configured device.
model_face = EmotionCNN(num_classes=NUM_CLASSES)
model_face = model_face.to(DEVICE)
print("Facial Model architecture defined.")

Facial Model architecture defined.


In [None]:
# --- 7. TRAINING AND TESTING FUNCTIONS (Facial) ---
# Cross-entropy over the raw logits, optimised with Adam plus a small weight
# decay on every parameter of the facial model.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    model_face.parameters(),
    lr=LEARNING_RATE,
    weight_decay=1e-5,
)

def train_model(model, loader, criterion, optimizer, device):
    """Run one training epoch and return the mean per-sample loss.

    Args:
        model: network to optimise; switched to training mode here.
        loader: iterable of ``(images, labels)`` batches exposing ``.dataset``.
        criterion: loss callable taking ``(outputs, labels)``.
        optimizer: optimiser stepping the model's parameters.
        device: device every batch is moved to before the forward pass.

    Returns:
        Sum of batch losses weighted by batch size, divided by
        ``len(loader.dataset)``.
    """
    model.train()
    loss_sum = 0.0
    for batch_images, batch_labels in loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        batch_loss = criterion(model(batch_images), batch_labels)

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # criterion returns a batch mean; re-weight by the batch size so a
        # smaller final batch does not skew the epoch average.
        loss_sum += batch_loss.item() * batch_images.size(0)
    return loss_sum / len(loader.dataset)

def evaluate_model(model, loader, device):
    """Return the fraction of samples in ``loader`` that ``model`` classifies correctly.

    The model is switched to evaluation mode and gradients are disabled for
    the whole pass. The predicted class is the index of the largest logit.
    """
    model.eval()
    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)
            predicted = logits.max(dim=1).indices

            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()
    return n_correct / n_seen

# --- 8. MAIN TRAINING LOOP (Facial) ---
# Train for EPOCHS epochs; after each one measure validation accuracy and
# checkpoint the weights whenever it strictly beats the best seen so far.
best_val_accuracy = 0.0
print("\nStarting Facial Emotion Model Training...")
for epoch in range(EPOCHS):
    train_loss = train_model(model_face, train_loader, criterion, optimizer, DEVICE)
    val_accuracy = evaluate_model(model_face, val_loader, DEVICE)

    print(f'Epoch [{epoch+1}/{EPOCHS}], Train Loss: {train_loss:.4f}, Val Accuracy: {val_accuracy*100:.2f}%')

    # Nothing to save unless this epoch improved on the best accuracy.
    if not val_accuracy > best_val_accuracy:
        continue

    # Save the best model to Drive
    best_val_accuracy = val_accuracy
    torch.save(model_face.state_dict(), MODEL_SAVE_PATH_FACE)
    print(f">>> Best Facial model saved to Drive: {MODEL_SAVE_PATH_FACE}")

# --- 9. FINAL TEST EVALUATION ---
# Restore the best checkpoint (not the last-epoch weights) before scoring the
# held-out test loader.
model_face.load_state_dict(
    torch.load(MODEL_SAVE_PATH_FACE, map_location=DEVICE)
)
final_test_accuracy = evaluate_model(model_face, test_loader, DEVICE)
print(f"\nFINAL TEST ACCURACY of BEST FACIAL MODEL: {final_test_accuracy*100:.2f}%")


Starting Facial Emotion Model Training...
Epoch [1/20], Train Loss: 1.7609, Val Accuracy: 41.29%
>>> Best Facial model saved to Drive: /content/drive/MyDrive/best_fer2013_cnn.pth
Epoch [2/20], Train Loss: 1.5500, Val Accuracy: 47.06%
>>> Best Facial model saved to Drive: /content/drive/MyDrive/best_fer2013_cnn.pth
Epoch [3/20], Train Loss: 1.4559, Val Accuracy: 51.16%
>>> Best Facial model saved to Drive: /content/drive/MyDrive/best_fer2013_cnn.pth
Epoch [4/20], Train Loss: 1.4079, Val Accuracy: 52.55%
>>> Best Facial model saved to Drive: /content/drive/MyDrive/best_fer2013_cnn.pth
Epoch [5/20], Train Loss: 1.3647, Val Accuracy: 54.97%
>>> Best Facial model saved to Drive: /content/drive/MyDrive/best_fer2013_cnn.pth
Epoch [6/20], Train Loss: 1.3379, Val Accuracy: 55.08%
>>> Best Facial model saved to Drive: /content/drive/MyDrive/best_fer2013_cnn.pth
Epoch [7/20], Train Loss: 1.3140, Val Accuracy: 55.75%
>>> Best Facial model saved to Drive: /content/drive/MyDrive/best_fer2013_cnn.pt

In [None]:
# --- 10. INSTALLS AND IMPORTS (Speech) ---
!pip install -q librosa soundfile

import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import librosa
import soundfile
import glob
import sys

# --- 11. CONFIGURATION (SPEECH MODEL) ---
# >>>>>> CRITICAL STEP: REPLACE THIS PATH with the EXACT path to the folder that contains your RAVDESS Actor_xx folders on your Drive <<<<<<
RAVDESS_PATH = '/content/drive/MyDrive/multimodel-fusion/archive (2)'
MODEL_SAVE_PATH_SPEECH = '/content/drive/MyDrive/best_ravdess_speech_cnnlstm.pth'

# Fixed parameters for audio processing
SAMPLE_RATE = 22050     # sample rate every clip is resampled to
N_MFCC = 40             # MFCC coefficients computed per frame
MAX_PAD_LENGTH = 174    # frames kept per clip (shorter clips are zero-padded)
BATCH_SIZE = 64

# Emotion Mapping based on RAVDESS filename (Part 3): two-digit codes
# '01'..'08' in the order listed below.
EMOTION_MAP_RAVDESS = {
    f'{code:02d}': emotion
    for code, emotion in enumerate(
        ('neutral', 'calm', 'happy', 'sad', 'angry', 'fearful', 'disgust', 'surprised'),
        start=1,
    )
}
NUM_CLASSES_SPEECH = len(EMOTION_MAP_RAVDESS)
print(f"Speech model will be saved to: {MODEL_SAVE_PATH_SPEECH}")


# --- 12. FEATURE EXTRACTION FUNCTION (MFCCs) ---
def extract_feature(file_name, **kwargs):
    """Extract a fixed-width MFCC matrix from one audio file.

    Args:
        file_name: path to an audio file readable by ``soundfile``.
        **kwargs: only ``mfcc`` is consulted; when it is missing or falsy no
            feature is computed.

    Returns:
        The MFCC array computed with ``n_mfcc=N_MFCC``, truncated or
        zero-padded along axis 1 to exactly ``MAX_PAD_LENGTH`` columns, or
        ``None`` when MFCCs were not requested or the file could not be
        processed (a message naming the file and the error is printed).
    """
    if not kwargs.get("mfcc"):
        return None

    try:
        with soundfile.SoundFile(file_name) as sound_file:
            X = sound_file.read(dtype="float32")
            sample_rate = sound_file.samplerate

        # Multi-channel files are read as (frames, channels). Average the
        # channels to mono so resampling and MFCC extraction operate on the
        # time axis; previously such files failed later and were dropped.
        if X.ndim > 1:
            X = X.mean(axis=1)

        if sample_rate != SAMPLE_RATE:
            X = librosa.resample(X, orig_sr=sample_rate, target_sr=SAMPLE_RATE)
            sample_rate = SAMPLE_RATE

        result = librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=N_MFCC)

        # Padding/Truncation to a fixed number of frames
        n_frames = result.shape[1]
        if n_frames > MAX_PAD_LENGTH:
            result = result[:, :MAX_PAD_LENGTH]
        elif n_frames < MAX_PAD_LENGTH:
            pad_width = MAX_PAD_LENGTH - n_frames
            result = np.pad(result, pad_width=((0, 0), (0, pad_width)), mode='constant')

        return result
    except Exception as e:
        # Best-effort: keep going past unreadable files, but say which file
        # was skipped and why instead of discarding it silently.
        print(f"⚠️ Skipping {file_name}: {type(e).__name__}: {e}")
        return None

Speech model will be saved to: /content/drive/MyDrive/best_ravdess_speech_cnnlstm.pth


In [None]:
# --- 13. LOAD DATA AND EXTRACT FEATURES ---
def load_data(ravdess_dir):
    """Collect MFCC features and emotion labels for the .wav files under a directory.

    Files are discovered recursively. The emotion is read from the third
    '-'-separated field of each file name and mapped through
    ``EMOTION_MAP_RAVDESS``; names without that field or with an unmapped
    code are skipped. When the same file name occurs in several
    sub-directories only the first successfully processed copy is kept, so
    duplicated folder copies cannot end up on both sides of a later
    train/test split.

    Args:
        ravdess_dir: root directory to search.

    Returns:
        ``(features, labels)`` as NumPy arrays; each feature is the transposed
        output of ``extract_feature`` and each label is the emotion name.

    Raises:
        FileNotFoundError: if ``ravdess_dir`` is not a directory or contains
            no .wav files.
    """
    if not os.path.isdir(ravdess_dir):
        raise FileNotFoundError(f"❌ FATAL ERROR: RAVDESS directory not found at {ravdess_dir}.")

    print("Loading RAVDESS data and extracting MFCC features...")
    # Search for all WAV files recursively under the RAVDESS_PATH
    search_path = os.path.join(ravdess_dir, '**', '*.wav')

    # Sorted so the sample order (and any seeded split made from it) does not
    # depend on the order the file system happens to list entries in.
    file_list = sorted(glob.glob(search_path, recursive=True))
    if not file_list:
        raise FileNotFoundError(
            f"❌ FATAL ERROR: No .wav files found in {ravdess_dir}. Is the folder structure correct?"
        )

    features = []
    labels = []
    loaded_names = set()

    for file in file_list:
        basename = os.path.basename(file)
        if basename in loaded_names:
            continue  # another copy of this clip was already loaded

        name_fields = basename.split('-')
        if len(name_fields) < 3:
            continue  # name does not carry an emotion field

        label = EMOTION_MAP_RAVDESS.get(name_fields[2])
        if label is None:
            continue

        feature = extract_feature(file, mfcc=True)
        if feature is not None:
            features.append(feature.T)
            labels.append(label)
            # Marked only on success so a later copy can still be tried when
            # this one could not be read.
            loaded_names.add(basename)

    return np.array(features), np.array(labels)

# Load data (This can take time)
X, y = load_data(RAVDESS_PATH)
if X.size == 0:
    # Raise rather than sys.exit() so the notebook shows a regular traceback.
    raise RuntimeError("❌ FATAL ERROR: No audio files loaded.")
print(f"Total extracted samples: {X.shape[0]}, Feature shape (TimeSteps, N_MFCC): {X.shape[1:]}")

# --- 14. DATA PREPARATION ---
# One-hot encode the string labels; the training functions turn these back
# into class indices with argmax.
encoder = OneHotEncoder()
y_encoded = encoder.fit_transform(y.reshape(-1, 1)).toarray()

# Stratify on the emotion label so both partitions keep the class proportions
# of the full set.
# NOTE(review): the split is made per clip, so clips from the same speaker can
# appear in both partitions — confirm whether a speaker-wise split is wanted.
X_train_raw, X_test_raw, y_train_enc, y_test_enc = train_test_split(
    X, y_encoded, test_size=0.25, random_state=42, shuffle=True, stratify=y
)

# Fit the scaler on training frames only (all time steps flattened into rows
# of N_MFCC values), then apply the same statistics to both partitions.
scaler = StandardScaler()
X_train_flat = X_train_raw.reshape(-1, N_MFCC)
scaler.fit(X_train_flat)

X_train_scaled = scaler.transform(X_train_flat).reshape(X_train_raw.shape)
X_test_scaled = scaler.transform(X_test_raw.reshape(-1, N_MFCC)).reshape(X_test_raw.shape)

X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_enc, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test_enc, dtype=torch.float32)

train_data = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
test_data = torch.utils.data.TensorDataset(X_test_tensor, y_test_tensor)

train_loader_speech = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader_speech = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print(f"Train samples (Speech): {len(train_data)}, Test samples (Speech): {len(test_data)}")

Loading RAVDESS data and extracting MFCC features...
Total extracted samples: 2870, Feature shape (TimeSteps, N_MFCC): (174, 40)
Train samples (Speech): 2152, Test samples (Speech): 718


In [None]:
# --- 15. CNN-LSTM MODEL ARCHITECTURE for SER ---
class SpeechEmotionCNN_LSTM(nn.Module):
    """1D-CNN front end followed by a bidirectional LSTM and a linear classifier.

    Args:
        input_size: number of features per time step (Conv1d input channels).
        hidden_size: LSTM hidden units per direction.
        num_layers: number of stacked LSTM layers.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()

        conv_channels = 64

        # 1D CNN Layers: two conv blocks; the second halves the time axis.
        self.conv1d = nn.Sequential(
            nn.Conv1d(in_channels=input_size, out_channels=conv_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(conv_channels),
            nn.Dropout(0.2),
            nn.Conv1d(in_channels=conv_channels, out_channels=conv_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(conv_channels),
            nn.MaxPool1d(kernel_size=2),
        )

        # LSTM Layer consuming the CNN channels as per-step features.
        self.lstm = nn.LSTM(
            input_size=conv_channels,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )

        # Fully Connected Classifier over the concatenated forward/backward states.
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return class logits for ``x`` laid out as [B, TimeSteps, features]."""
        # Conv1d wants channels first: [B, features, TimeSteps].
        conv_features = self.conv1d(x.transpose(1, 2))

        # Back to [B, seq_len, features] for the batch-first LSTM.
        _, (hidden, _) = self.lstm(conv_features.transpose(1, 2))

        # The last two entries of the hidden-state stack are the top layer's
        # forward and backward final states.
        last_forward, last_backward = hidden[-2, :, :], hidden[-1, :, :]
        summary = torch.cat((last_forward, last_backward), dim=1)

        return self.fc(self.dropout(summary))

# Model Hyperparameters
HIDDEN_SIZE = 128   # LSTM hidden units per direction
NUM_LAYERS = 2      # stacked LSTM layers

# Build the speech network from the MFCC feature width and the speech class
# count, then move it to the configured device.
model_speech = SpeechEmotionCNN_LSTM(
    input_size=N_MFCC,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    num_classes=NUM_CLASSES_SPEECH,
)
model_speech = model_speech.to(DEVICE)

print("Speech Model architecture defined.")

Speech Model architecture defined.


In [None]:
# --- 16. TRAINING AND TESTING FUNCTIONS (Speech) ---
# Number of passes over the speech training set.
EPOCHS_SPEECH = 50

# Cross-entropy on the logits; Adam reuses the LEARNING_RATE defined for the
# facial model.
criterion_speech = nn.CrossEntropyLoss()
optimizer_speech = optim.Adam(
    model_speech.parameters(),
    lr=LEARNING_RATE,
)

def train_model_speech(model, loader, criterion, optimizer, device):
    """Run one training epoch on one-hot labelled batches; return mean per-sample loss.

    Args:
        model: network to optimise; switched to training mode here.
        loader: iterable of ``(features, one_hot_labels)`` batches exposing
            ``.dataset``.
        criterion: loss callable taking ``(outputs, class_indices)``.
        optimizer: optimiser stepping the model's parameters.
        device: device every batch is moved to before the forward pass.

    Returns:
        Sum of batch losses weighted by batch size, divided by
        ``len(loader.dataset)``.
    """
    model.train()
    loss_sum = 0.0
    for features, one_hot in loader:
        features = features.to(device)
        one_hot = one_hot.to(device)

        # The loss expects class indices, so collapse each one-hot row.
        target_index = torch.argmax(one_hot, dim=1)

        batch_loss = criterion(model(features), target_index)

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item() * features.size(0)

    return loss_sum / len(loader.dataset)

def evaluate_model_speech(model, loader, device):
    """Return the fraction of one-hot labelled samples the model classifies correctly.

    The model is switched to evaluation mode and gradients are disabled. Both
    the true class (from the one-hot row) and the predicted class (from the
    logits) are taken as the index of the largest entry.
    """
    model.eval()
    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for features, one_hot in loader:
            features = features.to(device)
            one_hot = one_hot.to(device)
            target_index = torch.argmax(one_hot, dim=1)

            logits = model(features)
            predicted = logits.max(dim=1).indices

            n_seen += target_index.size(0)
            n_correct += (predicted == target_index).sum().item()

    return n_correct / n_seen

# --- 17. MAIN TRAINING LOOP (Speech) ---
# Train for EPOCHS_SPEECH epochs, checkpointing whenever the accuracy on
# test_loader_speech strictly improves.
# NOTE: test_loader_speech drives both this checkpoint selection and the final
# report in section 18, so the "final test" figure is measured on the same
# data the checkpoint was chosen on.
best_val_accuracy_speech = 0.0
print("\nStarting Speech Model Training...")
for epoch in range(EPOCHS_SPEECH):
    train_loss = train_model_speech(model_speech, train_loader_speech, criterion_speech, optimizer_speech, DEVICE)
    val_accuracy = evaluate_model_speech(model_speech, test_loader_speech, DEVICE)

    print(f'Epoch [{epoch+1}/{EPOCHS_SPEECH}], Train Loss: {train_loss:.4f}, Val Accuracy: {val_accuracy*100:.2f}%')

    # Nothing to save unless this epoch improved on the best accuracy.
    if not val_accuracy > best_val_accuracy_speech:
        continue

    # Save the best model to Drive
    best_val_accuracy_speech = val_accuracy
    torch.save(model_speech.state_dict(), MODEL_SAVE_PATH_SPEECH)
    print(f">>> Best Speech Model saved to Drive: {MODEL_SAVE_PATH_SPEECH}")

# --- 18. FINAL TEST EVALUATION ---
# Restore the best checkpoint (not the last-epoch weights) before scoring.
model_speech.load_state_dict(
    torch.load(MODEL_SAVE_PATH_SPEECH, map_location=DEVICE)
)
final_test_accuracy_speech = evaluate_model_speech(model_speech, test_loader_speech, DEVICE)
print(f"\nFINAL TEST ACCURACY of BEST SPEECH MODEL: {final_test_accuracy_speech*100:.2f}%")


Starting Speech Model Training...
Epoch [1/50], Train Loss: 1.8500, Val Accuracy: 36.77%
>>> Best Speech Model saved to Drive: /content/drive/MyDrive/best_ravdess_speech_cnnlstm.pth
Epoch [2/50], Train Loss: 1.6508, Val Accuracy: 36.35%
Epoch [3/50], Train Loss: 1.5590, Val Accuracy: 40.95%
>>> Best Speech Model saved to Drive: /content/drive/MyDrive/best_ravdess_speech_cnnlstm.pth
Epoch [4/50], Train Loss: 1.5323, Val Accuracy: 39.00%
Epoch [5/50], Train Loss: 1.4838, Val Accuracy: 44.15%
>>> Best Speech Model saved to Drive: /content/drive/MyDrive/best_ravdess_speech_cnnlstm.pth
Epoch [6/50], Train Loss: 1.4744, Val Accuracy: 42.76%
Epoch [7/50], Train Loss: 1.3636, Val Accuracy: 46.24%
>>> Best Speech Model saved to Drive: /content/drive/MyDrive/best_ravdess_speech_cnnlstm.pth
Epoch [8/50], Train Loss: 1.2738, Val Accuracy: 50.56%
>>> Best Speech Model saved to Drive: /content/drive/MyDrive/best_ravdess_speech_cnnlstm.pth
Epoch [9/50], Train Loss: 1.2181, Val Accuracy: 50.84%
>>> B