In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.models import Model
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
from sklearn.utils.class_weight import compute_class_weight
import mediapipe as mp
import warnings
import gc

# Suppress warnings for cleaner output
# NOTE(review): filterwarnings('ignore') with no category hides every warning
# raised after this point, not only the noisy ones - confirm that is intended.
warnings.filterwarnings('ignore')

# Configuration
# One seed is applied to NumPy's global RNG and to TensorFlow's global RNG;
# the same value is passed as random_state to the scikit-learn models below.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

# Define paths and classes
# BASE_DIR is relative to the notebook's working directory.
BASE_DIR = "FER-2013"
TRAIN_DIR = os.path.join(BASE_DIR, "train")
TEST_DIR = os.path.join(BASE_DIR, "test")
# Each name is used as a sub-directory name by load_data(); its position in
# this list is the integer label assigned to that class.
CLASSES = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

print("Libraries imported and configuration set.")

In [None]:
def load_data(data_dir):
    """Load a FER-style image folder into NumPy arrays.

    ``data_dir`` must contain one sub-directory per entry of ``CLASSES``.
    Every ``.jpg``/``.jpeg``/``.png`` file (extension matched
    case-insensitively) is read as grayscale and, if it is exactly 48x48,
    converted to a 3-channel BGR image.

    Args:
        data_dir: Path of the split directory (e.g. the train or test folder).

    Returns:
        Tuple ``(images, labels)`` of NumPy arrays. ``labels[i]`` is the index
        of the image's class in ``CLASSES``. Both arrays are empty when nothing
        could be loaded.
    """
    images = []
    labels = []
    skipped = 0
    valid_extensions = ('.jpg', '.jpeg', '.png')

    print(f"Loading data from: {data_dir}")

    for label, emotion in enumerate(CLASSES):
        class_path = os.path.join(data_dir, emotion)
        if not os.path.isdir(class_path):
            print(f"Warning: Path not found: {class_path}")
            continue

        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # sample order (and everything derived from it) reproducible.
        for filename in sorted(os.listdir(class_path)):
            if not filename.lower().endswith(valid_extensions):
                continue

            # Read image as grayscale
            img = cv2.imread(os.path.join(class_path, filename), cv2.IMREAD_GRAYSCALE)
            if img is None or img.shape != (48, 48):
                # Unreadable or unexpected size: count it so the drop is visible.
                skipped += 1
                continue

            # Convert grayscale to 3 channels (required for MediaPipe/CNN backbones)
            images.append(cv2.cvtColor(img, cv2.COLOR_GRAY2BGR))
            labels.append(label)

    if skipped:
        print(f"Warning: skipped {skipped} unreadable or non-48x48 image(s) in {data_dir}")

    return np.array(images), np.array(labels)

# Load data
print("Loading training data...")
X_train_raw, y_train = load_data(TRAIN_DIR)
print("Loading test data...")
X_test_raw, y_test = load_data(TEST_DIR)

# load_data() only warns about missing folders; stop here with a clear message
# instead of failing later inside an unrelated cell.
if len(y_train) == 0 or len(y_test) == 0:
    raise FileNotFoundError(
        f"No images were loaded from '{TRAIN_DIR}' and/or '{TEST_DIR}'. "
        "Check that BASE_DIR points at the dataset and contains one folder per class."
    )

print("\n--- Train–Test split overview ---")
print(f"Train set shape: {X_train_raw.shape}, Labels shape: {y_train.shape}")
print(f"Test set shape: {X_test_raw.shape}, Labels shape: {y_test.shape}")

# Class distribution check
train_counts = pd.Series(y_train).value_counts().sort_index()
test_counts = pd.Series(y_test).value_counts().sort_index()
print("\nClass Counts (Train):", {CLASSES[i]: count for i, count in train_counts.items()})
print("Class Counts (Test):", {CLASSES[i]: count for i, count in test_counts.items()})

# Calculate class weights for handling imbalance.
# Weights are keyed by the actual class id rather than by position, so the
# mapping stays correct even if a class has no training images.
present_classes = np.unique(y_train)
class_weights = compute_class_weight('balanced', classes=present_classes, y=y_train)
class_weight_dict = {int(cls): weight for cls, weight in zip(present_classes, class_weights)}
print("\nClass Weights for handling imbalance:", class_weight_dict)

