## Data Acquisition and Setup

In [None]:
# Import required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import zipfile
from PIL import Image

# For ML data preparation
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Set random seeds for reproducibility
np.random.seed(42)

In [None]:
def extract_fer2013_dataset(zip_filename='fer2013_data.zip', extract_path='data'):
    """Extracts the FER2013 dataset from the provided zip file."""
    # Create extraction directory if it doesn't exist
    if not os.path.exists(extract_path):
        os.makedirs(extract_path)
        print(f"Created directory: {extract_path}")
    
    # Check if the zip file exists
    if not os.path.exists(zip_filename):
        print(f"Error: {zip_filename} not found.")
        print("Please ensure the dataset zip file is in the current directory.")
        return False
    
    # Extract the contents
    try:
        print(f"Extracting {zip_filename} to {extract_path}...")
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(extract_path)
        print("Extraction complete!")
        
        # List the extracted contents
        extracted_files = os.listdir(extract_path)
        print(f"Extracted {len(extracted_files)} files/directories:")
        for item in extracted_files:
            print(f"- {item}")
        
        return True
    
    except Exception as e:
        print(f"Error extracting {zip_filename}: {e}")
        return False

# Extract the dataset
extract_success = extract_fer2013_dataset()

In [None]:
def explore_fer2013_structure(data_path='data'):
    """Explores and documents the structure of the extracted FER2013 dataset."""
    print("Exploring FER2013 dataset structure...")
    
    # Define expected emotion categories
    emotion_categories = {
        0: 'Angry', 1: 'Disgust', 2: 'Fear', 
        3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'
    }
    
    # Check for fer2013.csv (the main dataset file)
    csv_path = os.path.join(data_path, 'fer2013.csv')
    if os.path.exists(csv_path):
        print(f"\nFound main dataset file: {csv_path}")
        
        # Load and inspect CSV file
        df = pd.read_csv(csv_path)
        print(f"Dataset shape: {df.shape}")
        print(f"Columns: {df.columns.tolist()}")
        
        # Check for expected columns
        if 'pixels' in df.columns and 'emotion' in df.columns:
            print("\nDataset format is as expected with 'pixels' and 'emotion' columns.")
            
            # Display emotion distribution
            emotion_counts = df['emotion'].value_counts().sort_index()
            print("\nEmotion distribution:")
            total_images = len(df)
            
            for emotion_idx, count in emotion_counts.items():
                emotion_name = emotion_categories.get(emotion_idx, f"Unknown ({emotion_idx})")
                percentage = (count / total_images) * 100
                print(f"- {emotion_name}: {count} images ({percentage:.2f}%)")
            
            return df
    
    # Check for alternative dataset formats (image directories)
    for subdir in ['train', 'test', 'val']:
        subdir_path = os.path.join(data_path, subdir)
        if os.path.exists(subdir_path) and os.path.isdir(subdir_path):
            print(f"\nFound {subdir} directory: {subdir_path}")
            
            # Check for emotion subdirectories
            emotion_dirs = [d for d in os.listdir(subdir_path) 
                           if os.path.isdir(os.path.join(subdir_path, d))]
            
            print(f"Contains {len(emotion_dirs)} subdirectories:")
            for emotion_dir in emotion_dirs:
                dir_path = os.path.join(subdir_path, emotion_dir)
                num_images = len([f for f in os.listdir(dir_path) 
                                 if f.endswith(('.jpg', '.png', '.jpeg'))])
                print(f"- {emotion_dir}: {num_images} images")
    
    print("\nDataset exploration complete.")
    return None

# Explore the dataset structure
fer_df = explore_fer2013_structure()

In [None]:
def load_fer2013_data(data_path='data', df=None):
    """Loads the FER2013 dataset into memory."""
    # Define emotion mapping
    emotion_map = {
        0: 'Angry', 1: 'Disgust', 2: 'Fear', 
        3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'
    }
    
    # First, try loading from CSV
    csv_path = os.path.join(data_path, 'fer2013.csv')
    if df is None and os.path.exists(csv_path):
        print(f"Loading data from {csv_path}...")
        df = pd.read_csv(csv_path)
    
    if df is not None and 'pixels' in df.columns and 'emotion' in df.columns:
        print("Processing data from DataFrame...")
        
        # Parse string pixel values into numpy arrays
        print("Converting pixel strings to image arrays...")
        X = []
        
        # Simple progress tracking
        total_rows = len(df)
        print_intervals = [int(total_rows * i / 10) for i in range(1, 11)]
        
        for i, pixel_str in enumerate(df['pixels']):
            # Simple progress reporting
            if i in print_intervals:
                print(f"Progress: {i}/{total_rows} images ({i/total_rows*100:.1f}%)")
                
            pixels = [int(p) for p in pixel_str.split()]
            X.append(np.array(pixels).reshape(48, 48))
        
        X = np.array(X)
        y = df['emotion'].values
        
        print(f"Loaded {len(X)} images with shape {X[0].shape}")
        return X, y, emotion_map
    
    # If CSV loading fails, try loading from image directories
    print("CSV loading failed, checking for image directories...")
    
    X = []
    y = []
    total_images = 0
    
    # First count total images
    for split in ['train', 'test', 'val']:
        split_dir = os.path.join(data_path, split)
        if not os.path.exists(split_dir):
            continue
        
        for emotion_idx, emotion_name in emotion_map.items():
            # Try different directory naming conventions
            for dir_name in [emotion_name.lower(), str(emotion_idx)]:
                emotion_dir = os.path.join(split_dir, dir_name)
                if os.path.exists(emotion_dir):
                    total_images += len([f for f in os.listdir(emotion_dir) 
                                        if f.lower().endswith(('.jpg', '.jpeg', '.png'))])
    
    print(f"Found {total_images} images in directories. Loading...")
    processed_images = 0
    print_intervals = [int(total_images * i / 10) for i in range(1, 11)]
    
    # Load images from directories
    for split in ['train', 'test', 'val']:
        split_dir = os.path.join(data_path, split)
        if not os.path.exists(split_dir):
            continue
        
        print(f"Loading images from {split_dir}...")
        
        for emotion_idx, emotion_name in emotion_map.items():
            # Try different directory naming conventions
            for dir_name in [emotion_name.lower(), str(emotion_idx)]:
                emotion_dir = os.path.join(split_dir, dir_name)
                if not os.path.exists(emotion_dir):
                    continue
                
                print(f"Loading {emotion_name} images...")
                for img_file in os.listdir(emotion_dir):
                    if img_file.lower().endswith(('.jpg', '.jpeg', '.png')):
                        img_path = os.path.join(emotion_dir, img_file)
                        img = Image.open(img_path).convert('L')  # Convert to grayscale
                        img_resized = img.resize((48, 48))
                        img_array = np.array(img_resized)
                        X.append(img_array)
                        y.append(emotion_idx)
                        
                        processed_images += 1
                        if processed_images in print_intervals:
                            print(f"Progress: {processed_images}/{total_images} images ({processed_images/total_images*100:.1f}%)")
    
    if len(X) > 0:
        X = np.array(X)
        y = np.array(y)
        print(f"Loaded {len(X)} images with shape {X[0].shape}")
        return X, y, emotion_map
    
    print("Failed to load dataset. Please check the dataset structure.")
    return None, None, emotion_map

# Load the dataset
X, y, emotion_map = load_fer2013_data(df=fer_df)

In [None]:
# Display basic information about the dataset
if X is not None and y is not None:
    print("\nDataset Overview:")
    print(f"Total images: {len(X)}")
    print(f"Image dimensions: {X[0].shape}")
    print(f"Label distribution:")
    
    unique_labels, counts = np.unique(y, return_counts=True)
    for label, count in zip(unique_labels, counts):
        emotion_name = emotion_map[label]
        percentage = (count / len(y)) * 100
        print(f"- {emotion_name}: {count} images ({percentage:.2f}%)")
    
    # Memory usage estimation
    memory_usage_mb = X.nbytes / (1024 * 1024)
    print(f"\nEstimated memory usage for image data: {memory_usage_mb:.2f} MB")

In [None]:
def visualize_emotion_samples(X, y, emotion_map, samples_per_emotion=3):
    """Visualizes sample images from each emotion category."""
    if X is None or y is None:
        print("No data available to visualize.")
        return
    
    # Get all unique emotion labels
    unique_emotions = sorted(np.unique(y))
    num_emotions = len(unique_emotions)
    
    # Create a figure with subplots
    fig, axes = plt.subplots(num_emotions, samples_per_emotion, 
                            figsize=(samples_per_emotion*3, num_emotions*2.5))
    
    # For each emotion, select random samples
    for i, emotion_idx in enumerate(unique_emotions):
        # Get indices of all images with this emotion
        emotion_indices = np.where(y == emotion_idx)[0]
        
        # Select random samples
        if len(emotion_indices) < samples_per_emotion:
            sample_indices = emotion_indices
        else:
            sample_indices = np.random.choice(emotion_indices, samples_per_emotion, replace=False)
        
        # Display each sample
        for j, idx in enumerate(sample_indices):
            ax = axes[i, j] if num_emotions > 1 else axes[j]
            ax.imshow(X[idx], cmap='gray')
            
            if j == 0:  # Only add emotion label to first sample in row
                ax.set_ylabel(emotion_map[emotion_idx], fontsize=12)
            
            ax.set_xticks([])
            ax.set_yticks([])
    
    plt.tight_layout()
    plt.suptitle('Sample Images from Each Emotion Category', y=1.02, fontsize=16)
    plt.show()

# Visualize sample images
visualize_emotion_samples(X, y, emotion_map)

In [None]:
def split_dataset(X, y, test_size=0.1, val_size=0.1):
    """Splits the dataset into training, validation, and test sets."""
    if X is None or y is None:
        print("No data available to split.")
        return None, None, None, None, None, None
    
    # First split: separate test set
    X_temp, X_test, y_temp, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=y
    )
    
    # Second split: separate validation set from remaining data
    val_size_adjusted = val_size / (1 - test_size)
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=val_size_adjusted, random_state=42, stratify=y_temp
    )
    
    print("Dataset split complete:")
    print(f"Training set: {len(X_train)} images ({len(X_train)/len(X)*100:.1f}%)")
    print(f"Validation set: {len(X_val)} images ({len(X_val)/len(X)*100:.1f}%)")
    print(f"Test set: {len(X_test)} images ({len(X_test)/len(X)*100:.1f}%)")
    
    # Verify class distribution is preserved
    def print_distribution(name, labels):
        print(f"\n{name} set emotion distribution:")
        unique, counts = np.unique(labels, return_counts=True)
        for u, c in zip(unique, counts):
            print(f"- {emotion_map[u]}: {c} images ({c/len(labels)*100:.1f}%)")
    
    print_distribution("Training", y_train)
    print_distribution("Validation", y_val)
    print_distribution("Test", y_test)
    
    return X_train, X_val, X_test, y_train, y_val, y_test

# Split the dataset
X_train, X_val, X_test, y_train, y_val, y_test = split_dataset(X, y)

In [None]:
def save_test_set(X_test, y_test, save_dir='data/test_set'):
    """Saves the test set separately to avoid data leakage."""
    if X_test is None or y_test is None:
        return
    
    # Create directory if it doesn't exist
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    
    # Save test data
    np.save(os.path.join(save_dir, 'X_test.npy'), X_test)
    np.save(os.path.join(save_dir, 'y_test.npy'), y_test)
    
    print(f"\nTest set saved to {save_dir}")
    print("This test set should only be used for final model evaluation.")

# Save the test set
save_test_set(X_test, y_test)

In [None]:
def preprocess_data(X_train, X_val, X_test, y_train, y_val, y_test):
    """Performs basic preprocessing on the data for modeling."""
    if X_train is None or y_train is None:
        print("No data available to preprocess.")
        return None, None, None, None, None, None
    
    # Normalize pixel values to [0, 1]
    X_train_norm = X_train.astype('float32') / 255.0
    X_val_norm = X_val.astype('float32') / 255.0
    X_test_norm = X_test.astype('float32') / 255.0
    
    # Add channel dimension for CNN models (height, width, channels)
    X_train_cnn = X_train_norm.reshape(X_train_norm.shape[0], 48, 48, 1)
    X_val_cnn = X_val_norm.reshape(X_val_norm.shape[0], 48, 48, 1)
    X_test_cnn = X_test_norm.reshape(X_test_norm.shape[0], 48, 48, 1)
    
    # One-hot encode the labels for categorical crossentropy loss
    num_classes = len(np.unique(y_train))
    y_train_onehot = to_categorical(y_train, num_classes)
    y_val_onehot = to_categorical(y_val, num_classes)
    y_test_onehot = to_categorical(y_test, num_classes)
    
    print("Preprocessing complete:")
    print(f"X_train shape: {X_train_cnn.shape}")
    print(f"X_val shape: {X_val_cnn.shape}")
    print(f"X_test shape: {X_test_cnn.shape}")
    print(f"y_train shape: {y_train_onehot.shape}")
    
    return X_train_cnn, X_val_cnn, X_test_cnn, y_train_onehot, y_val_onehot, y_test_onehot

# Preprocess the data
X_train_cnn, X_val_cnn, X_test_cnn, y_train_onehot, y_val_onehot, y_test_onehot = preprocess_data(
    X_train, X_val, X_test, y_train, y_val, y_test
)

In [None]:
def save_processed_data(X_train, X_val, y_train, y_val, save_dir='data/processed'):
    """Saves the preprocessed data for future use."""
    if X_train is None or y_train is None:
        return
    
    # Create directory if it doesn't exist
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    
    # Save preprocessed data
    np.save(os.path.join(save_dir, 'X_train_cnn.npy'), X_train)
    np.save(os.path.join(save_dir, 'X_val_cnn.npy'), X_val)
    np.save(os.path.join(save_dir, 'y_train_onehot.npy'), y_train)
    np.save(os.path.join(save_dir, 'y_val_onehot.npy'), y_val)
    
    print(f"\nPreprocessed data saved to {save_dir}")

# Save the preprocessed data
save_processed_data(X_train_cnn, X_val_cnn, y_train_onehot, y_val_onehot)

## 3. Feature Engineering

In [11]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from scipy import ndimage
import os
import time

# Section 3: Feature Engineering and Data Preprocessing

In [12]:
## 3.1 Enhanced Feature Extraction

