# Attention-based CNN for Facial Expression Recognition

In [None]:
# Mount Google Drive at /content/drive so files kept there (the Kaggle
# credentials copied in a later cell live under /content/drive/MyDrive)
# are reachable from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install required packages (a leading "!" runs the line as a shell command)
# NOTE(review): tqdm is imported further down but is not installed here -
# presumably it is already present in the runtime; confirm.
!pip install wandb torch torchvision pandas numpy matplotlib seaborn scikit-learn

# Install the Kaggle command-line client used below to download the dataset
!pip install kaggle

In [None]:
# Copy the Kaggle API token (kaggle.json) from Google Drive into ~/.kaggle.
# Adjust the Drive path below to wherever your kaggle.json is stored;
# Drive must already be mounted at /content/drive.
!mkdir -p ~/.kaggle
!cp /content/drive/MyDrive/ColabNotebooks/kaggle_API_credentials/kaggle.json ~/.kaggle/kaggle.json
# Make the token readable/writable by the owner only
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the competition archive with the Kaggle CLI
!kaggle competitions download -c challenges-in-representation-learning-facial-expression-recognition-challenge
# Extract it quietly (-q) into the current working directory
# NOTE(review): the data-loading cell reads '../train.csv' (parent directory) -
# confirm that matches where the files are extracted.
!unzip -q challenges-in-representation-learning-facial-expression-recognition-challenge.zip

In [None]:

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from sklearn.metrics import classification_report, confusion_matrix
import wandb
from datetime import datetime
from tqdm import tqdm


In [None]:

# Select the compute device: CUDA when available, CPU otherwise
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# Hyperparameters and run settings; the same dict is sent to Weights & Biases
CONFIG = {
    'model_name': 'attention_cnn',
    'batch_size': 64,
    'learning_rate': 0.001,
    'epochs': 50,
    'image_size': 48,
    'num_classes': 7,
    'random_seed': 42,
    'weight_decay': 1e-4,
    'dropout_rate': 0.5,
    # Number of attention heads
    # NOTE(review): not referenced by any code visible in this notebook - confirm it is needed
    'attention_heads': 4
}

# Seed the PyTorch and NumPy random generators with the configured seed
seed = CONFIG['random_seed']
torch.manual_seed(seed)
np.random.seed(seed)

# Start a Weights & Biases run named "<model_name>_<timestamp>"
run_name = f"{CONFIG['model_name']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
wandb.init(
    project="facial-expression-recognition",
    name=run_name,
    config=CONFIG,
    job_type="training"
)


## Dataset and Data Loading

In [None]:

