# Import libraries, set up paths, and device

This cell imports the necessary libraries and defines the file paths, the device settings, and the image transforms used for training and validation.


In [1]:
import os
import cv2
import json
import torch
import optuna
import datetime
import numpy as np
from tqdm import tqdm
from pathlib import Path
from PIL import Image
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import pandas as pd
import pickle
from torchvision.models import mobilenet_v2, MobileNet_V2_Weights
from torch.amp import GradScaler, autocast
import matplotlib.pyplot as plt

# Define paths
BASE_DIR = Path("C:/Users/abhis/Downloads/Documents/Learner Engagement Project")
DATA_DIR = BASE_DIR / "data" / "DAiSEE"
FRAMES_DIR = DATA_DIR / "ExtractedFrames"
LABELS_DIR = DATA_DIR / "Labels"
MODEL_DIR = BASE_DIR / "models"
MODEL_DIR.mkdir(parents=True, exist_ok=True)

# Directory for caching the precomputed best-frame selections.
# parents=True keeps this line independent of MODEL_DIR having created BASE_DIR.
CACHE_DIR = BASE_DIR / "cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# CUDA configuration. These variables are set BEFORE the first CUDA query
# (torch.cuda.is_available() below) so that they are already in place when
# CUDA is first touched; previously they were assigned after that query.
# NOTE(review): PYTORCH_NO_CUDA_MEMORY_CACHING=1 is documented by PyTorch as a
# debugging switch that turns the caching allocator off, which would make the
# expandable_segments setting moot and slow training down - confirm it is wanted.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
os.environ["PYTORCH_NO_CUDA_MEMORY_CACHING"] = "1"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Select the compute device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Per-channel normalisation statistics shared by both transforms.
_NORMALIZE_MEAN = [0.485, 0.456, 0.406]
_NORMALIZE_STD = [0.229, 0.224, 0.225]

# Training transform: resize, random horizontal flip, tensor conversion, normalisation.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORMALIZE_MEAN, std=_NORMALIZE_STD),
])
# Validation transform: identical, but without the random flip.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORMALIZE_MEAN, std=_NORMALIZE_STD),
])

  from .autonotebook import tqdm as notebook_tqdm


Using device: cuda


# Define helper function for selecting 30 best frames using face detection and sharpness

This cell defines a function that, given a folder of extracted frames, divides them into 30 equal temporal segments and selects the best frame from each segment based on face detection and sharpness.


In [2]:
# Clip-ID mapping between the labels CSV and the extracted-frame folders.
def get_csv_clip_id(video_stem: str) -> str:
    """Return the mapped form of a clip ID.

    Surrounding whitespace is removed. If the result begins with ``110001``
    that leading prefix is swapped for ``202614``; every other ID is returned
    as-is (apart from the whitespace stripping).
    """
    old_prefix, new_prefix = "110001", "202614"
    clip_id = video_stem.strip()
    if not clip_id.startswith(old_prefix):
        return clip_id
    # Only the leading occurrence is rewritten; later repeats stay untouched.
    return new_prefix + clip_id[len(old_prefix):]

# Select best frames using face detection and Laplacian variance.
def _load_face_cascade():
    """Return OpenCV's frontal-face Haar cascade, loading it from disk only once.

    The classifier is memoised on the function object so that repeated calls
    (one per video) do not re-parse the cascade XML.

    Raises:
        RuntimeError: if the cascade file cannot be loaded.
    """
    cascade = getattr(_load_face_cascade, "_cascade", None)
    if cascade is None:
        cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        cascade = cv2.CascadeClassifier(cascade_path)
        if cascade.empty():
            raise RuntimeError(f"Could not load Haar cascade from {cascade_path}")
        _load_face_cascade._cascade = cascade
    return cascade


def _frame_sharpness(frame_path, face_cascade):
    """Return the Laplacian variance of a frame, or ``None`` if it is unreadable.

    When at least one face is detected, the variance is measured on the largest
    detected face region; otherwise it is measured on the whole grayscale image.
    """
    img = cv2.imread(str(frame_path))
    if img is None:
        return None
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)
    if len(faces) > 0:
        # Largest face by bounding-box area (w * h).
        x, y, w, h = max(faces, key=lambda r: r[2] * r[3])
        region = gray[y:y + h, x:x + w]
    else:
        region = gray
    return cv2.Laplacian(region, cv2.CV_64F).var()