def extract_facial_features(images):
    """
    Extract comprehensive features from facial images for emotion classification.
    
    Args:
        images: Array of facial images
        
    Returns:
        DataFrame containing extracted features
    """
    features = []
    feature_groups = {}  # Track feature groups for analysis
    
    for img in images:
        try:
            # Define facial regions with more precise areas for better emotion differentiation
            regions = {
                'left_eye': img[12:24, 12:24],
                'right_eye': img[12:24, 24:36],
                'mouth': img[30:42, 18:30],
                'nose': img[24:30, 18:30],
                'forehead': img[0:12, 12:36],
                'left_cheek': img[24:36, 0:18],
                'right_cheek': img[24:36, 30:48],
                'eyebrows': img[8:16, 12:36],        # Important for anger/disgust
                'upper_mouth': img[28:34, 18:30],    # Important for surprise/fear
                'lower_mouth': img[34:42, 18:30],    # Important for distinguishing happiness
                'between_eyes': img[16:24, 20:28]    # Important for fear/surprise differentiation
            }
            
            # Initialize feature dictionary
            img_features = {}
            
            # --- Global statistical features ---
            feature_groups['global_stats'] = []
            
            img_features['mean_intensity'] = np.mean(img)
            img_features['median_intensity'] = np.median(img)
            img_features['std_intensity'] = np.std(img)
            img_features['min_intensity'] = np.min(img)
            img_features['max_intensity'] = np.max(img)
            img_features['intensity_range'] = img_features['max_intensity'] - img_features['min_intensity']
            
            feature_groups['global_stats'].extend(['mean_intensity', 'median_intensity', 'std_intensity', 
                                                    'min_intensity', 'max_intensity', 'intensity_range'])
            
            # --- Histogram features ---
            feature_groups['histogram'] = []
            
            hist, bins = np.histogram(img, bins=16, range=(0, 256))
            hist = hist / np.sum(hist)  # Normalize
            for i, h in enumerate(hist):
                img_features[f'hist_bin_{i}'] = h
                feature_groups['histogram'].append(f'hist_bin_{i}')
            
            # --- Edge-based features ---
            feature_groups['edge'] = []
            
            sobel_h = ndimage.sobel(img, axis=0)
            sobel_v = ndimage.sobel(img, axis=1)
            magnitude = np.sqrt(sobel_h**2 + sobel_v**2)
            img_features['edge_mean'] = np.mean(magnitude)
            img_features['edge_std'] = np.std(magnitude)
            img_features['edge_max'] = np.max(magnitude)
            
            feature_groups['edge'].extend(['edge_mean', 'edge_std', 'edge_max'])
            
            # Direction of edges (useful for mouth curvature)
            with np.errstate(divide='ignore', invalid='ignore'):
                direction = np.arctan2(sobel_v, sobel_h) * 180 / np.pi
                direction = np.nan_to_num(direction)
            
            # Count edges in different directions
            direction_bins = np.linspace(-180, 180, 9)
            hist_dir, _ = np.histogram(direction, bins=direction_bins)
            hist_dir = hist_dir / np.sum(hist_dir)
            for i, h in enumerate(hist_dir):
                img_features[f'edge_dir_{i}'] = h
                feature_groups['edge'].append(f'edge_dir_{i}')
            
            # --- Regional features ---
            feature_groups['regional'] = []
            
            for region_name, region in regions.items():
                # Regional statistical features
                img_features[f'{region_name}_mean'] = np.mean(region)
                img_features[f'{region_name}_std'] = np.std(region)
                img_features[f'{region_name}_median'] = np.median(region)
                
                feature_groups['regional'].extend([f'{region_name}_mean', f'{region_name}_std', f'{region_name}_median'])
                
                # Regional edge features
                region_sobel_h = ndimage.sobel(region, axis=0)
                region_sobel_v = ndimage.sobel(region, axis=1)
                region_magnitude = np.sqrt(region_sobel_h**2 + region_sobel_v**2)
                img_features[f'{region_name}_edge_mean'] = np.mean(region_magnitude)
                img_features[f'{region_name}_edge_std'] = np.std(region_magnitude)
                
                feature_groups['regional'].extend([f'{region_name}_edge_mean', f'{region_name}_edge_std'])
            
            # --- Emotion-specific features ---
            feature_groups['emotion_specific'] = []
            
            # Mouth shape features (important for distinguishing emotions)
            mouth_region = regions['mouth']
            if mouth_region.size > 0:
                # Measure mouth openness (vertical variance)
                mouth_vertical_profile = np.mean(mouth_region, axis=1)
                img_features['mouth_openness'] = np.var(mouth_vertical_profile)
                
                # Measure mouth width/curvature (horizontal variance)
                mouth_horizontal_profile = np.mean(mouth_region, axis=0)
                img_features['mouth_width_var'] = np.var(mouth_horizontal_profile)
                
                # Enhanced smile detection (higher values at corners than center)
                if mouth_horizontal_profile.size > 2:
                    left_corner = np.mean(mouth_horizontal_profile[:4])
                    right_corner = np.mean(mouth_horizontal_profile[-4:])
                    center = np.mean(mouth_horizontal_profile[4:-4])
                    img_features['smile_metric'] = (left_corner + right_corner) / (2 * center + 1e-10)
                    
                    # Smile asymmetry (helps distinguish genuine from fake smiles)
                    img_features['smile_asymmetry'] = np.abs(left_corner - right_corner) / (left_corner + right_corner + 1e-10)
                else:
                    img_features['smile_metric'] = 0
                    img_features['smile_asymmetry'] = 0
            else:
                img_features['mouth_openness'] = 0
                img_features['mouth_width_var'] = 0
                img_features['smile_metric'] = 0
                img_features['smile_asymmetry'] = 0
            
            feature_groups['emotion_specific'].extend(['mouth_openness', 'mouth_width_var', 'smile_metric', 'smile_asymmetry'])
            
            # Eye features (important for surprise/fear)
            left_eye_region = regions['left_eye']
            right_eye_region = regions['right_eye']
            if left_eye_region.size > 0 and right_eye_region.size > 0:
                # Eye openness detection
                left_eye_vertical = np.mean(left_eye_region, axis=1)
                right_eye_vertical = np.mean(right_eye_region, axis=1)
                img_features['eye_openness'] = (np.var(left_eye_vertical) + np.var(right_eye_vertical)) / 2
                
                # Eye asymmetry (important for certain emotions)
                img_features['eye_asymmetry'] = np.abs(np.var(left_eye_vertical) - np.var(right_eye_vertical)) / (np.var(left_eye_vertical) + np.var(right_eye_vertical) + 1e-10)
            else:
                img_features['eye_openness'] = 0
                img_features['eye_asymmetry'] = 0
            
            feature_groups['emotion_specific'].extend(['eye_openness', 'eye_asymmetry'])
            
            # Eyebrow features (important for anger/disgust)
            eyebrow_region = regions['eyebrows']
            if eyebrow_region.size > 0:
                # Measure eyebrow "furrowing"
                eyebrow_horizontal = np.mean(eyebrow_region, axis=0)
                eyebrow_vertical = np.mean(eyebrow_region, axis=1)
                img_features['eyebrow_var_h'] = np.var(eyebrow_horizontal)
                img_features['eyebrow_var_v'] = np.var(eyebrow_vertical)
                
                # Eyebrow center depression (for anger)
                midpoint = len(eyebrow_horizontal) // 2
                if midpoint > 2:
                    center = np.mean(eyebrow_horizontal[midpoint-2:midpoint+2])
                    sides = (np.mean(eyebrow_horizontal[:4]) + np.mean(eyebrow_horizontal[-4:])) / 2
                    img_features['eyebrow_depression'] = (sides - center) / (sides + 1e-10)
                else:
                    img_features['eyebrow_depression'] = 0
            else:
                img_features['eyebrow_var_h'] = 0
                img_features['eyebrow_var_v'] = 0
                img_features['eyebrow_depression'] = 0
            
            feature_groups['emotion_specific'].extend(['eyebrow_var_h', 'eyebrow_var_v', 'eyebrow_depression'])
            
            # --- Symmetry features ---
            feature_groups['symmetry'] = []
            
            flipped = np.fliplr(img)
            img_features['asymmetry_score'] = np.mean(np.abs(img - flipped))
            feature_groups['symmetry'].append('asymmetry_score')
            
            # Validate all features
            for key, value in list(img_features.items()):
                if np.isnan(value) or np.isinf(value):
                    img_features[key] = 0.0  # Replace with safe default
            
            features.append(img_features)
            
        except Exception as e:
            print(f"Error extracting features: {str(e)}")
            # Add default feature set if extraction fails
            default_features = {'mean_intensity': 0.0, 'median_intensity': 0.0, 'std_intensity': 0.0}
            features.append(default_features)
    
    # Convert to DataFrame with validation
    if not features:
        return pd.DataFrame(), feature_groups
    
    feature_df = pd.DataFrame(features)
    
    # Final validation
    feature_df = feature_df.replace([np.inf, -np.inf], 0.0)
    feature_df = feature_df.fillna(0.0)
    
    return feature_df, feature_groups


## 3.2 Create Emotion Mappimg

In [13]:
## 3.2 Create 5-Class Emotion Mapping

def map_emotions_to_five_classes(emotion_label):
    """
    Maps the 7 original emotion classes to 5 broader categories:
    0: Positive (originally Happy)
    1: Negative-High Arousal (originally Angry, Disgust)
    2: Negative-Low Arousal (originally Sad, Fear)
    3: Surprise (remains the same)
    4: Neutral (remains the same)
    """
    emotion_mapping = {
        0: 1,  # Angry -> Negative-High Arousal
        1: 1,  # Disgust -> Negative-High Arousal
        2: 2,  # Fear -> Negative-Low Arousal
        3: 0,  # Happy -> Positive
        4: 2,  # Sad -> Negative-Low Arousal
        5: 3,  # Surprise -> Surprise (maintains its own category)
        6: 4,  # Neutral -> Neutral
    }
    return emotion_mapping[emotion_label]

def create_five_class_mapping(y_train, y_val, y_test=None):
    """
    Create 5-class emotion mappings from the original 7-class labels.
    
    Args:
        y_train, y_val, y_test: Original 7-class emotion labels
        
    Returns:
        Dictionary containing 5-class labels and one-hot encoded versions
    """
    # Define new emotion map
    new_emotion_map = {
        0: 'Positive',
        1: 'Negative-High Arousal',
        2: 'Negative-Low Arousal',
        3: 'Surprise',
        4: 'Neutral'
    }
    
    # Apply mapping to training and validation sets
    y_train_5class = np.array([map_emotions_to_five_classes(label) for label in y_train])
    y_val_5class = np.array([map_emotions_to_five_classes(label) for label in y_val])
    
    # Process test set if provided
    if y_test is not None:
        y_test_5class = np.array([map_emotions_to_five_classes(label) for label in y_test])
    else:
        y_test_5class = None
    
    # Create one-hot encoded versions for CNN models
    y_train_5class_onehot = to_categorical(y_train_5class, 5)
    y_val_5class_onehot = to_categorical(y_val_5class, 5)
    
    if y_test_5class is not None:
        y_test_5class_onehot = to_categorical(y_test_5class, 5)
    else:
        y_test_5class_onehot = None
    
    # Print statistics
    print("5-class mapping successfully created.")
    print(f"Training labels shape: {y_train_5class.shape}")
    print(f"Validation labels shape: {y_val_5class.shape}")
    
    # Return dictionary with all versions
    return {
        'y_train_5class': y_train_5class,
        'y_val_5class': y_val_5class,
        'y_test_5class': y_test_5class,
        'y_train_5class_onehot': y_train_5class_onehot,
        'y_val_5class_onehot': y_val_5class_onehot,
        'y_test_5class_onehot': y_test_5class_onehot,
        'new_emotion_map': new_emotion_map
    }


## 3.3 Analysis of Distribution 

In [14]:
## 3.3 Analyze 5-Class Distribution

def analyze_class_distribution(y_train, y_val, y_test=None, emotion_map=None):
    """
    Analyze class distribution in the dataset and compute class weights.
    
    Args:
        y_train, y_val, y_test: Labels for each split
        emotion_map: Mapping from label indices to emotion names
    
    Returns:
        Dictionary of class balance information
    """
    from collections import Counter
    from sklearn.utils.class_weight import compute_class_weight
    
    # Default emotion map if none provided
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Count class occurrences
    train_counts = Counter(y_train)
    val_counts = Counter(y_val)
    test_counts = Counter(y_test) if y_test is not None else None
    
    # Print class distribution
    print("\nClass distribution:")
    
    # Find the total number of classes
    num_classes = len(emotion_map)
    
    for i in range(num_classes):
        if i in train_counts:
            emotion = emotion_map[i]
            train_count = train_counts[i]
            train_pct = train_count / len(y_train) * 100
            val_count = val_counts[i]
            val_pct = val_count / len(y_val) * 100
            
            output = f"{emotion}: Train={train_count} ({train_pct:.1f}%), Val={val_count} ({val_pct:.1f}%)"
            
            if test_counts is not None:
                test_count = test_counts[i]
                test_pct = test_count / len(y_test) * 100
                output += f", Test={test_count} ({test_pct:.1f}%)"
                
            print(output)
    
    # Compute class weights for loss function
    class_weights = compute_class_weight(
        'balanced',
        classes=np.unique(y_train),
        y=y_train
    )
    
    class_weights_dict = {i: class_weights[idx] for idx, i in enumerate(np.unique(y_train))}
    
    # Print class weights
    print("\nClass weights for balanced training:")
    for class_id, weight in class_weights_dict.items():
        print(f"  Class {class_id} ({emotion_map[class_id]}): {weight:.2f}")
    
    # Compute imbalance metrics
    majority_class = max(train_counts.items(), key=lambda x: x[1])[0]
    minority_class = min(train_counts.items(), key=lambda x: x[1])[0]
    imbalance_ratio = train_counts[majority_class] / train_counts[minority_class]
    
    print(f"\nImbalance ratio (majority:minority): {imbalance_ratio:.2f}")
    print(f"Majority class: {emotion_map[majority_class]} ({train_counts[majority_class]} samples)")
    print(f"Minority class: {emotion_map[minority_class]} ({train_counts[minority_class]} samples)")
    
    # Plot class distribution
    plt.figure(figsize=(12, 6))
    
    # Prepare data for plotting
    emotions = [emotion_map[i] for i in range(num_classes) if i in train_counts]
    train_vals = [train_counts[i] for i in range(num_classes) if i in train_counts]
    val_vals = [val_counts[i] for i in range(num_classes) if i in train_counts]
    
    # Create bar plot
    x = np.arange(len(emotions))
    width = 0.25
    
    plt.bar(x - width, train_vals, width, label='Train')
    plt.bar(x, val_vals, width, label='Validation')
    
    if test_counts is not None:
        test_vals = [test_counts[i] for i in range(num_classes) if i in train_counts]
        plt.bar(x + width, test_vals, width, label='Test')
    
    plt.xlabel('Emotion')
    plt.ylabel('Count')
    plt.title('Class Distribution Across Data Splits')
    plt.xticks(x, emotions, rotation=45)
    plt.legend()
    plt.tight_layout()
    plt.show()
    
    return {
        'train_counts': train_counts,
        'val_counts': val_counts,
        'test_counts': test_counts,
        'class_weights': class_weights_dict,
        'imbalance_ratio': imbalance_ratio,
        'majority_class': majority_class,
        'minority_class': minority_class
    }


## 3.4 Data Augmentation Setup

In [15]:

## 3.4 Data Augmentation for 5-Class Model

def create_augmentation_generator(emotion_specific=True):
    """
    Create data augmentation generator optimized for the 5-class emotion model.
    
    Args:
        emotion_specific: Whether to use emotion-specific augmentation parameters
        
    Returns:
        Configured ImageDataGenerator and emotion-specific generators
    """
    # Base augmentation parameters
    base_params = {
        'rotation_range': 15,
        'width_shift_range': 0.1,
        'height_shift_range': 0.1,
        'zoom_range': 0.1,
        'horizontal_flip': True,
        'brightness_range': [0.8, 1.2],
        'fill_mode': 'nearest'
        # No rescale parameter - we'll normalize images separately
    }
    
    # Create generator with base parameters
    train_datagen = ImageDataGenerator(**base_params)
    
    # For emotion-specific augmentation, we now use 5-class categories
    emotion_generators = {}
    if emotion_specific:
        # 5-class emotion-specific augmentation parameters
        emotion_params = {
            # Positive (Happy)
            0: {**base_params, 'rotation_range': 20, 'zoom_range': 0.15},
            
            # Negative-High Arousal (Angry, Disgust)
            1: {**base_params, 'rotation_range': 10, 'brightness_range': [0.7, 1.1]},
            
            # Negative-Low Arousal (Fear, Sad)
            2: {**base_params, 'zoom_range': 0.05, 'brightness_range': [0.85, 1.1]},
            
            # Surprise
            3: {**base_params, 'height_shift_range': 0.05, 'zoom_range': 0.05},
            
            # Neutral
            4: {**base_params, 'rotation_range': 10, 'zoom_range': 0.05}
        }
        
        # Create specialized generators
        for emotion_id, params in emotion_params.items():
            emotion_generators[emotion_id] = ImageDataGenerator(**params)
    
    # Create generator for validation (no augmentation)
    val_datagen = ImageDataGenerator()
    
    print("Created augmentation generators for 5-class model:")
    print("- Base generator for general augmentation")
    if emotion_specific:
        print("- 5 emotion-specific generators with parameters tailored to each emotion category")
    
    return train_datagen, val_datagen, emotion_generators if emotion_specific else None

def generate_augmented_data_func(X_train, y_train_5class, emotion_generators, 
                            samples_per_class=1000, random_state=42):
    """
    Generate augmented samples for each emotion class and combine with original data.
    
    Args:
        X_train: Original training images
        y_train_5class: 5-class emotion labels
        emotion_generators: Dictionary of emotion-specific generators
        samples_per_class: Target number of samples per class after augmentation
        random_state: Random seed for reproducibility
        
    Returns:
        Combined original and augmented data with labels
    """
    # Set random seed for reproducibility
    np.random.seed(random_state)
    
    # Copy original data
    X_augmented = X_train.copy()
    y_augmented = y_train_5class.copy()
    
    print("Generating augmented samples:")
    
    # Get emotion map
    emotion_map = {
        0: 'Positive',
        1: 'Negative-High Arousal',
        2: 'Negative-Low Arousal',
        3: 'Surprise',
        4: 'Neutral'
    }
    
    # For each emotion class
    for emotion in range(5):
        # Count original samples for this class
        original_indices = np.where(y_train_5class == emotion)[0]
        original_count = len(original_indices)
        
        # Calculate how many samples to generate
        target_count = samples_per_class
        generate_count = max(0, target_count - original_count)
        
        print(f"  Class {emotion} ({emotion_map[emotion]}): {original_count} original samples")
        
        if generate_count <= 0:
            print(f"    Already have enough samples, no augmentation needed")
            continue
            
        print(f" Generating {generate_count} augmented samples")
        
        # Select indices to augment (with replacement if needed)
        if original_count < generate_count:
            augment_indices = np.random.choice(original_indices, generate_count, replace=True)
        else:
            augment_indices = np.random.choice(original_indices, generate_count, replace=False)
        
        # Get appropriate generator
        if emotion in emotion_generators:
            generator = emotion_generators[emotion]
        else:
            generator = train_datagen
            
        # Generate augmented samples
        augmented_batch = []
        for i, idx in enumerate(augment_indices):
            # Prepare image for generator - normalize first
            img_float = X_train[idx].astype('float32') / 255.0  # Normalize to [0,1]
            img_reshaped = img_float.reshape(1, 48, 48, 1)
            
            # Generate augmented image
            aug_img = next(generator.flow(img_reshaped, batch_size=1))[0]
            
            # Convert back to original scale if needed
            if X_train.max() > 1.0:
                aug_img = (aug_img * 255.0).astype(X_train.dtype)
            
            # Add to batch
            augmented_batch.append(aug_img.reshape(48, 48))
            
            # Print progress
            if (i+1) % 500 == 0:
                print(f"      Generated {i+1}/{generate_count} samples")
        
        # Add batch to augmented dataset
        X_augmented = np.vstack([X_augmented, np.array(augmented_batch)])
        y_augmented = np.append(y_augmented, np.array([emotion] * len(augmented_batch)))
    
    print("\nAugmented dataset statistics:")
    print(f"  Original dataset size: {len(X_train)} samples")
    print(f"  Augmented dataset size: {len(X_augmented)} samples")
    
    for emotion in range(5):
        original_count = np.sum(y_train_5class == emotion)
        augmented_count = np.sum(y_augmented == emotion)
        print(f"  Class {emotion} ({emotion_map[emotion]}): {original_count} → {augmented_count} samples")
    
    return X_augmented, y_augmented

## 3.5 Pipeline

