In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets
from torch.optim.lr_scheduler import ReduceLROnPlateau

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder
import os
import time
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Seed the torch and NumPy global RNGs (and the current CUDA device, if any).
torch.manual_seed(42)
np.random.seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

print("PyTorch Version:", torch.__version__)
print("CUDA Available:", torch.cuda.is_available())

# Input geometry and training hyperparameters used by the cells below.
IMG_HEIGHT = 96
IMG_WIDTH = 96
IMG_CHANNELS = 1
NUM_CLASSES = 7
BATCH_SIZE = 16
EPOCHS = 50
LEARNING_RATE = 0.0001

# Must stay in alphabetical order: torchvision's ImageFolder assigns class
# indices from the sorted sub-directory names, and this list is used as the
# default index -> name mapping when reporting results.
EMOTION_LABELS = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

PyTorch Version: 2.7.1+cu118
CUDA Available: True
Using device: cuda


In [None]:
def get_data_transforms():
    """Build the image preprocessing pipelines.

    Both pipelines convert to single-channel grayscale, resize to
    ``(IMG_HEIGHT, IMG_WIDTH)``, convert to a tensor and normalise with
    mean 0.485 / std 0.229. The training pipeline additionally applies a
    small random rotation, a small random translation and a random
    horizontal flip between the resize and the tensor conversion.

    Returns:
        tuple: ``(train_transform, val_transform)`` as ``transforms.Compose``
        objects.
    """
    def _grayscale_and_resize():
        # Fresh instances on every call so the two pipelines share nothing.
        return [
            transforms.Grayscale(num_output_channels=1),
            transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
        ]

    def _tensor_and_normalize():
        return [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485], std=[0.229]),
        ]

    # Augmentations are applied to training images only.
    augmentations = [
        transforms.RandomRotation(5),
        transforms.RandomAffine(degrees=0, translate=(0.05, 0.05)),
        transforms.RandomHorizontalFlip(p=0.3),
    ]

    train_transform = transforms.Compose(
        _grayscale_and_resize() + augmentations + _tensor_and_normalize()
    )
    val_transform = transforms.Compose(
        _grayscale_and_resize() + _tensor_and_normalize()
    )

    return train_transform, val_transform


def create_data_loaders(train_dir, val_dir, test_dir=None):
    """Create the train / validation / (optional) test data loaders.

    Each directory is read with ``datasets.ImageFolder``. The training set
    uses the augmenting transform and is shuffled; validation and test use
    the plain transform and keep their order.

    Args:
        train_dir: Root folder of the training images.
        val_dir: Root folder of the validation images.
        test_dir: Optional root folder of the test images. A test loader is
            only built when this is truthy and the path exists.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader, classes)`` where
        ``test_loader`` is ``None`` when no test set was loaded and
        ``classes`` is ``train_dataset.classes``.
    """
    train_transform, val_transform = get_data_transforms()

    def _build_loader(dataset, shuffle):
        # All loaders share the same batch size and worker settings.
        return DataLoader(
            dataset,
            batch_size=BATCH_SIZE,
            shuffle=shuffle,
            num_workers=4,
            pin_memory=True
        )

    train_dataset = datasets.ImageFolder(train_dir, transform=train_transform)
    val_dataset = datasets.ImageFolder(val_dir, transform=val_transform)

    train_loader = _build_loader(train_dataset, True)
    val_loader = _build_loader(val_dataset, False)

    if test_dir and os.path.exists(test_dir):
        test_dataset = datasets.ImageFolder(test_dir, transform=val_transform)
        test_loader = _build_loader(test_dataset, False)
    else:
        test_loader = None

    return train_loader, val_loader, test_loader, train_dataset.classes

