# Facial Expression Recognition

## Import Essential Libraries

In [None]:
import math
import os
import random
import shutil

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input as resnet_preprocess_input
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

## Random Seed

In [None]:
# Seed the Python, NumPy, and TensorFlow random number generators in one call
# so that sampling, shuffling, and augmentation are repeatable across runs.
tf.keras.utils.set_random_seed(42)

# Explicitly seed Python's `random` module with the same value. This repeats
# part of what the call above already does and is harmless.
random.seed(42)

## Configure TensorFlow to Use the GPU

In [None]:
# Re-imported so this cell can run on its own; `tf` is also imported in the first cell.
import tensorflow as tf

# Ask TensorFlow for every physical device (CPU and GPU) it can see.
physical_devices = tf.config.list_physical_devices()
print(f"Physical devices detected: {physical_devices}")

# Query GPUs specifically and report each one that was found.
gpu_devices = tf.config.list_physical_devices('GPU')
if gpu_devices:
    print(f"\nNumber of GPUs available: {len(gpu_devices)}")
    for i, gpu in enumerate(gpu_devices):
        print(f"  GPU {i}: {gpu}")
    print("\nTensorFlow will automatically use the GPU if available.")
else:
    print("\nNo GPU devices found. TensorFlow will run on CPU.")

# Create a small constant tensor and print the device it was placed on.
# This should name a GPU when one is available and being used.
test_tensor = tf.constant([1.0, 2.0, 3.0])
print(f"\nDefault device for a tensor: {test_tensor.device}")

## Global Variables

