In [1]:
# Facial Emotion Recognition Model - Optimized for Snapdragon X Elite
# =======================================================================
# Configuration: MOBILENET with optimized hyperparameters
# Target: <1 hour training time with maximum accuracy

import sys
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import os
import cv2
from PIL import Image, UnidentifiedImageError

# Import TensorFlow and configure for optimal performance
import tensorflow as tf
print(f"TensorFlow version: {tf.__version__}")
print(f"Python executable: {sys.executable}")

import seaborn as sns



# Import deep learning libraries
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Flatten, Dropout, Dense, Input, 
                                     GlobalAveragePooling2D, Conv2D, 
                                     BatchNormalization, Activation, MaxPooling2D)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.applications import MobileNetV2, EfficientNetB0


TensorFlow version: 2.19.1
Python executable: c:\Users\marty\anaconda3\envs\tf311_env\python.exe


In [2]:
# Auto-detect folder path
if sys.executable.startswith('/anaconda/envs/azureml_py38_PT_TF/bin/python'):
    folder_path = "Users/martyn.frank/IATD_Deeplearning/Project/data/images/"
else:
    folder_path = 'data/images/'
    
print(f"Data folder path: {folder_path}")


Data folder path: data/images/


In [3]:

# =======================================================================
# Configuration Parameters
# =======================================================================


import yaml

def load_cfg(path="config.yaml"):
    with open(path, "r") as f:
        data = yaml.safe_load(f)
    return data

cfg = load_cfg("config.yaml")

# optionally, map top-level training params from cfg
picture_size   = cfg.get("picture_size", 48)
batch_size     = cfg.get("batch_size", 128)
epochs         = cfg.get("epochs", 30)
no_of_classes  = cfg.get("no_of_classes", 7)
learning_rate  = cfg.get("learning_rate", 1e-4)




# Enable mixed precision for faster training on Snapdragon X Elite
from tensorflow.keras import mixed_precision

# Precision Swicth
if cfg["precision"] == "mixed":
    mixed_precision.set_global_policy('mixed_float16')
else:
    mixed_precision.set_global_policy('float32')
print("Mixed precision training enabled - expect 2-3x speedup!")




Mixed precision training enabled - expect 2-3x speedup!


In [4]:

# =======================================================================
# Image Validation and Cleaning
# =======================================================================

def is_image_valid(filepath):
    """Validate image integrity."""
    try:
        with Image.open(filepath) as img:
            img.verify()
        return True
    except (UnidentifiedImageError, OSError):
        return False

def delete_if_corrupt(filepath):
    """Delete corrupted images."""
    try:
        with Image.open(filepath) as img:
            img.verify()
        return False
    except (UnidentifiedImageError, OSError):
        print(f"Deleting corrupted image: {filepath}")
        os.remove(filepath)
        return True


In [5]:

# =======================================================================
# Advanced Image Preprocessing
# =======================================================================

def preprocess_image(image):
    """
    Apply histogram equalization for better feature extraction.
    Improves contrast and enhances facial features.
    """
    # Convert to uint8 if needed
    if image.dtype != np.uint8:
        image = (image * 255).astype(np.uint8)
    
    # Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    if len(image.shape) == 2:  # Grayscale
        enhanced = clahe.apply(image)
    else:  # If RGB, convert to grayscale first
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        enhanced = clahe.apply(gray)
    
    # Normalize to [0, 1]
    return enhanced.astype(np.float32) / 255.0

class PreprocessingImageDataGenerator(ImageDataGenerator):
    """Custom generator with advanced preprocessing."""
    
    def __init__(self, *args, preprocessing_function=None, **kwargs):
        super().__init__(*args, preprocessing_function=preprocessing_function, **kwargs)
    
    def standardize(self, x):
        x = super().standardize(x)
        # Apply additional preprocessing
        return preprocess_image(x)


In [6]:
print(os.listdir(folder_path + "validation/happy/"))