In [None]:
# Probe MediaPipe Face Mesh once; if anything in the probe raises, switch the
# rest of the notebook to the OpenCV Haar cascade by setting mp_face_mesh = None.
try:
    # Initialize MediaPipe components for Landmark extraction
    mp_face_mesh = mp.solutions.face_mesh
    mp_drawing = mp.solutions.drawing_utils

    # Indices for key points (used for normalization in Track A)
    # NOTE(review): presumably eye-corner landmarks in the Face Mesh topology -
    # confirm against the MediaPipe landmark map.
    LEFT_EYE_IDX = 133
    RIGHT_EYE_IDX = 362
    
    # Test MediaPipe initialization
    # A constant 48x48 dummy image is processed only to exercise the call path;
    # test_result is not used afterwards.
    with mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, min_detection_confidence=0.3) as test_face_mesh:
        test_result = test_face_mesh.process(np.ones((48, 48, 3), dtype=np.uint8))
    print("MediaPipe initialized successfully")
    
except Exception as e:
    print(f"MediaPipe initialization failed: {e}")
    print("Falling back to Haar Cascade for face detection")
    
    # Initialize Haar Cascade as fallback
    # haar_cascade is only defined on this path; later cells read it only when
    # mp_face_mesh is None.
    haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    mp_face_mesh = None

In [None]:
# Cell 4: Milestone 1 - Face Detection & Visual Check
def detect_faces_mediapipe(image_bgr, face_mesh):
    """Locate one face with MediaPipe Face Mesh and outline it.

    Args:
        image_bgr: 3-channel BGR image.
        face_mesh: An open FaceMesh instance.

    Returns:
        ``(display_rgb, face_box)``: an RGB copy of the input with the box drawn
        when a face was found, and the box as ``(x, y, width, height)`` in
        pixels, or ``None`` when no face was found.
    """
    mesh_output = face_mesh.process(cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB))
    annotated = image_bgr.copy()

    if not mesh_output.multi_face_landmarks:
        return cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), None

    height, width, _ = image_bgr.shape
    pad = 2

    # Landmarks are normalized; scale to pixels (truncating) before taking extremes.
    pixel_points = [
        (int(lm.x * width), int(lm.y * height))
        for lm in mesh_output.multi_face_landmarks[0].landmark
    ]
    xs = [px for px, _ in pixel_points]
    ys = [py for _, py in pixel_points]

    # Extremes start from the image bounds, then get padded and clamped to the frame.
    left = max(0, min([width] + xs) - pad)
    top = max(0, min([height] + ys) - pad)
    right = min(width, max([0] + xs) + pad)
    bottom = min(height, max([0] + ys) + pad)

    cv2.rectangle(annotated, (left, top), (right, bottom), (0, 255, 0), 1)
    box = (left, top, right - left, bottom - top)

    return cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), box

def detect_faces_haar(image_bgr, cascade):
    """Fallback detector: find the largest face with a Haar cascade.

    Args:
        image_bgr: 3-channel BGR image.
        cascade: Object exposing ``detectMultiScale`` (an OpenCV cascade classifier).

    Returns:
        ``(display_rgb, face_box)``: an RGB copy of the input with the chosen
        box drawn, and that box as ``(x, y, w, h)``, or ``None`` when the
        cascade reports no faces.
    """
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
    candidates = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    annotated = image_bgr.copy()

    if len(candidates) == 0:
        return cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), None

    # Keep the rectangle with the greatest area (first one wins on ties).
    x, y, w, h = max(candidates, key=lambda rect: rect[2] * rect[3])
    cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 0), 1)

    return cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), (x, y, w, h)

def _plot_detection_samples(axes, X_raw, y_raw, sample_indices, detect_fn):
    """Run ``detect_fn`` on each sampled image, draw it on its axis, return the hit count."""
    detection_count = 0
    for ax, sample_idx in zip(axes, sample_indices):
        display_image_rgb, face_box = detect_fn(X_raw[sample_idx])
        detected = face_box is not None
        if detected:
            detection_count += 1

        ax.imshow(display_image_rgb)
        detection_status = "Detected" if detected else "Not Detected"
        ax.set_title(f'{CLASSES[y_raw[sample_idx]]} - {detection_status}')
        ax.axis('off')
    return detection_count


