# Facial Expression Recognition with Multi-Task Learning

This notebook implements a comprehensive facial expression recognition system using PyTorch with multi-task learning for:
- Expression classification (8 classes)
- Valence regression
- Arousal regression

## Features:
- Multiple model architectures (ResNet18, MobileNetV2, UltraSimple)
- Data augmentation and preprocessing
- Comprehensive evaluation metrics
- Visualization tools
- GPU optimization


## 1. Imports and Setup


In [17]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.switch_backend('Agg')
plt.ioff()
import seaborn as sns
import cv2
from typing import Tuple, List, Dict, Any
import warnings
warnings.filterwarnings('ignore')
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader as TorchDataLoader
from torchvision import transforms, models
from sklearn.metrics import (accuracy_score, f1_score, cohen_kappa_score, 
                           roc_auc_score, average_precision_score, 
                           mean_squared_error, classification_report,
                           confusion_matrix)
from sklearn.preprocessing import label_binarize
from scipy.stats import pearsonr
from scipy import stats
import krippendorff


In [18]:
# Seed every RNG this notebook relies on so data splits and weight
# initialisation repeat across runs.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
# Seed all visible GPUs, not only the current one; this call is silently
# ignored when CUDA is unavailable.
torch.cuda.manual_seed_all(SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Backward-compat alias for notebooks/snippets that expect `DEVICE`
DEVICE = device

if torch.cuda.is_available():
    print(f"GPU detected: {torch.cuda.get_device_name(0)}")
    print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    # Let cuDNN autotune its convolution algorithms for speed.
    # NOTE: with benchmark=True / deterministic=False, GPU runs are not
    # guaranteed to be bit-for-bit reproducible even though the seeds are fixed.
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False
    # Allow TF32 math for matmuls and cuDNN convolutions.
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    # Older PyTorch releases do not ship this switch; skip it there instead of
    # swallowing every possible exception.
    if hasattr(torch, 'set_float32_matmul_precision'):
        torch.set_float32_matmul_precision('high')
    # Release cached blocks left over from earlier cells.
    torch.cuda.empty_cache()
    print("✅ GPU optimizations enabled for maximum performance")
else:
    print("GPU not available, using CPU")


GPU detected: NVIDIA GeForce MX230
GPU memory: 2.1 GB
✅ GPU optimizations enabled for maximum performance


## 2. Configuration


In [19]:
class Config:
    """Dataset paths and hyperparameters, exposed as plain class attributes.

    The loader, dataset, model and trainer classes in this notebook receive a
    ``Config`` and read these values from it; nothing here is computed at
    runtime apart from the two joined paths.
    """
    
    # Paths (relative; resolved against the current working directory)
    DATASET_PATH = "Dataset"
    IMAGES_PATH = os.path.join(DATASET_PATH, "images")
    ANNOTATIONS_PATH = os.path.join(DATASET_PATH, "annotations")
    
    # Model parameters - Ultra conservative for stability
    IMG_SIZE = 224  # Images are resized to IMG_SIZE x IMG_SIZE; also the landmark normalisation divisor
    BATCH_SIZE = 16  # Small batch size
    EPOCHS = 10  # Maximum number of epochs (early stopping in Trainer may end training sooner)
    LEARNING_RATE = 1e-5  # Extremely conservative learning rate
    # Single-model selection for notebook conversion
    # Options: 'UltraSimple', 'ResNet18', 'MobileNetV2'
    MODEL_NAME = 'ResNet18'
    # Whether to include regression losses during training.
    # When False, Trainer.compute_loss optimises the classification loss only.
    USE_REGRESSION_LOSSES = False
    
    # Classes
    NUM_CLASSES = 8
    # NOTE(review): presumably index-aligned with the integer expression labels - confirm against the dataset
    CLASS_NAMES = ['Neutral', 'Happy', 'Sad', 'Surprise', 'Fear', 'Disgust', 'Anger', 'Contempt']
    
    # Multi-task loss weights (only applied when USE_REGRESSION_LOSSES is True)
    CLASSIFICATION_WEIGHT = 1.0
    VALENCE_WEIGHT = 0.5
    AROUSAL_WEIGHT = 0.5
    
    # Data split
    TRAIN_SPLIT = 0.8
    # NOTE: create_data_splits only reads TRAIN_SPLIT; validation receives the remaining samples
    VAL_SPLIT = 0.2


## 3. Data Loading and Preprocessing


In [20]:
class FacialExpressionDataLoader:
    """Reads images and per-image ``.npy`` annotations from disk and splits them.

    Paths, the target image size and the train fraction are taken from
    ``config`` (``IMAGES_PATH``, ``ANNOTATIONS_PATH``, ``IMG_SIZE``,
    ``TRAIN_SPLIT``).
    """

    def __init__(self, config: Config):
        self.config = config
        # Kept for backward compatibility; nothing in this class fills them in.
        self.image_files = []
        self.annotations = {}

    @staticmethod
    def _read_scalar(path: str) -> float:
        """Load a one-element ``.npy`` annotation file and return its value as a float."""
        # .item() handles 0-d and single-element arrays alike; float(ndarray) is
        # deprecated by NumPy for arrays with ndim > 0.
        return float(np.load(path).item())

    def load_dataset(self) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
        """
        Load and preprocess the complete dataset

        Every ``*.jpg`` in ``IMAGES_PATH`` is read, converted BGR -> RGB and
        resized to ``IMG_SIZE`` x ``IMG_SIZE``. Its ``<id>_exp/_val/_aro/_lnd.npy``
        annotations are read from ``ANNOTATIONS_PATH``. Samples whose valence or
        arousal equals -2 are dropped; samples that fail to load are reported
        and skipped.

        Returns:
            images: float32 array scaled to [0, 1], stacked sample-first
            labels: Dictionary containing expression, valence, arousal, landmarks
        """
        print("Loading dataset...")

        # Sorted so the sample order - and therefore the seeded train/val split -
        # does not depend on the arbitrary order os.listdir returns.
        image_files = sorted(
            f for f in os.listdir(self.config.IMAGES_PATH) if f.endswith('.jpg')
        )

        images, expressions, valences, arousals, landmarks = [], [], [], [], []
        target_size = (self.config.IMG_SIZE, self.config.IMG_SIZE)

        for filename in image_files:
            # splitext keeps ids that themselves contain dots intact ("a.b.jpg" -> "a.b").
            image_id = os.path.splitext(filename)[0]
            try:
                # Load image
                img_path = os.path.join(self.config.IMAGES_PATH, filename)
                img = cv2.imread(img_path)
                if img is None:
                    # cv2.imread signals failure by returning None instead of raising.
                    raise ValueError(f"could not decode image file {img_path}")
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                img = cv2.resize(img, target_size)

                # Load annotations
                ann_prefix = os.path.join(self.config.ANNOTATIONS_PATH, image_id)
                exp = self._read_scalar(f"{ann_prefix}_exp.npy")
                val = self._read_scalar(f"{ann_prefix}_val.npy")
                aro = self._read_scalar(f"{ann_prefix}_aro.npy")
                lnd = np.load(f"{ann_prefix}_lnd.npy").astype(np.float32)

                # Filter out invalid entries (valence or arousal == -2)
                if val != -2 and aro != -2:
                    images.append(img)
                    expressions.append(int(exp))
                    valences.append(val)
                    arousals.append(aro)
                    landmarks.append(lnd)

            except Exception as e:
                # Best effort: report the broken sample and keep going.
                print(f"Error loading {image_id}: {e}")
                continue

        print(f"Loaded {len(images)} valid samples from {len(image_files)} total images")

        # Convert to a float32 array and normalize to [0,1] in place, which avoids
        # holding two full float32 copies of the dataset at once.
        images = np.asarray(images, dtype=np.float32)
        images /= 255.0
        # Expect a stack of [H, W, C] images, i.e. shape (N, H, W, 3)
        if not (images.ndim == 4 and images.shape[-1] == 3):
            print(f"Warning: Unexpected image shape: {images.shape}")

        # Prepare labels dictionary
        labels = {
            'expression': np.array(expressions, dtype=np.int64),
            'valence': np.array(valences),
            'arousal': np.array(arousals),
            'landmarks': np.array(landmarks)
        }

        return images, labels

    def create_data_splits(self, images: np.ndarray, labels: Dict[str, np.ndarray]) -> Tuple:
        """
        Create train/validation splits

        Samples are shuffled with NumPy's global RNG (seed it beforehand for a
        repeatable split). The first ``TRAIN_SPLIT`` fraction becomes the
        training set and the remainder the validation set. The same index
        selection is applied to every array in ``labels``.

        Args:
            images: Image data
            labels: Label dictionary

        Returns:
            Tuple of (train_images, train_labels, val_images, val_labels)
        """
        n_samples = len(images)
        n_train = int(n_samples * self.config.TRAIN_SPLIT)

        # Random shuffle
        indices = np.random.permutation(n_samples)
        train_indices = indices[:n_train]
        val_indices = indices[n_train:]

        # Split data
        train_images = images[train_indices]
        val_images = images[val_indices]

        train_labels = {key: val[train_indices] for key, val in labels.items()}
        val_labels = {key: val[val_indices] for key, val in labels.items()}

        print(f"Training samples: {len(train_images)}")
        print(f"Validation samples: {len(val_images)}")

        return train_images, train_labels, val_images, val_labels


In [21]:
class FacialExpressionDataset(Dataset):
    """PyTorch Dataset yielding one image plus its expression/valence/arousal/landmark labels.

    Args:
        images: Indexable collection of images. Floating-point arrays are
            treated as [0, 1] data and converted to uint8 before the
            transform; uint8 arrays are passed through unchanged.
        labels: Dict with 'expression', 'valence', 'arousal' and 'landmarks'.
        config: Configuration object; only ``IMG_SIZE`` is read here.
        transform: Optional callable applied to the uint8 image. When omitted,
            ``transforms.ToTensor()`` is used.
        is_training: Stored on the instance; not used by this class itself.
    """

    def __init__(self, images: np.ndarray, labels: Dict[str, np.ndarray],
                 config: Config, transform=None, is_training=True):
        self.images = images
        self.labels = labels
        self.config = config
        self.transform = transform
        self.is_training = is_training

        # Convert to tensors with explicit dtypes, so the result does not depend
        # on the platform's default NumPy integer/float width.
        self.expressions = torch.tensor(np.asarray(labels['expression']), dtype=torch.long)
        self.valences = torch.tensor(np.asarray(labels['valence']), dtype=torch.float32)
        self.arousals = torch.tensor(np.asarray(labels['arousal']), dtype=torch.float32)
        # Normalize landmarks to [0,1] using image size (assumed 0..IMG_SIZE)
        # NOTE(review): assumes landmarks are pixel coordinates in an
        # IMG_SIZE-sized frame - confirm against the annotation files.
        denom = max(1, self.config.IMG_SIZE)  # guard divide-by-zero
        self.landmarks = torch.tensor(np.asarray(labels['landmarks']) / denom,
                                      dtype=torch.float32)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Return a dict with 'image', 'landmarks', 'expression', 'valence', 'arousal'."""
        image = self.images[idx]

        if isinstance(image, np.ndarray) and np.issubdtype(image.dtype, np.floating):
            # Back to uint8 for the ToPILImage-based pipelines. Round instead of
            # truncating: k/255*255 can land just below k in float32, and a plain
            # astype(uint8) would then turn it into k-1. Clip so out-of-range
            # floats saturate instead of wrapping around.
            image = np.clip(np.rint(image * 255.0), 0, 255).astype(np.uint8)
        # Arrays that are already uint8 are used as they are.

        # Apply transforms
        if self.transform:
            image = self.transform(image)
        else:
            # Ensure proper tensor format: [C, H, W]
            image = transforms.ToTensor()(image)

        return {
            'image': image,
            'landmarks': self.landmarks[idx],
            'expression': self.expressions[idx],
            'valence': self.valences[idx],
            'arousal': self.arousals[idx]
        }


In [22]:
class DataAugmentation:
    """Builds the torchvision pipelines applied to training and validation images."""

    def __init__(self, config: Config):
        self.config = config
        side = config.IMG_SIZE

        def tensor_and_normalize():
            """Return a fresh [ToTensor, Normalize] tail shared in shape by both pipelines."""
            return [
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ]

        # Random geometric and colour perturbations, used for training only.
        random_steps = [
            transforms.RandomRotation(30),
            transforms.RandomResizedCrop(side, scale=(0.8, 1.0)),
            transforms.RandomHorizontalFlip(),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        ]

        # Both pipelines start from a numpy image, hence the leading ToPILImage.
        self.train_transform = transforms.Compose(
            [transforms.ToPILImage(), *random_steps, *tensor_and_normalize()]
        )

        # Deterministic counterpart: plain resize, no randomness.
        self.val_transform = transforms.Compose(
            [transforms.ToPILImage(), transforms.Resize((side, side)), *tensor_and_normalize()]
        )

    def get_transforms(self, is_training=True):
        """Return the augmenting pipeline when ``is_training`` is truthy, else the validation one."""
        if is_training:
            return self.train_transform
        return self.val_transform


## 4. Model Definitions


In [23]:
class UltraSimpleModel(nn.Module):
    """Ultra simple model for debugging NaN issues.

    A three-layer CNN over the image is fused with a one-layer MLP over the
    136 landmark values; only the expression head is trained. Valence and
    arousal are returned as all-zero placeholders so callers can treat every
    model's output dict the same way.
    """

    def __init__(self, config: Config):
        super().__init__()
        self.config = config

        # Very simple CNN; the adaptive pool fixes the feature map at 64 x 4 x 4
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(16, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((4, 4))
        )

        # Simple landmark processing (expects 136 values per sample)
        self.landmark_net = nn.Sequential(
            nn.Linear(136, 32),
            nn.ReLU()
        )

        # Simple fusion of flattened image features and landmark features
        self.fusion = nn.Sequential(
            nn.Linear(64 * 4 * 4 + 32, 128),
            nn.ReLU()
        )

        # Only expression classification for now
        self.expression_head = nn.Linear(128, config.NUM_CLASSES)

        # Initialize weights very conservatively
        self._initialize_weights()

    def _initialize_weights(self):
        """Very conservative weight initialization: N(0, 0.01) weights, zero biases."""
        for m in self.modules():
            if isinstance(m, (nn.Linear, nn.Conv2d)):
                nn.init.normal_(m.weight, mean=0, std=0.01)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, image, landmarks):
        """Return a dict with 'expression' logits and zero 'valence'/'arousal' of shape (batch, 1)."""
        # Extract image features. torch.flatten (unlike .view) also accepts
        # non-contiguous feature maps, which is what the conv stack produces
        # when the training loop feeds channels_last images.
        img_features = torch.flatten(self.conv_layers(image), start_dim=1)

        # Process landmarks
        landmark_features = self.landmark_net(landmarks)

        # Fuse features
        fused_features = torch.cat([img_features, landmark_features], dim=1)
        fused_features = self.fusion(fused_features)

        # Only expression output for now
        expression_output = self.expression_head(fused_features)

        # Return dummy outputs for valence/arousal to maintain compatibility
        batch_size = expression_output.size(0)
        valence_output = torch.zeros(batch_size, 1, device=expression_output.device)
        arousal_output = torch.zeros(batch_size, 1, device=expression_output.device)

        return {
            'expression': expression_output,
            'valence': valence_output,
            'arousal': arousal_output
        }


In [24]:
class MultiTaskNet(nn.Module):
    """Wraps a feature-extracting backbone with three linear heads.

    The heads produce expression logits (``num_classes`` wide) and one scalar
    each for arousal and valence, all computed from the same backbone features.
    """
    def __init__(self, backbone: nn.Module, feature_dim: int, num_classes: int):
        super().__init__()
        self.backbone = backbone
        self.expression_head = nn.Linear(feature_dim, num_classes)
        self.arousal_head = nn.Linear(feature_dim, 1)
        self.valence_head = nn.Linear(feature_dim, 1)

        # Small-gain Xavier weights and zero biases for every head
        for head in (self.expression_head, self.arousal_head, self.valence_head):
            nn.init.xavier_uniform_(head.weight, gain=0.1)
            nn.init.constant_(head.bias, 0)

    def forward(self, image, landmarks=None):
        """Return a dict of head outputs; ``landmarks`` is accepted but not used."""
        # The backbone is expected to hand back flat feature vectors
        feats = self.backbone(image)
        # If it hands back a sequence, only the first element is used
        if isinstance(feats, (list, tuple)):
            feats = feats[0]
        # Regression heads are left unbounded; any clipping happens downstream
        return {
            'expression': self.expression_head(feats),
            'valence': self.valence_head(feats),
            'arousal': self.arousal_head(feats),
        }

def build_resnet18_multitask(num_classes: int) -> nn.Module:
    """Build a `MultiTaskNet` on top of torchvision's ResNet18 with IMAGENET1K_V1 weights."""
    backbone = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
    # Remember the classifier's input width before replacing it
    feature_dim = backbone.fc.in_features
    # Identity in place of the classifier, so the backbone outputs its features
    backbone.fc = nn.Identity()
    return MultiTaskNet(backbone, feature_dim, num_classes)

def build_mobilenet_v2_multitask(num_classes: int) -> nn.Module:
    """Build a `MultiTaskNet` on top of torchvision's MobileNetV2 with IMAGENET1K_V1 weights."""
    backbone = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.IMAGENET1K_V1)
    feature_dim = backbone.last_channel
    # Identity in place of the classifier, so the backbone outputs its features
    backbone.classifier = nn.Identity()
    return MultiTaskNet(backbone, feature_dim, num_classes)