['10019.jpg', '10023.jpg', '10074.jpg', '10096.jpg', '10106.jpg', '10126.jpg', '10138.jpg', '10141.jpg', '1020.jpg', '10218.jpg', '10237.jpg', '10248.jpg', '10257.jpg', '1027.jpg', '10273.jpg', '10276.jpg', '10312.jpg', '10317.jpg', '10344.jpg', '10362.jpg', '10367.jpg', '10370.jpg', '10432.jpg', '10456.jpg', '10467.jpg', '10468.jpg', '10480.jpg', '10528.jpg', '10540.jpg', '10552.jpg', '1056.jpg', '10571.jpg', '1058.jpg', '10622.jpg', '10638.jpg', '10640.jpg', '10644.jpg', '10647.jpg', '10683.jpg', '10703.jpg', '1074.jpg', '10770.jpg', '10773.jpg', '10792.jpg', '1081.jpg', '10811.jpg', '10841.jpg', '10879.jpg', '10903.jpg', '10935.jpg', '10956.jpg', '10992.jpg', '11019.jpg', '1111.jpg', '11150.jpg', '11208.jpg', '1121.jpg', '11236.jpg', '1130.jpg', '11345.jpg', '11349.jpg', '11392.jpg', '1141.jpg', '11500.jpg', '11508.jpg', '11516.jpg', '1152.jpg', '11530.jpg', '11549.jpg', '11552.jpg', '11558.jpg', '11573.jpg', '11575.jpg', '11588.jpg', '1159.jpg', '11594.jpg', '11604.jpg', '11614.jpg

In [7]:

# =======================================================================
# Data Augmentation and Loading
# =======================================================================

aug_map = {
    "none": dict(rescale=1./255),
    "light": dict(rescale=1./255, rotation_range=10, width_shift_range=0.1,
                  height_shift_range=0.1, zoom_range=0.1, horizontal_flip=True),
    "strong": dict(rescale=1./255, rotation_range=20, width_shift_range=0.15,
                   height_shift_range=0.15, shear_range=0.15, zoom_range=0.15,
                   brightness_range=[0.8,1.2], horizontal_flip=True)
}

# Training data generator with standard augmentation
datagen_train = ImageDataGenerator(**aug_map[cfg["aug_level"]])

datagen_validation = ImageDataGenerator(rescale=1./255)                                        

# Create training set
train_set = datagen_train.flow_from_directory(
    os.path.join(folder_path, "train"),
    target_size=(picture_size, picture_size),
    color_mode='grayscale',
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=True,
)

# Create validation set
validation_set = datagen_validation.flow_from_directory(
    os.path.join(folder_path, "validation"),
    target_size=(picture_size, picture_size),
    color_mode='grayscale',
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False,  # stable evaluation
)


print(f"Training samples: {train_set.n}")
print(f"Validation samples: {validation_set.n}")
print(f"Class indices: {train_set.class_indices}")


Found 28821 images belonging to 7 classes.
Found 7066 images belonging to 7 classes.
Training samples: 28821
Validation samples: 7066
Class indices: {'angry': 0, 'disgust': 1, 'fear': 2, 'happy': 3, 'neutral': 4, 'sad': 5, 'surprise': 6}


In [8]:

# =======================================================================
# Model Architecture - MobileNetV2 (Transfer Learning)
# =======================================================================
# Build model (one path only)
inputs = Input(shape=(48, 48, 1))
rgb = tf.keras.layers.Concatenate()([inputs, inputs, inputs])  # gray->RGB
# =========== Backbone switch =============

if cfg["backbone"] == "mobilenet_v2":
    base = MobileNetV2(weights='imagenet', include_top=False, input_shape=(48,48,3))
elif cfg["backbone"] == "efficientnet_b0":
    base = EfficientNetB0(weights='imagenet', include_top=False, input_shape=(48,48,3))
elif cfg["backbone"] == "custom_cnn":
    # define your own small CNN
    y = layers.Conv2D(32,3,activation='relu',padding='same')(x)
    y = layers.MaxPooling2D()(y)
    y = layers.Conv2D(64,3,activation='relu',padding='same')(y)
    y = layers.GlobalAveragePooling2D()(y)
    outputs = layers.Dense(7, activation='softmax', dtype='float32')(y)
    model = Model(inputs, outputs)
    base = None
else:
    # simple custom CNN path
    x = Conv2D(32, 3, activation='relu', padding='same')(rgb)
    x = MaxPooling2D()(x)
    x = Conv2D(64, 3, activation='relu', padding='same')(x)
    x = GlobalAveragePooling2D()(x)
    outputs = Dense(no_of_classes, activation='softmax', dtype='float32')(x)
    model = Model(inputs, outputs)
    base_model = None

if base is not None:
    base.trainable = False  # initial freeze
    y = base(x, training=False)
    y = layers.GlobalAveragePooling2D()(y)
    y = layers.Dense(256, activation='relu')(y)
    y = layers.Dropout(0.5)(y)
    outputs = layers.Dense(7, activation='softmax', dtype='float32')(y)
    model = Model(inputs, outputs)

# Using Transfer Learning with MobileNetV2 (Transfer Learning)

# Load pre-trained base model
base_model = MobileNetV2(
    weights='imagenet',
    include_top=False,
    input_shape=(48, 48, 3)  # RGB input for transfer learning
)

# Freeze base model layers initially
base_model.trainable = False

# Build complete model
inputs = Input(shape=(48, 48, 1))

# Convert grayscale to RGB by repeating channels
x = tf.keras.layers.Concatenate()([inputs, inputs, inputs])

# Pass through base model
x = base_model(x, training=False)

# Add custom classification head
x = GlobalAveragePooling2D()(x)
x = Dense(256, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
x = Dense(128, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.3)(x)
outputs = Dense(no_of_classes, activation='softmax')(x)

model = Model(inputs, outputs)

print(f"Using {base_model.name} as feature extractor")
print(f"Total parameters: {model.count_params():,}")

# Compile the model

opt_name = cfg["optimizer"]
lr = cfg["lr"]

if opt_name == "adam":
    opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)
elif opt_name == "sgd":
    opt = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9, nesterov=True)