In [16]:
def preprocess_pipeline(X_train, y_train, X_val, y_val, X_test, y_test,
                        use_5class=True, balance_method=None, 
                        generate_augmented_data=True, samples_per_class=1000,
                        extract_features=True, select_n_features=75,
                        save_results=False, save_path='preprocessed_data'):
    """
    Complete preprocessing pipeline for the facial emotion recognition model.
    
    Args:
        X_train, X_val, X_test: Image data
        y_train, y_val, y_test: Original emotion labels
        use_5class: Whether to use 5-class emotion mapping
        balance_method: Method for class balancing (None, 'augmentation')
        generate_augmented_data: Whether to generate augmented data
        samples_per_class: Target number of samples per class
        extract_features: Whether to extract features
        select_n_features: Number of features to select
        save_results: Whether to save results
        save_path: Path to save results
        
    Returns:
        Dictionary of preprocessed data
    """
    start_time = time.time()
    
    # Step 1: Check for and filter out black images
    print("Step 1: Checking for black images...")
    
    def filter_black_images(X, y, threshold=0.1):
        valid_indices = []
        removed_indices = []
        
        for i in range(len(X)):
            if np.max(X[i]) > threshold:
                valid_indices.append(i)
            else:
                removed_indices.append(i)
        
        X_filtered = X[valid_indices]
        y_filtered = y[valid_indices]
        
        print(f"  Removed {len(removed_indices)} black images out of {len(X)} total images.")
        
        return X_filtered, y_filtered
    
    X_train, y_train = filter_black_images(X_train, y_train)
    X_val, y_val = filter_black_images(X_val, y_val)
    X_test, y_test = filter_black_images(X_test, y_test)
    
    # Step 2: Create 5-class mapping if requested
    if use_5class:
        print("\nStep 2: Creating 5-class emotion mapping...")
        five_class_data = create_five_class_mapping(y_train, y_val, y_test)
        y_train_5class = five_class_data['y_train_5class']
        y_val_5class = five_class_data['y_val_5class']
        y_test_5class = five_class_data['y_test_5class']
        y_train_5class_onehot = five_class_data['y_train_5class_onehot']
        y_val_5class_onehot = five_class_data['y_val_5class_onehot']
        y_test_5class_onehot = five_class_data['y_test_5class_onehot']
        new_emotion_map = five_class_data['new_emotion_map']
        
        # Analyze class distribution
        class_info = analyze_class_distribution(y_train_5class, y_val_5class, y_test_5class, new_emotion_map)
        class_weights = class_info['class_weights']
    else:
        print("\nStep 2: Keeping original 7-class emotions...")
        y_train_5class = y_train
        y_val_5class = y_val
        y_test_5class = y_test
        y_train_5class_onehot = to_categorical(y_train, 7)
        y_val_5class_onehot = to_categorical(y_val, 7)
        y_test_5class_onehot = to_categorical(y_test, 7)
        new_emotion_map = {
            0: 'Angry', 1: 'Disgust', 2: 'Fear', 
            3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'
        }
        
        # Analyze class distribution
        class_info = analyze_class_distribution(y_train, y_val, y_test, new_emotion_map)
        class_weights = class_info['class_weights']
    
    # Step 3: Normalize images (only once!)
    print("\nStep 3: Normalizing images...")
    
    # Check if already normalized
    if X_train.max() <= 1.0 and X_train.min() >= 0.0:
        print("  Images are already normalized to [0,1] range")
        X_train_norm = X_train
        X_val_norm = X_val
        X_test_norm = X_test
    else:
        print("  Normalizing images to [0,1] range")
        print(f"  Before normalization - X_train min: {X_train.min()}, max: {X_train.max()}, mean: {X_train.mean():.4f}")
        
        X_train_norm = X_train.astype('float32') / 255.0
        X_val_norm = X_val.astype('float32') / 255.0
        X_test_norm = X_test.astype('float32') / 255.0
        
        print(f"  After normalization - X_train min: {X_train_norm.min()}, max: {X_train_norm.max()}, mean: {X_train_norm.mean():.4f}")
    
    # Add channel dimension for CNN models
    X_train_cnn = X_train_norm.reshape(X_train_norm.shape[0], 48, 48, 1)
    X_val_cnn = X_val_norm.reshape(X_val_norm.shape[0], 48, 48, 1)
    X_test_cnn = X_test_norm.reshape(X_test_norm.shape[0], 48, 48, 1)
    
    # Step 4: Generate augmented data if requested
    if generate_augmented_data and balance_method == 'augmentation':
        print("\nStep 4: Generating augmented data...")
        
        # Create augmentation generators
        train_datagen, val_datagen, emotion_generators = create_augmentation_generator(emotion_specific=True)
        
        # Generate augmented data
        print(f"  Generating augmented data with target {samples_per_class} samples per class...")
        X_train_aug, y_train_aug = generate_augmented_data_func(
            X_train, y_train_5class, emotion_generators, 
            samples_per_class=samples_per_class
        )
        
        # Create one-hot encoded labels for augmented data
        num_classes = 5 if use_5class else 7
        y_train_aug_onehot = to_categorical(y_train_aug, num_classes)
        
        # Reshape for CNN
        X_train_aug_cnn = X_train_aug.astype('float32') / 255.0
        X_train_aug_cnn = X_train_aug_cnn.reshape(X_train_aug_cnn.shape[0], 48, 48, 1)
    else:
        print("\nStep 4: Skipping data augmentation")
        X_train_aug = X_train
        y_train_aug = y_train_5class
        y_train_aug_onehot = y_train_5class_onehot
        X_train_aug_cnn = X_train_cnn
        train_datagen = ImageDataGenerator(
            rotation_range=10,
            width_shift_range=0.1,
            height_shift_range=0.1,
            zoom_range=0.1,
            horizontal_flip=True,
            fill_mode='nearest'
        )
        val_datagen = ImageDataGenerator()
    
    # Step 5: Extract features for traditional ML models if requested
    if extract_features:
        print("\nStep 5: Extracting features for traditional ML models...")
        
        # Extract features from original data
        X_train_features, feature_groups = extract_facial_features(X_train)
        X_val_features, _ = extract_facial_features(X_val)
        X_test_features, _ = extract_facial_features(X_test)
        
        print(f"  Extracted {X_train_features.shape[1]} features from each image")
        
        # Extract features from augmented data if it was generated
        if generate_augmented_data and balance_method == 'augmentation':
            print("  Extracting features from augmented data...")
            X_train_aug_features, _ = extract_facial_features(X_train_aug)
        else:
            X_train_aug_features = X_train_features
        
        # Standardize features
        print("  Standardizing features...")
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train_features)
        X_val_scaled = scaler.transform(X_val_features)
        X_test_scaled = scaler.transform(X_test_features)
        
        if generate_augmented_data and balance_method == 'augmentation':
            X_train_aug_scaled = scaler.transform(X_train_aug_features)
        else:
            X_train_aug_scaled = X_train_scaled
        
        # Step 6: Select most informative features if requested
        if select_n_features > 0:
            print(f"\nStep 6: Selecting {select_n_features} most informative features...")
            
            # Feature selection using mutual information
            selector = SelectKBest(mutual_info_classif, k=select_n_features)
            X_train_selected = selector.fit_transform(X_train_scaled, y_train_5class)
            X_val_selected = selector.transform(X_val_scaled)
            X_test_selected = selector.transform(X_test_scaled)
            X_train_aug_selected = selector.transform(X_train_aug_scaled)
            
            # Get feature importance
            feature_scores = selector.scores_
            feature_indices = selector.get_support(indices=True)
            
            # Get feature names if available
            if hasattr(X_train_features, 'columns'):
                feature_names = X_train_features.columns
                selected_features = [feature_names[i] for i in feature_indices]
            else:
                selected_features = [f"feature_{i}" for i in feature_indices]
            
            # Print top features
            print("  Top 10 selected features:")
            sorted_indices = np.argsort(feature_scores[feature_indices])[::-1]
            for i in range(min(10, len(selected_features))):
                idx = sorted_indices[i]
                print(f"    {i+1}. {selected_features[idx]}: {feature_scores[feature_indices[idx]]:.4f}")
        else:
            print("\nStep 6: Skipping feature selection")
            X_train_selected = X_train_scaled
            X_val_selected = X_val_scaled
            X_test_selected = X_test_scaled
            X_train_aug_selected = X_train_aug_scaled
            selector = None
            selected_features = None
    else:
        print("\nSteps 5-6: Skipping feature extraction and selection")
        X_train_features = None
        X_val_features = None
        X_test_features = None
        X_train_aug_features = None
        X_train_scaled = None
        X_val_scaled = None
        X_test_scaled = None
        X_train_aug_scaled = None
        X_train_selected = None
        X_val_selected = None
        X_test_selected = None
        X_train_aug_selected = None
        feature_groups = None
        scaler = None
        selector = None
        selected_features = None
    
    # Step 7: Save results if requested
    if save_results:
        print("\nStep 7: Saving preprocessed data...")
        
        # Create directory if it doesn't exist
        os.makedirs(save_path, exist_ok=True)
        
        # Save numpy arrays
        arrays_to_save = {
            'X_train.npy': X_train,
            'X_val.npy': X_val,
            'X_test.npy': X_test,
            'y_train.npy': y_train,
            'y_val.npy': y_val,
            'y_test.npy': y_test,
            'y_train_5class.npy': y_train_5class,
            'y_val_5class.npy': y_val_5class,
            'y_test_5class.npy': y_test_5class,
            'X_train_norm.npy': X_train_norm,
            'X_val_norm.npy': X_val_norm,
            'X_test_norm.npy': X_test_norm,
            'X_train_cnn.npy': X_train_cnn,
            'X_val_cnn.npy': X_val_cnn,
            'X_test_cnn.npy': X_test_cnn
        }
        
        # Add augmented data if available
        if generate_augmented_data and balance_method == 'augmentation':
            arrays_to_save.update({
                'X_train_aug.npy': X_train_aug,
                'y_train_aug.npy': y_train_aug,
                'X_train_aug_cnn.npy': X_train_aug_cnn
            })
        
        # Add selected features if available
        if extract_features and X_train_selected is not None:
            arrays_to_save.update({
                'X_train_selected.npy': X_train_selected,
                'X_val_selected.npy': X_val_selected,
                'X_test_selected.npy': X_test_selected
            })
            
            if generate_augmented_data and balance_method == 'augmentation':
                arrays_to_save['X_train_aug_selected.npy'] = X_train_aug_selected
        
        # Save arrays
        saved_count = 0
        for filename, array in arrays_to_save.items():
            if array is not None:
                np.save(os.path.join(save_path, filename), array)
                saved_count += 1
        
        # Save other objects
        import pickle
        
        other_data = {
            'class_weights': class_weights,
            'new_emotion_map': new_emotion_map,
            'class_info': class_info,
            'feature_groups': feature_groups,
            'scaler': scaler,
            'selector': selector,
            'selected_features': selected_features
        }
        
        with open(os.path.join(save_path, 'metadata.pkl'), 'wb') as f:
            pickle.dump(other_data, f)
        
        print(f"  Saved {saved_count} arrays and metadata to {save_path}")
    else:
        print("\nStep 7: Skipping saving results")
    
    # Create results dictionary
    results = {
        # Original data
        'X_train': X_train,
        'X_val': X_val,
        'X_test': X_test,
        'y_train': y_train,
        'y_val': y_val,
        'y_test': y_test,
        
        # 5-class data
        'y_train_5class': y_train_5class,
        'y_val_5class': y_val_5class,
        'y_test_5class': y_test_5class,
        'y_train_5class_onehot': y_train_5class_onehot,
        'y_val_5class_onehot': y_val_5class_onehot,
        'y_test_5class_onehot': y_test_5class_onehot,
        
        # Normalized data
        'X_train_norm': X_train_norm,
        'X_val_norm': X_val_norm,
        'X_test_norm': X_test_norm,
        
        # CNN data
        'X_train_cnn': X_train_cnn,
        'X_val_cnn': X_val_cnn,
        'X_test_cnn': X_test_cnn,
        
        # Augmented data
        'X_train_aug': X_train_aug,
        'y_train_aug': y_train_aug,
        'y_train_aug_onehot': y_train_aug_onehot,
        'X_train_aug_cnn': X_train_aug_cnn,
        
        # Feature data
        'X_train_features': X_train_features,
        'X_val_features': X_val_features,
        'X_test_features': X_test_features,
        'X_train_aug_features': X_train_aug_features,
        
        # Scaled feature data
        'X_train_scaled': X_train_scaled,
        'X_val_scaled': X_val_scaled,
        'X_test_scaled': X_test_scaled,
        'X_train_aug_scaled': X_train_aug_scaled,
        
        # Selected feature data
        'X_train_selected': X_train_selected,
        'X_val_selected': X_val_selected, 
        'X_test_selected': X_test_selected,
        'X_train_aug_selected': X_train_aug_selected,
        
        # Metadata
        'class_weights': class_weights,
        'new_emotion_map': new_emotion_map,
        'class_info': class_info,
        'feature_groups': feature_groups,
        'scaler': scaler,
        'selector': selector,
        'selected_features': selected_features,
        
        # Generators
        'train_datagen': train_datagen,
        'val_datagen': val_datagen
    }
    
    # Total execution time
    total_time = time.time() - start_time
    print(f"\nPreprocessing completed in {total_time:.2f} seconds")
    
    return results

In [None]:
processed_data = preprocess_pipeline(
        X_train, y_train, X_val, y_val, X_test, y_test,
        use_5class=True,
        balance_method='augmentation',  # This is crucial for class balancing
        generate_augmented_data=True,   # Enable data augmentation
        samples_per_class=8000,         # Target 8000 samples per class
        extract_features=True,
        select_n_features=75,
        save_results=False              # Set to True if you want to save the results
)

In [None]:
# Quick check to see if preprocessing was successful
if 'processed_data' in globals() and processed_data is not None:
    print("Preprocessing successful!")
    print(f"Shapes of key processed datasets:")
    
    # Check original data
    if 'X_train' in processed_data:
        print(f"  X_train: {processed_data['X_train'].shape}")
    
    # Check augmented data
    if 'X_train_aug' in processed_data:
        print(f"  X_train_aug: {processed_data['X_train_aug'].shape}")
        print(f"  Original vs. Augmented size: {len(processed_data['X_train'])} → {len(processed_data['X_train_aug'])}")
    
    # Check class distribution in augmented data
    if 'y_train_aug' in processed_data:
        from collections import Counter
        counts = Counter(processed_data['y_train_aug'])
        print("\nClass distribution in augmented data:")
        for cls, count in sorted(counts.items()):
            print(f"  Class {cls}: {count} samples")
else:
    print("Preprocessing did not complete successfully or 'processed_data' is not defined.")
    print("You may need to re-run the preprocessing pipeline.")

## 4.0 Models and Evaluations

In [19]:
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score, GridSearchCV, StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Dropout, Flatten, BatchNormalization, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
import time
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [20]:
## 4.1 Create and Evaluate 5-Class Traditional ML Models

def train_evaluate_traditional_models(X_train, y_train, X_val, y_val, class_weights=None, model_names=None):
    """
    Train and evaluate traditional ML models for 5-class emotion classification.
    
    Args:
        X_train: Training features
        y_train: Training labels
        X_val: Validation features
        y_val: Validation labels
        class_weights: Class weights to handle class imbalance
        model_names: List of model names to train (default: all)
        
    Returns:
        Dictionary of trained models and performance metrics
    """
    # Define models to train
    all_models = {
        'RandomForest': RandomForestClassifier(
            n_estimators=200,
            max_depth=20,
            min_samples_split=5,
            class_weight='balanced',
            n_jobs=-1,
            random_state=42
        ),
        'GradientBoosting': GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=5,
            random_state=42
        ),
        'SVM': SVC(
            kernel='rbf',
            C=10,
            gamma='scale',
            class_weight='balanced',
            probability=True,
            random_state=42
        ),
        'KNN': KNeighborsClassifier(
            n_neighbors=7,
            weights='distance',
            n_jobs=-1
        ),
        'LogisticRegression': LogisticRegression(
            multi_class='multinomial',
            solver='lbfgs',
            max_iter=1000,
            class_weight='balanced',
            random_state=42,
            n_jobs=-1
        )
    }
    
    # Filter models if specified
    if model_names is not None:
        models = {name: all_models[name] for name in model_names if name in all_models}
    else:
        models = all_models
    
    # Dictionary to store results
    results = {}
    
    # Train and evaluate each model
    for name, model in models.items():
        print(f"\nTraining {name}...")
        start_time = time.time()
        
        # Train model
        model.fit(X_train, y_train)
        
        # Make predictions
        y_val_pred = model.predict(X_val)
        
        # Calculate metrics
        accuracy = accuracy_score(y_val, y_val_pred)
        report = classification_report(y_val, y_val_pred, output_dict=True)
        conf_matrix = confusion_matrix(y_val, y_val_pred)
        
        # Calculate training time
        training_time = time.time() - start_time
        
        # Store results
        results[name] = {
            'model': model,
            'accuracy': accuracy,
            'report': report,
            'conf_matrix': conf_matrix,
            'training_time': training_time
        }
        
        # Print results
        print(f"  Accuracy: {accuracy:.4f}")
        print(f"  Training time: {training_time:.2f} seconds")
        print("  Performance by class:")
        
        # Print class-wise metrics
        for i in range(len(report) - 3):  # Skip 'accuracy', 'macro avg', 'weighted avg'
            class_precision = report[str(i)]['precision']
            class_recall = report[str(i)]['recall']
            class_f1 = report[str(i)]['f1-score']
            class_support = report[str(i)]['support']
            
            print(f"    Class {i}: Precision={class_precision:.4f}, Recall={class_recall:.4f}, F1={class_f1:.4f}, Support={class_support}")
    
    # Find best model based on accuracy
    best_model = max(results.items(), key=lambda x: x[1]['accuracy'])
    print(f"\nBest model: {best_model[0]} with accuracy {best_model[1]['accuracy']:.4f}")
    
    return results


