In [1]:
from __future__ import annotations
from pathlib import Path
import random, math
import pandas as pd
from PIL import Image
import torchvision.transforms.functional as F
from torchvision.transforms import InterpolationMode as IM

In [2]:
# --- Input -------------------------------------------------------------
# Directory holding the split CSVs; only train.csv is consumed here.
SPLITS_DIR = Path(r"Data/data_emotions/splits")
TRAIN_CSV = SPLITS_DIR.joinpath("train.csv")

# --- Output ------------------------------------------------------------
# Root for everything this notebook writes: the augmented images live in
# AUG_IMG_DIR, the CSV manifests directly in OUT_DIR.
# NOTE(review): inputs are read from under `Data/` while outputs go to
# `./data_emotions/` (no `Data/` prefix) -- confirm this is intentional.
OUT_DIR = Path(r"./data_emotions/augmented")
AUG_IMG_DIR = OUT_DIR.joinpath("train_images_aug")

# Create the image folder (and any missing parents) up front; a no-op if it
# already exists.
AUG_IMG_DIR.mkdir(parents=True, exist_ok=True)

In [3]:
# Seed for Python's `random` module, which drives the random crops.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Augmentation grid. Every source image is rendered once per combination of
# rotation x flip x brightness/contrast, and CROPS_PER_COMBO random crops are
# taken from each rendering: 3 * 2 * 3 * 4 = 72 augmented images per source.
ROTATIONS = [-15, 0, 15]  # rotation angles in degrees
FLIPS = [False, True]     # False = as-is, True = horizontally flipped

# name -> (brightness factor, contrast factor); 1.0 leaves the image unchanged,
# values below 1.0 darken / flatten, values above 1.0 brighten / sharpen.
BC_OPTIONS = {
    "dark": (0.8, 0.8),
    "orig": (1.0, 1.0),
    "bright": (1.2, 1.2),
}

# Number of random crops per (rotation, flip, brightness/contrast) combination.
CROPS_PER_COMBO = 4

