<a href="https://colab.research.google.com/github/gunelaliyevaa/wildfire-detection-using-satellite-imagery/blob/main/input_image_preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""preprocess_and_patch.py
========================

Apply *Contrast‑Limited Adaptive Histogram Equalisation* (CLAHE) to each
image and slice it into fixed‑size patches. The script expects the input
dataset to be organised as::

    input_dir/
        fire/
        no_fire/

and produces an identical folder tree in *output_dir* containing the
generated patches::

    output_dir/
        fire/
            <orig>_patch_1.png
            <orig>_patch_2.png
            ...
        no_fire/
            ...

By default, every 512 × 512 image is divided into four 256 × 256 patches,
but the patch size is configurable.

Example
-------
```
python preprocess_and_patch.py \
    --input-dir /data/labeled_images \
    --output-dir /data/patched_images \
    --patch-size 256 \
    --clip-limit 2.0 \
    --tile-size 8 8
```
"""
from __future__ import annotations

import argparse
import os
from pathlib import Path
from typing import Sequence

import cv2
import numpy as np

# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------

def create_clahe(clip_limit: float, tile_grid_size: tuple[int, int]) -> cv2.CLAHE:  # noqa: D401
    """Return an OpenCV CLAHE object with the given parameters."""
    return cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)


def apply_clahe_rgb(image: np.ndarray, *, clahe: cv2.CLAHE) -> np.ndarray:
    """Apply CLAHE **per‑channel** in *Lab* colour‑space.

    Args:
        image: BGR ``numpy`` array as returned by ``cv2.imread``.
        clahe: Pre‑configured ``cv2.CLAHE`` instance.
    """
    lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    l_eq = clahe.apply(l)
    lab_eq = cv2.merge((l_eq, a, b))
    return cv2.cvtColor(lab_eq, cv2.COLOR_LAB2BGR)


def split_into_patches(image: np.ndarray, *, patch_size: int = 256) -> list[np.ndarray]:
    """Return **non‑overlapping** ``patch_size`` square patches from *image*."""
    patches: list[np.ndarray] = []
    h, w, _ = image.shape
    for y in range(0, h, patch_size):
        for x in range(0, w, patch_size):
            patch = image[y : y + patch_size, x : x + patch_size]
            if patch.shape[0] == patch_size and patch.shape[1] == patch_size:
                patches.append(patch)
    return patches


# ---------------------------------------------------------------------------
# Core processing
# ---------------------------------------------------------------------------

def process_dataset(
    *,
    input_dir: Path,
    output_dir: Path,
    categories: Sequence[str] = ("fire", "no_fire"),
    patch_size: int = 256,
    clip_limit: float = 2.0,
    tile_grid: tuple[int, int] = (8, 8),
) -> None:
    """Apply CLAHE + patching for every image in *input_dir*/*category*.

    Args:
        input_dir: Root directory of original images.
        output_dir: Root directory where patches are saved.
        categories: Sub‑folders (classes) to process.
        patch_size: Width / height of the square patches.
        clip_limit: CLAHE clip limit.
        tile_grid: CLAHE tile grid size as ``(rows, cols)``.
    """
    clahe = create_clahe(clip_limit, tile_grid)

    for category in categories:
        src_folder = input_dir / category
        dst_folder = output_dir / category
        dst_folder.mkdir(parents=True, exist_ok=True)

        for img_name in src_folder.iterdir():
            if not img_name.suffix.lower() in {".png", ".jpg", ".jpeg"}:
                continue
            image = cv2.imread(str(img_name))
            if image is None:
                print(f"Could not read {img_name}; skipping")
                continue

            # --- Pre‑process & patch ---
            image_eq = apply_clahe_rgb(image, clahe=clahe)
            patches = split_into_patches(image_eq, patch_size=patch_size)

            # --- Save patches ---
            stem = img_name.stem
            for i, patch in enumerate(patches, 1):
                patch_filename = f"{stem}_patch_{i}.png"
                cv2.imwrite(str(dst_folder / patch_filename), patch)

    print("Processing completed successfully.")


# ---------------------------------------------------------------------------
# CLI interface
# ---------------------------------------------------------------------------

def parse_args() -> argparse.Namespace:  # noqa: D401
    """Parse command‑line arguments."""
    p = argparse.ArgumentParser(description="CLAHE + patch generator")
    p.add_argument("--input-dir", type=Path, required=True, help="Input dataset root")
    p.add_argument("--output-dir", type=Path, required=True, help="Output root for patches")
    p.add_argument("--patch-size", type=int, default=256, help="Square patch size (default: 256)")
    p.add_argument("--clip-limit", type=float, default=2.0, help="CLAHE clip limit (default: 2.0)")
    p.add_argument("--tile-size", type=int, nargs=2, metavar=("ROWS", "COLS"), default=(8, 8), help="CLAHE tile grid size (default: 8 8)")
    p.add_argument("--categories", nargs="+", default=["fire", "no_fire"], help="Category sub‑folders to process")
    return p.parse_args()


if __name__ == "__main__":
    args = parse_args()
    process_dataset(
        input_dir=args.input_dir,
        output_dir=args.output_dir,
        categories=args.categories,
        patch_size=args.patch_size,
        clip_limit=args.clip_limit,
        tile_grid=tuple(args.tile_size),
    )
"}


Processing completed successfully.
