# CoralScapes

In [14]:
from datasets import load_dataset

# Login using e.g. `huggingface-cli login` to access this dataset
ds = load_dataset("EPFL-ECEO/coralscapes")
#train_dataset = load_dataset("EPFL-ECEO/coralscapes", split="train")
#valid_dataset = load_dataset("EPFL-ECEO/coralscapes", split="validation")
#test_dataset  = load_dataset("EPFL-ECEO/coralscapes", split="test")

Generating train split:   0%|          | 0/1517 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/166 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/392 [00:00<?, ? examples/s]

In [20]:
from datasets import load_dataset
from PIL import Image
import numpy as np
from io import BytesIO
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm
import os

# ----- class id sets -----
HEALTHY_IDS = {6, 17, 22, 25, 28, 31, 34, 36, 27}
UNHEALTHY_IDS = {3, 4, 16, 23, 19, 20, 32, 33, 37}
RED   = np.array([255, 0,   0], dtype=np.uint8)
BLUE  = np.array([0,   0, 255], dtype=np.uint8)
BLACK = np.array([0,   0,   0], dtype=np.uint8)

# ----- helpers -----
def pil_to_png_bytes(img: Image.Image) -> bytes:
    buf = BytesIO(); img.save(buf, format="PNG"); return buf.getvalue()

def label_pil_to_id_array(lbl: Image.Image) -> np.ndarray:
    if lbl.mode in ("P", "L", "I"):
        return np.array(lbl).astype(np.int32)
    raise ValueError(f"Label mode {lbl.mode} is not an ID mask.")

def ids_to_health_rgb(id_mask: np.ndarray) -> Image.Image:
    out = np.zeros_like(id_mask, dtype=np.uint8)
    out[np.isin(id_mask, list(HEALTHY_IDS))] = 1
    out[np.isin(id_mask, list(UNHEALTHY_IDS))] = 2
    h, w = out.shape
    rgb = np.zeros((h, w, 3), dtype=np.uint8)
    rgb[out == 1] = RED
    rgb[out == 2] = BLUE
    return Image.fromarray(rgb, mode="RGB")

def table_size_bytes(table: pa.Table, compression: str = "zstd") -> int:
    sink = pa.BufferOutputStream()
    pq.write_table(table, sink, compression=compression)
    return sink.getvalue().size

def write_parquet_chunk(rows, out_path: Path, compression: str = "zstd"):
    table = pa.table(rows)
    pq.write_table(table, out_path.as_posix(), compression=compression)

def export_split_to_parquet(
    ds_split,
    split_name: str,
    out_parquet_dir: Path,
    include_images: bool,
    target_mb: float = 19.0,
    compression: str = "zstd",
    preview_dir: Path | None = None,
    preview_n: int = 5,
):
    out_parquet_dir.mkdir(parents=True, exist_ok=True)
    if preview_dir: preview_dir.mkdir(parents=True, exist_ok=True)
    TARGET = int(target_mb * 1024 * 1024)

    cur = {"split": [], "index": [], "label_health_rgb_png": []}
    if include_images: cur["image_png"] = []
    part_idx, n = 1, len(ds_split)

    for i in tqdm(range(n), desc=f"{split_name}: recolor+pack"):
        rec = ds_split[i]
        img: Image.Image  = rec["image"]
        lbl: Image.Image  = rec["label"]

        ids = label_pil_to_id_array(lbl)
        health_rgb = ids_to_health_rgb(ids)

        label_png = pil_to_png_bytes(health_rgb)
        if include_images: image_png = pil_to_png_bytes(img)

        cur["split"].append(split_name)
        cur["index"].append(i)
        cur["label_health_rgb_png"].append(label_png)
        if include_images: cur["image_png"].append(image_png)

        est = table_size_bytes(pa.table(cur), compression=compression)
        if est > TARGET:
            for k in list(cur.keys()): cur[k].pop()
            if len(cur["index"]) > 0:
                out_path = out_parquet_dir / f"{split_name}_part{part_idx:03d}.parquet"
                write_parquet_chunk(cur, out_path, compression=compression)
                part_idx += 1
                cur = {k: [] for k in cur.keys()}
            cur["split"].append(split_name)
            cur["index"].append(i)
            cur["label_health_rgb_png"].append(label_png)
            if include_images: cur["image_png"].append(image_png)
            if table_size_bytes(pa.table(cur), compression=compression) > TARGET:
                out_path = out_parquet_dir / f"{split_name}_part{part_idx:03d}.parquet"
                write_parquet_chunk(cur, out_path, compression=compression)
                part_idx += 1
                cur = {k: [] for k in cur.keys()}

        if preview_dir and i < preview_n:
            health_rgb.save(preview_dir / f"{split_name}_{i:05d}_label_health_rgb.png")

    if len(cur["index"]) > 0:
        out_path = out_parquet_dir / f"{split_name}_part{part_idx:03d}.parquet"
        write_parquet_chunk(cur, out_path, compression=compression)