def select_impactful_frames(video_folder: Path, num_frames=30):
    """Pick up to ``num_frames`` sharp frames spread across a video's frame folder.

    The sorted ``frame_*.jpg`` files are split into ``num_frames`` consecutive
    segments (the last segment absorbs the remainder) and the highest-scoring
    readable frame of each segment is kept.

    Args:
        video_folder: Folder containing ``frame_*.jpg`` files.
        num_frames: Number of segments / frames to select; must be >= 1.

    Returns:
        A list of frame paths in temporal order. It is empty when the folder
        has no frames, contains every frame when there are at most
        ``num_frames`` of them, and may be shorter than ``num_frames`` when a
        whole segment consists of unreadable images.

    Raises:
        ValueError: if ``num_frames`` is smaller than 1.
        RuntimeError: if the Haar cascade cannot be loaded.
    """
    if num_frames < 1:
        raise ValueError(f"num_frames must be >= 1, got {num_frames}")

    frame_files = sorted(video_folder.glob("frame_*.jpg"))
    total_frames = len(frame_files)
    if total_frames == 0:
        return []
    if total_frames <= num_frames:
        return frame_files

    face_cascade = _load_face_cascade()
    segment_size = total_frames // num_frames
    selected_frames = []
    for i in range(num_frames):
        start_idx = i * segment_size
        end_idx = (i + 1) * segment_size if i < num_frames - 1 else total_frames
        best_score = -1
        best_frame = None
        # NOTE(review): face-crop and whole-image variances are compared on the
        # same scale, so a face-less frame can win its segment - confirm intended.
        for fp in frame_files[start_idx:end_idx]:
            quality = _frame_sharpness(fp, face_cascade)
            if quality is not None and quality > best_score:
                best_score = quality
                best_frame = fp
        if best_frame is not None:
            selected_frames.append(best_frame)
    return selected_frames

In [3]:
def precompute_best_frames(csv_file: Path, video_root: Path, num_frames=30, transform=None):
    """
    Precompute and cache the best frame paths for each video in the CSV.

    For every ``ClipID`` row the frame folder ``video_root / <split> / <mapped id>``
    is looked up (``<split>`` is the CSV stem with "Labels" removed) and
    ``select_impactful_frames`` is run on it. A video is kept only when exactly
    ``num_frames`` frames were selected, so every cached sample has the same
    length; missing folders, short videos and videos with unreadable segments
    are counted as skipped.

    Args:
        csv_file: Labels CSV containing a ``ClipID`` column.
        video_root: Root folder of the extracted frames.
        num_frames: Number of frames to select per video.
        transform: Unused; kept so existing callers keep working.

    Returns:
        A dict with ``valid_indices`` (row positions in the CSV that were kept),
        ``precomputed_frames`` (one list of frame paths per kept row) and
        ``num_frames`` (the value used). The same dict is pickled into
        ``CACHE_DIR``.
    """
    csv_file = Path(csv_file)
    video_root = Path(video_root)
    data = pd.read_csv(csv_file, dtype=str)
    data.columns = data.columns.str.strip()
    split = csv_file.stem.replace("Labels", "").strip()
    precomputed = []  # list of lists for each video
    valid_indices = []  # row positions of the videos that were kept
    skipped_count = 0

    clip_ids = data["ClipID"].astype(str).str.strip()
    progress = tqdm(clip_ids, total=len(data), desc="Precomputing best frames", dynamic_ncols=True)
    for position, clip_id in enumerate(progress):
        if clip_id.endswith(('.avi', '.mp4')):
            clip_id = clip_id.rsplit('.', 1)[0]
        video_folder = video_root / split / get_csv_clip_id(clip_id)
        # select_impactful_frames returns early (without decoding images) for
        # folders holding fewer than num_frames frames, so one call covers both
        # the "too short" and the "unreadable segment" cases.
        selected_frames = select_impactful_frames(video_folder, num_frames) if video_folder.exists() else []
        if len(selected_frames) == num_frames:
            precomputed.append(selected_frames)
            valid_indices.append(position)
        else:
            skipped_count += 1
    print(f"Precomputation: Skipped {skipped_count} videos out of {len(data)}.")

    cache_data = {
        "valid_indices": valid_indices,
        "precomputed_frames": precomputed,
        "num_frames": num_frames,
    }
    cache_file = CACHE_DIR / f"precomputed_{csv_file.stem}_frames.pkl"
    # Write to a temporary file and rename it into place, so an interrupted run
    # never leaves a truncated pickle behind under the real cache name.
    tmp_file = cache_file.with_name(cache_file.name + ".tmp")
    with open(tmp_file, "wb") as f:
        pickle.dump(cache_data, f)
    tmp_file.replace(cache_file)
    print(f"Precomputed results saved to {cache_file}")
    return cache_data