In [21]:
def visualize_model_performance(model_results, emotion_map=None):
    """
    Visualize the performance of trained models.
    
    Args:
        model_results: Dictionary of model results
        emotion_map: Mapping from class indices to emotion names
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Extract model names and accuracies
    model_names = list(model_results.keys())
    accuracies = [model_results[name]['accuracy'] for name in model_names]
    training_times = [model_results[name]['training_time'] for name in model_names]
    
    # Sort by accuracy
    sorted_indices = np.argsort(accuracies)[::-1]
    sorted_names = [model_names[i] for i in sorted_indices]
    sorted_accuracies = [accuracies[i] for i in sorted_indices]
    sorted_times = [training_times[i] for i in sorted_indices]
    
    # Plot accuracies
    plt.figure(figsize=(12, 6))
    
    plt.subplot(1, 2, 1)
    plt.bar(sorted_names, sorted_accuracies)
    plt.ylim(0, 1)
    plt.xlabel('Model')
    plt.ylabel('Accuracy')
    plt.title('Model Accuracy Comparison')
    plt.xticks(rotation=45)
    
    plt.subplot(1, 2, 2)
    plt.bar(sorted_names, sorted_times)
    plt.xlabel('Model')
    plt.ylabel('Training Time (s)')
    plt.title('Model Training Time Comparison')
    plt.xticks(rotation=45)
    
    plt.tight_layout()
    plt.show()
    
    # Plot confusion matrix for the best model
    best_model = sorted_names[0]
    conf_matrix = model_results[best_model]['conf_matrix']
    
    plt.figure(figsize=(10, 8))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=[emotion_map[i] for i in range(len(emotion_map))],
                yticklabels=[emotion_map[i] for i in range(len(emotion_map))])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'Confusion Matrix - {best_model}')
    plt.tight_layout()
    plt.show()
    
    # Plot class-wise metrics for all models
    metrics = ['precision', 'recall', 'f1-score']
    num_classes = len(emotion_map)
    
    # Create a figure with a subplot for each metric
    plt.figure(figsize=(15, 5 * len(metrics)))
    
    for m, metric in enumerate(metrics):
        plt.subplot(len(metrics), 1, m+1)
        
        # Extract metric for each model and class
        metric_data = []
        for name in model_names:
            report = model_results[name]['report']
            class_metrics = [report[str(i)][metric] for i in range(num_classes)]
            metric_data.append(class_metrics)
        
        # Convert to numpy array
        metric_data = np.array(metric_data)
        
        # Create heatmap
        sns.heatmap(metric_data, annot=True, fmt='.2f', cmap='YlGnBu',
                    xticklabels=[emotion_map[i] for i in range(num_classes)],
                    yticklabels=model_names)
        plt.xlabel('Emotion Class')
        plt.ylabel('Model')
        plt.title(f'{metric.capitalize()} by Model and Emotion Class')
    
    plt.tight_layout()
    plt.show()

In [22]:
## 4.2 Create and Train 5-Class CNN Models

def create_emotion_cnn(input_shape=(48, 48, 1), num_classes=5):
    """
    Create a CNN model for emotion classification.
    
    Args:
        input_shape: Shape of input images
        num_classes: Number of emotion classes
        
    Returns:
        Compiled Keras model
    """
    model = Sequential([
        # First convolutional block
        Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=input_shape),
        BatchNormalization(),
        Conv2D(32, (3, 3), padding='same', activation='relu'),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        
        # Second convolutional block
        Conv2D(64, (3, 3), padding='same', activation='relu'),
        BatchNormalization(),
        Conv2D(64, (3, 3), padding='same', activation='relu'),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        
        # Third convolutional block
        Conv2D(128, (3, 3), padding='same', activation='relu'),
        BatchNormalization(),
        Conv2D(128, (3, 3), padding='same', activation='relu'),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        
        # Fully connected layers
        Flatten(),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(256, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(num_classes, activation='softmax')
    ])
    
    # Compile model
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    print("CNN Model Summary:")
    model.summary()
    
    return model


In [23]:
def create_attention_cnn(input_shape=(48, 48, 1), num_classes=5):
    """
    Create a CNN with attention mechanism for emotion classification.
    
    Args:
        input_shape: Shape of input images
        num_classes: Number of emotion classes
        
    Returns:
        Compiled Keras model
    """
    from tensorflow.keras.layers import multiply, Reshape, Permute, GlobalAveragePooling2D, Activation
    
    # Input layer
    inputs = Input(shape=input_shape)
    
    # First convolutional block
    x = Conv2D(32, (3, 3), padding='same', activation='relu')(inputs)
    x = BatchNormalization()(x)
    x = Conv2D(32, (3, 3), padding='same', activation='relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    
    # Second convolutional block
    x = Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = BatchNormalization()(x)
    x = Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    
    # Third convolutional block with attention
    x = Conv2D(128, (3, 3), padding='same', activation='relu')(x)
    x = BatchNormalization()(x)
    feature_maps = Conv2D(128, (3, 3), padding='same', activation='relu')(x)
    x = BatchNormalization()(feature_maps)
    
    # Spatial attention mechanism
    attention = Conv2D(1, (1, 1), padding='same', activation='sigmoid')(x)
    x = multiply([feature_maps, attention])
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    
    # Fully connected layers
    x = Flatten()(x)
    x = Dense(512, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    x = Dense(256, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    
    # Create model
    model = Model(inputs=inputs, outputs=outputs)
    
    # Compile model
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    print("Attention CNN Model Summary:")
    model.summary()
    
    return model

In [24]:
def train_cnn_model(model, X_train, y_train, X_val, y_val, 
                    batch_size=64, epochs=50, class_weights=None, 
                    model_name="emotion_cnn", save_best=True):
    """
    Train a CNN model with early stopping and learning rate reduction.
    
    Args:
        model: Compiled Keras model
        X_train, y_train: Training data and labels
        X_val, y_val: Validation data and labels
        batch_size: Batch size for training
        epochs: Maximum number of epochs
        class_weights: Class weights for imbalanced data
        model_name: Name for saving the model
        save_best: Whether to save the best model during training
        
    Returns:
        Trained model and training history
    """
    print(f"Training {model_name}...")
    
    # Callbacks
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-6,
            verbose=1
        )
    ]
    
    # Add model checkpoint callback if save_best is True
    if save_best:
        callbacks.append(
            ModelCheckpoint(
                f"{model_name}_best.keras",
                monitor='val_accuracy',
                save_best_only=True,
                verbose=1
            )
        )
    
    # Train model
    start_time = time.time()
    
    history = model.fit(
        X_train, y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_val, y_val),
        class_weight=class_weights,
        callbacks=callbacks,
        verbose=1
    )
    
    # Calculate training time
    training_time = time.time() - start_time
    print(f"Training completed in {training_time:.2f} seconds")
    
    return model, history

def visualize_training_history(history, title="Model Training History"):
    """
    Visualize the training history of a CNN model.
    
    Args:
        history: Training history from model.fit()
        title: Plot title
    """
    # Plot accuracy
    plt.figure(figsize=(15, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train')
    plt.plot(history.history['val_accuracy'], label='Validation')
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)
    
    # Plot loss
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train')
    plt.plot(history.history['val_loss'], label='Validation')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    
    plt.suptitle(title)
    plt.tight_layout()
    plt.show()

def evaluate_cnn_model(model, X_val, y_val, emotion_map=None):
    """
    Evaluate a trained CNN model on test data.
    
    Args:
        model: Trained Keras model
        X_val: val features
        y_val: val labels (one-hot encoded)
        emotion_map: Mapping from class indices to emotion names
    
    Returns:
        Dictionary of evaluation metrics
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Get predictions
    y_pred_prob = model.predict(X_val)
    y_pred = np.argmax(y_pred_prob, axis=1)
    y_true = np.argmax(y_val, axis=1)
    
    # Calculate metrics
    accuracy = accuracy_score(y_true, y_pred)
    report = classification_report(y_true, y_pred, output_dict=True)
    conf_matrix = confusion_matrix(y_true, y_pred)
    
    # Print results
    print(f"Model Accuracy: {accuracy:.4f}")
    print("\nPerformance by class:")
    
    # Print class-wise metrics
    for i in range(len(emotion_map)):
        if str(i) in report:
            class_precision = report[str(i)]['precision']
            class_recall = report[str(i)]['recall']
            class_f1 = report[str(i)]['f1-score']
            class_support = report[str(i)]['support']
            
            print(f"  Class {i} ({emotion_map[i]}): Precision={class_precision:.4f}, Recall={class_recall:.4f}, F1={class_f1:.4f}, Support={class_support}")
    
    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=[emotion_map[i] for i in range(len(emotion_map))],
                yticklabels=[emotion_map[i] for i in range(len(emotion_map))])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.show()
    
    # Return evaluation results
    return {
        'accuracy': accuracy,
        'report': report,
        'conf_matrix': conf_matrix
    }

def visualize_model_predictions(model, X_val, y_val, emotion_map=None, num_samples=6):
    """
    Visualize model predictions on sample test images.
    
    Args:
        model: Trained Keras model
        X_val: val features
        y_val: val labels (one-hot encoded)
        emotion_map: Mapping from class indices to emotion names
        num_samples: Number of samples to visualize
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Get predictions
    y_pred_prob = model.predict(X_val)
    y_pred = np.argmax(y_pred_prob, axis=1)
    y_true = np.argmax(y_val, axis=1)
    
    # Find some correctly and incorrectly classified examples
    correct_indices = np.where(y_pred == y_true)[0]
    incorrect_indices = np.where(y_pred != y_true)[0]
    
    # Determine how many of each to show
    n_correct = min(num_samples // 2 + num_samples % 2, len(correct_indices))
    n_incorrect = min(num_samples // 2, len(incorrect_indices))
    
    # Sample indices
    if len(correct_indices) > 0:
        sampled_correct = np.random.choice(correct_indices, n_correct, replace=False)
    else:
        sampled_correct = []
    
    if len(incorrect_indices) > 0:
        sampled_incorrect = np.random.choice(incorrect_indices, n_incorrect, replace=False)
    else:
        sampled_incorrect = []
    
    # Combine samples
    sampled_indices = np.concatenate([sampled_correct, sampled_incorrect])
    
    # Create figure
    plt.figure(figsize=(15, n_correct + n_incorrect))
    
    # Plot each sample
    for i, idx in enumerate(sampled_indices):
        # Get image and reshape if needed
        img = X_val[idx]
        if len(img.shape) == 3 and img.shape[-1] == 1:
            img = img.reshape(img.shape[0], img.shape[1])
        
        # Get true and predicted labels
        true_label = y_true[idx]
        pred_label = y_pred[idx]
        pred_prob = y_pred_prob[idx][pred_label]
        
        # Determine color based on correctness
        color = 'green' if true_label == pred_label else 'red'
        
        # Plot image
        plt.subplot(len(sampled_indices), 1, i+1)
        plt.imshow(img, cmap='gray')
        plt.title(f"True: {emotion_map[true_label]} | Predicted: {emotion_map[pred_label]} ({pred_prob:.2f})", 
                    color=color)
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()

def compare_models_performance(results_dict, emotion_map=None):
    """
    Compare the performance of multiple trained models.
    
    Args:
        results_dict: Dictionary of model evaluation results
        emotion_map: Mapping from class indices to emotion names
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Extract model names and accuracies
    model_names = list(results_dict.keys())
    accuracies = [results_dict[name]['accuracy'] for name in model_names]
    
    # Create bar chart for accuracies
    plt.figure(figsize=(12, 6))
    plt.bar(model_names, accuracies)
    plt.xlabel('Model')
    plt.ylabel('Accuracy')
    plt.title('Model Accuracy Comparison')
    plt.ylim(0, 1)
    plt.xticks(rotation=45)
    plt.grid(axis='y', alpha=0.3)
    
    # Add accuracy values above bars
    for i, v in enumerate(accuracies):
        plt.text(i, v + 0.01, f"{v:.4f}", ha='center')
    
    plt.tight_layout()
    plt.show()
    
    # Create a table for class-wise F1 scores
    plt.figure(figsize=(10, 6))
    plt.title('Class-wise F1 Scores')
    
    # Extract F1 scores for each model and class
    f1_scores = np.zeros((len(model_names), len(emotion_map)))
    
    for i, name in enumerate(model_names):
        report = results_dict[name]['report']
        for j in range(len(emotion_map)):
            if str(j) in report:
                f1_scores[i, j] = report[str(j)]['f1-score']
    
    # Create heatmap
    sns.heatmap(f1_scores, annot=True, fmt='.3f', cmap='YlGnBu',
                xticklabels=[emotion_map[i] for i in range(len(emotion_map))],
                yticklabels=model_names)
    plt.xlabel('Emotion Class')
    plt.ylabel('Model')
    plt.tight_layout()
    plt.show()

In [None]:
# Train Traditional ML Models
# First, make sure you have the right features for traditional ML models
if processed_data is not None and 'X_train_selected' in processed_data:
    print("Training traditional ML models...")
    ml_results = train_evaluate_traditional_models(
        processed_data['X_train_selected'], 
        processed_data['y_train_5class'],
        processed_data['X_val_selected'], 
        processed_data['y_val_5class'],
        class_weights=processed_data['class_weights']
    )
    
    # Visualize model performance
    visualize_model_performance(ml_results, processed_data['new_emotion_map'])
else:
    print("Cannot train traditional ML models: processed data not available or missing selected features")

# Train CNN Models
if processed_data is not None and 'X_train_aug_cnn' in processed_data:
    # 1. Basic CNN model
    print("\nTraining basic CNN model...")
    cnn_model = create_emotion_cnn(input_shape=(48, 48, 1), num_classes=5)
    
    trained_cnn, cnn_history = train_cnn_model(
        cnn_model,
        processed_data['X_train_aug_cnn'],
        processed_data['y_train_aug_onehot'],
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        batch_size=64,
        epochs=50,
        class_weights=processed_data['class_weights'],
        model_name="basic_cnn"
    )
    
    # Visualize training history
    visualize_training_history(cnn_history, title="Basic CNN Training History")
    
    # Evaluate on test set
    cnn_evaluation = evaluate_cnn_model(
        trained_cnn,
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        emotion_map=processed_data['new_emotion_map']
    )
    
    # 2. Attention CNN model
    print("\nTraining attention CNN model...")
    attention_model = create_attention_cnn(input_shape=(48, 48, 1), num_classes=5)
    
    trained_attention, attention_history = train_cnn_model(
        attention_model,
        processed_data['X_train_aug_cnn'],
        processed_data['y_train_aug_onehot'],
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        batch_size=64,
        epochs=50,
        class_weights=processed_data['class_weights'],
        model_name="attention_cnn"
    )
    
    # Visualize training history
    visualize_training_history(attention_history, title="Attention CNN Training History")
    
    # Evaluate on val set
    attention_evaluation = evaluate_cnn_model(
        trained_attention,
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        emotion_map=processed_data['new_emotion_map']
    )
    
    # Compare model performance
    model_comparison = {
        'Basic CNN': cnn_evaluation,
        'Attention CNN': attention_evaluation
    }
    
    compare_models_performance(model_comparison, processed_data['new_emotion_map'])
    
    # Visualize model predictions
    visualize_model_predictions(
        trained_attention,
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        emotion_map=processed_data['new_emotion_map'],
        num_samples=6
    )
else:
    print("Cannot train CNN models: processed data not available or missing augmented CNN data")

Analysis of Current Results

Model Performance Comparison:

The CNNs (Basic CNN: 66.42%, Attention CNN: 65.47%) significantly outperform the traditional ML models (best was RandomForest at 51.31%)
The attention mechanism doesn't seem to provide a substantial improvement over the basic CNN in overall accuracy


Class-wise Performance:

Both CNN models perform very well on "Positive" emotions (~82% F1 score) and "Surprise" (~74% F1 score)
Both struggle more with "Negative-High Arousal" and "Negative-Low Arousal" categories
There's confusion between similar emotion classes (e.g., between the two negative emotion categories)


Training Dynamics:

From the training history plots, both models show early convergence with validation accuracy stabilizing around 25 epochs
There's a small gap between training and validation accuracy, suggesting the models aren't overfitting significantly
Early stopping is working correctly, preventing overfitting


Traditional ML Models:

RandomForest performed best among traditional models (51.31%)
Traditional models show varying strengths for different emotion classes
The training time comparison shows a big difference - SVM and GradientBoosting took ~5 minutes while RandomForest took only 5 seconds

## 5. Continued Model Dev And Fine tuning On Validation Set 

In [26]:
## 5. Advanced Model Development and Fine-Tuning
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import os
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Conv2D, MaxPooling2D, Dense, Dropout, Flatten, 
                                    BatchNormalization, Input, GlobalAveragePooling2D,
                                    SeparableConv2D, Activation, add, Lambda)
from tensorflow.keras.optimizers import Adam, RMSprop, SGD
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG16, ResNet50, EfficientNetB0
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.model_selection import KFold, StratifiedKFold

## 5.1 Hyperparamters

In [27]:
## 5.1 Hyperparameter Optimization for CNNs

def create_improved_cnn(input_shape=(48, 48, 1), num_classes=5, 
                        filters=(32, 64, 128), kernel_size=(3, 3), 
                        dropout_rates=(0.25, 0.25, 0.25, 0.5, 0.5),
                        use_batch_norm=True, learning_rate=0.001, 
                        optimizer_name='adam', l2_reg=0.0001):
    """
    Create an improved CNN model with configurable hyperparameters.
    
    Args:
        input_shape: Shape of input images
        num_classes: Number of emotion classes
        filters: Tuple of filter counts for each conv block
        kernel_size: Size of convolutional kernels
        dropout_rates: Dropout rates for each block
        use_batch_norm: Whether to use batch normalization
        learning_rate: Initial learning rate
        optimizer_name: Name of optimizer to use ('adam', 'rmsprop', or 'sgd')
        l2_reg: L2 regularization strength
        
    Returns:
        Compiled Keras model
    """
    # Initialize model
    model = Sequential()
    
    # First convolutional block
    model.add(Conv2D(filters[0], kernel_size, padding='same', 
                    activation='relu', input_shape=input_shape,
                    kernel_regularizer=l2(l2_reg)))
    if use_batch_norm:
        model.add(BatchNormalization())
    model.add(Conv2D(filters[0], kernel_size, padding='same', 
                    activation='relu', kernel_regularizer=l2(l2_reg)))
    if use_batch_norm:
        model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(dropout_rates[0]))
    
    # Second convolutional block
    model.add(Conv2D(filters[1], kernel_size, padding='same', 
                    activation='relu', kernel_regularizer=l2(l2_reg)))
    if use_batch_norm:
        model.add(BatchNormalization())
    model.add(Conv2D(filters[1], kernel_size, padding='same', 
                    activation='relu', kernel_regularizer=l2(l2_reg)))
    if use_batch_norm:
        model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(dropout_rates[1]))
    
    # Third convolutional block
    model.add(Conv2D(filters[2], kernel_size, padding='same', 
                    activation='relu', kernel_regularizer=l2(l2_reg)))
    if use_batch_norm:
        model.add(BatchNormalization())
    model.add(Conv2D(filters[2], kernel_size, padding='same', 
                    activation='relu', kernel_regularizer=l2(l2_reg)))
    if use_batch_norm:
        model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(dropout_rates[2]))
    
    # Fully connected layers
    model.add(Flatten())
    model.add(Dense(512, activation='relu', kernel_regularizer=l2(l2_reg)))
    if use_batch_norm:
        model.add(BatchNormalization())
    model.add(Dropout(dropout_rates[3]))
    model.add(Dense(256, activation='relu', kernel_regularizer=l2(l2_reg)))
    if use_batch_norm:
        model.add(BatchNormalization())
    model.add(Dropout(dropout_rates[4]))
    model.add(Dense(num_classes, activation='softmax'))
    
    # Select optimizer
    if optimizer_name.lower() == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    elif optimizer_name.lower() == 'rmsprop':
        optimizer = RMSprop(learning_rate=learning_rate)
    elif optimizer_name.lower() == 'sgd':
        optimizer = SGD(learning_rate=learning_rate, momentum=0.9)
    else:
        print(f"Unrecognized optimizer: {optimizer_name}. Using Adam.")
        optimizer = Adam(learning_rate=learning_rate)
    
    # Compile model
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    return model