In [4]:
def center_crop_to(img: Image.Image, target_w: int, target_h: int) -> Image.Image:
    """Return a ``target_w`` x ``target_h`` image taken from the centre of ``img``.

    On each axis where ``img`` is at least as large as the target, the
    centred window of the target length is kept. On an axis where ``img`` is
    smaller than the target, the full extent is kept and the result is then
    stretched to the target size with bilinear resampling (no padding is
    added).

    Args:
        img: Source image.
        target_w: Output width in pixels.
        target_h: Output height in pixels.

    Returns:
        An image whose size is exactly ``(target_w, target_h)``.
    """
    src_w, src_h = img.size

    # Extent kept on each axis: the target length, capped at what is available.
    keep_w = min(target_w, src_w)
    keep_h = min(target_h, src_h)

    # Offset that centres the window; clamped to 0 when the source is too small.
    x0 = max((src_w - target_w) // 2, 0)
    y0 = max((src_h - target_h) // 2, 0)

    cropped = img.crop((x0, y0, x0 + keep_w, y0 + keep_h))
    if cropped.size == (target_w, target_h):
        return cropped
    # Source was smaller than the target on at least one axis: stretch to fit.
    return cropped.resize((target_w, target_h), Image.BILINEAR)

def random_crop_resized(img: Image.Image, out_w: int, out_h: int) -> Image.Image:
    """Take a random square crop of ``img`` and resize it to ``(out_w, out_h)``.

    The crop's edge length is drawn uniformly from 80-100% of the image's
    shorter side (truncated to an int, at least 1 pixel) and its position is
    drawn uniformly among all placements that fit inside the image. Because
    the crop is always square, a non-square output size stretches it.

    Randomness comes from the module-level ``random`` generator, so results
    depend on its current state.

    Args:
        img: Source image.
        out_w: Output width in pixels.
        out_h: Output height in pixels.

    Returns:
        The cropped region resized (bilinear) to ``(out_w, out_h)``.
    """
    width, height = img.size
    shortest = min(width, height)

    scale = random.uniform(0.80, 1.00)
    edge = max(1, min(int(scale * shortest), shortest))

    # Only draw an offset on an axis that actually has slack; skipping the
    # draw otherwise keeps the sequence of random numbers consumed unchanged.
    x0 = random.randint(0, width - edge) if width != edge else 0
    y0 = random.randint(0, height - edge) if height != edge else 0

    box = (x0, y0, x0 + edge, y0 + edge)
    return img.crop(box).resize((out_w, out_h), Image.BILINEAR)

def apply_bc(img: Image.Image, b_factor: float, c_factor: float) -> Image.Image:
    """Adjust brightness, then contrast, of ``img``.

    Both adjustments are delegated to ``torchvision.transforms.functional``
    (imported as ``F``), which is called here directly on the PIL image.

    Args:
        img: Source image.
        b_factor: Brightness factor passed to ``F.adjust_brightness``.
        c_factor: Contrast factor passed to ``F.adjust_contrast``.

    Returns:
        The image after the brightness adjustment followed by the contrast
        adjustment.
    """
    brightened = F.adjust_brightness(img, b_factor)
    return F.adjust_contrast(brightened, c_factor)


In [5]:
def _unique_output_path(directory: Path, base: str, ext: str) -> Path:
    """Return ``directory/<base><ext>``, or the first free ``<base>_<i><ext>`` (i = 1, 2, ...)."""
    candidate = directory / f"{base}{ext}"
    attempt = 1
    while candidate.exists():
        candidate = directory / f"{base}_{attempt}{ext}"
        attempt += 1
    return candidate


def main():
    """Augment every image listed in train.csv and write images plus CSV manifests.

    For each readable source image, every combination of rotation
    (``ROTATIONS``), horizontal flip (``FLIPS``) and brightness/contrast
    setting (``BC_OPTIONS``) is rendered, and ``CROPS_PER_COMBO`` random crops
    of each rendering are saved into ``AUG_IMG_DIR`` at the source image's
    original size.

    Two CSVs are written into ``OUT_DIR``:

    * ``train_aug.csv``      -- one row per augmented image
    * ``train_with_aug.csv`` -- the original rows followed by the augmented rows

    Images that cannot be opened are reported with a warning and skipped.
    Existing files in ``AUG_IMG_DIR`` are never overwritten: a numeric suffix
    is appended instead, so re-running adds new files next to the old ones.

    Raises:
        AssertionError: if train.csv lacks any of the required columns.
    """
    # Re-seed at the start of every run so that re-executing this cell on its
    # own reproduces the same crops as a fresh Restart & Run All.
    random.seed(RANDOM_SEED)

    columns = ["resolved_path", "image", "label", "label_norm", "label_id"]

    df = pd.read_csv(TRAIN_CSV)
    assert {"resolved_path","image","label","label_norm","label_id"}.issubset(df.columns), \
        "train.csv must have columns: resolved_path,image,label,label_norm,label_id"

    aug_rows = []  # one dict per saved augmented image -> train_aug.csv
    total = len(df)

    # Count rows positionally instead of relying on the index labels, so the
    # progress message stays correct even if the frame's index is not 0..n-1.
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        src_path = Path(row["resolved_path"])
        try:
            # The context manager closes the underlying file as soon as the
            # pixel data has been converted into an independent RGB copy.
            with Image.open(src_path) as src:
                img = src.convert("RGB")
        except Exception as e:
            # Deliberate best-effort: one bad file must not abort the whole run.
            print(f"[WARN] Skipping unreadable image: {src_path} ({e})")
            continue

        orig_w, orig_h = img.size
        stem, ext = src_path.stem, src_path.suffix.lower() or ".jpg"

        for deg in ROTATIONS:
            # Rotate on an expanded canvas, then center-crop back to the
            # original size; uncovered corners are filled with black.
            rotated = F.rotate(img, angle=deg, interpolation=IM.BILINEAR, expand=True, fill=0)
            rotated = center_crop_to(rotated, orig_w, orig_h)

            for flip in FLIPS:
                flipped = F.hflip(rotated) if flip else rotated
                flip_tag = "hf" if flip else "orig"

                for bc_name, (b_fac, c_fac) in BC_OPTIONS.items():
                    adjusted = apply_bc(flipped, b_fac, c_fac)

                    for k in range(CROPS_PER_COMBO):
                        x_final = random_crop_resized(adjusted, orig_w, orig_h)

                        # The file name encodes the full augmentation recipe.
                        base = f"{stem}_r{deg}_{flip_tag}_bc{bc_name}_c{k}"
                        out_path = _unique_output_path(AUG_IMG_DIR, base, ext)

                        x_final.save(out_path, quality=95)

                        aug_rows.append({
                            "resolved_path": str(out_path.resolve()),
                            "image": out_path.name,
                            "label": row["label"],
                            "label_norm": row["label_norm"],
                            "label_id": int(row["label_id"]),
                        })

        if position % 10 == 0:
            print(f"Processed {position}/{total} images...")

    # Save CSVs
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    aug_df = pd.DataFrame(aug_rows, columns=columns)
    aug_df.to_csv(OUT_DIR / "train_aug.csv", index=False, encoding="utf-8")

    # Combined manifest: original rows first, augmented rows after.
    combined = pd.concat([df[columns], aug_df], ignore_index=True)
    combined.to_csv(OUT_DIR / "train_with_aug.csv", index=False, encoding="utf-8")

    print(f"\nAugmented images saved to: {AUG_IMG_DIR}")
    print(f"Augmented CSV:           {OUT_DIR/'train_aug.csv'}  ({len(aug_df)} rows)")
    print(f"Combined CSV:            {OUT_DIR/'train_with_aug.csv'}  ({len(combined)} rows)")

# Run the augmentation when this cell/script is executed directly.
if __name__ == "__main__":
    main()

Processed 10/79 images...
Processed 20/79 images...
Processed 30/79 images...
Processed 40/79 images...
Processed 50/79 images...
Processed 60/79 images...
Processed 70/79 images...

Augmented images saved to: data_emotions\augmented\train_images_aug
Augmented CSV:           data_emotions\augmented\train_aug.csv  (5688 rows)
Combined CSV:            data_emotions\augmented\train_with_aug.csv  (5767 rows)
