# CNN Baseline Experiment - Full 32 Subject LOSO

## Development of an Explainable EEG Emotion Recognition Dashboard Using CNN and CLIP Models

---

### Research Questions:
- **RQ1:** How does CLIP-based transfer learning compare to a CNN trained from scratch for EEG emotion classification?
- **RQ2:** How well do these approaches generalise across different subjects?
- **RQ3:** What features do the models learn, and do they align with known neurophysiological markers?

### Research Objectives:
- **RO1:** Implement and evaluate a CNN baseline for EEG spectrogram classification
- **RO2:** Compare CNN vs CLIP using Leave-One-Subject-Out cross-validation
- **RO3:** Analyse model interpretability using XAI techniques

---

### This Notebook:
Runs the **CNN baseline** with full 32-subject LOSO cross-validation on the DEAP dataset.

**Requirements:** Colab Pro (25GB RAM, T4/A100 GPU)  
**Estimated Time:** 1-2 hours with T4 GPU

In [None]:
#@title Step 1: System Check { display-mode: "form" }
import torch
import psutil
import os

# Horizontal rule shared by the report header and footer.
banner = "=" * 60
print(banner)
print("SYSTEM INFORMATION")
print(banner)

# --- Memory: total physical RAM in GiB, plus a hint about the Colab tier ---
ram_gb = psutil.virtual_memory().total / (1024**3)
ram_note = (
    " ✓ (Colab Pro detected)"
    if ram_gb >= 20
    else " ⚠ (Consider upgrading to Colab Pro for 25GB+ RAM)"
)
print(f"\nRAM: {ram_gb:.1f} GB" + ram_note)

# --- Accelerator: report the PyTorch build and pick the compute device ---
print(f"\nPyTorch Version: {torch.__version__}")
has_cuda = torch.cuda.is_available()
print(f"CUDA Available: {has_cuda}")

if not has_cuda:
    # The notebook still runs on CPU; the message only tells the user how
    # to enable a GPU runtime.
    print("\n❌ ERROR: No GPU detected!")
    print("Go to: Runtime > Change runtime type > GPU")
    DEVICE = torch.device('cpu')
else:
    gpu_name = torch.cuda.get_device_name(0)
    gpu_mem = torch.cuda.get_device_properties(0).total_memory / (1024**3)
    print(f"GPU: {gpu_name} ({gpu_mem:.1f} GB) ✓")
    DEVICE = torch.device('cuda')

# DEVICE is read by the later model and training cells.
print(f"\nUsing: {DEVICE}")
print(banner)

In [None]:
#@title Step 2: Upload Data { display-mode: "form" }
from google.colab import files
import zipfile

print("Please upload: spectrogram_cache.zip")
print("(Located at: results/spectrogram_cache.zip in your project)\n")

# Opens the Colab upload widget; iterating the result yields the uploaded
# file names.
uploaded = files.upload()

# Unpack every uploaded archive into the working directory. The suffix check
# is case-insensitive so that e.g. "CACHE.ZIP" is not silently ignored.
for filename in uploaded:
    if filename.lower().endswith('.zip'):
        print(f"\nExtracting {filename}...")
        with zipfile.ZipFile(filename, 'r') as zf:
            zf.extractall('.')
        print("Extraction complete!")

# Verify extraction: fail here, with an actionable message, instead of
# reporting "Found 0 subject files ✓" and crashing in a later cell.
import glob
npz_files = glob.glob('spectrogram_cache/*.npz')
if not npz_files:
    raise FileNotFoundError(
        "No .npz files found in 'spectrogram_cache/'. Check that the uploaded "
        "archive contains a top-level 'spectrogram_cache' folder, then "
        "re-run this cell."
    )
print(f"\nFound {len(npz_files)} subject files ✓")

In [None]:
#@title Step 3: Configuration { display-mode: "form" }
import json
from datetime import datetime