# Define the custom Dataset for video classification

This cell defines a PyTorch Dataset that reads a CSV file with video IDs and 4 labels. For each video, it loads the 30 best frames using the above function, applies transforms, and returns a tensor and its label.


In [4]:
class VideoDataset(torch.utils.data.Dataset):
    """Dataset yielding a stack of selected frames and four labels per video.

    The labels CSV must contain a ``ClipID`` column and the four columns listed
    in ``LABEL_COLUMNS``. Frame selections are read from the pickle cache in
    ``CACHE_DIR`` when a usable one exists; otherwise they are computed (and
    cached) through ``precompute_best_frames``.

    Args:
        csv_file: Path to the labels CSV; its stem minus "Labels" names the split.
        video_root: Root folder of the extracted frames.
        transform: Optional callable applied to each PIL image.
        num_frames: Number of frames expected per video.
    """

    # Label columns, in the order they appear in the returned label tensor.
    LABEL_COLUMNS = ("Engagement", "Boredom", "Confusion", "Frustration")

    def __init__(self, csv_file, video_root, transform=None, num_frames=30):
        self.csv_file = Path(csv_file)
        self.data = pd.read_csv(self.csv_file, dtype=str)
        self.data.columns = self.data.columns.str.strip()
        self.video_root = Path(video_root)
        self.transform = transform
        self.num_frames = num_frames
        self.split = self.csv_file.stem.replace("Labels", "").strip()
        total_rows = len(self.data)

        cache_file = CACHE_DIR / f"precomputed_{self.csv_file.stem}_frames.pkl"
        cache_data = self._read_cache(cache_file, num_frames)
        loaded_from_cache = cache_data is not None
        if not loaded_from_cache:
            # Shared helper: same selection logic, and the result is persisted
            # so the next instantiation does not repeat the work.
            cache_data = precompute_best_frames(self.csv_file, self.video_root, num_frames=num_frames)

        # Keep only samples with exactly num_frames frames so that every item
        # has the same shape when batches are stacked.
        kept = [
            (position, frames)
            for position, frames in zip(cache_data["valid_indices"], cache_data["precomputed_frames"])
            if len(frames) == num_frames
        ]
        self.precomputed_frames = [frames for _, frames in kept]
        self.data = self.data.iloc[[position for position, _ in kept]].reset_index(drop=True)

        if loaded_from_cache:
            print(f"Loaded precomputed frames for {len(self.data)} videos from cache.")
        else:
            skipped_count = total_rows - len(self.data)
            print(f"Computed frames on the fly: Skipped {skipped_count} videos.")

    @staticmethod
    def _read_cache(cache_file, num_frames):
        """Return the cached selection if it matches ``num_frames``, else ``None``.

        ``None`` is returned when the file is missing, cannot be unpickled, or
        was built for a different number of frames. Caches without a
        ``num_frames`` entry are judged by their longest frame list.
        """
        if not cache_file.exists():
            return None
        # NOTE: pickle can execute arbitrary code while loading; only point
        # CACHE_DIR at files written by this project.
        try:
            with open(cache_file, "rb") as f:
                cache_data = pickle.load(f)
        except (EOFError, pickle.UnpicklingError):
            print(f"Ignoring unreadable cache file {cache_file}")
            return None
        cached_num_frames = cache_data.get("num_frames")
        if cached_num_frames is None:
            cached_num_frames = max(
                (len(frames) for frames in cache_data["precomputed_frames"]),
                default=num_frames,
            )
        if cached_num_frames != num_frames:
            return None
        return cache_data

    @classmethod
    def _labels_from_row(cls, row):
        """Convert the four label columns of a CSV row into a ``torch.long`` tensor."""
        return torch.tensor([int(row[column]) for column in cls.LABEL_COLUMNS], dtype=torch.long)

    def __len__(self):
        """Return the number of usable videos."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(frames_tensor, labels)`` for the video at position ``idx``.

        ``frames_tensor`` stacks the (transformed) frames along a new first
        dimension; ``labels`` holds the values of ``LABEL_COLUMNS`` in order.
        """
        row = self.data.iloc[idx]
        frames = []
        for fp in self.precomputed_frames[idx]:
            # The context manager releases the file handle once the pixels
            # have been copied by convert().
            with Image.open(fp) as raw:
                img = raw.convert("RGB")
            if self.transform:
                img = self.transform(img)
            frames.append(img)
        frames_tensor = torch.stack(frames)
        return frames_tensor, self._labels_from_row(row)

