**Sections Overview:**

1. SETUP & CONFIGURATION
   - Everything needed before running
   - All hyperparameters in one place
   - Easy to adjust settings

2. DATA PREPARATION
   - All data-related code together
   - Text processing, dataset classes, data loading
   - Transforms and preprocessing

3. MODEL ARCHITECTURE
   - Model components
   - Transformer encoder
   - Full multimodal classifier

4. TRAINING & EVALUATION
   - Training setup (optimizer, loss)
   - Training loop
   - Test evaluation
   - Results saving

# 1. SETUP & CONFIGURATION

### Architecture (Model B) Description

- **Image Input**: 100×100 grayscale → 224×224 RGB (ResNet-18 compatible)
- **Image Backbone**: ResNet-18 (pretrained) → 512-D image feature
- **Text Input**: Short text metadata (tokenized with subword units; the `bert-base-uncased` tokenizer loaded under "Text Processing Functions" uses WordPiece)
- **Text Encoder**: Transformer encoder (2–4 layers, 4–8 heads) → 512-D text embedding
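- **Fusion**: Concatenate [512-D image, 512-D text] → 1024-D (when `USE_CROSS_ATTENTION = True`, the text encoder also attends to the image feature before concatenation)
- **Dropout**: Applied to the fused features during training; the probability is set by `DROPOUT_P` (0.2 in the Configuration cell)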
- **Head**: Linear (1024 → 7), Softmax for probabilities
- **Loss**: Cross-Entropy
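
To make the Head and Loss bullets concrete, here is a minimal pure-Python sketch of how softmax probabilities and cross-entropy relate for a single 7-class logit vector. The function names and example logits are illustrative assumptions, not part of the notebook. PyTorch's `nn.CrossEntropyLoss` applies the log-softmax internally, which is why the training code passes it raw logits rather than the softmax output.

```python
import math

def softmax(logits):
    """Convert raw scores to probabilities (numerically stable form)."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def cross_entropy(logits, target):
    """Negative log-probability of the target class, computed from logits."""
    return -math.log(softmax(logits)[target])

# Illustrative logits for the 7 emotion classes (made-up numbers)
example_logits = [2.0, 0.1, -1.0, 0.5, 0.0, -0.5, 1.0]
probs = softmax(example_logits)
loss = cross_entropy(example_logits, target=0)
```

With uniform logits the loss equals `log(7)`, which is a useful reference point when reading the first-epoch loss of a 7-class model.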

### KAGGLE setup instructions


#### Using on Kaggle:
1. Find "Configuration" and set the variable `RUN_ON_KAGGLE = True`
2. Confirm the dataset directory paths are correct

#### Using API:
1. Find "Configuration" and set the variable `RUN_ON_KAGGLE = False`
2. Get a Kaggle API key:
   - Go to https://www.kaggle.com/account
   - Click "Create New API Token"
   - Download kaggle.json
3. Place kaggle.json in the ~/.kaggle/ directory
4. Run this notebook; the datasets will download automatically
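
Before running the download cells, it can help to confirm that the token file is where the steps above put it. The following is a hedged sketch using only the standard library. `check_kaggle_credentials` is a hypothetical helper, not part of the notebook or of the Kaggle client, and it assumes the token file contains the `username` and `key` fields.

```python
import json
from pathlib import Path

def check_kaggle_credentials(kaggle_dir=None):
    """Return (ok, message) describing whether kaggle.json looks usable."""
    kaggle_dir = Path(kaggle_dir) if kaggle_dir is not None else Path.home() / ".kaggle"
    token_path = kaggle_dir / "kaggle.json"
    if not token_path.is_file():
        return False, f"Missing {token_path}"
    try:
        token = json.loads(token_path.read_text())
    except json.JSONDecodeError:
        return False, f"{token_path} is not valid JSON"
    # The API token is expected to carry these two fields
    missing = [k for k in ("username", "key") if k not in token]
    if missing:
        return False, f"{token_path} lacks fields: {', '.join(missing)}"
    return True, f"Found credentials for {token['username']}"
```

Calling `check_kaggle_credentials()` with no argument inspects `~/.kaggle/`; passing a directory makes the check easy to try against a scratch folder.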

### Training/Testing Instructions

Adjust the parameters in "Configuration" and run the entire notebook.



## Imports
Imports used for the model tasks in this notebook.

In [None]:
# Install scikit-learn if needed
import subprocess
import sys

try:
    from sklearn.model_selection import train_test_split
except ImportError:
    print("Installing scikit-learn...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "scikit-learn"])
    from sklearn.model_selection import train_test_split
    print("scikit-learn installed successfully!")

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
import timm

import pandas as pd
import numpy as np
import random

import re
from datasets import load_dataset

import os
from tqdm.notebook import tqdm


import time
import math
from collections import Counter

# Text processing library - minimal approach
from transformers import AutoTokenizer

# train_test_split is already imported above

# Set up device for GPU/CPU usage throughout the notebook
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Check CUDA availability and GPU info
if torch.cuda.is_available():
    print(f"CUDA is available!")
    print(f"GPU count: {torch.cuda.device_count()}")
    print(f"Current GPU: {torch.cuda.current_device()}")
    print(f"GPU name: {torch.cuda.get_device_name(0)}")
    print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
else:
    print("CUDA is not available, using CPU")


## Configuration

Modify parameters here for training/testing

In [None]:
# ===== TRAINING HYPERPARAMETERS =====
NUMBER_OF_EPOCHS = 50          # Number of training epochs (lower for faster experiments)
LEARNING_RATE = 0.0001         # Learning rate for optimizer (reduced from 0.001 for more stable training)
BATCH_SIZE = 64                # Batch size (higher = faster training, uses more GPU memory)

# ===== HARDWARE / DATA LOADING =====
RUN_ON_KAGGLE = False           # See instructions above.

NUM_WORKERS = 6                 # Number of DataLoader worker processes (higher = faster loading, uses more CPU RAM)
                                # Reduce if experiencing high memory swap (try 2-4 for 16GB RAM, 1-2 for 8GB RAM)
PIN_MEMORY = torch.cuda.is_available()  # Faster CPU->GPU transfer (only useful when a GPU is present)

# ===== EXPERIMENT ID =====
EXPERIMENT_ID = 11               # Experiment ID number - check test_results.csv to make sure it's unique

# ===== MODEL ARCHITECTURE PARAMETERS =====
DROPOUT_P = 0.2                # Dropout probability for fusion layer (regularization) - 0.5 was too high!

# ===== TRANSFORMER ARCHITECTURE =====
TRANSFORMER_NUM_LAYERS = 3     # Number of transformer encoder layers (2-4 typical)
TRANSFORMER_NHEAD = 8          # Number of attention heads (4-8 typical)

# ===== OPTIMIZER =====
OPTIMIZER_TYPE = 'AdamW'        # Optimizer type: 'Adam', 'SGD', 'AdamW'

# ===== DATA AUGMENTATION =====
USE_TRANSFORM = 'transform_b'  # Options: 'transform_a' (minimal) or 'transform_b' (with augmentation)

# ===== FUSION METHOD (VQA-Inspired) =====
USE_CROSS_ATTENTION = True    # If True, uses cross-attention fusion; If False, uses concatenation

# ============================================================================
# Print configuration summary
# ============================================================================
print("=" * 70)
print("TRAINING CONFIGURATION SUMMARY")
print("=" * 70)
print(f"Training: {NUMBER_OF_EPOCHS} epochs, LR={LEARNING_RATE}, Batch={BATCH_SIZE}")
print(f"Hardware: {NUM_WORKERS} DataLoader workers, Pin Memory={PIN_MEMORY}")
print(f"Model: Dropout={DROPOUT_P}")
print(f"Transformer: {TRANSFORMER_NUM_LAYERS} layers, {TRANSFORMER_NHEAD} heads")
print(f"Optimizer: {OPTIMIZER_TYPE}")
print(f"Data Augmentation: {USE_TRANSFORM}")
print(f"Fusion: {'Cross-Attention (VQA-inspired)' if USE_CROSS_ATTENTION else 'Concatenation'}")
print("=" * 70)
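
Some combinations of these settings cannot build a model. In particular, PyTorch's multi-head attention requires the model dimension to be divisible by the number of heads. The sketch below is a hypothetical sanity check (`validate_transformer_config` is not part of the notebook) that could be run before the model cell. The dropout bound of `[0, 1)` is a deliberate choice of this sketch, since `p = 1` would zero every fused feature.

```python
def validate_transformer_config(d_model, nhead, num_layers, dropout_p):
    """Return a list of human-readable problems; an empty list means the values look consistent."""
    problems = []
    if d_model % nhead != 0:
        problems.append(f"d_model={d_model} is not divisible by nhead={nhead}")
    if num_layers < 1:
        problems.append("num_layers must be at least 1")
    if not 0.0 <= dropout_p < 1.0:
        problems.append(f"dropout_p={dropout_p} must be in [0, 1)")
    return problems

# Values used in this notebook: d_model is fixed at 512, with 8 heads, 3 layers, dropout 0.2
issues = validate_transformer_config(512, 8, 3, 0.2)
print(issues if issues else "Configuration looks consistent")
```

For example, 6 heads would be rejected here because 512 is not divisible by 6.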


# 2. DATA PREPARATION

### Run in Kaggle Toggle

In [None]:
running_on_kaggle = RUN_ON_KAGGLE
if(running_on_kaggle):
    print("Running On Kaggle")
    
    #Variable set up
    # Image Dataset
    str_image_data_dir = "/kaggle/input/balanced-raf-db-dataset-7575-grayscale"

    # Text Dataset
    str_text_data_dir = "/kaggle/input/emotions-dataset/emotions.csv"
    complete_csv = pd.read_csv(str_text_data_dir)
    
else:
    print("Running On Something Other Than Kaggle")
    #Imports Needed
    import kaggle
    import kagglehub
    from kagglehub import KaggleDatasetAdapter

    #Variable set up
    # Image Dataset
    str_image_data_dir = kagglehub.dataset_download("dollyprajapati182/balanced-raf-db-dataset-7575-grayscale")

    # Text Dataset
    str_text_data_dir = "bhavikjikadara/emotions-dataset"

    # Download the dataset first
    dataset_path = kagglehub.dataset_download(str_text_data_dir)
    print("Dataset downloaded to:", dataset_path)

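    # Load the first CSV file from the downloaded dataset
    csv_files = [f for f in os.listdir(dataset_path) if f.endswith('.csv')]
    if not csv_files:
        # Fail here with a clear message instead of a NameError on complete_csv later
        raise FileNotFoundError(f"No CSV file found in {dataset_path}")
    csv_path = os.path.join(dataset_path, csv_files[0])
    complete_csv = pd.read_csv(csv_path)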

print("Path to dataset files:", str_image_data_dir)
print(complete_csv)

### Text Processing Functions

In [None]:
# Text Processing using transformers library (minimal code approach)
# Initialize tokenizer - bert-base-uncased uses WordPiece subword tokenization

tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

def tokenize_text(text, max_length=None):
    """Tokenize text using pre-trained tokenizer - minimal code
    Uses MAX_TEXT_LENGTH from config if max_length is not provided"""
    if max_length is None:
        # Try to use config value, fallback to 15 if config not yet defined
        try:
            max_length = MAX_TEXT_LENGTH
        except NameError:
            max_length = 15  # Default fallback
    encoded = tokenizer(
        text,
        max_length=max_length,
        padding='max_length',
        truncation=True,
        return_tensors='pt'
    )
    return encoded['input_ids'].squeeze(0)  # Return token IDs as tensor
    
def get_vocab_size():
    """Get vocabulary size from tokenizer"""
    return tokenizer.vocab_size
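
The effect of `padding='max_length'` together with `truncation=True` can be pictured with a small pure-Python sketch. It is a simplified illustration, not the tokenizer's implementation: `pad_or_truncate` is a hypothetical helper, and the content IDs are assumed to be already produced by the tokenizer. The special-token IDs are those of the `bert-base-uncased` vocabulary.

```python
CLS_ID, SEP_ID, PAD_ID = 101, 102, 0  # special-token IDs in the bert-base-uncased vocabulary

def pad_or_truncate(content_ids, max_length=15):
    """Wrap content token IDs in [CLS]/[SEP], then truncate or pad to max_length."""
    room = max_length - 2                      # two slots are reserved for [CLS] and [SEP]
    ids = [CLS_ID] + list(content_ids[:room]) + [SEP_ID]
    return ids + [PAD_ID] * (max_length - len(ids))

short = pad_or_truncate([2023, 2003, 4569])        # 3 content tokens: padded with zeros
long = pad_or_truncate(list(range(1000, 1020)))    # 20 content tokens: cut to 13
```

Every sequence therefore has the same length, which is what lets the collate function stack token tensors, and the zero padding is what the encoder later masks out.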


### Dictionaries


In [None]:
#Label Dictionary
# class number : class as string
label_dict ={
    0:"Angry",
    1:"Disgust",
    2:"Fear",
    3:"Happy",
    4:"Neutral",
    5:"Sad",
    6:"Surprise"
}

In [None]:
#Translation Dictionary
# Text Class Number : Images Class Number
# So you can put in the text class number and get out the version of the number as the images dataset uses.
translation_dictionary = {
    0:5, #Sadness -> Sad
    1:3, #Joy -> Happy
    2:3, #Love -> Happy
    3:0, #Anger -> Angry
    4:2, #Fear -> Fear
    5:6  #Surprise -> Surprise
}
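
The two dictionaries above do not cover each other completely: the text dataset has six classes and the image dataset has seven. The short check below copies both mappings (with class names from `label_dict`) and lists the image classes that no text class maps to.

```python
# Copies of the notebook's dictionaries, repeated here so the check is self-contained
label_names = {0: "Angry", 1: "Disgust", 2: "Fear", 3: "Happy", 4: "Neutral", 5: "Sad", 6: "Surprise"}
text_to_image = {0: 5, 1: 3, 2: 3, 3: 0, 4: 2, 5: 6}

covered = set(text_to_image.values())
uncovered = sorted(set(label_names) - covered)
uncovered_names = [label_names[i] for i in uncovered]
print("Image classes without matching text:", uncovered_names)
```

Images from these classes have no text with a matching label, so the dataset class has to fall back to sampling a text from any class for them.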

In [None]:
# Fixed model architecture constants (required for model initialization)
NUM_CLASSES = 7                        # Number of emotion classes
TRANSFORMER_D_MODEL = 512              # Transformer model dimension
TRANSFORMER_DROPOUT = 0.1              # Transformer internal dropout
TRANSFORMER_DIM_FEEDFORWARD = 2048     # Transformer feedforward dimension


### MultiModal Dataset Class

In [None]:
class OurMultiModalDataSet(Dataset):
    """Multimodal dataset combining images and text for Architecture B"""
    def __init__(self, data_directory, text_dataframe, transform=None, max_text_length=None):
        # Use config value if not provided
        max_text_length = max_text_length if max_text_length is not None else MAX_TEXT_LENGTH
        self.data_image = ImageFolder(data_directory, transform=transform)
        self.text_dataframe = text_dataframe
        self.max_text_length = max_text_length
        
        # Ensure text data matches image data length
        # For now, we'll sample text randomly or use a mapping strategy
        # In production, you'd want proper image-text pairing
        
    def __len__(self):
        return len(self.data_image)
    
    def __getitem__(self, at_index):
        # Get image and label
        image, label = self.data_image[at_index]
        
        # Get corresponding text - sample from text data with matching label
        # If no exact match, use random text with same label
        matching_texts = self.text_dataframe[self.text_dataframe['label'] == label]['text']
        if len(matching_texts) > 0:
            text = matching_texts.sample(n=1).iloc[0]
        else:
            # Fallback: use any text
            text = self.text_dataframe.sample(n=1).iloc[0]['text']
        
        # Tokenize text
        text_tokens = tokenize_text(text, max_length=self.max_text_length)
        
        return image, text_tokens, label

    @property
    def classes(self):
        return self.data_image.classes
#END CLASS
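
`__getitem__` filters the whole text dataframe on every call, which repeats the same scan for each image. One possible alternative, sketched here in plain Python under the assumption that texts and labels are available as parallel sequences, is to group the texts by label once and sample from the group. `build_label_index` and `sample_text` are hypothetical names, not part of the notebook.

```python
import random
from collections import defaultdict

def build_label_index(texts, labels):
    """Group texts by label once, so lookups do not rescan the whole table."""
    index = defaultdict(list)
    for text, label in zip(texts, labels):
        index[label].append(text)
    return dict(index)

def sample_text(index, all_texts, label, rng=random):
    """Pick a random text with the given label, or any text if the label has none."""
    candidates = index.get(label)
    return rng.choice(candidates if candidates else all_texts)

texts = ["i feel great", "what a lovely day", "this makes me furious"]
labels = [3, 3, 0]
index = build_label_index(texts, labels)
picked = sample_text(index, texts, label=3)
```

The fallback branch mirrors the dataset class: a label with no texts still returns a sample, drawn from the full list.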

### Text data loading
Load the CSV and split it into sub-sections

In [None]:
#Read CSV into a data-frame
print("Preview of the CSV contents:")
print(complete_csv)
print("-- -- -- -- -- -- --")

#Fix class labeling mismatch
complete_csv['label'] = complete_csv['label'].replace(translation_dictionary)
print("Preview of altered CSV contents:")
print(complete_csv)
print("-- -- -- -- -- -- --")

#Split CSV into segments for Testing, Training, and Validation
# train_test_split was already imported in the Imports cell

# Fixed data split values
TRAIN_TEST_SPLIT = 0.3  # 70% train, 30% val+test
VAL_TEST_SPLIT = 0.5    # 15% val, 15% test
RANDOM_STATE = 42       # Random seed for reproducibility

# Split: Uses config values (default: 70% train, 15% val, 15% test)
train_text, temp_text, train_labels, temp_labels = train_test_split(
    complete_csv['text'], complete_csv['label'], 
    test_size=TRAIN_TEST_SPLIT, random_state=RANDOM_STATE, stratify=complete_csv['label']
)
val_text, test_text, val_labels, test_labels = train_test_split(
    temp_text, temp_labels,
    test_size=VAL_TEST_SPLIT, random_state=RANDOM_STATE, stratify=temp_labels
)

# Create dataframes for each split
train_text_df = pd.DataFrame({'text': train_text, 'label': train_labels})
val_text_df = pd.DataFrame({'text': val_text, 'label': val_labels})
test_text_df = pd.DataFrame({'text': test_text, 'label': test_labels})

print(f"Train: {len(train_text_df)}, Val: {len(val_text_df)}, Test: {len(test_text_df)}")
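
The two-stage split produces the 70/15/15 proportions because the second split divides the 30% holdout in half. A small worked check of that arithmetic, with a hypothetical helper name:

```python
def split_fractions(holdout_fraction, test_share_of_holdout):
    """Overall train/val/test fractions from two chained train_test_split calls."""
    train = 1.0 - holdout_fraction
    test = holdout_fraction * test_share_of_holdout
    val = holdout_fraction - test
    return train, val, test

# TRAIN_TEST_SPLIT = 0.3 and VAL_TEST_SPLIT = 0.5, as set in the cell above
train_f, val_f, test_f = split_fractions(0.3, 0.5)
print(f"train={train_f:.2f}, val={val_f:.2f}, test={test_f:.2f}")
```

Changing `VAL_TEST_SPLIT` alone shifts rows between validation and test without touching the training share.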


### Datasets/Transforms

In [None]:
# Fixed data processing values
MAX_TEXT_LENGTH = 15  # Maximum length for text tokenization

# Strings of data directories
str_data_dir_train = str_image_data_dir + '/train'
str_data_dir_valid = str_image_data_dir + '/val'
str_data_dir_test  = str_image_data_dir + '/test'

#Transform
# A- minimal preprocessing for the balanced grayscale RAF data set
transform_a = transforms.Compose([
    transforms.Resize((224, 224)),  # Match the 224x224 input size the pretrained ResNet was trained on
    # transforms.Grayscale(num_output_channels=3),  # Only needed with a custom 1-channel loader;
    #                                               # ImageFolder's default loader already converts to RGB
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet normalization
])

# B- same preprocessing for the RAF data set, plus light augmentation (flip, brightness/contrast jitter)
transform_b = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    # transforms.Grayscale(num_output_channels=3),  # UNCOMMENT if images are grayscale
    transforms.ColorJitter(brightness=0.1, contrast=0.1),   
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Select transform based on config (uses config value)
transform_used = transform_a if USE_TRANSFORM == 'transform_a' else transform_b

# Batch size from config
batch_size = BATCH_SIZE

# Create Multimodal Datasets for Architecture B (uses config values)
# Augmentation applies to the training split only; validation and test use the
# deterministic transform_a so their metrics are comparable across epochs
dataset_mm_train = OurMultiModalDataSet(str_data_dir_train, train_text_df, transform=transform_used, max_text_length=MAX_TEXT_LENGTH)
dataset_mm_valid = OurMultiModalDataSet(str_data_dir_valid, val_text_df, transform=transform_a, max_text_length=MAX_TEXT_LENGTH)
dataset_mm_test = OurMultiModalDataSet(str_data_dir_test, test_text_df, transform=transform_a, max_text_length=MAX_TEXT_LENGTH)

# Create multimodal data loaders
def collate_multimodal(batch):
    """Custom collate function for multimodal data"""
    images = torch.stack([item[0] for item in batch])
    text_tokens = torch.stack([item[1] for item in batch])
    labels = torch.tensor([item[2] for item in batch], dtype=torch.long)
    return images, text_tokens, labels

# DataLoader optimization for GPU speed
# num_workers: Parallel data loading (uses NUM_WORKERS from Configuration section)
# pin_memory: Faster CPU->GPU transfer (uses PIN_MEMORY from Configuration section)
# persistent_workers: Keep workers alive between epochs (faster, uses more RAM)
# Note: NUM_WORKERS and PIN_MEMORY are defined in the Configuration section above

loader_mm_train = DataLoader(
    dataset_mm_train, 
    batch_size=batch_size, 
    shuffle=True, 
    collate_fn=collate_multimodal,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    persistent_workers=True if NUM_WORKERS > 0 else False
)
loader_mm_valid = DataLoader(
    dataset_mm_valid, 
    batch_size=batch_size, 
    shuffle=False, 
    collate_fn=collate_multimodal,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    persistent_workers=True if NUM_WORKERS > 0 else False
)
loader_mm_test = DataLoader(
    dataset_mm_test, 
    batch_size=batch_size, 
    shuffle=False, 
    collate_fn=collate_multimodal,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    persistent_workers=True if NUM_WORKERS > 0 else False
)

print(f"Multimodal datasets created:")
print(f"Train: {len(dataset_mm_train)}, Val: {len(dataset_mm_valid)}, Test: {len(dataset_mm_test)}")
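
The `Normalize` step in both transforms is plain per-channel arithmetic. The sketch below works it through for a single grayscale pixel that has been replicated across three channels. `normalize_gray_pixel` is an illustrative helper, and the division by 255 stands in for what `ToTensor` does to 8-bit images.

```python
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

def normalize_gray_pixel(value_0_255):
    """Replicate one grayscale value across RGB, scale to [0, 1], then normalize per channel."""
    scaled = value_0_255 / 255.0               # ToTensor maps 8-bit pixels to [0, 1]
    return [(scaled - m) / s for m, s in zip(IMAGENET_MEAN, IMAGENET_STD)]

black = normalize_gray_pixel(0)
white = normalize_gray_pixel(255)
```

Because the three channels use different statistics, an identical gray value ends up slightly different in each channel after normalization.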


# 3. MODEL ARCHITECTURE

## Transformer Encoder


In [None]:
# Transformer Encoder for Text Processing (Architecture B)
# Enhanced with cross-attention to image features (VQA-inspired)
# This must be defined before MultiModalEmotionClassifierB
class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model=None, nhead=None, num_layers=None, dropout=None, dim_feedforward=None, use_cross_attention=None):
        super().__init__()
        # Use config values if not provided
        d_model = d_model if d_model is not None else TRANSFORMER_D_MODEL
        nhead = nhead if nhead is not None else TRANSFORMER_NHEAD
        num_layers = num_layers if num_layers is not None else TRANSFORMER_NUM_LAYERS
        dropout = dropout if dropout is not None else TRANSFORMER_DROPOUT
        dim_feedforward = dim_feedforward if dim_feedforward is not None else TRANSFORMER_DIM_FEEDFORWARD
        use_cross_attention = use_cross_attention if use_cross_attention is not None else USE_CROSS_ATTENTION
        
        self.d_model = d_model
        self.use_cross_attention = use_cross_attention
        
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pos_encoding = self._create_positional_encoding(d_model)
        
        # Transformer encoder (text self-attention)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Cross-attention to image features (VQA-inspired, novel for emotion classification)
        if use_cross_attention:
            # Text attends to image features (text queries, image keys/values)
            self.cross_attention = nn.MultiheadAttention(
                embed_dim=d_model,
                num_heads=nhead,
                dropout=dropout,
                batch_first=True
            )
            self.cross_norm = nn.LayerNorm(d_model)
            self.cross_ffn = nn.Sequential(
                nn.Linear(d_model, dim_feedforward),
                nn.GELU(),
                nn.Dropout(dropout),
                nn.Linear(dim_feedforward, d_model)
            )
        
        # Output projection to 512-D
        self.output_proj = nn.Linear(d_model, 512)
        
    def _create_positional_encoding(self, d_model, max_len=100):
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(0)
    
    def forward(self, text_tokens, image_features=None):
        """
        Args:
            text_tokens: [batch_size, seq_len] - text token IDs
            image_features: [batch_size, 512] - optional image features for cross-attention
        Returns:
            text_features: [batch_size, 512] - text features (with cross-attention if enabled)
        """
        # Get sequence length
        seq_len = text_tokens.size(1)
        
        # Embedding + positional encoding
        embedded = self.embedding(text_tokens) * math.sqrt(self.d_model)
        embedded = embedded + self.pos_encoding[:, :seq_len, :].to(text_tokens.device)
        
        # Create attention mask for padding tokens
        attention_mask = (text_tokens != 0).float()
        
        # Transformer encoding (text self-attention)
        transformer_output = self.transformer(embedded, src_key_padding_mask=attention_mask == 0)
        
        # Cross-attention to image features (VQA-inspired, novel for emotion classification)
        if self.use_cross_attention and image_features is not None:
            # Reshape image features to [batch_size, 1, d_model] for attention
            img_tokens = image_features.unsqueeze(1)  # [B, 1, 512]
            
            # Text attends to image: text queries attend to image keys/values
            attended, _ = self.cross_attention(
                query=transformer_output,  # text features as queries
                key=img_tokens,            # image features as keys
                value=img_tokens           # image features as values
            )
            
            # Residual connection and normalization
            transformer_output = self.cross_norm(transformer_output + attended)
            
            # Feedforward
            ffn_out = self.cross_ffn(transformer_output)
            transformer_output = self.cross_norm(transformer_output + ffn_out)
        
        # Global average pooling (mean of non-padded tokens)
        mask = attention_mask.unsqueeze(-1).expand_as(transformer_output)
        masked_output = transformer_output * mask
        # clamp guards against division by zero if a sequence were entirely padding
        text_features = masked_output.sum(dim=1) / mask.sum(dim=1).clamp(min=1)
        
        # Project to 512-D
        text_features = self.output_proj(text_features)
        
        return text_features
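
Two pieces of the encoder are easy to check by hand: the sinusoidal positional encoding and the masked mean pooling. The following pure-Python sketch reproduces both for small inputs. The function names are illustrative, it assumes an even `d_model`, and it uses lists where the notebook uses tensors.

```python
import math

def sinusoidal_position(pos, d_model):
    """Encoding vector for one position: sin on even indices, cos on odd indices."""
    vec = []
    for i in range(0, d_model, 2):
        angle = pos * math.exp(-math.log(10000.0) * i / d_model)
        vec.extend([math.sin(angle), math.cos(angle)])
    return vec[:d_model]

def masked_mean(token_vectors, token_ids, pad_id=0):
    """Average token vectors over non-padding positions only."""
    kept = [v for v, t in zip(token_vectors, token_ids) if t != pad_id]
    if not kept:                                # all-padding input: avoid dividing by zero
        return [0.0] * len(token_vectors[0])
    return [sum(col) / len(kept) for col in zip(*kept)]

first_position = sinusoidal_position(0, 8)
pooled = masked_mean([[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]], token_ids=[5, 7, 0])
```

In the pooling example the third vector belongs to a padding token, so it does not influence the average.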


## MultiModal Classifier

In [None]:
# Architecture B: MultiModal Emotion Classifier with Transformer
class MultiModalEmotionClassifierB(nn.Module):
    def __init__(self, num_classes=None, vocab_size=1000, dropout_p=None, use_cross_attention=None):
        super().__init__()
        # Use config values if not provided
        num_classes = num_classes if num_classes is not None else NUM_CLASSES
        dropout_p = dropout_p if dropout_p is not None else DROPOUT_P
        use_cross_attention = use_cross_attention if use_cross_attention is not None else USE_CROSS_ATTENTION
        
        self.use_cross_attention = use_cross_attention
        
        enet_out_size = 512
        
        #Image Model (Resnet18)
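        # 'weights=' replaces the deprecated 'pretrained=True' (torchvision >= 0.13; older versions need pretrained=True)
        self.base_image_model = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.IMAGENET1K_V1)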
        self.features = nn.Sequential(*list(self.base_image_model.children())[:-1])

        #Text Model (Transformer) - Architecture B (uses config values)
        # Enhanced with cross-attention to image features if enabled
        self.text_encoder = TransformerEncoder(
            vocab_size=vocab_size,
            use_cross_attention=use_cross_attention
        )

        #Dropout Method (uses config value)
        self.dropout = nn.Dropout(p=dropout_p)
        
        # Updated classifier for 1024-D input (512 image + 512 text)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024, num_classes)  # Changed from 512 to 1024
        )

    def forward(self, images, text_tokens):
        # Image Processing
        image_features = self.features(images).view(images.size(0), -1)

        # Text Processing with Transformer
        # If cross-attention enabled, text encoder attends to image features (VQA-inspired)
        if self.use_cross_attention:
            text_features = self.text_encoder(text_tokens, image_features=image_features)
            # The text features already incorporate image information here; the raw
            # image features are still concatenated for a richer representation
            fused_features = torch.cat([image_features, text_features], dim=1)
        else:
            text_features = self.text_encoder(text_tokens)
            # Original concatenation fusion
            fused_features = torch.cat([image_features, text_features], dim=1)

        # Dropout, randomly select p% of features to drop
        fused_features = self.dropout(fused_features)
        
        # Classify
        logits = self.classifier(fused_features)
        probabilities = torch.softmax(logits, dim=1)
        return logits, probabilities
#END CLASS

#Create the Architecture B model
# Use tokenizer's vocab size and config values
model_multi_b = MultiModalEmotionClassifierB(
    num_classes=NUM_CLASSES, 
    vocab_size=get_vocab_size(), 
    dropout_p=DROPOUT_P,
    use_cross_attention=USE_CROSS_ATTENTION
)

# Freeze ResNet (use pretrained weights only, which speeds up training)
# Only the text encoder, fusion, and classifier layers are trained; the ResNet-18 backbone (~11M params) stays fixed
for param in model_multi_b.features.parameters():
    param.requires_grad = False

model_multi_b.to(device)

# Print only the first 500 characters to show a snippet of the model's layout.
print("Architecture B - Transformer-based Multimodal Model:")
print(str(model_multi_b)[:500])
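
The size of the text branch can be estimated by hand and compared with what the notebook reports. This is a rough worked count, not a measurement: it assumes the standard layout of `nn.TransformerEncoderLayer` (a fused query/key/value projection with bias, an output projection, two feed-forward linears, and two LayerNorms) and the `bert-base-uncased` vocabulary size of 30522. The helper names are illustrative.

```python
def linear_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out

def encoder_layer_params(d_model, dim_feedforward):
    """Parameter count of one self-attention encoder layer under the assumed layout."""
    attention = linear_params(d_model, 3 * d_model) + linear_params(d_model, d_model)
    feedforward = linear_params(d_model, dim_feedforward) + linear_params(dim_feedforward, d_model)
    layer_norms = 2 * (2 * d_model)            # two LayerNorms, each with weight and bias
    return attention + feedforward + layer_norms

d_model, dim_ff, num_layers = 512, 2048, 3     # values from this notebook's constants
vocab_size = 30522                             # bert-base-uncased vocabulary size
embedding = vocab_size * d_model
encoder_stack = num_layers * encoder_layer_params(d_model, dim_ff)
print(f"Embedding: {embedding / 1e6:.2f}M, encoder stack: {encoder_stack / 1e6:.2f}M")
```

The embedding table dominates this estimate, so the vocabulary size matters more for the trainable parameter count than the number of encoder layers.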


# 4. TRAINING & EVALUATION


### Experiment Tracking and Results Export (THIS SECTION CAN BE IGNORED FOR NOW)


In [None]:
# Experiment Tracking System
# This module tracks experiments, calculates metrics, and exports results to CSV

from sklearn.metrics import f1_score, classification_report, confusion_matrix
import csv
from datetime import datetime
import os

class ExperimentTracker:
    """Tracks experiment configurations and results for easy comparison and CSV export"""
    
    def __init__(self, csv_file='experiment_results.csv'):
        self.experiments = []
        self.csv_file = csv_file
        self.fieldnames = ['ID', 'Cross-Attn', 'ResNet train', 'Dropout', 'LR', 
                          'Val Acc', 'Macro-F1', 'Params (M)', 'Notes']
        
    def add_experiment(self, exp_id, cross_attn, resnet_train, dropout, lr, 
                      val_acc, macro_f1, params_m, notes=''):
        """Add an experiment result"""
        experiment = {
            'ID': exp_id,
            'Cross-Attn': 'On' if cross_attn else 'Off',
            'ResNet train': resnet_train,  # 'Frozen' or 'Fine-tune'
            'Dropout': dropout,
            'LR': f'{lr:.0e}',  # Scientific notation
            'Val Acc': f'{val_acc:.2f}%' if val_acc is not None else '',
            'Macro-F1': f'{macro_f1:.4f}' if macro_f1 is not None else '',
            'Params (M)': f'{params_m:.2f}' if params_m is not None else '',
            'Notes': notes
        }
        self.experiments.append(experiment)
        return experiment
    
    def calculate_f1_macro(self, y_true, y_pred):
        """Calculate macro-averaged F1 score"""
        return f1_score(y_true, y_pred, average='macro')
    
    def print_table(self):
        """Print a formatted table of all experiments"""
        if not self.experiments:
            print("No experiments recorded yet.")
            return
        
        # Calculate column widths
        col_widths = {field: len(field) for field in self.fieldnames}
        for exp in self.experiments:
            for field in self.fieldnames:
                col_widths[field] = max(col_widths[field], len(str(exp.get(field, ''))))
        
        # Print header
        header = ' | '.join(field.ljust(col_widths[field]) for field in self.fieldnames)
        print('=' * len(header))
        print(header)
        print('=' * len(header))
        
        # Print rows
        for exp in self.experiments:
            row = ' | '.join(str(exp.get(field, '')).ljust(col_widths[field]) 
                            for field in self.fieldnames)
            print(row)
        
        print('=' * len(header))
        print(f"\nTotal experiments: {len(self.experiments)}")
    
    def export_to_csv(self):
        """Export experiments to CSV file"""
        file_exists = os.path.exists(self.csv_file)
        
        with open(self.csv_file, 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=self.fieldnames)
            
            # Write header if file is new
            if not file_exists:
                writer.writeheader()
            
            # Write all experiments
            for exp in self.experiments:
                writer.writerow(exp)
        
        print(f"Results exported to {self.csv_file}")
    
    def clear(self):
        """Clear all experiments (useful for fresh start)"""
        self.experiments = []

# Initialize global tracker
tracker = ExperimentTracker()
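
Because `export_to_csv` opens the file in append mode and writes every stored experiment, calling it twice writes the same rows twice. One hedged way to guard against that is to check the `ID` column before appending. The helpers below are hypothetical and use only the standard library.

```python
import csv
import os

def existing_ids(csv_path, id_field="ID"):
    """Return the set of IDs already present in the CSV (empty if the file is missing)."""
    if not os.path.exists(csv_path):
        return set()
    with open(csv_path, newline="") as handle:
        return {row[id_field] for row in csv.DictReader(handle)}

def append_if_new(csv_path, fieldnames, row, id_field="ID"):
    """Append one row unless its ID is already recorded; return True if it was written."""
    if str(row[id_field]) in existing_ids(csv_path, id_field):
        return False
    is_new_file = not os.path.exists(csv_path)
    with open(csv_path, "a", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        if is_new_file:
            writer.writeheader()               # header only when the file is created
        writer.writerow(row)
    return True
```

IDs are compared as strings because `csv.DictReader` returns every field as text.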


In [None]:
# Helper function to check if ResNet is frozen
def is_resnet_frozen(model):
    """Return 'Frozen' if no ResNet backbone parameter requires gradients, else 'Fine-tune'"""
    trainable = any(p.requires_grad for p in model.features.parameters())
    return 'Fine-tune' if trainable else 'Frozen'

# Helper function to evaluate model and calculate F1
def evaluate_model_with_f1(model, data_loader, device, num_classes=7):
    """Evaluate model and return accuracy and macro-F1 score"""
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for images, text_tokens, labels in data_loader:
            images = images.to(device)
            text_tokens = text_tokens.to(device)
            labels = labels.to(device)
            
            logits, probabilities = model(images, text_tokens)
            _, predicted = logits.max(1)
            
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # Calculate accuracy
    accuracy = 100.0 * sum(p == l for p, l in zip(all_preds, all_labels)) / len(all_labels)
    
    # Calculate macro-F1
    macro_f1 = tracker.calculate_f1_macro(all_labels, all_preds)
    
    return accuracy, macro_f1

# Helper function to count model parameters
def count_parameters(model):
    """Count total number of trainable parameters in millions"""
    return sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6
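
Macro-F1 is the unweighted mean of the per-class F1 scores, so rare classes count as much as common ones. The pure-Python sketch below shows the arithmetic. It is an illustration rather than a replacement for `sklearn.metrics.f1_score`, and it averages over a fixed `num_classes`, which can differ from scikit-learn's result when a class appears in neither the labels nor the predictions.

```python
def macro_f1(y_true, y_pred, num_classes):
    """Unweighted mean of per-class F1, with F1 = 2TP / (2TP + FP + FN)."""
    scores = []
    for c in range(num_classes):
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == c and p == c)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != c and p == c)
        fn = sum(1 for t, p in zip(y_true, y_pred) if t == c and p != c)
        denom = 2 * tp + fp + fn
        scores.append(2 * tp / denom if denom else 0.0)
    return sum(scores) / num_classes

# Class 0 scores 2/3 and class 1 scores 4/5 in this toy example
score = macro_f1([0, 0, 1, 1], [0, 1, 1, 1], num_classes=2)
```

A model that ignores a class entirely gets an F1 of 0 for it, which pulls the macro average down even when overall accuracy looks acceptable.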


## Training Setup

In [None]:
# Training Setup for Architecture B
# Loss Function
criterion = nn.CrossEntropyLoss()

# Fixed optimizer parameters
OPTIMIZER_WEIGHT_DECAY = 0.0001  # Weight decay (L2 regularization) - small regularization helps
OPTIMIZER_MOMENTUM = 0.9       # Momentum (only used for SGD)

# Optimizer for multimodal model (uses config values)
if OPTIMIZER_TYPE == 'Adam':
    optimizer_mm = optim.Adam(model_multi_b.parameters(), lr=LEARNING_RATE, weight_decay=OPTIMIZER_WEIGHT_DECAY)
elif OPTIMIZER_TYPE == 'SGD':
    optimizer_mm = optim.SGD(model_multi_b.parameters(), lr=LEARNING_RATE, momentum=OPTIMIZER_MOMENTUM, weight_decay=OPTIMIZER_WEIGHT_DECAY)
elif OPTIMIZER_TYPE == 'AdamW':
    optimizer_mm = optim.AdamW(model_multi_b.parameters(), lr=LEARNING_RATE, weight_decay=OPTIMIZER_WEIGHT_DECAY)
else:
    raise ValueError(f"Unknown optimizer type: {OPTIMIZER_TYPE}. Use 'Adam', 'SGD', or 'AdamW'")

# Learning rate scheduler - reduces LR when validation loss plateaus
# This helps fine-tune and improve accuracy
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer_mm, 
    mode='min', 
    factor=0.5,  # Reduce LR by half
    patience=3   # Wait 3 epochs without improvement
    # Note: 'verbose' parameter removed - not supported in all PyTorch versions
)

# Training parameters (uses config value)
number_of_epochs = NUMBER_OF_EPOCHS
training_losses = []
validation_losses = []
training_accuracies = []
validation_accuracies = []

# Move model to device
model_multi_b.to(device)
print(f"Architecture B model ready for training on {device}")
print(f"Model parameters: {sum(p.numel() for p in model_multi_b.parameters()):,}")
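
With `patience=3` and `factor=0.5`, the learning rate is halved on the fourth consecutive epoch without improvement, not the third. The simulation below is a simplified re-implementation of that rule for `mode='min'`, written for illustration only. It assumes the scheduler's default relative threshold of `1e-4` and ignores the cooldown and minimum learning rate options.

```python
def simulate_plateau_schedule(losses, lr, factor=0.5, patience=3, threshold=1e-4):
    """Return the learning rate in effect after each epoch's validation loss."""
    best = float("inf")
    bad_epochs = 0
    history = []
    for loss in losses:
        if loss < best * (1.0 - threshold):    # counts as an improvement
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        if bad_epochs > patience:              # patience exceeded: reduce and reset
            lr *= factor
            bad_epochs = 0
        history.append(lr)
    return history

# Validation loss improves once, then stalls for four epochs
schedule = simulate_plateau_schedule([1.0, 0.9, 0.9, 0.9, 0.9, 0.9], lr=1e-4)
```

In this example the rate stays at `1e-4` for five epochs and drops to `5e-5` on the sixth.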


## Training Loop

In [None]:
# Training Loop for Architecture B

print(f"Starting training for {number_of_epochs} epochs...")
print(f"Training batches: {len(loader_mm_train)}")
print(f"Validation batches: {len(loader_mm_valid)}")

total_start_time = time.time()

for epoch in range(number_of_epochs):
    epoch_start_time = time.time()
    print(f"\n=== EPOCH {epoch+1}/{number_of_epochs} ===")
    
    # Training Phase
    model_multi_b.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for images, text_tokens, labels in tqdm(loader_mm_train, desc=f'Epoch {epoch+1}/{number_of_epochs} - Training'):
        # Move to device
        images = images.to(device)
        text_tokens = text_tokens.to(device)
        labels = labels.to(device)
        
        # Forward pass
        optimizer_mm.zero_grad()
        logits, probabilities = model_multi_b(images, text_tokens)
        loss = criterion(logits, labels)
        
        # Backward pass
        loss.backward()
        optimizer_mm.step()
        
        # Metrics
        running_loss += loss.item() * labels.size(0)
        _, predicted = logits.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    
    # Training metrics
    train_loss = running_loss / len(loader_mm_train.dataset)
    train_acc = 100. * correct / total
    training_losses.append(train_loss)
    training_accuracies.append(train_acc)
    
    # Validation Phase
    model_multi_b.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for images, text_tokens, labels in tqdm(loader_mm_valid, desc=f'Epoch {epoch+1}/{number_of_epochs} - Validation'):
            # Move to device
            images = images.to(device)
            text_tokens = text_tokens.to(device)
            labels = labels.to(device)
            
            # Forward pass
            logits, probabilities = model_multi_b(images, text_tokens)
            loss = criterion(logits, labels)
            
            # Metrics
            running_loss += loss.item() * labels.size(0)
            _, predicted = logits.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    
    # Validation metrics
    valid_loss = running_loss / total  # per-sample mean over the samples actually evaluated
    valid_acc = 100. * correct / total
    validation_losses.append(valid_loss)
    validation_accuracies.append(valid_acc)
    
    # Learning rate scheduling - reduce LR if validation loss plateaus
    scheduler.step(valid_loss)
    current_lr = optimizer_mm.param_groups[0]['lr']
    
    # Epoch summary
    epoch_time = time.time() - epoch_start_time
    total_time = time.time() - total_start_time
    
    print(f"Epoch {epoch+1}/{number_of_epochs} Summary:")
    print(f"  Train - Loss: {train_loss:.4f}, Acc: {train_acc:.2f}%")
    print(f"  Valid - Loss: {valid_loss:.4f}, Acc: {valid_acc:.2f}%")
    print(f"  LR: {current_lr:.6f}")
    print(f"  Time: {epoch_time:.2f}s | Total: {total_time:.2f}s")

# Final summary
total_training_time = time.time() - total_start_time
print("\nTraining completed!")
print(f"Total training time: {total_training_time:.2f}s ({total_training_time/60:.1f} minutes)")
print(f"Best validation accuracy: {max(validation_accuracies):.2f}%")
print(f"Final validation accuracy: {validation_accuracies[-1]:.2f}%")
print(f"Final training accuracy: {training_accuracies[-1]:.2f}%")

# Check if still improving - helps decide if more epochs are worth it
if len(validation_accuracies) >= 3:
    recent_improvement = validation_accuracies[-1] - validation_accuracies[-3]
    train_val_gap = training_accuracies[-1] - validation_accuracies[-1] if training_accuracies else 0
    
    if recent_improvement > 0.5:
        print(f"\n Still improving! Recent gain: +{recent_improvement:.2f}%")
        print("   → Continue training - more epochs likely to help")
    elif recent_improvement < -1.0:
        # Significant decline - likely overfitting
        print(f"\n  Validation accuracy declining significantly: {recent_improvement:.2f}%")
        print(f"   → Training acc: {training_accuracies[-1]:.2f}%, Val acc: {validation_accuracies[-1]:.2f}%")
        print(f"   → Gap: {train_val_gap:.2f}% (large gap = overfitting)")
        print("   → STOP: Model is overfitting. More epochs will make it worse.")
        print("   → Solutions: Lower LR, increase dropout, or use best checkpoint")
    elif recent_improvement < -0.5:
        # Small decline - could be temporary or early overfitting
        print(f"\n  Validation accuracy declining: {recent_improvement:.2f}%")
        print(f"   → Training acc: {training_accuracies[-1]:.2f}%, Val acc: {validation_accuracies[-1]:.2f}%")
        print(f"   → Gap: {train_val_gap:.2f}%")
        if train_val_gap > 5.0:
            print("   → Large train-val gap suggests overfitting starting")
            print("   → Consider stopping or reducing learning rate")
        else:
            print("   → Could be temporary (LR reduction, noise). Monitor next 2-3 epochs.")
    else:
        print(f"\n➡️  Plateauing (change: {recent_improvement:.2f}%)")
        print("   → Validation accuracy stable. May need:")
        print("     - More epochs (if still room to improve)")
        print("     - Lower learning rate (if LR hasn't been reduced)")
        print("     - Different hyperparameters")


## Test Evaluation
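
The evaluation cell in this section reports macro-averaged precision, recall, and F1, meaning each class contributes equally regardless of how many samples it has. The hand-rolled sketch below is meant to mirror that definition, with undefined ratios treated as 0. It is illustrative only: `macro_metrics` is a hypothetical helper, and the labels are made up.

```python
def macro_metrics(y_true, y_pred, num_classes):
    """Return (macro precision, macro recall, macro F1), treating 0/0 as 0."""
    precisions, recalls, f1s = [], [], []
    for c in range(num_classes):
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == c and p == c)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != c and p == c)
        fn = sum(1 for t, p in zip(y_true, y_pred) if t == c and p != c)
        prec = tp / (tp + fp) if (tp + fp) else 0.0
        rec = tp / (tp + fn) if (tp + fn) else 0.0
        f1 = 2 * prec * rec / (prec + rec) if (prec + rec) else 0.0
        precisions.append(prec)
        recalls.append(rec)
        f1s.append(f1)
    n = float(num_classes)
    return sum(precisions) / n, sum(recalls) / n, sum(f1s) / n


# Hypothetical 3-class example with an imbalanced label distribution
y_true = [0, 0, 0, 0, 1, 1, 2]
y_pred = [0, 0, 0, 1, 1, 0, 0]

accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
macro_p, macro_r, macro_f1 = macro_metrics(y_true, y_pred, num_classes=3)
print(f"accuracy={accuracy:.4f}  macro P={macro_p:.4f}  R={macro_r:.4f}  F1={macro_f1:.4f}")
```

Here accuracy comes out noticeably higher than macro-F1 because the rare class is never predicted correctly, which is the situation macro averaging is designed to expose.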




In [None]:
# Final Test Evaluation
# ⚠️ ONLY RUN THIS AFTER TRAINING HAS COMPLETED
# This evaluates on the TEST set (unseen data) for final unbiased results

from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support
import csv

# ============================================================================
# EXPERIMENT ID - FROM CONFIGURATION SECTION
# ============================================================================
# Uses EXPERIMENT_ID from the Configuration section and formats as 3-digit string
TEST_EXPERIMENT_ID = f"{EXPERIMENT_ID:03d}"  # Format as "001", "002", etc.
# ============================================================================

print("=" * 70)
print("FINAL TEST EVALUATION")
print("=" * 70)
print(f"Experiment ID: {TEST_EXPERIMENT_ID}")
print("Evaluating on TEST set (unseen data, never used during training)...")
print()

# Evaluate on test set - get predictions and labels
model_multi_b.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for images, text_tokens, labels in tqdm(loader_mm_test, desc='Testing'):
        images = images.to(device)
        text_tokens = text_tokens.to(device)
        labels = labels.to(device)
        
        logits, probabilities = model_multi_b(images, text_tokens)
        _, predicted = logits.max(1)
        
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate overall metrics
test_acc = 100.0 * sum(p == l for p, l in zip(all_preds, all_labels)) / len(all_labels)
test_f1_macro = tracker.calculate_f1_macro(all_labels, all_preds)

# Calculate per-class metrics (P, R, F1 for each class) - similar to VQA paper
precision, recall, f1, support = precision_recall_fscore_support(
    all_labels, all_preds, average=None, zero_division=0
)

# Calculate macro-averaged metrics (mean across all 7 classes)
# This gives a single summary metric for each: mean Precision, mean Recall, mean F1
from sklearn.metrics import precision_score, recall_score
test_precision_macro = precision_score(all_labels, all_preds, average='macro', zero_division=0)
test_recall_macro = recall_score(all_labels, all_preds, average='macro', zero_division=0)

# Get training/validation accuracies from training
best_val_acc = max(validation_accuracies) if validation_accuracies else None
final_val_acc = validation_accuracies[-1] if validation_accuracies else None
final_train_acc = training_accuracies[-1] if training_accuracies else None

# Get emotion names in order (0-6) - used for both printing and CSV
emotion_names = [label_dict[i] for i in range(NUM_CLASSES)]

# Print results
print("=" * 70)
print("FINAL TEST RESULTS")
print("=" * 70)
print(f"Test Accuracy:       {test_acc:.2f}%")
print(f"Macro-Precision:     {test_precision_macro:.4f}  (mean across {NUM_CLASSES} classes)")
print(f"Macro-Recall:        {test_recall_macro:.4f}  (mean across {NUM_CLASSES} classes)")
print(f"Macro-F1:            {test_f1_macro:.4f}  (mean across {NUM_CLASSES} classes)")
print()
print("Training Metrics:")
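# Compare against None explicitly so a legitimate 0.00% accuracy is still printed
if final_train_acc is not None:
    print(f"  Final Train Acc:   {final_train_acc:.2f}%")
if final_val_acc is not None:
    print(f"  Final Val Acc:     {final_val_acc:.2f}%")
if best_val_acc is not None:
    print(f"  Best Val Acc:      {best_val_acc:.2f}%")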
print("=" * 70)
print()

# Print per-class metrics (similar to VQA paper's per-class reporting)
# Note: VQA paper reports P, R, F1 per class - we do the same for emotion classes
print("PER-CLASS METRICS (Precision, Recall, F1):")
print("=" * 70)
print(f"{'Class':<15} {'Precision':<12} {'Recall':<12} {'F1-Score':<12}")
print("-" * 70)
for emotion, p, r, f in zip(emotion_names, precision, recall, f1):
    print(f"{emotion:<15} {p:<12.4f} {r:<12.4f} {f:<12.4f}")
print("=" * 70)
print()

# Print classification report
print("DETAILED CLASSIFICATION REPORT:")
print("=" * 70)
print(classification_report(all_labels, all_preds, target_names=emotion_names, zero_division=0))
print("=" * 70)

# Save to CSV
# Build an absolute path in the current working directory
import os
test_results_file = os.path.join(os.getcwd(), 'test_results.csv')
# Note: notebooks do not define __file__, so the notebook's own folder cannot be
# resolved that way; change the working directory (or this path) to save elsewhere.
print(f"Saving test results to: {test_results_file}")
test_results = {
    'Experiment_ID': TEST_EXPERIMENT_ID,  # Use the configurable experiment ID
    
    # Test Metrics
    'Test_Accuracy': f'{test_acc:.2f}%',
    'Macro_Precision': f'{test_precision_macro:.4f}',
    'Macro_Recall': f'{test_recall_macro:.4f}',
    'Macro_F1': f'{test_f1_macro:.4f}',
    
    # Training Metrics
    'Final_Train_Accuracy': f'{final_train_acc:.2f}%' if final_train_acc is not None else '',
    'Final_Val_Accuracy': f'{final_val_acc:.2f}%' if final_val_acc is not None else '',
    'Best_Val_Accuracy': f'{best_val_acc:.2f}%' if best_val_acc is not None else '',
    
    # Hyperparameters (all configurable parameters)
    'Cross_Attention': 'On' if USE_CROSS_ATTENTION else 'Off',
    'Dropout': DROPOUT_P,
    'Learning_Rate': LEARNING_RATE,
    'Epochs': NUMBER_OF_EPOCHS,
    'Batch_Size': BATCH_SIZE,
    'Transformer_Layers': TRANSFORMER_NUM_LAYERS,
    'Transformer_Heads': TRANSFORMER_NHEAD,
    'Optimizer': OPTIMIZER_TYPE,
    'Data_Augmentation': USE_TRANSFORM,
}

# Add per-class metrics (P, R, F1 for each emotion class)
for i, emotion in enumerate(emotion_names):
    test_results[f'{emotion}_F1'] = f'{f1[i]:.4f}'
    test_results[f'{emotion}_Precision'] = f'{precision[i]:.4f}'
    test_results[f'{emotion}_Recall'] = f'{recall[i]:.4f}'

# Write to CSV
file_exists = os.path.exists(test_results_file)
with open(test_results_file, 'a', newline='') as csvfile:
    fieldnames = list(test_results.keys())
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    
    if not file_exists:
        writer.writeheader()
    
    writer.writerow(test_results)

print(f"\n✓ Test results saved to {test_results_file}")
print(f"   File exists: {os.path.exists(test_results_file)}")
print(f"   File size: {os.path.getsize(test_results_file) if os.path.exists(test_results_file) else 0} bytes")
print()
print("=" * 70)
print("COMPARISON WITH VQA PAPER METRICS:")
print("=" * 70)
print("VQA Paper uses:")
print("  - VQA Accuracy (task accuracy) → Our: Test Accuracy")
print("  - Per-class P, R, F1 → Our: Per-emotion Precision, Recall, F1")
print("  - Macro-averaged metrics → Our: Macro-Precision, Macro-Recall, Macro-F1")
print("    (Mean across all 7 emotion classes)")
print()
print("Note: VQA paper also has grounding metrics (Overlap, IOU, Pointing Game)")
print("      which don't apply to emotion classification (no bounding boxes).")
print("=" * 70)
print()
print("Note: Test accuracy is your final unbiased performance metric.")
print("Validation accuracy was used during training for monitoring/tuning.")
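
The results logging above appends one row per experiment and writes the header only when the file does not yet exist. That pattern can be isolated as a small helper, sketched here with the standard library only. `append_result_row` is a hypothetical name, and the accuracy values are placeholders, not results from this notebook.

```python
import csv
import os
import tempfile


def append_result_row(path, row):
    """Append `row` (a dict) to a CSV file, writing the header only for a new or empty file."""
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row.keys()))
        if write_header:
            writer.writeheader()
        writer.writerow(row)


# Demonstrate in a temporary directory so no real results file is touched
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "test_results.csv")
    append_result_row(path, {"Experiment_ID": "001", "Test_Accuracy": "61.20%"})
    append_result_row(path, {"Experiment_ID": "002", "Test_Accuracy": "63.75%"})
    with open(path, newline="") as f:
        rows = list(csv.DictReader(f))

for r in rows:
    print(r["Experiment_ID"], r["Test_Accuracy"])
```

One caveat with this approach: the header reflects the columns of the first run only, so if a later run adds or reorders keys, its row will not line up with the existing header. Starting a new file when the set of columns changes avoids that.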
