In [1]:
import os
from typing import Callable, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm


In [2]:
# --- Data locations: each directory holds one sub-directory per emotion label ---
TRAIN_DIR, TEST_DIR = 'images/train', 'images/test'

# --- Preprocessing / optimisation hyper-parameters ---
IMAGE_SIZE = 96          # images are resized to IMAGE_SIZE x IMAGE_SIZE pixels
BATCH_SIZE = 64          # samples per mini-batch for both loaders
NUM_EPOCHS = 30          # full passes over the training set
LEARNING_RATE = 0.0016   # learning rate handed to the optimizer

# --- Compute device: use the GPU when one is available, otherwise the CPU ---
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(DEVICE)

# --- Class names; a name's position in this list is its integer class index ---
EMOTIONS = [
	'angry',
	'disgust',
	'fear',
	'happy',
	'neutral',
	'sad',
	'surprise',
]

cuda


In [3]:
class EmotionDataset(Dataset):
	"""
	Custom PyTorch Dataset for loading facial emotion images.

	Each sample consists of:
	- A grayscale image loaded from a file path.
	- An associated emotion label.

	Args:
		image_paths (List[str]): List of file paths to the images.
		labels (List[int]): List of emotion labels, aligned index-by-index with ``image_paths``.
		transform (Optional[Callable], optional): Optional transformation applied to each
			loaded PIL image (for example a torchvision ``Compose`` ending in ``ToTensor``).

	Raises:
		ValueError: If ``image_paths`` and ``labels`` do not have the same length.
	"""

	def __init__(self, image_paths: List[str], labels: List[int], transform: Optional[Callable] = None) -> None:
		# A silent length mismatch would either drop labels or fail much later
		# with an IndexError inside a DataLoader worker, so reject it up front.
		if len(image_paths) != len(labels):
			raise ValueError(
				f"image_paths and labels must have the same length "
				f"(got {len(image_paths)} paths and {len(labels)} labels)"
			)

		self.image_paths = image_paths
		self.labels = labels
		self.transform = transform

	def __len__(self) -> int:
		"""
		Returns the total number of samples in the dataset.

		Returns:
			int: Number of samples.
		"""
		return len(self.image_paths)

	def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
		"""
		Retrieves the image and label at the specified index.

		Args:
			idx (int): Index of the sample to retrieve.

		Returns:
			Tuple[torch.Tensor, int]: A tuple containing the image and its label.
			The image is whatever ``transform`` returns; when no transform was
			given it is the grayscale PIL image itself rather than a tensor.
		"""
		img_path: str = self.image_paths[idx]

		# Open inside a context manager so the file handle is released right away;
		# convert('L') yields a separate grayscale image that stays valid afterwards.
		with Image.open(img_path) as source:
			image = source.convert('L')

		# Compare against None explicitly so a transform object that happens to be
		# falsy (e.g. one defining __len__) is still applied.
		if self.transform is not None:
			image = self.transform(image)

		label: int = self.labels[idx]

		return image, label

In [4]:

def create_dataframe(
	dir: str,
	extensions: Optional[Tuple[str, ...]] = (
		'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tif', '.tiff', '.webp', '.pgm', '.ppm'
	)
) -> Tuple[List[str], List[str]]:
	"""
	Scans a directory structure and creates lists of image file paths and their corresponding labels.

	Despite its name, this function returns two plain Python lists, not a pandas DataFrame.

	Expected directory structure:
		dir/
			|-- label_1/
			|     |-- img1.jpg
			|     |-- img2.jpg
			|-- label_2/
			|     |-- img3.jpg
			|     |-- img4.jpg

	Label directories and the files inside them are visited in sorted order, so the
	result is the same on every run and every platform (``os.listdir`` alone gives
	no ordering guarantee). Entries that are not regular files, or whose extension
	is not in ``extensions``, are skipped so that stray files such as ``.DS_Store``
	or ``Thumbs.db`` never reach the image loader.

	Args:
		dir (str): Path to the root directory containing subdirectories for each label.
		extensions (Optional[Tuple[str, ...]]): File extensions (case-insensitive, with
			the leading dot) accepted as images. Pass ``None`` to accept every regular file.

	Returns:
		Tuple[List[str], List[str]]:
			- List of full image paths.
			- List of corresponding labels (as strings).

	Raises:
		OSError: Propagated from ``os.listdir`` when ``dir`` cannot be listed
			(for example ``FileNotFoundError`` if it does not exist).
	"""
	image_paths: List[str] = []
	labels: List[str] = []

	# Normalise once, outside the loops; str.endswith needs a tuple.
	accepted: Optional[Tuple[str, ...]] = (
		None if extensions is None else tuple(ext.lower() for ext in extensions)
	)

	for label in sorted(os.listdir(dir)):
		label_dir: str = os.path.join(dir, label)
		if not os.path.isdir(label_dir):
			# Loose files at the root level are not label folders.
			continue

		for image_name in sorted(os.listdir(label_dir)):
			image_path: str = os.path.join(label_dir, image_name)
			if not os.path.isfile(image_path):
				continue
			if accepted is not None and not image_name.lower().endswith(accepted):
				continue
			# Add full image path and corresponding label
			image_paths.append(image_path)
			labels.append(label)
		print(f"{label} completed")

	return image_paths, labels

