# Audubon Bird Plates - Handoff + Contract Assertions

Use this notebook as the entrypoint once a dataset has been scaffolded (i.e., `plates_structured/`, schemas, checksums, ledger).

Dataset root resolution:
- Colab: mounts Drive and searches under `/content/drive/MyDrive/burning-world-series/`
- Local: searches upward from the current working directory for a folder whose name starts with `audubon-bird-plates` and that contains `data.json`
- Override: set `BWS_DATASET_ROOT` to an explicit path


## Handoff: load project context (read-only)

Purpose:
- Resolve dataset root
- Expose canonical paths
- Perform light sanity checks only

This cell must:
- Not write files
- Not validate schemas deeply
- Not compute checksums


In [None]:
from pathlib import Path
import json
import os

def in_colab() -> bool:
    """Report whether this notebook is executing inside Google Colab.

    Detection works by attempting to import ``google.colab``; any failure of
    that import is treated as "not in Colab".
    """
    try:
        import google.colab  # noqa: F401 -- imported purely as a presence probe
    except Exception:
        return False
    else:
        return True

def find_dataset_root(start: Path) -> Path:
    """Locate the dataset root directory.

    Resolution order:

    1. If the ``BWS_DATASET_ROOT`` environment variable is set (non-empty), it
       is used after ``~`` expansion; a non-existent path raises.
    2. ``start`` itself, if it qualifies as a dataset directory.
    3. The children of ``start`` and then of each of its ancestors (nearest
       first), examined in sorted order; the first qualifying child wins.

    A directory qualifies when its name starts with ``audubon-bird-plates``
    and it contains a ``data.json`` entry.

    Raises:
        RuntimeError: if the override path does not exist, or if no dataset
            directory can be found.
    """
    env_value = os.environ.get("BWS_DATASET_ROOT")
    if env_value:
        explicit = Path(env_value).expanduser()
        if not explicit.exists():
            raise RuntimeError(f"BWS_DATASET_ROOT does not exist: {explicit}")
        return explicit

    def is_dataset_dir(candidate: Path) -> bool:
        # All three conditions must hold; evaluation short-circuits in order.
        return (
            candidate.is_dir()
            and candidate.name.startswith("audubon-bird-plates")
            and (candidate / "data.json").exists()
        )

    if is_dataset_dir(start):
        return start

    for ancestor in (start, *start.parents):
        try:
            entries = list(ancestor.iterdir())
        except Exception:
            # Unlistable directory (e.g. permissions): move on to the next ancestor.
            continue
        found = next((entry for entry in sorted(entries) if is_dataset_dir(entry)), None)
        if found is not None:
            return found

    raise RuntimeError("Could not find dataset root; set BWS_DATASET_ROOT")

# Resolve the dataset root for the current environment.
if not in_colab():
    # Local run: search from the current working directory.
    DATASET_ROOT = find_dataset_root(Path.cwd())
else:
    # Colab run: mount Drive and search under the fixed project folder.
    from google.colab import drive

    drive.mount("/content/drive", force_remount=False)
    project_root = Path("/content/drive/MyDrive/burning-world-series")
    assert project_root.exists(), "Project root missing"
    DATASET_ROOT = find_dataset_root(project_root)

# Canonical paths, all derived from the dataset root.
(
    PLATES_ORIGINAL,
    PLATES_STRUCTURED,
    SCHEMA_DIR,
    LEDGER_DIR,
    DATA_JSON,
    README_MD,
) = (
    DATASET_ROOT / entry_name
    for entry_name in ("plates", "plates_structured", "schemas", "ledger", "data.json", "README.md")
)

# Light sanity checks only: presence of the scaffolded pieces, in a fixed order.
for required_path, failure_message in (
    (PLATES_STRUCTURED, "plates_structured missing (run audubon_bird_plates_setup.ipynb first)"),
    (SCHEMA_DIR, "schemas missing"),
    (LEDGER_DIR, "ledger missing"),
    (DATA_JSON, "data.json missing"),
):
    assert required_path.exists(), failure_message

# Every sub-directory of plates_structured counts as a plate; exactly 435 are expected.
plate_dirs = list(filter(lambda entry: entry.is_dir(), PLATES_STRUCTURED.iterdir()))
assert len(plate_dirs) == 435, "Unexpected plate count"

def get_plate_dir(plate_number: int) -> Path:
    """Return the structured directory for a plate, e.g. ``plate-007`` for 7.

    The number is rendered as text and left-padded with zeros to three
    characters; the directory is not checked for existence.
    """
    padded = str(plate_number).zfill(3)
    return PLATES_STRUCTURED / ("plate-" + padded)

def load_plate_manifest(plate_number: int) -> dict:
    """Read and parse ``manifest.json`` (UTF-8) from the given plate's directory."""
    manifest_path = get_plate_dir(plate_number) / "manifest.json"
    with manifest_path.open(encoding="utf-8") as manifest_file:
        return json.load(manifest_file)

# Human-readable confirmation that the handoff cell completed.
for status_line in (
    "Project loaded successfully.",
    f"Dataset root      : {DATASET_ROOT}",
    f"Structured plates : {len(plate_dirs)}",
    "Ready for runs, embeddings, or analysis.",
):
    print(status_line)


## Structure & naming contract assertion (read-only)

Purpose:
- Re-assert filesystem law
- Confirm naming conventions
- Abort early if drift has occurred

This cell reads only and raises on any deviation.


In [None]:
import json
import re

print("\n==============================")
print("STRUCTURE & NAMING ASSERTION")
print("==============================\n")

# [1] The canonical roots must all exist (checked in a fixed order).
for root_path, root_error in (
    (DATASET_ROOT, "DATASET_ROOT missing"),
    (PLATES_STRUCTURED, "plates_structured missing"),
    (SCHEMA_DIR, "schemas missing"),
    (LEDGER_DIR, "ledger missing"),
):
    assert root_path.exists(), root_error

print("[1] Canonical roots present (OK)")

# [2] Exactly 435 plate directories, each named plate-NNN.
plate_dirs = sorted(filter(lambda entry: entry.is_dir(), PLATES_STRUCTURED.iterdir()))
assert len(plate_dirs) == 435, "Plate count must be exactly 435"

plate_name_re = re.compile(r"^plate-\d{3}$")
bad_names = []
for plate_dir in plate_dirs:
    if plate_name_re.match(plate_dir.name) is None:
        bad_names.append(plate_dir.name)
assert not bad_names, f"Invalid plate directory names: {bad_names[:5]}"

print("[2] Plate directory naming OK (OK)")

# [3] Each plate holds the required entries and exactly one source file.
required_subdirs = {"source", "runs", "viz", "cache"}
required_files = ("manifest.json", "source.sha256")
violations = []

for plate_dir in plate_dirs:
    present = {entry.name for entry in plate_dir.iterdir()}
    missing = required_subdirs - present
    if missing:
        violations.append(f"{plate_dir.name}: missing {missing}")

    source_dir = plate_dir / "source"
    if source_dir.exists():
        file_count = sum(1 for entry in source_dir.iterdir() if entry.is_file())
        if file_count != 1:
            violations.append(f"{plate_dir.name}: source/ has {file_count} files")

    violations.extend(
        f"{plate_dir.name}: missing {fname}"
        for fname in required_files
        if not (plate_dir / fname).exists()
    )

# Only the first five violations are shown to keep the failure readable.
assert not violations, f"Plate structure violations:\n{violations[:5]}"

print("[3] Plate internal structure OK (OK)")

# [4] Each manifest must name its own plate and point at an existing source image.
for plate_dir in plate_dirs:
    with (plate_dir / "manifest.json").open(encoding="utf-8") as manifest_file:
        manifest = json.load(manifest_file)
    assert manifest["plate_id"] == plate_dir.name, f"{plate_dir.name}: plate_id mismatch"
    src = plate_dir / manifest["source_image"]
    assert src.exists(), f"{plate_dir.name}: source_image missing"

print("[4] Manifest <-> filesystem consistency OK (OK)")

# [5] Any run directory must follow run-YYYYMMDD-HHMMSS-<8 hex> and carry metrics.json.
run_re = re.compile(r"^run-\d{8}-\d{6}-[a-f0-9]{8}$")

for plate_dir in plate_dirs:
    runs_dir = plate_dir / "runs"
    if runs_dir.exists():
        for run in runs_dir.iterdir():
            if run.is_dir():
                assert run_re.match(run.name), f"{plate_dir.name}: invalid run dir {run.name}"
                assert (run / "metrics.json").exists(), f"{run}: missing metrics.json"

print("[5] Run directory naming (if any) OK (OK)")

# [6] The four ledger files must be present (extra files are tolerated).
expected_ledgers = {
    "plates.parquet",
    "runs.parquet",
    "embeddings.parquet",
    "segments.parquet",
}

found_ledgers = {entry.name for entry in LEDGER_DIR.iterdir() if entry.is_file()}
missing_ledgers = expected_ledgers - found_ledgers
assert not missing_ledgers, f"Ledger files missing: {missing_ledgers}"

print("[6] Ledger scaffolding present (OK)")

print("\n==============================")
print("CONTRACT STATUS: ASSERTED")
print("==============================")
closing_message = (
    "\n"
    "Filesystem, naming, and structural invariants\n"
    "are intact.\n\n"
    "It is now safe to:\n"
    "- start new runs\n"
    "- generate embeddings\n"
    "- perform segmentation\n"
    "- write derived artifacts\n"
)
print(closing_message)


## Dependencies (Colab)


In [None]:
!pip install pillow numpy matplotlib tqdm


## Cleanup: remove baseline image metadata outputs

Deletes only `input_image.json` under each `plates_structured/<plate_id>/`.


In [None]:
from pathlib import Path

# NOTE: unlike the read-only cells, this cell deletes files. It removes only
# <plate dir>/input_image.json and touches nothing else.
plates_structured = DATASET_ROOT / "plates_structured"

# Lazily enumerate the one candidate file per plate directory.
baseline_files = (
    entry / "input_image.json"
    for entry in plates_structured.iterdir()
    if entry.is_dir()
)

removed = 0
for target in baseline_files:
    if not target.exists():
        continue
    target.unlink()
    removed += 1

print("Cleanup complete.")
print(f"Removed input_image.json files: {removed}")
print("Dataset restored to pre-baseline state.")


## Repo-wide exploratory (read-only, CPU-only)

Header-only inspection of source images; no writes and no full pixel loads.


In [None]:
import json
from collections import Counter
from PIL import Image

# Lift Pillow's pixel-count limit so that opening very large scans does not
# warn or raise. Only image headers are read in this cell.
Image.MAX_IMAGE_PIXELS = None

ROOT = DATASET_ROOT
PLATES_STRUCTURED = ROOT / "plates_structured"
LEDGER_DIR = ROOT / "ledger"
SCHEMA_DIR = ROOT / "schemas"

print("\n==============================")
print("REPO EXPLORATORY REPORT")
print("==============================\n")

# [1] Top-level listing of the dataset root.
top_dirs = sorted(p.name for p in ROOT.iterdir() if p.is_dir())
top_files = sorted(p.name for p in ROOT.iterdir() if p.is_file())

print("[1] Top-level directories:")
for d in top_dirs:
    print("  -", d)

print("\n[1] Top-level files:")
for f in top_files:
    print("  -", f)

# [2] Per-plate structure overview.
plate_dirs = sorted(
    p for p in PLATES_STRUCTURED.iterdir()
    if p.is_dir() and p.name.startswith("plate-")
)

print(f"\n[2] Plates found: {len(plate_dirs)}")

missing_manifest = []
missing_source = []
run_counts = []

for p in plate_dirs:
    if not (p / "manifest.json").exists():
        missing_manifest.append(p.name)

    src_dir = p / "source"
    if not src_dir.exists():
        missing_source.append(p.name)
    else:
        # Count regular files only, matching the structure-assertion cell.
        files = [entry for entry in src_dir.iterdir() if entry.is_file()]
        if len(files) != 1:
            missing_source.append(p.name)

    runs_dir = p / "runs"
    if runs_dir.exists():
        run_counts.append(sum(1 for r in runs_dir.iterdir() if r.is_dir()))
    else:
        run_counts.append(0)

print("    Missing manifest.json:", len(missing_manifest))
print("    Missing/invalid source:", len(missing_source))
print("    Plates with runs:", sum(1 for c in run_counts if c > 0))
print("    Total runs:", sum(run_counts))

# [3] Which manifest keys appear, sampled over the first 50 plates.
print("\n[3] Manifest field coverage (sampled)")

key_counter = Counter()
sampled = 0
unparsed_manifests = []  # (plate name, exception type) for manifests that could not be read

for p in plate_dirs[:50]:
    try:
        m = json.loads((p / "manifest.json").read_text(encoding="utf-8"))
        key_counter.update(m.keys())
    except Exception as exc:
        # Best-effort report: keep going, but do not hide the failure.
        unparsed_manifests.append((p.name, type(exc).__name__))
        continue
    sampled += 1

for k, v in key_counter.most_common():
    print(f"  {k:20s} -> present in {v}/{sampled}")

if unparsed_manifests:
    print(f"  Unreadable manifests in sample: {len(unparsed_manifests)}")
    for name, reason in unparsed_manifests[:5]:
        print(f"      - {name}: {reason}")

# [4] Image dimensions taken from each plate's source file.
print("\n[4] Image header stats (source images only)")

sizes = []
huge = []
unreadable = []  # (plate name, reason) for sources whose header could not be read

for p in plate_dirs:
    try:
        # Sorted regular files make the chosen source deterministic.
        candidates = sorted(entry for entry in (p / "source").iterdir() if entry.is_file())
        if not candidates:
            raise FileNotFoundError("no file in source/")
        with Image.open(candidates[0]) as img:
            w, h = img.size
    except Exception as exc:
        unreadable.append((p.name, f"{type(exc).__name__}: {exc}"))
        continue
    mp = (w * h) / 1_000_000
    sizes.append(mp)
    if mp > 90:
        huge.append((p.name, round(mp, 2)))

if sizes:
    print(f"    Min megapixels: {round(min(sizes), 2)}")
    print(f"    Max megapixels: {round(max(sizes), 2)}")
    print(f"    Mean megapixels: {round(sum(sizes)/len(sizes), 2)}")
    print(f"    Images > 90 MP (PIL warning risk): {len(huge)}")

for name, mp in huge[:5]:
    print(f"      - {name}: {mp} MP")

print(f"    Unreadable source images: {len(unreadable)}")
for name, reason in unreadable[:5]:
    print(f"      - {name}: {reason}")

# [5] Ledger files with their sizes.
print("\n[5] Ledger files")
if LEDGER_DIR.exists():
    for p in sorted(LEDGER_DIR.iterdir()):
        if p.is_file():
            print(f"  - {p.name:20s} {round(p.stat().st_size/1024, 1)} KB")
else:
    print("  Ledger directory missing")

# [6] Schema directory listing.
print("\n[6] Schemas")
if SCHEMA_DIR.exists():
    for p in sorted(SCHEMA_DIR.iterdir()):
        print("  -", p.name)
else:
    print("  Schema directory missing")

print("\n==============================")
print("SUMMARY")
print("==============================")
print(
    f"\n"
    f"Plates                : {len(plate_dirs)}\n"
    f"Total runs            : {sum(run_counts)}\n"
    f"Images > 90 MP        : {len(huge)}\n"
    f"Unreadable images     : {len(unreadable)}\n"
    f"Manifests missing     : {len(missing_manifest)}\n"
    f"Source issues         : {len(missing_source)}\n"
    f"Schemas present       : {SCHEMA_DIR.exists()}\n"
    f"Ledgers present       : {LEDGER_DIR.exists()}\n"
)

print("Exploratory complete. No files written.")


## CPU Baseline (SageMaker-style: write-only output root)

This section is designed to run in Google Colab Pro (CPU) while matching the SageMaker mental model: treat the dataset as read-only input and write all new artifacts to a separate output root.

### Input criteria

- `INPUT_ROOT` points at a scaffolded dataset root containing `plates_structured/`, `schemas/`, and `data.json`.
- `plates_structured/` contains 435 `plate-###/` directories.
- Each `plate-###/` contains `manifest.json`, `source.sha256`, and `source/plate-###.jpg`.
- `schemas/plate.manifest.schema.json` and `schemas/run.manifest.schema.json` exist and are draft-2020-12 valid.

### Midpoint artifacts

- `OUTPUT_ROOT/schemas/cpu.baseline.schema.json` is written once if missing.
- `OUTPUT_ROOT/reports/<run_id>/report.json` summarizes shard parameters, counts, and failures.

### Outputs (append-only, per plate)

For each plate in the shard, the job writes:

- `OUTPUT_ROOT/plates_structured/<plate_id>/runs/<run_id>/metrics.json`
- `OUTPUT_ROOT/plates_structured/<plate_id>/runs/<run_id>/cpu_baseline.json`

### Constraints enforced

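- The input dataset is never mutated. Note that `OUTPUT_ROOT` defaults to `<dataset root>/_RUN_OUTPUT`; set `BWS_OUTPUT_ROOT` to keep outputs entirely outside the dataset tree.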
- Outputs are run-scoped and append-only; no overwrites of prior runs.
- Each `cpu_baseline.json` validates against `cpu.baseline.schema.json` (taken from the input `schemas/` when present, otherwise generated under `OUTPUT_ROOT/schemas/`).
- Each `metrics.json` validates against `run.manifest.schema.json`.

### CPU-only feature pack

- Container facts: bytes, extension, format, sha256, EXIF flag, ICC flag+hash, JPEG progressive/subsampling/quant hash where exposed
- Geometry: width, height, megapixels, aspect ratio, tiling lattice
- Pixel distribution: RGB histograms and luma histogram (256 bins)
- Summary stats: mean/std/min/max + clipped ratios per channel + luma
- Entropy per channel + luma
- Hashes: aHash, dHash, pHash
- Sharpness proxy: Laplacian variance on downsampled grayscale


In [None]:
import argparse
import hashlib
import json
import math
import os
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
from jsonschema import Draft202012Validator
from PIL import Image
from tqdm import tqdm

# Lift Pillow's pixel-count limit for this session.
Image.MAX_IMAGE_PIXELS = None

# Edge length (px) of the square tiles used to compute the tiling lattice.
TILE_SIZE = 512
# Bin count recorded alongside the RGB and luma histograms.
HIST_BINS = 256
# Longest side (px) the grayscale image is reduced to before the Laplacian.
LAPLACE_MAX_DIM = 1024
# phash defaults: the hash keeps a PHASH_BITS x PHASH_BITS block of DCT
# coefficients, computed on an image of side PHASH_BITS * PHASH_FACTOR.
PHASH_BITS = 8
PHASH_FACTOR = 4

def utc_iso():
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    The ``+00:00`` offset produced by ``isoformat()`` is replaced with ``Z``.
    """
    now = datetime.now(timezone.utc)
    stamp = now.isoformat()
    return stamp.replace("+00:00", "Z")

def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the SHA-256 hex digest of the file at ``path``.

    The file is read in ``chunk_size``-byte pieces so it never has to be held
    in memory in full.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(chunk_size)
            if not block:
                # An empty read marks end of file.
                break
            digest.update(block)
    return digest.hexdigest()

def generate_run_id(models: list[str], note: str) -> str:
    """Build a run identifier of the form ``run-YYYYMMDD-HHMMSS-<8 hex>``.

    The timestamp is the current UTC time. The hex suffix is the first eight
    characters of the SHA-1 of the model names and the note joined with ``|``,
    so it depends only on ``models`` and ``note``.
    """
    fingerprint_source = "|".join(models) + "|" + note
    fingerprint = hashlib.sha1(fingerprint_source.encode("utf-8")).hexdigest()
    now = datetime.now(timezone.utc)
    return "run-{}-{}".format(now.strftime("%Y%m%d-%H%M%S"), fingerprint[:8])

def resize_fit(img: Image.Image, max_dim: int) -> Image.Image:
    """Shrink ``img`` so its longest side is at most ``max_dim``.

    Images that already fit are returned unchanged (same object). Otherwise a
    bilinear-resampled copy is returned, scaled uniformly, with each side
    rounded and clamped to at least one pixel.
    """
    width, height = img.size
    longest = max(width, height)
    if longest <= max_dim:
        return img
    scale = max_dim / longest
    target_width = max(1, int(round(width * scale)))
    target_height = max(1, int(round(height * scale)))
    return img.resize((target_width, target_height), resample=Image.BILINEAR)

def laplacian_variance(gray_img: Image.Image) -> float:
    """Return the variance of a 4-neighbour Laplacian of ``gray_img``.

    The image is first reduced to at most ``LAPLACE_MAX_DIM`` on its longest
    side, then edge-padded by one pixel so the response has the same shape as
    the reduced image.
    """
    reduced = resize_fit(gray_img, LAPLACE_MAX_DIM)
    pixels = np.asarray(reduced, dtype=np.float32)
    padded = np.pad(pixels, ((1, 1), (1, 1)), mode="edge")

    # Shifted views of the padded array give each pixel's four neighbours.
    above = padded[:-2, 1:-1]
    below = padded[2:, 1:-1]
    left = padded[1:-1, :-2]
    right = padded[1:-1, 2:]
    centre = padded[1:-1, 1:-1]

    response = above + below + left + right - 4.0 * centre
    return float(np.var(response))

def hist_stats(hist: list[int]) -> dict:
    """Summarise a histogram whose bin index is the sample value.

    Args:
        hist: Per-bin counts; bin ``i`` holds the count of samples equal to
            ``i``. Any length is accepted (the pipeline passes 256 bins).

    Returns:
        A dict with keys ``total``, ``min``, ``max``, ``mean``, ``std``,
        ``clipped_low_ratio`` (share of samples in the first bin) and
        ``clipped_high_ratio`` (share in the last bin). When the histogram is
        empty or sums to zero, ``total`` is 0 and every other value is None.
    """
    total = int(sum(hist))
    if total <= 0:
        return {"total": 0, "min": None, "max": None, "mean": None, "std": None, "clipped_low_ratio": None, "clipped_high_ratio": None}
    # Derive the top bin index from the histogram itself rather than assuming 256 bins.
    last = len(hist) - 1
    mn = next((i for i, c in enumerate(hist) if c), 0)
    mx = next((last - i for i, c in enumerate(reversed(hist)) if c), last)
    mean = sum(i * c for i, c in enumerate(hist)) / total
    # Population variance: counts act as weights over the bin values.
    s2 = sum((i - mean) ** 2 * c for i, c in enumerate(hist)) / total
    return {
        "total": total,
        "min": int(mn),
        "max": int(mx),
        "mean": float(mean),
        "std": float(math.sqrt(s2)),
        "clipped_low_ratio": float(hist[0] / total),
        "clipped_high_ratio": float(hist[-1] / total),
    }

def entropy(hist: list[int]) -> float | None:
    total = float(sum(hist))
    if total <= 0:
        return None
    ent = 0.0
    for c in hist:
        if c:
            p = c / total
            ent -= p * math.log2(p)
    return float(ent)

def ahash(img: Image.Image, size: int = 8) -> str:
    """Average hash of ``img`` as a hex string.

    The image is converted to grayscale and resized (bilinear) to
    ``size`` x ``size``; each bit is 1 where the pixel is above the mean.
    Bits are packed row-major, most significant first.
    """
    thumb = img.convert("L").resize((size, size), Image.BILINEAR)
    pixels = np.asarray(thumb, dtype=np.float32)
    flags = (pixels > pixels.mean()).flatten()
    bit_string = "".join("1" if flag else "0" for flag in flags)
    hex_width = (size * size) // 4
    return f"{int(bit_string, 2):0{hex_width}x}"

def dhash(img: Image.Image, size: int = 8) -> str:
    """Difference hash of ``img`` as a hex string.

    The image is converted to grayscale and resized (bilinear) to
    ``size + 1`` wide by ``size`` high; each bit is 1 where a pixel is
    brighter than its left neighbour. Bits are packed row-major, most
    significant first.
    """
    thumb = img.convert("L").resize((size + 1, size), Image.BILINEAR)
    pixels = np.asarray(thumb, dtype=np.float32)
    right_columns = pixels[:, 1:]
    left_columns = pixels[:, :-1]
    flags = (right_columns > left_columns).flatten()
    bit_string = "".join("1" if flag else "0" for flag in flags)
    hex_width = (size * size) // 4
    return f"{int(bit_string, 2):0{hex_width}x}"

def phash(img: Image.Image, hash_bits: int = PHASH_BITS, highfreq_factor: int = PHASH_FACTOR) -> str:
    """Perceptual hash of ``img`` as a hex string.

    The image is converted to grayscale and resized (bilinear) to a square of
    side ``hash_bits * highfreq_factor``. A cosine transform is applied along
    both axes, and the top-left ``hash_bits`` x ``hash_bits`` block of
    coefficients is kept. Each bit is 1 where a coefficient exceeds the median
    of the block; the first coefficient is left out of the median when the
    block has more than one element. Bits are packed most significant first.
    """
    side = hash_bits * highfreq_factor
    gray = img.convert("L").resize((side, side), Image.BILINEAR)
    pixels = np.asarray(gray, dtype=np.float32)

    # Cosine basis: one row per frequency, one column per sample position.
    positions = np.arange(side)
    frequencies = positions.reshape(-1, 1)
    basis = np.cos(np.pi / side * (positions + 0.5) * frequencies).astype(np.float32)

    # Apply the transform along one axis, then along the other.
    first_pass = basis @ pixels
    transformed = (basis @ first_pass.T).T

    low = transformed[:hash_bits, :hash_bits].flatten()
    threshold = np.median(low[1:]) if low.size > 1 else np.median(low)
    flags = low > threshold
    bit_string = "".join("1" if flag else "0" for flag in flags)
    hex_width = (hash_bits * hash_bits) // 4
    return f"{int(bit_string, 2):0{hex_width}x}"

def exif_present(img) -> bool:
    """Return True when ``img.getexif()`` yields a non-empty result.

    Any exception raised while reading or measuring the EXIF data is treated
    as "no EXIF".
    """
    try:
        exif = img.getexif()
        if exif is None:
            return False
        return len(exif) > 0
    except Exception:
        return False

def icc_hash(img):
    """Return ``(present, sha256_hex)`` for the ICC profile in ``img.info``.

    A missing or empty profile, or any error while reading or hashing it,
    yields ``(False, None)``. A profile supplied as text is UTF-8 encoded
    (undecodable characters dropped) before hashing.
    """
    absent = (False, None)
    try:
        profile = img.info.get("icc_profile", None)
        if not profile:
            return absent
        raw = profile.encode("utf-8", errors="ignore") if isinstance(profile, str) else profile
        digest = hashlib.sha256(raw).hexdigest()
    except Exception:
        return absent
    return True, digest

def jpeg_progressive_flag(img):
    """Return ``img.info["progressive"]`` coerced to bool, or None when the key is absent or None."""
    flag = img.info.get("progressive", None)
    if flag is None:
        return None
    return bool(flag)

def jpeg_subsampling_flag(img):
    """Return ``img.info["subsampling"]`` rendered as a string, or None when the key is absent or None."""
    value = img.info.get("subsampling", None)
    if value is None:
        return None
    return str(value)

def quant_hash_from_pil(img):
    """Return a SHA-256 hex digest of ``img.quantization``, or None.

    The attribute is serialised as JSON with sorted keys before hashing. None
    is returned when the attribute is missing or empty, or when serialising
    or hashing it fails.
    """
    tables = getattr(img, "quantization", None)
    if not tables:
        return None
    try:
        encoded = json.dumps(tables, sort_keys=True).encode("utf-8")
        return hashlib.sha256(encoded).hexdigest()
    except Exception:
        return None

def write_cpu_schema_if_missing(output_root: Path):
    """Ensure ``<output_root>/schemas/cpu.baseline.schema.json`` exists.

    The ``schemas`` directory is created if needed. An existing schema file is
    left untouched; otherwise the built-in CPU-baseline schema is validated
    and then written.

    Args:
        output_root: Root directory under which ``schemas/`` lives.

    Returns:
        Path to the schema file.

    Raises:
        jsonschema.exceptions.SchemaError: if the built-in schema is not a
            valid draft 2020-12 schema (nothing is written in that case).
    """
    schema_dir = output_root / "schemas"
    schema_dir.mkdir(parents=True, exist_ok=True)
    cpu_path = schema_dir / "cpu.baseline.schema.json"
    if cpu_path.exists():
        return cpu_path
    schema = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://burning-world-series/schemas/cpu.baseline.schema.json",
        "title": "CPU Baseline Feature Pack",
        "type": "object",
        "required": ["plate_id", "run_id", "timestamp", "source_image", "source_file", "geometry", "tiling", "decode"],
        "properties": {
            "plate_id": {"type": "string", "pattern": "^plate-[0-9]{3}$"},
            "run_id": {"type": "string"},
            "timestamp": {"type": "string", "format": "date-time"},
            "source_image": {"type": "string"},
            "source_file": {"type": "object"},
            "geometry": {"type": "object"},
            "tiling": {"type": "object"},
            "decode": {"type": "object"},
            "pixel_stats": {"type": ["object", "null"]},
            "hashes": {"type": ["object", "null"]},
        },
        "additionalProperties": False,
    }
    # Validate before writing so an invalid schema can never be left on disk
    # and then picked up as "already present" by a later call.
    Draft202012Validator.check_schema(schema)
    cpu_path.write_text(json.dumps(schema, indent=2), encoding="utf-8")
    return cpu_path

def load_validator(path: Path) -> Draft202012Validator:
    """Load a JSON schema from ``path`` (UTF-8) and return a validator for it.

    The schema itself is checked against the draft 2020-12 meta-schema before
    the validator is built.
    """
    with path.open(encoding="utf-8") as schema_file:
        schema = json.load(schema_file)
    Draft202012Validator.check_schema(schema)
    return Draft202012Validator(schema)

def cpu_baseline_job(input_root: Path, output_root: Path, shard_index: int, shard_count: int, run_id: str | None, skip_if_present: bool):
    """Compute the CPU-baseline feature pack for one shard of plates.

    For every selected plate this validates the plate manifest, opens the
    source image, collects container facts, geometry, histograms, entropy,
    perceptual hashes and a sharpness proxy, and writes ``metrics.json`` and
    ``cpu_baseline.json`` under
    ``<output_root>/plates_structured/<plate>/runs/<run_id>/``. A summary is
    written to ``<output_root>/reports/<run_id>/report.json``.

    Plates are sorted by path and assigned round-robin: plate ``i`` belongs to
    shard ``i % shard_count``.

    Args:
        input_root: Dataset root containing ``plates_structured/`` and ``schemas/``.
        output_root: Root that receives all files written by the job.
        shard_index: Zero-based index of the shard to process.
        shard_count: Total number of shards (at least 1).
        run_id: Run identifier to use; a new one is generated when falsy.
        skip_if_present: Skip a plate when both output files already exist
            for this run.

    Returns:
        The report dict (also written to ``report.json``).

    Raises:
        ValueError: if ``shard_count`` is below 1 or ``shard_index`` is not in
            ``[0, shard_count)``.
        RuntimeError: if ``plates_structured`` or ``schemas`` is missing under
            ``input_root``, or the plate count is not 435.
    """
    # Reject bad shard arguments up front: a zero count would divide by zero
    # below, and an out-of-range index would silently select no plates.
    if shard_count < 1:
        raise ValueError(f"shard_count must be >= 1, got {shard_count}")
    if not 0 <= shard_index < shard_count:
        raise ValueError(f"shard_index must be in [0, {shard_count}), got {shard_index}")

    plates_root = input_root / "plates_structured"
    schemas_root = input_root / "schemas"
    if not plates_root.exists():
        raise RuntimeError(f"Missing plates_structured: {plates_root}")
    if not schemas_root.exists():
        raise RuntimeError(f"Missing schemas: {schemas_root}")

    plate_validator = load_validator(schemas_root / "plate.manifest.schema.json")
    run_validator = load_validator(schemas_root / "run.manifest.schema.json")

    # Prefer a CPU schema shipped with the input; otherwise generate one under
    # the output root so the input is not written to.
    cpu_schema_path = schemas_root / "cpu.baseline.schema.json"
    if cpu_schema_path.exists():
        cpu_validator = load_validator(cpu_schema_path)
    else:
        cpu_validator = load_validator(write_cpu_schema_if_missing(output_root))

    plates = sorted([p for p in plates_root.iterdir() if p.is_dir() and p.name.startswith("plate-")])
    if len(plates) != 435:
        raise RuntimeError(f"Unexpected plate count: {len(plates)}")

    selected = [p for i, p in enumerate(plates) if i % shard_count == shard_index]
    models = ["cpu-baseline-v1"]
    note = "cpu baseline: container geometry hist entropy hashes laplacian"
    rid = run_id or generate_run_id(models, note)

    report = {
        "run_id": rid,
        "timestamp": utc_iso(),
        "input_root": str(input_root),
        "output_root": str(output_root),
        "shard_index": shard_index,
        "shard_count": shard_count,
        "plates_total": len(plates),
        "plates_selected": len(selected),
        "plates_processed": 0,
        "plates_skipped": 0,
        "decode_failures": 0,
        "schema_failures": 0,
        "errors_sample": [],
    }

    def record_failure(counter: str, message: str) -> None:
        # Bump the named counter and keep at most ten sample messages.
        report[counter] += 1
        if len(report["errors_sample"]) < 10:
            report["errors_sample"].append(message)

    for plate_dir in tqdm(selected, desc="plates"):
        manifest = json.loads((plate_dir / "manifest.json").read_text(encoding="utf-8"))
        if list(plate_validator.iter_errors(manifest)):
            record_failure("schema_failures", f"{plate_dir.name}: plate manifest schema failure")
            continue

        src_path = plate_dir / manifest["source_image"]
        if not src_path.exists():
            record_failure("decode_failures", f"{plate_dir.name}: missing source image")
            continue

        out_plate_dir = output_root / "plates_structured" / plate_dir.name
        out_run_dir = out_plate_dir / "runs" / rid

        out_cpu = out_run_dir / "cpu_baseline.json"
        out_metrics = out_run_dir / "metrics.json"

        if skip_if_present and out_cpu.exists() and out_metrics.exists():
            report["plates_skipped"] += 1
            continue

        run_manifest = {
            "run_id": rid,
            "plate_id": plate_dir.name,
            "timestamp": utc_iso(),
            "models": models,
            "outputs": ["cpu_baseline.json"],
            "notes": note,
        }

        if list(run_validator.iter_errors(run_manifest)):
            record_failure("schema_failures", f"{plate_dir.name}: run manifest schema failure")
            continue

        # Start from a "decode failed" skeleton; the fields are filled in only
        # if the image opens and every feature computes.
        baseline = {
            "plate_id": manifest["plate_id"],
            "run_id": rid,
            "timestamp": utc_iso(),
            "source_image": manifest["source_image"],
            "source_file": {
                "bytes": int(src_path.stat().st_size),
                "extension": src_path.suffix.lower().lstrip("."),
                "format": None,
                "sha256": sha256_file(src_path),
                "exif_present": False,
                "icc_present": False,
                "icc_hash": None,
                "jpeg_is_progressive": None,
                "jpeg_subsampling": None,
                "jpeg_quant_hash": None,
            },
            "geometry": {"width_px": None, "height_px": None, "megapixels": None, "aspect_ratio": None, "mode": None},
            "tiling": {"tile_size_px": TILE_SIZE, "tiles_x": 1, "tiles_y": 1, "total_tiles": 1},
            "decode": {"ok": False, "error": None},
            "pixel_stats": None,
            "hashes": None,
        }

        try:
            with Image.open(src_path) as img:
                baseline["source_file"]["format"] = img.format
                baseline["source_file"]["exif_present"] = exif_present(img)
                ip, ih = icc_hash(img)
                baseline["source_file"]["icc_present"] = ip
                baseline["source_file"]["icc_hash"] = ih
                if (img.format or "").upper() == "JPEG":
                    baseline["source_file"]["jpeg_is_progressive"] = jpeg_progressive_flag(img)
                    baseline["source_file"]["jpeg_subsampling"] = jpeg_subsampling_flag(img)
                    baseline["source_file"]["jpeg_quant_hash"] = quant_hash_from_pil(img)

                w, h = img.size
                baseline["geometry"] = {"width_px": int(w), "height_px": int(h), "megapixels": float(round((w * h) / 1_000_000, 3)), "aspect_ratio": float(w / h), "mode": img.mode}
                # Ceiling division: partial tiles at the right/bottom edges count as tiles.
                tiles_x = (w + TILE_SIZE - 1) // TILE_SIZE
                tiles_y = (h + TILE_SIZE - 1) // TILE_SIZE
                baseline["tiling"] = {"tile_size_px": TILE_SIZE, "tiles_x": int(tiles_x), "tiles_y": int(tiles_y), "total_tiles": int(tiles_x * tiles_y)}

                rgb = img.convert("RGB")
                # The RGB histogram is one flat list: 256 R bins, then G, then B.
                raw = rgb.histogram()
                Rh, Gh, Bh = raw[0:256], raw[256:512], raw[512:768]
                L = rgb.convert("L")
                Lh = L.histogram()

                baseline["pixel_stats"] = {
                    "colorspace": "as-decoded",
                    "rgb_histograms": {"bins": HIST_BINS, "R": Rh, "G": Gh, "B": Bh},
                    "l_histogram": {"bins": HIST_BINS, "L": Lh},
                    "rgb_stats": {"R": hist_stats(Rh), "G": hist_stats(Gh), "B": hist_stats(Bh)},
                    "luma_stats": hist_stats(Lh),
                    "entropy": {"R": entropy(Rh), "G": entropy(Gh), "B": entropy(Bh), "L": entropy(Lh)},
                    "laplacian_var": laplacian_variance(L),
                }
                baseline["hashes"] = {"ahash": ahash(rgb), "dhash": dhash(rgb), "phash": phash(rgb)}
                baseline["decode"] = {"ok": True, "error": None}

        except Exception as e:
            # A failed decode is still recorded: the baseline is written with
            # decode.ok = False and a truncated error message.
            report["decode_failures"] += 1
            baseline["decode"] = {"ok": False, "error": f"{type(e).__name__}: {str(e)[:300]}"}

        errs = list(cpu_validator.iter_errors(baseline))
        if errs:
            record_failure("schema_failures", f"{plate_dir.name}: cpu baseline schema failure")
            continue

        # Create the run directory only once there is something to write, so
        # skipped or rejected plates do not leave empty run folders behind.
        out_run_dir.mkdir(parents=True, exist_ok=True)
        out_metrics.write_text(json.dumps(run_manifest, indent=2), encoding="utf-8")
        out_cpu.write_text(json.dumps(baseline, indent=2), encoding="utf-8")
        report["plates_processed"] += 1

    report_dir = output_root / "reports" / rid
    report_dir.mkdir(parents=True, exist_ok=True)
    (report_dir / "report.json").write_text(json.dumps(report, indent=2), encoding="utf-8")
    return report

# Job configuration, each value overridable through a BWS_* environment variable.
default_output_root = DATASET_ROOT / "_RUN_OUTPUT"

INPUT_ROOT = Path(os.getenv("BWS_INPUT_ROOT", str(DATASET_ROOT))).expanduser().resolve()
OUTPUT_ROOT = Path(os.getenv("BWS_OUTPUT_ROOT", str(default_output_root))).expanduser().resolve()
SHARD_INDEX = int(os.getenv("BWS_SHARD_INDEX", "0"))
SHARD_COUNT = int(os.getenv("BWS_SHARD_COUNT", "1"))
RUN_ID = os.getenv("BWS_RUN_ID")
# Only the exact string "1" enables skipping; anything else disables it.
SKIP_IF_PRESENT = os.getenv("BWS_SKIP_IF_PRESENT", "1") == "1"

print(f"INPUT_ROOT: {INPUT_ROOT}")
print(f"OUTPUT_ROOT: {OUTPUT_ROOT}")
print(f"SHARD_INDEX/COUNT: {SHARD_INDEX} {SHARD_COUNT}")

job_arguments = {
    "input_root": INPUT_ROOT,
    "output_root": OUTPUT_ROOT,
    "shard_index": SHARD_INDEX,
    "shard_count": SHARD_COUNT,
    "run_id": RUN_ID,
    "skip_if_present": SKIP_IF_PRESENT,
}
report = cpu_baseline_job(**job_arguments)
print(json.dumps(report, indent=2))