elif opt_name == "adamw":
    opt = tf.keras.optimizers.AdamW(learning_rate=learning_rate, weight_decay=1e-4)
else:
    raise ValueError("Unknown optimizer")


model.compile(
    optimizer=opt,
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()


NameError: name 'layers' is not defined

In [None]:
# Test data pipeline
print("\nüîç Testing data pipeline...")
sample_batch, sample_labels = next(iter(train_set))
print(f"Batch shape: {sample_batch.shape}")
print(f"Batch dtype: {sample_batch.dtype}")
print(f"Batch range: [{sample_batch.min():.3f}, {sample_batch.max():.3f}]")
print(f"Labels shape: {sample_labels.shape}")

# Test model prediction with error handling
try:
    test_pred = model.predict(sample_batch[:1], verbose=0)
    print(f"Prediction shape: {test_pred.shape}")
    print(f"Prediction values: {test_pred[0]}")
    print("‚úÖ Data pipeline working!\n")
except Exception as e:
    print(f"‚ùå Prediction failed: {e}")
    import traceback
    traceback.print_exc()

In [None]:

# =======================================================================
# Training Callbacks
# =======================================================================

checkpoint = ModelCheckpoint(
    filepath="best_model.keras",
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
    mode='max'
)

early_stopping = EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=5,  # Increased patience
    verbose=1,
    restore_best_weights=True
)

reduce_learningrate = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=3,
    verbose=1,
    min_delta=0.0001,
    min_lr=0.00001
)

callbacks_list = [checkpoint, early_stopping, reduce_learningrate]


In [None]:

# =======================================================================
# Model Training
# =======================================================================

