# Importing Required Libraries

In [None]:
import os
import shutil
import time
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import tensorflow.lite as tflite

from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.applications import VGG16, ResNet50, MobileNetV2
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.mixed_precision import set_global_policy
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image

import mediapipe as mp

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, roc_auc_score, roc_curve, precision_recall_curve

# AffectNet Dataset

The AffectNet dataset can be found at this link: https://www.kaggle.com/datasets/noamsegal/affectnet-training-data

# GitHub Repository

The GitHub repository for this project's code can be found here: https://github.com/gerlislab/fom-facial-expression-recognition-project

# Functions

In [None]:

# ===========================
# Data Preprocessing and Augmentation
# ===========================

def create_data_generators():
    """
    Builds the directory-backed image iterators used for training and evaluation.

    Reads the module-level ``processed_images_path``, ``img_size`` and
    ``batch_size`` settings. Training images are scaled by 1/255 and randomly
    augmented (rotation, shifts, shear, zoom, horizontal flip, brightness);
    test images are only scaled and are served without shuffling.

    Returns:
    - (train_generator, test_generator): the two iterators, in that order.
    """
    # Options that are identical for both iterators.
    flow_options = dict(
        target_size=img_size,
        batch_size=batch_size,
        class_mode='categorical',
    )

    augmenting_datagen = ImageDataGenerator(
        rescale=1./255,
        rotation_range=20,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        brightness_range=[0.7, 1.3],
        fill_mode='nearest'
    )
    plain_datagen = ImageDataGenerator(rescale=1./255)

    train_dir = os.path.join(processed_images_path, "train")
    test_dir = os.path.join(processed_images_path, "test")

    # Training data is shuffled; test data keeps its directory order.
    train_iter = augmenting_datagen.flow_from_directory(train_dir, shuffle=True, **flow_options)
    test_iter = plain_datagen.flow_from_directory(test_dir, shuffle=False, **flow_options)

    return train_iter, test_iter

# ===========================
# Custom CNN Architectures
# ===========================

def create_custom_cnn():
    """
    Builds and compiles the baseline convolutional classifier.

    Three Conv2D/MaxPooling2D stages (64, 128 and 256 filters) feed a 512-unit
    dense layer with 50% dropout and a softmax output. The input size comes from
    the module-level ``img_size`` and the number of output units from
    ``train_generator.num_classes``.

    Returns:
    - The compiled Sequential model (Adam optimizer, categorical cross-entropy).
    """
    input_dims = (img_size[0], img_size[1], 3)

    # The first stage carries the input shape; later stages only differ in width.
    stack = [
        Conv2D(64, (3,3), activation='relu', input_shape=input_dims),
        MaxPooling2D((2,2)),
    ]
    for n_filters in (128, 256):
        stack.append(Conv2D(n_filters, (3,3), activation='relu'))
        stack.append(MaxPooling2D((2,2)))

    # Classification head
    stack.extend([
        Flatten(),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(train_generator.num_classes, activation='softmax'),
    ])

    model = Sequential(stack)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_custom_cnn_v2():
    """
    Builds and compiles the deeper custom CNN with batch normalization.

    Five Conv2D -> BatchNormalization -> MaxPooling2D stages (64 up to 1024
    filters) are followed by an L2-regularized 1024-unit dense layer, 30%
    dropout and a softmax output. The input size comes from the module-level
    ``img_size`` and the number of output units from
    ``train_generator.num_classes``.

    Returns:
    - The compiled Sequential model (Adam at learning rate 1e-4,
      categorical cross-entropy).
    """
    input_dims = (img_size[0], img_size[1], 3)

    # First stage declares the input shape.
    stack = [
        Conv2D(64, (3,3), activation='relu', input_shape=input_dims),
        BatchNormalization(),
        MaxPooling2D((2,2)),
    ]

    # Remaining stages double the filter count each time.
    for n_filters in (128, 256, 512, 1024):
        stack.append(Conv2D(n_filters, (3,3), activation='relu'))
        stack.append(BatchNormalization())
        stack.append(MaxPooling2D((2,2)))

    # Classification head
    stack.extend([
        Flatten(),
        Dense(1024, activation='relu', kernel_regularizer=l2(0.001)),
        Dropout(0.3),
        Dense(train_generator.num_classes, activation='softmax'),
    ])

    model = Sequential(stack)
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# ===========================
# Pretrained Model Architectures
# ===========================