In [11]:
class EmotionCNN(nn.Module):
    """Convolutional classifier for single-channel images.

    The network is ten 3x3 conv + batch-norm + ReLU layers arranged in four
    stages, each stage ending in a 2x2 max-pool and channel dropout. It is
    followed by global average pooling and a two-layer fully connected head.
    ``forward`` returns raw logits; no softmax is applied.
    """

    def __init__(self, num_classes=NUM_CLASSES):
        """Create the layers and initialise their weights.

        Args:
            num_classes: Size of the output logit vector.
        """
        super().__init__()

        # Stage 1: 1 -> 32 -> 32 channels.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout1 = nn.Dropout2d(0.25)

        # Stage 2: 32 -> 64 -> 64 channels.
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout2 = nn.Dropout2d(0.25)

        # Stage 3: 64 -> 128 -> 128 channels.
        self.conv5 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(128)
        self.conv6 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout3 = nn.Dropout2d(0.3)

        # Stage 4: channels narrow 128 -> 64 -> 64 -> 32 -> 16 before pooling.
        self.conv7 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.bn7 = nn.BatchNorm2d(64)
        self.conv8 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn8 = nn.BatchNorm2d(64)

        self.conv9 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.bn9 = nn.BatchNorm2d(32)
        self.conv10 = nn.Conv2d(32, 16, kernel_size=3, padding=1)
        self.bn10 = nn.BatchNorm2d(16)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout4 = nn.Dropout2d(0.3)

        # Collapse whatever spatial extent remains to 1x1 per channel.
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Classifier head: 16 pooled features -> 64 -> num_classes.
        self.fc1 = nn.Linear(16, 64)
        self.dropout5 = nn.Dropout(0.4)
        self.fc2 = nn.Linear(64, num_classes)

        self._initialize_weights()

    def _initialize_weights(self):
        """Re-initialise every conv, batch-norm and linear layer in place."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)
            elif isinstance(layer, nn.BatchNorm2d):
                # Scale 1 / shift 0.
                nn.init.ones_(layer.weight)
                nn.init.zeros_(layer.bias)
            elif isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, mean=0, std=0.01)
                nn.init.zeros_(layer.bias)

    @staticmethod
    def _conv_bn_relu(x, conv, bn):
        """Apply ``conv`` then ``bn`` then ReLU to ``x``."""
        return F.relu(bn(conv(x)))

    def forward(self, x):
        """Compute class logits for a batch of single-channel images.

        Args:
            x: Input tensor with one channel (``conv1`` expects 1 input
                channel).

        Returns:
            Tensor of raw logits, one row per batch element.
        """
        block = self._conv_bn_relu

        x = block(x, self.conv1, self.bn1)
        x = block(x, self.conv2, self.bn2)
        x = self.dropout1(self.pool1(x))

        x = block(x, self.conv3, self.bn3)
        x = block(x, self.conv4, self.bn4)
        x = self.dropout2(self.pool2(x))

        x = block(x, self.conv5, self.bn5)
        x = block(x, self.conv6, self.bn6)
        x = self.dropout3(self.pool3(x))

        x = block(x, self.conv7, self.bn7)
        x = block(x, self.conv8, self.bn8)
        x = block(x, self.conv9, self.bn9)
        x = block(x, self.conv10, self.bn10)
        x = self.dropout4(self.pool4(x))

        # (batch, 16, 1, 1) -> (batch, 16)
        pooled = self.global_avg_pool(x)
        features = pooled.view(pooled.shape[0], -1)

        hidden = self.dropout5(F.relu(self.fc1(features)))
        return self.fc2(hidden)

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to train; switched to training mode.
        train_loader: Iterable of ``(data, target)`` batches.
        criterion: Loss callable. The epoch loss assumes it returns the mean
            loss over the batch (the default reduction of
            ``nn.CrossEntropyLoss``).
        optimizer: Optimiser stepping ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(epoch_loss, epoch_acc)``, the per-sample mean loss and the
        fraction of correctly classified samples.

    Raises:
        ValueError: If the loader yields no samples.
    """
    model.train()
    loss_sum = 0.0
    correct_predictions = 0
    total_samples = 0

    train_bar = tqdm(train_loader, desc="Training")

    for data, target in train_bar:
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        batch_size = target.size(0)
        batch_loss = loss.item()
        # Weight each batch by its size so a short final batch does not
        # count as much as a full one in the epoch average.
        loss_sum += batch_loss * batch_size
        predicted = output.detach().argmax(dim=1)
        total_samples += batch_size
        correct_predictions += (predicted == target).sum().item()

        train_bar.set_postfix({
            'Loss': f'{batch_loss:.4f}',
            'Acc': f'{100.*correct_predictions/total_samples:.2f}%'
        })

    if total_samples == 0:
        raise ValueError("train_loader yielded no samples; cannot compute epoch metrics")

    epoch_loss = loss_sum / total_samples
    epoch_acc = correct_predictions / total_samples

    return epoch_loss, epoch_acc