def run_pipeline(outdir="coralscapes_export", include_images=False, target_mb=19.0):
    out_root = Path(outdir)
    parquet_dir = out_root / "parquet"
    samples_dir = out_root / "samples"
    parquet_dir.mkdir(parents=True, exist_ok=True)
    samples_dir.mkdir(parents=True, exist_ok=True)

    ds = load_dataset("EPFL-ECEO/coralscapes")
    for split in ("train", "validation", "test"):
        if split not in ds: continue
        export_split_to_parquet(
            ds_split=ds[split],
            split_name=split,
            out_parquet_dir=parquet_dir / split,
            include_images=include_images,
            target_mb=target_mb,
            compression="zstd",
            preview_dir=samples_dir / split,
            preview_n=5,
        )
    print(f"Done. Parquet parts in: {parquet_dir}")
    print(f"Preview PNGs in: {samples_dir}")

# <<< RUN IT HERE >>>
run_pipeline(outdir="coralscapes_export", include_images=False, target_mb=19.0)


train: recolor+pack: 100%|██████████| 1517/1517 [03:38<00:00,  6.95it/s]
validation: recolor+pack: 100%|██████████| 166/166 [00:19<00:00,  8.30it/s]
test: recolor+pack: 100%|██████████| 392/392 [00:47<00:00,  8.28it/s]


Done. Parquet parts in: coralscapes_export\parquet
Preview PNGs in: coralscapes_export\samples


# Coral Bleaching

In [2]:
import os
import re
from pathlib import Path
from typing import Tuple, Dict
from PIL import Image
import numpy as np


# --- Configuration (concrete, not placeholders) ---
BLEACHED_DIR = Path("coralbleaching/masks_bleached")
NON_BLEACHED_DIR = Path("coralbleaching/masks_non_bleached")
OUT_DIR = Path("./combined_masks")
SAVE_INDIVIDUAL_COLORIZED = True  # set False if you only want the combined


def ensure_dirs():
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    if SAVE_INDIVIDUAL_COLORIZED:
        (OUT_DIR / "bleached_blue").mkdir(parents=True, exist_ok=True)
        (OUT_DIR / "non_bleached_red").mkdir(parents=True, exist_ok=True)


def is_image(p: Path) -> bool:
    return p.suffix.lower() in {".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp"}


def normalize_stem(stem: str) -> str:
    """
    Normalize a filename stem to match pairs.

    Examples:
      'C1_BC_EM_T1_29nov24_CGomez_corr_bleached'      -> 'C1_BC_EM_T1_29nov24_CGomez'
      'C1_BC_EM_T1_29nov24_CGomez_corr_non_bleached'  -> 'C1_BC_EM_T1_29nov24_CGomez'
    """
    s = stem
    # Remove common suffix tokens in your samples
    s = s.replace("corr_bleached", "")
    s = s.replace("corr_non_bleached", "")
    # Generic clean-up for possible separators left behind
    s = re.sub(r"[_\-\.]+$", "", s)
    return s


def load_mask_bool(path: Path, size: Tuple[int, int] | None = None) -> np.ndarray:
    """
    Load an image mask as a boolean array.
    Any non-zero pixel is considered part of the mask.
    """
    img = Image.open(path).convert("L")
    if size is not None and img.size != size:
        # Nearest neighbor preserves the hard mask edges
        img = img.resize(size, resample=Image.Resampling.NEAREST)
    arr = np.array(img)
    return arr > 0


