In [None]:
from contextlib import nullcontext

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
from transformers import CLIPModel
from transformers import GPT2LMHeadModel, GPT2Tokenizer

In [None]:
class EmotionAdapter(nn.Module):
    """
    MLP head that maps CLIP image embeddings to emotion logits.

    Three ``Linear -> BatchNorm1d -> ReLU -> Dropout`` stages narrow the
    width from ``hidden_dim`` to ``hidden_dim // 4``; a final linear layer
    emits one logit per emotion class.

    Args:
        clip_dim: width of the incoming feature vectors.
        hidden_dim: width of the first hidden stage (halved at each later stage).
        num_emotions: number of output classes.
        dropout: dropout probability used after every stage.
    """
    def __init__(self, clip_dim=512, hidden_dim=1024, num_emotions=7, dropout=0.2):
        super().__init__()

        # Stage widths: clip_dim -> hidden -> hidden/2 -> hidden/4
        widths = [clip_dim, hidden_dim, hidden_dim // 2, hidden_dim // 4]

        stages = []
        for in_width, out_width in zip(widths[:-1], widths[1:]):
            stages += [
                nn.Linear(in_width, out_width),
                nn.BatchNorm1d(out_width),
                nn.ReLU(),
                nn.Dropout(dropout),
            ]
        self.projection = nn.Sequential(*stages)

        self.classifier = nn.Linear(widths[-1], num_emotions)
        # Registered on the module but not referenced by forward().
        self.dropout = nn.Dropout(dropout)

        self._init_weights()

    def _init_weights(self):
        """Xavier-uniform weights and zero biases for every Linear layer."""
        linear_layers = (m for m in self.modules() if isinstance(m, nn.Linear))
        for layer in linear_layers:
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is None:
                continue
            nn.init.constant_(layer.bias, 0)

    def forward(self, clip_features):
        """
        Args:
            clip_features: (batch_size, clip_dim) image feature vectors
        Returns:
            (batch_size, num_emotions) emotion classification logits
        """
        return self.classifier(self.projection(clip_features))

In [None]:
class VisionEmotionClassifier(nn.Module):
    """
    Complete Vision-to-Emotion classifier using CLIP + Custom Adapter.

    Images are embedded with ``CLIPModel.get_image_features``, L2-normalised,
    and classified by an ``EmotionAdapter`` head.

    Args:
        clip_model_name: Hugging Face identifier of the CLIP checkpoint to load.
        num_emotions: number of emotion classes predicted by the adapter.
        freeze_clip: when True, every CLIP parameter gets ``requires_grad=False``.
    """
    def __init__(self, clip_model_name="openai/clip-vit-base-patch32", 
                 num_emotions=7, freeze_clip=True):
        super().__init__()
        
        # Load CLIP model
        self.clip_model = CLIPModel.from_pretrained(clip_model_name)
        # get_image_features() returns the *projected* image embedding, whose
        # width is config.projection_dim. The vision tower's hidden_size is the
        # pre-projection width and differs for most checkpoints, which would
        # make the adapter's first Linear layer reject the features.
        self.clip_dim = self.clip_model.config.projection_dim
        
        # Freeze CLIP parameters if specified
        if freeze_clip:
            for param in self.clip_model.parameters():
                param.requires_grad = False
            print("üîí CLIP parameters frozen")
        else:
            print("üîì CLIP parameters will be fine-tuned")
        
        # Emotion adapter
        self.emotion_adapter = EmotionAdapter(
            clip_dim=self.clip_dim,
            hidden_dim=1024,
            num_emotions=num_emotions,
            dropout=0.2
        )
        
        self.num_emotions = num_emotions
    
    def forward(self, images):
        """
        Args:
            images: (batch_size, 3, 224, 224) - preprocessed images
        Returns:
            emotion_logits: (batch_size, num_emotions)
        """
        # In eval mode the CLIP pass is wrapped in no_grad; in train mode
        # autograd is left as the caller configured it.
        with torch.no_grad() if not self.training else nullcontext():
            image_features = self.clip_model.get_image_features(images)
            # L2 normalize features
            image_features = F.normalize(image_features, p=2, dim=1)
        
        # Classify emotions
        emotion_logits = self.emotion_adapter(image_features)
        
        return emotion_logits
    
    def predict(self, images):
        """
        Get emotion predictions with probabilities.

        Switches the module to eval mode (and leaves it there).

        Returns:
            predictions: (batch_size,) argmax class indices
            probabilities: (batch_size, num_emotions) softmax probabilities
        """
        self.eval()
        with torch.no_grad():
            logits = self(images)
            probabilities = F.softmax(logits, dim=1)
            predictions = torch.argmax(logits, dim=1)
        
        return predictions, probabilities

In [None]:
def debug_model_dimensions(model, sample_batch, device):
    """
    Print the tensor shape at each stage of the model for one sample batch.

    The model is switched to eval mode. The CLIP features, their L2-normalised
    version and the adapter output are computed under ``no_grad``; if the
    adapter raises, the error and the expected vs. actual input width are
    printed instead of propagating.

    Args:
        model: object exposing ``clip_model.get_image_features`` and
            ``emotion_adapter`` (with ``projection[0].in_features``).
        sample_batch: ``(images, labels)`` pair; only the images are used.
        device: device the images are moved to.
    """
    print("üîç Debugging model dimensions...")

    model.eval()
    batch_images, _ = sample_batch
    batch_images = batch_images.to(device)
    print(f"Input images shape: {batch_images.shape}")

    with torch.no_grad():
        # Stage 1: raw CLIP image features
        raw_features = model.clip_model.get_image_features(batch_images)
        print(f"CLIP features shape: {raw_features.shape}")

        # Stage 2: unit-length features
        unit_features = F.normalize(raw_features, p=2, dim=1)
        print(f"Normalized features shape: {unit_features.shape}")

        # Stage 3: adapter head; a width mismatch surfaces here
        try:
            adapter_out = model.emotion_adapter(unit_features)
            print(f"Adapter output shape: {adapter_out.shape}")
            print("‚úÖ Model forward pass successful!")
        except Exception as e:
            expected_dim = model.emotion_adapter.projection[0].in_features
            print(f"‚ùå Error in adapter: {e}")
            print(f"Expected adapter input dim: {expected_dim}")
            print(f"Actual input dim: {unit_features.shape[1]}")


In [None]:
def setup_training(model, train_loader, val_loader, device, 
                  learning_rate=1e-4, weight_decay=1e-4):
    """
    Setup optimizer, scheduler, and loss function
    """
    # Optimizer - different learning rates for CLIP vs adapter
    clip_params = []
    adapter_params = []
    
    for name, param in model.named_parameters():
        if param.requires_grad:
            if 'clip_model' in name:
                clip_params.append(param)
            else:
                adapter_params.append(param)
    
    optimizer = torch.optim.AdamW([
        {'params': clip_params, 'lr': learning_rate * 0.1},  # Lower LR for CLIP
        {'params': adapter_params, 'lr': learning_rate}       # Higher LR for adapter
    ], weight_decay=weight_decay)
    
    # Learning rate scheduler
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=len(train_loader) * 20, eta_min=1e-6
    )
    
    # Loss function with label smoothing
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    
    # Mixed precision setup
    device_type = "cuda" if device.type == "cuda" else "cpu"
    amp_dtype = torch.float16 if device_type == "cuda" else torch.bfloat16
    amp_ctx = (torch.amp.autocast(device_type=device_type, dtype=amp_dtype)
               if device_type == "cuda" else nullcontext())
    scaler = torch.amp.GradScaler(device_type, enabled=(device_type == "cuda"))
    
    return optimizer, scheduler, criterion, amp_ctx, scaler


