In [1]:
!pip install -U albumentations

Collecting albumentations
  Downloading albumentations-2.0.1-py3-none-any.whl.metadata (38 kB)
Collecting albucore==0.0.23 (from albumentations)
  Downloading albucore-0.0.23-py3-none-any.whl.metadata (5.3 kB)
Collecting simsimd>=5.9.2 (from albucore==0.0.23->albumentations)
  Downloading simsimd-6.2.1-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (66 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.0/66.0 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
Downloading albumentations-2.0.1-py3-none-any.whl (276 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m276.8/276.8 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading albucore-0.0.23-py3-none-any.whl (14 kB)
Downloading simsimd-6.2.1-cp310-cp310-manylinux_2_28_x86_64.whl (632 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m632.7/632.7 kB[0m [31m22.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: simsimd, albucore, albumentations
  Att

In [2]:
import os
import shutil
from albumentations import (
    RandomBrightnessContrast,
    GaussianBlur,
    Affine,
    HueSaturationValue,
    Compose
)
from PIL import Image, ImageOps
import numpy as np
from zipfile import ZipFile
import logging

# Setup logging
log_file = "/kaggle/working/dataset_preparation.log"
# force=True replaces any handlers already attached to the root logger.
# Without it basicConfig() silently does nothing once the root logger is
# configured (for example when this cell is re-run in the same kernel), so the
# log file would not be recreated and filemode='w' would have no effect.
logging.basicConfig(
    filename=log_file,
    filemode='w',  # start a fresh log on every (re)configuration
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    force=True,
)

def log_and_print(message, level=logging.INFO):
    """Record ``message`` via the ``logging`` module and echo it to stdout.

    Args:
        message: Text to record and display.
        level: Logging level used for the log record (defaults to INFO).
            The echo to stdout happens regardless of the level.
    """
    # Log first, then print, so both sinks receive the message in that order.
    logging.log(level=level, msg=message)
    print(message)

# Augmentation Pipeline
# Photometric and small geometric perturbations, applied in the order listed.
# The horizontal flip is not part of this pipeline; augment_images() performs
# it separately before calling the pipeline.
# NOTE(review): translate_percent=(0.05, 0.05) has identical lower and upper
# bounds. If the library treats a tuple as a sampling range, this is a constant
# +5% shift rather than a random one in [-5%, +5%] -- confirm the intent.
augmentation_pipeline = Compose([
    RandomBrightnessContrast(
        p=0.5,
    ),
    GaussianBlur(
        blur_limit=(3, 5),
        p=0.3,
    ),
    Affine(
        scale=(0.95, 1.05),
        translate_percent=(0.05, 0.05),
        rotate=0,
        p=0.7,
    ),
    HueSaturationValue(
        hue_shift_limit=10,
        sat_shift_limit=15,
        val_shift_limit=10,
        p=0.5,
    ),
])

# Analyze dataset to find class sizes and the largest class
def analyze_dataset(dataset_dir):
    """Count the image files in each class sub-directory of ``dataset_dir``.

    Args:
        dataset_dir: Directory whose immediate sub-directories are treated as
            classes. Plain files directly inside it are ignored.

    Returns:
        A ``(class_counts, max_class_size)`` tuple. ``class_counts`` maps each
        sub-directory name to the number of entries in it whose name ends with
        ``.jpg``, ``.png`` or ``.jpeg`` (case-sensitive). ``max_class_size`` is
        the largest of those counts, or ``0`` when there are no class
        sub-directories at all.
    """
    image_suffixes = ('.jpg', '.png', '.jpeg')
    class_counts = {}
    for class_name in os.listdir(dataset_dir):
        class_dir = os.path.join(dataset_dir, class_name)
        if not os.path.isdir(class_dir):
            continue
        class_counts[class_name] = sum(
            1 for f in os.listdir(class_dir) if f.endswith(image_suffixes)
        )
    # default=0 keeps an empty dataset from raising ValueError in max().
    max_class_size = max(class_counts.values(), default=0)
    return class_counts, max_class_size

# Copy all original images
def copy_images(input_dir, output_dir):
    """Copy every class's image files from ``input_dir`` to ``output_dir``.

    Each immediate sub-directory of ``input_dir`` is treated as a class. A
    directory of the same name is created under ``output_dir`` and every entry
    whose name ends with ``.jpg``, ``.png`` or ``.jpeg`` (case-sensitive) is
    copied into it. One summary message per class is emitted through
    ``log_and_print``.

    Args:
        input_dir: Directory containing one sub-directory per class.
        output_dir: Destination root; class directories are created as needed.
    """
    for class_name in os.listdir(input_dir):
        class_dir = os.path.join(input_dir, class_name)
        # Skip stray files before touching the output tree; otherwise a file
        # such as a README would produce an empty, bogus class directory.
        if not os.path.isdir(class_dir):
            continue

        output_class_dir = os.path.join(output_dir, class_name)
        os.makedirs(output_class_dir, exist_ok=True)

        images = [f for f in os.listdir(class_dir) if f.endswith(('.jpg', '.png', '.jpeg'))]
        for img_name in images:
            shutil.copy(os.path.join(class_dir, img_name), os.path.join(output_class_dir, img_name))
        log_and_print(f"Copied {len(images)} images for class '{class_name}'.")

# Augment images to balance the dataset
def augment_images(input_dir, output_dir, max_class_size, class_counts):
    """Top up each under-sized class with flipped, augmented copies.

    For every class in ``class_counts`` whose count is below
    ``max_class_size``, ``max_class_size - count`` new images are generated.
    Source images are taken from ``input_dir/<class>`` in listing order,
    cycling when more augmentations are needed than there are sources. Each
    source is mirrored horizontally, passed through ``augmentation_pipeline``
    and saved to ``output_dir/<class>`` as ``aug_<i>_<original name>``.

    Args:
        input_dir: Root directory holding one sub-directory of source images
            per class.
        output_dir: Root directory that receives the augmented images.
        max_class_size: Target number of images per class.
        class_counts: Mapping of class name to its current image count.
    """
    for class_name, count in class_counts.items():
        augment_needed = max_class_size - count
        if augment_needed <= 0:
            log_and_print(f"No augmentation needed for class '{class_name}'.")
            continue

        class_dir = os.path.join(input_dir, class_name)
        output_class_dir = os.path.join(output_dir, class_name)
        images = [f for f in os.listdir(class_dir) if f.endswith(('.jpg', '.png', '.jpeg'))]
        if not images:
            # Nothing to cycle through: `i % len(images)` below would divide by zero.
            log_and_print(
                f"Cannot augment class '{class_name}': no source images found in '{class_dir}'.",
                level=logging.WARNING,
            )
            continue

        # The destination may not exist yet when output_dir differs from input_dir.
        os.makedirs(output_class_dir, exist_ok=True)

        for i in range(augment_needed):
            img_name = images[i % len(images)]
            img_path = os.path.join(class_dir, img_name)

            # Close the source file promptly; this loop can run thousands of times.
            with Image.open(img_path) as img:
                # Palette, alpha, CMYK and similar modes are normalised to RGB so the
                # array handed to the pipeline holds real colour channels. RGB and
                # greyscale ("L") images are passed through untouched.
                if img.mode not in ("RGB", "L"):
                    img = img.convert("RGB")

                # **Mandatory Horizontal Flip**
                flipped_img = ImageOps.mirror(img)

            # Apply additional augmentations after flipping
            augmented = augmentation_pipeline(image=np.array(flipped_img))['image']

            # Save augmented image
            augmented_img = Image.fromarray(augmented)
            augmented_img.save(os.path.join(output_class_dir, f"aug_{i}_{img_name}"))
            log_and_print(f"[Augmented] {img_name} -> Saved augmented image with flip in '{output_class_dir}'")

# Compress final dataset
def create_zip(output_dir, zip_path):
    """Pack every file under ``output_dir`` into a ZIP archive at ``zip_path``.

    Archive member names are paths relative to ``output_dir``. No compression
    argument is passed to ``ZipFile``, so its default storage method is used.
    Progress messages are emitted through ``log_and_print``.

    Args:
        output_dir: Directory tree to archive.
        zip_path: Destination path of the archive. It may be located inside
            ``output_dir``; the archive itself is then left out of its own
            contents.
    """
    log_and_print(f"Creating ZIP file at {zip_path}...")
    archive_abspath = os.path.abspath(zip_path)
    with ZipFile(zip_path, 'w') as zipf:
        for root, _, files in os.walk(output_dir):
            for file in files:
                file_path = os.path.join(root, file)
                # The archive exists on disk as soon as it is opened, so a walk
                # that reaches it would try to add the half-written ZIP to itself.
                if os.path.abspath(file_path) == archive_abspath:
                    continue
                arcname = os.path.relpath(file_path, output_dir)
                zipf.write(file_path, arcname)
    log_and_print(f"ZIP file created: {zip_path}")

# Main workflow
def main(
    input_dir="/kaggle/input/cropped-dataset-clean/cropped_dataset (())",
    output_dir="/kaggle/working/final_balanced_dataset",
    zip_file="/kaggle/working/final_balanced_dataset.zip",
):
    """Build the balanced dataset for the Train and Test splits and zip it.

    For each split the class sizes are analysed, the original images are
    copied into ``output_dir/<split>``, and smaller classes are topped up with
    augmented images until they match the largest class. Finally the whole
    output tree is archived.

    Args:
        input_dir: Root of the source dataset; must contain ``Train`` and
            ``Test`` sub-directories.
        output_dir: Root of the balanced dataset to create.
        zip_file: Path of the ZIP archive to write.
    """
    for split in ("Train", "Test"):
        split_input = os.path.join(input_dir, split)
        split_output = os.path.join(output_dir, split)

        log_and_print(f"Analyzing {split} Dataset...")
        counts, max_size = analyze_dataset(split_input)
        log_and_print(f"{split} class counts: {counts}, Max size: {max_size}")

        log_and_print(f"Copying {split} Dataset...")
        copy_images(split_input, split_output)

        log_and_print(f"Augmenting {split} Dataset...")
        # Augment from the pristine input split, not from the output directory:
        # on a re-run the output already holds earlier aug_* files, and using it
        # as the source would augment images that were themselves augmented.
        augment_images(split_input, split_output, max_size, counts)

    # Create ZIP file
    log_and_print("Zipping final dataset...")
    create_zip(output_dir, zip_file)

    log_and_print("Dataset preparation complete.")

# Entry point: run the full workflow when this code is executed directly
# (the guard keeps main() from running if the code is imported as a module).
if __name__ == "__main__":
    main()


Analyzing Train Dataset...
Train class counts: {'surprise': 1088, 'fear': 1559, 'neutral': 1846, 'sad': 1599, 'happy': 2330, 'anger': 1469}, Max size: 2330
Copying Train Dataset...
Copied 1088 images for class 'surprise'.
Copied 1559 images for class 'fear'.
Copied 1846 images for class 'neutral'.
Copied 1599 images for class 'sad'.
Copied 2330 images for class 'happy'.
Copied 1469 images for class 'anger'.
Augmenting Train Dataset...
[Augmented] image0026571.jpg -> Saved augmented image with flip in '/kaggle/working/final_balanced_dataset/Train/surprise'
[Augmented] image0027997.jpg -> Saved augmented image with flip in '/kaggle/working/final_balanced_dataset/Train/surprise'
[Augmented] image0031205.jpg -> Saved augmented image with flip in '/kaggle/working/final_balanced_dataset/Train/surprise'
[Augmented] image0010019.jpg -> Saved augmented image with flip in '/kaggle/working/final_balanced_dataset/Train/surprise'
[Augmented] ffhq_633.png -> Saved augmented image with flip in '/kagg