def create_vgg16_model():
    """
    Builds and compiles a classifier on top of an ImageNet-initialized VGG16.

    The convolutional base (without its original top) is left fully trainable
    and is followed by Flatten, a 512-unit dense layer, 50% dropout and a
    softmax output sized by ``train_generator.num_classes``. The input size
    comes from the module-level ``img_size``.

    Returns:
    - The compiled Sequential model (Adam optimizer, categorical cross-entropy).
    """
    input_dims = (img_size[0], img_size[1], 3)

    backbone = VGG16(weights='imagenet', include_top=False, input_shape=input_dims)
    backbone.trainable = True  # Fine-tune the backbone together with the head

    head = [
        Flatten(),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(train_generator.num_classes, activation='softmax'),
    ]

    model = Sequential([backbone, *head])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_resnet50_model():
    """
    Builds and compiles a classifier on top of an ImageNet-initialized ResNet50.

    The convolutional base (without its original top) is left fully trainable
    and is followed by Flatten, a 512-unit dense layer, 50% dropout and a
    softmax output sized by ``train_generator.num_classes``. The input size
    comes from the module-level ``img_size``.

    Returns:
    - The compiled Sequential model (Adam optimizer, categorical cross-entropy).
    """
    input_dims = (img_size[0], img_size[1], 3)

    backbone = ResNet50(weights='imagenet', include_top=False, input_shape=input_dims)
    backbone.trainable = True  # Fine-tune the backbone together with the head

    head = [
        Flatten(),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(train_generator.num_classes, activation='softmax'),
    ]

    model = Sequential([backbone, *head])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_mobilenetv2_model():
    """
    Builds and compiles a classifier on top of an ImageNet-initialized MobileNetV2.

    The convolutional base (without its original top) is left fully trainable
    and is followed by Flatten, a 512-unit dense layer, 50% dropout and a
    softmax output sized by ``train_generator.num_classes``. The input size
    comes from the module-level ``img_size``.

    Returns:
    - The compiled Sequential model (Adam optimizer, categorical cross-entropy).
    """
    input_dims = (img_size[0], img_size[1], 3)

    backbone = MobileNetV2(weights='imagenet', include_top=False, input_shape=input_dims)
    backbone.trainable = True  # Fine-tune the backbone together with the head

    head = [
        Flatten(),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(train_generator.num_classes, activation='softmax'),
    ]

    model = Sequential([backbone, *head])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_resnet50_model_v2():
    """
    Builds and compiles the regularized ResNet50 classifier for full fine-tuning.

    Every layer of the ImageNet-initialized base is marked trainable. The head
    is Flatten -> BatchNormalization -> L2-regularized 512-unit dense layer ->
    50% dropout -> softmax sized by ``train_generator.num_classes``. The input
    size comes from the module-level ``img_size``.

    Returns:
    - The compiled Sequential model (Adam at learning rate 1e-5,
      categorical cross-entropy).
    """
    input_dims = (img_size[0], img_size[1], 3)
    backbone = ResNet50(weights='imagenet', include_top=False, input_shape=input_dims)

    # Unfreeze the backbone layer by layer
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = True

    head = [
        Flatten(),
        BatchNormalization(),
        Dense(512, activation='relu', kernel_regularizer=l2(0.001)),
        Dropout(0.5),
        Dense(train_generator.num_classes, activation='softmax'),
    ]

    model = Sequential([backbone, *head])
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    return model


# ===========================
# Model Evaluation and Performance Analysis
# ===========================