print("Starting training...")
print(f"Target: {epochs} epochs with early stopping")
print(f"Batch size: {batch_size}")
print(f"Learning rate: {learning_rate}")
print("Mixed precision: ENABLED")

history = model.fit(
    train_set,
    steps_per_epoch=train_set.n // train_set.batch_size,
    epochs=epochs,
    validation_data=validation_set,
    validation_steps=validation_set.n // validation_set.batch_size,
    callbacks=callbacks_list,
    verbose=1
)
print("‚úÖ Model training complete!")

In [None]:
# ---- Phase 2: fine-tune (switch applied here) ----
def apply_freeze_strategy(base_model, strategy="partial", n_layers=40):
    base_model.trainable = True
    for layer in base_model.layers[:-n_layers]:
        layer.trainable = False
    for layer in base_model.layers[-n_layers:]:
        if isinstance(layer, tf.keras.layers.BatchNormalization):
            layer.trainable = False  # keep BN frozen

apply_freeze_strategy(base_model, strategy="partial", n_layers=40)

# Recompile with lower LR
model.compile(optimizer=Adam(1e-5), loss='categorical_crossentropy', metrics=['accuracy'])

# Optional: shorter patience for fine-tuning
history_ft = model.fit(train_set, validation_data=validation_set, epochs=10, callbacks=callbacks_list)

In [None]:
# =======================================================================
# Save (versioned)
# =======================================================================
import re, os, json

def sanitize(s: str) -> str:
    # keep alnum and a few safe chars
    return re.sub(r'[^a-zA-Z0-9._-]+', '', str(s))

def cfg_tag(cfg: dict, keys=('backbone','aug_level','optimizer','lr','precision','freeze','fine_tune_layers')):
    # build compact tag like: b-mbv2_aug-strong_opt-adam_lr-1e-4_mp-true_ft-part40
    kmap = {
        'backbone': 'b',
        'aug_level': 'aug',
        'optimizer': 'opt',
        'lr': 'lr',
        'precision': 'prec',
        'freeze': 'ft',
        'fine_tune_layers': 'n',
    }
    parts = []
    for k in keys:
        if k in cfg:
            v = cfg[k]
            if k == 'backbone' and v == 'mobilenet_v2':
                v = 'mbv2'
            parts.append(f"{kmap.get(k,k)}-{sanitize(v)}")
    return "_".join(parts)

def save_model_versioned(model, base_name='emotion_model', extension='.keras',
                         directory='.', save_weights=True, cfg=None):
    tag = cfg_tag(cfg) if cfg else None
    name_with_tag = f"{base_name}_{tag}" if tag else base_name

    existing = [f for f in os.listdir(directory)
                if re.match(rf'{re.escape(name_with_tag)}_v\d+{re.escape(extension)}$', f)]
    if existing:
        vers = [int(re.search(rf'_v(\d+){re.escape(extension)}$', f).group(1)) for f in existing]
        next_v = max(vers) + 1
    else:
        next_v = 1

    model_filename = f"{name_with_tag}_v{next_v}{extension}"
    model_path = os.path.join(directory, model_filename)
    model.save(model_path)
    print(f"üíæ Saved model: {model_path}")

    paths = {'model': model_path}

    if save_weights:
        weights_filename = f"{name_with_tag}_v{next_v}.weights.h5"
        weights_path = os.path.join(directory, weights_filename)
        model.save_weights(weights_path)
        print(f"üíæ Saved weights: {weights_path}")
        paths['weights'] = weights_path

    # Optional: also save full cfg as JSON alongside
    if cfg:
        cfg_filename = f"{name_with_tag}_v{next_v}.cfg.json"
        with open(os.path.join(directory, cfg_filename), 'w') as f:
            json.dump(cfg, f, indent=2)
        paths['cfg'] = os.path.join(directory, cfg_filename)

    return paths


In [None]:

# =======================================================================
# Training Visualization
# =======================================================================