def train_epoch(model, train_loader, optimizer, criterion, scaler, amp_ctx, device):
    """
    Run one optimisation pass over ``train_loader``.

    The forward pass and loss run inside ``amp_ctx``. When ``scaler`` is
    enabled the backward/step go through it, otherwise plain
    ``loss.backward()`` / ``optimizer.step()`` are used. Progress is printed
    every 20 batches.

    Returns:
        ``(epoch_loss, epoch_acc)``: mean per-batch loss and accuracy in percent.
    """
    model.train()
    running_loss = 0
    n_correct = 0
    n_seen = 0

    for step, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad(set_to_none=True)

        with amp_ctx:
            logits = model(images)
            loss = criterion(logits, labels)

        if not scaler.is_enabled():
            # Full-precision path
            loss.backward()
            optimizer.step()
        else:
            # Scaled backward/step for mixed precision
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        # Running statistics
        batch_loss = loss.item()
        running_loss += batch_loss
        predicted = torch.max(logits.data, 1)[1]
        n_seen += labels.size(0)
        n_correct += (predicted == labels).sum().item()

        if step % 20 == 0:
            print(f'Batch [{step}/{len(train_loader)}], '
                  f'Loss: {batch_loss:.4f}, '
                  f'Acc: {100.*n_correct/n_seen:.2f}%')

    return running_loss / len(train_loader), 100. * n_correct / n_seen


