In [2]:
import os
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from torchvision import transforms, models
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np

In [None]:
# 1. Dataset Loading and Preprocessing
class VideoEmotionDataset(data.Dataset):
    """Fixed-length frame sequences sampled from emotion-labelled video files.

    ``root_dir`` must contain one sub-directory per emotion class, each holding
    ``.mp4`` files; the sub-directory name is used as the class label::

        root_dir/
            angry/clip_a.mp4
            happy/clip_b.mp4

    Args:
        root_dir: Directory containing one folder per emotion class.
        transform: Optional callable applied to every selected RGB frame (the
            array produced by OpenCV after BGR -> RGB conversion). When it is
            omitted the raw frames are converted to tensors unchanged.
        sequence_length: Number of frames in every returned sequence (>= 1).

    Attributes:
        video_files: Paths of all discovered videos, in sorted order.
        labels: Integer class index for each entry of ``video_files``.
        emotion_labels: Sorted list of class names.
        label_to_index: Mapping from class name to integer class index.

    Raises:
        ValueError: If ``sequence_length`` is smaller than 1.
    """

    def __init__(self, root_dir, transform=None, sequence_length=30):
        if sequence_length < 1:
            raise ValueError(f"sequence_length must be >= 1, got {sequence_length}")

        self.root_dir = root_dir
        self.transform = transform
        self.sequence_length = sequence_length
        self.video_files = []
        class_names = []

        # os.listdir returns entries in arbitrary order; sorting keeps sample
        # indices stable across machines so a seeded split is reproducible.
        for emotion_dir in sorted(os.listdir(root_dir)):
            emotion_path = os.path.join(root_dir, emotion_dir)
            if not os.path.isdir(emotion_path):
                continue
            for video_file in sorted(os.listdir(emotion_path)):
                # Case-insensitive so that e.g. "CLIP.MP4" is not silently skipped.
                if video_file.lower().endswith(".mp4"):
                    self.video_files.append(os.path.join(emotion_path, video_file))
                    class_names.append(emotion_dir)

        self.emotion_labels = sorted(set(class_names))
        self.label_to_index = {label: index for index, label in enumerate(self.emotion_labels)}
        self.labels = [self.label_to_index[label] for label in class_names]

    def __len__(self):
        """Returns the number of videos discovered under ``root_dir``."""
        return len(self.video_files)

    def __getitem__(self, idx):
        """Loads one video and returns ``(frames, label)``.

        ``frames`` stacks exactly ``sequence_length`` per-frame tensors along a
        new leading dimension; ``label`` is a scalar tensor holding the class
        index. Videos longer than ``sequence_length`` are cropped to a random
        contiguous window (drawn from torch's global RNG); shorter videos are
        padded by repeating their last frame.

        Raises:
            RuntimeError: If no frame could be decoded from the video.
        """
        video_path = self.video_files[idx]
        frames = self._read_frames(video_path)
        if not frames:
            raise RuntimeError(f"No frames could be decoded from video: {video_path}")

        frames = self._select_window(frames)

        if self.transform:
            frames = [self.transform(frame) for frame in frames]

        # torch.stack only accepts tensors; without a transform (or with one
        # that returns arrays) the frames still need converting.
        frames = torch.stack(
            [frame if torch.is_tensor(frame) else torch.as_tensor(frame) for frame in frames]
        )
        label = torch.tensor(self.labels[idx])
        return frames, label

    @staticmethod
    def _read_frames(video_path):
        """Decodes every frame of ``video_path`` and returns them as RGB arrays."""
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        finally:
            # Always free the decoder handle, even if colour conversion raises.
            cap.release()
        return frames

    def _select_window(self, frames):
        """Crops or pads a non-empty ``frames`` list to ``sequence_length`` items."""
        surplus = len(frames) - self.sequence_length
        if surplus > 0:
            # torch.randint's upper bound is exclusive: use surplus + 1 so the
            # final window (start == surplus) can be drawn as well.
            start = int(torch.randint(0, surplus + 1, (1,)).item())
            return frames[start:start + self.sequence_length]
        if surplus < 0:
            return frames + [frames[-1]] * (-surplus)
        return frames

In [None]:

# 2. Model Definition (CNN + RNN)
class CNN_RNN(nn.Module):
    """Runs a per-frame feature extractor followed by a sequence model.

    Args:
        cnn: Module applied to every frame independently. It receives the input
            with the first two dimensions merged, i.e. ``(batch * time, ...)``.
        rnn: Module that receives the CNN output regrouped as
            ``(batch, time, features)``; its output is returned unchanged.
    """

    def __init__(self, cnn, rnn):
        super().__init__()
        self.cnn = cnn
        self.rnn = rnn

    def forward(self, x):
        """Maps ``x`` of shape ``(batch, time, *frame_shape)`` through CNN then RNN."""
        batch_size, time_steps, *input_shape = x.size()
        # reshape instead of view: view raises on non-contiguous tensors (for
        # example after permute/transpose), reshape copies only when needed.
        c_in = x.reshape(batch_size * time_steps, *input_shape)
        c_out = self.cnn(c_in)
        # Flatten whatever the CNN emits per frame into one feature vector.
        r_in = c_out.reshape(batch_size, time_steps, -1)
        r_out = self.rnn(r_in)
        return r_out