## 5. Metrics and Evaluation


In [25]:
class MetricsCalculator:
    """Comprehensive metrics calculation for multi-task learning"""

    @staticmethod
    def classification_metrics(y_true: np.ndarray, y_pred: np.ndarray,
                             y_pred_proba: np.ndarray = None) -> Dict[str, float]:
        """
        Calculate classification metrics

        Args:
            y_true: Ground truth labels
            y_pred: Predicted labels
            y_pred_proba: Predicted per-class scores, one column per class
                (optional; the AUC metrics are only reported when given)

        Returns:
            Dictionary of metrics. Metrics that cannot be computed for the
            given data fall back to 0.0 instead of raising.
        """
        metrics = {}

        # Basic metrics
        metrics['accuracy'] = accuracy_score(y_true, y_pred)
        metrics['f1_macro'] = f1_score(y_true, y_pred, average='macro')
        metrics['f1_weighted'] = f1_score(y_true, y_pred, average='weighted')
        metrics['cohen_kappa'] = cohen_kappa_score(y_true, y_pred)

        # Krippendorff's Alpha, treating ground truth and prediction as two raters.
        # Expression classes are unordered categories, so the nominal distance is
        # used; an ordinal distance would make the score depend on the arbitrary
        # numbering of the classes.
        try:
            reliability_data = np.array([y_true, y_pred])
            metrics['krippendorff_alpha'] = krippendorff.alpha(
                reliability_data, level_of_measurement='nominal'
            )
        except Exception:
            # Best effort, but without trapping KeyboardInterrupt/SystemExit
            metrics['krippendorff_alpha'] = 0.0

        # Multi-class AUC metrics
        if y_pred_proba is not None:
            try:
                # Take the class count from the score matrix instead of hard-coding 8
                n_classes = np.asarray(y_pred_proba).shape[1]
                # One-vs-Rest AUC
                y_true_bin = label_binarize(y_true, classes=list(range(n_classes)))
                metrics['auc_roc_ovr'] = roc_auc_score(y_true_bin, y_pred_proba,
                                                      multi_class='ovr', average='macro')
                metrics['auc_pr_macro'] = average_precision_score(y_true_bin, y_pred_proba,
                                                                 average='macro')
            except Exception:
                metrics['auc_roc_ovr'] = 0.0
                metrics['auc_pr_macro'] = 0.0

        return metrics

    @staticmethod
    def regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
        """
        Calculate regression metrics

        Both inputs are flattened first, so an (N,) target can be compared with
        an (N, 1) prediction without broadcasting to (N, N).

        Args:
            y_true: Ground truth values
            y_pred: Predicted values (same number of elements as ``y_true``)

        Returns:
            Dictionary with 'rmse', 'pearson_corr', 'pearson_p_value', 'sagr'
            and 'ccc'. When the correlation is undefined (fewer than two
            samples, or either input is constant) it is reported as 0.0 with
            p-value 1.0 instead of NaN.

        Raises:
            ValueError: if the inputs are empty or differ in element count.
        """
        true_flat = np.asarray(y_true, dtype=np.float64).reshape(-1)
        pred_flat = np.asarray(y_pred, dtype=np.float64).reshape(-1)
        if true_flat.size != pred_flat.size:
            raise ValueError(
                f"y_true and y_pred must have the same number of elements "
                f"({true_flat.size} != {pred_flat.size})"
            )
        if true_flat.size == 0:
            raise ValueError("regression_metrics needs at least one sample")

        metrics = {}

        # RMSE
        metrics['rmse'] = float(np.sqrt(np.mean((true_flat - pred_flat) ** 2)))

        mean_true = np.mean(true_flat)
        mean_pred = np.mean(pred_flat)
        std_true = np.std(true_flat)
        std_pred = np.std(pred_flat)

        # Pearson Correlation - undefined for constant inputs, e.g. a model whose
        # regression heads are placeholders that always output zero
        if true_flat.size < 2 or std_true == 0 or std_pred == 0:
            corr, p_value = 0.0, 1.0
        else:
            corr, p_value = pearsonr(true_flat, pred_flat)
        metrics['pearson_corr'] = float(corr)
        metrics['pearson_p_value'] = float(p_value)

        # Sign Agreement Metric (SAGR)
        # Fraction of samples whose sign matches, minus a penalty for deviation
        sign_agreement = np.mean(np.sign(true_flat) == np.sign(pred_flat))
        magnitude_penalty = np.mean(np.abs(true_flat - pred_flat))
        metrics['sagr'] = float(sign_agreement - 0.1 * magnitude_penalty)

        # Concordance Correlation Coefficient (CCC)
        # CCC = 2 * rho * std_x * std_y / (std_x^2 + std_y^2 + (mean_x - mean_y)^2)
        numerator = 2 * corr * std_true * std_pred
        denominator = std_true**2 + std_pred**2 + (mean_true - mean_pred)**2
        # Small epsilon avoids division by zero
        metrics['ccc'] = float(numerator / (denominator + 1e-8))

        return metrics