def visualize_face_detection_samples(X_raw, y_raw, num_samples=8):
    """Show face-detection results for one example image per class.

    The first image of each class is used, up to ``num_samples`` images, so at
    most ``len(CLASSES)`` panels are filled. MediaPipe is used when
    ``mp_face_mesh`` is available, otherwise the Haar cascade fallback.

    Args:
        X_raw: Array of BGR images.
        y_raw: Integer labels aligned with ``X_raw``.
        num_samples: Upper bound on the number of images shown; also sizes the
            4-column subplot grid (the default gives the original 2x4 layout).

    Returns:
        Fraction of shown samples in which a face was detected (0.0 when there
        were no samples to show).
    """
    n_cols = 4
    n_rows = max(1, -(-num_samples // n_cols))  # ceiling division
    # squeeze=False keeps `axes` 2-D even for a single row, so ravel() is always valid.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(4 * n_cols, 4 * n_rows), squeeze=False)
    axes = axes.ravel()

    # Get diverse samples from different classes
    sample_indices = []
    for class_idx in range(len(CLASSES)):
        class_indices = np.where(y_raw == class_idx)[0]
        if len(class_indices) > 0:
            sample_indices.append(class_indices[0])
        if len(sample_indices) >= num_samples:
            break

    if mp_face_mesh is not None:
        # Use MediaPipe
        with mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, min_detection_confidence=0.3) as face_mesh:
            detection_count = _plot_detection_samples(
                axes, X_raw, y_raw, sample_indices,
                lambda image_bgr: detect_faces_mediapipe(image_bgr, face_mesh),
            )
    else:
        # Use Haar Cascade
        detection_count = _plot_detection_samples(
            axes, X_raw, y_raw, sample_indices,
            lambda image_bgr: detect_faces_haar(image_bgr, haar_cascade),
        )

    # Hide the panels that received no image.
    for idx in range(len(sample_indices), len(axes)):
        axes[idx].axis('off')

    # Guard against an empty selection (e.g. no labelled images at all).
    detection_rate = detection_count / len(sample_indices) if sample_indices else 0.0
    plt.suptitle(f"Milestone 1: Face Detection Check (Detection Rate: {detection_rate:.1%})")
    plt.tight_layout()
    plt.show()

    return detection_rate

# Run the visual check on the training images; the returned value is the
# detection rate over the handful of plotted samples only, not the whole set.
print("Running Milestone 1: Face Detection Visualization...")
detection_rate = visualize_face_detection_samples(X_train_raw, y_train)
print(f"Face detection rate on samples: {detection_rate:.1%}")

In [None]:
# Cell 5: Milestone 2 - Track A: Landmark Feature Extraction
def extract_and_normalize_landmarks(image_bgr, face_mesh):
    """Return the face's landmark coordinates as one normalized feature vector.

    The (x, y) coordinates reported by ``face_mesh`` are shifted so the midpoint
    between the two reference eye landmarks is the origin, then divided by the
    distance between those two landmarks.

    Args:
        image_bgr: 3-channel BGR image.
        face_mesh: An open FaceMesh instance.

    Returns:
        1-D array of interleaved x/y values (two per landmark), or ``None`` when
        no face is found or the two eye landmarks coincide.
    """
    mesh_output = face_mesh.process(cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB))
    faces = mesh_output.multi_face_landmarks
    if not faces:
        return None

    points = np.array([[lm.x, lm.y] for lm in faces[0].landmark])

    left_eye, right_eye = points[LEFT_EYE_IDX], points[RIGHT_EYE_IDX]
    eye_distance = np.linalg.norm(left_eye - right_eye)
    if eye_distance == 0:
        # Degenerate geometry: scaling would divide by zero.
        return None

    eye_midpoint = (left_eye + right_eye) / 2
    return ((points - eye_midpoint) / eye_distance).flatten()