def create_focal_loss(gamma=2.0, alpha=0.25):
    """
    Creates a focal loss function to address class imbalance more effectively.
    
    Args:
        gamma: Focusing parameter (higher values focus more on hard examples)
        alpha: Class balance parameter
        
    Returns:
        Focal loss function compatible with Keras
    """
    def focal_loss(y_true, y_pred):
        # Clip predictions to prevent NaN or Inf values
        epsilon = K.epsilon()
        y_pred = K.clip(y_pred, epsilon, 1.0 - epsilon)
        
        # Calculate cross entropy
        cross_entropy = -y_true * K.log(y_pred)
        
        # Calculate focal loss
        loss = alpha * K.pow(1 - y_pred, gamma) * cross_entropy
        
        # Sum over classes and average over samples
        return K.mean(K.sum(loss, axis=-1))
        
    return focal_loss

def grid_search_cnn_hyperparameters(X_train, y_train, X_val, y_val, class_weights=None):
    """
    Perform a grid search over CNN hyperparameters using training and validation data only.
    
    Args:
        X_train, y_train: Training data and labels
        X_val, y_val: Validation data and labels
        class_weights: Class weights for imbalanced data
        
    Returns:
        Dictionary of results for each hyperparameter combination
    """
    # Define hyperparameter grid
    param_grid = {
        'learning_rate': [0.001, 0.0005, 0.0001],
        'optimizer': ['adam', 'rmsprop'],
        'dropout_rates': [
            (0.25, 0.25, 0.25, 0.5, 0.5),  # Original
            (0.3, 0.3, 0.3, 0.5, 0.5),     # Higher in conv layers
            (0.2, 0.2, 0.2, 0.6, 0.6)      # Higher in dense layers
        ],
        'filters': [
            (32, 64, 128),                 # Original
            (64, 128, 256),                # More filters
            (16, 32, 64)                   # Fewer filters
        ],
        'use_focal_loss': [False, True]    # Whether to use focal loss
    }
    
    # Initialize results dictionary
    results = {}
    
    # Track best model
    best_val_acc = 0
    best_params = None
    best_model = None
    
    # Define training parameters
    batch_size = 64
    epochs = 30
    
    # Define callbacks
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=8,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-6,
            verbose=1
        )
    ]
    
    # Iterate through hyperparameter combinations
    param_combinations = []
    
    # Generate all parameter combinations (subset of full grid)
    for lr in param_grid['learning_rate']:
        for opt in param_grid['optimizer']:
            for dr in [param_grid['dropout_rates'][0]]:  # Just use first dropout config for brevity
                for filt in param_grid['filters']:
                    for use_focal in param_grid['use_focal_loss']:
                        param_combinations.append({
                            'learning_rate': lr,
                            'optimizer': opt,
                            'dropout_rates': dr,
                            'filters': filt,
                            'use_focal_loss': use_focal
                        })
    
    # Limit to a manageable number of combinations
    param_combinations = param_combinations[:6]  # Adjust based on available time/resources
    
    print(f"Performing grid search with {len(param_combinations)} parameter combinations")
    
    # Iterate through parameter combinations
    for i, params in enumerate(param_combinations):
        print(f"\nTraining model {i+1}/{len(param_combinations)}:")
        print(f"Parameters: {params}")
        
        # Create model
        model = create_improved_cnn(
            input_shape=X_train.shape[1:],
            num_classes=y_train.shape[1],
            filters=params['filters'],
            dropout_rates=params['dropout_rates'],
            learning_rate=params['learning_rate'],
            optimizer_name=params['optimizer']
        )
        
        # Use focal loss if specified
        if params['use_focal_loss']:
            model.compile(
                optimizer=model.optimizer,
                loss=create_focal_loss(gamma=2.0),
                metrics=['accuracy']
            )
        
        # Train model
        start_time = time.time()
        
        history = model.fit(
            X_train, y_train,
            batch_size=batch_size,
            epochs=epochs,
            validation_data=(X_val, y_val),
            class_weight=class_weights,
            callbacks=callbacks,
            verbose=1
        )
        
        training_time = time.time() - start_time
        
        # Evaluate model on validation data only
        val_loss, val_acc = model.evaluate(X_val, y_val, verbose=0)
        
        # Save results
        results[i] = {
            'params': params,
            'val_accuracy': val_acc,
            'val_loss': val_loss,
            'history': history.history,
            'training_time': training_time
        }
        
        # Check if this is the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_params = params
            best_model = model
        
        print(f"Validation accuracy: {val_acc:.4f}, Validation loss: {val_loss:.4f}")
        print(f"Training time: {training_time:.2f} seconds")
        
        # Clear model to free memory
        K.clear_session()
    
    # Print best parameters
    print("\nGrid search complete!")
    print(f"Best validation accuracy: {best_val_acc:.4f}")
    print(f"Best parameters: {best_params}")
    
    return results, best_params, best_model

def visualize_grid_search_results(results):
    """
    Visualize the results of the grid search.
    
    Args:
        results: Dictionary of grid search results
    """
    # Extract results
    param_indices = list(results.keys())
    val_accuracies = [results[i]['val_accuracy'] for i in param_indices]
    val_losses = [results[i]['val_loss'] for i in param_indices]
    training_times = [results[i]['training_time'] for i in param_indices]
    
    # Sort by validation accuracy
    sorted_indices = np.argsort(val_accuracies)[::-1]
    sorted_param_indices = [param_indices[i] for i in sorted_indices]
    sorted_val_accs = [val_accuracies[i] for i in sorted_indices]
    sorted_val_losses = [val_losses[i] for i in sorted_indices]
    sorted_times = [training_times[i] for i in sorted_indices]
    
    # Create labels for x-axis
    labels = [
        f"M{idx}: " + 
        f"LR={results[idx]['params']['learning_rate']}, " +
        f"Opt={results[idx]['params']['optimizer'][:3]}, " +
        f"Filt={results[idx]['params']['filters'][0]}"
        for idx in sorted_param_indices
    ]
    
    # Plotting
    plt.figure(figsize=(14, 10))
    
    # Plot validation accuracy
    plt.subplot(2, 1, 1)
    bars = plt.bar(range(len(sorted_param_indices)), sorted_val_accs, color='skyblue')
    plt.title('Validation Accuracy by Hyperparameter Combination')
    plt.ylabel('Validation Accuracy')
    plt.xticks(range(len(sorted_param_indices)), range(1, len(sorted_param_indices) + 1))
    
    # Add accuracy values on bars
    for i, bar in enumerate(bars):
        plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005, 
                f"{sorted_val_accs[i]:.4f}", ha='center')
    
    # Plot validation loss
    plt.subplot(2, 1, 2)
    plt.bar(range(len(sorted_param_indices)), sorted_val_losses, color='salmon')
    plt.title('Validation Loss by Hyperparameter Combination')
    plt.xlabel('Model Configuration')
    plt.ylabel('Validation Loss')
    plt.xticks(range(len(sorted_param_indices)), labels, rotation=45, ha='right')
    
    plt.tight_layout()
    plt.show()
    
    # Plot learning curves for best model
    best_idx = sorted_param_indices[0]
    history = results[best_idx]['history']
    
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(history['accuracy'], label='Train')
    plt.plot(history['val_accuracy'], label='Validation')
    plt.title(f'Model Accuracy (Best Configuration)')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)
    
    plt.subplot(1, 2, 2)
    plt.plot(history['loss'], label='Train')
    plt.plot(history['val_loss'], label='Validation')
    plt.title(f'Model Loss (Best Configuration)')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.show()

## 5.2 Cross-Val

In [28]:
## 5.2 Cross-Validation for Robust Performance Estimation

def perform_cross_validation(X_train, y_train, model_fn, n_splits=5, batch_size=64, epochs=30, 
                            class_weights=None, use_augmentation=True):
    """
    Perform k-fold cross-validation on the emotion recognition model using only training data.
    
    Args:
        X_train, y_train: Training features and labels
        model_fn: Function to create model
        n_splits: Number of cross-validation folds
        batch_size: Batch size for training
        epochs: Maximum number of epochs
        class_weights: Class weights for imbalanced data
        use_augmentation: Whether to use data augmentation
        
    Returns:
        Dictionary of cross-validation results
    """
    # Initialize stratified k-fold
    kfold = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    
    # Convert one-hot encoded labels back to class indices for stratification
    if len(y_train.shape) > 1 and y_train.shape[1] > 1:
        y_indices = np.argmax(y_train, axis=1)
    else:
        y_indices = y_train
    
    # Initialize results
    fold_accuracies = []
    fold_losses = []
    fold_histories = []
    fold_models = []
    
    # Define callbacks
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=8,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-6,
            verbose=1
        )
    ]
    
    # Create data augmentation generator if requested
    if use_augmentation:
        train_datagen = ImageDataGenerator(
            rotation_range=15,
            width_shift_range=0.1,
            height_shift_range=0.1,
            zoom_range=0.1,
            horizontal_flip=True,
            brightness_range=[0.8, 1.2],
            fill_mode='nearest'
        )
    
    # Perform k-fold cross-validation
    for fold, (train_idx, val_idx) in enumerate(kfold.split(X_train, y_indices)):
        print(f"\nTraining fold {fold+1}/{n_splits}")
        
        # Split data
        X_train_fold, X_val_fold = X_train[train_idx], X_train[val_idx]
        
        # Handle one-hot encoded labels properly
        if len(y_train.shape) > 1 and y_train.shape[1] > 1:
            y_train_fold, y_val_fold = y_train[train_idx], y_train[val_idx]
        else:
            y_train_fold, y_val_fold = y_train[train_idx], y_train[val_idx]
        
        # Create model
        model = model_fn()
        
        # Train model
        if use_augmentation:
            # Use data generator with augmentation
            train_generator = train_datagen.flow(
                X_train_fold, y_train_fold, 
                batch_size=batch_size
            )
            
            history = model.fit(
                train_generator,
                steps_per_epoch=len(X_train_fold) // batch_size,
                epochs=epochs,
                validation_data=(X_val_fold, y_val_fold),
                class_weight=class_weights,
                callbacks=callbacks,
                verbose=1
            )
        else:
            # Train without augmentation
            history = model.fit(
                X_train_fold, y_train_fold,
                batch_size=batch_size,
                epochs=epochs,
                validation_data=(X_val_fold, y_val_fold),
                class_weight=class_weights,
                callbacks=callbacks,
                verbose=1
            )
        
        # Evaluate model on fold validation data
        val_loss, val_acc = model.evaluate(X_val_fold, y_val_fold, verbose=0)
        
        # Store results
        fold_accuracies.append(val_acc)
        fold_losses.append(val_loss)
        fold_histories.append(history.history)
        fold_models.append(model)
        
        print(f"Fold {fold+1} - Validation accuracy: {val_acc:.4f}, Validation loss: {val_loss:.4f}")
    
    # Calculate average performance
    avg_accuracy = np.mean(fold_accuracies)
    avg_loss = np.mean(fold_losses)
    std_accuracy = np.std(fold_accuracies)
    
    print("\nCross-validation complete!")
    print(f"Average validation accuracy: {avg_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"Average validation loss: {avg_loss:.4f}")
    
    # Return results
    return {
        'fold_accuracies': fold_accuracies,
        'fold_losses': fold_losses,
        'fold_histories': fold_histories,
        'fold_models': fold_models,
        'avg_accuracy': avg_accuracy,
        'avg_loss': avg_loss,
        'std_accuracy': std_accuracy
    }

def visualize_cross_validation_results(cv_results):
    """
    Visualize the results of cross-validation.
    
    Args:
        cv_results: Dictionary of cross-validation results
    """
    # Extract results
    fold_accuracies = cv_results['fold_accuracies']
    fold_losses = cv_results['fold_losses']
    fold_histories = cv_results['fold_histories']
    avg_accuracy = cv_results['avg_accuracy']
    std_accuracy = cv_results['std_accuracy']
    
    # Plot fold accuracies
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.bar(range(1, len(fold_accuracies) + 1), fold_accuracies, color='skyblue')
    plt.axhline(y=avg_accuracy, color='r', linestyle='-', label=f'Average: {avg_accuracy:.4f}')
    plt.axhline(y=avg_accuracy + std_accuracy, color='r', linestyle='--', alpha=0.5,
                label=f'± Std Dev: {std_accuracy:.4f}')
    plt.axhline(y=avg_accuracy - std_accuracy, color='r', linestyle='--', alpha=0.5)
    plt.title('Validation Accuracy by Fold')
    plt.xlabel('Fold')
    plt.ylabel('Validation Accuracy')
    plt.xticks(range(1, len(fold_accuracies) + 1))
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 2, 2)
    plt.bar(range(1, len(fold_losses) + 1), fold_losses, color='salmon')
    plt.axhline(y=cv_results['avg_loss'], color='r', linestyle='-', 
                label=f'Average: {cv_results["avg_loss"]:.4f}')
    plt.title('Validation Loss by Fold')
    plt.xlabel('Fold')
    plt.ylabel('Validation Loss')
    plt.xticks(range(1, len(fold_losses) + 1))
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Plot learning curves for each fold
    plt.figure(figsize=(14, 8))
    
    plt.subplot(2, 1, 1)
    for i, history in enumerate(fold_histories):
        plt.plot(history['val_accuracy'], label=f'Fold {i+1}')
    plt.title('Validation Accuracy During Training (All Folds)')
    plt.xlabel('Epoch')
    plt.ylabel('Validation Accuracy')
    plt.legend()
    plt.grid(True)
    
    plt.subplot(2, 1, 2)
    for i, history in enumerate(fold_histories):
        plt.plot(history['val_loss'], label=f'Fold {i+1}')
    plt.title('Validation Loss During Training (All Folds)')
    plt.xlabel('Epoch')
    plt.ylabel('Validation Loss')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.show()

## 5.3 Transfer Learning 

In [29]:
## 5.3 Transfer Learning with Pre-trained Models

def create_transfer_learning_model(base_model_name='vgg16', input_shape=(48, 48, 3), 
                                    num_classes=5, learning_rate=0.0001, trainable_layers=0):
    """
    Create a transfer learning model based on a pre-trained CNN.
    
    Args:
        base_model_name: Name of pre-trained model ('vgg16', 'resnet50', or 'efficientnet')
        input_shape: Shape of input images
        num_classes: Number of emotion classes
        learning_rate: Initial learning rate
        trainable_layers: Number of layers to make trainable from the top
        
    Returns:
        Compiled Keras model
    """
    # Handle grayscale input
    if input_shape[-1] == 1:
        # Create a Lambda layer to convert grayscale to RGB
        preprocessing_layer = Lambda(lambda x: K.repeat_elements(x, 3, axis=-1))
        input_rgb_shape = (input_shape[0], input_shape[1], 3)
    else:
        preprocessing_layer = None
        input_rgb_shape = input_shape
    
    # Load pre-trained base model
    if base_model_name.lower() == 'vgg16':
        base_model = VGG16(include_top=False, weights='imagenet', input_shape=input_rgb_shape)
    elif base_model_name.lower() == 'resnet50':
        base_model = ResNet50(include_top=False, weights='imagenet', input_shape=input_rgb_shape)
    elif base_model_name.lower() == 'efficientnet':
        base_model = EfficientNetB0(include_top=False, weights='imagenet', input_shape=input_rgb_shape)
    else:
        raise ValueError(f"Unsupported base model: {base_model_name}")
    
    # Freeze base model
    base_model.trainable = False
    
    # Make some layers trainable if requested
    if trainable_layers > 0:
        for layer in base_model.layers[-trainable_layers:]:
            layer.trainable = True
    
    # Create full model
    inputs = Input(shape=input_shape)
    
    # Apply preprocessing if needed
    if preprocessing_layer is not None:
        x = preprocessing_layer(inputs)
    else:
        x = inputs
    
    # Pass inputs through base model
    x = base_model(x)
    x = GlobalAveragePooling2D()(x)
    x = Dense(512, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    x = Dense(256, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    
    model = Model(inputs=inputs, outputs=outputs)
    
    # Compile model
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    return model

def fine_tune_transfer_learning_model(model, X_train, y_train, X_val, y_val, 
                                        batch_size=32, epochs=20, class_weights=None):
    """
    Fine-tune a transfer learning model using training and validation data only.
    
    Args:
        model: Pre-trained transfer learning model
        X_train, y_train: Training data and labels
        X_val, y_val: Validation data and labels
        batch_size: Batch size for training
        epochs: Maximum number of epochs
        class_weights: Class weights for imbalanced data
        
    Returns:
        Fine-tuned model and training history
    """
    # Define callbacks
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=8,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-6,
            verbose=1
        )
    ]
    
    # Define data augmentation generator
    train_datagen = ImageDataGenerator(
        rotation_range=15,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        brightness_range=[0.8, 1.2],
        fill_mode='nearest'
    )
    
    # Train model
    print("Fine-tuning transfer learning model...")
    
    # Create generator for training
    train_generator = train_datagen.flow(
        X_train, y_train, 
        batch_size=batch_size
    )
    
    history = model.fit(
        train_generator,
        steps_per_epoch=len(X_train) // batch_size,
        epochs=epochs,
        validation_data=(X_val, y_val),
        class_weight=class_weights,
        callbacks=callbacks,
        verbose=1
    )
    
    return model, history


## 5.4 Ensemble 