## 6. Training Pipeline


In [26]:
class Trainer:
    """Training pipeline manager.

    Builds the data loaders, runs the train/validate loop (with CUDA mixed
    precision when training on a GPU), checkpoints the weights with the best
    validation loss, and stores per-model results in ``self.history``.
    """

    def __init__(self, config: Config):
        self.config = config
        self.history = {}
        # `device` is the module-level torch.device selected in the setup cell.
        self.device = device

    def create_data_loaders(self, train_data: Tuple, val_data: Tuple, augmentation: DataAugmentation):
        """Create PyTorch data loaders.

        Args:
            train_data: Tuple (images, labels) for training
            val_data: Tuple (images, labels) for validation
            augmentation: Provides the train/validation transforms

        Returns:
            (train_loader, val_loader); only the training loader shuffles.
        """
        train_images, train_labels = train_data
        val_images, val_labels = val_data

        # Create datasets
        train_dataset = FacialExpressionDataset(
            train_images, train_labels, self.config,
            transform=augmentation.get_transforms(is_training=True),
            is_training=True
        )

        val_dataset = FacialExpressionDataset(
            val_images, val_labels, self.config,
            transform=augmentation.get_transforms(is_training=False),
            is_training=False
        )

        # Pinned host memory only speeds up host-to-GPU copies; asking for it on
        # a CPU-only machine merely produces a warning.
        loader_kwargs = dict(
            batch_size=self.config.BATCH_SIZE,
            num_workers=0,  # Set to 0 for Windows compatibility
            pin_memory=self.device.type == 'cuda',
        )
        train_loader = TorchDataLoader(train_dataset, shuffle=True, **loader_kwargs)
        val_loader = TorchDataLoader(val_dataset, shuffle=False, **loader_kwargs)

        return train_loader, val_loader

    def compute_loss(self, outputs: Dict, targets: Dict):
        """Compute the training loss.

        The expression loss is cross-entropy with label smoothing. When
        ``config.USE_REGRESSION_LOSSES`` is truthy, valence and arousal MSE are
        added using the configured weights; otherwise they are reported as zero
        and the total equals the expression loss.

        Returns:
            Dict with 'total_loss', 'expression_loss', 'valence_loss', 'arousal_loss'.
        """
        # Classification loss (with label smoothing to avoid collapse)
        exp_loss = F.cross_entropy(
            outputs['expression'], targets['expression'], label_smoothing=0.1
        )

        if getattr(self.config, 'USE_REGRESSION_LOSSES', False):
            # reshape(-1) keeps a batch of one as shape (1,); a bare squeeze()
            # would collapse it to a 0-d tensor that no longer matches the target.
            val_loss = F.mse_loss(outputs['valence'].reshape(-1), targets['valence'])
            aro_loss = F.mse_loss(outputs['arousal'].reshape(-1), targets['arousal'])
            total_loss = (
                self.config.CLASSIFICATION_WEIGHT * exp_loss
                + self.config.VALENCE_WEIGHT * val_loss
                + self.config.AROUSAL_WEIGHT * aro_loss
            )
        else:
            # Dummy losses for compatibility
            val_loss = torch.tensor(0.0, device=exp_loss.device)
            aro_loss = torch.tensor(0.0, device=exp_loss.device)
            total_loss = exp_loss

        return {
            'total_loss': total_loss,
            'expression_loss': exp_loss,
            'valence_loss': val_loss,
            'arousal_loss': aro_loss
        }

    def _batch_to_device(self, batch: Dict):
        """Move one collated batch to ``self.device``; returns (images, landmarks, targets)."""
        # non_blocking lets the copy overlap with compute when memory is pinned
        images = batch['image'].to(self.device, non_blocking=True)
        # Use channels_last for better GPU throughput on NVIDIA
        if images.is_cuda:
            images = images.to(memory_format=torch.channels_last)
        landmarks = batch['landmarks'].to(self.device, non_blocking=True)
        targets = {
            key: batch[key].to(self.device, non_blocking=True)
            for key in ('expression', 'valence', 'arousal')
        }
        return images, landmarks, targets

    def _forward_with_loss(self, model: nn.Module, images, landmarks, targets: Dict, use_amp: bool):
        """Run the model and ``compute_loss``, under CUDA autocast when ``use_amp`` is set."""
        if use_amp:
            with torch.cuda.amp.autocast():
                outputs = model(images, landmarks)
                losses = self.compute_loss(outputs, targets)
        else:
            outputs = model(images, landmarks)
            losses = self.compute_loss(outputs, targets)
        return outputs, losses

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Fresh per-epoch accumulator."""
        return {'losses': [], 'correct': 0, 'total': 0, 'val_mse_sum': 0.0, 'aro_mse_sum': 0.0}

    @staticmethod
    def _accumulate(stats: Dict[str, Any], outputs: Dict, losses: Dict, targets: Dict) -> None:
        """Fold one processed batch into the epoch accumulator (logging metrics only)."""
        stats['losses'].append(losses['total_loss'].item())

        # Expression accuracy
        exp_pred = torch.argmax(outputs['expression'], dim=1)
        stats['correct'] += (exp_pred == targets['expression']).sum().item()
        stats['total'] += targets['expression'].size(0)

        # Regression MSE; cast to float32 because autocast outputs may be half precision
        stats['val_mse_sum'] += F.mse_loss(
            outputs['valence'].float().reshape(-1), targets['valence'].float()
        ).item()
        stats['aro_mse_sum'] += F.mse_loss(
            outputs['arousal'].float().reshape(-1), targets['arousal'].float()
        ).item()

    @staticmethod
    def _summarize(stats: Dict[str, Any]) -> Tuple[float, float, float, float]:
        """Return (mean loss, accuracy, mean valence MSE, mean arousal MSE) for an epoch.

        Averages run over the batches that were actually processed; an epoch
        with no processed batches yields NaN instead of raising ZeroDivisionError.
        """
        nan = float('nan')
        n_batches = len(stats['losses'])
        loss = float(np.mean(stats['losses'])) if n_batches else nan
        accuracy = stats['correct'] / stats['total'] if stats['total'] else nan
        val_mse = stats['val_mse_sum'] / n_batches if n_batches else nan
        aro_mse = stats['aro_mse_sum'] / n_batches if n_batches else nan
        return loss, accuracy, val_mse, aro_mse

    def _train_one_epoch(self, model: nn.Module, loader, optimizer, scaler) -> Dict[str, Any]:
        """Run one optimisation pass over ``loader`` and return the epoch accumulator."""
        model.train()
        stats = self._new_stats()
        num_batches = len(loader)
        use_amp = scaler is not None

        for batch_idx, batch in enumerate(loader):
            images, landmarks, targets = self._batch_to_device(batch)

            # Debug: Check input values
            if batch_idx == 0:
                print(f"🔍 Debug - Batch {batch_idx+1}:")
                print(f"  Images: min={images.min():.4f}, max={images.max():.4f}, mean={images.mean():.4f}")
                print(f"  Landmarks: min={landmarks.min():.4f}, max={landmarks.max():.4f}, mean={landmarks.mean():.4f}")
                print(f"  Expression targets: {targets['expression'][:5]}")
                print(f"  Valence targets: {targets['valence'][:5]}")
                print(f"  Arousal targets: {targets['arousal'][:5]}")

            outputs, losses = self._forward_with_loss(model, images, landmarks, targets, use_amp)

            # Skip batches whose loss is NaN so they cannot corrupt the weights
            if torch.isnan(losses['total_loss']):
                print(f"⚠️  NaN loss detected at batch {batch_idx+1}, skipping...")
                if batch_idx < 5:  # Debug first few batches
                    print(f"  Outputs: exp={outputs['expression'][:2]}, val={outputs['valence'][:2]}, aro={outputs['arousal'][:2]}")
                continue

            optimizer.zero_grad()
            if use_amp:
                scaler.scale(losses['total_loss']).backward()
                # Unscale first so clipping sees the true gradient magnitudes
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                scaler.step(optimizer)
                scaler.update()
            else:
                losses['total_loss'].backward()
                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()

            self._accumulate(stats, outputs, losses, targets)

            # Print progress every 10% of batches
            if (batch_idx + 1) % max(1, num_batches // 10) == 0:
                progress = (batch_idx + 1) / num_batches * 100
                print(f"  Batch {batch_idx+1}/{num_batches} ({progress:.1f}%) - Loss: {losses['total_loss'].item():.4f}")

        return stats

    def _validate_one_epoch(self, model: nn.Module, loader, use_amp: bool) -> Dict[str, Any]:
        """Evaluate ``model`` on ``loader`` without gradients and return the epoch accumulator."""
        model.eval()
        stats = self._new_stats()
        with torch.no_grad():
            for batch in loader:
                images, landmarks, targets = self._batch_to_device(batch)
                outputs, losses = self._forward_with_loss(model, images, landmarks, targets, use_amp)
                self._accumulate(stats, outputs, losses, targets)
        return stats

    def train_model(self, model: nn.Module, train_data: Tuple, val_data: Tuple,
                   model_name: str, augmentation: DataAugmentation) -> Dict[str, Any]:
        """
        Train a PyTorch model with the given data

        Args:
            model: PyTorch model to train
            train_data: Training data tuple (images, labels)
            val_data: Validation data tuple (images, labels)
            model_name: Name for saving/logging; the best weights are written
                to ``best_<model_name>.pth`` in the working directory
            augmentation: Data augmentation object

        Returns:
            Dict with 'history' (per-epoch metric lists), 'training_time'
            (seconds) and 'model' (holding the best-validation-loss weights
            when a checkpoint was written during this run). The same dict is
            stored in ``self.history[model_name]``.
        """
        # Local import: the notebook's import cell does not provide `time`
        import time

        # Move model to device
        model = model.to(self.device)

        # Create data loaders
        train_loader, val_loader = self.create_data_loaders(train_data, val_data, augmentation)

        # Setup optimizer with optimized parameters
        optimizer = optim.AdamW(model.parameters(), lr=self.config.LEARNING_RATE, weight_decay=1e-4, eps=1e-8)
        scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2, eta_min=1e-6)

        # Training history
        history = {
            'train_loss': [], 'val_loss': [],
            'train_exp_acc': [], 'val_exp_acc': [],
            'train_val_mse': [], 'val_val_mse': [],
            'train_aro_mse': [], 'val_aro_mse': []
        }

        print(f"\nTraining {model_name}...")
        print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

        best_val_loss = float('inf')
        patience_counter = 0
        # Epochs without improvement tolerated before stopping; overridable via config
        patience = getattr(self.config, 'EARLY_STOPPING_PATIENCE', 10)
        best_ckpt_path = f'best_{model_name}.pth'
        # Tracks whether THIS run wrote the checkpoint, so a file left behind by
        # an earlier run (possibly a different architecture) is never loaded.
        saved_best = False

        start_time = time.time()

        # Enable mixed precision training for faster training (GPU only)
        use_amp = self.device.type == 'cuda'
        scaler = torch.cuda.amp.GradScaler() if use_amp else None

        for epoch in range(self.config.EPOCHS):
            epoch_start_time = time.time()

            train_stats = self._train_one_epoch(model, train_loader, optimizer, scaler)
            val_stats = self._validate_one_epoch(model, val_loader, use_amp)

            # Calculate epoch metrics
            train_loss, train_exp_acc, train_val_mse_avg, train_aro_mse_avg = self._summarize(train_stats)
            val_loss, val_exp_acc, val_val_mse_avg, val_aro_mse_avg = self._summarize(val_stats)

            # Update history
            history['train_loss'].append(train_loss)
            history['val_loss'].append(val_loss)
            history['train_exp_acc'].append(train_exp_acc)
            history['val_exp_acc'].append(val_exp_acc)
            history['train_val_mse'].append(train_val_mse_avg)
            history['val_val_mse'].append(val_val_mse_avg)
            history['train_aro_mse'].append(train_aro_mse_avg)
            history['val_aro_mse'].append(val_aro_mse_avg)

            # Calculate epoch time
            epoch_time = time.time() - epoch_start_time

            # Print detailed epoch results
            print(f'\n{"="*80}')
            print(f'EPOCH {epoch+1}/{self.config.EPOCHS} - {model_name}')
            print(f'{"="*80}')
            print(f'⏱️  Epoch Time: {epoch_time:.2f}s')
            print(f'📊 Training Metrics:')
            print(f'   Loss: {train_loss:.4f} | Accuracy: {train_exp_acc:.4f}')
            print(f'   Valence MSE: {train_val_mse_avg:.4f} | Arousal MSE: {train_aro_mse_avg:.4f}')
            print(f'📈 Validation Metrics:')
            print(f'   Loss: {val_loss:.4f} | Accuracy: {val_exp_acc:.4f}')
            print(f'   Valence MSE: {val_val_mse_avg:.4f} | Arousal MSE: {val_aro_mse_avg:.4f}')
            print(f'🎯 Learning Rate: {optimizer.param_groups[0]["lr"]:.2e}')

            # Learning rate scheduling
            scheduler.step()

            # Early stopping
            if np.isfinite(val_loss) and val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Save best model
                torch.save(model.state_dict(), best_ckpt_path)
                saved_best = True
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f'Early stopping at epoch {epoch+1}')
                    break

        training_time = time.time() - start_time

        # Restore the best weights from this run; otherwise save current model as best
        if saved_best:
            model.load_state_dict(torch.load(best_ckpt_path, map_location=self.device))
        else:
            print(f"\n⚠️  No best checkpoint found for {model_name}. Saving current model as best.")
            torch.save(model.state_dict(), best_ckpt_path)

        # Store results
        self.history[model_name] = {
            'history': history,
            'training_time': training_time,
            'model': model
        }

        print(f"\n{model_name} training completed in {training_time:.2f} seconds")

        return self.history[model_name]


In [27]:
# This cell has been merged into the Trainer class above


In [28]:
class Evaluator:
    """Evaluate trained multi-task models and keep their results for comparison.

    Every call to ``evaluate_model`` stores its result dict in ``self.results``
    under the model name; ``create_comparison_table`` tabulates whatever has
    been stored so far.
    """

    # Column order of the comparison table. Declared once so the table keeps the
    # same schema even before any model has been evaluated.
    _COMPARISON_COLUMNS = [
        'Model',
        'Expression_Accuracy', 'Expression_F1', 'Expression_Kappa',
        'Valence_RMSE', 'Valence_CCC', 'Valence_Corr',
        'Arousal_RMSE', 'Arousal_CCC', 'Arousal_Corr',
    ]

    def __init__(self, config: Config):
        self.config = config
        self.results = {}

    @staticmethod
    def _predict(model: nn.Module, eval_loader, device) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Run the model over every batch of ``eval_loader`` without gradients.

        Returns:
            ``(expression_scores, valence, arousal)`` concatenated over all
            batches in loader order. Valence and arousal are flattened to 1-D.

        Raises:
            ValueError: if the loader yields no batches (empty evaluation set).
        """
        exp_batches, val_batches, aro_batches = [], [], []

        with torch.no_grad():
            for batch in eval_loader:
                batch_images = batch['image'].to(device, non_blocking=True)
                if batch_images.is_cuda:
                    # Same memory layout conversion the original applied for CUDA tensors
                    batch_images = batch_images.to(memory_format=torch.channels_last)
                batch_landmarks = batch['landmarks'].to(device, non_blocking=True)

                outputs = model(batch_images, batch_landmarks)

                exp_batches.append(outputs['expression'].detach().cpu().numpy())
                val_batches.append(outputs['valence'].detach().cpu().numpy().flatten())
                aro_batches.append(outputs['arousal'].detach().cpu().numpy().flatten())

        if not exp_batches:
            # np.concatenate([]) would raise a much less helpful ValueError
            raise ValueError("Evaluation set produced no batches; cannot compute metrics.")

        return (
            np.concatenate(exp_batches, axis=0),
            np.concatenate(val_batches, axis=0),
            np.concatenate(aro_batches, axis=0),
        )

    def evaluate_model(self, model: nn.Module, test_data: Tuple, model_name: str) -> Dict[str, Any]:
        """
        Comprehensive evaluation of a trained model

        The model is switched to evaluation mode and run on the device its
        parameters already live on. Results are also stored in
        ``self.results[model_name]``.

        Args:
            model: Trained PyTorch model; called as ``model(images, landmarks)`` and
                expected to return a dict with 'expression', 'valence' and 'arousal'
            test_data: Test data tuple (images, labels); ``labels`` is indexed with
                the keys 'expression', 'valence' and 'arousal'
            model_name: Model name for logging and for keying the stored results

        Returns:
            Dict with 'model_name', the three metric dicts ('expression_metrics',
            'valence_metrics', 'arousal_metrics') and the raw 'predictions'

        Raises:
            ValueError: if the evaluation set is empty
        """
        test_images, test_labels = test_data

        print(f"\nEvaluating {model_name}...")

        # Set model to evaluation mode
        model.eval()
        device = next(model.parameters()).device

        # Build evaluation dataset and dataloader to ensure correct tensor layout and normalization
        val_transform = DataAugmentation(self.config).get_transforms(is_training=False)
        eval_dataset = FacialExpressionDataset(
            test_images, test_labels, self.config, transform=val_transform, is_training=False
        )
        eval_loader = TorchDataLoader(
            eval_dataset,
            batch_size=self.config.BATCH_SIZE,
            shuffle=False,  # keep sample order aligned with test_labels
            num_workers=0,
            # Pinned host memory only helps host->GPU copies; skip it for CPU models
            pin_memory=(device.type == 'cuda'),
        )

        exp_pred_proba, val_pred, aro_pred = self._predict(model, eval_loader, device)

        # NOTE(review): the 'expression' head output is used as class scores as-is;
        # whether these are probabilities or raw logits depends on the model — confirm.
        exp_pred = np.argmax(exp_pred_proba, axis=1)

        # Classification metrics
        exp_metrics = MetricsCalculator.classification_metrics(
            test_labels['expression'], exp_pred, exp_pred_proba
        )

        # Regression metrics
        val_metrics = MetricsCalculator.regression_metrics(test_labels['valence'], val_pred)
        aro_metrics = MetricsCalculator.regression_metrics(test_labels['arousal'], aro_pred)

        # Combine results
        results = {
            'model_name': model_name,
            'expression_metrics': exp_metrics,
            'valence_metrics': val_metrics,
            'arousal_metrics': aro_metrics,
            'predictions': {
                'expression': exp_pred,
                'expression_proba': exp_pred_proba,
                'valence': val_pred,
                'arousal': aro_pred
            }
        }

        self.results[model_name] = results

        # Print summary
        print(f"\n{model_name} Results:")
        print(f"Expression Accuracy: {exp_metrics['accuracy']:.4f}")
        print(f"Expression F1 (macro): {exp_metrics['f1_macro']:.4f}")
        print(f"Valence RMSE: {val_metrics['rmse']:.4f}")
        print(f"Valence CCC: {val_metrics['ccc']:.4f}")
        print(f"Arousal RMSE: {aro_metrics['rmse']:.4f}")
        print(f"Arousal CCC: {aro_metrics['ccc']:.4f}")

        return results

    def create_comparison_table(self) -> pd.DataFrame:
        """Create comparison table of all models

        Returns:
            One row per evaluated model, in evaluation order. When nothing has
            been evaluated yet the frame is empty but still has all columns.
        """
        comparison_data = []

        for model_name, results in self.results.items():
            exp_metrics = results['expression_metrics']
            val_metrics = results['valence_metrics']
            aro_metrics = results['arousal_metrics']
            comparison_data.append({
                'Model': model_name,
                'Expression_Accuracy': exp_metrics['accuracy'],
                'Expression_F1': exp_metrics['f1_macro'],
                'Expression_Kappa': exp_metrics['cohen_kappa'],
                'Valence_RMSE': val_metrics['rmse'],
                'Valence_CCC': val_metrics['ccc'],
                'Valence_Corr': val_metrics['pearson_corr'],
                'Arousal_RMSE': aro_metrics['rmse'],
                'Arousal_CCC': aro_metrics['ccc'],
                'Arousal_Corr': aro_metrics['pearson_corr']
            })

        return pd.DataFrame(comparison_data, columns=self._COMPARISON_COLUMNS)