def get_landmark_features_and_labels(X_raw, y_raw):
    """Extract landmark feature vectors for every image where a face is found.

    Images for which ``extract_and_normalize_landmarks`` returns ``None`` are
    dropped together with their labels, so the outputs can be shorter than the
    inputs.

    Args:
        X_raw: Sequence of BGR images.
        y_raw: Labels aligned with ``X_raw``.

    Returns:
        ``(features, labels)`` as NumPy arrays containing only the successful
        extractions. Both are empty arrays when MediaPipe is unavailable.
    """
    features = []
    filtered_labels = []
    failed_detections = 0
    total = len(X_raw)

    print("Extracting landmark features...")

    if mp_face_mesh is None:
        print("MediaPipe not available. Cannot extract landmarks.")
        return np.array([]), np.array([])

    with mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, min_detection_confidence=0.3) as face_mesh:
        for i, img in enumerate(X_raw):
            if i % 1000 == 0:
                print(f"Processed {i}/{total} images...")

            feature = extract_and_normalize_landmarks(img, face_mesh)
            if feature is not None:
                features.append(feature)
                filtered_labels.append(y_raw[i])
            else:
                failed_detections += 1

    # An empty input must not crash the summary line with a division by zero.
    failure_rate = failed_detections / total if total else 0.0
    print(f"Landmark extraction complete. Failed detections: {failed_detections}/{total} ({failure_rate:.1%})")
    return np.array(features), np.array(filtered_labels)

# Track A driver: build landmark features for both splits.
# The *_lm label arrays can be shorter than y_train / y_test because images
# without a detected face are dropped during extraction.
print("\n--- Track A: Landmark Feature Extraction (936 features) ---")
X_train_lm, y_train_lm = get_landmark_features_and_labels(X_train_raw, y_train)
X_test_lm, y_test_lm = get_landmark_features_and_labels(X_test_raw, y_test)

# An empty training matrix means no landmarks could be extracted; later cells
# check the same condition before training the landmark model.
if len(X_train_lm) > 0:
    print(f"Landmark Train Features shape: {X_train_lm.shape}, Labels shape: {y_train_lm.shape}")
    print(f"Landmark Test Features shape: {X_test_lm.shape}, Labels shape: {y_test_lm.shape}")
else:
    print("No landmark features extracted. Proceeding with CNN features only.")

In [None]:
# Cell 6: Milestone 2 - Track B: CNN Deep Feature Extraction
def extract_cnn_features_batched(X_raw, batch_size=32):
    """Compute deep features for ``X_raw`` with the module-level ``feature_extractor``.

    Images are resized to 224x224, passed through ``preprocess_input`` and fed
    to the network ``batch_size`` images at a time, so only one resized batch
    is held in memory at once.

    Args:
        X_raw: Array (or sequence) of 3-channel images.
        batch_size: Number of images per forward pass.

    Returns:
        2-D array with one feature row per input image. For empty input an
        array of shape ``(0, n_features)`` is returned.
    """
    num_images = len(X_raw)

    print(f"Starting CNN feature extraction for {num_images} images with batch size {batch_size}...")

    if num_images == 0:
        # np.concatenate() rejects an empty list; return a well-shaped empty matrix.
        return np.empty((0, feature_extractor.output_shape[-1]), dtype=np.float32)

    all_features = []
    for batch_no, start in enumerate(range(0, num_images, batch_size), start=1):
        batch_raw = X_raw[start:start + batch_size]

        # 1. Resize batch
        X_resized_batch = np.array(
            [cv2.resize(img, (224, 224), interpolation=cv2.INTER_AREA) for img in batch_raw]
        )

        # 2. Preprocess batch
        X_processed_batch = preprocess_input(X_resized_batch)

        # 3. Extract features
        all_features.append(feature_extractor.predict(X_processed_batch, verbose=0))

        # Report the true number of processed images (the last batch may be short).
        processed = min(start + batch_size, num_images)
        if batch_no % 10 == 0 or processed == num_images:
            print(f"Processed {processed}/{num_images} images.")

    # Batch temporaries are rebound on every iteration, so a single collection
    # after the loop is enough; collecting per batch only slowed extraction down.
    gc.collect()

    return np.concatenate(all_features, axis=0)