def validate_epoch(model, val_loader, criterion, device):
    """
    Evaluate the model on ``val_loader`` without gradient tracking.

    Returns:
        ``(epoch_loss, epoch_acc, all_predictions, all_labels)``: mean
        per-batch loss, accuracy in percent, and the per-sample predicted and
        true labels collected as lists of NumPy scalars.
    """
    model.eval()
    loss_sum = 0
    hits = 0
    seen = 0
    predictions_log, labels_log = [], []

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            logits = model(images)
            loss_sum += criterion(logits, labels).item()

            predicted = torch.max(logits.data, 1)[1]
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()

            predictions_log.extend(predicted.cpu().numpy())
            labels_log.extend(labels.cpu().numpy())

    return loss_sum / len(val_loader), 100. * hits / seen, predictions_log, labels_log

In [None]:
# STEP 5: LOAD GPT2
# Load GPT-2 onto the GPU when one is available and fall back to CPU
# otherwise, instead of failing on CUDA-less machines.
llm = GPT2LMHeadModel.from_pretrained("gpt2").to(
    "cuda" if torch.cuda.is_available() else "cpu"
)
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
# GPT-2 ships without a padding token; reuse EOS so batched tokenisation works.
tokenizer.pad_token = tokenizer.eos_token
llm.train()
# Make every GPT-2 parameter trainable.
llm.requires_grad_(True)