In [30]:
def create_ensemble_model(models, X_val, y_val, method='average'):
    """
    Create an ensemble of multiple emotion recognition models using validation data.
    
    Args:
        models: List of trained models to ensemble
        X_val, y_val: Validation data for calibration
        method: Ensemble method ('average', 'weighted', or 'stacking')
        
    Returns:
        Ensemble model function and evaluation metrics on validation data
    """
    num_models = len(models)
    num_classes = y_val.shape[1]
    
    # Compute predictions on validation set
    print("Computing validation predictions for ensemble calibration...")
    val_preds = []
    
    for i, model in enumerate(models):
        print(f"Model {i+1}/{num_models}")
        val_pred = model.predict(X_val)
        val_preds.append(val_pred)
    
    if method == 'average':
        # Simple averaging ensemble
        print("Creating averaging ensemble...")
        
        # Define ensemble function
        def ensemble_predict(X):
            # Get predictions from each model
            preds = []
            for model in models:
                pred = model.predict(X)
                preds.append(pred)
            
            # Average predictions
            ensemble_pred = np.mean(preds, axis=0)
            return ensemble_pred
        
        # Evaluate on validation data
        ensemble_val_pred = np.mean(val_preds, axis=0)
        
    elif method == 'weighted':
        # Weighted averaging based on validation performance
        print("Creating weighted ensemble...")
        
        # Calculate model weights based on validation accuracy
        model_accuracies = []
        for i, pred in enumerate(val_preds):
            acc = np.mean(np.argmax(pred, axis=1) == np.argmax(y_val, axis=1))
            model_accuracies.append(acc)
            print(f"Model {i+1}: Validation accuracy = {acc:.4f}")
        
        # Normalize accuracies to get weights
        model_weights = np.array(model_accuracies) / np.sum(model_accuracies)
        print("Model weights:", model_weights)
        
        # Define ensemble function
        def ensemble_predict(X):
            # Get predictions from each model
            preds = []
            for model in models:
                pred = model.predict(X)
                preds.append(pred)
            
            # Compute weighted average
            ensemble_pred = np.zeros_like(preds[0])
            for i, pred in enumerate(preds):
                ensemble_pred += model_weights[i] * pred
                
            return ensemble_pred
        
        # Evaluate on validation data
        ensemble_val_pred = np.zeros_like(val_preds[0])
        for i, pred in enumerate(val_preds):
            ensemble_val_pred += model_weights[i] * pred
            
    elif method == 'stacking':
        # Stacking ensemble using a meta-learner
        print("Creating stacking ensemble...")
        
        # Prepare meta-learner training data
        meta_X = np.concatenate(val_preds, axis=1)
        meta_y = y_val
        
        # Train a simple meta-learner (logistic regression)
        from sklearn.linear_model import LogisticRegression
        meta_learner = LogisticRegression(multi_class='multinomial', max_iter=1000, C=0.1)
        meta_learner.fit(meta_X, np.argmax(meta_y, axis=1))
        
        # Define ensemble function
        def ensemble_predict(X):
            # Get predictions from each model
            preds = []
            for model in models:
                pred = model.predict(X)
                preds.append(pred)
            
            # Combine predictions for meta-learner
            meta_X_pred = np.concatenate(preds, axis=1)
            
            # Get meta-learner predictions
            meta_pred_class = meta_learner.predict(meta_X_pred)
            meta_pred_prob = meta_learner.predict_proba(meta_X_pred)
            
            # Convert to one-hot encoded format
            ensemble_pred = np.zeros((len(meta_pred_class), num_classes))
            for i, cls in enumerate(meta_pred_class):
                ensemble_pred[i, cls] = 1
                
            return meta_pred_prob
        
        # Evaluate on validation data
        meta_X_val = np.concatenate(val_preds, axis=1)
        ensemble_val_pred = meta_learner.predict_proba(meta_X_val)
    
    else:
        raise ValueError(f"Unsupported ensemble method: {method}")
    
    # Evaluate ensemble on validation data only
    ensemble_val_acc = np.mean(np.argmax(ensemble_val_pred, axis=1) == np.argmax(y_val, axis=1))
    print(f"Ensemble validation accuracy: {ensemble_val_acc:.4f}")
    
    # Calculate class-wise metrics
    ensemble_val_pred_class = np.argmax(ensemble_val_pred, axis=1)
    y_val_class = np.argmax(y_val, axis=1)
    
    report = classification_report(y_val_class, ensemble_val_pred_class, output_dict=True)
    conf_matrix = confusion_matrix(y_val_class, ensemble_val_pred_class)
    
    # Return ensemble function and evaluation metrics
    return ensemble_predict, {
        'accuracy': ensemble_val_acc,
        'report': report,
        'conf_matrix': conf_matrix,
        'method': method
    }

In [31]:
def visualize_ensemble_performance(ensemble_metrics, individual_metrics, emotion_map=None):
    """
    Visualize the performance of the ensemble compared to individual models on validation data.
    
    Args:
        ensemble_metrics: Metrics from the ensemble model
        individual_metrics: List of metrics from individual models
        emotion_map: Mapping from class indices to emotion names
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Extract accuracies
    model_accs = [metrics['accuracy'] for metrics in individual_metrics]
    model_accs.append(ensemble_metrics['accuracy'])
    
    # Create labels
    model_labels = [f"Model {i+1}" for i in range(len(individual_metrics))]
    model_labels.append(f"Ensemble ({ensemble_metrics['method']})")
    
    # Plot accuracies
    plt.figure(figsize=(10, 6))
    plt.bar(model_labels, model_accs, color=['skyblue'] * len(individual_metrics) + ['orange'])
    plt.title('Model Accuracy Comparison')
    plt.xlabel('Model')
    plt.ylabel('Validation Accuracy')
    plt.ylim(0, 1)
    plt.grid(axis='y', alpha=0.3)
    
    # Add accuracy values on bars
    for i, v in enumerate(model_accs):
        plt.text(i, v + 0.01, f"{v:.4f}", ha='center')
    
    plt.tight_layout()
    plt.show()
    
    # Plot class-wise F1 scores
    plt.figure(figsize=(12, 6))
    
    # Extract F1 scores
    num_classes = len(emotion_map)
    f1_scores = np.zeros((len(model_labels), num_classes))
    
    for i, metrics in enumerate(individual_metrics):
        report = metrics['report']
        for j in range(num_classes):
            if str(j) in report:
                f1_scores[i, j] = report[str(j)]['f1-score']
    
    # Add ensemble F1 scores
    ensemble_report = ensemble_metrics['report']
    for j in range(num_classes):
        if str(j) in ensemble_report:
            f1_scores[-1, j] = ensemble_report[str(j)]['f1-score']
    
    # Create heatmap
    sns.heatmap(f1_scores, annot=True, fmt='.3f', cmap='YlGnBu',
                xticklabels=[emotion_map[i] for i in range(num_classes)],
                yticklabels=model_labels)
    plt.xlabel('Emotion Class')
    plt.ylabel('Model')
    plt.title('Class-wise F1 Scores Comparison')
    plt.tight_layout()
    plt.show()
    
    # Plot confusion matrix for ensemble
    plt.figure(figsize=(10, 8))
    sns.heatmap(ensemble_metrics['conf_matrix'], annot=True, fmt='d', cmap='Blues',
                xticklabels=[emotion_map[i] for i in range(num_classes)],
                yticklabels=[emotion_map[i] for i in range(num_classes)])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'Confusion Matrix - Ensemble ({ensemble_metrics["method"]})')
    plt.tight_layout()
    plt.show()

## 5.5 Error analysis 

In [32]:
def perform_error_analysis(model, X_val, y_val, emotion_map=None, n_samples=20):
    """
    Perform detailed error analysis on validation data to understand model weaknesses.
    
    Args:
        model: Trained model
        X_val: Validation features
        y_val: Validation labels (one-hot encoded)
        emotion_map: Mapping from class indices to emotion names
        n_samples: Number of misclassified samples to analyze
        
    Returns:
        Dictionary of error analysis results
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    print("Performing error analysis on validation data...")
    
    # Get predictions
    y_pred_prob = model.predict(X_val)
    y_pred = np.argmax(y_pred_prob, axis=1)
    y_true = np.argmax(y_val, axis=1)
    
    # Identify misclassified samples
    misclassified_indices = np.where(y_pred != y_true)[0]
    print(f"Found {len(misclassified_indices)} misclassified samples out of {len(y_val)} ({len(misclassified_indices)/len(y_val)*100:.2f}%)")
    
    # Create confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    norm_conf_matrix = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis]
    
    # Analyze error patterns
    error_patterns = []
    
    for true_class in range(len(emotion_map)):
        for pred_class in range(len(emotion_map)):
            if true_class != pred_class and conf_matrix[true_class, pred_class] > 0:
                error_rate = norm_conf_matrix[true_class, pred_class]
                error_patterns.append({
                    'true_class': true_class,
                    'true_emotion': emotion_map[true_class],
                    'pred_class': pred_class,
                    'pred_emotion': emotion_map[pred_class],
                    'count': conf_matrix[true_class, pred_class],
                    'error_rate': error_rate
                })
    
    # Sort error patterns by count
    error_patterns = sorted(error_patterns, key=lambda x: x['count'], reverse=True)
    
    # Print top error patterns
    print("\nTop error patterns:")
    for i, pattern in enumerate(error_patterns[:5]):
        print(f"{i+1}. {pattern['true_emotion']} → {pattern['pred_emotion']}: "
                f"{pattern['count']} samples ({pattern['error_rate']*100:.2f}% of {pattern['true_emotion']} samples)")
    
    # Select random misclassified samples for analysis
    if len(misclassified_indices) > n_samples:
        sample_indices = np.random.choice(misclassified_indices, n_samples, replace=False)
    else:
        sample_indices = misclassified_indices
    
    samples = []
    
    for idx in sample_indices:
        true_class = y_true[idx]
        pred_class = y_pred[idx]
        pred_prob = y_pred_prob[idx, pred_class]
        true_prob = y_pred_prob[idx, true_class]
        
        sample = {
            'index': idx,
            'image': X_val[idx],
            'true_class': true_class,
            'true_emotion': emotion_map[true_class],
            'pred_class': pred_class,
            'pred_emotion': emotion_map[pred_class],
            'pred_prob': pred_prob,
            'true_prob': true_prob,
            'confidence_gap': pred_prob - true_prob
        }
        
        samples.append(sample)
    
    # Analyze confidence patterns
    correct_confidences = y_pred_prob[y_pred == y_true, y_true[y_pred == y_true]]
    incorrect_confidences = y_pred_prob[y_pred != y_true, y_pred[y_pred != y_true]]
    
    # Class-specific confidence analysis
    class_confidences = {}
    
    for cls in range(len(emotion_map)):
        # Correct predictions for this class
        correct_indices = np.where((y_pred == y_true) & (y_true == cls))[0]
        if len(correct_indices) > 0:
            correct_conf = y_pred_prob[correct_indices, cls]
        else:
            correct_conf = np.array([])
        
        # Incorrect predictions as this class
        incorrect_as_indices = np.where((y_pred != y_true) & (y_pred == cls))[0]
        if len(incorrect_as_indices) > 0:
            incorrect_as_conf = y_pred_prob[incorrect_as_indices, cls]
        else:
            incorrect_as_conf = np.array([])
        
        # Incorrect predictions of this class
        incorrect_of_indices = np.where((y_pred != y_true) & (y_true == cls))[0]
        if len(incorrect_of_indices) > 0:
            incorrect_of_conf = y_pred_prob[incorrect_of_indices, y_pred[incorrect_of_indices]]
        else:
            incorrect_of_conf = np.array([])
        
        class_confidences[cls] = {
            'correct': correct_conf,
            'incorrect_as': incorrect_as_conf,
            'incorrect_of': incorrect_of_conf
        }
    
    # Return results
    results = {
        'conf_matrix': conf_matrix,
        'norm_conf_matrix': norm_conf_matrix,
        'error_patterns': error_patterns,
        'samples': samples,
        'correct_confidences': correct_confidences,
        'incorrect_confidences': incorrect_confidences,
        'class_confidences': class_confidences
    }
    
    return results

In [33]:
def visualize_error_analysis(error_results, emotion_map=None):
    """
    Visualize the results of error analysis.
    
    Args:
        error_results: Dictionary of error analysis results
        emotion_map: Mapping from class indices to emotion names
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Plot normalized confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(error_results['norm_conf_matrix'], annot=True, fmt='.2f', cmap='Blues',
                xticklabels=[emotion_map[i] for i in range(len(emotion_map))],
                yticklabels=[emotion_map[i] for i in range(len(emotion_map))])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Normalized Confusion Matrix')
    plt.tight_layout()
    plt.show()
    
    # Plot top error patterns
    top_patterns = error_results['error_patterns'][:5]
    
    plt.figure(figsize=(12, 6))
    plt.bar(
        [f"{p['true_emotion']} → {p['pred_emotion']}" for p in top_patterns],
        [p['count'] for p in top_patterns],
        color='salmon'
    )
    plt.title('Top Error Patterns')
    plt.xlabel('Error Pattern')
    plt.ylabel('Count')
    plt.xticks(rotation=45, ha='right')
    plt.grid(axis='y', alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    # Plot confidence distributions
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.hist(error_results['correct_confidences'], bins=20, alpha=0.7, label='Correct predictions', color='green')
    plt.hist(error_results['incorrect_confidences'], bins=20, alpha=0.7, label='Incorrect predictions', color='red')
    plt.xlabel('Confidence')
    plt.ylabel('Count')
    plt.title('Confidence Distribution')
    plt.legend()
    plt.grid(alpha=0.3)
    
    # Plot misclassified samples
    plt.figure(figsize=(15, min(len(error_results['samples']), 10) * 2))
    
    for i, sample in enumerate(error_results['samples'][:10]):
        img = sample['image']
        if len(img.shape) == 3 and img.shape[-1] == 1:
            img = img.reshape(img.shape[0], img.shape[1])
        
        plt.subplot(min(len(error_results['samples']), 10), 2, i * 2 + 1)
        plt.imshow(img, cmap='gray')
        plt.title(f"True: {sample['true_emotion']}, Pred: {sample['pred_emotion']}")
        plt.axis('off')
        
        # Plot prediction probabilities
        plt.subplot(min(len(error_results['samples']), 10), 2, i * 2 + 2)
        
        probs = np.zeros(len(emotion_map))
        for j in range(len(emotion_map)):
            if j == sample['true_class']:
                probs[j] = sample['true_prob']
            elif j == sample['pred_class']:
                probs[j] = sample['pred_prob']
            # Other classes' probabilities not shown for clarity
        
        colors = ['red' if j != sample['true_class'] else 'green' for j in range(len(emotion_map))]
        plt.bar(range(len(emotion_map)), probs, color=colors)
        plt.xticks(range(len(emotion_map)), [emotion_map[i] for i in range(len(emotion_map))], rotation=45, ha='right')
        plt.xlabel('Emotion')
        plt.ylabel('Probability')
        plt.ylim(0, 1)
    
    plt.tight_layout()
    plt.show()
    
    # Plot class-wise confidence analysis
    plt.figure(figsize=(15, 10))
    
    for i, cls in enumerate(range(len(emotion_map))):
        confidences = error_results['class_confidences'][cls]
        
        plt.subplot(3, 2, i+1)
        
        if len(confidences['correct']) > 0:
            plt.hist(confidences['correct'], bins=10, alpha=0.7, label='Correct', color='green')
        
        if len(confidences['incorrect_as']) > 0:
            plt.hist(confidences['incorrect_as'], bins=10, alpha=0.7, label='Falsely predicted', color='red')
        
        if len(confidences['incorrect_of']) > 0:
            plt.hist(confidences['incorrect_of'], bins=10, alpha=0.7, label='Missed', color='orange')
        
        plt.title(f"Confidence Analysis: {emotion_map[cls]}")
        plt.xlabel('Confidence')
        plt.ylabel('Count')
        plt.legend()
        plt.grid(alpha=0.3)
    
    plt.tight_layout()
    plt.show()

In [34]:
def refine_model_for_common_errors(model, X_train, y_train, X_val, y_val, 
                                    error_results, emotion_map=None, 
                                    batch_size=64, epochs=30):
    """
    Refine the model to address common error patterns using training and validation data.
    Fixed to avoid class_weight issue with generators.
    
    Args:
        model: Trained model to refine
        X_train, y_train: Training data and labels
        X_val, y_val: Validation data and labels
        error_results: Error analysis results
        emotion_map: Mapping from class indices to emotion names
        batch_size: Batch size for training
        epochs: Maximum number of epochs
        
    Returns:
        Refined model and training history
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Identify most common error patterns
    top_errors = error_results['error_patterns'][:3]
    
    print("Refining model to address common error patterns:")
    for i, pattern in enumerate(top_errors):
        print(f"{i+1}. {pattern['true_emotion']} → {pattern['pred_emotion']}: "
                f"{pattern['count']} samples ({pattern['error_rate']*100:.2f}% of {pattern['true_emotion']} samples)")
    
    # Create focused data augmentation for problematic classes
    train_datagen = ImageDataGenerator(
        rotation_range=15,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        brightness_range=[0.8, 1.2],
        fill_mode='nearest'
    )
    
    # Create focused dataset with more samples of problematic classes
    problem_classes = [pattern['true_class'] for pattern in top_errors]
    
    # Identify samples from problematic classes
    y_train_indices = np.argmax(y_train, axis=1)
    problem_indices = []
    for cls in problem_classes:
        cls_indices = np.where(y_train_indices == cls)[0]
        problem_indices.extend(cls_indices)
    
    # Instead of using custom generator with class weights,
    # we'll oversample the problem classes in the dataset
    X_train_enhanced = X_train.copy()
    y_train_enhanced = y_train.copy()
    
    # Duplicate problem samples (2x)
    X_problem = X_train[problem_indices]
    y_problem = y_train[problem_indices]
    
    # Add duplicates
    X_train_enhanced = np.concatenate([X_train_enhanced, X_problem], axis=0)
    y_train_enhanced = np.concatenate([y_train_enhanced, y_problem], axis=0)
    
    print(f"Enhanced training set: {len(X_train)} → {len(X_train_enhanced)} samples")
    print(f"Problem classes oversampled: {[emotion_map[cls] for cls in problem_classes]}")
    
    # Define callbacks
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=8,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-6,
            verbose=1
        )
    ]
    
    # Create data generator for augmentation
    train_gen = train_datagen.flow(
        X_train_enhanced, y_train_enhanced, 
        batch_size=batch_size
    )
    
    # Refine model
    print("\nRefining model...")
    
    # Train model using generator without class_weights
    history = model.fit(
        train_gen,
        steps_per_epoch=len(X_train_enhanced) // batch_size,
        epochs=epochs,
        validation_data=(X_val, y_val),
        callbacks=callbacks,
        verbose=1
    )
    
    return model, history