# Define the MobileNetV2-TCN model

This cell defines the MobileNetV2-TCN model. It processes a sequence of frames by applying MobileNetV2 on each frame, stacking the features, and feeding them to a temporal convolution network (TCN).


In [5]:
class MobileNetTCN(nn.Module):
    """MobileNetV2 frame encoder followed by a small temporal convolution head.

    Each frame is encoded by an ImageNet-pretrained MobileNetV2 whose classifier
    is replaced by ``nn.Identity``; the per-frame features are then fed to a
    dilated ``Conv1d`` -> ``ReLU`` -> 1x1 ``Conv1d`` stack along the time axis.

    Args:
        num_outputs: Number of output channels of the head (default 16, which
            the training code reshapes to 4 x 4).
        temporal_pool: How the time axis is reduced. ``"last"`` (default, the
            original behaviour) returns the final time step; ``"mean"`` averages
            over all time steps.

    NOTE(review): with kernel_size=3, dilation=2 and symmetric padding=2, the
    final time step of the first convolution only reads input steps T-3 and T-1
    (plus zero padding), so ``"last"`` ignores most of the sequence. ``"mean"``
    uses every frame, but changes the model output - do not mix it with
    checkpoints trained using ``"last"``.
    """

    def __init__(self, num_outputs=16, temporal_pool="last"):
        super().__init__()
        if temporal_pool not in ("last", "mean"):
            raise ValueError(f"temporal_pool must be 'last' or 'mean', got {temporal_pool!r}")
        self.temporal_pool = temporal_pool
        self.mobilenet = mobilenet_v2(weights=MobileNet_V2_Weights.IMAGENET1K_V1)
        self.mobilenet.classifier = nn.Identity()
        self.tcn = nn.Sequential(
            nn.Conv1d(1280, 512, kernel_size=3, dilation=2, padding=2),
            nn.ReLU(),
            nn.Conv1d(512, num_outputs, kernel_size=1)
        )

    def forward(self, x):
        """Map a 5-D input ``(batch, frames, C, H, W)`` to ``(batch, num_outputs)``."""
        batch_size, num_frames, C, H, W = x.size()
        # reshape (rather than view) also accepts non-contiguous inputs.
        x_reshaped = x.reshape(batch_size * num_frames, C, H, W)
        features_reshaped = self.mobilenet(x_reshaped)
        # (batch * frames, feat) -> (batch, feat, frames), the layout Conv1d expects.
        features = features_reshaped.reshape(batch_size, num_frames, -1).permute(0, 2, 1)
        out = self.tcn(features)
        if self.temporal_pool == "mean":
            return out.mean(dim=2)
        return out[:, :, -1]

# Define training, checkpointing, and evaluation functions

This cell defines the training loop, which uses mixed precision (`torch.amp`), shows progress with `tqdm`, and saves a checkpoint whenever the validation loss improves so that training can be resumed.