In [5]:
def get_label_idx(label_name: str) -> int:
	"""
	Returns the index of a given emotion label based on the EMOTIONS list.

	Args:
		label_name (str): Name of the emotion (e.g., 'happy', 'sad'). The match is exact.

	Returns:
		int: Corresponding index of the emotion in the EMOTIONS list.

	Raises:
		ValueError: If ``label_name`` is not present in EMOTIONS. The message names the
			offending label and the accepted ones, which makes an unexpected folder in
			the dataset directory easy to spot.
	"""
	try:
		return EMOTIONS.index(label_name)
	except ValueError:
		# Same exception type as list.index, but with an actionable message.
		raise ValueError(
			f"Unknown emotion label {label_name!r}; expected one of {EMOTIONS}"
		) from None

In [6]:
# Preprocessing pipeline used for both datasets and for single-image prediction:
#   1. resize to a square IMAGE_SIZE x IMAGE_SIZE image,
#   2. convert the PIL image to a tensor,
#   3. normalise the single grayscale channel with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)

In [7]:
def create_datasets() -> Tuple[DataLoader, DataLoader]:
	"""
	Creates training and testing datasets and their corresponding DataLoaders.

	Process:
	- Scans training and testing directories to retrieve image paths and labels.
	- Converts label names to numeric indices.
	- Applies transformations to images.
	- Prepares DataLoaders for training and testing.

	Only the training loader shuffles its samples. The testing loader keeps a fixed
	order: shuffling has no effect on aggregated loss/accuracy and only makes
	evaluation runs harder to reproduce and compare.

	Returns:
		Tuple[DataLoader, DataLoader]:
			- DataLoader for the training dataset.
			- DataLoader for the testing dataset.
	"""
	# Get training data
	train_image_paths, train_label_names = create_dataframe(TRAIN_DIR)
	train_labels = [get_label_idx(label) for label in train_label_names]

	# Get testing data
	test_image_paths, test_label_names = create_dataframe(TEST_DIR)
	test_labels = [get_label_idx(label) for label in test_label_names]

	# Create Dataset instances (both share the same preprocessing pipeline)
	train_dataset = EmotionDataset(train_image_paths, train_labels, transform)
	test_dataset = EmotionDataset(test_image_paths, test_labels, transform)

	# Create DataLoaders: shuffle for SGD on the training set, fixed order for evaluation
	train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
	test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

	return train_loader, test_loader