plt.style.use('dark_background')
plt.figure(figsize=(14, 5))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss', linewidth=2)
plt.plot(history.history['val_loss'], label='Validation Loss', linewidth=2)
plt.title('Model Loss', fontsize=14)
plt.ylabel('Loss', fontsize=12)
plt.xlabel('Epoch', fontsize=12)
plt.legend()
plt.grid(alpha=0.3)

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy', linewidth=2)
plt.plot(history.history['val_accuracy'], label='Validation Accuracy', linewidth=2)
plt.title('Model Accuracy', fontsize=14)
plt.ylabel('Accuracy', fontsize=12)
plt.xlabel('Epoch', fontsize=12)
plt.legend()
plt.grid(alpha=0.3)

plt.tight_layout()
plt.savefig('training_history.png', dpi=150, bbox_inches='tight')
plt.show()


In [None]:

# =======================================================================
# Model Evaluation
# =======================================================================

print("\n" + "="*70)
train_loss, train_acc = model.evaluate(train_set, verbose=0)
test_loss, test_acc = model.evaluate(validation_set, verbose=0)

print(f"üìä FINAL RESULTS")
print(f"Training Accuracy: {train_acc*100:.2f}%")
print(f"Validation Accuracy: {test_acc*100:.2f}%")
print(f"Training Loss: {train_loss:.4f}")
print(f"Validation Loss: {test_loss:.4f}")
print("="*70)


In [None]:

# =======================================================================
# Detailed Performance Analysis
# =======================================================================

from sklearn.metrics import confusion_matrix, classification_report
from keras.models import load_model

class_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad', 'Surprise']

# Load best model
my_model = load_model('best_model.keras', compile=False)

# Generate predictions on validation set
print("Generating predictions on validation set...")
predictions = []
true_labels = []

for i in range(len(validation_set)):
    batch_images, batch_labels = validation_set[i]
    batch_predictions = my_model.predict(batch_images, verbose=0)
    predictions.extend(np.argmax(batch_predictions, axis=1))
    true_labels.extend(np.argmax(batch_labels, axis=1))
    if i >= validation_set.n // test_set.batch_size:
        break

predictions = np.array(predictions)
true_labels = np.array(true_labels)

# Confusion Matrix
cm = confusion_matrix(true_labels, predictions)

plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='rocket', 
            xticklabels=class_labels, 
            yticklabels=class_labels,
            cbar_kws={'label': 'Count'})
plt.title('Confusion Matrix - Facial Emotion Recognition', fontsize=16, pad=20)
plt.xlabel('Predicted Emotion', fontsize=12)
plt.ylabel('True Emotion', fontsize=12)
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig('confusion_matrix.png', dpi=150, bbox_inches='tight')
plt.show()

# Classification Report
print("\n" + "="*70)
print("üìà DETAILED CLASSIFICATION METRICS")
print("="*70)
print(classification_report(true_labels, predictions, target_names=class_labels))


In [None]:

# =======================================================================
# Export to ONNX for Snapdragon Optimization
# =======================================================================

try:
    import tf2onnx
    import onnx
    
    print("\n" + "="*70)
    print("üöÄ Exporting model to ONNX format for Snapdragon optimization...")
    
    # Convert to ONNX
    spec = (tf.TensorSpec((None, 48, 48, 1), tf.float32, name="input"),)
    output_path = "emotion_model.onnx"
    
    model_proto, _ = tf2onnx.convert.from_keras(model, input_signature=spec, opset=13, output_path=output_path)
    
    print(f"‚úÖ Model exported to {output_path}")
    print("This model is optimized for Snapdragon X Elite inference!")
    print("="*70)
    
except ImportError:
    print("‚ö†Ô∏è tf2onnx not installed. Run: pip install tf2onnx")
except Exception as e:
    print(f"‚ö†Ô∏è ONNX export failed: {e}")


In [None]:


print("\nüéâ Training complete! Your optimized model is ready.")
print(f"‚è±Ô∏è Expected training time: 30-50 minutes on Snapdragon X Elite")
print(paths)