In [6]:
def save_checkpoint(model, optimizer, epoch, best_val_loss, checkpoint_path):
    """Serialise training progress to ``checkpoint_path`` with ``torch.save``.

    The saved dict holds the epoch number, the best validation loss so far,
    and the state dicts of ``model`` and ``optimizer``.
    """
    torch.save(
        dict(
            epoch=epoch,
            best_val_loss=best_val_loss,
            model_state_dict=model.state_dict(),
            optimizer_state_dict=optimizer.state_dict(),
        ),
        checkpoint_path,
    )

def load_checkpoint(model, optimizer, checkpoint_path):
    """Restore ``model`` and ``optimizer`` from ``checkpoint_path`` if it exists.

    Returns:
        ``(epoch, best_val_loss)`` as stored in the checkpoint, or
        ``(0, inf)`` when no checkpoint file is present (nothing is modified
        in that case).
    """
    if not os.path.exists(checkpoint_path):
        return 0, float("inf")

    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    return checkpoint["epoch"], checkpoint["best_val_loss"]

def _multi_head_loss(outputs, labels, loss_fn):
    """Average ``loss_fn`` over the four label heads.

    ``outputs`` is viewed as ``(batch, 4, 4)``; slice ``d`` along the second
    axis is scored against column ``d`` of ``labels``.
    """
    logits = outputs.view(outputs.size(0), 4, 4)
    loss = 0.0
    for d in range(4):
        loss += loss_fn(logits[:, d, :], labels[:, d])
    return loss / 4.0


def train_model(model, train_loader, val_loader, epochs, lr, checkpoint_path):
    """Train ``model`` with Adam, validating after every epoch.

    Training resumes from ``checkpoint_path`` when that file exists. A
    checkpoint is written whenever the validation loss improves, so the file
    always holds the best-validation weights seen so far.

    Args:
        model: Network whose output can be viewed as ``(batch, 4, 4)``.
        train_loader: Loader yielding ``(frames, labels)`` training batches.
        val_loader: Loader yielding ``(frames, labels)`` validation batches.
        epochs: Total number of epochs (including any already checkpointed).
        lr: Adam learning rate.
        checkpoint_path: Where to load/save the checkpoint.

    Returns:
        The best (lowest) validation loss observed.
    """
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Mixed precision only applies on CUDA; on CPU both helpers become no-ops.
    use_amp = device.type == "cuda"
    scaler = GradScaler(enabled=use_amp)
    # Created once up front so the validation loop never depends on the
    # training loop having executed at least one batch.
    loss_fn = nn.CrossEntropyLoss()
    start_epoch, best_val_loss = load_checkpoint(model, optimizer, checkpoint_path)

    for epoch in range(start_epoch, epochs):
        model.train()
        running_loss = 0.0
        for frames, labels in tqdm(train_loader, desc=f"Epoch {epoch+1} Training", leave=False, total=len(train_loader), mininterval=1, dynamic_ncols=True):
            frames, labels = frames.to(device, non_blocking=True), labels.to(device, non_blocking=True)
            optimizer.zero_grad()
            with autocast(device_type=device.type, enabled=use_amp):
                loss = _multi_head_loss(model(frames), labels, loss_fn)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            # Weight by batch size so the epoch figure is a per-sample mean.
            running_loss += loss.item() * frames.size(0)
        train_loss = running_loss / len(train_loader.dataset)

        model.eval()
        val_loss = 0.0
        # No gradients are needed for validation; skipping graph construction
        # saves memory and time.
        with torch.no_grad():
            for frames, labels in tqdm(val_loader, desc=f"Epoch {epoch+1} Validation", leave=False, total=len(val_loader), mininterval=1, dynamic_ncols=True):
                frames, labels = frames.to(device, non_blocking=True), labels.to(device, non_blocking=True)
                with autocast(device_type=device.type, enabled=use_amp):
                    loss = _multi_head_loss(model(frames), labels, loss_fn)
                val_loss += loss.item() * frames.size(0)
        val_loss = val_loss / len(val_loader.dataset)
        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            save_checkpoint(model, optimizer, epoch+1, best_val_loss, checkpoint_path)
    return best_val_loss

