SPLIT THEM TO 3 FOLDERS

In [1]:
from pathlib import Path
import shutil
import json
import csv
import random
import re
from datetime import datetime
from collections import defaultdict

# =========================
# CONFIG
# =========================
INTERIM_DIR = Path("../../data/interim/Stage0/color_clahe_1500x1000_noborder_aug")  # source (contains *_orig.png and *_augXX.png)
PROCESSED_DIR = Path("../../data/processed/Stage0")

IMAGES_DIR = PROCESSED_DIR / "images"
TRAIN_DIR = IMAGES_DIR / "train"
VAL_DIR   = IMAGES_DIR / "val"
TEST_DIR  = IMAGES_DIR / "test"

TRAIN_RATIO = 0.85
VAL_RATIO   = 0.10
TEST_RATIO  = 0.05
SEED = 42

IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".bmp", ".tiff"}

# =========================
# SETUP
# =========================
random.seed(SEED)

TRAIN_DIR.mkdir(parents=True, exist_ok=True)
VAL_DIR.mkdir(parents=True, exist_ok=True)
TEST_DIR.mkdir(parents=True, exist_ok=True)

image_paths = sorted([p for p in INTERIM_DIR.iterdir() if p.suffix.lower() in IMAGE_EXTS])
assert len(image_paths) > 0, "No images found in interim directory"
print(f"Found {len(image_paths)} images")

# =========================
# GROUPING (prevents leakage)
# =========================
# Group all siblings together:
#   base_orig.png, base_aug00.png, base_aug01.png ... -> same group "base"
def group_id_from_filename(filename: str) -> str:
    stem = Path(filename).stem
    # remove trailing _orig or _augXX
    stem = re.sub(r"_(orig|aug\d{2})$", "", stem)
    return stem

groups = defaultdict(list)
for p in image_paths:
    gid = group_id_from_filename(p.name)
    groups[gid].append(p)

group_ids = list(groups.keys())
random.shuffle(group_ids)

n_groups = len(group_ids)
n_train_g = int(n_groups * TRAIN_RATIO)
n_val_g   = int(n_groups * VAL_RATIO)

train_gids = set(group_ids[:n_train_g])
val_gids   = set(group_ids[n_train_g:n_train_g + n_val_g])
test_gids  = set(group_ids[n_train_g + n_val_g:])

train_imgs = [p for gid in train_gids for p in groups[gid]]
val_imgs   = [p for gid in val_gids   for p in groups[gid]]
test_imgs  = [p for gid in test_gids  for p in groups[gid]]

print(f"Groups total: {n_groups}")
print(f"Group split -> train/val/test: {len(train_gids)}/{len(val_gids)}/{len(test_gids)}")
print(f"Image split -> train/val/test: {len(train_imgs)}/{len(val_imgs)}/{len(test_imgs)}")

# =========================
# COPY IMAGES
# =========================
def copy_split(imgs, split_dir, split_name):
    relpaths = []
    for p in imgs:
        dst = split_dir / p.name
        shutil.copy2(p, dst)
        relpaths.append(f"images/{split_name}/{p.name}")
    return relpaths

train_rel = copy_split(train_imgs, TRAIN_DIR, "train")
val_rel   = copy_split(val_imgs,   VAL_DIR,   "val")
test_rel  = copy_split(test_imgs,  TEST_DIR,  "test")

# =========================
# SAVE splits.json
# =========================
splits = {"train": train_rel, "val": val_rel, "test": test_rel}

with open(PROCESSED_DIR / "splits.json", "w") as f:
    json.dump(
        {
            "created_at": datetime.now().isoformat(timespec="seconds"),
            "source": str(INTERIM_DIR),
            "seed": SEED,
            "ratios": {"train": TRAIN_RATIO, "val": VAL_RATIO, "test": TEST_RATIO},
            "counts": {k: len(v) for k, v in splits.items()},
            "grouping": "base_name (strip _orig/_augXX)",
            "n_groups": n_groups,
            "splits": splits,
        },
        f,
        indent=2,
    )

# =========================
# INDEX.CSV
# =========================
def extract_timestamp(filename: str) -> str:
    """
    Matches: YYYY-MM-DD_HH-MM-SS-sss.png
    (Your augmented files may NOT have this; it's okay if blank.)
    """
    m = re.search(r"(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})-(\d{3})", filename)
    if not m:
        return ""
    return f"{m.group(1)}T{m.group(2).replace('-', ':')}.{m.group(3)}"

with open(PROCESSED_DIR / "index.csv", "w", newline="") as f:
    writer = csv.DictWriter(
        f,
        fieldnames=[
            "filepath",
            "split",
            "stage",
            "scan_session_id",
            "group_id",
            "filename",
            "scan_timestamp",
            "source_interim_path",
        ],
    )
    writer.writeheader()

    for split_name, imgs in [("train", train_imgs), ("val", val_imgs), ("test", test_imgs)]:
        for p in imgs:
            gid = group_id_from_filename(p.name)
            writer.writerow(
                {
                    "filepath": f"images/{split_name}/{p.name}",
                    "split": split_name,
                    "stage": "Stage0",
                    "scan_session_id": INTERIM_DIR.name,
                    "group_id": gid,
                    "filename": p.name,
                    "scan_timestamp": extract_timestamp(p.name),
                    "source_interim_path": str(p),
                }
            )

print("✅ Dataset frozen with GROUP split (aug/orig siblings stay together)")

Found 1035 images
Groups total: 1035
Group split -> train/val/test: 879/103/53
Image split -> train/val/test: 879/103/53
✅ Dataset frozen with GROUP split (aug/orig siblings stay together)


In [2]:
from pathlib import Path
import json

IMG_DIR = Path("../../data/processed/Stage0/images/train")
OUT_JSON = Path("../../data/labels/Stage0/train.json")

OUT_JSON.parent.mkdir(parents=True, exist_ok=True)

annotations = []

for p in sorted(IMG_DIR.iterdir()):
    if p.suffix.lower() in {".png", ".jpg", ".jpeg"}:
        annotations.append({
            "image": p.name,
             "no_contraband": 1,
            "isolated_items": 0,
            "empty": 1

        })

with open(OUT_JSON, "w") as f:
    json.dump(annotations, f, indent=2)

print(f"✅ JSON generated with {len(annotations)} samples → {OUT_JSON}")


✅ JSON generated with 975 samples → ../../data/labels/Stage0/train.json


In [3]:
from pathlib import Path
import json

IMG_DIR = Path("../../data/processed/Stage0/images/val")
OUT_JSON = Path("../../data/labels/Stage0/val.json")

OUT_JSON.parent.mkdir(parents=True, exist_ok=True)

annotations = []

for p in sorted(IMG_DIR.iterdir()):
    if p.suffix.lower() in {".png", ".jpg", ".jpeg"}:
        annotations.append({
            "image": p.name,
             "no_contraband": 1,
            "isolated_items": 0,
            "empty": 1
        })

with open(OUT_JSON, "w") as f:
    json.dump(annotations, f, indent=2)

print(f"✅ JSON generated with {len(annotations)} samples → {OUT_JSON}")


✅ JSON generated with 136 samples → ../../data/labels/Stage0/val.json