## 7. Visualization


In [29]:
class Visualizer:
    """Visualization utilities for results and analysis

    Every plotting method optionally saves the figure, shows it, and then
    closes it so repeated calls do not accumulate open figures.
    """

    def __init__(self, config: Config):
        self.config = config

    @staticmethod
    def _finalize_figure(fig, save_path: str = None):
        """Save (when a path is given), show, and release ``fig``."""
        if save_path:
            fig.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        # On non-interactive backends show() does not release the figure,
        # so close it explicitly to avoid leaking memory across calls.
        plt.close(fig)

    @staticmethod
    def _sample_indices(pool: np.ndarray, k: int) -> np.ndarray:
        """Draw up to ``k`` distinct entries from ``pool``; empty pool -> empty result."""
        if len(pool) == 0:
            return pool
        return np.random.choice(pool, min(k, len(pool)), replace=False)

    def plot_training_history(self, history_dict: Dict[str, Any], save_path: str = None):
        """Plot training history for all models

        Args:
            history_dict: Maps model name to a dict whose 'history' entry maps
                metric names to per-epoch sequences. A metric is drawn only if
                present; its 'val_'-prefixed counterpart is drawn dashed.
            save_path: Optional file path to save the figure to
        """
        fig = plt.figure(figsize=(20, 12))

        metrics_to_plot = ['loss', 'expression_output_accuracy', 'valence_output_mse', 'arousal_output_mse']

        for i, metric in enumerate(metrics_to_plot, 1):
            ax = fig.add_subplot(2, 2, i)

            for model_name, model_data in history_dict.items():
                history = model_data['history']
                if metric in history:
                    ax.plot(history[metric], label=f'{model_name} train')
                    if f'val_{metric}' in history:
                        ax.plot(history[f'val_{metric}'], label=f'{model_name} val', linestyle='--')

            pretty_name = metric.replace("_", " ").title()
            ax.set_title(f'Training {pretty_name}')
            ax.set_xlabel('Epoch')
            ax.set_ylabel(pretty_name)
            # Only draw a legend when something was plotted for this metric
            if ax.get_legend_handles_labels()[0]:
                ax.legend()
            ax.grid(True, alpha=0.3)

        fig.tight_layout()
        self._finalize_figure(fig, save_path)

    def plot_confusion_matrix(self, y_true: np.ndarray, y_pred: np.ndarray,
                            model_name: str, save_path: str = None):
        """Plot confusion matrix for expression classification

        Args:
            y_true: True expression labels
            y_pred: Predicted expression labels
            model_name: Name shown in the plot title
            save_path: Optional file path to save the figure to
        """
        # Fix the label set so the matrix always has one row/column per class name,
        # even if some class never occurs in y_true or y_pred.
        # Assumes labels are integer indices into CLASS_NAMES, as
        # visualize_predictions also does.
        class_ids = np.arange(len(self.config.CLASS_NAMES))
        cm = confusion_matrix(y_true, y_pred, labels=class_ids)

        fig, ax = plt.subplots(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                   xticklabels=self.config.CLASS_NAMES,
                   yticklabels=self.config.CLASS_NAMES,
                   ax=ax)
        ax.set_title(f'Confusion Matrix - {model_name}')
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')

        self._finalize_figure(fig, save_path)

    def plot_regression_scatter(self, y_true: np.ndarray, y_pred: np.ndarray,
                              label: str, model_name: str, save_path: str = None):
        """Plot scatter plot for regression results

        Args:
            y_true: True target values
            y_pred: Predicted values
            label: Target name used in axis labels and title (e.g. 'Valence')
            model_name: Name shown in the plot title
            save_path: Optional file path to save the figure to
        """
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.scatter(y_true, y_pred, alpha=0.6)
        # Reference diagonal drawn over the fixed span [-1, 1]
        ax.plot([-1, 1], [-1, 1], 'r--', label='Perfect prediction')
        ax.set_xlabel(f'True {label}')
        ax.set_ylabel(f'Predicted {label}')
        ax.set_title(f'{label} Prediction - {model_name}')
        ax.legend()
        ax.grid(True, alpha=0.3)

        # Add correlation info
        corr = np.corrcoef(y_true, y_pred)[0, 1]
        ax.text(0.05, 0.95, f'Correlation: {corr:.3f}',
                transform=ax.transAxes, fontsize=12,
                bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        self._finalize_figure(fig, save_path)

    def visualize_predictions(self, images: np.ndarray, true_labels: Dict[str, np.ndarray],
                            predictions: Dict[str, np.ndarray], model_name: str,
                            num_samples: int = 10, save_path: str = None):
        """Visualize sample predictions

        Draws a two-row grid: randomly chosen correctly classified samples on
        top, misclassified ones below. Each title shows true and predicted
        expression plus true→predicted valence and arousal.

        Args:
            images: Images indexable by sample position, passed to ``imshow``
            true_labels: Dict with 'expression', 'valence' and 'arousal' arrays
            predictions: Dict with the same three keys holding model predictions
            model_name: Name shown in the figure title
            num_samples: Total number of panels; half go to each row
                (``num_samples // 2`` per row, at least 1). The default of 10
                gives the original 2x5 grid.
            save_path: Optional file path to save the figure to
        """
        per_row = max(1, num_samples // 2)

        # Get correct and incorrect predictions
        correct_mask = true_labels['expression'] == predictions['expression']
        correct_indices = np.where(correct_mask)[0]
        incorrect_indices = np.where(~correct_mask)[0]

        # Sample indices
        correct_sample = self._sample_indices(correct_indices, per_row)
        incorrect_sample = self._sample_indices(incorrect_indices, per_row)

        # squeeze=False keeps `axes` 2-D even when per_row == 1
        fig, axes = plt.subplots(2, per_row, figsize=(4 * per_row, 8), squeeze=False)
        fig.suptitle(f'Prediction Samples - {model_name}', fontsize=16)

        class_names = self.config.CLASS_NAMES
        rows = (
            (correct_sample, '✓', 'green'),
            (incorrect_sample, '✗', 'red'),
        )
        for row_axes, (sample, mark, color) in zip(axes, rows):
            # Hide every panel first so slots without a sample stay blank
            for ax in row_axes:
                ax.axis('off')

            for ax, idx in zip(row_axes, sample):
                ax.imshow(images[idx])
                ax.set_title(
                    f'{mark} True: {class_names[true_labels["expression"][idx]]}\n'
                    f'Pred: {class_names[predictions["expression"][idx]]}\n'
                    f'V: {true_labels["valence"][idx]:.2f}→{predictions["valence"][idx]:.2f}\n'
                    f'A: {true_labels["arousal"][idx]:.2f}→{predictions["arousal"][idx]:.2f}',
                    fontsize=10, color=color
                )

        fig.tight_layout()
        self._finalize_figure(fig, save_path)


## 8. Main Execution Pipeline


In [30]:
# Shared configuration: every pipeline component below is built from it
config = Config()

# Report whether the dataset path exists (informational only; execution continues either way)
if os.path.exists(config.DATASET_PATH):
    print("✅ Dataset found!")
else:
    print("❌ Dataset not found at {}".format(config.DATASET_PATH))
    print("Please download and extract the dataset from the provided Google Drive link.")

# Pipeline components: data loading, training, evaluation, plotting
data_loader = FacialExpressionDataLoader(config)
trainer = Trainer(config)
evaluator = Evaluator(config)
visualizer = Visualizer(config)


✅ Dataset found!


In [31]:
# Read the full dataset, then split it into training and validation partitions
print()
print("📊 Loading and preprocessing data...")
images, labels = data_loader.load_dataset()
train_images, train_labels, val_images, val_labels = data_loader.create_data_splits(images, labels)

# No separate test split is created here: the validation split doubles as the test set
# (in practice, you'd have a separate test set)
test_images = val_images
test_labels = val_labels

# Summary of what was loaded
print()
print("📈 Dataset Statistics:")
print("Total samples:", len(images))
print("Training samples:", len(train_images))
print("Validation/Test samples:", len(val_images))
print("Expression distribution:", np.bincount(labels['expression']))
print("Valence range: [{:.3f}, {:.3f}]".format(labels['valence'].min(), labels['valence'].max()))
print("Arousal range: [{:.3f}, {:.3f}]".format(labels['arousal'].min(), labels['arousal'].max()))

# Augmentation object handed to the trainer in the training cells
augmentation = DataAugmentation(config)



📊 Loading and preprocessing data...
Loading dataset...
Loaded 3999 valid samples from 3999 total images
Training samples: 3199
Validation samples: 800

📈 Dataset Statistics:
Total samples: 3999
Training samples: 3199
Validation/Test samples: 800
Expression distribution: [500 500 500 500 500 500 500 499]
Valence range: [-0.987, 0.982]
Arousal range: [-0.667, 0.984]


In [32]:
# ResNet18: construct the multi-task network, then train it on the train/val splits
print()
print("🏗️ Building ResNet18 model...")
model_resnet = build_resnet18_multitask(config.NUM_CLASSES)

# Banner separating this model's training log from the previous output
print("\n" + "=" * 60)
print("🎯 Training: ResNet18")
print("=" * 60)

training_result_resnet = trainer.train_model(
    model_resnet, (train_images, train_labels), (val_images, val_labels), 'ResNet18', augmentation
)



🏗️ Building ResNet18 model...



🎯 Training: ResNet18

Training ResNet18...
Model parameters: 11,181,642
🔍 Debug - Batch 1:
  Images: min=-2.1179, max=2.6400, mean=-0.0068
  Landmarks: min=-0.0428, max=1.1187, mean=0.5485
  Expression targets: tensor([1, 6, 5, 5, 7], device='cuda:0')
  Valence targets: tensor([ 0.5420, -0.3678, -0.7350, -0.8318, -0.6349], device='cuda:0')
  Arousal targets: tensor([-0.5323,  0.6001,  0.4555,  0.3867,  0.6429], device='cuda:0')
  Batch 20/200 (10.0%) - Loss: 2.0795
  Batch 40/200 (20.0%) - Loss: 2.0950
  Batch 60/200 (30.0%) - Loss: 2.0676
  Batch 80/200 (40.0%) - Loss: 2.0403
  Batch 100/200 (50.0%) - Loss: 2.0840
  Batch 120/200 (60.0%) - Loss: 2.0382
  Batch 140/200 (70.0%) - Loss: 2.0858
  Batch 160/200 (80.0%) - Loss: 2.0118
  Batch 180/200 (90.0%) - Loss: 2.0355
  Batch 200/200 (100.0%) - Loss: 2.0881

EPOCH 1/10 - ResNet18
⏱️  Epoch Time: 824.18s
📊 Training Metrics:
   Loss: 2.0594 | Accuracy: 0.1922
   Valence MSE: 0.2882 | Arousal MSE: 0.2374
📈 Validation Metrics:
   Loss: 2.

In [33]:
# MobileNetV2: construct the multi-task network, then train it on the train/val splits
print()
print("🏗️ Building MobileNetV2 model...")
model_mobilenet = build_mobilenet_v2_multitask(config.NUM_CLASSES)

# Banner separating this model's training log from the previous output
print("\n" + "=" * 60)
print("🎯 Training: MobileNetV2")
print("=" * 60)

training_result_mobilenet = trainer.train_model(
    model_mobilenet, (train_images, train_labels), (val_images, val_labels), 'MobileNetV2', augmentation
)



🏗️ Building MobileNetV2 model...


Downloading: "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth" to C:\Users\Admin/.cache\torch\hub\checkpoints\mobilenet_v2-b0353104.pth
100%|██████████| 13.6M/13.6M [00:04<00:00, 3.25MB/s]



🎯 Training: MobileNetV2

Training MobileNetV2...
Model parameters: 2,236,682
🔍 Debug - Batch 1:
  Images: min=-2.1179, max=2.6400, mean=0.0524
  Landmarks: min=-0.0237, max=1.0853, mean=0.5413
  Expression targets: tensor([0, 2, 6, 3, 7], device='cuda:0')
  Valence targets: tensor([ 0.0629, -0.7549, -0.7480,  0.4444, -0.6388], device='cuda:0')
  Arousal targets: tensor([-0.0151, -0.3388,  0.4477,  0.8730,  0.6436], device='cuda:0')
  Batch 20/200 (10.0%) - Loss: 2.0603
  Batch 40/200 (20.0%) - Loss: 2.0845
  Batch 60/200 (30.0%) - Loss: 2.0953
  Batch 80/200 (40.0%) - Loss: 2.0646
  Batch 100/200 (50.0%) - Loss: 2.0716
  Batch 120/200 (60.0%) - Loss: 2.0737
  Batch 140/200 (70.0%) - Loss: 2.0760
  Batch 160/200 (80.0%) - Loss: 2.0512
  Batch 180/200 (90.0%) - Loss: 2.0578
  Batch 200/200 (100.0%) - Loss: 2.0269

EPOCH 1/10 - MobileNetV2
⏱️  Epoch Time: 2541.90s
📊 Training Metrics:
   Loss: 2.0688 | Accuracy: 0.1550
   Valence MSE: 0.2456 | Arousal MSE: 0.1705
📈 Validation Metrics:
   

In [34]:
# Score each trained model on the held-out split; results are also kept inside `evaluator`
print()
print("📊 Evaluating: ResNet18")
evaluation_result_resnet = evaluator.evaluate_model(
    training_result_resnet['model'], (test_images, test_labels), 'ResNet18'
)

print()
print("📊 Evaluating: MobileNetV2")
evaluation_result_mobilenet = evaluator.evaluate_model(
    training_result_mobilenet['model'], (test_images, test_labels), 'MobileNetV2'
)



📊 Evaluating: ResNet18

Evaluating ResNet18...

ResNet18 Results:
Expression Accuracy: 0.3950
Expression F1 (macro): 0.3863
Valence RMSE: 0.5062
Valence CCC: 0.0249
Arousal RMSE: 0.4744
Arousal CCC: 0.0084

📊 Evaluating: MobileNetV2

Evaluating MobileNetV2...

MobileNetV2 Results:
Expression Accuracy: 0.3287
Expression F1 (macro): 0.2988
Valence RMSE: 0.4773
Valence CCC: -0.0410
Arousal RMSE: 0.4313
Arousal CCC: -0.0314


In [35]:
# Tabulate the metrics of every model evaluated so far, one row per model
print()
print("📊 Model Comparison Results:")
comparison_table = evaluator.create_comparison_table()
# Plain-text rendering without the index column
print(comparison_table.to_string(index=False))



📊 Model Comparison Results:
      Model  Expression_Accuracy  Expression_F1  Expression_Kappa  Valence_RMSE  Valence_CCC  Valence_Corr  Arousal_RMSE  Arousal_CCC  Arousal_Corr
   ResNet18              0.39500       0.386350          0.308773      0.506227      0.02486      0.082454      0.474414     0.008391      0.025112
MobileNetV2              0.32875       0.298833          0.233087      0.477285     -0.04098     -0.129536      0.431317    -0.031362     -0.105461