In [None]:
# Emotion class names. Each name is also used as the per-class subfolder name
# in the organized, resized, and balanced directory trees.
emotions = ['anger', 'contempt', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

In [None]:
# Mapping from the integer class ID read from a label file to its emotion name.
# The names appear here in the same order as in the `emotions` list.
emotion_labels = {
    0: 'anger',
    1: 'contempt',
    2: 'disgust',
    3: 'fear',
    4: 'happy',
    5: 'neutral',
    6: 'sad',
    7: 'surprise'
}

## Define Global URLs

In [None]:
# Input locations. Despite the `_url` suffix these are filesystem paths:
# one images directory and one labels directory per dataset split.
input_base_url = '/kaggle/input/'

# Image directories for the train / valid / test splits.
train_images_url = os.path.join(input_base_url, 'affectnet-yolo-format/YOLO_format/train/images')
valid_images_url = os.path.join(input_base_url, 'affectnet-yolo-format/YOLO_format/valid/images')
test_images_url = os.path.join(input_base_url, 'affectnet-yolo-format/YOLO_format/test/images')

# Label (.txt) directories for the train / valid / test splits.
train_labels_url = os.path.join(input_base_url, 'affectnet-yolo-format/YOLO_format/train/labels')
valid_labels_url = os.path.join(input_base_url, 'affectnet-yolo-format/YOLO_format/valid/labels')
test_labels_url = os.path.join(input_base_url, 'affectnet-yolo-format/YOLO_format/test/labels')

In [None]:
# Base directory (a filesystem path) under which all generated output is written.
output_base_url = '/kaggle/working/'

In [None]:
# Organized images: originals copied into one subfolder per emotion, per split.
organized_base_dir = os.path.join(output_base_url, 'organized_images')

organized_train_images = os.path.join(organized_base_dir, 'train')
organized_valid_images = os.path.join(organized_base_dir, 'valid')
organized_test_images = os.path.join(organized_base_dir, 'test')

In [None]:
# Resized images: the organized images after resizing, same folder layout.
resized_base_dir = os.path.join(output_base_url, 'resized_images')

resized_train_images = os.path.join(resized_base_dir, 'train')
resized_valid_images = os.path.join(resized_base_dir, 'valid')
resized_test_images = os.path.join(resized_base_dir, 'test')

In [None]:
# Balanced images: the training split topped up with augmented images, plus
# plain copies of the validation and test splits, same folder layout.
balanced_base_dir = os.path.join(output_base_url, 'balanced_images')

balanced_train_images = os.path.join(balanced_base_dir, 'train')
balanced_valid_images = os.path.join(balanced_base_dir, 'valid')
balanced_test_images = os.path.join(balanced_base_dir, 'test')

## Create Directories

In [None]:
# Create the three output roots up front. exist_ok=True makes re-running
# this cell safe when the directories already exist.
os.makedirs(organized_base_dir, exist_ok=True)
os.makedirs(resized_base_dir, exist_ok=True)
os.makedirs(balanced_base_dir, exist_ok=True)

## Split Images into Emotion Folders

In [None]:
def reorganize_dataset(source_images_dir, source_labels_dir, destination_base_dir, emotion_map):
    """
    Copy images into one subfolder per emotion, driven by YOLO-format label files.

    For every image in ``source_images_dir`` the label file with the same base
    name (``<name>.txt``) is looked up in ``source_labels_dir``. The first
    whitespace-separated token on the label's first line is read as the class
    ID, and the image is copied to ``destination_base_dir/<emotion name>/``.
    Images whose label is missing, empty, malformed, or maps to an unknown
    class are reported and skipped; they never abort the run.

    Args:
        source_images_dir (str): Directory containing the image files (e.g., train/images).
        source_labels_dir (str): Directory containing the label .txt files (e.g., train/labels).
        destination_base_dir (str): Root directory that receives the per-emotion subfolders.
        emotion_map (dict): Mapping of integer class IDs to emotion names (e.g., {0: 'anger'}).

    Returns:
        None. Progress and a final processed/skipped summary are printed.

    Raises:
        OSError: If ``source_images_dir`` cannot be listed or a destination
            folder cannot be created.
    """
    # The split name ('train', 'valid', ...) is the parent folder of the images
    # directory; normpath keeps this correct when the path has a trailing slash.
    set_name = os.path.basename(os.path.dirname(os.path.normpath(source_images_dir)))
    print(f"--- Reorganizing: {set_name} set ---")

    # Create one destination folder per emotion name.
    for emotion_name in emotion_map.values():
        class_folder = os.path.join(destination_base_dir, emotion_name)
        os.makedirs(class_folder, exist_ok=True)  # safe to re-run
        print(f"  Created directory: {class_folder}")

    image_files = [f for f in os.listdir(source_images_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    print(f"  Found {len(image_files)} image files in {source_images_dir}")

    processed_count = 0
    skipped_count = 0

    for img_filename in image_files:
        img_path = os.path.join(source_images_dir, img_filename)

        # The label shares the image's base name, with a .txt extension.
        label_filename = os.path.splitext(img_filename)[0] + '.txt'
        label_path = os.path.join(source_labels_dir, label_filename)

        if not os.path.exists(label_path):
            print(f"    Warning: Label file not found for {img_filename} at {label_path}. Skipping.")
            skipped_count += 1
            continue

        # Parse the class ID. split() with no argument accepts any whitespace
        # (spaces or tabs) and ignores leading blanks.
        try:
            with open(label_path, 'r') as f:
                # Only the first line is used (one object/emotion per image is assumed).
                tokens = f.readline().split()
            if not tokens:
                raise ValueError("label file is empty")
            class_id = int(tokens[0])
        except (OSError, ValueError) as e:
            print(f"    Error processing {img_filename} or {label_filename}: {e}. Skipping.")
            skipped_count += 1
            continue

        emotion_name = emotion_map.get(class_id)
        if emotion_name is None:
            print(f"    Warning: Unknown class_id {class_id} for {img_filename}. Skipping.")
            skipped_count += 1
            continue

        destination_path = os.path.join(destination_base_dir, emotion_name, img_filename)

        try:
            shutil.copy(img_path, destination_path)
        except OSError as e:
            print(f"    Error processing {img_filename} or {label_filename}: {e}. Skipping.")
            skipped_count += 1
            continue

        processed_count += 1
        if processed_count % 1000 == 0:
            print(f"    Processed {processed_count} images for {set_name} set...")

    print(f"--- Finished reorganizing {set_name} set. Processed: {processed_count}, Skipped: {skipped_count} ---")
    print(f"Reorganized data is in: {destination_base_dir}\n")

In [None]:
# Copy the training images into per-emotion subfolders under organized_train_images.
reorganize_dataset(train_images_url, train_labels_url, organized_train_images, emotion_labels)

In [None]:
# Make sure the validation output root exists before reorganizing.
os.makedirs(organized_valid_images, exist_ok=True)

# Copy the validation images into per-emotion subfolders.
reorganize_dataset(valid_images_url, valid_labels_url, organized_valid_images, emotion_labels)

In [None]:
# Make sure the test output root exists before reorganizing.
os.makedirs(organized_test_images, exist_ok=True)

# Copy the test images into per-emotion subfolders.
reorganize_dataset(test_images_url, test_labels_url, organized_test_images, emotion_labels)

## Check That All Images Are RGB

In [None]:
def check_all_image_color_modes_strict_rgb(base_directory):
    """
    Scan every image below a dataset split and report which ones are not 'RGB'.

    The split directory is expected to contain one subfolder per emotion. Each
    image file inside those subfolders is opened with Pillow and its color mode
    is recorded. Files that cannot be opened are reported and still counted as
    processed, but contribute no mode.

    Args:
        base_directory (str): Root directory of the dataset split to scan.

    Returns:
        tuple: ``(all_unique_modes, non_rgb_paths)`` where
               - all_unique_modes (set): every distinct color mode encountered.
               - non_rgb_paths (list): paths of all images whose mode is not 'RGB'.
               Both are empty when the directory is missing or has no subfolders.
    """
    # Guard: nothing to scan if the split directory is absent.
    if not os.path.exists(base_directory):
        print(f"Error: Directory not found: {base_directory}")
        return set(), []

    recognized_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp', '.gif')
    all_unique_modes = set()
    non_rgb_paths = []

    print(f"--- Scanning directory: {base_directory} for image color modes (checking ALL images for 'RGB' only) ---")

    # Collect the emotion subfolders (e.g., 'anger', 'contempt').
    class_dirs = []
    for entry in os.listdir(base_directory):
        if os.path.isdir(os.path.join(base_directory, entry)):
            class_dirs.append(entry)

    if len(class_dirs) == 0:
        print(f"  No emotion subfolders found in {base_directory}. Cannot perform color mode check.")
        return set(), []

    processed_count = 0
    for class_dir in class_dirs:
        class_path = os.path.join(base_directory, class_dir)

        for entry_name in os.listdir(class_path):
            # Ignore anything that does not look like an image file.
            if not entry_name.lower().endswith(recognized_extensions):
                continue

            img_path = os.path.join(class_path, entry_name)
            try:
                with Image.open(img_path) as img:
                    mode = img.mode
                    all_unique_modes.add(mode)
                    if mode != 'RGB':
                        non_rgb_paths.append(img_path)
            except Exception as e:
                print(f"  Warning: Could not open {img_path} to check mode: {e}. Skipping this file.")

            # Unreadable files are counted too; report progress every 1000 files.
            processed_count += 1
            if processed_count % 1000 == 0:
                print(f"  Processed {processed_count} images in {base_directory}...")

    print(f"Scan complete for {base_directory}. Total images processed: {processed_count}.")

    if not all_unique_modes:
        print("No image files found for mode check.")
    else:
        print(f"Unique color modes found across all images: {sorted(all_unique_modes)}")

    if not non_rgb_paths:
        print("All images found are exclusively 'RGB' (3-channel color).")
    else:
        print(f"WARNING: Detected {len(non_rgb_paths)} images that are NOT 'RGB'.")
        print("  These might include 'L' (grayscale), 'P' (palettized), 'RGBA' (color with alpha), etc.")
        print("  Sample paths of non-'RGB' images (up to 10 for brevity):")
        # Only a small sample is printed to keep the output short.
        for p in non_rgb_paths[:10]:
            print(f"    {p}")

    return all_unique_modes, non_rgb_paths

In [None]:
# Uses organized_train_images, organized_valid_images, and organized_test_images
# defined earlier in the notebook.

print("--- Starting Full STRICT 'RGB' Color Mode Check on ORGANIZED Images ---")

print("\n--- Checking Organized Training Images ---")
train_modes_strict, train_non_rgb_paths_strict = check_all_image_color_modes_strict_rgb(organized_train_images)

print("\n--- Checking Organized Validation Images ---")
valid_modes_strict, valid_non_rgb_paths_strict = check_all_image_color_modes_strict_rgb(organized_valid_images)

print("\n--- Checking Organized Test Images ---")
test_modes_strict, test_non_rgb_paths_strict = check_all_image_color_modes_strict_rgb(organized_test_images)

print("\n--- Full Strict 'RGB' Color Mode Check on Organized Directories Complete ---")

# Each *_modes_strict variable is a set of the color modes found in that split
# (for example {'RGB', 'L', 'P', 'RGBA'}).
# Each *_non_rgb_paths_strict variable lists the paths of images whose mode is not 'RGB'.

## Exploratory Data Analysis

### Verify Organized Images Directories

In [None]:
# Define a function to count images in the reorganized structure
def count_images_in_organized_dirs(base_dir, emotion_list):
    """
    Tally the files stored in each emotion subfolder of a dataset split.

    Args:
        base_dir (str): Root directory of the organized dataset split
                        (e.g., '/kaggle/working/organized_images/train').
        emotion_list (list): Emotion names, which are also the subfolder names.

    Returns:
        tuple: ``(counts, total_images)`` where ``counts`` maps each emotion name
               to its file count (0 when its subfolder is missing) and
               ``total_images`` is the sum over all listed emotions. When
               ``base_dir`` does not exist, ``({}, 0)`` is returned.
    """
    counts = {}
    total_images = 0
    print(f"--- Checking: {base_dir.split('/')[-1].capitalize()} Set ---")

    # Guard: a missing split directory yields empty results.
    if not os.path.exists(base_dir):
        print(f"  Warning: Base directory not found: {base_dir}. Skipping.")
        return counts, 0

    for emotion in emotion_list:
        emotion_path = os.path.join(base_dir, emotion)

        if not os.path.isdir(emotion_path):
            print(f"  Warning: Emotion directory not found for {emotion} in {base_dir}.")
            counts[emotion] = 0
            continue

        # Count regular files only; nested directories are ignored.
        num_images = sum(
            1 for entry in os.listdir(emotion_path)
            if os.path.isfile(os.path.join(emotion_path, entry))
        )
        counts[emotion] = num_images
        total_images += num_images
        print(f"  {emotion.capitalize()}: {num_images} images")

    print(f"  Total images in {base_dir.split('/')[-1].capitalize()} Set: {total_images}\n")
    return counts, total_images

In [None]:
# Per-emotion file counts and overall total for the organized training split.
train_counts, total_train = count_images_in_organized_dirs(organized_train_images, emotions)

In [None]:
# Per-emotion file counts and overall total for the organized validation split.
valid_counts, total_valid = count_images_in_organized_dirs(organized_valid_images, emotions)

In [None]:
# Per-emotion file counts and overall total for the organized test split.
test_counts, total_test = count_images_in_organized_dirs(organized_test_images, emotions)

In [None]:
# Print the combined total and the per-split totals computed in the cells above.
print("\n--- Overall Summary of Reorganized Dataset ---")
print(f"Total images across all sets: {total_train + total_valid + total_test}")
print(f"Train Set Total: {total_train}")
print(f"Valid Set Total: {total_valid}")
print(f"Test Set Total: {total_test}")

In [None]:
# Publish the per-split counts as notebook-level globals for later cells
# (e.g., plotting in the next step).
# NOTE(review): when this cell runs at notebook top level these names are
# already global, so each line rebinds a name to the object it already holds.
globals()['train_counts'] = train_counts
globals()['valid_counts'] = valid_counts
globals()['test_counts'] = test_counts

### Class Distribution of Train, Valid and Test

In [None]:
print("Preparing DataFrames for class distribution plots...")

# Build one two-column DataFrame ('Emotion', 'Count') per split from the
# count dictionaries, one row per emotion.
train_df = pd.DataFrame(list(train_counts.items()), columns=['Emotion', 'Count'])
train_df['Set'] = 'Train' # Tag rows with their split (useful for a combined plot later)

valid_df = pd.DataFrame(list(valid_counts.items()), columns=['Emotion', 'Count'])
valid_df['Set'] = 'Validation'

test_df = pd.DataFrame(list(test_counts.items()), columns=['Emotion', 'Count'])
test_df['Set'] = 'Test'

print("DataFrames (train_df, valid_df, test_df) created.")

In [None]:
def plot_class_distribution(df, title):
    """
    Draw a bar chart of image counts per emotion for one dataset split.

    Args:
        df (pd.DataFrame): Table with 'Emotion' and 'Count' columns.
        title (str): Title shown above the chart.
    """
    label_font_size = 12
    tick_font_size = 10

    plt.figure(figsize=(10, 6))
    sns.barplot(data=df, x='Emotion', y='Count', palette='viridis')

    plt.title(title, fontsize=16)
    plt.xlabel('Emotion', fontsize=label_font_size)
    plt.ylabel('Number of Images', fontsize=label_font_size)

    # Slanted, right-aligned emotion names stay readable when they are long.
    plt.xticks(rotation=45, ha='right', fontsize=tick_font_size)
    plt.yticks(fontsize=tick_font_size)

    # Horizontal guide lines only.
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    # Tighten the layout so labels do not overlap, then render.
    plt.tight_layout()
    plt.show()

In [None]:
# Bar chart of per-emotion image counts in the training split.
plot_class_distribution(train_df, 'Training Set Class Distribution')

In [None]:
# Bar chart of per-emotion image counts in the validation split.
plot_class_distribution(valid_df, 'Validation Set Class Distribution')

In [None]:
# Bar chart of per-emotion image counts in the test split.
plot_class_distribution(test_df, 'Test Set Class Distribution')

### Visualize Images

In [None]:
def plot_emotion_samples_organized(dataset_split_base_dir: str, emotion_name: str, num_images_to_plot: int, images_per_row: int = 10):
    """
    Plot a random sample of images for one emotion from an organized dataset split.

    Images are shown in a grid with their original colors. An image that cannot
    be loaded is reported and replaced by a red 'Error' placeholder so the grid
    keeps its shape.

    Args:
        dataset_split_base_dir (str): Root directory of the organized dataset split
                                      (e.g., organized_train_images).
        emotion_name (str): Emotion to plot (e.g., 'happy'); must match the subfolder name.
        num_images_to_plot (int): Maximum number of images to plot. Values of zero
                                  or less select nothing and skip the plot.
        images_per_row (int, optional): Number of images per grid row. Defaults to 10.

    Returns:
        None. The figure is shown with ``plt.show()``.

    Raises:
        ValueError: If there is something to plot and ``images_per_row`` is less than 1.
    """
    emotion_path = os.path.join(dataset_split_base_dir, emotion_name)

    if not os.path.exists(emotion_path):
        print(f"Error: Directory not found for '{emotion_name}' at '{emotion_path}'. Skipping plot.")
        return

    image_files = [f for f in os.listdir(emotion_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

    if not image_files:
        print(f"No images found in '{emotion_path}' for emotion '{emotion_name}'. Skipping plot.")
        return

    # Clamp the sample size to [0, number of available images] so random.sample
    # never receives a negative or too-large count.
    sample_size = max(0, min(len(image_files), num_images_to_plot))
    images_to_display = random.sample(image_files, sample_size)

    num_images_actual = len(images_to_display)
    if num_images_actual == 0:
        print(f"No images selected for display for '{emotion_name}'. Skipping plot.")
        return

    if images_per_row < 1:
        raise ValueError(f"images_per_row must be at least 1, got {images_per_row}")

    num_rows = math.ceil(num_images_actual / images_per_row)

    fig_width = images_per_row * 1.8 # Sized for small color thumbnails
    fig_height = num_rows * 2.2 # Slightly more height per row for the titles

    plt.figure(figsize=(fig_width, fig_height))

    # normpath keeps the split name correct when the path has a trailing separator.
    split_name = os.path.basename(os.path.normpath(dataset_split_base_dir)).capitalize()
    plt.suptitle(f"Sample Images: {emotion_name.capitalize()} ({split_name} Set)", fontsize=18, y=1.02)

    for i, img_filename in enumerate(images_to_display):
        img_path = os.path.join(emotion_path, img_filename)
        ax = plt.subplot(num_rows, images_per_row, i + 1)

        try:
            # The context manager closes the file as soon as the pixels are drawn.
            with Image.open(img_path) as img:
                ax.imshow(img)
        except Exception as e:
            # Pillow raises several unrelated exception types for unreadable
            # files; any failure becomes a placeholder instead of aborting.
            print(f"  Warning: Could not load image '{img_filename}' from '{emotion_path}': {e}. Skipping.")
            ax.text(0.5, 0.5, 'Error', ha='center', va='center', fontsize=12, color='red')
        else:
            ax.set_title(f"#{i+1}", fontsize=8)

        ax.axis('off')

    plt.tight_layout(rect=[0, 0.03, 1, 0.98])
    plt.show()
    print(f"Finished plotting {num_images_actual} images for '{emotion_name}' in the {split_name} set.\n")

In [None]:
# Maximum number of sample images to plot per emotion in the cells below.
NUM_IMAGES = 20

In [None]:
# Plot a random sample of training images for every emotion.
print("--- Visualizing Training Set Images ---")
for emotion in emotions:
    plot_emotion_samples_organized(organized_train_images, emotion, NUM_IMAGES)

In [None]:
# Plot a random sample of validation images for every emotion.
print("--- Visualizing Validation Set Images ---")
for emotion in emotions:
    plot_emotion_samples_organized(organized_valid_images, emotion, NUM_IMAGES)

In [None]:
# Plot a random sample of test images for every emotion.
print("--- Visualizing Test Set Images ---")
for emotion in emotions:
    plot_emotion_samples_organized(organized_test_images, emotion, NUM_IMAGES)

## Resize Images to 224x224

In [None]:
def resize_and_save_dataset(source_base_dir, destination_base_dir, target_size=(224, 224)):
    """
    Resize every image of an organized dataset split and save it to a mirrored tree.

    Each emotion subfolder of ``source_base_dir`` is recreated under
    ``destination_base_dir``. Every image is converted to 3-channel RGB if it is
    in another mode, resized to ``target_size`` with LANCZOS resampling, and
    saved under its original file name. Images that fail to load, resize, or
    save are reported and skipped.

    Args:
        source_base_dir (str): Root directory of the already organized dataset split
                               (e.g., '/kaggle/working/organized_images/train').
        destination_base_dir (str): Root directory where resized images are saved
                                    (e.g., '/kaggle/working/resized_images/train').
        target_size (tuple): ``(width, height)`` of the output images.

    Returns:
        None. Progress and a final resized/skipped summary are printed.
    """
    # normpath keeps the split name correct when the path has a trailing separator.
    split_name = os.path.basename(os.path.normpath(source_base_dir)).capitalize()
    print(f"--- Resizing and saving images for: {split_name} set to {target_size} ---")

    if not os.path.exists(source_base_dir):
        print(f"  Error: Source directory not found: {source_base_dir}. Skipping.")
        return

    os.makedirs(destination_base_dir, exist_ok=True)

    emotions_in_source = [d for d in os.listdir(source_base_dir) if os.path.isdir(os.path.join(source_base_dir, d))]

    if not emotions_in_source:
        print(f"  No emotion subfolders found in {source_base_dir}. Skipping.")
        return

    total_resized_count = 0
    total_skipped_count = 0

    for emotion_folder in emotions_in_source:
        source_emotion_path = os.path.join(source_base_dir, emotion_folder)
        destination_emotion_path = os.path.join(destination_base_dir, emotion_folder)

        os.makedirs(destination_emotion_path, exist_ok=True)

        image_files = [f for f in os.listdir(source_emotion_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

        resized_count_per_emotion = 0

        for img_filename in image_files:
            source_img_path = os.path.join(source_emotion_path, img_filename)
            destination_img_path = os.path.join(destination_emotion_path, img_filename)

            try:
                # The context manager releases the source file handle right away,
                # which matters when processing tens of thousands of files.
                with Image.open(source_img_path) as img:
                    # Palette, grayscale, or RGBA inputs are converted so the
                    # output always has exactly three channels.
                    rgb_img = img if img.mode == 'RGB' else img.convert('RGB')
                    # LANCZOS gives high-quality results for both up- and downscaling.
                    resized_img = rgb_img.resize(target_size, Image.LANCZOS)

                resized_img.save(destination_img_path)
            except Exception as e:
                # Pillow raises several unrelated exception types for corrupt or
                # unsupported files; one bad image must not stop the whole run.
                print(f"    Error resizing {img_filename} from {source_emotion_path}: {e}. Skipping.")
                total_skipped_count += 1
                continue

            resized_count_per_emotion += 1
            total_resized_count += 1

            if resized_count_per_emotion % 1000 == 0:
                print(f"      Processed {resized_count_per_emotion} images in {emotion_folder} for {split_name} set...")

    print(f"--- Finished resizing and saving {split_name} set. Total Resized: {total_resized_count}, Total Skipped: {total_skipped_count} ---")
    print(f"Resized data saved to: {destination_base_dir}\n")

In [None]:
# Target size passed as `target_size` to resize_and_save_dataset in the cells below.
IMAGE_SIZE = (224, 224)

In [None]:
# Resize the organized training split into resized_train_images.
resize_and_save_dataset(organized_train_images, resized_train_images, target_size=IMAGE_SIZE)

In [None]:
# Resize the organized validation split into resized_valid_images.
resize_and_save_dataset(organized_valid_images, resized_valid_images, target_size=IMAGE_SIZE)

In [None]:
# Resize the organized test split into resized_test_images.
resize_and_save_dataset(organized_test_images, resized_test_images, target_size=IMAGE_SIZE)

## Data Augmentation

In [None]:
print("Defining the Keras Data Augmentation layer (tf.keras.Sequential model)...")

# Augmentation model used to synthesize extra training images when balancing
# the classes: random horizontal flip, rotation, and zoom.
data_augmentation_pipeline = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomZoom(0.1),
    # Optional extra augmentation layers, currently disabled:
    # tf.keras.layers.RandomTranslation(height_factor=0.1, width_factor=0.1),
    # tf.keras.layers.RandomContrast(0.1),
    # tf.keras.layers.RandomBrightness(0.1),
], name="data_augmentation_pipeline")

print("Data augmentation Sequential layer 'data_augmentation_pipeline' defined.")

In [None]:
def generate_and_save_augmented_images(
    source_emotion_dir,
    dest_emotion_dir,
    current_count,
    target_count_per_class,
    augmentation_model, # The tf.keras.Sequential model for augmentation
    image_size=(224, 224)
):
    """
    Copy one emotion class and top it up with augmented images to reach a target count.

    All images in ``source_emotion_dir`` are first copied to ``dest_emotion_dir``.
    If ``current_count`` is below ``target_count_per_class``, the difference is
    generated by repeatedly picking a random source image, passing it through
    ``augmentation_model`` in training mode, and saving the result as a PNG whose
    name starts with the base name of the image it was derived from.

    Source images are loaded one at a time, so memory use does not grow with
    the size of the class.

    Args:
        source_emotion_dir (str): Source directory for the current emotion.
        dest_emotion_dir (str): Destination directory for the current emotion.
        current_count (int): Number of images the class currently has, as supplied
                             by the caller.
        target_count_per_class (int): Desired number of images per class after balancing.
        augmentation_model (tf.keras.Sequential): Model made of augmentation layers.
        image_size (tuple): Currently unused; kept for interface compatibility.

    Returns:
        None. Progress and a final summary are printed.
    """
    os.makedirs(dest_emotion_dir, exist_ok=True)
    class_name = os.path.basename(source_emotion_dir)

    image_files = [f for f in os.listdir(source_emotion_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

    # 1. Copy all existing images to the new balanced directory.
    print(f"  Copying {len(image_files)} existing images from {source_emotion_dir} to {dest_emotion_dir}...")
    for img_filename in image_files:
        try:
            shutil.copy(os.path.join(source_emotion_dir, img_filename), os.path.join(dest_emotion_dir, img_filename))
        except OSError as e:
            print(f"    Warning: Error copying {img_filename}: {e}. Skipping copy.")

    # 2. Work out how many augmented images are still needed.
    images_to_generate = target_count_per_class - current_count

    if images_to_generate <= 0:
        print(f"  Class '{class_name}' already has {current_count} images (target {target_count_per_class}). No new images to generate.")
        return

    print(f"  Generating {images_to_generate} new augmented images for '{class_name}'...")

    # Files that are still candidates as augmentation sources; a file is dropped
    # from this list the first time it fails to load.
    usable_files = list(image_files)
    saved_count = 0

    for i in range(images_to_generate):
        # Pick a random source image, retrying with another file on failure.
        source_filename = None
        original_img_np = None
        while usable_files and original_img_np is None:
            source_filename = random.choice(usable_files)
            img_path = os.path.join(source_emotion_dir, source_filename)
            try:
                with Image.open(img_path) as img:
                    original_img_np = np.asarray(img.convert('RGB'), dtype=np.float32)
            except Exception as e:
                # Pillow raises several unrelated exception types for corrupt files.
                print(f"    Warning: Could not load {img_path} for augmentation generation: {e}. Skipping.")
                usable_files.remove(source_filename)

        if original_img_np is None:
            print(f"    No images successfully loaded from {source_emotion_dir}. Cannot generate augmented images.")
            break

        # The augmentation layers expect a batch dimension; training=True turns
        # the random transformations on.
        original_img_tensor = tf.expand_dims(original_img_np, 0)
        augmented_tensor = augmentation_model(original_img_tensor, training=True)

        # Clip before the cast so out-of-range values saturate instead of wrapping.
        augmented_img_np = np.clip(augmented_tensor[0].numpy(), 0, 255).astype("uint8")
        augmented_img_pil = Image.fromarray(augmented_img_np)

        # Name the output after the image it was actually derived from.
        original_base_name = os.path.splitext(source_filename)[0]
        new_filename = f"{original_base_name}_aug_{i:04d}_{random.randint(0,9999)}.png"
        save_path = os.path.join(dest_emotion_dir, new_filename)

        try:
            augmented_img_pil.save(save_path)
        except OSError as e:
            print(f"    Error saving augmented image {new_filename} to {dest_emotion_dir}: {e}. Skipping.")
            continue

        saved_count += 1
        if (i + 1) % 100 == 0 or (i + 1) == images_to_generate:
            print(f"    Generated {i+1}/{images_to_generate} images for '{class_name}'...")

    # Report what is really on disk rather than assuming the target was reached.
    total_now = len(os.listdir(dest_emotion_dir))
    print(f"  Finished generating augmented images for '{class_name}'. Generated: {saved_count}. Total images now: {total_now}")

In [None]:
# Number of images each training class should have after balancing.
target_count_per_class_train = 4000

print(f"Starting data balancing for training set. Each class will aim for {target_count_per_class_train} images.")

# Copy every class from the resized training split into the balanced tree and
# top it up with augmented images. Only the emotion name is used here; the
# class ID from emotion_labels is not needed.
for emotion_id, emotion_name in emotion_labels.items():
    # NOTE(review): train_counts was computed on the organized (pre-resize)
    # directories, while the source below is the resized directory. The two
    # differ if any image was skipped during resizing - confirm this is acceptable.
    current_count = train_counts.get(emotion_name, 0)

    source_dir_emotion = os.path.join(resized_train_images, emotion_name)
    dest_dir_emotion = os.path.join(balanced_train_images, emotion_name)

    generate_and_save_augmented_images(
        source_dir_emotion,
        dest_dir_emotion,
        current_count,
        target_count_per_class_train,
        data_augmentation_pipeline, # The augmentation Sequential model defined above
        image_size=IMAGE_SIZE
    )

print("\nTraining data balancing process complete.")
print("The 'balanced_train_images' directory now contains the augmented and copied training data.")

In [None]:
def copy_split_for_balancing_structure(source_path, dest_path):
    """
    Replace ``dest_path`` with an exact copy of the dataset split at ``source_path``.

    No balancing or augmentation is applied. Any existing content at
    ``dest_path`` is removed first, but only after the source has been
    validated, so a wrong source path can never wipe the destination.

    Args:
        source_path (str): Directory of the split to copy (e.g., resized_valid_images).
        dest_path (str): Directory that will hold the copy (e.g., balanced_valid_images).

    Returns:
        None.

    Raises:
        FileNotFoundError: If ``source_path`` is not an existing directory.
        ValueError: If ``source_path`` and ``dest_path`` refer to the same directory.
    """
    # normpath keeps the split name correct when the path has a trailing separator.
    set_name = os.path.basename(os.path.normpath(source_path))
    print(f"--- Copying {set_name.capitalize()} Set (no balancing/augmentation) ---")

    # Validate before deleting anything.
    if not os.path.isdir(source_path):
        raise FileNotFoundError(f"Source directory not found: {source_path}")
    if os.path.realpath(source_path) == os.path.realpath(dest_path):
        raise ValueError(f"Source and destination are the same directory: {source_path}")

    if os.path.exists(dest_path):
        print(f"  Removing existing content in {dest_path} before copying...")
        shutil.rmtree(dest_path)

    # copytree creates dest_path (and any missing parents) itself.
    shutil.copytree(source_path, dest_path)
    print(f"  Finished copying {set_name} to {dest_path}.\n")

# Validation and test splits are copied unchanged into the balanced tree.
copy_split_for_balancing_structure(resized_valid_images, balanced_valid_images)
copy_split_for_balancing_structure(resized_test_images, balanced_test_images)

print("Validation and Test sets copied to 'balanced_images' structure.")
print("Your 'balanced_base_dir' is now ready to use for Keras Datasets.")

In [None]:
# Compute per-class weights from the balanced training set, for optional use
# as `class_weight` in model.fit().
print("Calculating class weights for balanced training dataset (for potential loss adjustment)...")

# Recount from disk so the weights reflect what balancing actually produced.
balanced_train_counts, _ = count_images_in_organized_dirs(balanced_train_images, emotions)

# Order the counts by class ID (0 to 7). .get() keeps this working when a
# class is missing from the counts (for example if the directory does not exist).
balanced_train_counts_ordered = [
    balanced_train_counts.get(emotion_labels[i], 0) for i in range(len(emotion_labels))
]

total_samples_balanced_train = sum(balanced_train_counts_ordered)
max_samples_balanced_train = float(max(balanced_train_counts_ordered))

# Weight = (largest class size) / (this class size), so the largest class gets
# 1.0 and smaller classes get proportionally more.
class_weights = {}
for i, count in enumerate(balanced_train_counts_ordered):
    if count == 0:
        # An empty class would divide by zero; its weight is never applied
        # because no sample of that class exists.
        print(f"  Warning: No images found for '{emotion_labels[i]}' (ID {i}); using weight 0.0.")
        class_weights[i] = 0.0
    else:
        class_weights[i] = max_samples_balanced_train / count

print("\nCalculated Class Weights (for SparseCategoricalCrossentropy):")
for idx, weight in class_weights.items():
    print(f"  {emotion_labels[idx].capitalize()} (ID {idx}): Count={balanced_train_counts_ordered[idx]}, Weight={weight:.2f}")

# Keep the weights under a second name for use in model.fit().
class_weights_dict = class_weights

print("\nClass weights calculated and stored in class_weights_dict global variable.")

## Normalize

## Model

In [None]:
# Define model input image size and batch size
IMAGE_SIZE = (224, 224) # ResNet50 expects 224x224 input
BATCH_SIZE = 64 # A common and good starting batch size

# Number of classes based on your EMOTION_LABELS dictionary
NUM_CLASSES = len(emotion_labels) # EMOTION_LABELS should be globally available from Cell 5

print(f"Preparing datasets from resized images with size {IMAGE_SIZE} and batch size {BATCH_SIZE}...")
print(f"Number of emotion classes: {NUM_CLASSES}")

In [None]:
# Define the data augmentation layers
# These transformations will be applied randomly to training images on each epoch.
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"), # Randomly flip images horizontally
    tf.keras.layers.RandomRotation(0.1), # Randomly rotate images by +/- 10% (36 degrees)
    tf.keras.layers.RandomZoom(0.1), # Randomly zoom in/out by +/- 10%
    # tf.keras.layers.RandomTranslation(height_factor=0.1, width_factor=0.1), # Optional: Randomly shift images
    # tf.keras.layers.RandomContrast(0.1), # Optional: Randomly adjust contrast
    # tf.keras.layers.RandomBrightness(0.1), # Optional: Randomly adjust brightness
], name="data_augmentation_layer") # Name the sequential layer for clarity

In [None]:
# ResNet50-specific preprocessing for (image, label) dataset elements.
# Keras' ResNet50 `preprocess_input` converts images from RGB to BGR and
# zero-centers each color channel with respect to the ImageNet dataset,
# without scaling; it does not map pixel values to [-1, 1].
def apply_resnet_preprocessing(image, label):
    """Return ``(preprocessed_image, label)``; the label is passed through unchanged."""
    return resnet_preprocess_input(image), label

In [None]:
# Load the three splits with tf.keras.utils.image_dataset_from_directory.
# Integer labels are inferred from the sub-folder names of each split directory
# (e.g. 'anger', 'contempt', ...). The images are expected to be at IMAGE_SIZE
# already from the earlier resizing step, so image_size mainly keeps loading consistent.
#
# Every split shares the options below; only the directory and the shuffle flag differ.
dataset_loader_options = dict(
    labels='inferred',
    label_mode='int',
    image_size=IMAGE_SIZE,
    interpolation='bilinear',
    batch_size=BATCH_SIZE,
    seed=42,  # fixed seed for reproducibility
)

# Training split: shuffled.
train_ds = tf.keras.utils.image_dataset_from_directory(
    resized_train_images, shuffle=True, **dataset_loader_options
)

# Validation split: kept in directory order.
valid_ds = tf.keras.utils.image_dataset_from_directory(
    resized_valid_images, shuffle=False, **dataset_loader_options
)

# Test split: kept in directory order.
test_ds = tf.keras.utils.image_dataset_from_directory(
    resized_test_images, shuffle=False, **dataset_loader_options
)

In [None]:
# Verify the class names inferred by Keras (they should match the emotion folder names, alphabetically)
keras_inferred_class_names = train_ds.class_names
print(f"\nKeras inferred class names (alphabetical order from folders): {keras_inferred_class_names}")

# Augmentation is deliberately NOT mapped onto the dataset here:
#   * the model itself passes its inputs through `data_augmentation`, so mapping it
#     here as well would augment every training image twice;
#   * anything placed before `.cache()` is computed once and then replayed, so a
#     dataset-level augmentation would be frozen after the first epoch instead of
#     being re-drawn each epoch.
# All three splits therefore get the same treatment: ResNet50 preprocessing, then
# caching of the preprocessed batches, then prefetching to overlap input and training.
# NOTE(review): cache() also replays the batch order produced on the first pass, so the
# loader's shuffle is presumably not re-drawn per epoch — confirm whether that matters.
train_ds = train_ds.map(apply_resnet_preprocessing, num_parallel_calls=tf.data.AUTOTUNE).cache().prefetch(buffer_size=tf.data.AUTOTUNE)
valid_ds = valid_ds.map(apply_resnet_preprocessing, num_parallel_calls=tf.data.AUTOTUNE).cache().prefetch(buffer_size=tf.data.AUTOTUNE)
test_ds = test_ds.map(apply_resnet_preprocessing, num_parallel_calls=tf.data.AUTOTUNE).cache().prefetch(buffer_size=tf.data.AUTOTUNE)

print("Datasets prepared, preprocessed, and optimized (augmentation is applied inside the model during training).")

In [None]:
print("Defining ResNet50 model with Data Augmentation and Dropout layers...")

# ImageNet-pretrained ResNet50 backbone without its original classification top.
model_input_shape = (IMAGE_SIZE[0], IMAGE_SIZE[1], 3)
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=model_input_shape)

# Phase 1 is pure feature extraction: the whole backbone stays frozen.
base_model.trainable = False

# Model layout:
#   input (batches coming from the dataset pipeline, already ResNet50-preprocessed)
#   -> data_augmentation (random transforms; active only in training mode)
#   -> frozen ResNet50 backbone
#   -> GlobalAveragePooling2D -> Dropout -> Dense softmax head
inputs = tf.keras.Input(shape=model_input_shape)
augmented = data_augmentation(inputs)
# training=False keeps the backbone (including its BatchNorm layers) in inference mode.
backbone_features = base_model(augmented, training=False)
pooled_features = GlobalAveragePooling2D()(backbone_features)  # collapse the spatial dimensions
regularized_features = Dropout(0.5)(pooled_features)           # 50% dropout against overfitting
outputs = Dense(NUM_CLASSES, activation='softmax')(regularized_features)  # one probability per emotion class

model = Model(inputs=inputs, outputs=outputs)

# Compile for the feature-extraction phase. The head emits softmax probabilities,
# which matches SparseCategoricalCrossentropy's default (non-logit) input and the
# integer labels produced by the dataset loaders.
feature_extraction_optimizer = Adam(learning_rate=1e-3)
model.compile(
    optimizer=feature_extraction_optimizer,
    loss=SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

print("ResNet50 model (with augmentation and dropout) defined and compiled for initial feature extraction.")

In [None]:
# Show the architecture for the feature-extraction phase (backbone frozen).
print("Displaying model architecture summary (initial phase - frozen base)...")
model.summary()
print(
    "\nModel architecture summary displayed. Note the 'data_augmentation_layer' and 'Dropout' layer.",
    "Also, ResNet50 layers are mostly 'Non-trainable params' in this phase.",
    sep="\n",
)

In [None]:
# Phase 1: train the new classification head while the ResNet50 base stays frozen.
# Requires `model` to have been built and compiled by the model-definition cell.
INITIAL_EPOCHS = 10  # upper bound; early stopping may end the phase sooner

print(f"Starting initial training phase (feature extraction) for {INITIAL_EPOCHS} epochs...")

# Stop once val_loss has failed to improve for 3 consecutive epochs and
# restore the weights from the best epoch seen.
early_stopping_initial = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)
callbacks_initial = [early_stopping_initial]

# NOTE(review): class_weights_dict is computed earlier in the notebook but is not
# passed to fit() here (class_weight=...) — confirm whether that is intentional.
history_initial = model.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=INITIAL_EPOCHS,
    callbacks=callbacks_initial,
)

print("\nInitial model training (feature extraction) complete.")
initial_epochs_run = len(history_initial.history['loss'])
print(f"Training stopped after {initial_epochs_run} epochs.")

In [None]:
import tensorflow as tf
from tensorflow.keras.optimizers import Adam

print("Preparing model for fine-tuning phase...")

# Phase 2 setup: fine-tune only the last ResNet50 stage.
# ResNet50's layer names group into stages conv1, conv2_block*, conv3_block*,
# conv4_block* and conv5_block* (check model.summary() / base_model.summary() for
# the exact names). The earlier stages hold more generic features and stay frozen;
# every layer whose name contains 'conv5_block' becomes trainable.

# The base was frozen as a whole for phase 1, so re-enable it first ...
base_model.trainable = True

# ... then decide per layer: trainable only if it belongs to the conv5 stage.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = 'conv5_block' in backbone_layer.name

# Optional sanity check:
# print(f"Trainable weight tensors after partial unfreezing: {len(model.trainable_weights)}")

# Changing `trainable` requires a recompile to take effect. Use a much smaller
# learning rate (1e-5) so fine-tuning only nudges the pretrained weights.
fine_tune_optimizer = Adam(learning_rate=1e-5)
model.compile(
    optimizer=fine_tune_optimizer,
    loss=SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

print("Base model partially unfrozen and model recompiled for fine-tuning with a lower learning rate.")

In [None]:
# Show the architecture again after partial unfreezing: the trainable parameter
# count should now exceed the frozen-base phase but stay below a full unfreeze.
summary_intro = "Displaying model architecture summary (fine-tuning phase - partially unfrozen base)..."
summary_outro = "\nModel architecture summary displayed. Observe 'Trainable params' for partial unfreezing."

print(summary_intro)
model.summary()
print(summary_outro)

In [None]:
# Phase 2: fine-tune the partially unfrozen model, continuing the epoch numbering
# of phase 1. Requires `history_initial` from the initial training cell.

FINE_TUNE_EPOCHS = 30  # upper bound on fine-tuning epochs; early stopping may end sooner

# Number of epochs phase 1 actually ran. Early stopping can make this smaller than
# INITIAL_EPOCHS, so the cumulative target is derived from the real count; otherwise
# fine-tuning would silently run for more than FINE_TUNE_EPOCHS epochs.
# len() is also safe when the history is empty, unlike indexing epoch[-1].
initial_epochs_completed = len(history_initial.epoch)
TOTAL_EPOCHS_CUMULATIVE = initial_epochs_completed + FINE_TUNE_EPOCHS

print(f"Starting fine-tuning phase for {FINE_TUNE_EPOCHS} additional epochs (Total cumulative epochs: {TOTAL_EPOCHS_CUMULATIVE})...")

callbacks_fine_tune = [
    tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,  # more patience than phase 1
        restore_best_weights=True
    )
]

# fit() treats `epochs` as the index of the final epoch, not a count, so training
# runs from `initial_epoch` up to TOTAL_EPOCHS_CUMULATIVE: at most FINE_TUNE_EPOCHS epochs.
history_fine_tune = model.fit(
    train_ds,
    epochs=TOTAL_EPOCHS_CUMULATIVE,
    initial_epoch=initial_epochs_completed,  # continue where initial training left off
    validation_data=valid_ds,
    callbacks=callbacks_fine_tune
)

print("\nModel fine-tuning complete.")

In [None]:
import matplotlib.pyplot as plt  # Ensure matplotlib.pyplot is imported

print("Plotting cumulative training and validation accuracy and loss...")

initial_metrics = history_initial.history
fine_tune_metrics = history_fine_tune.history

# x-position of the phase boundary: the last epoch of the initial phase.
# It is taken from the untouched initial history, before anything is combined.
initial_epoch_count = len(initial_metrics['loss'])
fine_tune_start = initial_epoch_count - 1

# Build NEW lists for the combined curves. Extending the lists stored in
# history_initial.history in place would corrupt that history (moving the phase
# boundary to the end of the plot) and would keep growing on every re-run of this cell.
acc = list(initial_metrics['accuracy']) + list(fine_tune_metrics['accuracy'])
val_acc = list(initial_metrics['val_accuracy']) + list(fine_tune_metrics['val_accuracy'])
loss = list(initial_metrics['loss']) + list(fine_tune_metrics['loss'])
val_loss = list(initial_metrics['val_loss']) + list(fine_tune_metrics['val_loss'])

epochs_range = range(len(acc))  # x-axis covering all combined epochs

plt.figure(figsize=(12, 5))

# One subplot per metric: (name, training curve, validation curve, legend position).
panels = [
    ('Accuracy', acc, val_acc, 'lower right'),
    ('Loss', loss, val_loss, 'upper right'),
]
for position, (metric_name, train_values, val_values, legend_loc) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(epochs_range, train_values, label=f'Training {metric_name}')
    plt.plot(epochs_range, val_values, label=f'Validation {metric_name}')
    plt.axvline(x=fine_tune_start, color='r', linestyle='--', label='Start Fine-tuning')
    plt.legend(loc=legend_loc)
    plt.title(f'Training and Validation {metric_name} (Cumulative)', fontsize=14)
    plt.xlabel('Epoch', fontsize=12)
    plt.ylabel(metric_name, fontsize=12)
    plt.grid(True)

plt.tight_layout()
plt.show()

print("\nCumulative Accuracy and Loss plots displayed.")

In [None]:
# Final check on the held-out test split.
print("Evaluating the fine-tuned model on the test set...")

# The model was compiled with one loss and metrics=['accuracy'], so evaluate()
# yields two values: the loss followed by the accuracy.
# NOTE: `loss` rebinds the name used for the training-loss curve in the plotting cell.
test_results = model.evaluate(test_ds)
loss, accuracy = test_results

print(f"\nTest Loss (after fine-tuning): {loss:.4f}")
print(f"Test Accuracy (after fine-tuning): {accuracy:.4f}")

print("\nModel testing complete.")