def measure_inference_time(model, model_name, sample_batch=10):
    """
    Measures the average inference time per image for real-time deployment.

    One batch is pulled from the module-level ``test_generator`` and truncated
    to at most ``sample_batch`` images. The model is run once untimed as a
    warm-up, then once more under a monotonic clock.

    Parameters:
    - model: The trained model to evaluate (anything with a ``predict`` method).
    - model_name: A string representing the model name (used in the printout).
    - sample_batch: Maximum number of images to use for the measurement.

    Returns:
    - avg_time: The average time per image in seconds.

    Raises:
    - ValueError: If ``sample_batch`` is smaller than 1 or the generator
      yields an empty batch.
    """
    if sample_batch < 1:
        raise ValueError("sample_batch must be at least 1")

    test_images, _ = next(test_generator)
    test_images = test_images[:sample_batch]  # Test only a subset of images

    # The batch may hold fewer images than requested (small batch size or the
    # last, partial batch), so divide by what was actually predicted.
    image_count = len(test_images)
    if image_count == 0:
        raise ValueError("test_generator returned an empty batch")

    # Warm-up run: keeps one-off setup cost of the first predict call out of
    # the measurement.
    model.predict(test_images)

    # perf_counter is monotonic and has a higher resolution than time.time().
    start_time = time.perf_counter()
    model.predict(test_images)
    elapsed = time.perf_counter() - start_time

    avg_time = elapsed / image_count
    print(f"Average inference time per image ({model_name}): {avg_time:.4f} seconds")
    return avg_time

def plot_confusion_matrix(model, model_name):
    """
    Draws the confusion matrix of a model on the test set and prints the
    per-class classification report.

    Predictions are made on the module-level ``test_generator``; the predicted
    class is the arg-max of each output row, and the true classes and class
    names are read from the same generator.

    Parameters:
    - model: The trained model to evaluate.
    - model_name: A string representing the model name (used in titles).
    """
    scores = model.predict(test_generator)
    predicted = np.argmax(scores, axis=1)
    actual = test_generator.classes
    names = list(test_generator.class_indices)

    matrix = confusion_matrix(actual, predicted)

    # Heatmap with raw counts in each cell
    plt.figure(figsize=(8, 6))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=names,
        yticklabels=names,
    )
    plt.title(f'Confusion Matrix - {model_name}')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.show()

    report = classification_report(actual, predicted, target_names=names)
    print(f"Classification Report for {model_name}:")
    print(report)

def plot_roc_pr_curves(model, model_name):
    """
    Draws one-vs-rest ROC and Precision-Recall curves for every class.

    Predictions are made on the module-level ``test_generator``; column ``i``
    of the model output is used as the score for class ``i``. The ROC panel
    (left) reports the AUC of each class in its legend; the Precision-Recall
    panel (right) shows one curve per class.

    Parameters:
    - model: The trained model to evaluate.
    - model_name: A string representing the model name (used in titles).
    """
    labels_true = test_generator.classes
    scores = model.predict(test_generator)  # Model outputs are used as class probabilities
    class_ids = range(len(test_generator.class_indices))

    plt.figure(figsize=(12, 5))

    # Left panel: ROC curves
    plt.subplot(1, 2, 1)
    for class_id in class_ids:
        is_class = labels_true == class_id  # One-vs-rest ground truth
        class_scores = scores[:, class_id]
        false_pos_rate, true_pos_rate, _ = roc_curve(is_class, class_scores)
        area = roc_auc_score(is_class, class_scores)
        plt.plot(false_pos_rate, true_pos_rate, label=f'Class {class_id} (AUC = {area:.2f})')

    plt.title(f'ROC Curve - {model_name}')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend()

    # Right panel: Precision-Recall curves
    plt.subplot(1, 2, 2)
    for class_id in class_ids:
        class_precision, class_recall, _ = precision_recall_curve(
            labels_true == class_id, scores[:, class_id]
        )
        plt.plot(class_recall, class_precision, label=f'Class {class_id}')

    plt.title(f'Precision-Recall Curve - {model_name}')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.legend()

    plt.show()

def plot_training_history(history_dict):
    """
    Plots per-epoch accuracy and loss curves for every trained model.

    The left panel shows training/validation accuracy, the right panel
    training/validation loss; each model contributes one "Train" and one
    "Val" line per panel.

    Parameters:
    - history_dict: A dictionary mapping a model name to the history object
      returned by training (its ``history`` attribute must hold the keys
      'accuracy', 'val_accuracy', 'loss' and 'val_loss').
    """
    # (subplot position, metric key, y-axis label, panel title)
    panels = (
        (1, 'accuracy', 'Accuracy', 'Training & Validation Accuracy'),
        (2, 'loss', 'Loss', 'Training & Validation Loss'),
    )

    plt.figure(figsize=(12, 5))

    for position, metric, axis_label, panel_title in panels:
        plt.subplot(1, 2, position)
        for name, history in history_dict.items():
            curves = history.history
            plt.plot(curves[metric], label=f'{name} Train')
            plt.plot(curves['val_' + metric], label=f'{name} Val')
        plt.xlabel('Epochs')
        plt.ylabel(axis_label)
        plt.legend()
        plt.title(panel_title)

    plt.show()