# Load MobileNetV2 backbone (1280 features)
# feature_extractor is the module-level model that extract_cnn_features_batched()
# and predict_emotion() call; it reuses base_model's input and output tensors.
print("Loading MobileNetV2 feature extractor...")
base_model = MobileNetV2(weights='imagenet', include_top=False, pooling='avg', input_shape=(224, 224, 3))
feature_extractor = Model(inputs=base_model.input, outputs=base_model.output)

print("\n--- Track B: CNN Deep Feature Extraction (1280 features) ---")
X_train_cnn = extract_cnn_features_batched(X_train_raw)
X_test_cnn = extract_cnn_features_batched(X_test_raw)

# Every image yields a CNN feature row, so the CNN track keeps the full label
# arrays (these are aliases of y_train / y_test, not copies).
y_train_cnn = y_train
y_test_cnn = y_test

print(f"CNN Train Features shape: {X_train_cnn.shape}, Labels shape: {y_train_cnn.shape}")
print(f"CNN Test Features shape: {X_test_cnn.shape}, Labels shape: {y_test_cnn.shape}")

In [None]:
# Cell 7: Model Training with Improved Parameters
def train_and_evaluate_models():
    """Train the landmark SVM (when landmark features exist) and the CNN Random Forest.

    Reads the module-level feature matrices (``X_train_lm``, ``X_test_lm``,
    ``X_train_cnn``, ``X_test_cnn``) and their label arrays.

    Side effects:
        Publishes the fitted ``StandardScaler`` objects as the module-level
        names ``scaler_lm`` and ``scaler_cnn``. ``predict_emotion`` needs them
        to transform new samples the same way as the training data;
        ``scaler_lm`` stays ``None`` when the SVM is not trained.

    Returns:
        Dict mapping ``'svm'`` / ``'rf'`` to
        ``(model, test_predictions, test_labels, display_name)``.
    """
    # These were function locals, which made predict_emotion() fail with a
    # NameError; they must outlive this call.
    global scaler_lm, scaler_cnn
    scaler_lm = None
    scaler_cnn = None

    results = {}

    # Model 1: SVM on Landmark Features (if available)
    if len(X_train_lm) > 0 and len(X_test_lm) > 0:
        print("\n--- Training Model 1: SVM on Landmark Features ---")

        # Standardization (fit on train only, then applied to test)
        scaler_lm = StandardScaler()
        X_train_lm_scaled = scaler_lm.fit_transform(X_train_lm)
        X_test_lm_scaled = scaler_lm.transform(X_test_lm)

        # Training SVM with class weights
        svm_model = SVC(kernel='rbf', C=1.0, gamma='scale', random_state=RANDOM_SEED, class_weight='balanced')
        svm_model.fit(X_train_lm_scaled, y_train_lm)
        y_pred_svm = svm_model.predict(X_test_lm_scaled)

        print("SVM Training Complete. (C=1.0, RBF kernel, class_weight='balanced')")
        results['svm'] = (svm_model, y_pred_svm, y_test_lm, "SVM (Landmarks)")

    # Model 2: Random Forest on CNN Features
    print("\n--- Training Model 2: Random Forest on CNN Features ---")

    # Standardize CNN features (fit on train only, then applied to test)
    scaler_cnn = StandardScaler()
    X_train_cnn_scaled = scaler_cnn.fit_transform(X_train_cnn)
    X_test_cnn_scaled = scaler_cnn.transform(X_test_cnn)

    rf_model = RandomForestClassifier(
        n_estimators=200,
        max_depth=15,
        min_samples_split=5,
        min_samples_leaf=2,
        random_state=RANDOM_SEED,
        n_jobs=-1,
        class_weight='balanced'
    )
    rf_model.fit(X_train_cnn_scaled, y_train_cnn)
    y_pred_rf = rf_model.predict(X_test_cnn_scaled)

    print("Random Forest Training Complete. (200 estimators, max_depth=15, balanced)")
    results['rf'] = (rf_model, y_pred_rf, y_test_cnn, "Random Forest (CNN)")

    return results