def validate_epoch(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without updating weights.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        val_loader: Iterable of ``(data, target)`` batches.
        criterion: Loss callable. The epoch loss assumes it returns the mean
            loss over the batch (the default reduction of
            ``nn.CrossEntropyLoss``).
        device: Device the batches are moved to.

    Returns:
        tuple: ``(epoch_loss, epoch_acc)``, the per-sample mean loss and the
        fraction of correctly classified samples.

    Raises:
        ValueError: If the loader yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        val_bar = tqdm(val_loader, desc="Validation")

        for data, target in val_bar:
            data, target = data.to(device), target.to(device)

            output = model(data)
            batch_loss = criterion(output, target).item()
            batch_size = target.size(0)

            # Weight each batch by its size so a short final batch does not
            # count as much as a full one in the epoch average.
            loss_sum += batch_loss * batch_size
            predicted = output.argmax(dim=1)
            total_samples += batch_size
            correct_predictions += (predicted == target).sum().item()

            val_bar.set_postfix({
                'Loss': f'{batch_loss:.4f}',
                'Acc': f'{100.*correct_predictions/total_samples:.2f}%'
            })

    if total_samples == 0:
        raise ValueError("val_loader yielded no samples; cannot compute epoch metrics")

    epoch_loss = loss_sum / total_samples
    epoch_acc = correct_predictions / total_samples

    return epoch_loss, epoch_acc

def train_model(model, train_loader, val_loader, epochs=EPOCHS,
                checkpoint_path='best_emotion_model.pth'):
    """Train ``model`` with early stopping and restore its best weights.

    Uses cross-entropy loss, Adam (``LEARNING_RATE``, weight decay 1e-4) and
    a ``ReduceLROnPlateau`` scheduler driven by the validation loss. Each
    time the validation accuracy improves, the state dict is written to
    ``checkpoint_path``. Training stops early after 10 consecutive epochs
    without improvement.

    Args:
        model: Network to train (already on the module-level ``device``).
        train_loader: Loader for the training batches.
        val_loader: Loader for the validation batches.
        epochs: Maximum number of epochs to run.
        checkpoint_path: File the best state dict is saved to and reloaded
            from.

    Returns:
        dict: Per-epoch lists under the keys ``'train_loss'``,
        ``'train_acc'``, ``'val_loss'`` and ``'val_acc'``.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5,
                                  patience=3, min_lr=1e-6)

    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': []
    }

    best_val_acc = 0.0
    # Tracks whether THIS run wrote the checkpoint, so a stale file left by
    # an earlier run is never loaded by mistake.
    checkpoint_written = False
    patience_counter = 0
    patience = 10

    print(f"Starting training for {epochs} epochs...")
    print(f"Training batches: {len(train_loader)}")
    print(f"Validation batches: {len(val_loader)}")

    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}")
        print("-" * 50)

        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)

        # The scheduler watches the loss; early stopping watches accuracy.
        scheduler.step(val_loss)

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_written = True
            print(f"New best validation accuracy: {best_val_acc:.4f} - Model saved!")
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print(f"Early stopping triggered after {epoch+1} epochs")
            break

    if checkpoint_written:
        # map_location keeps the reload valid whichever device the run uses.
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        print(f"Best model loaded with validation accuracy: {best_val_acc:.4f}")
    else:
        print("No checkpoint was written during this run; keeping the current model weights.")

    return history

def evaluate_model(model, test_loader, class_labels=EMOTION_LABELS):
    """Evaluate ``model`` on ``test_loader`` and print a full report.

    Prints the overall accuracy, a per-class classification report and the
    confusion matrix.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        test_loader: Loader yielding ``(data, target)`` batches, or ``None``.
        class_labels: Class names ordered by class index.

    Returns:
        tuple: ``(test_accuracy, report)``. Both are ``None`` when there is
        no loader or the loader yields no samples, so callers can always
        unpack two values.
    """
    if test_loader is None:
        print("No test data available for evaluation.")
        return None, None

    print("\n" + "="*50)
    print("MODEL EVALUATION")
    print("="*50)

    model.eval()
    all_predictions = []
    all_targets = []

    with torch.no_grad():
        test_bar = tqdm(test_loader, desc="Testing")

        for data, target in test_bar:
            data, target = data.to(device), target.to(device)
            predicted = model(data).argmax(dim=1)

            all_predictions.extend(predicted.cpu().tolist())
            all_targets.extend(target.cpu().tolist())

    total = len(all_targets)
    if total == 0:
        print("No test data available for evaluation.")
        return None, None

    correct = sum(p == t for p, t in zip(all_predictions, all_targets))
    test_accuracy = correct / total
    print(f"\nTest Accuracy: {test_accuracy:.4f} ({test_accuracy*100:.2f}%)")

    # Pin the label set explicitly: without it, sklearn infers the labels
    # from the data and rejects target_names whenever a class is absent from
    # both the targets and the predictions.
    label_ids = list(range(len(class_labels)))

    print("\nDetailed Classification Report:")
    print("-" * 50)
    report = classification_report(
        all_targets,
        all_predictions,
        labels=label_ids,
        target_names=class_labels,
        digits=4
    )
    print(report)

    print("\nConfusion Matrix:")
    print("-" * 30)
    cm = confusion_matrix(all_targets, all_predictions, labels=label_ids)
    print(cm)

    return test_accuracy, report

def plot_training_history(history):
    """Plot accuracy and loss curves for training and validation.

    Args:
        history: Dict with equal-length per-epoch lists under the keys
            ``'train_acc'``, ``'val_acc'``, ``'train_loss'`` and
            ``'val_loss'``.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    # The history dict uses the key 'train_acc'; 'train_accuracy' does not exist.
    epochs = range(1, len(history['train_acc']) + 1)

    axes[0].plot(epochs, history['train_acc'], 'b-', label='Training Accuracy')
    axes[0].plot(epochs, history['val_acc'], 'r-', label='Validation Accuracy')
    axes[0].set_title('Model Accuracy')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Accuracy')
    axes[0].legend()
    axes[0].grid(True)

    axes[1].plot(epochs, history['train_loss'], 'b-', label='Training Loss')
    axes[1].plot(epochs, history['val_loss'], 'r-', label='Validation Loss')
    axes[1].set_title('Model Loss')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Loss')
    axes[1].legend()
    axes[1].grid(True)

    plt.tight_layout()
    plt.show()

def count_parameters(model):
    """Return the number of scalar parameters in ``model`` that require gradients."""
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable


In [7]:
def main():
    """Run the full pipeline: load data, build, train, plot, evaluate, save.

    Reads images from ``../data/images/{train,validation,test}``, trains an
    ``EmotionCNN``, evaluates it on the test set (or on the validation set
    when no test set was loaded) and writes the final checkpoint to
    ``emotion_detection_model.pth``. Returns early with a warning when the
    training or validation directory is missing.
    """
    print("="*60)
    print("EMOTION DETECTION CNN MODEL - PyTorch")
    print("="*60)
    print(f"Input Shape: ({IMG_CHANNELS}, {IMG_HEIGHT}, {IMG_WIDTH})")
    print(f"Number of Classes: {NUM_CLASSES}")
    print(f"Classes: {', '.join(EMOTION_LABELS)}")
    print("="*60)
    train_dir = '../data/images/train'
    val_dir = '../data/images/validation'
    test_dir = '../data/images/test'

    if not os.path.exists(train_dir):
        print(f"Warning: Training directory '{train_dir}' not found.")
        return
    # The validation set is mandatory as well; fail with the same kind of
    # message instead of an exception from inside the dataset constructor.
    if not os.path.exists(val_dir):
        print(f"Warning: Validation directory '{val_dir}' not found.")
        return

    print("\n1. Creating data loaders with augmentation...")
    train_loader, val_loader, test_loader, class_names = create_data_loaders(
        train_dir, val_dir, test_dir
    )

    print(f"Found {len(train_loader.dataset)} training images")
    print(f"Found {len(val_loader.dataset)} validation images")
    if test_loader:
        print(f"Found {len(test_loader.dataset)} test images")
    print(f"Classes: {class_names}")

    # Size the output layer from the classes actually found on disk so the
    # target indices can never exceed the number of logits.
    num_classes = len(class_names)
    if num_classes != NUM_CLASSES:
        print(f"Warning: dataset has {num_classes} classes but NUM_CLASSES is "
              f"{NUM_CLASSES}; building the model with {num_classes} outputs.")

    print("\n2. Building CNN model...")
    model = EmotionCNN(num_classes=num_classes).to(device)

    print("\n3. Model Information:")
    print(f"Total trainable parameters: {count_parameters(model):,}")
    print(f"Model device: {next(model.parameters()).device}")

    print("\n4. Model Architecture:")
    print(model)

    print("\n5. Training model...")
    history = train_model(model, train_loader, val_loader, epochs=EPOCHS)

    print("\n6. Plotting training history...")
    plot_training_history(history)

    if test_loader:
        print("\n7. Evaluating model on test set...")
        evaluate_model(model, test_loader, class_names)
    else:
        print("\n7. Evaluating model on validation set...")
        evaluate_model(model, val_loader, class_names)

    model_save_path = 'emotion_detection_model.pth'
    torch.save({
        'model_state_dict': model.state_dict(),
        # Store the textual layer summary rather than pickling the module
        # object: a pickled module ties the file to this exact class
        # definition and cannot be read by torch.load(weights_only=True).
        'model_architecture': str(model),
        'class_names': class_names,
        'num_classes': num_classes,
    }, model_save_path)
    print(f"\n8. Model saved as '{model_save_path}'")

    print("\n" + "="*60)
    print("TRAINING COMPLETED SUCCESSFULLY!")
    print("="*60)

In [10]:
# Entry point of the notebook: run the pipeline defined in main().
main()

EMOTION DETECTION CNN MODEL - PyTorch
Input Shape: (1, 48, 48)
Number of Classes: 7
Classes: angry, disgust, fear, happy, sad, surprise, neutral

1. Creating data loaders with augmentation...
Found 28821 training images
Found 7066 validation images
Classes: ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

2. Building CNN model...

3. Model Information:
Total trainable parameters: 66,423
Model device: cuda:0

4. Model Architecture:
EmotionCNN(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout1): Dropout2d(p=0.25, inplace=False)
  (conv3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (leaky_relu1): LeakyReLU(negative_slope=0.5)
  (conv5): Conv2d(32, 32, kernel_size=(3, 3), 

Training: 100%|██████████| 901/901 [01:11<00:00, 12.56it/s, Loss=1.9273, Acc=24.65%]
Validation: 100%|██████████| 221/221 [00:27<00:00,  7.99it/s, Loss=2.1654, Acc=25.83%]


Train Loss: 1.9179, Train Acc: 0.2465
Val Loss: 1.9074, Val Acc: 0.2583
New best validation accuracy: 0.2583 - Model saved!

Epoch 2/50
--------------------------------------------------


Training: 100%|██████████| 901/901 [00:22<00:00, 39.50it/s, Loss=1.9273, Acc=24.86%] 
Validation: 100%|██████████| 221/221 [00:15<00:00, 13.84it/s, Loss=2.1654, Acc=25.83%]


Train Loss: 1.9169, Train Acc: 0.2486
Val Loss: 1.9074, Val Acc: 0.2583

Epoch 3/50
--------------------------------------------------


Training: 100%|██████████| 901/901 [00:23<00:00, 39.01it/s, Loss=2.0226, Acc=24.86%] 
Validation: 100%|██████████| 221/221 [00:16<00:00, 13.68it/s, Loss=2.1654, Acc=25.83%]


Train Loss: 1.9169, Train Acc: 0.2486
Val Loss: 1.9074, Val Acc: 0.2583

Epoch 4/50
--------------------------------------------------


Training: 100%|██████████| 901/901 [00:22<00:00, 39.52it/s, Loss=1.8321, Acc=24.86%] 
Validation: 100%|██████████| 221/221 [00:16<00:00, 13.65it/s, Loss=2.1654, Acc=25.83%] 


Train Loss: 1.9168, Train Acc: 0.2486
Val Loss: 1.9074, Val Acc: 0.2583

Epoch 5/50
--------------------------------------------------


Training: 100%|██████████| 901/901 [00:23<00:00, 37.62it/s, Loss=1.9749, Acc=24.86%] 
Validation: 100%|██████████| 221/221 [00:28<00:00,  7.77it/s, Loss=2.1654, Acc=25.83%]


Train Loss: 1.9169, Train Acc: 0.2486
Val Loss: 1.9074, Val Acc: 0.2583

Epoch 6/50
--------------------------------------------------


Training:  99%|█████████▊| 888/901 [01:00<00:00, 14.71it/s, Loss=1.9154, Acc=24.86%]


KeyboardInterrupt: 