# Define the Optuna objective for hyperparameter tuning (using SQLite storage)

This cell defines an Optuna objective that trains the MobileNetV2-TCN model for a few epochs using hyperparameters suggested by the trial. The study is configured to use an SQLite database (`tuning.db`) for saving progress so tuning can be resumed.


In [7]:
def objective(trial):
    """Optuna objective: train MobileNetTCN with sampled hyperparameters.

    Samples ``num_frames``, ``batch_size``, ``lr`` and ``epochs`` from the
    trial, trains on the training split and returns the best validation loss.
    If training raises, the error is printed and ``inf`` is returned so the
    study keeps going.
    """
    num_frames = trial.suggest_categorical("num_frames", [30])
    batch_size = trial.suggest_categorical("batch_size", [4, 8])
    lr = trial.suggest_float("lr", 1e-5, 5e-4, log=True)
    epochs = trial.suggest_int("epochs", 3, 5)

    train_dataset = VideoDataset(LABELS_DIR / "TrainLabels.csv", FRAMES_DIR, transform=train_transform, num_frames=num_frames)
    val_dataset = VideoDataset(LABELS_DIR / "ValidationLabels.csv", FRAMES_DIR, transform=val_transform, num_frames=num_frames)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)

    trial_checkpoint = MODEL_DIR / f"checkpoint_trial_{trial.number}.pth"
    # Each trial starts from fresh hyperparameters, so a file already sitting
    # under this trial number can only be a leftover (e.g. from a deleted
    # study); resuming from it would skew or skip this trial's training.
    if trial_checkpoint.exists():
        trial_checkpoint.unlink()

    model = MobileNetTCN()
    try:
        return train_model(model, train_loader, val_loader, epochs, lr, trial_checkpoint)
    except Exception as e:
        print(f"Trial {trial.number} failed: {e}")
        return float("inf")
    finally:
        # Drop this trial's model before the next trial builds its own.
        del model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

# Evaluate and visualize results

This cell evaluates the final model on the test set; for each of the four label dimensions it prints a classification report and plots a confusion matrix.


In [8]:
from sklearn.metrics import classification_report, confusion_matrix

def evaluate_model(model, test_loader):
    """Print a classification report and plot a confusion matrix per label.

    The model output is viewed as ``(batch, 4, 4)`` and the arg-max over the
    last axis is taken as the predicted class of each of the four dimensions
    (Engagement, Boredom, Confusion, Frustration).

    Args:
        model: Trained network; it is moved to ``device`` and put in eval mode.
        test_loader: Loader yielding ``(frames, labels)`` batches.
    """
    num_classes = 4
    class_ids = list(range(num_classes))

    model.to(device)
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for frames, labels in tqdm(test_loader, desc="Evaluating", dynamic_ncols=True):
            frames = frames.to(device)
            outputs = model(frames)
            outputs_reshaped = outputs.view(outputs.size(0), 4, num_classes)
            preds = torch.argmax(outputs_reshaped, dim=2)
            all_preds.append(preds.cpu())
            all_labels.append(labels)
    all_preds = torch.cat(all_preds, dim=0).numpy()
    all_labels = torch.cat(all_labels, dim=0).numpy()

    dims = ["Engagement", "Boredom", "Confusion", "Frustration"]
    for i, dim in enumerate(dims):
        print(f"Classification report for {dim}:")
        print(classification_report(all_labels[:, i], all_preds[:, i], digits=3))

        # Fix the label set so the matrix is always num_classes x num_classes;
        # otherwise a class missing from both truth and predictions shrinks the
        # matrix and the tick labels no longer correspond to class ids.
        cm = confusion_matrix(all_labels[:, i], all_preds[:, i], labels=class_ids)
        plt.figure(figsize=(6, 5))
        plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
        plt.title(f"Confusion Matrix for {dim}")
        plt.colorbar()
        tick_marks = np.arange(num_classes)
        plt.xticks(tick_marks, class_ids)
        plt.yticks(tick_marks, class_ids)
        plt.xlabel("Predicted label")
        plt.ylabel("True label")
        # Annotate each cell with its count; switch to white text on dark cells.
        thresh = cm.max() / 2.
        for j in range(cm.shape[0]):
            for k in range(cm.shape[1]):
                plt.text(k, j, format(cm[j, k], 'd'),
                         horizontalalignment="center",
                         color="white" if cm[j, k] > thresh else "black")
        plt.tight_layout()
        plt.show()