# Train models
# model_results maps 'svm' / 'rf' to (model, test_predictions, test_labels, display_name);
# 'svm' is present only when landmark features were extracted.
model_results = train_and_evaluate_models()

In [None]:
# Cell 8: Milestone 3 - Comprehensive Evaluation
def evaluate_model(y_true, y_pred, model_name, feature_type):
    """Print evaluation metrics and plot the confusion matrix for one model.

    Args:
        y_true: True integer labels (indices into ``CLASSES``).
        y_pred: Predicted integer labels.
        model_name: Display name used in headings.
        feature_type: Feature-track description used in headings.

    Returns:
        ``(accuracy, macro_f1, report, cm)`` where ``report`` is the
        classification report as a dict and ``cm`` is a
        ``len(CLASSES) x len(CLASSES)`` confusion matrix.
    """
    # Pin the label set explicitly. Without it, a class that is missing from
    # both y_true and y_pred (possible after landmark filtering) makes
    # classification_report reject target_names and shrinks the confusion
    # matrix so that it no longer lines up with CLASSES.
    label_ids = list(range(len(CLASSES)))

    accuracy = accuracy_score(y_true, y_pred)
    # Macro F1 is still averaged over the labels that actually occur.
    macro_f1 = f1_score(y_true, y_pred, average='macro')

    print(f"\n--- Evaluation: {model_name} ({feature_type}) ---")
    print(f"Overall Accuracy: {accuracy:.4f}")
    print(f"Macro F1 Score: {macro_f1:.4f}")

    # Classification Report (dict form is returned, text form is printed)
    report = classification_report(
        y_true, y_pred, labels=label_ids, target_names=CLASSES, output_dict=True
    )
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, labels=label_ids, target_names=CLASSES))

    # Confusion Matrix
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plt.figure(figsize=(8, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=CLASSES, yticklabels=CLASSES)
    plt.title(f'Confusion Matrix: {model_name} ({feature_type})')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.show()

    # Analyze common confusions: report an off-diagonal cell when it exceeds
    # 10% of the correct count for the same true class.
    print("\nKey Confusion Patterns:")
    for i in range(len(CLASSES)):
        for j in range(len(CLASSES)):
            if i != j and cm[i, j] > 0.1 * cm[i, i]:  # Significant confusion
                print(f"  {CLASSES[i]} → {CLASSES[j]}: {cm[i, j]} instances")

    return accuracy, macro_f1, report, cm

# Evaluate all trained models
# evaluation_results maps the same keys as model_results to
# (accuracy, macro_f1, report_dict, confusion_matrix).
evaluation_results = {}
for key, (model, y_pred, y_true, model_name) in model_results.items():
    # The feature-track label is derived from the display name stored at training time.
    feature_type = "Landmarks (Track A)" if "Landmark" in model_name else "CNN Embeddings (Track B)"
    acc, f1, report, cm = evaluate_model(y_true, y_pred, model_name, feature_type)
    evaluation_results[key] = (acc, f1, report, cm)

In [None]:
# Cell 9: Comparative Summary and Insights
def create_comparative_summary(evaluation_results):
    """Tabulate the evaluated models, print the table, and name the most accurate one.

    Args:
        evaluation_results: Dict mapping a model key to
            ``(accuracy, macro_f1, report_dict, confusion_matrix)``. The key
            ``'svm'`` is shown as the landmark SVM; any other key is shown as
            the CNN Random Forest.

    Returns:
        ``(summary_df, best_model_idx)``: the metrics table indexed by model
        name, and the index label of the row with the highest accuracy.
    """
    banner = "=" * 60

    rows = [
        {
            'Model': "SVM (Landmarks)" if key == 'svm' else "Random Forest (CNN Embeddings)",
            'Accuracy': acc,
            'Macro Precision': report['macro avg']['precision'],
            'Macro Recall': report['macro avg']['recall'],
            'Macro F1': f1,
        }
        for key, (acc, f1, report, _cm) in evaluation_results.items()
    ]
    summary_df = pd.DataFrame(rows).set_index('Model')

    print("\n" + banner)
    print("COMPARATIVE SUMMARY TABLE")
    print(banner)
    print(summary_df.round(4))

    # Winner is chosen on accuracy alone.
    best_model_idx = summary_df['Accuracy'].idxmax()
    best_accuracy = summary_df.loc[best_model_idx, 'Accuracy']
    print(f"\nBest Model: {best_model_idx} (Accuracy: {best_accuracy:.2%})")

    print("\n" + banner)
    print("KEY INSIGHTS & OBSERVATIONS")
    print(banner)

    # Fixed commentary; it is not derived from the metrics above.
    print("\n".join((
        "• Class imbalance significantly affects performance, especially for 'disgust' class",
        "• Fear and surprise are commonly confused due to similar facial expressions (wide eyes)",
        "• Happy emotions are generally well-classified across both models",
        "• Geometric features (landmarks) may struggle with subtle expression differences",
        "• CNN features capture more complex patterns but require careful preprocessing",
    )))

    return summary_df, best_model_idx

summary_df, best_model_name = create_comparative_summary(evaluation_results)

In [None]:
# Cell 10: Simple Deployment with Best Model
# Choose the deployment model. The Random Forest must exist for anything to be
# deployed; the SVM replaces it only when both were trained and the Random
# Forest is not strictly more accurate.
if 'rf' not in model_results:
    BEST_MODEL, BEST_FEATURE_TYPE = None, None
    print("No suitable model found for deployment")
elif 'svm' not in model_results:
    BEST_MODEL, BEST_FEATURE_TYPE = model_results['rf'][0], 'cnn'
    print("Selected Random Forest (CNN) as best model for deployment")
else:
    # Both candidates exist: compare their test accuracies (ties go to the SVM).
    rf_acc = evaluation_results['rf'][0]
    svm_acc = evaluation_results['svm'][0]

    if rf_acc > svm_acc:
        BEST_MODEL, BEST_FEATURE_TYPE = model_results['rf'][0], 'cnn'
        print("Selected Random Forest (CNN) as best model for deployment")
    else:
        BEST_MODEL, BEST_FEATURE_TYPE = model_results['svm'][0], 'landmarks'
        print("Selected SVM (Landmarks) as best model for deployment")

# Deployment function
def predict_emotion(image_path, model, feature_type='cnn', scaler=None):
    """Read a FER-style face image, run the feature pipeline, and predict the emotion.

    Args:
        image_path: Path of the image file; it is read as grayscale.
        model: Fitted classifier exposing ``predict``; ``None`` yields an error string.
        feature_type: ``'cnn'`` for the CNN-embedding pipeline, anything else
            for the landmark pipeline.
        scaler: Optional fitted scaler matching ``feature_type``. When omitted,
            the module-level ``scaler_cnn`` / ``scaler_lm`` is used if it exists.

    Returns:
        The predicted class name from ``CLASSES``, or a string starting with
        ``"Error:"`` describing why no prediction could be made.
    """
    if model is None:
        return "Error: No model available for prediction."

    # 1. Load Image
    img_gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img_gray is None:
        return "Error: Could not load image."

    img_bgr = cv2.cvtColor(img_gray, cv2.COLOR_GRAY2BGR)

    if feature_type == 'cnn':
        # CNN pipeline
        img_resized = cv2.resize(img_bgr, (224, 224), interpolation=cv2.INTER_AREA)
        X_input = np.expand_dims(img_resized, axis=0)
        X_processed = preprocess_input(X_input)
        features = feature_extractor.predict(X_processed, verbose=0)

        # Standardize features if using RF with CNN
        if hasattr(model, 'estimators_'):  # Random Forest
            # Look the scaler up defensively: a missing module-level scaler
            # used to surface as a NameError in the middle of prediction.
            cnn_scaler = scaler if scaler is not None else globals().get('scaler_cnn')
            if cnn_scaler is None:
                return "Error: CNN feature scaler not available."
            features = cnn_scaler.transform(features)

    else:
        # Landmark pipeline
        if mp_face_mesh is None:
            return "Error: MediaPipe not available for landmark extraction."

        lm_scaler = scaler if scaler is not None else globals().get('scaler_lm')
        if lm_scaler is None:
            return "Error: Landmark feature scaler not available."

        with mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, min_detection_confidence=0.3) as face_mesh:
            features = extract_and_normalize_landmarks(img_bgr, face_mesh)
        if features is None:
            return "Error: No face detected in image."
        features = lm_scaler.transform([features])

    # Prediction
    prediction = model.predict(features)
    predicted_label_index = int(prediction[0])

    return CLASSES[predicted_label_index]