In [8]:
class EmotionCNN(nn.Module):
	"""
	A Convolutional Neural Network (CNN) for emotion classification from grayscale facial images.

	Architecture Overview:
	- 3 convolutional blocks (Conv2D + ReLU + MaxPooling + Dropout)
	- Fully connected layers for feature integration and final classification.

	Args:
		num_classes (int): Number of output classes (default: 7).
		image_size (Optional[int]): Height/width of the square input images. When omitted,
			the module-level ``IMAGE_SIZE`` constant is used, which matches the
			preprocessing pipeline of this notebook.

	Raises:
		ValueError: If ``image_size`` is smaller than 8, because three stride-2 poolings
			would leave no spatial features.
	"""

	def __init__(self, num_classes: int = 7, image_size: Optional[int] = None) -> None:
		super().__init__()

		if image_size is None:
			image_size = IMAGE_SIZE
		if image_size < 8:
			raise ValueError(f"image_size must be at least 8, got {image_size}")

		# First convolutional block
		self.conv1: nn.Conv2d = nn.Conv2d(in_channels=1, out_channels=128, kernel_size=3, padding=1)
		self.pool1: nn.MaxPool2d = nn.MaxPool2d(kernel_size=2, stride=2)
		self.dropout1: nn.Dropout = nn.Dropout(0.2)

		# Second convolutional block
		self.conv2: nn.Conv2d = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
		self.pool2: nn.MaxPool2d = nn.MaxPool2d(kernel_size=2, stride=2)
		self.dropout2: nn.Dropout = nn.Dropout(0.2)

		# Third convolutional block
		self.conv3: nn.Conv2d = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1)
		self.pool3: nn.MaxPool2d = nn.MaxPool2d(kernel_size=2, stride=2)
		self.dropout3: nn.Dropout = nn.Dropout(0.2)

		# Calculate flattened feature size
		# The padded 3x3 convolutions keep the spatial size; each of the 3 pooling
		# layers (stride=2) halves it, so the input shrinks by a factor of 8.
		self.flat_features: int = 512 * (image_size // 8) * (image_size // 8)

		# Fully connected layers
		self.fc1: nn.Linear = nn.Linear(self.flat_features, 512)
		self.dropout4: nn.Dropout = nn.Dropout(0.2)
		self.fc2: nn.Linear = nn.Linear(512, 256)
		self.dropout5: nn.Dropout = nn.Dropout(0.2)
		self.fc3: nn.Linear = nn.Linear(256, num_classes)

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		"""
		Defines the forward pass of the network.

		Args:
			x (torch.Tensor): Input tensor of shape (batch_size, 1, image_size, image_size).

		Returns:
			torch.Tensor: Raw class scores (logits) of shape (batch_size, num_classes).
			No softmax is applied here.

		Raises:
			RuntimeError: Raised by the first linear layer when the spatial size of ``x``
				does not match the size the model was built for.
		"""
		# First convolutional block
		x = F.relu(self.conv1(x))
		x = self.pool1(x)
		x = self.dropout1(x)

		# Second convolutional block
		x = F.relu(self.conv2(x))
		x = self.pool2(x)
		x = self.dropout2(x)

		# Third convolutional block
		x = F.relu(self.conv3(x))
		x = self.pool3(x)
		x = self.dropout3(x)

		# Flatten everything except the batch dimension. Unlike
		# view(-1, flat_features), this can never fold surplus spatial features
		# into extra "samples" when the input size is wrong; fc1 raises instead.
		x = torch.flatten(x, start_dim=1)

		# Fully connected layers
		x = F.relu(self.fc1(x))
		x = self.dropout4(x)
		x = F.relu(self.fc2(x))
		x = self.dropout5(x)
		x = self.fc3(x)

		return x

In [9]:
def train_model(
	model: nn.Module,
	train_loader: DataLoader,
	test_loader: DataLoader,
	criterion: nn.Module,
	optimizer: torch.optim.Optimizer,
	num_epochs: int,
	save_path: str = 'emotion_model.pth'
) -> None:
	"""
	Trains and validates the model over a specified number of epochs.

	After every epoch the model is evaluated on ``test_loader``. Its weights are
	written to ``save_path`` after the first epoch and again whenever the
	validation accuracy improves, so a checkpoint always exists once at least one
	epoch has completed (even if accuracy never rises above 0).

	Batches are moved to the module-level ``DEVICE``; the model is expected to
	already live on that device.

	Args:
		model (nn.Module): The neural network model to be trained.
		train_loader (DataLoader): DataLoader for the training dataset.
		test_loader (DataLoader): DataLoader for the validation/testing dataset.
		criterion (nn.Module): Loss function to optimize.
		optimizer (torch.optim.Optimizer): Optimizer for updating model weights.
		num_epochs (int): Number of epochs to train the model.
		save_path (str): File the best model ``state_dict`` is saved to.

	Returns:
		None
	"""
	best_accuracy: float = 0.0
	checkpoint_written: bool = False

	for epoch in range(num_epochs):
		# Training phase
		model.train()
		running_loss: float = 0.0
		correct: int = 0
		total: int = 0

		for inputs, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
			inputs = inputs.to(DEVICE)
			labels = labels.to(DEVICE)

			optimizer.zero_grad()

			outputs = model(inputs)
			loss = criterion(outputs, labels)

			loss.backward()
			optimizer.step()

			# Update training statistics. The loss is weighted by batch size so
			# that a smaller final batch does not skew the epoch average.
			batch_count: int = labels.size(0)
			running_loss += loss.item() * batch_count
			total += batch_count
			correct += (outputs.argmax(dim=1) == labels).sum().item()

		# Average over the samples actually seen; this stays correct with
		# drop_last or a custom sampler, and max(..., 1) guards an empty loader.
		epoch_loss: float = running_loss / max(total, 1)
		epoch_acc: float = correct / max(total, 1)

		# Validation phase
		model.eval()
		val_loss: float = 0.0
		val_correct: int = 0
		val_total: int = 0

		with torch.no_grad():
			for inputs, labels in tqdm(test_loader, desc='Validation'):
				inputs = inputs.to(DEVICE)
				labels = labels.to(DEVICE)

				outputs = model(inputs)
				loss = criterion(outputs, labels)

				batch_count = labels.size(0)
				val_loss += loss.item() * batch_count
				val_total += batch_count
				val_correct += (outputs.argmax(dim=1) == labels).sum().item()

		val_loss = val_loss / max(val_total, 1)
		val_acc: float = val_correct / max(val_total, 1)

		print(f'Epoch {epoch + 1}/{num_epochs}:')
		print(f'Train Loss: {epoch_loss:.4f} | Train Acc: {epoch_acc:.4f}')
		print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}')

		# Save on improvement, and unconditionally the first time, so that a
		# later load of save_path cannot fail just because accuracy stayed at 0.
		if val_acc > best_accuracy or not checkpoint_written:
			best_accuracy = val_acc
			torch.save(model.state_dict(), save_path)
			checkpoint_written = True
			print(f'Model saved with accuracy: {best_accuracy:.4f}')

	print(f'Best validation accuracy: {best_accuracy:.4f}')