# Load CSV and Sort Images into Folders

In [None]:
# Read the label table (one row per image: relative path in "pth", class in "label")
csv_path = "labels.csv"  # Path to the labels file
image_base_path = os.getcwd()  # Base path for image directory
df = pd.read_csv(csv_path)

# All images are collected under <base>/sorted_images/<label>/
sorted_images_path = os.path.join(image_base_path, "sorted_images")
os.makedirs(sorted_images_path, exist_ok=True)

# Move every listed image into the folder of its label
for rel_path, label in zip(df["pth"], df["label"]):
    img_path = os.path.join(image_base_path, rel_path)  # Where the image currently is

    # Make sure the label folder exists before moving anything into it
    target_dir = os.path.join(sorted_images_path, label)
    os.makedirs(target_dir, exist_ok=True)

    # Keep the file name, drop the original sub-folder
    new_img_path = os.path.join(target_dir, os.path.basename(rel_path))

    # Report images that are listed in the CSV but not present on disk
    if not os.path.exists(img_path):
        print(f"Missing: {img_path}")
        continue

    shutil.move(img_path, new_img_path)
    print(f"Moved: {img_path} -> {new_img_path}")

print("Image sorting completed!")

# Count and Visualize Sorted Images per Label

In [None]:
# Root folder holding one sub-folder per label
sorted_images_path = "sorted_images"

# Number of directory entries per label folder (plain files at the root are ignored)
label_counts = {
    entry: len(os.listdir(os.path.join(sorted_images_path, entry)))
    for entry in os.listdir(sorted_images_path)
    if os.path.isdir(os.path.join(sorted_images_path, entry))
}

# Report the counts
for label, count in label_counts.items():
    print(f"{label}: {count} images")

# ===========================
# Plot Image Distribution per Emotion
# ===========================

import matplotlib.pyplot as plt

# Bar chart: one bar per emotion, bar height = number of images
plt.bar(list(label_counts.keys()), list(label_counts.values()))
plt.title("Distribution of Images per Emotion")
plt.xlabel("Emotion")
plt.ylabel("Number of Images")
plt.xticks(rotation=45)
plt.show()


# Data Augmentation for Better Generalization

In [None]:
# Define augmentation parameters
augmentation = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.2,
    horizontal_flip=True,
    brightness_range=[0.8, 1.2],
    fill_mode='nearest'
)

# Target number of images per class
target_count = 3000
sorted_images_path = "sorted_images"  # Ensure this path is correct

# Top up every class folder with augmented copies until it holds target_count files
for label in os.listdir(sorted_images_path):
    label_path = os.path.join(sorted_images_path, label)
    if not os.path.isdir(label_path):
        continue  # Stray files next to the class folders are not classes

    images = os.listdir(label_path)
    current_count = len(images)

    if current_count >= target_count:
        print(f"{label} already has sufficient images ({current_count}). No augmentation needed.")
        continue

    print(f"Generating {target_count - current_count} additional images for {label}...")

    # Only the images present at the start are used as sources; the file count
    # is tracked locally instead of re-listing the directory on every iteration.
    candidates = list(images)
    while current_count < target_count and candidates:
        img_name = np.random.choice(candidates)  # Randomly select a source image
        img_path = os.path.join(label_path, img_name)

        # NOTE: the loaded picture must not be stored in a variable called
        # `image` -- that name is the Keras `image` module imported at the top
        # of the notebook and is still needed by the real-time analysis cell.
        source_bgr = cv2.imread(img_path)
        if source_bgr is None:
            # cv2.imread returns None for unreadable/non-image files
            print(f"Skipping unreadable image: {img_path}")
            candidates.remove(img_name)
            continue

        source_rgb = cv2.cvtColor(source_bgr, cv2.COLOR_BGR2RGB)
        source_batch = np.expand_dims(source_rgb, axis=0)

        # Generate one augmented image
        aug_iter = augmentation.flow(source_batch, batch_size=1)
        aug_image = next(aug_iter)[0].astype(np.uint8)

        # Save the new augmented image (converted back to BGR for OpenCV)
        new_img_name = f"aug_{current_count}_{img_name}"
        new_img_path = os.path.join(label_path, new_img_name)
        if not cv2.imwrite(new_img_path, cv2.cvtColor(aug_image, cv2.COLOR_RGB2BGR)):
            # Without this check a failing write would loop forever
            raise OSError(f"Could not write augmented image: {new_img_path}")
        current_count += 1

    if current_count >= target_count:
        print(f"{label} now has {target_count} images!")
    else:
        print(f"{label} has no readable source images; stopped at {current_count} images.")