# Central settings dictionary for the notebook. Key order is significant: it
# is the order used when the settings are printed below.
CONFIG = dict(
    # Paths
    cache_path='spectrogram_cache',
    results_path='cnn_results',

    # Experiment
    num_subjects=32,
    image_size=224,

    # Training
    batch_size=32,
    epochs=15,
    learning_rate=0.001,
    weight_decay=0.01,
    early_stopping_patience=5,

    # Reproducibility
    random_seed=42,
)

# Make sure the output directory exists before anything tries to write to it.
os.makedirs(CONFIG['results_path'], exist_ok=True)

# Echo the settings so they are recorded in the notebook output.
rule = "=" * 40
print("EXPERIMENT CONFIGURATION")
print(rule)
for name in CONFIG:
    print(f"{name:25s}: {CONFIG[name]}")
print(rule)

In [None]:
#@title Step 4: Import Libraries { display-mode: "form" }
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import (
    accuracy_score, f1_score, precision_score, recall_score,
    balanced_accuracy_score, roc_auc_score, confusion_matrix
)
import gc

# Seed NumPy and PyTorch (CPU, and every CUDA device when one is present)
# from the single configured value.
seed = CONFIG['random_seed']
for seed_fn in (np.random.seed, torch.manual_seed):
    seed_fn(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

print("All libraries imported ✓")

In [None]:
#@title Step 5: Define CNN Model { display-mode: "form" }

class EEG_CNN(nn.Module):
    """
    Small convolutional classifier for EEG spectrogram images.

    Layout (shapes shown for a 224x224 input):
    ================================================
    Input                      (batch,   3, 224, 224)
    Stage 1: Conv3x3(32)  -> BatchNorm -> ReLU -> MaxPool2
                               (batch,  32, 112, 112)
    Stage 2: Conv3x3(64)  -> BatchNorm -> ReLU -> MaxPool2
                               (batch,  64,  56,  56)
    Stage 3: Conv3x3(128) -> BatchNorm -> ReLU -> MaxPool2
                               (batch, 128,  28,  28)
    Global average pooling     (batch, 128)

    Classifier head:
        Linear(128, 256) -> ReLU -> Dropout(0.3)
        Linear(256, 128) -> ReLU -> Dropout(0.2)
        Linear(128, num_classes) -> raw logits (no softmax applied)

    Args:
        num_classes: Width of the final linear layer (default 2).
    """

    def __init__(self, num_classes=2):
        super().__init__()

        # --- Feature extractor: three conv stages, each halving H and W ---
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(num_features=32)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(num_features=64)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(num_features=128)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Collapses each feature map to a single value, so the head does not
        # depend on the spatial size of the input.
        self.global_pool = nn.AdaptiveAvgPool2d(output_size=1)

        # --- Classifier head ---
        self.fc1 = nn.Linear(in_features=128, out_features=256)
        self.dropout1 = nn.Dropout(p=0.3)
        self.fc2 = nn.Linear(in_features=256, out_features=128)
        self.dropout2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(in_features=128, out_features=num_classes)

    def forward(self, x):
        """Map a (batch, 3, H, W) input to (batch, num_classes) logits."""
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # (batch, 128, 1, 1) -> (batch, 128)
        features = self.global_pool(x).flatten(start_dim=1)

        hidden = self.dropout1(F.relu(self.fc1(features)))
        hidden = self.dropout2(F.relu(self.fc2(hidden)))
        return self.fc3(hidden)

# Smoke test: push one random batch through a freshly built network and
# report the output shape and parameter count. The random batch is created
# on the CPU and then moved, exactly as before.
model = EEG_CNN().to(DEVICE)
dummy = torch.randn(2, 3, 224, 224).to(DEVICE)
out = model(dummy)
params = sum(map(torch.numel, model.parameters()))

rule = "=" * 40
print("CNN MODEL ARCHITECTURE")
print(rule)
print("Input shape:  (batch, 3, 224, 224)")
print(f"Output shape: {tuple(out.shape)}")
print(f"Parameters:   {params:,}")
print(rule)

# Drop the throwaway objects and return cached GPU memory before training.
del model, dummy, out
torch.cuda.empty_cache()

In [None]:
#@title Step 6: Data Loading Functions { display-mode: "form" }

def load_subject(subject_id, cache_path=None):
    """
    Load the cached spectrograms and binary labels for a single subject.

    Args:
        subject_id: Integer subject number; the file read is
            ``s{subject_id:02d}_spectrograms.npz`` inside ``cache_path``.
        cache_path: Directory containing the ``.npz`` files. When omitted,
            ``CONFIG['cache_path']`` is used.

    Returns:
        A tuple ``(specs, valence, arousal)`` of NumPy arrays, or
        ``(None, None, None)`` if the subject's file does not exist.

        specs: float32 array; the stored values divided by 255 with the
            axes reordered from (N, H, W, C) to (N, C, H, W). The result
            lies in [0, 1] provided the stored images are 0-255
            (assumed - TODO confirm against the cache writer).
        valence: (N,) int64 labels read from the ``valence`` entry.
        arousal: (N,) int64 labels read from the ``arousal`` entry.

    Raises:
        KeyError: If the archive lacks one of the expected entries.
    """
    if cache_path is None:
        cache_path = CONFIG['cache_path']
    path = f"{cache_path}/s{subject_id:02d}_spectrograms.npz"

    if not os.path.exists(path):
        return None, None, None

    # The context manager closes the archive even if an entry is missing;
    # astype() makes copies, so nothing refers to the file afterwards.
    with np.load(path) as data:
        specs = data['spectrograms'].astype(np.float32) / 255.0
        valence = data['valence'].astype(np.int64)
        arousal = data['arousal'].astype(np.int64)

    # Convert (N, H, W, C) -> (N, C, H, W), the layout Conv2d expects.
    specs = np.transpose(specs, (0, 3, 1, 2))

    return specs, valence, arousal


# Discover which of the configured subjects have a cache file on disk.
available_subjects = [
    s for s in range(1, CONFIG['num_subjects'] + 1)
    if os.path.exists(f"{CONFIG['cache_path']}/s{s:02d}_spectrograms.npz")
]

print(f"Available subjects: {len(available_subjects)}/{CONFIG['num_subjects']}")
print(f"Subject IDs: {available_subjects}")

# Stop with an actionable message rather than an IndexError on the next line.
if not available_subjects:
    raise FileNotFoundError(
        f"No subject files found in '{CONFIG['cache_path']}'. "
        "Re-run the upload step and check that the archive contains "
        "sNN_spectrograms.npz files."
    )

# Test loading one subject to confirm the array layout and label balance.
test_specs, test_v, test_a = load_subject(available_subjects[0])
print(f"\nSample data shape: {test_specs.shape}")
print(f"Valence labels: {np.bincount(test_v)} (neg/pos)")
print(f"Arousal labels: {np.bincount(test_a)} (neg/pos)")
del test_specs, test_v, test_a

In [None]:
#@title Step 7: Training Function { display-mode: "form" }

def train_loso_fold(train_subjects, test_subject, label_key, device=DEVICE):
    """
    Train a fresh CNN for one Leave-One-Subject-Out fold and evaluate it.

    The data of every subject in ``train_subjects`` is concatenated into one
    training set; a new ``EEG_CNN`` is trained on it with a class-weighted
    cross-entropy loss, then evaluated once on ``test_subject``.

    NOTE(review): the LR scheduler, early stopping and best-weights selection
    are all driven by the *training* loss - there is no validation split in
    this function.

    Args:
        train_subjects: List of subject IDs for training. Subjects whose
            cache file is missing are skipped.
        test_subject: Subject ID for testing (held out).
        label_key: 'valence' or 'arousal'.
        device: torch device (or device string) to train on.

    Returns:
        metrics: Dictionary with the keys ``accuracy``, ``f1_score``,
        ``precision``, ``recall``, ``balanced_accuracy``, ``roc_auc``,
        ``n_train``, ``n_test`` and ``pos_rate`` (mean of the test labels).
        All values are plain Python numbers, so the dict is JSON-serialisable.
        ``roc_auc`` falls back to 0.5 when the test labels contain a single
        class or scikit-learn rejects the inputs.

    Raises:
        ValueError: If ``label_key`` is not one of the two supported names,
            or if none of ``train_subjects`` has data on disk.
        FileNotFoundError: If the held-out subject has no data on disk.
    """
    # A typo here used to fall through silently to the arousal labels.
    if label_key not in ('valence', 'arousal'):
        raise ValueError(
            f"label_key must be 'valence' or 'arousal', got {label_key!r}"
        )

    # === Load Training Data ===
    X_train_list, y_train_list = [], []

    for subj in train_subjects:
        specs, valence, arousal = load_subject(subj)
        if specs is not None:
            X_train_list.append(specs)
            y_train_list.append(valence if label_key == 'valence' else arousal)

    if not X_train_list:
        raise ValueError(
            f"No training data found for subjects {list(train_subjects)}"
        )

    X_train = np.concatenate(X_train_list, axis=0)
    y_train = np.concatenate(y_train_list, axis=0)
    del X_train_list, y_train_list

    # === Load Test Data ===
    X_test, valence, arousal = load_subject(test_subject)
    if X_test is None:
        raise FileNotFoundError(
            f"No cached spectrograms found for held-out subject {test_subject}"
        )
    y_test = valence if label_key == 'valence' else arousal

    # === Create DataLoaders ===
    # from_numpy shares the NumPy buffer instead of duplicating the whole
    # training set in RAM; .float()/.long() are no-ops for the float32/int64
    # arrays load_subject returns and only convert if the dtype differs.
    train_dataset = TensorDataset(
        torch.from_numpy(X_train).float(),
        torch.from_numpy(y_train).long()
    )
    test_dataset = TensorDataset(
        torch.from_numpy(X_test).float(),
        torch.from_numpy(y_test).long()
    )

    # Pinned host memory only helps host-to-GPU copies; skip it on CPU runs.
    use_cuda = torch.device(device).type == 'cuda'

    train_loader = DataLoader(
        train_dataset,
        batch_size=CONFIG['batch_size'],
        shuffle=True,
        num_workers=2,
        pin_memory=use_cuda
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=CONFIG['batch_size'],
        num_workers=2,
        pin_memory=use_cuda
    )

    # Record what the metrics need, then drop the local array names (the
    # datasets keep the underlying buffers alive).
    n_train, n_test = len(X_train), len(X_test)
    pos_rate = float(np.mean(y_test))
    del X_train, X_test
    gc.collect()
    torch.cuda.empty_cache()

    # === Initialize Model ===
    model = EEG_CNN(num_classes=2).to(device)

    # Class weights for imbalanced data: n / (2 * count), with the count
    # floored at 1 so an absent class cannot divide by zero.
    class_counts = np.bincount(y_train, minlength=2)
    class_weights = torch.FloatTensor(
        [len(y_train) / (2 * max(c, 1)) for c in class_counts]
    ).to(device)

    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = optim.Adam(
        model.parameters(),
        lr=CONFIG['learning_rate'],
        weight_decay=CONFIG['weight_decay']
    )
    # Halve the learning rate after 2 epochs without a lower training loss.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=2, factor=0.5
    )

    # === Training Loop ===
    best_loss = float('inf')
    best_state = None
    patience_counter = 0

    for epoch in range(CONFIG['epochs']):
        model.train()
        epoch_loss = 0.0

        for batch_X, batch_y in train_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            epoch_loss += loss.item()

        # Mean of the per-batch losses for this epoch.
        avg_loss = epoch_loss / len(train_loader)
        scheduler.step(avg_loss)

        # Early stopping check: keep a CPU copy of the best weights so far.
        if avg_loss < best_loss:
            best_loss = avg_loss
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= CONFIG['early_stopping_patience']:
                break

    # Load best model
    if best_state is not None:
        model.load_state_dict(best_state)

    # === Evaluation ===
    model.eval()
    all_preds = []
    all_probs = []
    all_labels = []

    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            batch_X = batch_X.to(device)
            outputs = model(batch_X)
            probs = F.softmax(outputs, dim=1)
            preds = torch.argmax(probs, dim=1)

            all_preds.extend(preds.cpu().numpy())
            # Probability of class 1 is the score used for ROC-AUC.
            all_probs.extend(probs[:, 1].cpu().numpy())
            all_labels.extend(batch_y.numpy())

    y_true = np.array(all_labels)
    y_pred = np.array(all_preds)
    y_prob = np.array(all_probs)

    # === Calculate Metrics ===
    metrics = {
        'accuracy': float(accuracy_score(y_true, y_pred)),
        'f1_score': float(f1_score(y_true, y_pred, zero_division=0)),
        'precision': float(precision_score(y_true, y_pred, zero_division=0)),
        'recall': float(recall_score(y_true, y_pred, zero_division=0)),
        'balanced_accuracy': float(balanced_accuracy_score(y_true, y_pred)),
        'n_train': n_train,
        'n_test': n_test,
        'pos_rate': pos_rate,
    }

    # ROC-AUC is undefined for a single-class fold; 0.5 is recorded instead.
    # Only ValueError (scikit-learn's input-validation error) is treated as
    # "undefined" - anything else is a real bug and should surface.
    try:
        if len(np.unique(y_true)) > 1:
            metrics['roc_auc'] = float(roc_auc_score(y_true, y_prob))
        else:
            metrics['roc_auc'] = 0.5
    except ValueError:
        metrics['roc_auc'] = 0.5

    # === Cleanup ===
    del model, train_loader, test_loader, train_dataset, test_dataset
    torch.cuda.empty_cache()
    gc.collect()

    return metrics

