In [2]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
import splitfolders
import kagglehub
from torch.optim.lr_scheduler import ReduceLROnPlateau
import numpy as np
import time

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Pick the compute device: the GPU when PyTorch can see one, the CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [4]:
# Root folders of the training and validation images, given as relative paths;
# both are handed to datasets.ImageFolder when the datasets are loaded.
TRAINING_DIR = '../images/train'
VALIDATION_DIR = '../images/validation'

In [5]:
# Preprocessing pipelines.
# Both emit single-channel 56x56 tensors normalized to [-1, 1]; only the
# training pipeline adds random augmentation (flip, rotation, small shift).
train_transforms = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize(size=(56, 56)),
    # --- augmentation, applied to training images only ---
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=20),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    # --- tensor conversion and scaling ---
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

# Validation images get the deterministic steps only.
val_transforms = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize(size=(56, 56)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

In [6]:
# Load datasets
# Each split is read from its own root folder with its own preprocessing pipeline.
train_dataset = datasets.ImageFolder(root=TRAINING_DIR, transform=train_transforms)
val_dataset = datasets.ImageFolder(root=VALIDATION_DIR, transform=val_transforms)

# Both splits must agree on the folder -> label mapping; otherwise validation
# labels would silently refer to different classes than the training labels.
assert train_dataset.class_to_idx == val_dataset.class_to_idx, (
    f"Class folders differ: train={train_dataset.classes}, val={val_dataset.classes}"
)

In [7]:
# Batched loaders over both splits.
batch_size = 64

# Training batches are drawn in shuffled order.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
)
# Validation keeps the dataset order.
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4,
)

# Quick summary of what was loaded.
print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(val_dataset)}")
print(f"Classes: {train_dataset.classes}")

Number of training samples: 56656
Number of validation samples: 6955
Classes: ['angry', 'fear', 'happy', 'neutral', 'sad', 'surprise']


In [8]:
# Define the model architecture
class FacialExpressionModel(nn.Module):
    """CNN classifier for single-channel images.

    Four convolutional blocks (Conv2d -> BatchNorm2d -> ReLU -> 2x2 MaxPool ->
    Dropout) are followed by two fully connected blocks and a linear output
    layer that returns raw logits, one per class.

    The first linear layer is sized for a 512 x 3 x 3 feature map, which is
    what the four 2x2 poolings produce from a 56x56 input
    (56 -> 28 -> 14 -> 7 -> 3).

    Args:
        num_classes: Number of output logits. Defaults to 7.
    """

    def __init__(self, num_classes=7):
        super().__init__()

        # Attribute names and the layer order inside each Sequential determine
        # the state_dict keys, so they are kept stable for saved checkpoints.
        self.conv1 = self._conv_block(1, 64, kernel_size=3)
        self.conv2 = self._conv_block(64, 128, kernel_size=5)
        self.conv3 = self._conv_block(128, 512, kernel_size=3)
        self.conv4 = self._conv_block(512, 512, kernel_size=3)

        # Flattened size of the last conv block's output for one sample.
        self.flat_features = 512 * 3 * 3

        self.fc1 = self._fc_block(self.flat_features, 256)
        self.fc2 = self._fc_block(256, 512)

        # Output layer: raw class scores.
        self.output = nn.Linear(512, num_classes)

    @staticmethod
    def _conv_block(in_channels, out_channels, kernel_size):
        """Build one size-preserving conv stage followed by a 2x2 max-pool and dropout."""
        return nn.Sequential(
            # padding = kernel_size // 2 keeps the spatial size for odd kernels.
            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size // 2),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),
        )

    @staticmethod
    def _fc_block(in_features, out_features):
        """Build one Linear -> BatchNorm1d -> ReLU -> Dropout stage."""
        return nn.Sequential(
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            nn.ReLU(),
            nn.Dropout(0.25),
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a batch of images.

        Raises:
            RuntimeError: from the first linear layer when the input's spatial
                size does not lead to ``flat_features`` values per sample.
        """
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        # Flatten per sample. Keeping the batch dimension fixed means a wrongly
        # sized input fails in fc1 instead of being silently re-split into a
        # different number of rows.
        x = x.reshape(x.size(0), -1)
        x = self.fc1(x)
        x = self.fc2(x)
        return self.output(x)

In [9]:
# Build the network with its default output size, then move it to the selected device.
# NOTE(review): the class default is num_classes=7, while the dataset summary
# printed earlier lists 6 class folders — confirm the extra logit is intended.
model = FacialExpressionModel()
model = model.to(device)
print(model)

FacialExpressionModel(
  (conv1): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Dropout(p=0.25, inplace=False)
  )
  (conv2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Dropout(p=0.25, inplace=False)
  )
  (conv3): Sequential(
    (0): Conv2d(128, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Dropout(p=0.25,

In [10]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# mode='max' because the scheduler is stepped with validation accuracy; the
# learning rate is multiplied by 0.1 once accuracy has gone more than 10
# epochs without improving.
# The `verbose` argument is intentionally not passed: it is deprecated in
# recent PyTorch releases. Read optimizer.param_groups[0]['lr'] if the current
# rate needs to be logged.
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=10)
# NOTE(review): with a single epoch the scheduler's patience can never run out
# — confirm this value is not a leftover from a quick test run.
num_epochs = 1



In [11]:
# Checkpoint helper
def save_checkpoint(state, filename="model_weights.pth"):
    """Write ``state`` to ``filename`` with ``torch.save`` and print the destination.

    Args:
        state: Object to serialize (the training loop passes a dict).
        filename: Target path. Defaults to ``"model_weights.pth"``.
    """
    torch.save(obj=state, f=filename)
    message = f"Checkpoint saved to {filename}"
    print(message)

In [12]:
def _run_epoch(model, loader, criterion, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy_percent)``.

    With an ``optimizer`` the model is put in train mode and updated after
    every batch; without one the model is put in eval mode and only scored.
    Gradient tracking is left to the caller.

    Raises:
        ValueError: if the loader yields no samples.
    """
    is_training = optimizer is not None
    if is_training:
        model.train()
    else:
        model.eval()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)

        if is_training:
            optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        if is_training:
            loss.backward()
            optimizer.step()

        # Weight the batch loss by batch size so the epoch value is a
        # per-sample average (assumes the criterion returns a batch mean).
        loss_sum += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        n_correct += predicted.eq(labels).sum().item()
        n_seen += labels.size(0)

    if n_seen == 0:
        raise ValueError("data loader yielded no samples")

    # Average over the samples actually seen, so loss and accuracy share the
    # same denominator even if the loader does not cover the whole dataset.
    return loss_sum / n_seen, 100. * n_correct / n_seen


