# In [3]:
import tensorflow as tf
import numpy as np
import os
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from PIL import Image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.utils.class_weight import compute_class_weight
import pickle

# Set paths (relative to the directory this script/notebook is run from)
train_dir = '../data/train'
test_dir = '../data/test'
img_size = 48  # Already 48x48, but you can change if needed

# Class labels: one per sub-directory of train_dir, e.g. ['angry', 'disgust', ...].
# Sorted so the integer index assigned to each class is stable between runs.
# Plain files and hidden entries (such as .DS_Store or .ipynb_checkpoints) are
# skipped: they are not classes, and treating them as such would shift the
# label indices and make the image loader fail when it tries to list them.
class_names = sorted(
    entry
    for entry in os.listdir(train_dir)
    if not entry.startswith('.')
    and os.path.isdir(os.path.join(train_dir, entry))
)

def load_images_from_folder(folder_path, class_names):
    """
    Load images from folders where each subfolder is a class.

    Every file found in ``folder_path/<class_name>`` is opened, converted to
    8-bit grayscale and resized to ``(img_size, img_size)`` (module-level
    ``img_size``).

    Args:
        folder_path: Directory that contains one sub-folder per class.
        class_names: Sub-folder names to read. The position of a name in this
            sequence is the integer label given to its images.

    Returns:
        Tuple ``(X, y, class_counts)``:
            X: Array of the loaded images, in class order and, within a
               class, in sorted file-name order.
            y: Array of integer label indices aligned with ``X``.
            class_counts: Dict mapping each class name to the number of
               images loaded for it.
    """
    X = []
    y = []
    class_counts = {class_name: 0 for class_name in class_names}

    for label_idx, class_name in enumerate(class_names):
        class_folder = os.path.join(folder_path, class_name)
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # sample order (and any seeded split done on it later) reproducible.
        for file in sorted(os.listdir(class_folder)):
            file_path = os.path.join(class_folder, file)
            # The context manager closes the underlying file handle right
            # away; convert()/resize() return new in-memory images, so the
            # result stays valid after the source file is closed.
            with Image.open(file_path) as source_img:
                img = source_img.convert('L')            # Convert to grayscale
                img = img.resize((img_size, img_size))   # Resize
            X.append(np.array(img))
            y.append(label_idx)
            class_counts[class_name] += 1

    return np.array(X), np.array(y), class_counts

# Load training and test data (X_*: stacked grayscale images, y_*: integer labels)
X_train, y_train, train_counts = load_images_from_folder(train_dir, class_names)
X_test, y_test, test_counts = load_images_from_folder(test_dir, class_names)

# Normalize images: 8-bit pixel values are scaled to the [0, 1] range
X_train = X_train / 255.0
X_test = X_test / 255.0

# Expand dimensions for CNN input: (samples, 48, 48, 1)
X_train = np.expand_dims(X_train, -1)
X_test = np.expand_dims(X_test, -1)

# One-hot encode labels
y_train_categorical = to_categorical(y_train, num_classes=len(class_names))
y_test_categorical = to_categorical(y_test, num_classes=len(class_names))

# Create validation set from training
# NOTE: after this split X_train / y_train_categorical hold only the training
# subset, while the integer array `y_train` still covers every loaded sample.
# Do not pair `y_train` with the new X_train.
X_train, X_val, y_train_categorical, y_val = train_test_split(
    X_train, y_train_categorical, test_size=0.1, stratify=y_train_categorical, random_state=42
)

# Calculate class weights for weighted loss function
y_integers = np.argmax(y_train_categorical, axis=1)
present_classes = np.unique(y_integers)
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=present_classes,
    y=y_integers
)
# Key each weight by its real class index. Enumerating the weights would use
# the position inside `present_classes`, which differs from the class index as
# soon as any class has no training samples. Plain int/float values keep the
# pickled dict loadable without NumPy.
class_weight_dict = {
    int(class_idx): float(weight)
    for class_idx, weight in zip(present_classes, class_weights)
}

# Setup a data augmentation pipeline for minority classes
# No `rescale` here: the image arrays are already divided by 255 above, so
# rescaling again would squash augmented samples into [0, 1/255] while the
# original samples stay in [0, 1].
datagen = ImageDataGenerator(
    rotation_range=20,        # random rotations, in degrees
    width_shift_range=0.2,    # horizontal shift, as a fraction of the width
    height_shift_range=0.2,   # vertical shift, as a fraction of the height
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'       # fill pixels exposed by a transform with the nearest value
)