In [None]:
def train_emotion_classifier(train_dataset, val_dataset, emotion_classes, 
                           num_epochs=20, batch_size=64, device='cuda',
                           unfreeze_epoch=10):
    """
    Complete training pipeline for emotion classification.

    Trains a ``VisionEmotionClassifier`` with CLIP frozen, validates after
    every epoch, checkpoints the best validation accuracy to
    ``best_emotion_classifier.pth`` and optionally unfreezes CLIP part-way.

    Args:
        train_dataset: dataset yielding ``(image, label)`` pairs for training.
        val_dataset: dataset yielding ``(image, label)`` pairs for validation.
        emotion_classes: sequence of class names; its length sets the number
            of output classes.
        num_epochs: number of training epochs.
        batch_size: batch size for both loaders.
        device: ``torch.device`` or device string.
        unfreeze_epoch: zero-based epoch index after which CLIP parameters are
            unfrozen and added to the optimizer; ``None`` keeps CLIP frozen.

    Returns:
        ``(model, (train_losses, train_accs, val_losses, val_accs))``. The
        returned model holds the weights of the final epoch; the best-scoring
        weights are only in the checkpoint file.
    """
    # The default is a plain string, but downstream code reads `device.type`,
    # so normalise to a torch.device up front.
    device = torch.device(device)

    print("üöÄ Starting Pure Classification Training")
    print(f"üìä Train samples: {len(train_dataset)}")
    print(f"üìä Val samples: {len(val_dataset)}")
    print(f"üòä Emotion classes: {emotion_classes}")
    
    # Data loaders (pinned host memory only helps host-to-GPU copies)
    pin_memory = device.type == "cuda"
    train_loader = DataLoader(train_dataset, batch_size=batch_size, 
                            shuffle=True, num_workers=4, pin_memory=pin_memory)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, 
                          shuffle=False, num_workers=4, pin_memory=pin_memory)
    
    # Model
    model = VisionEmotionClassifier(
        num_emotions=len(emotion_classes),
        freeze_clip=True  # Start with frozen CLIP
    ).to(device)
    
    print(f"üìê Model parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")
    
    # Debug dimensions with a sample batch
    sample_batch = next(iter(train_loader))
    debug_model_dimensions(model, sample_batch, device)
    
    # Training setup
    # NOTE(review): the scheduler horizon is chosen inside setup_training;
    # confirm it matches the once-per-epoch scheduler.step() used below.
    optimizer, scheduler, criterion, amp_ctx, scaler = setup_training(
        model, train_loader, val_loader, device
    )
    
    # Training history
    train_losses, train_accs = [], []
    val_losses, val_accs = [], []
    best_val_acc = 0
    
    # Training loop
    for epoch in range(num_epochs):
        print(f"\n{'='*60}")
        print(f"üîÑ Epoch {epoch+1}/{num_epochs}")
        print(f"{'='*60}")
        
        # Train
        train_loss, train_acc = train_epoch(
            model, train_loader, optimizer, criterion, scaler, amp_ctx, device
        )
        
        # Validate
        val_loss, val_acc, val_preds, val_labels = validate_epoch(
            model, val_loader, criterion, device
        )
        
        # Update scheduler (once per epoch)
        scheduler.step()
        
        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
                'emotion_classes': emotion_classes
            }, 'best_emotion_classifier.pth')
            print(f"üíæ Best model saved! Val Acc: {val_acc:.2f}%")
        
        # Store history
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        
        # Print epoch summary
        print(f"üìà Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
        print(f"üìä Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
        print(f"üéØ Best Val Acc: {best_val_acc:.2f}%")
        
        # Unfreeze CLIP after some epochs for fine-tuning
        if unfreeze_epoch is not None and epoch == unfreeze_epoch:
            print("üîì Unfreezing CLIP for fine-tuning...")
            clip_parameters = list(model.clip_model.parameters())
            for param in clip_parameters:
                param.requires_grad = True
            # Register the CLIP parameters with the optimizer at a small LR.
            clip_lr = 1e-6
            optimizer.add_param_group(
                {'params': clip_parameters, 'lr': clip_lr, 'initial_lr': clip_lr}
            )
            # The scheduler captured its base LRs when it was created; keep
            # its bookkeeping the same length as optimizer.param_groups.
            if hasattr(scheduler, 'base_lrs'):
                scheduler.base_lrs.append(clip_lr)
    
    return model, (train_losses, train_accs, val_losses, val_accs)

In [None]:
# STEP 7: TRAIN ADAPTER FOR FEW STEPS
# NOTE(review): `adapter`, `clip_model`, `dataloader`, `class_names` and
# `optimizer` are not defined in any cell visible in this notebook - confirm
# they come from a cell that runs before this one.
llm.eval()
adapter.train()
EPOCHS = 3

# Follow whatever device the language model lives on instead of assuming CUDA.
train_device = next(llm.parameters()).device

# Loop-invariant text prefix placed before every class name.
prompt_text = "Describe the emotion the person is feeling in a full sentence:"

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")
    for i, (images, labels) in enumerate(dataloader):
        images = images.to(train_device)
        labels = labels.to(train_device)

        # 1. Get CLIP image features (no gradient needed here)
        with torch.no_grad():
            image_features = clip_model.get_image_features(images)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)

        # 2. Adapt features into a single "visual token" per image
        #    (presumably (batch_size, 1, 768) to match GPT-2's embedding width - TODO confirm)
        adapted = adapter(image_features).unsqueeze(1)

        # 3. Generate text inputs
        prompts = [prompt_text + " " + class_names[label.item()] for label in labels]

        tokenized = tokenizer(prompts, return_tensors="pt", padding=True)
        input_ids = tokenized.input_ids.to(train_device)
        attention_mask = tokenized.attention_mask.to(train_device)

        # Padded positions must not contribute to the LM loss (-100 is ignored).
        labels_text = input_ids.masked_fill(attention_mask == 0, -100)

        # Text embeddings
        text_embeds = llm.transformer.wte(input_ids)

        # Combine adapted visual embeddings + text
        inputs_embeds = torch.cat([adapted, text_embeds], dim=1)

        # The visual token is always attended to but never a prediction target.
        batch_size = input_ids.size(0)
        visual_ignore = torch.full(
            (batch_size, 1), -100, dtype=labels_text.dtype, device=train_device
        )
        labels_text = torch.cat([visual_ignore, labels_text], dim=1)
        visual_attend = torch.ones(
            (batch_size, 1), dtype=attention_mask.dtype, device=train_device
        )
        full_attention_mask = torch.cat([visual_attend, attention_mask], dim=1)

        # 4. Forward + loss (outside no_grad!)
        outputs = llm(
            inputs_embeds=inputs_embeds,
            attention_mask=full_attention_mask,
            labels=labels_text,
        )
        loss = outputs.loss

        # 5. Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if i % 10 == 0:
            print(f"Step {i}, Loss: {loss.item():.4f}")


In [None]:
def evaluate_model(model, test_loader, emotion_classes, device):
    """
    Comprehensive evaluation of the trained model.

    Runs ``model.predict`` over ``test_loader``, prints accuracy and a
    per-class classification report, and shows a confusion-matrix heatmap.

    Args:
        model: object exposing ``eval()`` and ``predict(images)`` returning
            ``(predictions, probabilities)``.
        test_loader: iterable of ``(images, labels)`` batches.
        emotion_classes: class names; position ``i`` names label index ``i``.
        device: device the batches are moved to.

    Returns:
        ``(accuracy, all_predictions, all_probabilities)`` where the last two
        are NumPy arrays over all evaluated samples.
    """
    model.eval()
    all_predictions = []
    all_labels = []
    all_probabilities = []
    
    print("üîç Running evaluation...")
    
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            
            predictions, probabilities = model.predict(images)
            
            all_predictions.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probabilities.extend(probabilities.cpu().numpy())
    
    # Convert to numpy arrays
    all_predictions = np.array(all_predictions)
    all_labels = np.array(all_labels)
    all_probabilities = np.array(all_probabilities)
    
    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    
    # Pin the label set so the report rows and the confusion-matrix axes always
    # line up with emotion_classes, even when a class never occurs in the data.
    class_indices = list(range(len(emotion_classes)))
    
    print(f"üéØ Test Accuracy: {accuracy:.4f}")
    print("\nüìä Classification Report:")
    print(classification_report(all_labels, all_predictions, 
                              labels=class_indices,
                              target_names=emotion_classes))
    
    # Confusion Matrix
    cm = confusion_matrix(all_labels, all_predictions, labels=class_indices)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=emotion_classes, yticklabels=emotion_classes)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()
    
    return accuracy, all_predictions, all_probabilities


