<a href="https://colab.research.google.com/github/Loganathankumar/create-face-emotion-detection/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
Advanced Facial Expression Recognition using RAVDESS Dataset
PART 1: Data Loading and Preprocessing
Processes VIDEO files to extract facial expressions
"""

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.utils.class_weight import compute_class_weight
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
import cv2
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Mount Google Drive
# The dataset path configured below lives under /content/drive, so the
# drive has to be mounted before any file is read.
# NOTE(review): `google.colab` is presumably only importable inside a Colab
# runtime, and mounting needs interactive authorization - confirm before
# running this cell elsewhere.
from google.colab import drive
drive.mount('/content/drive')

# ===================== CONFIGURATION =====================
class Config:
    """Constants used by the data-loading and preprocessing stage."""

    # Folder that is scanned for `Actor_*` sub-directories.
    DATASET_PATH = '/content/drive/MyDrive/RAVDESS DATASET'
    # Size handed to cv2.resize for every detected face crop.
    IMG_SIZE = (128, 128)
    BATCH_SIZE = 32
    EPOCHS = 100
    LEARNING_RATE = 0.001
    # Seed shared by NumPy, TensorFlow and the train/val/test split.
    RANDOM_STATE = 42
    # How many evenly spaced frames are sampled from each video.
    FRAMES_PER_VIDEO = 5

    # Emotion mapping for RAVDESS: the third dash-separated field of a
    # file name is looked up here to obtain the label.
    EMOTIONS = dict(zip(
        ('01', '02', '03', '04', '05', '06', '07', '08'),
        ('neutral', 'calm', 'happy', 'sad',
         'angry', 'fearful', 'disgust', 'surprised'),
    ))

# Shared settings object read by the loader and split helpers in this cell.
config = Config()

# Set seeds for reproducibility: NumPy and TensorFlow get the same value.
_seed = config.RANDOM_STATE
np.random.seed(_seed)
tf.random.set_seed(_seed)

# Show which GPUs TensorFlow can see, framed by two separator rules.
_rule = "=" * 80
print(_rule)
print("GPU Available:", tf.config.list_physical_devices('GPU'))
print(_rule)

# ===================== FACE DETECTION =====================
class FaceDetector:
    """Find the largest frontal face in a BGR frame with a Haar cascade."""

    def __init__(self, scale_factor=1.3, min_neighbors=5):
        """Load OpenCV's bundled frontal-face Haar cascade.

        Args:
            scale_factor: Scale step forwarded to ``detectMultiScale``.
            min_neighbors: Neighbour threshold forwarded to
                ``detectMultiScale``.

        Raises:
            RuntimeError: If the cascade file could not be loaded.
        """
        cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        self.face_cascade = cv2.CascadeClassifier(cascade_path)

        # The constructor does not raise on a missing or unreadable file; the
        # classifier is simply left empty. Fail here instead of on every frame.
        if self.face_cascade.empty():
            raise RuntimeError(
                f"Could not load Haar cascade from: {cascade_path}"
            )

        self.scale_factor = scale_factor
        self.min_neighbors = min_neighbors
        print("‚úì Face detector initialized")

    def detect_face(self, frame):
        """Detect and extract the largest face from ``frame``.

        Args:
            frame: Image array in BGR channel order (it is converted with
                ``cv2.COLOR_BGR2GRAY`` before detection).

        Returns:
            The sub-array of ``frame`` covering the detection with the largest
            area (a slice of the input, not a copy), or ``None`` when the
            frame is missing/empty or no face is found.
        """
        if frame is None or frame.size == 0:
            return None

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(
            gray, self.scale_factor, self.min_neighbors
        )

        if len(faces) == 0:
            return None

        # Each detection is (x, y, w, h); keep the one with the largest area.
        x, y, w, h = max(faces, key=lambda rect: rect[2] * rect[3])
        return frame[y:y + h, x:x + w]

# ===================== DATA LOADING & PREPROCESSING =====================
class RAVDESSDataLoader:
    """Turn a folder tree of RAVDESS videos into face crops with emotion labels.

    The dataset folder is expected to contain ``Actor_*`` sub-directories
    holding video files whose names are dash-separated codes; the third field
    selects the emotion.
    """

    # Extensions treated as video files (matched case-insensitively).
    VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')

    def __init__(self, dataset_path, frames_per_video=None, img_size=None,
                 emotions=None):
        """Create a loader and its face detector.

        Args:
            dataset_path: Folder containing the ``Actor_*`` directories.
            frames_per_video: Frames to sample per video. Defaults to
                ``config.FRAMES_PER_VIDEO``.
            img_size: Size passed to ``cv2.resize``. Defaults to
                ``config.IMG_SIZE``.
            emotions: Mapping from filename emotion code to label. Defaults to
                ``config.EMOTIONS``.
        """
        self.dataset_path = dataset_path
        self.data = []

        # Snapshot the settings once so the methods do not depend on whatever
        # the module-level name `config` is bound to when they run later.
        self.frames_per_video = (
            config.FRAMES_PER_VIDEO if frames_per_video is None else frames_per_video
        )
        self.img_size = config.IMG_SIZE if img_size is None else img_size
        self.emotions = config.EMOTIONS if emotions is None else emotions

        self.face_detector = FaceDetector()

    def extract_frames_from_video(self, video_path):
        """Sample evenly spaced frames from a video and return the face crops.

        Args:
            video_path: Path of the video file to read.

        Returns:
            A list of RGB face images resized to ``self.img_size``. The list is
            empty when the video cannot be opened, reports no frames, contains
            no detectable face, or processing fails.
        """
        cap = None
        try:
            cap = cv2.VideoCapture(video_path)

            if not cap.isOpened():
                return []

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if total_frames <= 0:
                return []

            # Evenly spaced indices; np.unique drops repeats so a video with
            # fewer frames than requested does not yield the same frame twice.
            frame_indices = np.unique(
                np.linspace(0, total_frames - 1, self.frames_per_video, dtype=int)
            )

            faces = []
            for frame_idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
                ret, frame = cap.read()
                if not ret:
                    continue

                face = self.face_detector.detect_face(frame)
                if face is None:
                    continue

                face_resized = cv2.resize(face, self.img_size)
                # OpenCV decodes to BGR; store RGB.
                faces.append(cv2.cvtColor(face_resized, cv2.COLOR_BGR2RGB))

            return faces

        except Exception as exc:
            # Best effort: one unreadable video must not abort the whole scan,
            # but the failure is reported instead of being hidden.
            tqdm.write(f"WARNING: skipping {video_path}: {exc}")
            return []

        finally:
            # Release the capture handle on every path, including errors.
            if cap is not None:
                cap.release()

    def parse_filename(self, filename):
        """Return the emotion label encoded in a RAVDESS file name.

        Args:
            filename: File name made of dash-separated fields.

        Returns:
            The label mapped from the third field, or ``None`` when the name
            has fewer than three fields or the code is unknown.
        """
        parts = filename.split('-')
        if len(parts) < 3:
            return None
        return self.emotions.get(parts[2])

    def load_dataset(self):
        """Scan every actor folder and collect one sample per detected face.

        Any previously loaded samples are discarded first, so calling this
        method again does not duplicate data.

        Returns:
            ``self.data`` - a list of dicts with keys ``features``,
            ``emotion``, ``actor`` and ``filename`` - or an empty list when the
            dataset path or the actor folders are missing.
        """
        print("\n" + "="*80)
        print("LOADING RAVDESS DATASET")
        print("="*80)
        print(f"Dataset path: {self.dataset_path}")

        self.data = []

        if not os.path.exists(self.dataset_path):
            print(f"ERROR: Dataset path does not exist: {self.dataset_path}")
            return []

        # os.listdir order is arbitrary; sorting keeps the sample order (and
        # therefore the seeded split that follows) the same between runs.
        actor_folders = sorted(
            f for f in os.listdir(self.dataset_path) if f.startswith('Actor_')
        )

        print(f"Found {len(actor_folders)} actor folders")

        if len(actor_folders) == 0:
            print("ERROR: No actor folders found!")
            return []

        video_count = 0
        face_count = 0

        for actor_folder in tqdm(actor_folders, desc="Processing actors"):
            actor_path = os.path.join(self.dataset_path, actor_folder)

            if not os.path.isdir(actor_path):
                continue

            video_files = sorted(
                f for f in os.listdir(actor_path)
                if f.lower().endswith(self.VIDEO_EXTENSIONS)
            )

            for video_file in video_files:
                emotion = self.parse_filename(video_file)
                if emotion is None:
                    continue

                video_path = os.path.join(actor_path, video_file)
                faces = self.extract_frames_from_video(video_path)
                video_count += 1

                # Every face crop becomes its own sample.
                for face in faces:
                    self.data.append({
                        'features': face,
                        'emotion': emotion,
                        'actor': actor_folder,
                        'filename': video_file
                    })
                    face_count += 1

        print(f"\n{'='*80}")
        print(f"Processing complete:")
        print(f"  Videos processed: {video_count}")
        print(f"  Face frames extracted: {face_count}")
        print(f"  Avg faces/video: {face_count/video_count if video_count > 0 else 0:.1f}")
        print(f"{'='*80}\n")

        return self.data

    def prepare_data(self):
        """Build normalized image and one-hot label arrays.

        Loads the dataset first when no samples are cached.

        Returns:
            ``(X, y_categorical, le)``: float32 images scaled to [0, 1], the
            one-hot encoded labels, and the fitted ``LabelEncoder``.

        Raises:
            ValueError: If no sample could be loaded.
        """
        if not self.data:
            self.load_dataset()

        if len(self.data) == 0:
            raise ValueError("No data was loaded from the dataset")

        # Build float32 directly and scale in place: dividing the uint8 array
        # by a Python float would promote it to float64 and double the memory.
        X = np.asarray([item['features'] for item in self.data], dtype=np.float32)
        X /= 255.0
        y = np.array([item['emotion'] for item in self.data])

        # Encode labels
        le = LabelEncoder()
        y_encoded = le.fit_transform(y)
        y_categorical = keras.utils.to_categorical(y_encoded)

        print(f"{'='*80}")
        print("DATASET SUMMARY")
        print(f"{'='*80}")
        print(f"Data shape: {X.shape}")
        print(f"Labels shape: {y_categorical.shape}")
        print(f"Number of classes: {len(le.classes_)}")
        print(f"Classes: {list(le.classes_)}")

        # Class distribution
        unique, counts = np.unique(y, return_counts=True)
        print(f"\n{'Class Distribution':^40}")
        print(f"{'-'*40}")
        for emotion, count in zip(unique, counts):
            print(f"  {emotion:12} : {count:4d} samples ({count/len(y)*100:5.1f}%)")
        print(f"{'='*80}\n")

        return X, y_categorical, le

# ===================== DATA SPLITTING =====================
def split_data(X, y, test_size=0.2, val_size=0.15):
    """Partition samples into stratified train / validation / test subsets.

    Args:
        X: Sample array; the first axis indexes samples.
        y: One-hot label matrix aligned with ``X`` (class ids are recovered
            with ``argmax`` along axis 1).
        test_size: Fraction of the whole dataset held out for testing.
        val_size: Fraction of the whole dataset held out for validation.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``.

    NOTE(review): rows are split individually. If several rows originate from
    the same recording they can land in different subsets - confirm that this
    is acceptable for the reported metrics.
    """
    seed = config.RANDOM_STATE

    # Stage 1: peel off the test set, stratified on the class ids.
    X_rest, X_test, y_rest, y_test = train_test_split(
        X, y,
        test_size=test_size,
        stratify=np.argmax(y, axis=1),
        random_state=seed,
    )

    # Stage 2: val_size refers to the full dataset, so express it relative to
    # what is left after the test set has been removed.
    val_fraction = val_size / (1 - test_size)
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest,
        test_size=val_fraction,
        stratify=np.argmax(y_rest, axis=1),
        random_state=seed,
    )

    rule = '=' * 80
    total = X.shape[0]

    print(rule)
    print("DATA SPLIT")
    print(rule)
    for label, subset in (
        ("Training set:", X_train),
        ("Validation set:", X_val),
        ("Test set:", X_test),
    ):
        count = subset.shape[0]
        # Labels are left-padded to 16 columns so the counts line up.
        print(f"{label:<16}{count:5d} samples ({count/total*100:.1f}%)")
    print(f"{rule}\n")

    return X_train, X_val, X_test, y_train, y_val, y_test

# ===================== EXECUTE DATA LOADING =====================
import pickle

print("\n" + "=" * 80)
print("STARTING DATA PREPARATION")
print("=" * 80)

# Extract the face crops and labels from the configured dataset folder.
loader = RAVDESSDataLoader(config.DATASET_PATH)
X, y, label_encoder = loader.prepare_data()

# Stratified train / validation / test partition.
X_train, X_val, X_test, y_train, y_val, y_test = split_data(X, y)

# Persist every split in the working directory so later parts can reload
# them without repeating the video processing.
print("Saving preprocessed data...")
for _stem, _array in (
    ('X_train', X_train),
    ('X_val', X_val),
    ('X_test', X_test),
    ('y_train', y_train),
    ('y_val', y_val),
    ('y_test', y_test),
):
    np.save(f'{_stem}.npy', _array)

# The fitted encoder is stored as well so class ids can be mapped to names.
with open('label_encoder.pkl', 'wb') as f:
    pickle.dump(label_encoder, f)

print("‚úÖ Data preparation complete!")
print(f"{'=' * 80}\n")
print("Run PART 2 to train models!")

MessageError: Error: credential propagation was unsuccessful

In [None]:
"""
Advanced Facial Expression Recognition using RAVDESS Dataset
PART 2: Deep Learning Model Architectures
"""

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.applications import (
    VGG16, ResNet50, InceptionV3, MobileNetV2,
    EfficientNetB0, DenseNet121, Xception
)
import pickle

# Load preprocessed data
_rule = "=" * 80
print(_rule)
print("LOADING PREPROCESSED DATA")
print(_rule)

# Arrays are read from the working directory: images first, then labels.
_parts = ('train', 'val', 'test')
X_train, X_val, X_test = (np.load(f'X_{_part}.npy') for _part in _parts)
y_train, y_val, y_test = (np.load(f'y_{_part}.npy') for _part in _parts)

with open('label_encoder.pkl', 'rb') as f:
    label_encoder = pickle.load(f)

# Per-sample shape (everything after the sample axis) and the width of the
# label matrix drive every model builder below.
input_shape = X_train.shape[1:]
num_classes = y_train.shape[1]

print(f"‚úì Data loaded successfully")
print(f"  Input shape: {input_shape}")
print(f"  Number of classes: {num_classes}")
print(f"  Training samples: {X_train.shape[0]}")
print(_rule + "\n")

# ===================== MODEL 1: CUSTOM CNN =====================
def create_custom_cnn(input_shape, num_classes):
    """Build a four-stage convolutional classifier as a Sequential model.

    Each stage is two 3x3 conv + batch-norm pairs followed by 2x2 max pooling
    and 25% dropout, with 64, 128, 256 and 512 filters. A flattened head of
    two L2-regularised dense layers (512 and 256 units, each with batch-norm
    and 50% dropout) feeds the softmax output.

    Args:
        input_shape: Shape of one input sample, declared on the first layer.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The uncompiled ``Custom_CNN`` model.
    """
    print("\n[MODEL 1] Building Custom CNN...")

    stack = []

    # Convolutional stages; only the very first layer declares the input shape.
    for filters in (64, 128, 256, 512):
        first_layer_kwargs = {} if stack else {'input_shape': input_shape}
        stack += [
            layers.Conv2D(filters, (3, 3), activation='relu', padding='same',
                          **first_layer_kwargs),
            layers.BatchNormalization(),
            layers.Conv2D(filters, (3, 3), activation='relu', padding='same'),
            layers.BatchNormalization(),
            layers.MaxPooling2D((2, 2)),
            layers.Dropout(0.25),
        ]

    # Classification head.
    stack.append(layers.Flatten())
    for units in (512, 256):
        stack += [
            layers.Dense(units, activation='relu',
                         kernel_regularizer=regularizers.l2(0.001)),
            layers.BatchNormalization(),
            layers.Dropout(0.5),
        ]
    stack.append(layers.Dense(num_classes, activation='softmax'))

    model = models.Sequential(stack, name='Custom_CNN')

    print(f"‚úì Custom CNN created - Total params: {model.count_params():,}")
    return model

# ===================== MODEL 2: ATTENTION CNN =====================
def create_attention_cnn(input_shape, num_classes):
    """Build a functional CNN whose third stage is gated by a spatial mask.

    After two plain conv stages, the third stage's feature map is multiplied
    by a single-channel sigmoid map produced by a 1x1 convolution (layer name
    ``attention_map``). A fourth conv layer, global average pooling and a
    dense head lead to the softmax output.

    Args:
        input_shape: Shape of one input sample.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The uncompiled ``Attention_CNN`` model.
    """
    print("\n[MODEL 2] Building Attention-based CNN...")

    def conv_bn(tensor, filters):
        # 3x3 same-padded ReLU convolution followed by batch normalisation.
        tensor = layers.Conv2D(filters, (3, 3), activation='relu', padding='same')(tensor)
        return layers.BatchNormalization()(tensor)

    def pool_and_drop(tensor):
        # Halve the spatial size, then drop 25% of the activations.
        tensor = layers.MaxPooling2D((2, 2))(tensor)
        return layers.Dropout(0.25)(tensor)

    inputs = layers.Input(shape=input_shape)
    features = inputs

    # Stages 1 and 2: two conv/BN pairs each, then pooling and dropout.
    for filters in (64, 128):
        features = conv_bn(features, filters)
        features = conv_bn(features, filters)
        features = pool_and_drop(features)

    # Stage 3: same convolutions, but the result is re-weighted per location
    # by the sigmoid mask before pooling.
    features = conv_bn(features, 256)
    features = conv_bn(features, 256)
    attention = layers.Conv2D(1, (1, 1), activation='sigmoid', padding='same',
                              name='attention_map')(features)
    features = layers.Multiply()([features, attention])
    features = pool_and_drop(features)

    # Stage 4: a single conv/BN pair collapsed by global average pooling.
    features = conv_bn(features, 512)
    features = layers.GlobalAveragePooling2D()(features)

    # Dense head (batch-norm only after the first dense layer).
    features = layers.Dense(512, activation='relu',
                            kernel_regularizer=regularizers.l2(0.001))(features)
    features = layers.BatchNormalization()(features)
    features = layers.Dropout(0.5)(features)
    features = layers.Dense(256, activation='relu',
                            kernel_regularizer=regularizers.l2(0.001))(features)
    features = layers.Dropout(0.5)(features)
    outputs = layers.Dense(num_classes, activation='softmax')(features)

    model = models.Model(inputs=inputs, outputs=outputs, name='Attention_CNN')
    print(f"‚úì Attention CNN created - Total params: {model.count_params():,}")
    return model

# ===================== MODEL 3: RESIDUAL CNN =====================
def create_residual_cnn(input_shape, num_classes):
    """Build a functional CNN made of three skip-connected stages.

    Every stage adds its input back onto the output of two 3x3 convolutions
    before the ReLU, then applies 2x2 max pooling and 25% dropout. Stages two
    and three are preceded by a 1x1 convolution that widens the channels to
    128 and 256 so the addition is shape-compatible.

    Args:
        input_shape: Shape of one input sample.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The uncompiled ``Residual_CNN`` model.
    """
    print("\n[MODEL 3] Building Residual CNN...")

    def residual_stage(tensor, filters):
        # conv(ReLU) -> BN -> conv(linear), add the stage input, then ReLU.
        skip = tensor
        out = layers.Conv2D(filters, (3, 3), activation='relu', padding='same')(tensor)
        out = layers.BatchNormalization()(out)
        out = layers.Conv2D(filters, (3, 3), padding='same')(out)
        out = layers.Add()([out, skip])
        out = layers.Activation('relu')(out)
        out = layers.MaxPooling2D((2, 2))(out)
        return layers.Dropout(0.25)(out)

    inputs = layers.Input(shape=input_shape)

    # Stem convolution.
    net = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    net = layers.BatchNormalization()(net)

    # Stage 1 works directly on the 64-channel stem output.
    net = residual_stage(net, 64)

    # Stages 2 and 3: project to the new width first so the skip tensor has
    # the same number of channels as the convolution branch.
    for filters in (128, 256):
        net = layers.Conv2D(filters, (1, 1), padding='same')(net)
        net = residual_stage(net, filters)

    # Pooling and dense head (batch-norm only after the first dense layer).
    net = layers.GlobalAveragePooling2D()(net)
    net = layers.Dense(512, activation='relu',
                       kernel_regularizer=regularizers.l2(0.001))(net)
    net = layers.BatchNormalization()(net)
    net = layers.Dropout(0.5)(net)
    net = layers.Dense(256, activation='relu',
                       kernel_regularizer=regularizers.l2(0.001))(net)
    net = layers.Dropout(0.5)(net)
    outputs = layers.Dense(num_classes, activation='softmax')(net)

    model = models.Model(inputs=inputs, outputs=outputs, name='Residual_CNN')
    print(f"‚úì Residual CNN created - Total params: {model.count_params():,}")
    return model

# ===================== MODEL 4: VGG16 TRANSFER LEARNING =====================
def create_vgg16_transfer(input_shape, num_classes):
    """Stack a trainable dense head on a frozen ImageNet VGG16 backbone.

    Args:
        input_shape: Shape of one input sample, forwarded to ``VGG16``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The uncompiled ``VGG16_Transfer`` Sequential model.

    NOTE(review): the backbone receives the inputs exactly as the caller
    supplies them; no VGG16-specific input preprocessing is applied here -
    confirm the expected pixel scaling against the Keras Applications docs.
    """
    print("\n[MODEL 4] Building VGG16 Transfer Learning Model...")

    backbone = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)
    backbone.trainable = False  # Only the new head is trained.

    model = models.Sequential(name='VGG16_Transfer')
    model.add(backbone)
    model.add(layers.GlobalAveragePooling2D())
    for units in (512, 256):
        model.add(layers.Dense(units, activation='relu',
                               kernel_regularizer=regularizers.l2(0.001)))
        model.add(layers.BatchNormalization())
        model.add(layers.Dropout(0.5))
    model.add(layers.Dense(num_classes, activation='softmax'))

    print(f"‚úì VGG16 Transfer created - Total params: {model.count_params():,}")
    trainable_count = sum([tf.size(w).numpy() for w in model.trainable_weights])
    print(f"  Trainable params: {trainable_count:,}")
    return model

# ===================== MODEL 5: RESNET50 TRANSFER LEARNING =====================
def create_resnet50_transfer(input_shape, num_classes):
    """Stack a trainable dense head on a frozen ImageNet ResNet50 backbone.

    Args:
        input_shape: Shape of one input sample, forwarded to ``ResNet50``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The uncompiled ``ResNet50_Transfer`` Sequential model.

    NOTE(review): the backbone receives the inputs exactly as the caller
    supplies them; no ResNet50-specific input preprocessing is applied here -
    confirm the expected pixel scaling against the Keras Applications docs.
    """
    print("\n[MODEL 5] Building ResNet50 Transfer Learning Model...")

    backbone = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
    backbone.trainable = False

    # Two identical dense/BN/dropout groups, 512 then 256 units wide.
    head = []
    for units in (512, 256):
        head.extend([
            layers.Dense(units, activation='relu',
                         kernel_regularizer=regularizers.l2(0.001)),
            layers.BatchNormalization(),
            layers.Dropout(0.5),
        ])

    model = models.Sequential(
        [backbone, layers.GlobalAveragePooling2D(), *head,
         layers.Dense(num_classes, activation='softmax')],
        name='ResNet50_Transfer',
    )

    print(f"‚úì ResNet50 Transfer created - Total params: {model.count_params():,}")
    trainable_count = sum([tf.size(w).numpy() for w in model.trainable_weights])
    print(f"  Trainable params: {trainable_count:,}")
    return model

# ===================== MODEL 6: MOBILENETV2 TRANSFER LEARNING =====================
def create_mobilenet_transfer(input_shape, num_classes):
    """MobileNetV2 with Transfer Learning - Lightweight & Fast.

    A frozen ImageNet MobileNetV2 backbone is followed by global average
    pooling and a trainable dense head.

    The model expects image inputs scaled to [0, 1], which is what the
    preprocessing stage of this pipeline produces (pixels divided by 255).
    A rescaling layer inside the model maps them to the [-1, 1] range that
    MobileNetV2's ``preprocess_input`` produces for the pretrained weights.

    Args:
        input_shape: Shape of one input sample, forwarded to ``MobileNetV2``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The uncompiled ``MobileNetV2_Transfer`` Sequential model.
    """
    print("\n[MODEL 6] Building MobileNetV2 Transfer Learning Model...")

    base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=input_shape)
    base_model.trainable = False

    model = models.Sequential([
        # Explicit input so the model is built before count_params() is called.
        layers.Input(shape=input_shape),
        # [0, 1] -> [-1, 1]: x * 2 - 1. Without this the frozen backbone sees
        # inputs outside the range its ImageNet weights were trained on.
        layers.Rescaling(scale=2.0, offset=-1.0),
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax')
    ], name='MobileNetV2_Transfer')

    print(f"‚úì MobileNetV2 Transfer created - Total params: {model.count_params():,}")
    print(f"  Trainable params: {sum([tf.size(w).numpy() for w in model.trainable_weights]):,}")
    return model

# ===================== MODEL 7: EFFICIENTNET TRANSFER LEARNING =====================
def create_efficientnet_transfer(input_shape, num_classes):
    """EfficientNetB0 with Transfer Learning.

    A frozen ImageNet EfficientNetB0 backbone is followed by global average
    pooling and a trainable dense head.

    The model expects image inputs scaled to [0, 1], which is what the
    preprocessing stage of this pipeline produces (pixels divided by 255).
    Keras' EfficientNet carries its own input normalisation and expects raw
    pixel values in [0, 255], so a rescaling layer inside the model restores
    that range before the backbone.

    Args:
        input_shape: Shape of one input sample, forwarded to ``EfficientNetB0``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The uncompiled ``EfficientNetB0_Transfer`` Sequential model.
    """
    print("\n[MODEL 7] Building EfficientNetB0 Transfer Learning Model...")

    base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=input_shape)
    base_model.trainable = False

    model = models.Sequential([
        # Explicit input so the model is built before count_params() is called.
        layers.Input(shape=input_shape),
        # [0, 1] -> [0, 255]. Feeding [0, 1] straight in would have the
        # built-in normalisation squash every image to nearly the same values.
        layers.Rescaling(255.0),
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax')
    ], name='EfficientNetB0_Transfer')

    print(f"‚úì EfficientNetB0 Transfer created - Total params: {model.count_params():,}")
    print(f"  Trainable params: {sum([tf.size(w).numpy() for w in model.trainable_weights]):,}")
    return model

# ===================== MODEL 8: DENSENET TRANSFER LEARNING =====================
def create_densenet_transfer(input_shape, num_classes):
    """Stack a trainable dense head on a frozen ImageNet DenseNet121 backbone.

    Args:
        input_shape: Shape of one input sample, forwarded to ``DenseNet121``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The uncompiled ``DenseNet121_Transfer`` Sequential model.

    NOTE(review): the backbone receives the inputs exactly as the caller
    supplies them; no DenseNet-specific input preprocessing is applied here -
    confirm the expected pixel scaling against the Keras Applications docs.
    """
    print("\n[MODEL 8] Building DenseNet121 Transfer Learning Model...")

    def dense_group(units):
        # L2-regularised ReLU dense layer, batch-norm, then 50% dropout.
        return [
            layers.Dense(units, activation='relu',
                         kernel_regularizer=regularizers.l2(0.001)),
            layers.BatchNormalization(),
            layers.Dropout(0.5),
        ]

    frozen_base = DenseNet121(weights='imagenet', include_top=False, input_shape=input_shape)
    frozen_base.trainable = False

    layer_stack = [frozen_base, layers.GlobalAveragePooling2D()]
    layer_stack += dense_group(512)
    layer_stack += dense_group(256)
    layer_stack.append(layers.Dense(num_classes, activation='softmax'))

    model = models.Sequential(layer_stack, name='DenseNet121_Transfer')

    print(f"‚úì DenseNet121 Transfer created - Total params: {model.count_params():,}")
    trainable_count = sum([tf.size(w).numpy() for w in model.trainable_weights])
    print(f"  Trainable params: {trainable_count:,}")
    return model

# ===================== CREATE ALL MODELS =====================
import pickle

print("\n" + "=" * 80)
print("CREATING ALL MODELS")
print("=" * 80)

# Registry of (dictionary key, builder). Models are built in this order and
# every builder receives the same input shape and class count.
_model_builders = (
    ('Custom_CNN', create_custom_cnn),
    ('Attention_CNN', create_attention_cnn),
    ('Residual_CNN', create_residual_cnn),
    ('VGG16_Transfer', create_vgg16_transfer),
    ('ResNet50_Transfer', create_resnet50_transfer),
    ('MobileNetV2_Transfer', create_mobilenet_transfer),
    ('EfficientNetB0_Transfer', create_efficientnet_transfer),
    ('DenseNet121_Transfer', create_densenet_transfer),
)

models_dict = {
    _key: _build(input_shape, num_classes) for _key, _build in _model_builders
}

print("\n" + "=" * 80)
print(f"‚úÖ ALL {len(models_dict)} MODELS CREATED SUCCESSFULLY!")
print("=" * 80)

# Save models dictionary
# NOTE(review): this pickles the model objects themselves; whether that works
# depends on the installed Keras version - confirm in the target runtime.
with open('models_dict.pkl', 'wb') as f:
    pickle.dump(models_dict, f)

print("\n‚úì Models saved to 'models_dict.pkl'")
print("\nRun PART 3 to train all models!")

LOADING PREPROCESSED DATA


FileNotFoundError: [Errno 2] No such file or directory: 'X_train.npy'

In [None]:
"""
Advanced Facial Expression Recognition using RAVDESS Dataset
PART 3: Training Pipeline with Anti-Overfitting Techniques
"""

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from sklearn.utils.class_weight import compute_class_weight
import pickle
from tqdm import tqdm

# Load data and models
_rule = "=" * 80
print(_rule)
print("LOADING DATA AND MODELS")
print(_rule)

# Arrays written by the preprocessing part: images first, then labels.
_parts = ('train', 'val', 'test')
X_train, X_val, X_test = (np.load(f'X_{_part}.npy') for _part in _parts)
y_train, y_val, y_test = (np.load(f'y_{_part}.npy') for _part in _parts)

# Pickled artefacts. pickle.load executes code embedded in the file, so only
# files produced by this notebook should ever be placed at these paths.
with open('label_encoder.pkl', 'rb') as f:
    label_encoder = pickle.load(f)

with open('models_dict.pkl', 'rb') as f:
    models_dict = pickle.load(f)

print(f"‚úì Data loaded")
print(f"‚úì {len(models_dict)} models loaded")
print(_rule + "\n")

# ===================== TRAINING CONFIGURATION =====================
class TrainingConfig:
    """Hyper-parameters read by the training helpers in this part."""

    # Samples per gradient step, passed to the augmentation iterator.
    BATCH_SIZE: int = 32
    # Upper bound on training epochs.
    EPOCHS: int = 100
    # Initial learning rate handed to the Adam optimizer.
    LEARNING_RATE: float = 0.001


# Binds the module-level name `config`; anything previously bound to that
# name in the same session is replaced by this object.
config = TrainingConfig()

# ===================== DATA AUGMENTATION =====================
def get_data_augmentation():
    """Create the image augmenter used while training.

    Returns:
        An ``ImageDataGenerator`` that applies random rotations (up to 15
        degrees), width/height shifts and zoom (up to 15%), shear (0.1) and
        horizontal flips, filling exposed pixels with the nearest value.
    """
    return ImageDataGenerator(
        rotation_range=15,
        width_shift_range=0.15,
        height_shift_range=0.15,
        horizontal_flip=True,
        zoom_range=0.15,
        shear_range=0.1,
        fill_mode='nearest'
    )


datagen = get_data_augmentation()
# datagen.fit(X_train) is intentionally not called: fit() only computes the
# statistics needed by featurewise_center, featurewise_std_normalization and
# zca_whitening. None of those options is enabled above, so the call would
# only make a full in-memory copy of the training images for no effect.

# ===================== CALLBACKS =====================
def get_callbacks(model_name):
    """Assemble the Keras callbacks used for one training run.

    Args:
        model_name: Used to name the checkpoint file ``best_<model_name>.h5``.

    Returns:
        A list with, in order: early stopping on ``val_loss`` (patience 20,
        best weights restored), learning-rate halving on ``val_loss`` plateau
        (patience 10, floor 1e-7), and a checkpoint that keeps only the epoch
        with the best ``val_accuracy``.
    """
    stop_early = EarlyStopping(
        monitor='val_loss',
        patience=20,
        restore_best_weights=True,
        verbose=1,
    )

    shrink_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=10,
        min_lr=1e-7,
        verbose=1,
    )

    # Note that the checkpoint tracks accuracy while the two callbacks above
    # track loss, so "best" can refer to different epochs.
    keep_best = ModelCheckpoint(
        f'best_{model_name}.h5',
        monitor='val_accuracy',
        save_best_only=True,
        verbose=0,
    )

    return [stop_early, shrink_lr, keep_best]

# ===================== CLASS WEIGHTS =====================
def calculate_class_weights(y):
    """Calculate balanced class weights for imbalanced data.

    Args:
        y: One-hot label matrix; class ids are recovered with ``argmax`` along
            axis 1 and the number of columns is taken as the class count.

    Returns:
        A dict mapping every class id ``0 .. y.shape[1] - 1`` to a float
        weight. Classes that occur in ``y`` get scikit-learn's ``'balanced'``
        weight; classes that never occur get ``1.0`` (the value is never
        applied, it only keeps the keys contiguous).
    """
    y_integers = np.argmax(y, axis=1)
    present_classes = np.unique(y_integers)
    balanced = compute_class_weight(
        class_weight='balanced',
        classes=present_classes,
        y=y_integers
    )

    # Key each weight by its real class id. Enumerating the weights instead
    # would shift them onto the wrong classes as soon as one class is absent.
    weights = {class_id: 1.0 for class_id in range(y.shape[1])}
    weights.update(
        {int(class_id): float(weight)
         for class_id, weight in zip(present_classes, balanced)}
    )
    return weights


class_weights = calculate_class_weights(y_train)
print("Class weights calculated to handle imbalanced data")
print(f"Class weights: {class_weights}\n")

# ===================== TRAINING FUNCTION =====================
def train_model(model, model_name, X_train, X_val, y_train, y_val):
    """Compile and fit one model on augmented training batches.

    The model is compiled with Adam (learning rate from ``config``),
    categorical cross-entropy, and accuracy plus AUC metrics. Training batches
    come from the module-level ``datagen``; the module-level ``class_weights``
    are applied to the loss.

    Args:
        model: Keras model to train (compiled in place).
        model_name: Label used in log output and for the checkpoint file.
        X_train: Training images.
        X_val: Validation images (not augmented).
        y_train: One-hot training labels.
        y_val: One-hot validation labels.

    Returns:
        ``(model, history)`` - the trained model and the Keras History object.
    """
    rule = '=' * 80
    print(f"\n{rule}")
    print(f"TRAINING: {model_name}")
    print(rule)

    # Compile model
    optimizer = keras.optimizers.Adam(learning_rate=config.LEARNING_RATE)
    tracked_metrics = ['accuracy', keras.metrics.AUC(name='auc')]
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=tracked_metrics,
    )

    training_callbacks = get_callbacks(model_name)

    # Summary of the regularisation settings in effect for this run.
    for line in (
        "Starting training with:",
        "  - Data augmentation",
        "  - Early stopping (patience=20)",
        "  - Learning rate reduction (patience=10)",
        "  - Class weight balancing",
        f"  - Batch size: {config.BATCH_SIZE}",
        f"  - Max epochs: {config.EPOCHS}",
    ):
        print(line)
    print()

    # Only the training stream is augmented; validation data is used as-is.
    augmented_batches = datagen.flow(X_train, y_train, batch_size=config.BATCH_SIZE)
    history = model.fit(
        augmented_batches,
        validation_data=(X_val, y_val),
        epochs=config.EPOCHS,
        callbacks=training_callbacks,
        class_weight=class_weights,
        verbose=2,
    )

    best_val_accuracy = max(history.history['val_accuracy'])
    epochs_run = len(history.history['loss'])
    print(f"\n‚úì Training completed for {model_name}")
    print(f"  Best val_accuracy: {best_val_accuracy:.4f}")
    print(f"  Epochs trained: {epochs_run}")

    return model, history

# ===================== PLOTTING FUNCTION =====================
def plot_training_history(history, model_name):
    """Plot accuracy, loss, AUC and the train/validation accuracy gap.

    The 2x2 figure is written to ``<model_name>_training_history.png``, shown,
    and then closed so repeated calls do not accumulate open figures.

    Args:
        history: Keras History object whose ``history`` dict contains
            ``accuracy``, ``loss``, ``auc`` and their ``val_`` counterparts.
        model_name: Used in the panel titles and the output file name.
    """
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    curves = history.history

    def decorate(ax, title, ylabel):
        # Shared cosmetics for every panel.
        ax.set_title(f'{model_name} - {title}', fontsize=14, fontweight='bold')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(ylabel)
        ax.legend()
        ax.grid(True, alpha=0.3)

    # Train-vs-validation curves: (axes position, history key, display name).
    for position, key, label in (
        ((0, 0), 'accuracy', 'Accuracy'),
        ((0, 1), 'loss', 'Loss'),
        ((1, 0), 'auc', 'AUC'),
    ):
        ax = axes[position]
        ax.plot(curves[key], label=f'Train {label}', linewidth=2)
        ax.plot(curves[f'val_{key}'], label=f'Val {label}', linewidth=2)
        decorate(ax, label, label)

    # Overfitting detection: a growing positive gap means training accuracy
    # is pulling away from validation accuracy.
    train_val_gap = np.array(curves['accuracy']) - np.array(curves['val_accuracy'])
    gap_ax = axes[1, 1]
    gap_ax.plot(train_val_gap, label='Train-Val Gap', linewidth=2, color='red')
    gap_ax.axhline(y=0, color='black', linestyle='--', alpha=0.3)
    decorate(gap_ax, 'Overfitting Detection', 'Accuracy Gap')

    plt.tight_layout()
    plt.savefig(f'{model_name}_training_history.png', dpi=300, bbox_inches='tight')
    plt.show()
    # Free the figure explicitly; this function is called once per model.
    plt.close(fig)

    print(f"‚úì Training plot saved: {model_name}_training_history.png")

# ===================== TRAIN ALL MODELS =====================
_rule = "=" * 80
print("\n" + _rule)
print("STARTING TRAINING FOR ALL MODELS")
print(_rule)

# model name -> {'model': trained model, 'history': per-epoch metric lists}
training_results = {}

for model_name, model in models_dict.items():
    # A failure in one model is reported and the loop moves on to the next,
    # so a single bad architecture does not cost the whole run.
    try:
        trained_model, history = train_model(
            model, model_name, X_train, X_val, y_train, y_val
        )
        plot_training_history(history, model_name)

        training_results[model_name] = dict(
            model=trained_model,
            history=history.history,
        )

        final_path = f'{model_name}_final.h5'
        trained_model.save(final_path)
        print(f"‚úì Model saved: {final_path}\n")

    except Exception as e:
        print(f"‚ùå Error training {model_name}: {e}\n")

# Save training results
with open('training_results.pkl', 'wb') as f:
    pickle.dump(training_results, f)

print("\n" + _rule)
print(f"‚úÖ TRAINING COMPLETED FOR {len(training_results)} MODELS")
print(_rule)
print("\nRun PART 4 to evaluate all models!")

In [None]:
"""
Advanced Facial Expression Recognition using RAVDESS Dataset
PART 4: Model Evaluation and Comparison
"""

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score, cohen_kappa_score
import pandas as pd
import pickle

# Load data and results
_rule = "=" * 80
print(_rule)
print("LOADING DATA AND TRAINED MODELS")
print(_rule)

# Held-out test split written by the preprocessing part.
X_test, y_test = (np.load(_path) for _path in ('X_test.npy', 'y_test.npy'))

# Pickled artefacts. pickle.load executes code embedded in the file, so only
# files produced by this notebook should ever be placed at these paths.
with open('label_encoder.pkl', 'rb') as f:
    label_encoder = pickle.load(f)

with open('training_results.pkl', 'rb') as f:
    training_results = pickle.load(f)

print(f"‚úì Test data loaded: {X_test.shape[0]} samples")
print(f"‚úì {len(training_results)} trained models loaded")
print(_rule + "\n")

# ===================== EVALUATION FUNCTION =====================
def evaluate_model(model, model_name, X_test, y_test):
    """Evaluate one trained classifier on the test set.

    Args:
        model: Object exposing ``predict(X, verbose=0)`` that returns one
            score per class for every sample.
        model_name: Display name used in the progress messages.
        X_test: Test inputs, forwarded to ``model.predict`` unchanged.
        y_test: 2-D array of per-class targets (e.g. one-hot); the true
            class of each sample is taken as the arg-max along axis 1.

    Returns:
        dict with the keys ``accuracy``, ``precision_macro``,
        ``recall_macro``, ``f1_macro``, ``precision_weighted``,
        ``recall_weighted``, ``f1_weighted``, ``kappa``,
        ``confusion_matrix``, ``report`` (``classification_report`` as a
        dict keyed by class name), ``predictions``, ``true_labels`` and
        ``pred_probabilities``.

    Relies on the module-level ``label_encoder`` for the class names.
    """
    print(f"\nEvaluating {model_name}...")

    # Predictions
    y_pred_prob = model.predict(X_test, verbose=0)
    y_pred_classes = np.argmax(y_pred_prob, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)

    # Pin the full label set explicitly. Without it, a class that is absent
    # from both the true and predicted labels makes classification_report
    # raise (label count != len(target_names)) and shrinks the confusion
    # matrix below the number of tick labels used when plotting it.
    class_names = label_encoder.classes_
    class_ids = np.arange(len(class_names))

    # Calculate metrics
    accuracy = accuracy_score(y_test_classes, y_pred_classes)

    # Overall metrics (zero_division=0 keeps the default value of 0 for
    # undefined scores but without emitting a warning per call).
    precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(
        y_test_classes, y_pred_classes, average='macro', zero_division=0
    )

    precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(
        y_test_classes, y_pred_classes, average='weighted', zero_division=0
    )

    # Cohen's Kappa
    kappa = cohen_kappa_score(y_test_classes, y_pred_classes)

    # Confusion matrix: always (n_classes, n_classes), rows = true labels.
    cm = confusion_matrix(y_test_classes, y_pred_classes, labels=class_ids)

    # Classification report: one entry per known class name.
    report = classification_report(
        y_test_classes, y_pred_classes,
        labels=class_ids,
        target_names=class_names,
        output_dict=True,
        zero_division=0
    )

    results = {
        'accuracy': accuracy,
        'precision_macro': precision_macro,
        'recall_macro': recall_macro,
        'f1_macro': f1_macro,
        'precision_weighted': precision_weighted,
        'recall_weighted': recall_weighted,
        'f1_weighted': f1_weighted,
        'kappa': kappa,
        'confusion_matrix': cm,
        'report': report,
        'predictions': y_pred_classes,
        'true_labels': y_test_classes,
        'pred_probabilities': y_pred_prob
    }

    print(f"  Accuracy: {accuracy:.4f}")
    print(f"  F1-Score (Macro): {f1_macro:.4f}")
    print(f"  Cohen's Kappa: {kappa:.4f}")

    return results

# ===================== EVALUATE ALL MODELS =====================
print("\n" + "="*80, "EVALUATING ALL MODELS ON TEST SET", "="*80, sep="\n")

# Maps model name -> metrics dict returned by evaluate_model.
evaluation_results = dict()

for model_name in training_results:
    model = training_results[model_name]['model']
    results = evaluation_results[model_name] = evaluate_model(
        model, model_name, X_test, y_test
    )

# ===================== CONFUSION MATRIX PLOTTING =====================
def plot_confusion_matrix(model_name, cm, classes):
    """Draw, save and show a row-normalised confusion-matrix heatmap.

    Args:
        model_name: Used in the title and in the output file name
            ``{model_name}_confusion_matrix.png``; also the key used to
            look up the accuracy in the module-level ``evaluation_results``.
        cm: Square confusion matrix of counts, rows = true labels.
        classes: Tick labels for both axes, in matrix order.
    """
    fig = plt.figure(figsize=(12, 10))

    # Normalize each row to fractions of that true class. A class with no
    # true samples has an all-zero row; leave it at 0.0 rather than dividing
    # by zero, which would fill the row with NaN and blank heatmap cells.
    cm = np.asarray(cm)
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_normalized = np.divide(
        cm, row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0
    )

    sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues',
                xticklabels=classes, yticklabels=classes,
                cbar_kws={'label': 'Normalized Count'})

    plt.title(f'{model_name} - Confusion Matrix\nAccuracy: {evaluation_results[model_name]["accuracy"]:.4f}',
              fontsize=14, fontweight='bold')
    plt.ylabel('True Label', fontsize=12)
    plt.xlabel('Predicted Label', fontsize=12)
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.savefig(f'{model_name}_confusion_matrix.png', dpi=300, bbox_inches='tight')
    plt.show()
    # Release the figure: this function runs once per model, and unclosed
    # figures otherwise accumulate outside inline notebook backends.
    plt.close(fig)
    print(f"‚úì Confusion matrix saved: {model_name}_confusion_matrix.png")

# Plot confusion matrices for all models
print("\n" + "="*80, "GENERATING CONFUSION MATRICES", "="*80, sep="\n")

# One heatmap (and one PNG) per evaluated model.
for model_name in evaluation_results:
    cm = evaluation_results[model_name]['confusion_matrix']
    plot_confusion_matrix(model_name, cm, label_encoder.classes_)

# ===================== DETAILED CLASSIFICATION REPORTS =====================
print("\n" + "="*80, "DETAILED CLASSIFICATION REPORTS", "="*80, sep="\n")

for model_name in evaluation_results:
    # Per-model heading.
    print(f"\n{'='*80}", f"{model_name}", f"{'='*80}", sep="\n")

    report = evaluation_results[model_name]['report']

    # Table header, then one row per emotion class.
    print(f"\n{'Emotion':<15} {'Precision':<12} {'Recall':<12} {'F1-Score':<12} {'Support':<10}")
    print("-" * 80)

    for emotion in label_encoder.classes_:
        metrics = report[emotion]
        print(f"{emotion:<15} {metrics['precision']:<12.4f} {metrics['recall']:<12.4f} "
              f"{metrics['f1-score']:<12.4f} {int(metrics['support']):<10}")

    # Aggregate scores for the same model.
    print(
        "-" * 80,
        f"\nOverall Metrics:",
        f"  Accuracy:           {evaluation_results[model_name]['accuracy']:.4f}",
        f"  Macro Avg F1:       {evaluation_results[model_name]['f1_macro']:.4f}",
        f"  Weighted Avg F1:    {evaluation_results[model_name]['f1_weighted']:.4f}",
        f"  Cohen's Kappa:      {evaluation_results[model_name]['kappa']:.4f}",
        sep="\n",
    )

# ===================== MODEL COMPARISON =====================
print("\n" + "="*80, "MODEL COMPARISON", "="*80, sep="\n")

# One summary row per model; 'Precision' and 'Recall' are the weighted averages.
comparison_data = [
    {
        'Model': name,
        'Accuracy': scores['accuracy'],
        'F1-Macro': scores['f1_macro'],
        'F1-Weighted': scores['f1_weighted'],
        'Precision': scores['precision_weighted'],
        'Recall': scores['recall_weighted'],
        'Kappa': scores['kappa'],
    }
    for name, scores in evaluation_results.items()
]

# Best accuracy first, so row 0 is the winning model.
df_comparison = pd.DataFrame(comparison_data).sort_values('Accuracy', ascending=False)

print("\n" + df_comparison.to_string(index=False))
print()

# Save comparison table
df_comparison.to_csv('model_comparison.csv', index=False)
print("‚úì Comparison table saved: model_comparison.csv")

# ===================== COMPARISON VISUALIZATION =====================
def plot_model_comparison(df):
    """Draw a 2x2 grid of horizontal bar charts, one per summary metric.

    ``df`` needs a 'Model' column plus 'Accuracy', 'F1-Macro', 'F1-Weighted'
    and 'Kappa' columns. Each panel ranks the models by its own metric and
    labels every bar with the value to four decimals. The figure is written
    to ``model_comparison_chart.png`` and then shown.
    """
    # Metric column -> bar colour, in panel order (row-major).
    metric_colours = {
        'Accuracy': 'steelblue',
        'F1-Macro': 'coral',
        'F1-Weighted': 'seagreen',
        'Kappa': 'mediumpurple',
    }

    fig, axes = plt.subplots(2, 2, figsize=(18, 12))

    for panel, (metric, colour) in zip(axes.flat, metric_colours.items()):
        ranked = df.sort_values(metric, ascending=True)
        rects = panel.barh(ranked['Model'], ranked[metric], color=colour, alpha=0.8)

        # Write each value at the tip of its bar.
        for rect in rects:
            value = rect.get_width()
            y_mid = rect.get_y() + rect.get_height()/2
            panel.text(value, y_mid, f'{value:.4f}',
                       ha='left', va='center', fontsize=10, fontweight='bold')

        panel.set_xlabel(metric, fontsize=12, fontweight='bold')
        panel.set_title(f'Model Comparison - {metric}', fontsize=14, fontweight='bold')
        panel.set_xlim(0, 1)
        panel.grid(axis='x', alpha=0.3)

    plt.tight_layout()
    plt.savefig('model_comparison_chart.png', dpi=300, bbox_inches='tight')
    plt.show()
    print("‚úì Comparison chart saved: model_comparison_chart.png")

plot_model_comparison(df_comparison)

# ===================== FIND BEST MODEL =====================
# df_comparison is sorted by accuracy (descending), so row 0 is the winner.
best_model_name = df_comparison.iloc[0]['Model']
best_accuracy = df_comparison.iloc[0]['Accuracy']

print("\n" + "="*80, "BEST MODEL", "="*80, sep="\n")
print(
    f"\nüèÜ Best Model: {best_model_name}",
    f"   Accuracy: {best_accuracy:.4f}",
    f"   F1-Score (Macro): {df_comparison.iloc[0]['F1-Macro']:.4f}",
    f"   F1-Score (Weighted): {df_comparison.iloc[0]['F1-Weighted']:.4f}",
    f"   Cohen's Kappa: {df_comparison.iloc[0]['Kappa']:.4f}",
    "="*80,
    sep="\n",
)

# ===================== PER-CLASS PERFORMANCE COMPARISON =====================
def plot_per_class_comparison():
    """Plot a grouped bar chart of per-emotion F1-scores for every model.

    Reads the module-level ``label_encoder`` (class names, x-axis order) and
    ``evaluation_results`` (per-model ``'report'`` dicts keyed by class
    name). Saves the chart to ``per_class_comparison.png`` and shows it.
    """
    emotions_list = list(label_encoder.classes_)
    model_names = list(evaluation_results.keys())
    n_models = len(model_names)

    # Create grouped bar chart
    fig, ax = plt.subplots(figsize=(16, 8))

    x = np.arange(len(emotions_list))
    # Bars are 0.1 wide as long as a whole group fits inside 0.8 of a tick
    # interval; with more models they shrink so that neighbouring emotion
    # groups never overlap.
    width = min(0.1, 0.8 / max(n_models, 1))

    colors_palette = plt.cm.Set3(np.linspace(0, 1, n_models))

    for idx, model_name in enumerate(model_names):
        # Read the F1-scores straight from the report instead of
        # round-tripping them through a DataFrame.
        report = evaluation_results[model_name]['report']
        f1_scores = [report[emotion]['f1-score'] for emotion in emotions_list]

        # ax.bar centres each bar on its x position, so the group is centred
        # on the tick when the offsets are symmetric around zero.
        offset = width * (idx - (n_models - 1) / 2)
        ax.bar(x + offset, f1_scores, width, label=model_name,
               color=colors_palette[idx], alpha=0.8)

    ax.set_xlabel('Emotion', fontsize=12, fontweight='bold')
    ax.set_ylabel('F1-Score', fontsize=12, fontweight='bold')
    ax.set_title('Per-Class Performance Comparison Across All Models',
                fontsize=14, fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(emotions_list, rotation=45, ha='right')
    ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    ax.grid(axis='y', alpha=0.3)
    ax.set_ylim(0, 1)

    plt.tight_layout()
    plt.savefig('per_class_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()
    # Release the figure once it has been saved and displayed.
    plt.close(fig)
    print("‚úì Per-class comparison saved: per_class_comparison.png")

print("\n" + "="*80, "PER-CLASS PERFORMANCE COMPARISON", "="*80, sep="\n")
plot_per_class_comparison()

# ===================== SAVE FINAL RESULTS =====================
# Persist the full metrics dict (arrays included) for later inspection.
with open('evaluation_results.pkl', mode='wb') as f:
    pickle.dump(evaluation_results, f)

print("\n" + "="*80, "‚úÖ EVALUATION COMPLETE!", "="*80, sep="\n")
print(
    "\nAll results saved:",
    "  - evaluation_results.pkl",
    "  - model_comparison.csv",
    "  - Confusion matrices (PNG)",
    "  - Comparison charts (PNG)",
    sep="\n",
)
print("\nBest model:", best_model_name)
print("="*80)