print("Data augmentation completed!")

# Resize and Split Images into Train/Test Sets

In [None]:
# Paths for sorted and processed images
sorted_images_path = "sorted_images"
processed_images_path = "processed_images"

# Target image size (modify if needed)
img_size = (224, 224)

# Ensure the processed images directory exists
os.makedirs(processed_images_path, exist_ok=True)

# Train-test split ratio
train_ratio = 0.8

# NOTE(review): the class folders also contain the "aug_*" copies written by
# the augmentation step, so an augmented variant of a training image can end
# up in the test split. Consider splitting before augmenting.

# Iterate over all labels (classes) in the sorted dataset
for label in os.listdir(sorted_images_path):
    label_path = os.path.join(sorted_images_path, label)
    if not os.path.isdir(label_path):
        continue  # Stray files next to the class folders are not classes

    # os.listdir returns entries in arbitrary order; sorting makes the seeded
    # split below reproducible across runs and machines.
    images = sorted(os.listdir(label_path))

    # Split images into training and testing sets
    train_images, test_images = train_test_split(images, train_size=train_ratio, random_state=42)

    # Resize and save images in respective folders (train/test)
    for dataset, dataset_name in zip([train_images, test_images], ["train", "test"]):
        target_dir = os.path.join(processed_images_path, dataset_name, label)
        os.makedirs(target_dir, exist_ok=True)

        for img_name in dataset:
            img_path = os.path.join(label_path, img_name)
            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread returns None for unreadable/non-image files
                print(f"Skipping unreadable image: {img_path}")
                continue

            img = cv2.resize(img, img_size)

            # The image stays in OpenCV's BGR order: cv2.imwrite expects BGR,
            # so converting to RGB first would store every file with the red
            # and blue channels swapped.
            new_img_path = os.path.join(target_dir, img_name)
            cv2.imwrite(new_img_path, img)

print("Images have been normalized and split into Train/Test sets!")

# Load Processed Images with Data Generators

In [None]:
# Location of the train/ and test/ folders
processed_images_path = "processed_images"

# Input resolution and number of images per batch
img_size = (224, 224)
batch_size = 32

# Both generators only scale pixel values by 1/255 (no augmentation here)
train_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

# Options shared by the training and the testing iterator
shared_flow_args = {
    "target_size": img_size,
    "batch_size": batch_size,
    "class_mode": 'categorical',
}

# Training data: shuffled for better generalization
train_generator = train_datagen.flow_from_directory(
    os.path.join(processed_images_path, "train"),
    shuffle=True,
    **shared_flow_args
)

# Testing data: fixed order so predictions line up with the true labels
test_generator = test_datagen.flow_from_directory(
    os.path.join(processed_images_path, "test"),
    shuffle=False,
    **shared_flow_args
)

print("Data successfully loaded in Tensor format!")

# GPU Configuration & Mixed Precision Training

In [None]:
# Let TensorFlow grow GPU memory on demand instead of reserving it all up front
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for device in gpus:
            tf.config.experimental.set_memory_growth(device, True)
    except RuntimeError as e:
        print(e)
    else:
        print("GPU memory growth enabled!")

# Use mixed precision (FP16 compute) for all layers created from here on
set_global_policy('mixed_float16')

# ===========================
# Initialize and Train Models
# ===========================