In [35]:
def visualize_model_predictions_on_samples(model, X_sample, emotion_map=None, true_emotions=None, n_samples=10):
    """
    Visualize model predictions on a small set of sample images.
    
    Args:
        model: Trained model
        X_sample: Sample images to visualize
        emotion_map: Mapping from class indices to emotion names
        true_emotions: True emotion labels (optional)
        n_samples: Number of samples to visualize
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    # Limit to n_samples
    if len(X_sample) > n_samples:
        indices = np.random.choice(len(X_sample), n_samples, replace=False)
        X_display = X_sample[indices]
        if true_emotions is not None:
            true_emotions = [true_emotions[i] for i in indices]
    else:
        X_display = X_sample
    
    # Get predictions
    y_pred = model.predict(X_display)
    y_pred_class = np.argmax(y_pred, axis=1)
    
    # Visualize results
    n_cols = min(5, len(X_display))
    n_rows = (len(X_display) + n_cols - 1) // n_cols
    
    plt.figure(figsize=(n_cols * 3, n_rows * 3))
    
    for i, img in enumerate(X_display):
        plt.subplot(n_rows, n_cols, i + 1)
        
        # Handle different image formats
        if len(img.shape) == 3 and img.shape[-1] == 1:
            img = img.reshape(img.shape[0], img.shape[1])
        
        plt.imshow(img, cmap='gray')
        
        pred_class = y_pred_class[i]
        pred_emotion = emotion_map[pred_class]
        pred_confidence = y_pred[i, pred_class]
        
        title = f"Pred: {pred_emotion}\nConf: {pred_confidence:.2f}"
        if true_emotions is not None:
            title = f"True: {true_emotions[i]}\n{title}"
            
        plt.title(title)
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()

In [36]:
def build_complete_emotion_recognition_pipeline(model, emotion_map=None, confidence_threshold=0.5):
    """
    Create a complete pipeline for emotion recognition.
    
    Args:
        model: Trained model
        emotion_map: Mapping from class indices to emotion names
        confidence_threshold: Threshold for confidence to accept prediction
        
    Returns:
        Pipeline function
    """
    if emotion_map is None:
        emotion_map = {
            0: 'Positive',
            1: 'Negative-High Arousal',
            2: 'Negative-Low Arousal',
            3: 'Surprise',
            4: 'Neutral'
        }
    
    def preprocess_image(img):
        """Preprocess a single image for prediction."""
        # Check if image needs to be converted to grayscale
        if len(img.shape) == 3 and img.shape[2] > 1:
            from skimage.color import rgb2gray
            img = rgb2gray(img)
        
        # Resize to expected dimensions
        from skimage.transform import resize
        img = resize(img, (48, 48), anti_aliasing=True)
        
        # Normalize to [0, 1]
        if img.max() > 1.0:
            img = img / 255.0
        
        # Add batch dimension and channel dimension
        img = img.reshape(1, 48, 48, 1)
        
        return img
    
    def predict_emotion(img):
        """Predict emotion from an image."""
        # Preprocess image
        processed_img = preprocess_image(img)
        
        # Get prediction
        pred_probs = model.predict(processed_img)[0]
        pred_class = np.argmax(pred_probs)
        pred_confidence = pred_probs[pred_class]
        
        # Check confidence
        if pred_confidence >= confidence_threshold:
            emotion = emotion_map[pred_class]
            result = {
                'emotion': emotion,
                'confidence': pred_confidence,
                'emotion_probabilities': {emotion_map[i]: float(pred_probs[i]) for i in range(len(emotion_map))}
            }
        else:
            # Low confidence, return uncertain result
            result = {
                'emotion': 'Uncertain',
                'confidence': pred_confidence,
                'emotion_probabilities': {emotion_map[i]: float(pred_probs[i]) for i in range(len(emotion_map))}
            }
        
        return result
    
    def process_image_batch(images):
        """Process a batch of images."""
        # Preprocess images
        processed_images = np.array([preprocess_image(img)[0] for img in images])
        
        # Get predictions
        pred_probs = model.predict(processed_images)
        pred_classes = np.argmax(pred_probs, axis=1)
        pred_confidences = [pred_probs[i, pred_classes[i]] for i in range(len(pred_classes))]
        
        # Create results
        results = []
        for i in range(len(images)):
            if pred_confidences[i] >= confidence_threshold:
                emotion = emotion_map[pred_classes[i]]
            else:
                emotion = 'Uncertain'
                
            result = {
                'emotion': emotion,
                'confidence': pred_confidences[i],
                'emotion_probabilities': {emotion_map[j]: float(pred_probs[i, j]) for j in range(len(emotion_map))}
            }
            
            results.append(result)
        
        return results
    
    # Return pipeline functions
    return {
        'predict_emotion': predict_emotion,
        'process_image_batch': process_image_batch
    }

In [37]:
def save_and_export_model(model, model_name, emotion_map, preprocessing_info):
    """
    Save and export the trained model for future use.
    
    Args:
        model: Trained model
        model_name: Name for saving the model
        emotion_map: Mapping from class indices to emotion names
        preprocessing_info: Dictionary of preprocessing parameters
        
    Returns:
        Path to saved model
    """
    # Create directory if it doesn't exist
    save_dir = 'models'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    
    # Save model
    model_path = os.path.join(save_dir, f"{model_name}.h5")
    model.save(model_path)
    
    # Save metadata
    metadata = {
        'emotion_map': emotion_map,
        'preprocessing_info': preprocessing_info,
        'model_name': model_name,
        'input_shape': model.input_shape[1:],
        'output_shape': model.output_shape[1:],
        'date_trained': time.strftime('%Y-%m-%d %H:%M:%S')
    }
    
    import json
    metadata_path = os.path.join(save_dir, f"{model_name}_metadata.json")
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f)
    
    print(f"Model saved to {model_path}")
    print(f"Metadata saved to {metadata_path}")
    
    return model_path

## Run The Pipeline

In [38]:
def run_section5_pipeline(processed_data=None):
    """
    Run the complete Section 5 pipeline following proper methodology.
    
    Args:
        processed_data: Dictionary of preprocessed data from previous sections
    
    Steps:
    1. Hyperparameter optimization using training and validation data
    2. Cross-validation for robust performance estimation
    3. Transfer learning for improved feature extraction
    4. Ensemble methods to combine model strengths
    5. Error analysis to identify weaknesses
    6. Model refinement based on error analysis
    7. Final model selection based on validation performance
    
    Returns:
        Dictionary of results and the final selected model
    """
    if processed_data is None:
        print("No processed data provided. Please run previous sections first.")
        return
    
    # 1. Hyperparameter Optimization
    print("\n========== 5.1 Hyperparameter Optimization ==========")
    
    grid_results, best_params, best_hp_model = grid_search_cnn_hyperparameters(
        processed_data['X_train_aug_cnn'],
        processed_data['y_train_aug_onehot'],
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        class_weights=processed_data['class_weights']
    )
    
    visualize_grid_search_results(grid_results)
    
    # 2. Cross-Validation
    print("\n========== 5.2 Cross-Validation ==========")
    
    # Create model with best parameters
    def create_best_model():
        return create_improved_cnn(
            input_shape=processed_data['X_train_cnn'].shape[1:],
            num_classes=processed_data['y_train_5class_onehot'].shape[1],
            filters=best_params['filters'],
            dropout_rates=best_params['dropout_rates'],
            learning_rate=best_params['learning_rate'],
            optimizer_name=best_params['optimizer']
        )
    
    # Only use training data for cross-validation, splitting into train/val within CV folds
    cv_results = perform_cross_validation(
        processed_data['X_train_aug_cnn'],
        processed_data['y_train_aug_onehot'],
        create_best_model,
        n_splits=5,
        batch_size=64,
        epochs=30,
        class_weights=processed_data['class_weights'],
        use_augmentation=True
    )
    
    visualize_cross_validation_results(cv_results)
    
    # Get the best CV model based on validation accuracy
    best_cv_model = cv_results['fold_models'][np.argmax(cv_results['fold_accuracies'])]
    
    # 3. Transfer Learning
    print("\n========== 5.3 Transfer Learning ==========")
    
    # Create transfer learning model
    transfer_model = create_transfer_learning_model(
        base_model_name='vgg16',
        input_shape=processed_data['X_train_cnn'].shape[1:],
        num_classes=processed_data['y_train_5class_onehot'].shape[1],
        learning_rate=0.0001,
        trainable_layers=5  # Fine-tune top 5 layers
    )
    
    # Fine-tune model on training data, validating on validation data
    fine_tuned_model, transfer_history = fine_tune_transfer_learning_model(
        transfer_model,
        processed_data['X_train_aug_cnn'],
        processed_data['y_train_aug_onehot'],
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        batch_size=32,
        epochs=20,
        class_weights=processed_data['class_weights']
    )
    
    # Visualize training history
    visualize_training_history(transfer_history, title="Transfer Learning Model History")
    
    # Evaluate on validation data
    val_preds_transfer = fine_tuned_model.predict(processed_data['X_val_cnn'])
    val_preds_transfer_class = np.argmax(val_preds_transfer, axis=1)
    val_true_class = np.argmax(processed_data['y_val_5class_onehot'], axis=1)
    
    transfer_val_accuracy = np.mean(val_preds_transfer_class == val_true_class)
    transfer_val_report = classification_report(val_true_class, val_preds_transfer_class, output_dict=True)
    transfer_val_conf_matrix = confusion_matrix(val_true_class, val_preds_transfer_class)
    
    print(f"Transfer Learning Validation Accuracy: {transfer_val_accuracy:.4f}")
    
    transfer_val_metrics = {
        'accuracy': transfer_val_accuracy,
        'report': transfer_val_report,
        'conf_matrix': transfer_val_conf_matrix
    }
    
    # 4. Ensemble Methods
    print("\n========== 5.4 Ensemble Methods ==========")
    
    # Create another instance of the best model and train it on the full training set
    third_model = create_best_model()
    third_model.fit(
        processed_data['X_train_aug_cnn'],
        processed_data['y_train_aug_onehot'],
        batch_size=64,
        epochs=20,
        validation_data=(processed_data['X_val_cnn'], processed_data['y_val_5class_onehot']),
        class_weight=processed_data['class_weights'],
        callbacks=[
            EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6)
        ],
        verbose=1
    )
    
    # Evaluate all individual models on validation data
    models = [best_hp_model, best_cv_model, fine_tuned_model, third_model]
    model_names = ["Hyperparameter Optimized", "Best CV", "Transfer Learning", "Additional Model"]
    individual_val_metrics = []
    
    for i, model in enumerate(models):
        print(f"\nEvaluating {model_names[i]} on validation set...")
        val_preds = model.predict(processed_data['X_val_cnn'])
        val_preds_class = np.argmax(val_preds, axis=1)
        
        val_accuracy = np.mean(val_preds_class == val_true_class)
        val_report = classification_report(val_true_class, val_preds_class, output_dict=True)
        val_conf_matrix = confusion_matrix(val_true_class, val_preds_class)
        
        print(f"Validation accuracy: {val_accuracy:.4f}")
        
        individual_val_metrics.append({
            'name': model_names[i],
            'accuracy': val_accuracy,
            'report': val_report,
            'conf_matrix': val_conf_matrix
        })
    
    # Create ensemble using weighted method
    ensemble_predict, ensemble_val_metrics = create_ensemble_model(
        models,
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        method='weighted'
    )
    
    # Add name to ensemble metrics
    ensemble_val_metrics['name'] = "Ensemble (weighted)"
    
    # Compare all models on validation data
    all_val_metrics = individual_val_metrics + [ensemble_val_metrics]
    
    # Find the best model based on validation accuracy
    best_model_idx = np.argmax([m['accuracy'] for m in all_val_metrics])
    best_model_name = all_val_metrics[best_model_idx]['name']
    
    print(f"\nBest model based on validation accuracy: {best_model_name} with "
            f"accuracy {all_val_metrics[best_model_idx]['accuracy']:.4f}")
    
    # Visualize ensemble performance
    visualize_ensemble_performance(
        ensemble_val_metrics,
        individual_val_metrics,
        emotion_map=processed_data['new_emotion_map']
    )
    
    # 5. Error Analysis
    print("\n========== 5.5 Error Analysis ==========")
    
    # Use the best individual model for error analysis
    if best_model_name == "Ensemble (weighted)":
        # For error analysis we need a model object, not just a prediction function
        # So we'll use the best individual model instead
        best_individual_idx = np.argmax([m['accuracy'] for m in individual_val_metrics])
        model_for_analysis = models[best_individual_idx]
        print(f"Using {model_names[best_individual_idx]} for error analysis...")
    else:
        model_idx = model_names.index(best_model_name)
        model_for_analysis = models[model_idx]
        print(f"Using {best_model_name} for error analysis...")
    
    # Perform error analysis on validation data
    error_results = perform_error_analysis(
        model_for_analysis,
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        emotion_map=processed_data['new_emotion_map'],
        n_samples=10
    )
    
    visualize_error_analysis(error_results, processed_data['new_emotion_map'])
    
    # 6. Refine model based on error analysis
    print("\n========== 5.6 Model Refinement ==========")
    
    refined_model, refinement_history = refine_model_for_common_errors(
        model_for_analysis,
        processed_data['X_train_aug_cnn'],
        processed_data['y_train_aug_onehot'],
        processed_data['X_val_cnn'],
        processed_data['y_val_5class_onehot'],
        error_results,
        emotion_map=processed_data['new_emotion_map'],
        batch_size=64,
        epochs=20
    )
    
    # Visualize refinement history
    visualize_training_history(refinement_history, title="Model Refinement History")
    
    # Evaluate refined model on validation data
    val_preds_refined = refined_model.predict(processed_data['X_val_cnn'])
    val_preds_refined_class = np.argmax(val_preds_refined, axis=1)
    
    refined_val_accuracy = np.mean(val_preds_refined_class == val_true_class)
    refined_val_report = classification_report(val_true_class, val_preds_refined_class, output_dict=True)
    refined_val_conf_matrix = confusion_matrix(val_true_class, val_preds_refined_class)
    
    print(f"Refined Model Validation Accuracy: {refined_val_accuracy:.4f}")
    
    refined_val_metrics = {
        'name': "Refined Model",
        'accuracy': refined_val_accuracy,
        'report': refined_val_report,
        'conf_matrix': refined_val_conf_matrix
    }
    
    # Add refined model to comparison
    all_val_metrics.append(refined_val_metrics)
    models.append(refined_model)
    model_names.append("Refined Model")
    
    # Update best model if refined model is better
    if refined_val_accuracy > all_val_metrics[best_model_idx]['accuracy']:
        best_model_idx = len(all_val_metrics) - 1  # Index of refined model
        best_model_name = "Refined Model"
        print(f"Refined model is now the best model with accuracy {refined_val_accuracy:.4f}")
    
    # 7. Final Model Selection
    print("\n========== 5.7 Final Model Selection ==========")
    
    # Select the final model based on validation performance
    if best_model_name == "Ensemble (weighted)":
        # For the final model, we'll use the ensemble prediction function
        # But for demonstration, we'll also use the best individual model
        best_individual_idx = np.argmax([m['accuracy'] for m in individual_val_metrics])
        final_model = models[best_individual_idx]
        final_model_name = model_names[best_individual_idx]
        
        print(f"Selected final model: {best_model_name} (using {final_model_name} as base)")
        print("Note: For full ensemble use in production, would need to save all component models")
    else:
        model_idx = model_names.index(best_model_name)
        final_model = models[model_idx]
        final_model_name = best_model_name
        
        print(f"Selected final model: {final_model_name}")
    
    # Visualize model predictions on a few validation samples
    print("\nVisualizing predictions on validation samples:")
    
    sample_indices = np.random.choice(len(processed_data['X_val_cnn']), 10, replace=False)
    X_samples = processed_data['X_val_cnn'][sample_indices]
    y_samples = np.argmax(processed_data['y_val_5class_onehot'][sample_indices], axis=1)
    sample_true_emotions = [processed_data['new_emotion_map'][y] for y in y_samples]
    
    visualize_model_predictions_on_samples(
        final_model,
        X_samples,
        emotion_map=processed_data['new_emotion_map'],
        true_emotions=sample_true_emotions
    )
    
    # Save the final model
    preprocessing_info = {
        'image_size': (48, 48),
        'normalization': 'divide_by_255',
        'color_mode': 'grayscale'
    }
    
    model_path = save_and_export_model(
        final_model,
        f"emotion_recognition_5class_{final_model_name.replace(' ', '_').lower()}",
        processed_data['new_emotion_map'],
        preprocessing_info
    )
    
    print(f"\nModel saved and ready for next section: {model_path} .")
    
    # Return comprehensive results
    return {
        'grid_results': grid_results,
        'best_params': best_params,
        'cv_results': cv_results,
        'transfer_model': fine_tuned_model,
        'transfer_val_metrics': transfer_val_metrics,
        'individual_models': models,
        'individual_model_names': model_names,
        'individual_val_metrics': individual_val_metrics,
        'ensemble_val_metrics': ensemble_val_metrics,
        'error_results': error_results,
        'refined_model': refined_model,
        'refined_val_metrics': refined_val_metrics,
        'all_val_metrics': all_val_metrics,
        'final_model': final_model,
        'final_model_name': final_model_name
    }

In [None]:
# Load the preprocessed data
if 'processed_data' in globals():
    print("Running Section 5 pipeline with preprocessed data...")
    section5_results = run_section5_pipeline(processed_data)
else:
    print("Preprocessed data not found. Please run Sections 1-4 first.")

## 6. Final Model Training and Evaluation 

In [None]:
def train_final_model(processed_data, section5_results):
    """
    Train the final model with extended training parameters and reduced early stopping.
    """
    import numpy as np
    import matplotlib.pyplot as plt
    import os
    from sklearn.model_selection import train_test_split
    from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense, BatchNormalization
    from tensorflow.keras.optimizers import Adam, RMSprop, SGD
    from tensorflow.keras.regularizers import l2
    
    # Define batch size early to avoid reference errors
    batch_size = 64
    
    print("\n========== 6. Final Model Training ==========")
    
    # 1. Combine training and validation data
    print("Combining training and validation data for final model training...")
    
    # Combine image data
    X_combined = np.concatenate([
        processed_data['X_train_aug_cnn'], 
        processed_data['X_val_cnn']
    ], axis=0)
    
    # Combine labels
    y_combined = np.concatenate([
        processed_data['y_train_aug_onehot'], 
        processed_data['y_val_5class_onehot']
    ], axis=0)
    
    print(f"Combined dataset shape: {X_combined.shape}")
    print(f"Combined labels shape: {y_combined.shape}")
    
    # Calculate class distribution in combined dataset
    y_combined_classes = np.argmax(y_combined, axis=1)
    class_counts = np.bincount(y_combined_classes)
    
    print("\nClass distribution in combined dataset:")
    for cls, count in enumerate(class_counts):
        print(f"  Class {cls} ({processed_data['new_emotion_map'][cls]}): {count} samples")
    
    # 2. Create a validation split from the combined data
    # Using a smaller validation set to have more training data
    X_train, X_val, y_train, y_val = train_test_split(
        X_combined, 
        y_combined,
        test_size=0.10,  # Use 10% as internal validation set (smaller than before)
        stratify=y_combined_classes,
        random_state=42
    )
    
    print(f"\nAfter validation split:")
    print(f"  Training set: {X_train.shape[0]} samples")
    print(f"  Validation set: {X_val.shape[0]} samples")
    
    # 3. Create a new model with best hyperparameters
    best_params = section5_results['best_params']
    print(f"\nCreating a new model with best hyperparameters:")
    print(f"  Learning rate: {best_params['learning_rate']}")
    print(f"  Optimizer: {best_params['optimizer']}")
    print(f"  Filters: {best_params['filters']}")
    
    # Create CNN model directly
    def create_improved_cnn(input_shape=(48, 48, 1), num_classes=5, 
                            filters=(32, 64, 128), kernel_size=(3, 3), 
                            dropout_rates=(0.25, 0.25, 0.25, 0.5, 0.5),
                            use_batch_norm=True, learning_rate=0.001, 
                            optimizer_name='adam', l2_reg=0.0001):
        """
        Create an improved CNN model with configurable hyperparameters.
        """
        # Initialize model
        model = Sequential()
        
        # First convolutional block
        model.add(Conv2D(filters[0], kernel_size, padding='same', 
                        activation='relu', input_shape=input_shape,
                        kernel_regularizer=l2(l2_reg)))
        if use_batch_norm:
            model.add(BatchNormalization())
        model.add(Conv2D(filters[0], kernel_size, padding='same', 
                        activation='relu', kernel_regularizer=l2(l2_reg)))
        if use_batch_norm:
            model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(dropout_rates[0]))
        
        # Second convolutional block
        model.add(Conv2D(filters[1], kernel_size, padding='same', 
                        activation='relu', kernel_regularizer=l2(l2_reg)))
        if use_batch_norm:
            model.add(BatchNormalization())
        model.add(Conv2D(filters[1], kernel_size, padding='same', 
                        activation='relu', kernel_regularizer=l2(l2_reg)))
        if use_batch_norm:
            model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(dropout_rates[1]))
        
        # Third convolutional block
        model.add(Conv2D(filters[2], kernel_size, padding='same', 
                        activation='relu', kernel_regularizer=l2(l2_reg)))
        if use_batch_norm:
            model.add(BatchNormalization())
        model.add(Conv2D(filters[2], kernel_size, padding='same', 
                        activation='relu', kernel_regularizer=l2(l2_reg)))
        if use_batch_norm:
            model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(dropout_rates[2]))
        
        # Fully connected layers
        model.add(Flatten())
        model.add(Dense(512, activation='relu', kernel_regularizer=l2(l2_reg)))
        if use_batch_norm:
            model.add(BatchNormalization())
        model.add(Dropout(dropout_rates[3]))
        model.add(Dense(256, activation='relu', kernel_regularizer=l2(l2_reg)))
        if use_batch_norm:
            model.add(BatchNormalization())
        model.add(Dropout(dropout_rates[4]))
        model.add(Dense(num_classes, activation='softmax'))
        
        # Select optimizer
        if optimizer_name.lower() == 'adam':
            optimizer = Adam(learning_rate=learning_rate)
        elif optimizer_name.lower() == 'rmsprop':
            optimizer = RMSprop(learning_rate=learning_rate)
        elif optimizer_name.lower() == 'sgd':
            optimizer = SGD(learning_rate=learning_rate, momentum=0.9)
        else:
            print(f"Unrecognized optimizer: {optimizer_name}. Using Adam.")
            optimizer = Adam(learning_rate=learning_rate)
        
        # Compile model
        model.compile(
            optimizer=optimizer,
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    # Create model
    final_model = create_improved_cnn(
        input_shape=processed_data['X_train_cnn'].shape[1:],
        num_classes=processed_data['y_train_5class_onehot'].shape[1],
        filters=best_params['filters'],
        dropout_rates=(0.25, 0.25, 0.25, 0.5, 0.5),
        learning_rate=best_params['learning_rate'],
        optimizer_name=best_params['optimizer']
    )
    
    # 4. Set up class weights similar to the successful Section 5 setup
    # Instead of enhancing, we'll use the exact class weights that worked well in Section 5
    print("Using the exact class weights that worked well in Section 5...")
    
    # Use the original class weights 
    class_weights = processed_data['class_weights']
    
    # Create directory for checkpoints
    if not os.path.exists('./checkpoints'):
        os.makedirs('./checkpoints')
    
    # 5. Define callbacks with much higher patience for early stopping
    model_checkpoint = ModelCheckpoint(
        filepath='./checkpoints/best_model.keras',
        monitor='val_accuracy',  # Monitor accuracy instead of loss
        save_best_only=True,
        verbose=1
    )
    
    callbacks = [
        EarlyStopping(
            monitor='val_accuracy',
            patience=20,  # Much higher patience
            restore_best_weights=True,
            verbose=1,
            mode='max'  # We want to maximize accuracy
        ),
        ReduceLROnPlateau(
            monitor='val_accuracy',
            factor=0.5,
            patience=10,  # Also higher patience for LR reduction
            min_lr=1e-6,
            verbose=1,
            mode='max'  # We want to maximize accuracy
        ),
        model_checkpoint
    ]
    
    # 6. Train the model with more epochs
    print("\nTraining final model on combined dataset...")
    print("Using extended training parameters (more epochs, higher patience)...")
    
    # Train with more epochs and patience
    history = final_model.fit(
        X_train, y_train,
        batch_size=batch_size,
        epochs=100,  # Much more epochs (100 instead of 30)
        validation_data=(X_val, y_val),
        class_weight=class_weights,
        callbacks=callbacks,
        verbose=1
    )
    
    # 7. Try to load the best model from checkpoint
    try:
        from tensorflow.keras.models import load_model
        best_model_path = './checkpoints/best_model.h5'
        if os.path.exists(best_model_path):
            print(f"\nLoading best model from checkpoint: {best_model_path}")
            final_model = load_model(best_model_path)
    except Exception as e:
        print(f"Error loading checkpoint: {e}")
        print("Continuing with current model state")
    
    # 8. Visualize training history
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train')
    plt.plot(history.history['val_accuracy'], label='Validation')
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train')
    plt.plot(history.history['val_loss'], label='Validation')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # 9. Evaluate on the validation set
    print("\nEvaluating final model on validation data...")
    val_loss, val_accuracy = final_model.evaluate(X_val, y_val, verbose=1)
    print(f"Validation accuracy: {val_accuracy:.4f}")
    print(f"Validation loss: {val_loss:.4f}")
    
    
    # 10. Save the model
    save_dir = 'production_models'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    
    model_path = os.path.join(save_dir, "emotion_recognition_5class_final.h5")
    final_model.save(model_path)
    
    # Save best epoch and accuracy in metadata
    best_epoch = np.argmax(history.history['val_accuracy']) + 1
    best_accuracy = np.max(history.history['val_accuracy'])
    
    print(f"\nBest validation accuracy: {best_accuracy:.4f} (epoch {best_epoch})")
    print(f"Final model saved to {model_path}")
    
    return {
        'final_model': final_model,
        'training_history': history,
        'model_path': model_path,
        'val_accuracy': val_accuracy,
        'val_loss': val_loss,
        'best_epoch': best_epoch,
        'best_accuracy': best_accuracy
    }

# Run the extended training pipeline
if 'section5_results' in globals() and 'processed_data' in globals():
    print("Running Section 6 pipeline with results from Section 5...")
    section6_results = train_final_model(processed_data, section5_results)
else:
    print("Required data from Section 5 not found. Please run Section 5 first.")

## 7. Test Set Evaluation 

### 7.1 Test Set Evaluation

In [42]:
def evaluate_on_test_set(final_model, processed_data):
    """
    Evaluate the final model on the test set.
    This is the only time the test set is used in the entire pipeline.
    
    Args:
        final_model: The trained final model
        processed_data: Dictionary of preprocessed data
        
    Returns:
        Evaluation metrics on test set
    """
    print("\n========== 7.1 FINAL TEST SET EVALUATION ==========")
    print("Note: This is the only evaluation on the test set in the entire pipeline")
    
    # Get predictions on test set
    test_pred = final_model.predict(processed_data['X_test_cnn'])
    test_pred_class = np.argmax(test_pred, axis=1)
    test_true_class = np.argmax(processed_data['y_test_5class_onehot'], axis=1)
    
    # Calculate metrics
    test_accuracy = np.mean(test_pred_class == test_true_class)
    test_report = classification_report(test_true_class, test_pred_class, output_dict=True)
    test_conf_matrix = confusion_matrix(test_true_class, test_pred_class)
    
    # Print results
    print(f"Final model test accuracy: {test_accuracy:.4f}")
    print("\nClass-wise performance on test set:")
    
    # Print class-wise metrics
    for i in range(len(processed_data['new_emotion_map'])):
        if str(i) in test_report:
            class_precision = test_report[str(i)]['precision']
            class_recall = test_report[str(i)]['recall']
            class_f1 = test_report[str(i)]['f1-score']
            class_support = test_report[str(i)]['support']
            
            print(f"  Class {i} ({processed_data['new_emotion_map'][i]}): "
                    f"Precision={class_precision:.4f}, "
                    f"Recall={class_recall:.4f}, "
                    f"F1={class_f1:.4f}, "
                    f"Support={class_support}")
    
    # Return evaluation metrics
    return {
        'accuracy': test_accuracy,
        'report': test_report,
        'conf_matrix': test_conf_matrix
    }

### 7.2 Visualize Test Set Performance

In [43]:
def visualize_test_performance(test_metrics, processed_data, model):
    """
    Visualize the performance on the test set through different visualizations.
    
    Args:
        test_metrics: Test set evaluation metrics
        processed_data: Dictionary of preprocessed data
        model: The trained model to use for predictions
    """
    print("\n========== 7.2 Test Set Performance Visualization ==========")
    
    # Extract data
    test_conf_matrix = test_metrics['conf_matrix']
    test_report = test_metrics['report']
    
    # 1. Confusion Matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(test_conf_matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=[processed_data['new_emotion_map'][i] for i in range(len(processed_data['new_emotion_map']))],
                yticklabels=[processed_data['new_emotion_map'][i] for i in range(len(processed_data['new_emotion_map']))])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Test Set Confusion Matrix')
    plt.tight_layout()
    plt.show()
    
    # 2. Normalized Confusion Matrix
    plt.figure(figsize=(10, 8))
    norm_conf_matrix = test_conf_matrix.astype('float') / test_conf_matrix.sum(axis=1)[:, np.newaxis]
    sns.heatmap(norm_conf_matrix, annot=True, fmt='.2f', cmap='Blues',
                xticklabels=[processed_data['new_emotion_map'][i] for i in range(len(processed_data['new_emotion_map']))],
                yticklabels=[processed_data['new_emotion_map'][i] for i in range(len(processed_data['new_emotion_map']))])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Normalized Test Set Confusion Matrix')
    plt.tight_layout()
    plt.show()
    
    # 3. Class-wise Metrics Visualization
    plt.figure(figsize=(12, 6))
    
    # Collect class metrics
    classes = []
    precision = []
    recall = []
    f1_scores = []
    
    for i in range(len(processed_data['new_emotion_map'])):
        if str(i) in test_report:
            classes.append(processed_data['new_emotion_map'][i])
            precision.append(test_report[str(i)]['precision'])
            recall.append(test_report[str(i)]['recall'])
            f1_scores.append(test_report[str(i)]['f1-score'])
    
    # Plot class metrics
    x = np.arange(len(classes))
    width = 0.25
    
    plt.bar(x - width, precision, width, label='Precision', color='skyblue')
    plt.bar(x, recall, width, label='Recall', color='salmon')
    plt.bar(x + width, f1_scores, width, label='F1-Score', color='lightgreen')
    
    plt.xlabel('Emotion Class')
    plt.ylabel('Score')
    plt.title('Test Set Performance Metrics by Class')
    plt.xticks(x, classes, rotation=45)
    plt.legend()
    plt.grid(axis='y', alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    # 4. Sample misclassifications
    # Get a few misclassified examples from the test set
    test_pred = model.predict(processed_data['X_test_cnn'])
    test_pred_class = np.argmax(test_pred, axis=1)
    test_true_class = np.argmax(processed_data['y_test_5class_onehot'], axis=1)
    
    misclassified_indices = np.where(test_pred_class != test_true_class)[0]
    
    if len(misclassified_indices) > 0:
        sample_size = min(10, len(misclassified_indices))
        sample_indices = np.random.choice(misclassified_indices, sample_size, replace=False)
        
        plt.figure(figsize=(15, sample_size))
        
        for i, idx in enumerate(sample_indices):
            plt.subplot(2, 5, i+1)
            plt.imshow(processed_data['X_test_cnn'][idx].reshape(48, 48), cmap='gray')
            plt.title(f"True: {processed_data['new_emotion_map'][test_true_class[idx]]}\n" + 
                      f"Pred: {processed_data['new_emotion_map'][test_pred_class[idx]]}")
            plt.axis('off')
        
        plt.tight_layout()
        plt.show()
    else:
        print("No misclassified samples found in the test set (perfect accuracy)!")

### 7.3 Run Test Evaluation

In [44]:
def run_section7_pipeline(section6_results, processed_data):
    """
    Run the Section 7 pipeline for final test set evaluation.
    
    Args:
        section6_results: Results from Section 6
        processed_data: Dictionary of preprocessed data
        
    Returns:
        Dictionary of test evaluation results
    """
    print("\n========== 7. Final Test Set Evaluation ==========")
    
    # 1. Test set evaluation
    test_metrics = evaluate_on_test_set(section6_results['final_model'], processed_data)
    
    # 2. Visualize test set performance
    visualize_test_performance(test_metrics, processed_data)
    
    print("\nFinal test set evaluation complete!")
    print(f"Final test accuracy: {test_metrics['accuracy']:.4f}")
    
    return {
        'test_metrics': test_metrics,
        'final_model': section6_results['final_model']  # Include the model for reference
    }

### 7.4 Execute Final Evaluation

In [None]:
# Only run this after Section 6 is complete and you're ready for final evaluation
if 'section6_results' in globals() and 'processed_data' in globals():
    print("Running final test set evaluation...")
    section7_results = run_section7_pipeline(section6_results, processed_data)
else:
    print("Section 6 results or processed data not found. Please run previous sections first.")

In [None]:
!pip install flask opencv-python tensorflow


In [None]:
import cv2
import numpy as np
import tensorflow as tf
import json
import time
from collections import deque

# Load model and metadata
model = tf.keras.models.load_model('emotion_recognition_5class_final.h5')
with open('emotion_recognition_5class_final_metadata.json', 'r') as f:
    metadata = json.load(f)

# Preprocessing function
def preprocess_image(image):
    image = cv2.resize(image, (48, 48))
    if len(image.shape) == 3 and image.shape[2] > 1:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    image = image / 255.0
    image = image.reshape(1, 48, 48, 1)
    return image

# Softer, modern color palette for emotions
emotion_colors = {
    "Engaged": (102, 204, 153),        
    "Frustrated": (102, 102, 255),       
    "Disengaged": (255, 153, 153),    # blue (unchanged)
    "Surprise": (0, 215, 255),        # yellow (unchanged)
    "Neutral": (128, 128, 128)        # gray (darker gray)
}

# Smoothing buffer
prediction_buffer = deque(maxlen=10)

# Start webcam
cap = cv2.VideoCapture(0)

# Haar Cascade face detection
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

while True:
    ret, frame = cap.read()
    if not ret:
        break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)

    for (x, y, w, h) in faces:
        face_img = frame[y:y+h, x:x+w]
        processed_img = preprocess_image(face_img)

        # Predict and smooth
        raw_preds = model.predict(processed_img)[0]
        prediction_buffer.append(raw_preds)
        avg_preds = np.mean(prediction_buffer, axis=0)

        pred_class = np.argmax(avg_preds)
        emotion = metadata['emotion_map'][str(pred_class)]
        confidence = float(avg_preds[pred_class])

        # Draw bounding box
        box_color = emotion_colors.get(emotion, (255, 255, 255))
        cv2.rectangle(frame, (x, y), (x+w, y+h), box_color, 2)

        # Draw label above face
        label = f'{emotion}'
        cv2.putText(frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX,
                    0.9, box_color, 2, lineType=cv2.LINE_AA)

        # Progress bars beside the face
        bar_x = x + w + 10
        bar_y = y
        bar_width = 50
        bar_height = h // len(metadata['emotion_map'])

        for i, (key, name) in enumerate(metadata['emotion_map'].items()):
            prob = avg_preds[int(key)]
            bar_length = int(prob * bar_width)
            color = emotion_colors.get(name, (255, 255, 255))
            y_pos = bar_y + i * bar_height

            # Background bar (light gray)
            cv2.rectangle(frame, (bar_x, y_pos),
                          (bar_x + bar_width, y_pos + bar_height - 4),
                          (230, 230, 230), -1, lineType=cv2.LINE_AA)

            # Colored progress bar
            cv2.rectangle(frame, (bar_x, y_pos),
                          (bar_x + bar_length, y_pos + bar_height - 4),
                          color, -1, lineType=cv2.LINE_AA)

            # Drop shadow text
            cv2.putText(frame, name, (bar_x + bar_width + 6, y_pos + bar_height - 6),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.42, (50, 50, 50), 2, lineType=cv2.LINE_AA)
            # Main text
            cv2.putText(frame, name, (bar_x + bar_width + 6, y_pos + bar_height - 6),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.42, color, 1, lineType=cv2.LINE_AA)

    # Display window
    cv2.imshow('Emotion Recognition', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Cleanup
cap.release()
cv2.destroyWindow('Emotion Recognition')
cv2.waitKey(1)
time.sleep(0.5)