def visualize_predictions(model, test_loader, emotion_classes, device, num_samples=8):
    """
    Visualize model predictions on samples from the first test batch.

    Each panel shows the de-normalised image with its true and predicted
    emotion and the prediction confidence; the title is green when the
    prediction is correct and red otherwise.

    Args:
        model: object exposing ``eval()`` and ``predict(images)``.
        test_loader: iterable of ``(images, labels)`` batches; only the first
            batch is used.
        emotion_classes: class names indexed by label.
        device: device used for inference.
        num_samples: maximum number of images to show; capped at the batch size.
    """
    model.eval()
    
    # Get one batch and never ask for more images than it holds
    images, labels = next(iter(test_loader))
    num_shown = max(0, min(num_samples, images.size(0)))
    if num_shown == 0:
        return
    images = images[:num_shown].to(device)
    labels = labels[:num_shown].cpu()
    
    # Get predictions, then move everything to the CPU for plotting
    predictions, probabilities = model.predict(images)
    predictions = predictions.cpu()
    probabilities = probabilities.cpu()
    images = images.cpu()
    
    # Constants used to undo the input normalisation for display
    mean = torch.tensor([0.4815, 0.4578, 0.4082]).view(3, 1, 1)
    std = torch.tensor([0.2686, 0.2613, 0.2758]).view(3, 1, 1)
    
    # Four columns; at least two rows (the original 2x4 layout), growing with
    # the sample count instead of overflowing the grid.
    n_cols = 4
    n_rows = max(2, -(-num_shown // n_cols))
    
    plt.figure(figsize=(4 * n_cols, 4 * n_rows))
    for i in range(num_shown):
        plt.subplot(n_rows, n_cols, i+1)
        
        # Denormalize image
        img = torch.clamp(images[i] * std + mean, 0, 1)
        
        plt.imshow(img.permute(1, 2, 0))
        plt.axis('off')
        
        # Get prediction info
        true_idx = int(labels[i])
        pred_idx = int(predictions[i])
        true_emotion = emotion_classes[true_idx]
        pred_emotion = emotion_classes[pred_idx]
        confidence = float(probabilities[i][pred_idx]) * 100
        
        # Color coding: green if correct, red if wrong
        color = 'green' if pred_idx == true_idx else 'red'
        
        plt.title(f'True: {true_emotion}\nPred: {pred_emotion} ({confidence:.1f}%)', 
                 color=color, fontsize=10)
    
    plt.tight_layout()
    plt.show()


In [None]:
if __name__ == "__main__":
    # Setup
    # Seed the RNGs so the train/val split and weight initialisation are
    # repeatable across kernel restarts.
    SEED = 42
    torch.manual_seed(SEED)
    np.random.seed(SEED)
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"üîß Using device: {device}")
    
    # Data transforms: resize to 224x224 and normalise per channel
    image_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.4815, 0.4578, 0.4082], [0.2686, 0.2613, 0.2758]),
    ])
    
    # Load datasets (one sub-folder per class)
    TRAIN_PATH = "data/emotion-detection-fer/train"
    TEST_PATH  = "data/emotion-detection-fer/test"
    
    full_train_dataset = datasets.ImageFolder(TRAIN_PATH, transform=image_transform)
    test_dataset = datasets.ImageFolder(TEST_PATH, transform=image_transform)
    
    emotion_classes = full_train_dataset.classes
    
    # Split training set for validation (80-20 split) with a dedicated,
    # seeded generator so the same images land in each split on every run.
    train_size = int(0.8 * len(full_train_dataset))
    val_size = len(full_train_dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(
        full_train_dataset, [train_size, val_size],
        generator=torch.Generator().manual_seed(SEED)
    )
    
    # Train the model
    model, training_history = train_emotion_classifier(
        train_dataset=train_dataset,
        val_dataset=val_dataset,
        emotion_classes=emotion_classes,
        num_epochs=20,
        batch_size=64,
        device=device
    )
    
    # Evaluate on test set
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
    accuracy, predictions, probabilities = evaluate_model(
        model, test_loader, emotion_classes, device
    )
    
    # Visualize some predictions
    visualize_predictions(model, test_loader, emotion_classes, device)
    
    print("‚úÖ Training and evaluation complete!")