class FERDataset(Dataset):
    """Dataset of square grayscale images stored as pixel strings in a DataFrame.

    Each row must provide a ``'pixels'`` entry (whitespace-separated integer
    intensities that fit in uint8) and an ``'emotion'`` entry (class label).

    Args:
        dataframe: Source DataFrame.
        indices: Positional row indices (used with ``iloc``) selecting the
            rows that belong to this dataset.
        transform: Optional callable applied to the image tensor.
        image_size: Side length of the square image; defaults to 48, which
            matches the previously hard-coded value.
    """

    def __init__(self, dataframe, indices, transform=None, image_size=48):
        # Keep only the selected rows and renumber them 0..len-1.
        self.data = dataframe.iloc[indices].reset_index(drop=True)
        self.transform = transform
        self.image_size = image_size

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        ``image`` is a float32 tensor of shape ``(1, image_size, image_size)``
        scaled to [0, 1] before ``transform`` is applied; ``label`` is an int.

        Raises:
            ValueError: If the pixel string does not hold exactly
                ``image_size * image_size`` values (raised by ``reshape``).
        """
        # Fetch the row once; each positional lookup builds a new Series.
        row = self.data.iloc[idx]

        side = self.image_size
        image = np.array(row['pixels'].split(), dtype=np.uint8).reshape(side, side)
        image = image.astype(np.float32) / 255.0
        image = torch.from_numpy(image).unsqueeze(0)  # Add channel dimension

        # Compare against None so a falsy-but-valid callable is still applied.
        if self.transform is not None:
            image = self.transform(image)

        return image, int(row['emotion'])


## Attention Mechanisms

In [None]:

class SpatialAttention(nn.Module):
    """Spatial attention gate: one weight in (0, 1) per spatial location.

    The previous implementation pooled the spatial dimensions down to 1x1 and
    produced per-channel weights (and summed two sigmoids, so weights could
    reach 2), which made it a second channel attention. This version computes
    the channel-wise mean and max maps, concatenates them, applies a single
    convolution and a sigmoid, and multiplies the resulting
    ``(batch, 1, H, W)`` map into the input.

    Args:
        in_channels: Accepted for backward compatibility with existing
            callers; the gate does not depend on the channel count.
        reduction_ratio: Accepted for backward compatibility; unused.
        kernel_size: Odd side length of the convolution over the two pooled
            maps (default 7).

    Raises:
        ValueError: If ``kernel_size`` is even, since the padding could not
            keep the attention map the same size as the input.

    NOTE: the learnable parameters are now ``conv.weight`` instead of
    ``fc.*``, so checkpoints saved with the old layout will not load strictly.
    """

    def __init__(self, in_channels, reduction_ratio=8, kernel_size=7):
        super(SpatialAttention, self).__init__()
        if kernel_size % 2 != 1:
            raise ValueError("kernel_size must be odd")
        # 2 input planes (mean map, max map) -> 1 attention plane, same H x W.
        self.conv = nn.Conv2d(2, 1, kernel_size, padding=kernel_size // 2, bias=False)

    def forward(self, x):
        """Return ``x`` scaled by its spatial attention map (same shape as ``x``)."""
        avg_map = x.mean(dim=1, keepdim=True)
        max_map = x.max(dim=1, keepdim=True)[0]
        pooled = torch.cat([avg_map, max_map], dim=1)
        attention = torch.sigmoid(self.conv(pooled))
        # Broadcasts the single-channel map across all channels of x.
        return x * attention

class ChannelAttention(nn.Module):
    """Channel attention gate: one weight in (0, 1) per channel.

    Global average- and max-pooled descriptors go through a shared two-layer
    bottleneck; the two results are summed and passed through one sigmoid.
    The earlier version applied the sigmoid to each branch before summing,
    so the gate ranged over (0, 2) and could amplify features.

    Args:
        in_channels: Number of channels of the input feature map.
        reduction_ratio: Divisor for the bottleneck width. The width is
            clamped to at least 1 so small ``in_channels`` still work.
    """

    def __init__(self, in_channels, reduction_ratio=8):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        hidden = max(in_channels // reduction_ratio, 1)
        # Layer indices 0 and 2 are kept so parameter names stay
        # 'fc.0.weight' / 'fc.2.weight'; the sigmoid moved to forward().
        self.fc = nn.Sequential(
            nn.Linear(in_channels, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, in_channels, bias=False)
        )

    def forward(self, x):
        """Return ``x`` scaled per channel; expects a 4-D ``(b, c, h, w)`` input."""
        b, c, _, _ = x.size()
        avg_logits = self.fc(self.avg_pool(x).view(b, c))
        max_logits = self.fc(self.max_pool(x).view(b, c))
        # One sigmoid over the summed logits keeps the gate inside (0, 1).
        gate = torch.sigmoid(avg_logits + max_logits).view(b, c, 1, 1)
        return x * gate

class CBAM(nn.Module):
    """Convolutional Block Attention Module.

    Chains two attention sub-modules built from the same ``in_channels`` and
    ``reduction_ratio``: ``channel_att`` runs first and its output is fed
    to ``spatial_att``.
    """

    def __init__(self, in_channels, reduction_ratio=8):
        super().__init__()
        self.channel_att = ChannelAttention(in_channels, reduction_ratio)
        self.spatial_att = SpatialAttention(in_channels, reduction_ratio)

    def forward(self, x):
        """Apply channel attention, then spatial attention, and return the result."""
        return self.spatial_att(self.channel_att(x))


In [None]:

class AttentionCNN(nn.Module):
    """CNN classifier with a CBAM attention module after each conv stage.

    Three stages (1->32, 32->64, 64->128 channels) each apply two
    3x3 conv + batch-norm + ReLU layers and a 2x2 max-pool, followed by CBAM.
    The final feature map is globally average-pooled and classified by a
    two-layer fully connected head.

    Args:
        num_classes: Size of the output logits vector.
        dropout_rate: Dropout probability used in the classifier head.
    """

    def __init__(self, num_classes=7, dropout_rate=0.5):
        super().__init__()

        # Modules are created in the order conv1, att1, conv2, att2, conv3,
        # att3 so registration order and parameter names are unchanged.
        self.conv1 = self._conv_stage(1, 32)
        self.att1 = CBAM(32)

        self.conv2 = self._conv_stage(32, 64)
        self.att2 = CBAM(64)

        self.conv3 = self._conv_stage(64, 128)
        self.att3 = CBAM(128)

        # Collapse each channel's feature map to a single value
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Classifier head: 128 -> 256 -> num_classes
        self.fc = nn.Sequential(
            nn.Linear(128, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout_rate),
            nn.Linear(256, num_classes)
        )

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Build one stage: (conv3x3 -> BN -> ReLU) twice, then 2x2 max-pool."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2)
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for a 1-channel image batch."""
        stages = (
            (self.conv1, self.att1),
            (self.conv2, self.att2),
            (self.conv3, self.att3),
        )
        for conv_stage, attention in stages:
            x = attention(conv_stage(x))

        # Global average pooling, then flatten to (batch, 128)
        pooled = self.global_avg_pool(x)
        flat = pooled.view(pooled.size(0), -1)

        return self.fc(flat)