def colorize_and_save(mask: np.ndarray, color: Tuple[int, int, int], out_path: Path) -> None:
    """Save a single boolean mask as a solid-color RGB PNG."""
    h, w = mask.shape
    rgb = np.zeros((h, w, 3), dtype=np.uint8)
    rgb[mask] = color
    Image.fromarray(rgb, mode="RGB").save(out_path)


def combine_pair(b_mask: np.ndarray, n_mask: np.ndarray) -> np.ndarray:
    """
    Combine two boolean masks into a single RGB image:
      - Non-bleached -> Red
      - Bleached     -> Blue
      - Overlap      -> Magenta (Red + Blue)
    """
    h, w = b_mask.shape
    rgb = np.zeros((h, w, 3), dtype=np.uint8)
    # red channel for non-bleached
    rgb[n_mask, 0] = 255
    # blue channel for bleached
    rgb[b_mask, 2] = 255
    return rgb


def index_by_stem(folder: Path) -> Dict[str, Path]:
    """Map normalized stem -> file path (last one wins if duplicates)."""
    out: Dict[str, Path] = {}
    for p in folder.iterdir():
        if p.is_file() and is_image(p):
            out[normalize_stem(p.stem)] = p
    return out


def main():
    ensure_dirs()

    bleached = index_by_stem(BLEACHED_DIR)
    non_bleached = index_by_stem(NON_BLEACHED_DIR)

    # Build set of all stems present in either folder
    all_stems = sorted(set(bleached.keys()) | set(non_bleached.keys()))
    if not all_stems:
        print("No images found. Make sure your files are in ./bleached_masks and ./non_bleached_masks")
        return

    paired, only_b, only_n = 0, 0, 0

    for stem in all_stems:
        b_path = bleached.get(stem)
        n_path = non_bleached.get(stem)

        if b_path is None and n_path is None:
            continue

        # Load masks, aligning sizes where needed
        if b_path and n_path:
            # Use bleached as reference size
            ref_img = Image.open(b_path).convert("L")
            ref_size = ref_img.size  # (w, h)
            b_mask = np.array(ref_img) > 0
            n_mask = load_mask_bool(n_path, size=ref_size)
            paired += 1

        elif b_path:  # only bleached present
            ref_img = Image.open(b_path).convert("L")
            ref_size = ref_img.size
            b_mask = np.array(ref_img) > 0
            n_mask = np.zeros((ref_size[1], ref_size[0]), dtype=bool)
            only_b += 1

        else:  # only non-bleached present
            ref_img = Image.open(n_path).convert("L")
            ref_size = ref_img.size
            n_mask = np.array(ref_img) > 0
            b_mask = np.zeros((ref_size[1], ref_size[0]), dtype=bool)
            only_n += 1

        # Save individual colorized (optional)
        if SAVE_INDIVIDUAL_COLORIZED:
            if b_path:
                colorize_and_save(b_mask, (0, 0, 255), OUT_DIR / "bleached_blue" / f"{stem}_bleached_blue.png")
            if n_path:
                colorize_and_save(n_mask, (255, 0, 0), OUT_DIR / "non_bleached_red" / f"{stem}_non_bleached_red.png")

        # Save combined
        combined_rgb = combine_pair(b_mask, n_mask)
        Image.fromarray(combined_rgb, mode="RGB").save(OUT_DIR / f"{stem}_combined.png")

    print(f"Done. Paired: {paired}, only-bleached: {only_b}, only-non-bleached: {only_n}")
    print(f"Outputs written to: {OUT_DIR.resolve()}")


if __name__ == "__main__":
    main()


Done. Paired: 512, only-bleached: 146, only-non-bleached: 146
Outputs written to: C:\Users\joshu\Coral-reefs-DBL4\data_preprocessing\combined_masks


In [None]:
import os
import re
from pathlib import Path
from typing import Tuple, Dict
from PIL import Image
import numpy as np


# --- Fixed directories ---
BLEACHED_DIR = Path("coralbleaching/masks_bleached")
NON_BLEACHED_DIR = Path("coralbleaching//masks_non_bleached")

OUT_COMBINED = Path("coralbleaching//combined_masks")
OUT_SINGLE_B = Path("coralbleaching//single_masks/bleached_blue")
OUT_SINGLE_N = Path("coralbleaching//single_masks/non_bleached_red")