# Builder function for every baseline architecture, in training order
model_builders = {
    "Custom CNN": create_custom_cnn,
    "VGG16": create_vgg16_model,
    "ResNet50": create_resnet50_model,
    "MobileNetV2": create_mobilenetv2_model,
}

# Instantiate all models
models = {name: build() for name, build in model_builders.items()}

# Training histories keyed by model name
history_dict = {}

# Number of epochs
epochs = 10

# Train the models one after another; the test set doubles as validation data
for name, model in models.items():
    print(f"Starting training for {name} model...")
    history_dict[name] = model.fit(
        train_generator,
        validation_data=test_generator,
        epochs=epochs,
    )
    print(f"{name} model training completed!")

print("All models successfully trained and ready for comparison!")

# Initialize and Train Optimized Models

In [None]:
# ===========================
# Load Data Generators
# ===========================

# Replace the plain generators with the augmenting ones
train_generator, test_generator = create_data_generators()

# ===========================
# Callbacks for Early Stopping & Learning Rate Reduction
# ===========================

from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# Stop once validation loss has not improved for 4 epochs and roll back to the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=4, restore_best_weights=True)

# Halve the learning rate after 2 stagnant epochs, but never go below 1e-6
lr_reduction = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-6)

training_callbacks = [early_stopping, lr_reduction]

# ===========================
# Initialize Models
# ===========================

model_builders = {
    "Custom CNN v2": create_custom_cnn_v2,
    "ResNet50 v2": create_resnet50_model_v2,
}
models = {name: build() for name, build in model_builders.items()}

# ===========================
# Train Models with Callbacks
# ===========================

# Training histories keyed by model name
history_dict = {}

# Upper bound on epochs (early stopping usually ends training sooner)
epochs = 30

for name, model in models.items():
    print(f"Starting training for {name} model...")
    history_dict[name] = model.fit(
        train_generator,
        validation_data=test_generator,
        epochs=epochs,
        callbacks=training_callbacks,
    )
    print(f"{name} model training completed!")

print("All models successfully trained and ready for comparison!")

# Analyze Training & Validation Performance

In [None]:

# Plot training and validation accuracy/loss for every model in history_dict
# (the histories collected by the most recently executed training cell)
plot_training_history(history_dict)

print("Training and validation accuracy analyzed!")

# Display Confusion Matrix for Each Model

In [None]:
# Show the confusion matrix and classification report of every trained model
for name in models:
    plot_confusion_matrix(models[name], name)

print("Confusion matrices created and analyzed!")

# Display ROC & Precision-Recall Curves for Each Model

In [None]:
# Show the ROC and Precision-Recall curves of every trained model
for name in models:
    plot_roc_pr_curves(models[name], name)

print("ROC-AUC & Precision-Recall analysis completed!")

# Measure Inference Time for All Models

In [None]:
# Average per-image inference time of every trained model, keyed by model name
inference_times = {
    name: measure_inference_time(model, name)
    for name, model in models.items()
}

# ===========================
# Plot Model Inference Time Comparison
# ===========================

import matplotlib.pyplot as plt

# One bar per model
plt.figure(figsize=(8, 5))
plt.bar(list(inference_times.keys()), list(inference_times.values()))
plt.title("Model Runtime Analysis")
plt.xlabel("Model")
plt.ylabel("Average Inference Time (Seconds)")
plt.show()

print("Runtime analysis completed, and models optimized for real-time performance!")

# Save Trained Models

In [None]:
# Write each optimized model to its own HDF5 file
model_files = (
    ("Custom CNN v2", "custom_cnn_v2.h5"),
    ("ResNet50 v2", "resnet50_v2.h5"),
)
for model_key, h5_file in model_files:
    models[model_key].save(h5_file)

print("Models successfully saved as .h5 files!")

# Load Trained Models

In [None]:

# Restore both saved models from their HDF5 files
# (Custom CNN v2 first, then ResNet50 v2)
custom_cnn_model, resnet50_model = (
    load_model(h5_file) for h5_file in ("custom_cnn_v2.h5", "resnet50_v2.h5")
)

print("Models successfully loaded!")

# Deploy the Custom Model for Real-Time Analysis

In [None]:
# Initialize Mediapipe Face Detection
mp_face_detection = mp.solutions.face_detection
face_detection = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)