## Main Execution


In [None]:
# Step 1 (optional): build the best-frame caches for the training and
# validation label files, skipping any split whose cache file already exists.
train_csv = LABELS_DIR / "TrainLabels.csv"
val_csv = LABELS_DIR / "ValidationLabels.csv"

cache_file_train = CACHE_DIR / f"precomputed_{Path(train_csv).stem}_frames.pkl"
cache_file_val = CACHE_DIR / f"precomputed_{Path(val_csv).stem}_frames.pkl"

_precompute_jobs = (
    ("Precomputing best frames for training data...", train_csv, cache_file_train),
    ("Precomputing best frames for validation data...", val_csv, cache_file_val),
)
for _message, _labels_csv, _cache_path in _precompute_jobs:
    if _cache_path.exists():
        continue
    print(_message)
    precompute_best_frames(_labels_csv, FRAMES_DIR, num_frames=30)

Precomputing best frames for training data...


Precomputing best frames:  83%|████████▎ | 4471/5358 [3:23:30<51:52,  3.51s/it]  

In [None]:
# Step 2: Run Optuna tuning with a progress bar.
# The study is stored in SQLite and reloaded if it already exists, so each run
# of this cell adds n_trials further trials to the same study.
n_trials = 10
study = optuna.create_study(
    direction="minimize",
    study_name="mobilev2_tcn_study",
    storage="sqlite:///tuning.db",
    load_if_exists=True
)
print("Starting Optuna hyperparameter tuning...")
# The context manager closes the bar even if optimize() is interrupted
# (e.g. by KeyboardInterrupt), so no stale bar is left in the notebook.
with tqdm(total=n_trials, desc="Optuna Trials", unit="trial", leave=True, dynamic_ncols=True) as pbar:
    study.optimize(objective, n_trials=n_trials, catch=(Exception,), callbacks=[lambda study, trial: pbar.update()])
print("Optuna tuning complete.")
print("Best trial:", study.best_trial)

In [None]:
# Step 3: retrain from the best Optuna trial's hyperparameters.
from torch.utils.data import DataLoader

best_trial = study.best_trial
_best_params = best_trial.params
num_frames, batch_size, lr, epochs = (
    _best_params["num_frames"],
    _best_params["batch_size"],
    _best_params["lr"],
    _best_params["epochs"],
)

# Datasets: the training split uses the flip augmentation, validation does not.
train_dataset = VideoDataset(train_csv, FRAMES_DIR, transform=train_transform, num_frames=num_frames)
val_dataset = VideoDataset(val_csv, FRAMES_DIR, transform=val_transform, num_frames=num_frames)

# Loader settings shared by both splits; only shuffling differs.
_loader_options = dict(batch_size=batch_size, num_workers=0, pin_memory=True)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)

final_checkpoint = MODEL_DIR / "final_model_checkpoint.pth"
final_model = MobileNetTCN()
print("Starting final training with best hyperparameters...")
train_model(final_model, train_loader, val_loader, epochs, lr, final_checkpoint)

In [None]:
# Step 4: Evaluate the final model on test data with visualization.
test_csv = LABELS_DIR / "TestLabels.csv"  # update as needed
test_dataset = VideoDataset(test_csv, FRAMES_DIR, transform=val_transform, num_frames=num_frames)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)

# train_model() leaves final_model holding the weights of the LAST epoch, while
# the checkpoint is written only when the validation loss improves. Restore the
# checkpointed weights so the report reflects the best-validation model.
# NOTE(review): drop this block if evaluating the last-epoch weights is intended.
if final_checkpoint.exists():
    _best_state = torch.load(final_checkpoint, map_location=device)
    final_model.load_state_dict(_best_state["model_state_dict"])

print("Evaluating final model...")
evaluate_model(final_model, test_loader)