def ensure_dirs():
    OUT_COMBINED.mkdir(parents=True, exist_ok=True)
    OUT_SINGLE_B.mkdir(parents=True, exist_ok=True)
    OUT_SINGLE_N.mkdir(parents=True, exist_ok=True)


def is_image(p: Path) -> bool:
    return p.suffix.lower() in {".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp"}


def normalize_stem(stem: str) -> str:
    """
    Normalize a filename stem to match pairs.

    Examples:
      'C1_BC_EM_T1_29nov24_CGomez_corr_bleached'      -> 'C1_BC_EM_T1_29nov24_CGomez'
      'C1_BC_EM_T1_29nov24_CGomez_corr_non_bleached'  -> 'C1_BC_EM_T1_29nov24_CGomez'
    """
    s = stem
    s = s.replace("corr_bleached", "")
    s = s.replace("corr_non_bleached", "")
    s = re.sub(r"[_\-\.]+$", "", s)
    return s


def load_mask_bool(path: Path, size: Tuple[int, int] | None = None) -> np.ndarray:
    img = Image.open(path).convert("L")
    if size is not None and img.size != size:
        img = img.resize(size, resample=Image.Resampling.NEAREST)
    arr = np.array(img)
    return arr > 0


def colorize(mask: np.ndarray, color: tuple[int, int, int]) -> np.ndarray:
    h, w = mask.shape
    rgb = np.zeros((h, w, 3), dtype=np.uint8)
    rgb[mask] = color
    return rgb


def combine_pair(b_mask: np.ndarray, n_mask: np.ndarray) -> np.ndarray:
    h, w = b_mask.shape
    rgb = np.zeros((h, w, 3), dtype=np.uint8)
    rgb[n_mask, 0] = 255   # red for non-bleached
    rgb[b_mask, 2] = 255   # blue for bleached
    return rgb


def index_by_stem(folder: Path) -> Dict[str, Path]:
    out: Dict[str, Path] = {}
    if not folder.exists():
        return out
    for p in folder.iterdir():
        if p.is_file() and is_image(p):
            out[normalize_stem(p.stem)] = p
    return out


def main():
    ensure_dirs()

    bleached = index_by_stem(BLEACHED_DIR)
    non_bleached = index_by_stem(NON_BLEACHED_DIR)

    all_stems = sorted(set(bleached.keys()) | set(non_bleached.keys()))
    if not all_stems:
        print("No images found. Put bleached masks in ./bleached_masks and non-bleached in ./non_bleached_masks.")
        return

    paired, only_b, only_n = 0, 0, 0

    for stem in all_stems:
        b_path = bleached.get(stem)
        n_path = non_bleached.get(stem)

        if b_path and n_path:
            # Pair -> save ONLY combined
            ref_img = Image.open(b_path).convert("L")
            ref_size = ref_img.size
            b_mask = np.array(ref_img) > 0
            n_mask = load_mask_bool(n_path, size=ref_size)
            combined = combine_pair(b_mask, n_mask)
            Image.fromarray(combined, mode="RGB").save(OUT_COMBINED / f"{stem}_combined.png")
            paired += 1

        elif b_path:
            # Singleton bleached -> save ONLY blue mask
            ref_img = Image.open(b_path).convert("L")
            b_mask = np.array(ref_img) > 0
            blue = colorize(b_mask, (0, 0, 255))
            Image.fromarray(blue, mode="RGB").save(OUT_SINGLE_B / f"{stem}_bleached_blue.png")
            only_b += 1

        else:  # n_path exists only
            ref_img = Image.open(n_path).convert("L")
            n_mask = np.array(ref_img) > 0
            red = colorize(n_mask, (255, 0, 0))
            Image.fromarray(red, mode="RGB").save(OUT_SINGLE_N / f"{stem}_non_bleached_red.png")
            only_n += 1

    print(f"Done. Paired saved (combined only): {paired}")
    print(f"Singleton bleached saved (blue only): {only_b}")
    print(f"Singleton non-bleached saved (red only): {only_n}")
    print(f"Combined -> {OUT_COMBINED.resolve()}")
    print(f"Singletons -> {OUT_SINGLE_B.parent.resolve()}")


if __name__ == "__main__":
    main()


No images found. Put bleached masks in ./bleached_masks and non-bleached in ./non_bleached_masks.