print("Training function defined ✓")

In [None]:
#@title Step 8: Checkpoint Functions { display-mode: "form" }

def save_checkpoint(results, path=None):
    """
    Save the results dictionary as an indented JSON checkpoint.

    The JSON is written to a sibling ``.tmp`` file and then moved over the
    target with ``os.replace``, so an interrupted or failed write cannot
    leave a truncated checkpoint behind - the previous one stays intact.

    Args:
        results: JSON-serialisable results structure.
        path: Destination file. Defaults to
            ``{CONFIG['results_path']}/checkpoint.json`` when omitted.

    Raises:
        TypeError: If ``results`` contains a value ``json`` cannot serialise.
        OSError: If the file cannot be written or replaced.
    """
    if path is None:
        path = f"{CONFIG['results_path']}/checkpoint.json"
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            json.dump(results, f, indent=2)
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; this only
        # removes the partial file left by a failed write.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

def load_checkpoint(path=None):
    """
    Load a previously saved checkpoint, if one exists.

    Args:
        path: Checkpoint file to read. Defaults to
            ``{CONFIG['results_path']}/checkpoint.json`` when omitted.

    Returns:
        The decoded JSON content, or ``None`` when the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON. This
            is deliberately not swallowed, so a damaged checkpoint is noticed
            rather than silently restarting the experiment.
    """
    if path is None:
        path = f"{CONFIG['results_path']}/checkpoint.json"
    # Open directly rather than checking existence first, which avoids the
    # gap between the check and the read.
    try:
        with open(path, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return None

print("Checkpoint functions ready ✓")
print("Results will be saved after each subject to prevent data loss.")

In [None]:
#@title Step 9: Run Complete LOSO Experiment { display-mode: "form" }

rule = "=" * 70
n_subjects = len(available_subjects)

print(rule)
print("CNN BASELINE EXPERIMENT - FULL 32-SUBJECT LOSO CROSS-VALIDATION")
print(rule)
print(f"\nStarted: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Device:  {DEVICE}")
print(f"Subjects: {n_subjects}")

# Pick up where a previous session left off, if it saved anything.
checkpoint = load_checkpoint()

if not checkpoint:
    # No usable checkpoint: start a new results structure, recording the
    # configuration and architecture alongside the per-subject metrics.
    results = {
        'valence': {'per_subject': {}},
        'arousal': {'per_subject': {}},
        'config': CONFIG,
        'model': 'EEG_CNN',
        'architecture': {
            'type': '3 Conv Blocks + Global Avg Pool + MLP',
            'conv_channels': [32, 64, 128],
            'mlp_dims': [128, 256, 128, 2],
            'dropout': [0.3, 0.2]
        },
        'started': datetime.now().isoformat()
    }
    print("\n✓ Starting fresh experiment")
else:
    results = checkpoint
    print("\n✓ Resuming from checkpoint")

# One full LOSO sweep per label.
for task in ('valence', 'arousal'):
    print(f"\n{rule}")
    print(f"TASK: {task.upper()}")
    print(rule)

    fold_results = results[task]['per_subject']
    # Snapshot of the folds finished before this sweep started. Keys are
    # strings because that is how they come back from the JSON checkpoint.
    completed = set(fold_results.keys())
    print(f"Completed: {len(completed)}/{n_subjects} subjects")

    for position, test_subj in enumerate(available_subjects, start=1):
        subj_key = str(test_subj)
        progress = f"[{position:2d}/{n_subjects}] S{test_subj:02d}:"

        # Folds restored from the checkpoint are reported, not retrained.
        if subj_key in completed:
            print(f"{progress} DONE (F1={fold_results[subj_key]['f1_score']:.3f})")
            continue

        print(f"\n{progress} Training...", end=" ")

        # Every available subject except the held-out one.
        train_subjs = [other for other in available_subjects if other != test_subj]

        metrics = train_loso_fold(train_subjs, test_subj, task)

        # Record the fold and persist immediately, so a disconnect costs at
        # most the fold in progress.
        fold_results[subj_key] = metrics
        save_checkpoint(results)

        print(f"F1={metrics['f1_score']:.3f}, Acc={metrics['accuracy']:.3f}, "
              f"BAcc={metrics['balanced_accuracy']:.3f}, Pos={metrics['pos_rate']:.1%} [Saved]")

print(f"\n{rule}")
print("ALL FOLDS COMPLETE!")
print(rule)

In [None]:
#@title Step 10: Calculate Summary Statistics { display-mode: "form" }

print("\n" + "=" * 70)
print("CALCULATING SUMMARY STATISTICS")
print("=" * 70)

for task in ['valence', 'arousal']:
    per_subject = results[task]['per_subject']

    # Nothing to summarise for a task with no finished folds.
    if not per_subject:
        continue

    metrics_list = list(per_subject.values())
    n = len(metrics_list)

    def column(metric):
        """Collect one metric across all subjects of the current task."""
        return [fold[metric] for fold in metrics_list]

    # F1 gets the full mean/std/min/max treatment ...
    f1_values = column('f1_score')
    summary = {
        'mean_f1': float(np.mean(f1_values)),
        'std_f1': float(np.std(f1_values)),
        'min_f1': float(np.min(f1_values)),
        'max_f1': float(np.max(f1_values)),
    }

    # ... the remaining metrics get mean and std (np.std default, ddof=0).
    # The order here fixes the key order in the saved JSON.
    for metric in ('accuracy', 'balanced_accuracy', 'precision', 'recall', 'roc_auc'):
        values = column(metric)
        summary[f'mean_{metric}'] = float(np.mean(values))
        summary[f'std_{metric}'] = float(np.std(values))

    # Metadata
    summary['num_subjects'] = n
    summary['total_test_samples'] = sum(fold['n_test'] for fold in metrics_list)

    results[task]['summary'] = summary

    # Print summary
    stats = results[task]['summary']
    print(f"\n{task.upper()} ({n} subjects):")
    print(f"  F1 Score:       {stats['mean_f1']:.3f} ± {stats['std_f1']:.3f}  [{stats['min_f1']:.3f} - {stats['max_f1']:.3f}]")
    print(f"  Accuracy:       {stats['mean_accuracy']:.3f} ± {stats['std_accuracy']:.3f}")
    print(f"  Balanced Acc:   {stats['mean_balanced_accuracy']:.3f} ± {stats['std_balanced_accuracy']:.3f}")
    print(f"  Precision:      {stats['mean_precision']:.3f} ± {stats['std_precision']:.3f}")
    print(f"  Recall:         {stats['mean_recall']:.3f} ± {stats['std_recall']:.3f}")
    print(f"  ROC-AUC:        {stats['mean_roc_auc']:.3f} ± {stats['std_roc_auc']:.3f}")

# Stamp the completion time, then write the final results file.
results['completed'] = datetime.now().isoformat()

final_path = f"{CONFIG['results_path']}/cnn_baseline_results.json"
with open(final_path, 'w') as out_file:
    json.dump(results, out_file, indent=2)

print(f"\n✓ Results saved to: {final_path}")

In [None]:
#@title Step 11: Results Table { display-mode: "form" }

# Render the per-task summaries as a fixed-width, box-drawn table.
# The padding inside each f-string is hand-tuned to the 3-decimal number
# formats, so the literals below must not be reflowed.
print("\n" + "=" * 70)
print("RESULTS - CNN BASELINE PERFORMANCE")
print("=" * 70)

# Both lookups require the 'summary' entry that the summary-statistics cell
# stores for each task that has at least one finished fold.
v = results['valence']['summary']
a = results['arousal']['summary']

print("\n┌─────────────────────┬──────────────────────┬──────────────────────┐")
print("│ Metric              │ Valence              │ Arousal              │")
print("├─────────────────────┼──────────────────────┼──────────────────────┤")
print(f"│ F1 Score            │ {v['mean_f1']:.3f} ± {v['std_f1']:.3f}          │ {a['mean_f1']:.3f} ± {a['std_f1']:.3f}          │")
print(f"│ Accuracy            │ {v['mean_accuracy']:.3f} ± {v['std_accuracy']:.3f}          │ {a['mean_accuracy']:.3f} ± {a['std_accuracy']:.3f}          │")
# Balanced accuracy and ROC-AUC are shown as means only (no ± std column).
print(f"│ Balanced Accuracy   │ {v['mean_balanced_accuracy']:.3f}                │ {a['mean_balanced_accuracy']:.3f}                │")
print(f"│ Precision           │ {v['mean_precision']:.3f} ± {v['std_precision']:.3f}          │ {a['mean_precision']:.3f} ± {a['std_precision']:.3f}          │")
print(f"│ Recall              │ {v['mean_recall']:.3f} ± {v['std_recall']:.3f}          │ {a['mean_recall']:.3f} ± {a['std_recall']:.3f}          │")
print(f"│ ROC-AUC             │ {v['mean_roc_auc']:.3f}                │ {a['mean_roc_auc']:.3f}                │")
print(f"│ Min F1              │ {v['min_f1']:.3f}                │ {a['min_f1']:.3f}                │")
print(f"│ Max F1              │ {v['max_f1']:.3f}                │ {a['max_f1']:.3f}                │")
print("└─────────────────────┴──────────────────────┴──────────────────────┘")

# Subject and sample counts are taken from the valence summary only.
print(f"\nEvaluation: Leave-One-Subject-Out (LOSO) Cross-Validation")
print(f"Subjects: {v['num_subjects']}")
print(f"Total test samples: {v['total_test_samples']:,}")

In [None]:
#@title Step 12: Download Results { display-mode: "form" }
from google.colab import files
import shutil

# Bundle the whole results directory into <zip_name>.zip in the working dir.
zip_name = 'cnn_baseline_results'
shutil.make_archive(base_name=zip_name, format='zip', root_dir=CONFIG['results_path'])

# Tell the user what to do with the file once the browser has saved it.
for line in (
    "DOWNLOAD INSTRUCTIONS",
    "=" * 50,
    "\n1. Click the download link below",
    "2. Extract the ZIP file",
    "3. Copy 'cnn_baseline_results.json' to:",
    "   results/cnn_baseline_experiment/\n",
):
    print(line)

# Hand the archive to the browser via the Colab helper.
files.download(zip_name + '.zip')

print("\n✓ Experiment complete!")