In [10]:
def predict_emotion(model: nn.Module, image_path: str) -> str:
	"""
	Predicts the emotion label for a given input image.

	The image is loaded as grayscale, passed through the module-level ``transform``
	pipeline and classified by ``model``. The model is switched to evaluation mode
	and left in that mode.

	Args:
		model (nn.Module): Trained emotion classification model.
		image_path (str): Path to the input image file.

	Returns:
		str: Predicted emotion label (an entry of ``EMOTIONS``).
	"""
	model.eval()

	# Send the input to the device the model's weights are on, so prediction also
	# works for a model that was not moved to the global DEVICE. Models without
	# parameters fall back to DEVICE.
	first_param = next(model.parameters(), None)
	target_device = first_param.device if first_param is not None else DEVICE

	# Release the file handle promptly; convert('L') returns a separate grayscale image.
	with Image.open(image_path) as source:
		image = source.convert('L')

	# Apply transformations and add the batch dimension expected by the model
	batch: torch.Tensor = transform(image).unsqueeze(0).to(target_device)

	# Perform prediction without tracking gradients
	with torch.no_grad():
		outputs: torch.Tensor = model(batch)

	predicted_idx: int = int(outputs.argmax(dim=1).item())

	return EMOTIONS[predicted_idx]

In [11]:
def main() -> None:
	"""
	Main function to train the model, evaluate it, and display predictions on sample images.

	Steps:
	- Initialize device, data loaders, model, loss function, and optimizer.
	- Train the model.
	- Load the best saved model.
	- Predict emotions for selected sample images and display results.

	Note:
		The sample images shown at the end come from the TRAINING directory, so the
		predictions are a qualitative sanity check only and say nothing about
		performance on unseen data. Sample files that do not exist are skipped
		with a message instead of aborting after training has finished.

	Returns:
		None
	"""
	print(f"Using device: {DEVICE}")

	# Create data loaders
	train_loader, test_loader = create_datasets()

	# Initialize model
	model: EmotionCNN = EmotionCNN().to(DEVICE)

	# Define loss function and optimizer
	criterion: nn.Module = nn.CrossEntropyLoss()
	optimizer: optim.Optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

	# Train the model
	train_model(model, train_loader, test_loader, criterion, optimizer, NUM_EPOCHS)

	# Load best model weights.
	# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
	model.load_state_dict(torch.load('emotion_model.pth', map_location=DEVICE))

	# Sample images for a visual sanity check (taken from the training set)
	sample_images: list = [
		'images/train/sad/42.jpg',
		'images/train/fear/2.jpg',
		'images/train/disgust/299.jpg',
		'images/train/happy/7.jpg',
		'images/train/surprise/15.jpg'
	]

	for image_path in sample_images:
		if not os.path.isfile(image_path):
			print(f"Skipping missing sample image: {image_path}")
			continue

		# The true label is the name of the folder that contains the image
		true_label: str = os.path.basename(os.path.dirname(image_path))
		predicted_label: str = predict_emotion(model, image_path)

		# Load and display the image
		with Image.open(image_path) as source:
			img = source.convert('L')
		plt.imshow(img, cmap='gray')
		plt.title(f'True: {true_label.capitalize()} | Predicted: {predicted_label.capitalize()}')
		plt.axis('off')
		plt.show()

		# Print the results
		print(f"Original label : {true_label.capitalize()}")
		print(f"Predicted label: {predicted_label.capitalize()}")
		print('-' * 50)

# Entry point: run the whole pipeline (data loading, training, sample predictions)
# only when this code is executed directly, not when it is imported as a module.
if __name__ == "__main__":
	main()

Using device: cuda
angry completed
disgust completed
fear completed
happy completed
neutral completed
sad completed
surprise completed
angry completed
disgust completed
fear completed
happy completed
neutral completed
sad completed
surprise completed


Epoch 1/30:   3%|▎         | 15/451 [00:02<01:13,  5.94it/s]


KeyboardInterrupt: 