# Create a folder to save images
save_folder = "takenPictures"
os.makedirs(save_folder, exist_ok=True)
file_path = os.path.join(save_folder, "face_image.jpg")  # Overwritten on every capture

# Open webcam
cap = cv2.VideoCapture(0)

last_capture_time = time.time()
capture_interval = 0.5  # Capture and classify a face at most every 0.5 seconds

# Display names, one per model output unit. flow_from_directory assigns class
# indices in alphanumeric order of the class folder names, so this list has to
# follow that order.
# NOTE(review): assumes the training folders carry the eight AffectNet label
# names (anger, contempt, disgust, fear, happy, neutral, sad, surprise) --
# confirm against train_generator.class_indices.
emotion_labels = ["Angry", "Contempt", "Disgust", "Fear", "Happy", "Neutral", "Sad", "Surprise"]

# Per-emotion sum of all predicted percentages and the number of predictions made
emotion_totals = {label: 0.0 for label in emotion_labels}
prediction_count = 0

# Running average per emotion; this is what gets drawn next to the video
emotion_results = {label: 0.0 for label in emotion_labels}

# Overlay appearance (constant for all frames)
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
font_thickness = 2
text_white = (255, 255, 255)
text_green = (0, 255, 0)

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame")
            break

        # Convert frame to RGB (Mediapipe expects RGB, OpenCV delivers BGR)
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Detect faces
        results = face_detection.process(rgb_frame)

        capture_due = time.time() - last_capture_time > capture_interval
        if results.detections and capture_due:
            # Get the first detected face
            detection = results.detections[0]
            bboxC = detection.location_data.relative_bounding_box

            h, w, _ = frame.shape
            x, y = int(bboxC.xmin * w), int(bboxC.ymin * h)
            w_box, h_box = int(bboxC.width * w), int(bboxC.height * h)

            # Clamp the box to the frame: a face touching the border can give
            # negative or too large coordinates, which would slice the wrong
            # region or produce an empty crop.
            x0, y0 = max(x, 0), max(y, 0)
            x1, y1 = min(x + w_box, w), min(y + h_box, h)
            face_image = frame[y0:y1, x0:x1]

            if face_image.size > 0:
                # Resize the face to (224,224,3)
                face_resized = cv2.resize(face_image, (224, 224))

                # Keep the most recent face crop on disk
                cv2.imwrite(file_path, face_resized)
                last_capture_time = time.time()

                # Build the model input directly from the crop:
                # RGB channel order, float32, scaled by 1/255, batch of one
                face_rgb = cv2.cvtColor(face_resized, cv2.COLOR_BGR2RGB)
                model_input = np.expand_dims(face_rgb.astype(np.float32) / 255.0, axis=0)

                pred = custom_cnn_model.predict(model_input, verbose=0)[0] * 100
                if len(pred) != len(emotion_labels):
                    raise ValueError(
                        f"Model returned {len(pred)} scores but {len(emotion_labels)} emotion labels are defined"
                    )

                # Add each emotion's score to its own total and refresh the averages
                prediction_count += 1
                for idx, label in enumerate(emotion_labels):
                    emotion_totals[label] += float(pred[idx])
                emotion_results = {
                    label: total / prediction_count
                    for label, total in emotion_totals.items()
                }

        # Draw emotion results on the right side
        black_overlay = np.zeros((frame.shape[0], 300, 3), dtype=np.uint8)  # Black rectangle for text
        start_y = 50

        sorted_emotions = sorted(emotion_results.items(), key=lambda item: item[1], reverse=True)

        for rank, (emotion, score) in enumerate(sorted_emotions):
            text_color = text_green if rank == 0 else text_white  # Highlight the top emotion
            text = f"{emotion}: {score:.2f}%"
            cv2.putText(black_overlay, text, (20, start_y), font, font_scale, text_color, font_thickness)
            start_y += 40

        frame = np.hstack((frame, black_overlay))

        cv2.imshow("Emotion Recognition - Press 'q' to exit", frame)

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Release resources even if the loop ends with an error
    cap.release()
    face_detection.close()
    cv2.destroyAllWindows()
    cv2.waitKey(1)  # Gives the GUI a chance to process the window-close request