def balance_dataset_with_augmentation(X, y_categorical, target_samples_per_class=7000):
    """
    Balance the dataset by generating augmented samples for minority classes.

    All original samples are kept. For every class with fewer than
    ``target_samples_per_class`` samples, extra samples are drawn from the
    module-level ``datagen`` until the class reaches the target. Classes that
    already meet or exceed the target are left unchanged (no down-sampling).
    The combined result is shuffled with NumPy's global random state.

    Args:
        X: Input data, shape (samples, height, width, channels)
        y_categorical: One-hot encoded labels, shape (samples, num_classes)
        target_samples_per_class: Target number of samples per class

    Returns:
        X_balanced, y_balanced_categorical: Balanced dataset

    Warns:
        UserWarning: If a class has no samples at all. Such a class cannot be
            augmented and is left empty.
    """
    import warnings

    y_categorical = np.asarray(y_categorical)
    y_indices = np.argmax(y_categorical, axis=1)
    # Take the class count from the labels themselves so the function does
    # not depend on the module-level class list.
    num_classes = y_categorical.shape[1]

    X_balanced = []
    y_balanced = []

    # Process each class
    for class_idx in range(num_classes):
        # Get samples of this class
        X_class = X[y_indices == class_idx]
        n_original = len(X_class)

        # Add all original samples
        X_balanced.extend(X_class)
        y_balanced.extend([class_idx] * n_original)

        samples_needed = target_samples_per_class - n_original
        if samples_needed <= 0:
            continue

        if n_original == 0:
            # Nothing to augment from: a generator over zero samples with a
            # batch size of zero can never yield the requested samples.
            warnings.warn(
                f"Class index {class_idx} has no samples; it cannot be "
                "augmented and is left empty."
            )
            continue

        # Configure generator for this class
        augment_datagen = datagen.flow(
            X_class,
            np.ones(n_original),  # Dummy labels
            batch_size=min(n_original, 32),  # Never ask for more than the class holds
            shuffle=True
        )

        # Generate augmented samples, trimming the last batch to the exact need
        aug_samples = 0
        while aug_samples < samples_needed:
            batch_X, _ = next(augment_datagen)
            to_add = min(len(batch_X), samples_needed - aug_samples)

            X_balanced.extend(batch_X[:to_add])
            y_balanced.extend([class_idx] * to_add)
            aug_samples += to_add

    # Convert to arrays and shuffle
    X_balanced = np.array(X_balanced)
    y_balanced = np.array(y_balanced)

    # Shuffle samples and labels with the same permutation
    indices = np.arange(len(X_balanced))
    np.random.shuffle(indices)
    X_balanced = X_balanced[indices]
    y_balanced = y_balanced[indices]

    # Convert labels back to one-hot encoding
    y_balanced_categorical = to_categorical(y_balanced, num_classes=num_classes)

    return X_balanced, y_balanced_categorical

# Uncomment the following line to apply data augmentation balancing
# X_train_balanced, y_train_balanced = balance_dataset_with_augmentation(X_train, y_train_categorical)

# Report the shape of each split (shapes only, no sample contents)
shape_report = [
    "Dataset shapes:",
    f"  Train: {X_train.shape}",
    f"  Validation: {X_val.shape}",
    f"  Test: {X_test.shape}",
]
print("\n".join(shape_report))

# Persist the prepared arrays as .npy files, in the same order as before
arrays_to_save = [
    ('../data/X_train.npy', X_train),
    ('../data/X_val.npy', X_val),
    ('../data/y_train.npy', y_train_categorical),
    ('../data/y_val.npy', y_val),
    ('../data/X_test.npy', X_test),
    ('../data/y_test.npy', y_test_categorical),
]
for npy_path, array in arrays_to_save:
    np.save(npy_path, array)

# Persist the class weights and the class-name list as pickles
objects_to_pickle = [
    ('../data/class_weight.pkl', class_weight_dict),
    ('../data/class_names.pkl', class_names),
]
for pkl_path, obj in objects_to_pickle:
    with open(pkl_path, 'wb') as f:
        pickle.dump(obj, f)

# Output:
# Dataset shapes:
#   Train: (25838, 48, 48, 1)
#   Validation: (2871, 48, 48, 1)
#   Test: (7178, 48, 48, 1)
