<h1 align=center> 📹 End-to-End Video Emotion Recognition using MediaPipe and Pre-trained CNNs 🤖 </h1>

This notebook builds an end-to-end system that recognizes human emotions from short video clips. It was designed for a journaling application 📓 in which each entry uses a short video as its cover.

The notebook's main design choice lies in its feature engineering. Instead of classifying facial-landmark coordinates directly, the system uses the landmarks only to locate the face, and processes each video by:
1.  Using **MediaPipe** 🌐 to detect and crop the face in each frame.
2.  Sampling these cropped face images at regular intervals throughout a video clip.
3.  Stacking the sequence of grayscale images into a single multi-channel tensor.

This tensor carries spatial information within each channel and temporal information across channels. It is fed into a pre-trained **2D Convolutional Neural Network (CNN)** 🧠 (EfficientNetV2-S, EfficientNetV2-M, or MobileNetV3-Large), whose first convolution is adapted to accept the extra channels, for the final emotion classification. 😊
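
The stacking step can be sketched in a few lines of NumPy. This is an illustration only: random arrays stand in for real face crops, and the constants are assumed to match the values set later in the notebook (90-frame clips, one crop kept every 5 frames, 224×224 images).

```python
import numpy as np

# Assumed values, mirroring the constants defined later in this notebook.
SEQUENCE_LENGTH = 90        # frames per clip
IMG_CAPTURE_INTERVAL = 5    # keep one face crop every 5 frames
FACE_IMG_SIZE = (224, 224)  # size of each (square) face crop

# Hypothetical stand-in for real face crops: one random grayscale image per frame.
rng = np.random.default_rng(0)
frames = [rng.integers(0, 256, size=FACE_IMG_SIZE, dtype=np.uint8)
          for _ in range(SEQUENCE_LENGTH)]

# Sample at regular intervals, then stack along a new leading (channel) axis.
sampled = frames[::IMG_CAPTURE_INTERVAL]
clip_tensor = np.stack(sampled, axis=0).astype(np.float32) / 255.0

print(clip_tensor.shape)  # (18, 224, 224)
```

With these settings each clip becomes a single 18-channel image, so the 2D CNN sees time along the channel axis.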

The model is trained on five emotion labels:
- Anger
- Happy
- Shock
- Neutral
- Sad

## 💡 Use Case Possibilities

Although the model was designed for a journaling app, the same pipeline could be adapted to other applications:

* **📓 Enhanced Digital Journaling & Mental Wellness:** The primary use case. The app can track mood patterns over time, helping users gain insight into their emotional wellbeing. It could suggest activities or resources based on detected emotional trends.

* **🧠 User Experience (UX) Research:** Companies can analyze user reactions to new software, websites, or products in real time. This adds a behavioral signal, alongside surveys and interviews, on whether a feature is delightful, confusing, or frustrating.

* **📚 Adaptive E-Learning Platforms:** An online learning system could gauge a student's emotional state. If the system detects confusion or frustration, it could automatically offer hints, supplementary materials, or a different teaching approach.

* **🎬 Audience Reaction Analysis:** Media companies could use this to analyze audience reactions during movie screenings or trailer tests to gauge emotional engagement with the content.

* **🚗 Driver Monitoring Systems:** Retrained on suitable labels, the same pipeline could let in-car cameras detect driver states such as drowsiness, distraction, or road rage and trigger safety alerts.

<video width="600" controls>
  <source src="./Assets/journaling_illustration.mp4" type="video/mp4">
</video>

*The illustration is AI-generated.*

## ⚙️ 1. Project Setup and Data Acquisition

This section covers the setup needed to run the project. It starts by importing the Python libraries used throughout the notebook, including:
- **PyTorch & Torchvision** for building and training the neural networks 🧠.
- **OpenCV & MediaPipe** for video processing and facial landmark detection 📹.
- **NumPy & Pandas** for numerical operations and data handling 📊.
- **Scikit-learn** for performance evaluation metrics 📈.
- **Matplotlib & Seaborn** for data visualization 🎨.

After the imports, the notebook downloads the video dataset, which is stored as a ZIP file on Google Drive and identified by the `DATASET_DOWNLOAD_ID` environment variable (loaded from a `.env` file). It then extracts the contents into a local `./Dataset/Scrapping` directory and deletes the downloaded ZIP file, leaving the data ready for the next stage of processing.

In [None]:
import os
import zipfile
import time
import csv
import gc

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import tifffile as tiff

from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torchvision.transforms as T
import torchvision.models as models
from torchinfo import summary

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

import matplotlib.pyplot as plt
import seaborn as sns

import gdown

from dotenv import load_dotenv
load_dotenv()

In [None]:
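dataset_download_id = os.getenv('DATASET_DOWNLOAD_ID')
if not dataset_download_id:
    # Fail early with a clear message instead of passing None to gdown
    raise RuntimeError("DATASET_DOWNLOAD_ID is not set; define it in the environment or in a .env file.")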
zip_path = os.path.join(os.getcwd(), 'Dataset', 'Scrapping.zip')
destination_path = os.path.join(os.getcwd(), 'Dataset', 'Scrapping')

# Download from Google Drive
os.makedirs(destination_path, exist_ok=True)
gdown.download(id=dataset_download_id, output=zip_path, quiet=False)

# Extract the downloaded ZIP file
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(destination_path)

# Remove downloaded ZIP
os.remove(zip_path)

In [None]:
# File Paths
VIDEO_SOURCE_DIR = os.path.join(os.getcwd(), 'Dataset', 'Scrapping')

for dir_path in [VIDEO_SOURCE_DIR]:
    os.makedirs(dir_path, exist_ok=True)
    
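# Feature export parameters
SEQUENCE_LENGTH = 90          # frames per exported chunk
MIN_SEQUENCE_LENGTH = 75      # a shorter trailing chunk is padded if it has at least this many frames, otherwise dropped
FACE_IMG_SIZE = (224, 224)    # size of each cropped grayscale face image
IMG_CAPTURE_INTERVAL = 5      # keep one face image every N frames
VALIDATION_IDS = ['0016', '0017', '0018', '0019', '0020']  # video IDs held out for validation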

In [None]:
def get_face_bbox(frame_shape, landmarks, margin=0.1):
    """Calculate bounding box from landmarks with a margin."""
    h, w, _ = frame_shape
    x_coords = [lm.x * w for lm in landmarks]
    y_coords = [lm.y * h for lm in landmarks]
    
    x_min, x_max = min(x_coords), max(x_coords)
    y_min, y_max = min(y_coords), max(y_coords)
    
    x_margin = (x_max - x_min) * margin
    y_margin = (y_max - y_min) * margin
    
    x_min = int(max(0, x_min - x_margin))
    x_max = int(min(w, x_max + x_margin))
    y_min = int(max(0, y_min - y_margin))
    y_max = int(min(h, y_max + y_margin))
    
    return x_min, y_min, x_max, y_max
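
A quick way to check the bounding-box helper is to call it on a few hand-made landmarks. In the sketch below the function body is repeated so the example runs on its own, `SimpleNamespace` objects stand in for MediaPipe landmark objects (only `.x` and `.y` are needed), and the coordinates are hypothetical. A margin of 0.25 is used instead of the default so the arithmetic is easy to follow.

```python
from types import SimpleNamespace

def get_face_bbox(frame_shape, landmarks, margin=0.1):
    """Same logic as the notebook helper, repeated so this sketch is self-contained."""
    h, w, _ = frame_shape
    xs = [lm.x * w for lm in landmarks]
    ys = [lm.y * h for lm in landmarks]
    x_min, x_max = min(xs), max(xs)
    y_min, y_max = min(ys), max(ys)
    x_margin = (x_max - x_min) * margin
    y_margin = (y_max - y_min) * margin
    return (int(max(0, x_min - x_margin)), int(max(0, y_min - y_margin)),
            int(min(w, x_max + x_margin)), int(min(h, y_max + y_margin)))

frame_shape = (480, 640, 3)  # (height, width, channels)

# Hypothetical landmarks in normalized [0, 1] image coordinates.
centered = [SimpleNamespace(x=0.25, y=0.25),
            SimpleNamespace(x=0.75, y=0.50),
            SimpleNamespace(x=0.50, y=0.75)]
print(get_face_bbox(frame_shape, centered, margin=0.25))   # (80, 60, 560, 420)

# A face that fills the frame: the margin is clamped to the image borders.
full_frame = [SimpleNamespace(x=0.0, y=0.0), SimpleNamespace(x=1.0, y=1.0)]
print(get_face_bbox(frame_shape, full_frame, margin=0.25))  # (0, 0, 640, 480)
```

The returned tuple is ordered `(x_min, y_min, x_max, y_max)`, which is why the crop in `process_video` indexes rows with the y values first.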

In [None]:
def write_video_chunk(output_path, frame_chunk, frame_size, fps):
    """
    Writes a list of frames to an MP4 video file.
    """
    
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
    for frame in frame_chunk:
        video_writer.write(frame)
    video_writer.release()

In [None]:
def normalize_landmarks(landmarks_flat):
    """
    Normalizes facial landmarks for a single frame to be scale and position invariant.
    """
    
    landmarks = np.array(landmarks_flat).reshape(-1, 3)
    
    x_coords = landmarks[:, 0]
    y_coords = landmarks[:, 1]
    z_coords = landmarks[:, 2]
    
    x_min, x_max = np.min(x_coords), np.max(x_coords)
    y_min, y_max = np.min(y_coords), np.max(y_coords)
    
    if x_max - x_min == 0 or y_max - y_min == 0:
        return landmarks_flat
        
    x_normalized = (x_coords - x_min) / (x_max - x_min)
    y_normalized = (y_coords - y_min) / (y_max - y_min)
    
    z_mean = np.mean(z_coords)
    z_normalized = z_coords - z_mean
    
    normalized_landmarks_flat = np.stack([x_normalized, y_normalized, z_normalized], axis=1).flatten().tolist()
    
    return normalized_landmarks_flat
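
A small worked example may help show what this normalization does and does not remove. The sketch below re-implements the same arithmetic on an array of made-up points; the helper name `normalize_xyz` and the sample coordinates are hypothetical, and the zero-range guard from the notebook function is omitted for brevity. Because x and y are min-max scaled per axis, shifting or rescaling the face in the image plane leaves them unchanged. The z values are only mean-centered, so they remain sensitive to scale.

```python
import numpy as np

def normalize_xyz(points):
    """Min-max scale x and y to [0, 1]; subtract the mean from z."""
    pts = np.asarray(points, dtype=np.float64)
    xy = pts[:, :2]
    xy_min = xy.min(axis=0)
    xy_range = xy.max(axis=0) - xy_min
    out = np.empty_like(pts)
    out[:, :2] = (xy - xy_min) / xy_range
    out[:, 2] = pts[:, 2] - pts[:, 2].mean()
    return out

# Three hypothetical landmarks as (x, y, z).
face = np.array([[0.2, 0.3, -0.01],
                 [0.4, 0.5,  0.02],
                 [0.3, 0.7,  0.05]])

# The same face, twice as large in the image plane and shifted.
moved = face * np.array([2.0, 2.0, 1.0]) + np.array([0.1, -0.1, 0.0])

a = normalize_xyz(face)
b = normalize_xyz(moved)
print(np.allclose(a[:, :2], b[:, :2]))  # True
```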

In [None]:
def setup_video_processing(video_path, label):
    """
    Initializes paths, directories, and video capture for processing.
    Determines if the video is for training or validation based on VALIDATION_IDS.
    """
    
    base_filename = os.path.splitext(os.path.basename(video_path))[0]
    
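    # Filenames are expected to follow '<label>_<video_id>...'; the second
    # underscore-separated token decides the train/validation split
    video_id = base_filename.split('_')[1]
    split_folder = 'val' if video_id in VALIDATION_IDS else 'train'

    # Define output directories: Data/Scrapping/<split>/<artifact type>/<label>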
    output_dirs = {
        'csv': os.path.join(os.getcwd(), 'Data', 'Scrapping', split_folder, 'FaceMesh', label),
        'tiff': os.path.join(os.getcwd(), 'Data', 'Scrapping', split_folder, 'FaceImage', label),
        'video': os.path.join(os.getcwd(), 'Data', 'Scrapping', split_folder, 'Video', label)
    }

    for d in output_dirs.values():
        os.makedirs(d, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return None

    video_properties = {
        'fps': cap.get(cv2.CAP_PROP_FPS),
        'frame_size': (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    }
    
    return cap, base_filename, output_dirs, video_properties
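
The split logic relies on a filename convention that the code implies but never states: the first underscore-separated token is the emotion label and the second is the video ID. The sketch below makes that explicit. The helper name `parse_video_name` and the example filenames are hypothetical; only the token positions and the `VALIDATION_IDS` list come from the notebook.

```python
import os

VALIDATION_IDS = ['0016', '0017', '0018', '0019', '0020']

def parse_video_name(path):
    """Return (label, video_id, split) for a '<label>_<video_id>...' filename."""
    base = os.path.splitext(os.path.basename(path))[0]
    parts = base.split('_')
    if len(parts) < 2:
        raise ValueError(f"Unexpected filename format: {path}")
    label, video_id = parts[0], parts[1]
    split = 'val' if video_id in VALIDATION_IDS else 'train'
    return label, video_id, split

# Hypothetical filenames, for illustration only.
print(parse_video_name('Happy_0016.mp4'))      # ('Happy', '0016', 'val')
print(parse_video_name('Sad_0003_take2.mp4'))  # ('Sad', '0003', 'train')
```

Splitting by video ID rather than by chunk keeps every chunk cut from one source video in the same split.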

In [None]:
def process_frame_data(face_mesh, frame, last_known_landmarks):
    """
    Processes a single frame to extract facial landmarks.
    """
    
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_mesh.process(rgb_frame)
    
    landmarks_present = False
    if results.multi_face_landmarks:
        face_landmarks = results.multi_face_landmarks[0]
        current_landmarks_flat = [c for lm in face_landmarks.landmark for c in (lm.x, lm.y, lm.z)]
        landmarks_present = True
    else:
        face_landmarks = None
        current_landmarks_flat = last_known_landmarks

    normalized_landmarks = normalize_landmarks(current_landmarks_flat)
    return normalized_landmarks, face_landmarks, current_landmarks_flat, landmarks_present

In [None]:
def save_data_chunk(base_filename, chunk_idx, output_dirs, data_chunks, video_properties):
    """
    Saves the collected data chunks to files.
    """
    
    chunk_filename_base = f"{base_filename}_{chunk_idx:04d}"
    
    # Save CSV
    csv_path = os.path.join(output_dirs['csv'], f"{chunk_filename_base}.csv")
    header = ['frame'] + [f'p_{i}' for i in range(468 * 3)]
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(data_chunks['landmarks'])

    # Save TIFF
    if data_chunks['images']:
        tiff_path = os.path.join(output_dirs['tiff'], f"{chunk_filename_base}.tiff")
        images_array = np.array(data_chunks['images'])
        images_with_channel = np.expand_dims(images_array, axis=0) # Shape: (1, D, H, W)
        tiff.imwrite(tiff_path, images_with_channel, imagej=True)

    # Save Video
    video_path_out = os.path.join(output_dirs['video'], f"{chunk_filename_base}.mp4")
    write_video_chunk(video_path_out, data_chunks['frames'], video_properties['frame_size'], video_properties['fps'])

In [None]:
def process_video(video_path, label):
    """
    Video feature extraction pipeline.
    """
    
    setup_info = setup_video_processing(video_path, label)
    if not setup_info:
        return
    cap, base_filename, output_dirs, video_properties = setup_info

    frame_idx, chunk_idx = 0, 1
    landmark_chunk, image_chunk, frame_chunk = [], [], []
    last_known_landmarks = [0] * (468 * 3)
    last_known_image = np.zeros(FACE_IMG_SIZE, dtype=np.uint8)

    mp_face_mesh = mp.solutions.face_mesh
    with mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, refine_landmarks=False, min_detection_confidence=0.5, min_tracking_confidence=0.5) as face_mesh:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_chunk.append(frame)
            
            normalized_landmarks, face_landmarks_obj, current_landmarks, landmarks_found = process_frame_data(face_mesh, frame, last_known_landmarks)
            last_known_landmarks = current_landmarks
            
            if frame_idx % IMG_CAPTURE_INTERVAL == 0:
                if landmarks_found:
                    x_min, y_min, x_max, y_max = get_face_bbox(frame.shape, face_landmarks_obj.landmark)
                    face_crop = frame[y_min:y_max, x_min:x_max]
                    if face_crop.size > 0:
                        gray_face = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
                        resized_face = cv2.resize(gray_face, FACE_IMG_SIZE, interpolation=cv2.INTER_AREA)
                        last_known_image = resized_face # Update the last known image
                        image_chunk.append(last_known_image)
                    else:
                        # If crop is empty, use the last known good image
                        image_chunk.append(last_known_image)
                else:
                    # If no landmarks are found, use the last known good image
                    image_chunk.append(last_known_image)

            landmark_chunk.append([frame_idx] + normalized_landmarks)
            frame_idx += 1

            if len(landmark_chunk) == SEQUENCE_LENGTH:
                data_chunks = {'landmarks': landmark_chunk, 'images': image_chunk, 'frames': frame_chunk}
                save_data_chunk(base_filename, chunk_idx, output_dirs, data_chunks, video_properties)
                
                chunk_idx += 1
                landmark_chunk, image_chunk, frame_chunk = [], [], []
                # Reset last_known_image for the new chunk to avoid carry-over
                last_known_image = np.zeros(FACE_IMG_SIZE, dtype=np.uint8)

    if len(landmark_chunk) >= MIN_SEQUENCE_LENGTH:
        last_landmark_row = landmark_chunk[-1]
        last_frame = frame_chunk[-1]
        
        # Pad the image chunk to its expected size using the last known image
        num_images_expected = -(-SEQUENCE_LENGTH // IMG_CAPTURE_INTERVAL) # Ceiling division
        while len(image_chunk) < num_images_expected:
            image_chunk.append(last_known_image)

        # Pad the landmark and frame chunks to the full sequence length
        while len(landmark_chunk) < SEQUENCE_LENGTH:
            landmark_chunk.append(last_landmark_row)
            frame_chunk.append(last_frame)
        
        data_chunks = {'landmarks': landmark_chunk, 'images': image_chunk, 'frames': frame_chunk}
        save_data_chunk(base_filename, chunk_idx, output_dirs, data_chunks, video_properties)

    cap.release()
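
The chunking rules in `process_video` reduce to simple integer arithmetic, sketched below. The helper names are hypothetical, and the defaults are assumed to match the notebook's `SEQUENCE_LENGTH`, `MIN_SEQUENCE_LENGTH`, and `IMG_CAPTURE_INTERVAL`.

```python
def count_chunks(total_frames, seq_len=90, min_len=75):
    """Return (full_chunks, padded_tail_chunks) for a video of total_frames frames."""
    full_chunks, remainder = divmod(total_frames, seq_len)
    # A trailing partial chunk is kept (and padded) only if it is long enough.
    padded_tail = 1 if remainder >= min_len else 0
    return full_chunks, padded_tail

def images_per_chunk(seq_len=90, interval=5):
    """Ceiling division, written the same way as in the notebook."""
    return -(-seq_len // interval)

for n in (60, 170, 300):
    print(n, count_chunks(n))
# 60 (0, 0)
# 170 (1, 1)
# 300 (3, 0)

print(images_per_chunk())  # 18
```

A 300-frame video therefore yields three chunks and discards its last 30 frames, while a 170-frame video yields one full chunk plus one chunk padded from 80 to 90 frames.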

In [None]:
def worker_task(args):
    filename, label = args
    video_path = os.path.join(VIDEO_SOURCE_DIR, filename)
    try:
        process_video(video_path, label)
    except Exception as e:
        print(f"Failed to process {video_path}. Error: {e}")

all_tasks = []
for filename in os.listdir(VIDEO_SOURCE_DIR):
    if not filename.endswith('.mp4'):
        continue
    # str.split never raises IndexError, so check the '<label>_<video_id>' pattern explicitly
    if '_' not in filename:
        print(f"Skipping file with unexpected format: {filename}")
        continue
    label = filename.split('_')[0]
    all_tasks.append((filename, label))


with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
    list(tqdm(executor.map(worker_task, all_tasks), total=len(all_tasks), desc="Processing Videos"))

In [None]:
DATA_DIR = os.path.join(os.getcwd(), 'Data', 'Scrapping')
BATCH_SIZE = 24

TRAIN_DIR = os.path.join(DATA_DIR, 'train')
VAL_DIR = os.path.join(DATA_DIR, 'val')

for dir_path in [TRAIN_DIR, VAL_DIR]:
    os.makedirs(dir_path, exist_ok=True)

In [None]:
class ImageDataset(Dataset):
    """
    A dataset that loads only the TIFF image sequences and their corresponding labels.
    """
    def __init__(self, image_file_paths, labels, label_map, is_train=False):
        super().__init__()
        self.image_file_paths = image_file_paths
        self.str_labels = labels
        self.label_map = label_map
        self.is_train = is_train

        # Define the image augmentation pipeline, only used if is_train is True
        if self.is_train:
            self.image_augment = T.Compose([
                T.RandomHorizontalFlip(p=0.5),
                T.RandomAffine(degrees=30, translate=(0.2, 0.2), scale=(0.75, 1.05)),
                T.GaussianBlur(kernel_size=(3, 3), sigma=(0.1, 1.0)),
                T.RandomErasing(p=0.2, scale=(0.02, 0.1), ratio=(0.3, 3.3), value=0),
            ])

        self.image_tensors = []
        self.label_tensors = []

        def _load_file(args):
            image_path, label_str = args
            try:
                # Load image data (TIFF), normalize to [0, 1], and squeeze the first dimension
                image_data = torch.tensor(tiff.imread(image_path), dtype=torch.float32).squeeze(0) / 255.0

                # Get label
                label_int = self.label_map[label_str]
                label = torch.tensor(label_int, dtype=torch.long)

                return image_data, label
            except Exception as e:
                print(f"Skipping file due to error: {image_path} | Error: {e}")
                return None, None

        # Parallelized Data Loading
        tasks = zip(self.image_file_paths, self.str_labels)
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            results = list(tqdm(executor.map(_load_file, tasks), total=len(self.image_file_paths), desc="Processing image files"))

        for img_data, lbl in results:
            if all(x is not None for x in [img_data, lbl]):
                self.image_tensors.append(img_data)
                self.label_tensors.append(lbl)

    def __len__(self):
        return len(self.label_tensors)

    def __getitem__(self, idx):
        image_data = self.image_tensors[idx]
        # Augment on access rather than once at load time, so every epoch
        # sees a new random variant of each training clip
        if self.is_train:
            image_data = self.image_augment(image_data)
        return image_data, self.label_tensors[idx]

In [None]:
def get_image_files_and_labels(split_dir):
    files = []
    labels = []
    faceimage_dir = os.path.join(split_dir, 'FaceImage')
    if not os.path.isdir(faceimage_dir):
        return [], []
    
    emotion_folders = [d for d in os.listdir(faceimage_dir) if os.path.isdir(os.path.join(faceimage_dir, d))]
    for emotion in emotion_folders:
        emotion_path = os.path.join(faceimage_dir, emotion)
        tiff_files = [os.path.join(emotion_path, f) for f in os.listdir(emotion_path) if f.endswith('.tiff')]
        files.extend(tiff_files)
        labels.extend([emotion] * len(tiff_files))
    return files, labels

train_image_files, train_image_labels_str = get_image_files_and_labels(TRAIN_DIR)
val_image_files, val_image_labels_str = get_image_files_and_labels(VAL_DIR)

emotion_folders = [d for d in os.listdir(os.path.join(TRAIN_DIR, 'FaceImage')) if os.path.isdir(os.path.join(TRAIN_DIR, 'FaceImage', d))]
unique_labels = sorted(emotion_folders)
label_map = {label: i for i, label in enumerate(unique_labels)}
num_classes = len(label_map)

train_image_dataset = ImageDataset(train_image_files, train_image_labels_str, label_map, is_train=True)
val_image_dataset = ImageDataset(val_image_files, val_image_labels_str, label_map, is_train=False)

train_image_loader = DataLoader(train_image_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_image_loader = DataLoader(val_image_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
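
The class indices come from the alphabetical order of the emotion folder names, which is easy to overlook. The sketch below shows the resulting mapping under the assumption that the folders are named exactly like the five labels listed in the introduction; the actual folder names depend on the dataset's filenames.

```python
# Hypothetical folder names, listed in arbitrary order as os.listdir might return them.
emotion_folders = ['Sad', 'Anger', 'Shock', 'Happy', 'Neutral']

unique_labels = sorted(emotion_folders)  # case-sensitive alphabetical order
label_map = {label: i for i, label in enumerate(unique_labels)}
idx_to_label = {i: label for label, i in label_map.items()}

print(label_map)
# {'Anger': 0, 'Happy': 1, 'Neutral': 2, 'Sad': 3, 'Shock': 4}
print(idx_to_label[2])  # Neutral
```

The training code saves only model weights, so any separate inference script would need to rebuild this same ordering to decode predictions correctly.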

In [None]:
class EfficientNetV2ModelM(nn.Module):
    def __init__(self, num_classes, num_input_channels):
        super(EfficientNetV2ModelM, self).__init__()

        self.efficientnet = models.efficientnet_v2_m(weights=models.EfficientNet_V2_M_Weights.IMAGENET1K_V1)

        original_first_layer = self.efficientnet.features[0][0]
        # Create a new conv layer with the correct number of input channels
        first_layer = nn.Conv2d(
            in_channels=num_input_channels,
            out_channels=original_first_layer.out_channels,
            kernel_size=original_first_layer.kernel_size,
            stride=original_first_layer.stride,
            padding=original_first_layer.padding,
            bias=original_first_layer.bias is not None  # nn.Conv2d expects a bool here
        )
        
        with torch.no_grad():
            first_layer.weight.data = original_first_layer.weight.data.mean(dim=1, keepdim=True).repeat(1, num_input_channels, 1, 1)

        self.efficientnet.features[0][0] = first_layer

        in_features = self.efficientnet.classifier[1].in_features
        hidden_dim = 512
        
        # Dense layers after EfficientNet
        self.efficientnet.classifier = nn.Sequential(
            nn.Dropout(p=0.6, inplace=False),
            nn.Linear(in_features, hidden_dim),
            nn.ReLU(inplace=False),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(hidden_dim, num_classes)
        )

    def forward(self, x):
        return self.efficientnet(x)

In [None]:
class EfficientNetV2ModelS(nn.Module):
    def __init__(self, num_classes, num_input_channels):
        super(EfficientNetV2ModelS, self).__init__()

        self.efficientnet = models.efficientnet_v2_s(weights=models.EfficientNet_V2_S_Weights.IMAGENET1K_V1)

        original_first_layer = self.efficientnet.features[0][0]
        # Create a new conv layer with the correct number of input channels
        first_layer = nn.Conv2d(
            in_channels=num_input_channels,
            out_channels=original_first_layer.out_channels,
            kernel_size=original_first_layer.kernel_size,
            stride=original_first_layer.stride,
            padding=original_first_layer.padding,
            bias=original_first_layer.bias is not None  # nn.Conv2d expects a bool here
        )
        
        with torch.no_grad():
            first_layer.weight.data = original_first_layer.weight.data.mean(dim=1, keepdim=True).repeat(1, num_input_channels, 1, 1)
            
        self.efficientnet.features[0][0] = first_layer
        
        in_features = self.efficientnet.classifier[1].in_features
        hidden_dim = 512
        
        # Dense layers after EfficientNet
        self.efficientnet.classifier = nn.Sequential(
            nn.Dropout(p=0.6, inplace=False),
            nn.Linear(in_features, hidden_dim),
            nn.ReLU(inplace=False),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(hidden_dim, num_classes)
        )

    def forward(self, x):
        return self.efficientnet(x)

In [None]:
class MobileNetV3Large(nn.Module):
    def __init__(self, num_classes, num_input_channels):
        super(MobileNetV3Large, self).__init__()

        self.mobilenet = models.mobilenet_v3_large(weights=models.MobileNet_V3_Large_Weights.IMAGENET1K_V2)

        original_first_layer = self.mobilenet.features[0][0]
        first_layer = nn.Conv2d(
            in_channels=num_input_channels,
            out_channels=original_first_layer.out_channels,
            kernel_size=original_first_layer.kernel_size,
            stride=original_first_layer.stride,
            padding=original_first_layer.padding,
            bias=original_first_layer.bias is not None  # nn.Conv2d expects a bool here
        )
        with torch.no_grad():
            first_layer.weight.data = original_first_layer.weight.data.mean(dim=1, keepdim=True).repeat(1, num_input_channels, 1, 1)
        self.mobilenet.features[0][0] = first_layer

        in_features = self.mobilenet.classifier[0].in_features
        hidden_dim = 512
        
        # Dense layers after MobileNet
        self.mobilenet.classifier = nn.Sequential(
            nn.Dropout(p=0.6, inplace=False),
            nn.Linear(in_features, hidden_dim),
            nn.ReLU(inplace=False),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(hidden_dim, num_classes)
        )

    def forward(self, x):
        return self.mobilenet(x)
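
All three model classes adapt the pretrained first convolution in the same way: the RGB kernels are averaged into one kernel, which is then repeated for every input channel. The NumPy sketch below mirrors that `mean(dim=1, keepdim=True).repeat(...)` step on a random array standing in for pretrained weights (the shapes and values are assumptions for illustration, not the real checkpoints). It also shows one consequence: when all channels contain the same patch, the adapted filter responds `num_input_channels / 3` times more strongly than the original RGB filter.

```python
import numpy as np

rng = np.random.default_rng(0)
num_input_channels = 18

# Stand-in for a pretrained first-layer kernel: (out_channels, in_channels=3, kH, kW).
rgb_weight = rng.normal(size=(24, 3, 3, 3))

# Average over the RGB axis, then repeat across the new input channels.
mean_weight = rgb_weight.mean(axis=1, keepdims=True)            # (24, 1, 3, 3)
adapted = np.repeat(mean_weight, num_input_channels, axis=1)    # (24, 18, 3, 3)
print(adapted.shape)  # (24, 18, 3, 3)

# Response of each filter to one 3x3 patch copied into every input channel.
patch = rng.normal(size=(3, 3))
rgb_response = (rgb_weight * patch).sum(axis=(1, 2, 3))
adapted_response = (adapted * patch).sum(axis=(1, 2, 3))

ratio = num_input_channels / 3
print(np.allclose(adapted_response, ratio * rgb_response))  # True
```

Consecutive face crops are usually similar, so first-layer activations may start out larger than the pretrained network expects. One possible mitigation, not used in this notebook, is to multiply the repeated weights by `3 / num_input_channels`.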

In [None]:
def train_model(
    model,
    train_loader,
    val_loader,
    num_epochs,
    device,
    save_dir,
    log_file_name,
    learning_rate=1e-4,
    weight_decay=1e-5,
    gamma=0.985
):
    
    os.makedirs(save_dir, exist_ok=True)
    log_file_path = os.path.join(save_dir, log_file_name)
    log_header = ['epoch', 'time_seconds', 'train_acc', 'train_loss', 'val_acc', 'val_loss', 'learning_rate']

    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=gamma)
    criterion = nn.CrossEntropyLoss()

    best_val_acc = 0.0
    best_val_loss = float('inf')

    with open(log_file_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(log_header)

    start_time = time.time()

    for epoch in range(num_epochs):
        epoch_start_time = time.time()
        model.train()
        train_loss, train_correct, train_total = 0, 0, 0

        train_pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]", leave=False)
        for image_batch, labels_batch in train_pbar:
            image_batch, labels_batch = image_batch.to(device), labels_batch.to(device)
            optimizer.zero_grad()
            outputs = model(image_batch)
            loss = criterion(outputs, labels_batch)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            train_total += labels_batch.size(0)
            train_correct += (predicted == labels_batch).sum().item()
            train_pbar.set_postfix({'loss': f'{loss.item():.4f}', 'acc': f'{(predicted == labels_batch).sum().item() / labels_batch.size(0):.4f}'})

        train_acc = 100 * train_correct / train_total
        avg_train_loss = train_loss / len(train_loader)

        # Validation
        model.eval()
        val_loss, val_correct, val_total = 0, 0, 0
        val_pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]", leave=False)
        with torch.no_grad():
            for image_batch, labels_batch in val_pbar:
                image_batch, labels_batch = image_batch.to(device), labels_batch.to(device)
                outputs = model(image_batch)
                loss = criterion(outputs, labels_batch)
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                val_total += labels_batch.size(0)
                val_correct += (predicted == labels_batch).sum().item()
                val_pbar.set_postfix({'loss': f'{loss.item():.4f}', 'acc': f'{(predicted == labels_batch).sum().item() / labels_batch.size(0):.4f}'})

        val_acc = 100 * val_correct / val_total
        avg_val_loss = val_loss / len(val_loader)
        scheduler.step()
        current_lr = scheduler.get_last_lr()[0]

        # Logging and Model Saving
        epoch_duration = time.time() - epoch_start_time
        print(f"Epoch {epoch+1}/{num_epochs} | Time: {epoch_duration:.2f}s | "
              f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.2f}% | "
              f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}% | "
              f"LR: {current_lr:.6f}")

        with open(log_file_path, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([epoch + 1, epoch_duration, train_acc, avg_train_loss, val_acc, avg_val_loss, current_lr])

        torch.save(model.state_dict(), os.path.join(save_dir, 'last_model.pth'))

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), os.path.join(save_dir, 'best_val_acc_model.pth'))
            print(f"🎉 New best val acc model saved: {val_acc:.2f}%")

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), os.path.join(save_dir, 'best_val_loss_model.pth'))
            print(f"✨ New best val loss model saved: {avg_val_loss:.4f}")

    total_training_time = time.time() - start_time
    print(f"\n--- Training Finished ---")
    print(f"Total Training Time: {total_training_time / 60:.2f} minutes")
    print(f"Best Validation Accuracy: {best_val_acc:.2f}%")
    print(f"Lowest Validation Loss: {best_val_loss:.4f}")
    
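    # Drop the local references and release cached GPU memory. The caller's own
    # variable still points at the model, so delete it there to free the weights.
    del model
    del optimizer
    gc.collect()
    torch.cuda.empty_cache()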
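
The exponential schedule used in `train_model` multiplies the learning rate by `gamma` once per epoch, so the rate during epoch `k` (counting from 0) is `learning_rate * gamma ** k`. The sketch below tabulates this with the function's default values and the 50 epochs used in the training cells; it is plain arithmetic and does not call PyTorch.

```python
learning_rate = 1e-4   # train_model default
gamma = 0.985          # train_model default
num_epochs = 50        # value used in the training cells

# Learning rate in effect during each (0-indexed) epoch.
lr_during_epoch = [learning_rate * gamma ** epoch for epoch in range(num_epochs)]

# Learning rate after the final scheduler.step() call.
final_lr = learning_rate * gamma ** num_epochs

print(f"first epoch:    {lr_during_epoch[0]:.2e}")
print(f"last epoch:     {lr_during_epoch[-1]:.2e}")
print(f"after training: {final_lr:.2e}")
```

With these settings, 50 epochs shrink the rate to a little under half of its starting value. Because the training loop reads the rate after calling `scheduler.step()`, the value logged for each epoch is the one the next epoch will use.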

In [None]:
def evaluate_model(
    model_class, 
    model_kwargs, 
    best_model_path, 
    val_loader, 
    device, 
    label_map
):
    # Instantiate and load weights
    model = model_class(**model_kwargs).to(device)
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            outputs = model(images)
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.numpy())

    # Reverse label_map for readable output
    idx_to_label = {v: k for k, v in label_map.items()}
    target_names = [idx_to_label[i] for i in range(len(idx_to_label))]

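    # Pass the full label list so the report and matrix keep all classes,
    # even if one of them never appears in the validation set
    class_ids = list(range(len(target_names)))

    print("Classification Report:")
    print(classification_report(all_labels, all_preds, labels=class_ids, target_names=target_names))

    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)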
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=target_names, yticklabels=target_names)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix")
    plt.show()
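
The numbers in the classification report can be read directly off the confusion matrix. The sketch below uses a made-up 3-class matrix (the counts are hypothetical, not results from this project) with scikit-learn's convention that rows are true classes and columns are predicted classes.

```python
import numpy as np

# Hypothetical confusion matrix: rows = true class, columns = predicted class.
cm = np.array([[8, 1, 1],
               [2, 6, 2],
               [0, 1, 9]])

correct = cm.diagonal()
recall = correct / cm.sum(axis=1)      # share of each true class that was found
precision = correct / cm.sum(axis=0)   # share of each predicted class that was right
accuracy = cm.trace() / cm.sum()

print(recall)     # [0.8 0.6 0.9]
print(precision)  # [0.8  0.75 0.75]
print(round(accuracy, 4))  # 0.7667
```

A class with high recall but low precision is being over-predicted, which shows up as a dense column in the heatmap.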

In [None]:
image_channels = -(-SEQUENCE_LENGTH // IMG_CAPTURE_INTERVAL)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
efficientnetv2s_model = EfficientNetV2ModelS(
    num_classes=num_classes,
    num_input_channels=image_channels
).to(device)

print("EfficientNetV2-S Summary:")
print(summary(efficientnetv2s_model, input_size=(BATCH_SIZE, image_channels, FACE_IMG_SIZE[0], FACE_IMG_SIZE[1])), '\n\n')

train_model(
    efficientnetv2s_model,
    train_image_loader,
    val_image_loader,
    num_epochs=50,
    device=device,
    save_dir=os.path.join(os.getcwd(), "Models", f"Models_EfficientNetV2S_{IMG_CAPTURE_INTERVAL}"),
    log_file_name="training_logs_efficientnetv2s.csv"
)

In [None]:
efficientnetv2m_model = EfficientNetV2ModelM(
    num_classes=num_classes,
    num_input_channels=image_channels
).to(device)

print("EfficientNetV2-M Summary:")
print(summary(efficientnetv2m_model, input_size=(BATCH_SIZE, image_channels, FACE_IMG_SIZE[0], FACE_IMG_SIZE[1])), '\n\n')

train_model(
    efficientnetv2m_model,
    train_image_loader,
    val_image_loader,
    num_epochs=50,
    device=device,
    save_dir=os.path.join(os.getcwd(), "Models", f"Models_EfficientNetV2M_{IMG_CAPTURE_INTERVAL}"),
    log_file_name="training_logs_efficientnetv2m.csv"
)

In [None]:
mobilenetv3large_model = MobileNetV3Large(
    num_classes=num_classes,
    num_input_channels=image_channels
).to(device)

print("MobileNetV3Large Summary:")
print(summary(mobilenetv3large_model, input_size=(BATCH_SIZE, image_channels, FACE_IMG_SIZE[0], FACE_IMG_SIZE[1])), '\n\n')

train_model(
    mobilenetv3large_model,
    train_image_loader,
    val_image_loader,
    num_epochs=50,
    device=device,
    save_dir=os.path.join(os.getcwd(), "Models", f"Models_MobileNetV3Large_{IMG_CAPTURE_INTERVAL}"),
    log_file_name="training_logs_mobilenetv3large.csv"
)

In [None]:
evaluate_model(
    EfficientNetV2ModelS,
    {"num_classes": num_classes, "num_input_channels": image_channels},
    os.path.join(os.getcwd(), "Models", f"Models_EfficientNetV2S_{IMG_CAPTURE_INTERVAL}", "best_val_acc_model.pth"),
    val_image_loader,
    device,
    label_map
)

In [None]:
evaluate_model(
    EfficientNetV2ModelM,
    {"num_classes": num_classes, "num_input_channels": image_channels},
    os.path.join(os.getcwd(), "Models", f"Models_EfficientNetV2M_{IMG_CAPTURE_INTERVAL}", "best_val_acc_model.pth"),
    val_image_loader,
    device,
    label_map
)

In [None]:
evaluate_model(
    MobileNetV3Large,
    {"num_classes": num_classes, "num_input_channels": image_channels},
    os.path.join(os.getcwd(), "Models", f"Models_MobileNetV3Large_{IMG_CAPTURE_INTERVAL}", "best_val_acc_model.pth"),
    val_image_loader,
    device,
    label_map
)