In [None]:

def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=50,
                checkpoint_path='best_attention_cnn.pth'):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    For every epoch: run one optimisation pass over ``train_loader``, log the
    epoch's training loss/accuracy to wandb, evaluate on ``val_loader`` via
    ``evaluate_model``, log the validation metrics, and save the state dict
    to ``checkpoint_path`` whenever validation accuracy improves.

    Args:
        model: Network to train; must already be on the global ``device``.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader passed to ``evaluate_model`` after each epoch.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of epochs to run.
        checkpoint_path: Destination for the best-so-far state dict
            (defaults to the previously hard-coded file name).

    Returns:
        The same ``model`` object, holding the weights from the LAST epoch
        (the best checkpoint is on disk and is not reloaded here).
    """
    best_val_acc = 0.0

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        train_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Train]')

        for inputs, labels in train_bar:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            # loss.item() is multiplied by the batch size so the running
            # total is per sample (assumes criterion returns a batch mean).
            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            train_bar.set_postfix({
                'loss': running_loss / total,
                'acc': 100. * correct / total
            })

        # Normalise by the samples actually processed rather than
        # len(train_loader.dataset): the two differ when the loader uses
        # drop_last or a sampler. max(..., 1) avoids dividing by zero when
        # the loader yields no batches.
        seen = max(total, 1)
        epoch_loss = running_loss / seen
        epoch_acc = 100. * correct / seen

        wandb.log({
            'train/loss': epoch_loss,
            'train/accuracy': epoch_acc,
            'epoch': epoch
        })

        # Validation phase
        val_loss, val_acc = evaluate_model(model, val_loader, criterion)

        wandb.log({
            'val/loss': val_loss,
            'val/accuracy': val_acc,
            'epoch': epoch
        })

        # Save the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), checkpoint_path)
            print(f'Model saved with validation accuracy: {val_acc:.2f}%')

    return model

In [None]:

def evaluate_model(model, data_loader, criterion):
    """Evaluate ``model`` on ``data_loader`` and log diagnostics to wandb.

    Puts the model in eval mode, runs it without gradients over every batch,
    prints the average loss and accuracy, then logs a confusion-matrix
    heatmap and a classification report to wandb.

    Args:
        model: Network to evaluate; must already be on the global ``device``.
        data_loader: Loader yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.

    Returns:
        Tuple ``(avg_loss, accuracy)`` where ``accuracy`` is a percentage.

    Note:
        The model is left in eval mode when this function returns.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in tqdm(data_loader, desc='Evaluating'):
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Normalise by the samples actually evaluated rather than
    # len(data_loader.dataset): the two differ when the loader uses
    # drop_last or a sampler. max(..., 1) avoids a zero division.
    seen = max(total, 1)
    avg_loss = running_loss / seen
    accuracy = 100. * correct / seen

    print(f'Validation Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')

    # Log confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    wandb.log({"confusion_matrix": wandb.Image(plt)})
    # Close the figure so repeated per-epoch calls do not accumulate figures
    plt.close()

    # Log classification report
    class_report = classification_report(all_labels, all_preds, output_dict=True)
    wandb.log({"classification_report": class_report})

    return avg_loss, accuracy