def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs,
                target_acc=83.0):
    """Train ``model`` for up to ``num_epochs`` epochs, checkpointing the best one.

    Each epoch runs a training pass and a validation pass, steps ``scheduler``
    with the validation accuracy, prints a one-line summary, and saves a
    checkpoint via ``save_checkpoint`` whenever validation accuracy improves.

    Args:
        model: Network to train; batches are moved to the notebook-level ``device``.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader yielding ``(inputs, labels)`` validation batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        scheduler: Scheduler whose ``step`` receives the validation accuracy.
        num_epochs: Maximum number of epochs to run.
        target_acc: Accuracy (percent) that both training and validation must
            reach for training to stop early. Defaults to 83.0.

    Raises:
        ValueError: if either loader yields no samples.
    """
    best_val_acc = 0.0

    for epoch in range(num_epochs):
        start_time = time.time()

        # Training phase
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, optimizer)

        # Validation phase: no gradients are needed when only scoring.
        with torch.no_grad():
            val_loss, val_acc = _run_epoch(model, val_loader, criterion)

        # Update learning rate
        scheduler.step(val_acc)

        epoch_duration = time.time() - start_time

        print(f'Epoch {epoch+1}/{num_epochs} '
              f'(Duration: {epoch_duration:.2f}s) | '
              f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | '
              f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%')

        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            save_checkpoint({
                'epoch': epoch + 1,
                'state_dict': model.state_dict(),
                'best_val_acc': best_val_acc,
                'optimizer': optimizer.state_dict(),
            })

        # Stop early once both splits have reached the target accuracy.
        if val_acc >= target_acc and train_acc >= target_acc:
            print(f"Reached target accuracy. Stopping training at epoch {epoch+1}")
            break

In [13]:
# Run training with the model, loaders, loss, optimizer and scheduler set up in earlier cells.
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=num_epochs,
)

KeyboardInterrupt: 

In [None]:
# Load the best model and evaluate
def load_best_model(model, filename='model_weights.pth'):
    """Load the checkpoint at ``filename`` into ``model`` and return the model.

    The checkpoint must be a dict with the keys ``'state_dict'``, ``'epoch'``
    and ``'best_val_acc'``, as written by ``train_model`` through
    ``save_checkpoint``.

    Args:
        model: Module whose parameters are overwritten in place.
        filename: Checkpoint path. Defaults to ``'model_weights.pth'``.

    Returns:
        The same ``model`` object, with the checkpoint weights loaded.
    """
    # Map the stored tensors onto the device the model already lives on, so a
    # checkpoint written on a GPU can still be loaded on a CPU-only machine.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    # NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
    checkpoint = torch.load(filename, map_location=target_device)
    model.load_state_dict(checkpoint['state_dict'])
    print(f"Loaded best model from epoch {checkpoint['epoch']} with validation accuracy: {checkpoint['best_val_acc']:.2f}%")
    return model

In [None]:
# Overwrite the in-memory weights with the saved checkpoint; load_best_model
# returns the very model object it was given, so this rebinds the same instance.
model = load_best_model(model)

Loaded best model from epoch 150 with validation accuracy: 82.57%


In [None]:
# Final evaluation
# Switch to eval mode before scoring.
model.eval()
correct = 0
total = 0

# One accumulator slot per class, sized from the dataset's class list rather
# than a fixed count so it always matches the labels being scored.
class_correct = [0.0] * len(train_dataset.classes)
class_total = [0.0] * len(train_dataset.classes)

In [None]:
# Accumulate overall and per-class hit counts over the validation set.
with torch.no_grad():
    for inputs, labels in val_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(1)
        matches = predicted == labels
        total += labels.size(0)
        correct += matches.sum().item()

        # Per-class accuracy
        # Walk plain Python lists instead of indexing a squeezed tensor:
        # squeezing a single-sample batch yields a 0-dim tensor that cannot
        # be indexed.
        for label, hit in zip(labels.tolist(), matches.tolist()):
            class_correct[label] += hit
            class_total[label] += 1

print(f'Final Accuracy on the validation set: {100 * correct / total:.2f}%')

# Print per-class accuracy
for i, class_name in enumerate(train_dataset.classes):
    if class_total[i] == 0:
        # No validation sample carried this label: report that instead of dividing by zero.
        print(f'Accuracy of {class_name}: n/a (no samples)')
        continue
    print(f'Accuracy of {class_name}: {100 * class_correct[i] / class_total[i]:.2f}%')

Final Accuracy on the validation set: 82.57%
Accuracy of angry: 74.69%
Accuracy of fear: 68.27%
Accuracy of happy: 94.96%
Accuracy of neutral: 83.72%
Accuracy of sad: 75.68%
Accuracy of surprise: 90.09%