# Test deployment on sample images
def test_deployment(num_samples=3):
    """Smoke-test ``predict_emotion`` on randomly chosen test images.

    Each chosen image is written to a temporary file and passed through
    ``predict_emotion`` with ``BEST_MODEL`` / ``BEST_FEATURE_TYPE``; the true
    and predicted class names are printed side by side.

    Args:
        num_samples: Number of random test images to try.
    """
    import tempfile  # local import: only this helper needs it

    print(f"\n--- Testing Deployment on {num_samples} Random Samples ---")

    # A temporary directory keeps the working directory clean and is removed
    # even if prediction raises part-way through.
    with tempfile.TemporaryDirectory() as tmp_dir:
        for i in range(num_samples):
            # Randomly select a test image
            random_idx = np.random.randint(len(X_test_raw))
            test_image = X_test_raw[random_idx]
            true_emotion = CLASSES[y_test[random_idx]]

            # PNG is lossless, so the model sees exactly the pixels that were
            # evaluated rather than a re-compressed JPEG.
            temp_path = os.path.join(tmp_dir, f"temp_test_{i}.png")
            if not cv2.imwrite(temp_path, test_image):
                print(f"Sample {i+1}: could not write temporary image, skipping")
                continue

            # Predict
            predicted_emotion = predict_emotion(temp_path, BEST_MODEL, BEST_FEATURE_TYPE)

            print(f"Sample {i+1}: True='{true_emotion}', Predicted='{predicted_emotion}' {'✅' if true_emotion == predicted_emotion else '❌'}")