class EmotionRNN(nn.Module):
    """LSTM classifier head operating on a sequence of per-frame features.

    Args:
        cnn_output_size: Size of each per-frame feature vector fed to the LSTM.
        hidden_size: Size of the LSTM hidden state.
        num_classes: Number of output logits.
    """

    def __init__(self, cnn_output_size, hidden_size, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=cnn_output_size,
            hidden_size=hidden_size,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Returns class logits computed from the LSTM output at the last time step."""
        sequence_features = self.lstm(x)[0]
        # Only the final time step is classified.
        final_step = sequence_features[:, -1, :]
        logits = self.fc(final_step)
        return logits

In [None]:

# 3. Training and Evaluation
def _run_training_epoch(model, train_loader, criterion, optimizer, device):
    """Performs one optimisation pass over ``train_loader``; returns the summed batch losses."""
    model.train()
    running_loss = 0.0
    for videos, targets in train_loader:
        videos = videos.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(videos), targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
    return running_loss


def train_model(model, train_loader, val_loader, criterion, optimizer, epochs, device):
    """Trains ``model`` for ``epochs`` epochs, validating after each one.

    Args:
        model: Module to optimise; it is moved to ``device`` first.
        train_loader: Iterable of ``(video_batch, label_batch)`` pairs supporting ``len()``.
        val_loader: Loader handed to ``evaluate_model`` after every epoch.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the model's parameters.
        epochs: Number of passes over ``train_loader``.
        device: Device on which batches and the model are placed.

    Prints the mean of the per-batch losses after every epoch.
    """
    model.to(device)
    for epoch_number in range(1, epochs + 1):
        epoch_loss = _run_training_epoch(model, train_loader, criterion, optimizer, device)
        mean_loss = epoch_loss / len(train_loader)
        print(f"Epoch {epoch_number}/{epochs}, Train Loss: {mean_loss}")
        evaluate_model(model, val_loader, device)

def evaluate_model(model, data_loader, device):
    """Evaluates ``model`` on ``data_loader`` and reports accuracy and weighted F1.

    The model is switched to eval mode and left in it; gradients are disabled
    while predictions are collected.

    Args:
        model: Module producing per-class scores of shape ``(batch, num_classes)``.
        data_loader: Iterable of ``(video_batch, label_batch)`` pairs.
        device: Device on which the inputs are placed before the forward pass.

    Returns:
        tuple: ``(accuracy, f1)`` as computed by scikit-learn, so callers can
        use the metrics (e.g. for checkpoint selection) instead of only
        reading them from the printed output.
    """
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for video_batch, label_batch in data_loader:
            outputs = model(video_batch.to(device))
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().tolist())
            # Labels are only compared on the host, so they never need to
            # be copied to the device.
            all_labels.extend(label_batch.cpu().tolist())
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    print(f"Validation Accuracy: {accuracy}, F1-Score: {f1}")
    return accuracy, f1

In [None]:
# 4. Main Execution
def main(root_dir="dataset", sequence_length=30, batch_size=16, epochs=10,
         learning_rate=0.001, hidden_size=256, seed=42):
    """Builds the dataset and the CNN+LSTM model, trains it, and returns the model.

    Args:
        root_dir: Dataset directory with one sub-folder of ``.mp4`` files per emotion.
        sequence_length: Number of frames sampled from every video.
        batch_size: Number of videos per batch.
        epochs: Number of training epochs.
        learning_rate: Learning rate for the Adam optimizer.
        hidden_size: Hidden size of the LSTM.
        seed: Seed for torch's global RNG (train/val split, shuffling, frame
            window sampling, weight init). Pass ``None` to leave the RNG untouched.

    Returns:
        The trained ``CNN_RNN`` model.

    Raises:
        ValueError: If fewer than two videos are found, since the 80/20 split
            would then leave the training set empty.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    if seed is not None:
        torch.manual_seed(seed)

    # --- Transformations ---
    # Normalisation constants are the ImageNet statistics matching the
    # pretrained ResNet weights loaded below.
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # --- Dataset and DataLoaders ---
    full_dataset = VideoEmotionDataset(root_dir=root_dir, transform=transform, sequence_length=sequence_length)
    if len(full_dataset) < 2:
        raise ValueError(
            f"Need at least 2 videos under '{root_dir}' to build a train/val split, "
            f"found {len(full_dataset)}"
        )
    train_size = int(0.8 * len(full_dataset))
    val_size = len(full_dataset) - train_size
    train_dataset, val_dataset = data.random_split(full_dataset, [train_size, val_size])

    train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # --- Model Initialization ---
    # `pretrained=True` is deprecated in recent torchvision releases; prefer the
    # weights enum when available. IMAGENET1K_V1 is the checkpoint that
    # `pretrained=True` used to load, so behaviour is unchanged.
    if hasattr(models, "ResNet50_Weights"):
        cnn = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
    else:
        cnn = models.resnet50(pretrained=True)
    # Drop the final classification layer so the network emits pooled features.
    cnn = nn.Sequential(*list(cnn.children())[:-1])
    cnn_output_size = 2048
    num_classes = len(full_dataset.emotion_labels)
    rnn = EmotionRNN(cnn_output_size, hidden_size, num_classes)
    model = CNN_RNN(cnn, rnn)

    # --- Loss and Optimizer ---
    # NOTE: all parameters (including the ResNet backbone) are optimised, and
    # every step pushes batch_size * sequence_length frames through the CNN.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # --- Training ---
    train_model(model, train_loader, val_loader, criterion, optimizer, epochs, device)
    return model

if __name__ == "__main__":
    main()