In [None]:
# Read the training CSV into a DataFrame
print("Loading data...")
train_df = pd.read_csv('../train.csv')

# Reuse a saved train/validation split when both index files exist;
# otherwise build a stratified 80/20 split and save it for later runs.
train_idx_path = '../train_indices.npy'
val_idx_path = '../val_indices.npy'
try:
    train_indices = np.load(train_idx_path)
    val_indices = np.load(val_idx_path)
except FileNotFoundError:
    from sklearn.model_selection import train_test_split
    all_rows = np.arange(len(train_df))
    train_indices, val_indices = train_test_split(
        all_rows,
        test_size=0.2,
        random_state=CONFIG['random_seed'],
        stratify=train_df['emotion']
    )
    np.save(train_idx_path, train_indices)
    np.save(val_idx_path, val_indices)

# Transforms: both splits are normalised with mean 0.5 / std 0.5;
# only the training split gets random flips and rotations of up to 10 degrees.
normalize = transforms.Normalize(mean=[0.5], std=[0.5])
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    normalize
])
val_transform = transforms.Compose([normalize])

# Datasets share the same DataFrame and differ only in rows and transform
train_dataset = FERDataset(train_df, train_indices, transform=train_transform)
val_dataset = FERDataset(train_df, val_indices, transform=val_transform)

# Loaders: identical settings except that only training data is shuffled
loader_kwargs = {
    'batch_size': CONFIG['batch_size'],
    'num_workers': 4,
    'pin_memory': True,
}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)


In [None]:

# Build the network from CONFIG and move it to the selected device
model = AttentionCNN(
    num_classes=CONFIG['num_classes'],
    dropout_rate=CONFIG['dropout_rate'],
)
model = model.to(device)

# Cross-entropy loss; Adam with learning rate and weight decay from CONFIG
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    model.parameters(),
    lr=CONFIG['learning_rate'],
    weight_decay=CONFIG['weight_decay'],
)

# Ask wandb to track the model (log='all')
wandb.watch(model, log='all')


In [None]:

# Run the full training loop for the configured number of epochs
print("Starting training...")
model = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=CONFIG['epochs'],
)

# Evaluate the model returned by train_model on the validation loader once more
print("Final evaluation...")
evaluate_model(model, val_loader, criterion)

# Write the final weights to disk, then hand every .pth file in the
# working directory to wandb
final_weights = model.state_dict()
torch.save(final_weights, 'final_attention_cnn.pth')
wandb.save('*.pth')

print("Training completed!")