# Only exercise the deployment path when a model was actually selected above.
if BEST_MODEL is not None:
    test_deployment()
else:
    print("Skipping deployment test - no model available.")

In [None]:
# Cell 11: Final Summary and Cleanup
# Everything below only reads results produced by earlier cells and prints them.
print("\n" + "="*70)
print("FINAL PROJECT SUMMARY")
print("="*70)

print(f"\nDataset Statistics:")
print(f"   Training samples: {len(X_train_raw):,}")
print(f"   Test samples: {len(X_test_raw):,}")
print(f"   Classes: {CLASSES}")

print(f"\nTechnical Implementation:")
# mp_face_mesh is None when the MediaPipe probe failed and the Haar fallback is active.
print(f"   Face detection: {'MediaPipe' if mp_face_mesh else 'Haar Cascade'}")
print(f"   Feature tracks: {'Landmarks + CNN' if len(X_train_lm) > 0 else 'CNN only'}")
print(f"   Models trained: {len(model_results)}")

print(f"\nPerformance Summary:")
# NOTE: this loop rebinds the module-level names acc, f1, report and cm.
for key, (acc, f1, report, cm) in evaluation_results.items():
    model_name = "SVM (Landmarks)" if key == 'svm' else "Random Forest (CNN)"
    print(f"   {model_name}: Accuracy = {acc:.2%}, F1 = {f1:.2%}")

print(f"\nDeployment Ready: {BEST_MODEL is not None}")
if BEST_MODEL is not None:
    best_model_type = "Random Forest (CNN)" if BEST_FEATURE_TYPE == 'cnn' else "SVM (Landmarks)"
    print(f"   Best model: {best_model_type}")

print(f"\nRecommendations for Improvement:")
# Static list of follow-up ideas; not derived from the results above.
improvements = [
    "1. Address class imbalance with data augmentation",
    "2. Try ensemble methods combining both feature types",
    "3. Experiment with different CNN architectures",
    "4. Add more sophisticated face detection fallbacks",
    "5. Implement cross-validation for hyperparameter tuning"
]

for imp in improvements:
    print(f"   {imp}")

# Cleanup
gc.collect()
print("\nCode execution completed successfully!")