In [None]:
# ----------------------------- viirs_download.py -----------------------------
"""
VIIRS Downloader (dated folders + Active Fire)
==============================================

What this script does
---------------------
• Authenticates to NASA Earthdata (via EARTHDATA_USERNAME / EARTHDATA_PASSWORD env vars,
  or secure prompt fallback) and initializes earthaccess.
• Reads an AOI shapefile (any CRS), computes an EPSG:4326 bounding box.
• For each day in START_DATE..END_DATE (inclusive):
    - Searches & downloads VIIRS L1B radiance (VJ202IMG) and GEO (VJ203IMG)
      into BASEDIR/YYYY-MM-DD/raw/
    - Optionally searches & downloads Active Fire (e.g., VJ214IMG) into the same folder
    - Writes per-day manifests in BASEDIR/YYYY-MM-DD/:
        * manifest_pairs.csv         — L1B↔GEO pairs + QC flag presence for I04/I05
        * manifest_active_fire.csv   — AF files + UTC time + has_internal_geo + paired GEO path (if needed)

Dependencies
------------
pip install earthaccess fiona shapely pyproj h5py

Notes
-----
• Product short names default to NOAA-21 (VJ2*). For S-NPP you would switch to VNP*,
  and for NOAA-20 to VJ1*.
• Network timeouts are set to 20 s (socket.setdefaulttimeout).
• The pairing uses the canonical 'Ayyyyddd.HHMM' key; if an exact GEO match is missing,
  a same-day fallback is used.
"""
# ---------------------------------------------------------------------------

from __future__ import annotations

# ============================== Standard libs ================================
import os
import csv
import re
import socket
from pathlib import Path
from datetime import date, datetime, timedelta, timezone
from typing import Iterable, Sequence

# ============================== Third-party ================================
import earthaccess as ea
import h5py
import fiona
from shapely.geometry import shape
from shapely.ops import unary_union, transform as shp_transform
from pyproj import CRS as PJCRS, Transformer

# ---------------------------- Global settings -------------------------------
socket.setdefaulttimeout(20)  # process-wide default socket timeout in seconds; fail fast on flaky networks

# =============================== Configuration ==============================
# Core radiance + geolocation short names (NOAA-21 defaults).
# For NOAA-20 use the VJ1* equivalents; for S-NPP use the VNP* equivalents.
SHORT_NAME_L1B = "VJ202IMG"   # VIIRS L1B radiances (searched and downloaded per day)
SHORT_NAME_GEO = "VJ203IMG"   # VIIRS geolocation (paired with L1B by timestamp key)

# Active Fire (optional). Add/remove short names as needed.
# AF is only searched when INCLUDE_ACTIVE_FIRE is True and AF_SHORTNAMES is non-empty.
INCLUDE_ACTIVE_FIRE = True
AF_SHORTNAMES = [
    # "VNP14IMG",  # S-NPP (uncomment to include)
    # "VJ114IMG",  # NOAA-20 (uncomment to include)
    "VJ214IMG",    # NOAA-21 (kept from original)
]

# Time window (inclusive by day): every day from START_DATE through END_DATE is processed.
START_DATE = date(2025, 1, 1)
END_DATE   = date(2025, 1, 11)

# Base path; one dated subfolder per day is created under here:
#   BASEDIR/YYYY-MM-DD/raw/   downloaded granules
#   BASEDIR/YYYY-MM-DD/*.csv  per-day manifests (written next to raw/, not in a subfolder)
BASEDIR = Path(
    r"path\to\base\directory"
)

# AOI shapefile (any polygon/multipolygon; any CRS). Only its WGS84 bounding box
# is used for the granule search.
AOI_SHP = Path(
    r"path\to\F002_L1__IR__L2L1M0__2025-01-10T215412.018348Z_2025-04-10T154832.806087Z_97706189_MWIR_Boundary.shp"
)


# ============================== Auth utilities ==============================

def earthdata_login_username_password() -> None:
    """
    Log in to NASA Earthdata through earthaccess using environment credentials.

    Credentials are taken from EARTHDATA_USERNAME / EARTHDATA_PASSWORD. Any value
    that is missing (or empty) is requested interactively; the password prompt
    does not echo. The resulting pair is stored in this process's environment so
    that the "environment" login strategy can read it.
    """
    username = os.environ.get("EARTHDATA_USERNAME")
    password = os.environ.get("EARTHDATA_PASSWORD")

    credentials_complete = bool(username) and bool(password)
    if not credentials_complete:
        import getpass

        # Prompt only for the piece(s) that are actually missing.
        if not username:
            username = input("Earthdata username: ").strip()
        if not password:
            password = getpass.getpass("Earthdata password: ")

        os.environ["EARTHDATA_USERNAME"] = username
        os.environ["EARTHDATA_PASSWORD"] = password

    ea.login(strategy="environment")


# ============================== AOI & helpers ===============================

def read_aoi_bbox_wgs84(shp_path: Path) -> tuple[float, float, float, float]:
    """
    Return the AOI bounding box in EPSG:4326 as (minx, miny, maxx, maxy).

    All features of the shapefile are unioned, reprojected to WGS84 when the
    source CRS differs, and the resulting bounds are clamped to
    lon [-180, 180] / lat [-90, 90].

    Raises
    ------
    FileNotFoundError
        If the shapefile path does not exist.
    ValueError
        If the shapefile holds no features or declares no CRS.
    """
    if not shp_path.exists():
        raise FileNotFoundError(f"AOI not found: {shp_path}")

    with fiona.open(shp_path, "r") as layer:
        parts = [shape(record["geometry"]) for record in layer]
        if not parts:
            raise ValueError("AOI shapefile has no geometries.")
        layer_crs = layer.crs
    if not layer_crs:
        raise ValueError("AOI shapefile has no CRS.")

    # buffer(0) is applied to the union before any reprojection.
    merged = unary_union(parts).buffer(0)

    source_crs = PJCRS.from_user_input(layer_crs)
    target_crs = PJCRS.from_epsg(4326)
    if source_crs != target_crs:
        to_wgs84 = Transformer.from_crs(source_crs, target_crs, always_xy=True)

        def _reproject(x, y, z=None):
            # Any z coordinate is dropped; only x/y are transformed.
            return to_wgs84.transform(x, y)

        merged = shp_transform(_reproject, merged)

    west, south, east, north = merged.bounds
    west = max(west, -180.0)
    south = max(south, -90.0)
    east = min(east, 180.0)
    north = min(north, 90.0)
    return (west, south, east, north)


def date_iter(d0: date, d1: date) -> Iterable[date]:
    """Yield every day from d0 through d1 inclusive; yields nothing when d0 > d1."""
    one_day = timedelta(days=1)
    current = d0
    while True:
        if current > d1:
            return
        yield current
        current = current + one_day


def timestamp_key(p: Path) -> str:
    """
    Return the 'Ayyyyddd.HHMM' token embedded in the file name (between dots).

    This token is the key used to pair files. When the name holds no such
    token, the file stem is returned instead.
    """
    found = re.search(r"\.(A\d{7}\.\d{4})\.", p.name)
    if found is None:
        return p.stem
    return found.group(1)


def pair_l1b_geo(l1b_paths: Sequence[Path] | Sequence[str],
                 geo_paths: Sequence[Path] | Sequence[str]) -> list[tuple[Path, Path]]:
    """
    Match each L1B file with a GEO file.

    An exact match on the timestamp key is preferred. Without one, the first
    GEO whose key starts with the same 'Ayyyyddd' day prefix is used as a
    best-effort fallback. L1B files with neither are left out of the result.
    """
    geo_by_key: dict[str, Path] = {}
    for geo_item in geo_paths:
        geo_file = Path(geo_item)
        geo_by_key[timestamp_key(geo_file)] = geo_file

    matched: list[tuple[Path, Path]] = []
    for l1b_item in l1b_paths:
        l1b_file = Path(l1b_item)
        key = timestamp_key(l1b_file)

        exact = geo_by_key.get(key)
        if exact is not None:
            matched.append((l1b_file, exact))
            continue

        day_prefix = key.split(".")[0]  # 'Ayyyyddd'
        same_day = next(
            (geo for other_key, geo in geo_by_key.items() if other_key.startswith(day_prefix)),
            None,
        )
        if same_day is not None:
            matched.append((l1b_file, same_day))
    return matched


def has_quality_flags_in_l1b(h5_path: Path, band: str) -> bool:
    """
    Report whether the L1B file contains '/observation_data/<band>_quality_flags'.

    Best effort: any error while opening or reading the file yields False.
    """
    try:
        with h5py.File(h5_path, "r") as handle:
            flags = handle.get(f"/observation_data/{band}_quality_flags")
            present = flags is not None
    except Exception:
        return False
    return present


def acquisition_dt_from_name(name: str) -> datetime | None:
    """
    Turn the '.Ayyyyddd.HHMM.' token of a file name into a timezone-aware UTC datetime.

    Returns None when the name contains no such token.
    """
    found = re.search(r"\.A(\d{4})(\d{3})\.(\d{2})(\d{2})\.", name)
    if found is None:
        return None

    year = int(found.group(1))
    day_of_year = int(found.group(2))
    hour = int(found.group(3))
    minute = int(found.group(4))

    # Day-of-year is 1-based, so offset from January 1st by (doy - 1) days.
    jan_first = datetime(year, 1, 1, tzinfo=timezone.utc)
    return jan_first + timedelta(days=day_of_year - 1, hours=hour, minutes=minute)


def af_has_internal_geo(h5_path: Path) -> bool:
    """
    Report whether an Active Fire file carries its own geolocation arrays.

    True only when both '/geolocation_data/latitude' and
    '/geolocation_data/longitude' are present. Best effort: any error while
    opening or reading the file yields False.
    """
    required = ("/geolocation_data/latitude", "/geolocation_data/longitude")
    try:
        with h5py.File(h5_path, "r") as handle:
            return all(dataset in handle for dataset in required)
    except Exception:
        return False


# ================================== Main =====================================

def main() -> None:
    """
    Download VIIRS L1B/GEO (and optionally Active Fire) granules day by day.

    For every day in START_DATE..END_DATE (inclusive) this:
      1. searches Earthdata for SHORT_NAME_L1B and SHORT_NAME_GEO granules that
         overlap that UTC day and the AOI bounding box, and downloads them into
         BASEDIR/YYYY-MM-DD/raw/;
      2. writes manifest_pairs.csv (only when both L1B and GEO were downloaded);
      3. if enabled, does the same for each Active Fire short name and writes
         manifest_active_fire.csv (only when at least one AF file was downloaded).
    """
    # ---- AOI & auth ----
    bbox = read_aoi_bbox_wgs84(AOI_SHP)
    earthdata_login_username_password()

    # ---- Iterate days ----
    for day in date_iter(START_DATE, END_DATE):
        day_str = day.strftime("%Y-%m-%d")
        daydir = BASEDIR / day_str
        rawdir = daydir / "raw"
        rawdir.mkdir(parents=True, exist_ok=True)

        manifest_l1_geo = daydir / "manifest_pairs.csv"
        manifest_af = daydir / "manifest_active_fire.csv"

        # Temporal window for the search: exactly this UTC day, 00:00:00..23:59:59.
        # Explicit datetimes are used on purpose: earthaccess documents that a
        # date-only end bound is expanded to 23:59:59 of that date, so passing
        # (day, day + 1) as plain dates would also pull in the whole next day
        # and download those granules again into the following folder.
        t0 = datetime(day.year, day.month, day.day)
        t1 = t0 + timedelta(days=1) - timedelta(seconds=1)

        print(f"\n=== {day_str} ===")
        print(f"[INFO] Searching {SHORT_NAME_L1B}/{SHORT_NAME_GEO} for {t0}..{t1} in bbox {bbox} …")

        # ---- Search & download core L1B/GEO ----
        l1b_items = ea.search_data(short_name=SHORT_NAME_L1B, temporal=(t0, t1), bounding_box=bbox)
        geo_items = ea.search_data(short_name=SHORT_NAME_GEO, temporal=(t0, t1), bounding_box=bbox)

        if not l1b_items:
            print("[WARN] No L1B granules for this day.")
        if not geo_items:
            print("[WARN] No GEO granules for this day.")

        l1b_paths = ea.download(l1b_items, rawdir.as_posix()) if l1b_items else []
        geo_paths = ea.download(geo_items, rawdir.as_posix()) if geo_items else []
        print(f"[OK] Downloaded: {len(l1b_paths)} L1B, {len(geo_paths)} GEO into {rawdir}")

        # ---- Pair L1B↔GEO and write manifest ----
        if l1b_paths and geo_paths:
            pairs = pair_l1b_geo(l1b_paths, geo_paths)
            with manifest_l1_geo.open("w", newline="") as f:
                w = csv.writer(f)
                w.writerow(["l1b_path", "geo_path", "timestamp_key", "has_qc_I04", "has_qc_I05"])
                for l1b_p, geo_p in pairs:
                    h4 = has_quality_flags_in_l1b(Path(l1b_p), "I04")
                    h5 = has_quality_flags_in_l1b(Path(l1b_p), "I05")
                    w.writerow([str(l1b_p), str(geo_p), timestamp_key(Path(l1b_p)), int(h4), int(h5)])
            print(f"[OK] {manifest_l1_geo}")

        # ---- Active Fire (optional) ----
        if INCLUDE_ACTIVE_FIRE and AF_SHORTNAMES:
            af_rows: list[list[str | int]] = []

            # GEO files keyed by timestamp, used to pair AF files that lack
            # internal geolocation arrays.
            geo_map = {timestamp_key(Path(g)): Path(g) for g in geo_paths}

            for sn in AF_SHORTNAMES:
                print(f"[INFO] Searching AF {sn} for {t0}..{t1} …")
                items = ea.search_data(short_name=sn, temporal=(t0, t1), bounding_box=bbox)
                if not items:
                    print(f"[INFO] No AF granules for {sn} on {day_str}.")
                    continue

                paths = ea.download(items, rawdir.as_posix())
                for p in paths:
                    p = Path(p)
                    key = timestamp_key(p)
                    dt = acquisition_dt_from_name(p.name)
                    acq_date = dt.strftime("%Y-%m-%d") if dt else ""
                    acq_time = dt.strftime("%H:%M") if dt else ""
                    has_geo = af_has_internal_geo(p)
                    # A paired GEO path is recorded only when the AF file has no
                    # geolocation of its own; empty string when no GEO matches.
                    paired_geo = str(geo_map.get(key, "")) if not has_geo else ""
                    af_rows.append([sn, str(p), key, acq_date, acq_time, int(has_geo), paired_geo])

            if af_rows:
                with manifest_af.open("w", newline="") as f:
                    w = csv.writer(f)
                    w.writerow([
                        "product", "file_path", "timestamp_key",
                        "acq_date_utc", "acq_time_utc",
                        "has_internal_geo", "paired_geo_path"
                    ])
                    w.writerows(af_rows)
                print(f"[OK] {manifest_af}")

    print("\n[DONE] All requested days downloaded into dated folders under:", BASEDIR)


# ================================== CLI ======================================

if __name__ == "__main__":
    # Run the downloader only when executed as a script, not when imported.
    main()


In [None]:
# --------------------------- viirs_process_rad_bt.py ---------------------------
"""
VIIRS L1B Processor (date range; Radiance + Planck-derived BT)
================================================================

What this script does
---------------------
• Processes all days from START_DATE to END_DATE (inclusive).
• Expects dated folders from a downloader:

    BASEDIR/YYYY-MM-DD/
        raw/                    # HDF5 swaths: L1B (VJ202IMG) + GEO (VJ203IMG)
        manifest_pairs.csv      # (optional) L1B↔GEO pairs; auto-fallback scans raw/
        bt/                     # created here

• Outputs per day to BASEDIR/YYYY-MM-DD/bt/:
    *_I04_Rad.tif, *_I04_BT_K.tif, *_I05_Rad.tif, *_I05_BT_K.tif

• Maintains CSV log per day:
    BASEDIR/YYYY-MM-DD/processing_log.csv

Assumptions / Notes
-------------------
• Brightness temperature is computed via Planck inversion using nominal λ for I04/I05.
• A simple cold-cloud mask is applied from I05 BT (< CLOUD_BT_K = 265 K) and I05 invalids.
• AOI is used for both gridding extent (intersection) and final mask (clip to AOI).
• Grid is EPSG:4326 with uniform spacing GRID_RES_DEG.
• The cloud mask is deliberately simple (a cold-cloud threshold on the I05 brightness temperature).

Dependencies
------------
pip install h5py numpy rasterio pyresample fiona shapely pyproj tzdata
"""

from __future__ import annotations

# ============================== Imports ======================================
import os
import re
import csv
from pathlib import Path
from datetime import datetime, timedelta, timezone, date
from zoneinfo import ZoneInfo
from typing import Optional

import numpy as np
import h5py
import rasterio
from rasterio.transform import from_bounds
from rasterio.crs import CRS
from rasterio.features import geometry_mask

from pyresample import geometry, kd_tree

import fiona
from shapely.geometry import shape, mapping
from shapely.ops import unary_union, transform as shp_transform
from pyproj import CRS as PJCRS, Transformer


# ============================ Configuration ==================================
# Range to process (inclusive)
# Root folder holding the dated subfolders (BASEDIR/YYYY-MM-DD/...) to process.
BASEDIR     = Path(r"path\to\base\directory")
# Every day from START_DATE through END_DATE is processed.
START_DATE  = date(2025, 1, 1)
END_DATE    = date(2025, 1, 11)

# AOI shapefile used for intersection + clipping
AOI_SHP     = Path(r"path\to\F002_L1__IR__L2L1M0__2025-01-10T215412.018348Z_2025-04-10T154832.806087Z_97706189_MWIR_Boundary.shp")

# Grid and radiometry parameters
GRID_RES_DEG = 0.0036  # output pixel size in degrees (~400 m at the equator)
LAM_I4_UM    = 3.74    # nominal wavelength used for the I04 Planck inversion (µm)
LAM_I5_UM    = 11.45   # nominal wavelength used for the I05 Planck inversion (µm)
CLOUD_BT_K   = 265.0   # simple cold-cloud threshold (Kelvin); I05 BT below this is masked

# Local time zone for logging human-readable times
LOCAL_TZNAME = "America/Chicago"

# =============================== Utilities ===================================
def date_iter(d0: date, d1: date):
    """Generate d0, d0 + 1 day, … up to and including d1 (nothing if d0 > d1)."""
    step = timedelta(days=1)
    day = d0
    while True:
        if day > d1:
            return
        yield day
        day = day + step


def read_aoi_wgs84(shp_path: Path):
    """
    Load the AOI shapefile as one unioned geometry in EPSG:4326.

    Returns
    -------
    shapely.geometry.base.BaseGeometry
        Union of all features (after buffer(0)), in lon/lat.

    Raises
    ------
    FileNotFoundError
        If the shapefile path does not exist.
    ValueError
        If the shapefile holds no features or declares no CRS.
    """
    if not shp_path.exists():
        raise FileNotFoundError(f"AOI not found: {shp_path}")

    with fiona.open(shp_path, "r") as layer:
        parts = [shape(record["geometry"]) for record in layer]
        if not parts:
            raise ValueError("AOI shapefile has no geometries.")
        layer_crs = layer.crs
    if not layer_crs:
        raise ValueError("AOI shapefile has no CRS.")

    merged = unary_union(parts).buffer(0)

    source_crs = PJCRS.from_user_input(layer_crs)
    target_crs = PJCRS.from_epsg(4326)
    if source_crs == target_crs:
        return merged

    to_wgs84 = Transformer.from_crs(source_crs, target_crs, always_xy=True)

    def _reproject(x, y, z=None):
        # Any z coordinate is dropped; only x/y are transformed.
        return to_wgs84.transform(x, y)

    return shp_transform(_reproject, merged)


def timestamp_key(p: Path) -> str:
    """Return the dot-delimited 'Ayyyyddd.HHMM' token of the file name, or the stem if absent."""
    hit = re.search(r"\.(A\d{7}\.\d{4})\.", p.name)
    if hit is None:
        return p.stem
    return hit.group(1)


def read_pairs_from_manifest(manifest_path: Path, rawdir: Path) -> list[tuple[Path, Path]]:
    """
    Return the (L1B, GEO) file pairs for one day.

    If the manifest CSV exists, its 'l1b_path' / 'geo_path' columns are used
    as-is. Otherwise raw/ is scanned for the S-NPP / NOAA-20 / NOAA-21 product
    prefixes and files are paired on an exact timestamp-key match.

    The scan accepts both '.nc' and '.h5' names: the upstream granules are
    distributed as netCDF-4 ('.nc'), so matching '.h5' alone would never find
    them. Files are ordered by product prefix, then by name.
    """
    if manifest_path.exists():
        with manifest_path.open("r", newline="") as f:
            return [
                (Path(row["l1b_path"]), Path(row["geo_path"]))
                for row in csv.DictReader(f)
            ]

    def _scan(prefixes: tuple[str, ...]) -> list[Path]:
        # One sorted run per product prefix, covering both accepted extensions.
        found: list[Path] = []
        for prefix in prefixes:
            hits: set[Path] = set()
            for ext in (".nc", ".h5"):
                hits.update(rawdir.glob(f"{prefix}*{ext}"))
            found.extend(sorted(hits))
        return found

    l1b = _scan(("VNP02IMG", "VJ102IMG", "VJ202IMG"))
    geo = _scan(("VNP03IMG", "VJ103IMG", "VJ203IMG"))

    gmap = {timestamp_key(p): p for p in geo}
    pairs: list[tuple[Path, Path]] = []
    for p in l1b:
        geo_match = gmap.get(timestamp_key(p))
        if geo_match is not None:
            pairs.append((p, geo_match))
    return pairs


def acquisition_dt_from_name(name: str) -> Optional[datetime]:
    """
    Decode the '.Ayyyyddd.HHMM.' token of a file name into an aware UTC datetime.

    Returns None when the token is absent.
    """
    hit = re.search(r"\.A(\d{4})(\d{3})\.(\d{2})(\d{2})\.", name)
    if hit is None:
        return None

    year, day_of_year, hour, minute = (int(part) for part in hit.groups())
    # Day-of-year is 1-based, hence the (doy - 1) offset from January 1st.
    start_of_year = datetime(year, 1, 1, tzinfo=timezone.utc)
    elapsed = timedelta(days=day_of_year - 1, hours=hour, minutes=minute)
    return start_of_year + elapsed


# ============================= Planck inversion ===============================
def planck_bt_from_radiance(L_um, lam_um: float) -> np.ndarray:
    """
    Brightness temperature (K) from spectral radiance by inverting Planck's law.

    Parameters
    ----------
    L_um : array-like
        Spectral radiance in W m^-2 sr^-1 µm^-1. Values below 1e-9 are raised
        to 1e-9 before the inversion; NaN stays NaN.
    lam_um : float
        Wavelength in µm at which the inversion is evaluated.

    Returns
    -------
    np.ndarray (float32)
        T = c2 / ln(1 + c1 / L), with c1 = 2hc²/λ⁵ (per µm) and c2 = hc/(kλ).
    """
    planck_h = 6.62607015e-34
    light_c = 2.99792458e8
    boltzmann_k = 1.380649e-23

    wavelength_m = lam_um * 1e-6
    # The trailing 1e-6 converts the per-metre radiance constant to per-µm.
    c1 = (2 * planck_h * light_c**2) / (wavelength_m**5) * 1e-6
    c2 = (planck_h * light_c) / (boltzmann_k * wavelength_m)

    radiance = np.array(L_um, dtype=np.float64)
    radiance = np.clip(radiance, 1e-9, np.inf)  # keeps the ratio and log finite
    ratio = c1 / radiance
    temperature = c2 / np.log1p(ratio)
    return temperature.astype(np.float32)


# ========================= Band read / decode (L1B) ===========================
def read_band_rad_bt_and_mask(f: h5py.File, band: str, lam_um: float):
    """
    Read one band from an open L1B file; return radiance, BT and a validity mask.

    Steps
    -----
    • Stored values are converted with value * scale_factor + add_offset.
    • A pixel is invalid when its stored value equals _FillValue or any entry
      of the optional flag_values attribute.
    • When '/observation_data/<band>_quality_flags' exists, only pixels whose
      flag equals 0 stay valid.
    • Invalid pixels are NaN in the radiance array, and therefore in BT.

    Returns
    -------
    (rad, bt, valid_mask, has_qc), or (None, None, None, False) when
    '/observation_data/<band>' is not in the file.
    """
    band_path = f"/observation_data/{band}"
    if band_path not in f:
        return None, None, None, False

    dataset = f[band_path]
    counts = dataset[...].astype(np.uint32)
    attrs = dataset.attrs
    scale = float(attrs["scale_factor"])
    offset = float(attrs["add_offset"])
    fill_value = int(attrs["_FillValue"])
    reserved_values = list(attrs.get("flag_values", []))

    # Start from "not fill", then knock out every reserved flag value.
    valid = counts != fill_value
    for reserved in reserved_values:
        valid &= counts != reserved

    qc_path = f"/observation_data/{band}_quality_flags"
    has_qc = qc_path in f
    if has_qc:
        quality = f[qc_path][...]
        valid &= (quality == 0)

    rad = np.full(counts.shape, np.nan, dtype=np.float32)
    rad[valid] = counts[valid] * scale + offset
    bt = planck_bt_from_radiance(rad, lam_um=lam_um)
    return rad, bt, valid, has_qc


# ========================== Geolocation / gridding ============================
def read_geo(geo_path: Path) -> tuple[np.ndarray, np.ndarray]:
    """
    Load the swath latitude and longitude arrays (float32) from a GEO file.

    Latitudes outside [-90, 90] and longitudes outside [-180, 180] are
    replaced with NaN (not clipped to the limit).
    """
    with h5py.File(geo_path, "r") as geo:
        lat = geo["/geolocation_data/latitude"][...].astype(np.float32)
        lon = geo["/geolocation_data/longitude"][...].astype(np.float32)

    bad_lat = (lat > 90) | (lat < -90)
    bad_lon = (lon > 180) | (lon < -180)
    lat[bad_lat] = np.nan
    lon[bad_lon] = np.nan
    return lat, lon


def define_area_wgs84_intersection(lat: np.ndarray,
                                   lon: np.ndarray,
                                   aoi_geom,
                                   res_deg: float = GRID_RES_DEG,
                                   pad: float = 0.0):
    """
    Build a lon/lat target grid over the overlap of the swath and AOI bounding boxes.

    The swath box comes from the NaN-ignoring min/max of lat/lon, the AOI box
    from aoi_geom.bounds. The overlap is widened by `pad` degrees on each side
    and divided into ceil(extent / res_deg) columns and rows.

    Returns
    -------
    (area_def, transform, width, height), or None when the boxes do not overlap.
    """
    swath_west, swath_east = float(np.nanmin(lon)), float(np.nanmax(lon))
    swath_south, swath_north = float(np.nanmin(lat)), float(np.nanmax(lat))
    aoi_west, aoi_south, aoi_east, aoi_north = aoi_geom.bounds

    west = max(swath_west, aoi_west) - pad
    east = min(swath_east, aoi_east) + pad
    south = max(swath_south, aoi_south) - pad
    north = min(swath_north, aoi_north) + pad

    overlaps = west < east and south < north
    if not overlaps:
        return None

    width = int(np.ceil((east - west) / res_deg))
    height = int(np.ceil((north - south) / res_deg))
    transform = from_bounds(west, south, east, north, width, height)

    # pyresample takes the extent as (min x, min y, max x, max y).
    extent = (west, south, east, north)
    projection = {"proj": "longlat", "datum": "WGS84"}
    area_def = geometry.AreaDefinition(
        "wgs84", "WGS84 latlon", "epsg4326",
        projection, width, height,
        extent
    )
    return area_def, transform, width, height


def resample_swath_to_grid(lat: np.ndarray,
                           lon: np.ndarray,
                           data: np.ndarray,
                           area_def: geometry.AreaDefinition) -> np.ndarray:
    """
    Resample swath data onto the target grid with nearest-neighbour lookup.

    Uses a 5000 m radius of influence and NaN as the fill value; the result
    is returned as float32.
    """
    source = geometry.SwathDefinition(lons=lon, lats=lat)
    gridded = kd_tree.resample_nearest(
        source,
        data,
        area_def,
        radius_of_influence=5000,  # meters
        fill_value=np.nan,
    )
    return gridded.astype(np.float32)


def write_geotiff(path: Path,
                  arr: np.ndarray,
                  transform,
                  crs=CRS.from_epsg(4326),
                  nodata=np.float32(np.nan),
                  band_tags: dict | None = None,
                  dtype=rasterio.float32) -> None:
    """
    Write `arr` as a single-band, tiled, LZW-compressed GeoTIFF.

    The array is cast to `dtype` before writing; `nodata` is stored in the
    file profile. When `band_tags` is given, its entries are attached as tags
    on band 1.
    """
    n_rows, n_cols = arr.shape[0], arr.shape[1]
    profile = dict(
        driver="GTiff",
        height=n_rows,
        width=n_cols,
        count=1,
        dtype=dtype,
        crs=crs,
        transform=transform,
        nodata=nodata,
        compress="lzw",
        tiled=True,
    )
    with rasterio.open(path, "w", **profile) as dst:
        dst.write(arr.astype(dtype), 1)
        if band_tags:
            dst.update_tags(1, **band_tags)


def append_log_l1b(log_csv: Path, row: dict) -> None:
    """
    Append one row to the per-day processing log CSV.

    The header line is written first when the file does not exist yet.
    Columns missing from `row` are left empty.
    """
    columns = [
        "l1b_file", "geo_file", "timestamp_key",
        "acq_date_utc", "acq_time_utc", "acq_date_local", "acq_time_local", "local_tz",
        "has_qc_I04", "has_qc_I05", "cloud_thresh_K",
        "bt_I04_min", "bt_I04_max", "bt_I04_mean",
        "bt_I05_min", "bt_I05_max", "bt_I05_mean",
        "valid_px_I04", "valid_px_I05",
    ]
    # Must be checked before opening: append mode creates the file.
    needs_header = not log_csv.exists()
    with log_csv.open("a", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        if needs_header:
            writer.writeheader()
        writer.writerow(row)


# ============================== Per-day worker ================================
def _mask_swath(arr, valid, cloud_mask):
    """Return a copy of a swath array with invalid and cloudy pixels set to NaN (None passes through)."""
    if arr is None:
        return None
    out = arr.copy()
    if valid is not None:
        out[~valid] = np.nan
    if cloud_mask is not None:
        out[cloud_mask] = np.nan
    return out


def _grid_clip_write_band(band, rad_sw, bt_sw, lat, lon, area_def, transform, aoi_mask, outbt, stem):
    """Grid one band's radiance and BT, clip to the AOI, write both GeoTIFFs, return (n_valid, min, max, mean) of BT."""
    rad_g = np.where(aoi_mask, resample_swath_to_grid(lat, lon, rad_sw, area_def), np.nan)
    bt_g = np.where(aoi_mask, resample_swath_to_grid(lat, lon, bt_sw, area_def), np.nan)

    rad_path = outbt / f"{stem}_{band}_Rad.tif"
    write_geotiff(
        rad_path, rad_g, transform,
        band_tags={"units": "W/m^2/sr/μm", "long_name": f"VIIRS {band} Radiance"}
    )
    bt_path = outbt / f"{stem}_{band}_BT_K.tif"
    write_geotiff(
        bt_path, bt_g, transform,
        band_tags={"units": "K", "long_name": f"VIIRS {band} Brightness Temperature"}
    )
    print(f"[OK] wrote {rad_path} and {bt_path}")

    n_valid = int(np.count_nonzero(~np.isnan(bt_g)))
    if n_valid == 0:
        # nanmin/nanmax/nanmean on an all-NaN grid would only emit warnings.
        return 0, np.nan, np.nan, np.nan
    return n_valid, float(np.nanmin(bt_g)), float(np.nanmax(bt_g)), float(np.nanmean(bt_g))


def process_day(day: date, aoi) -> None:
    """
    Process one day of L1B/GEO pairs into AOI-clipped radiance and BT GeoTIFFs.

    For each pair: read I04/I05 radiance and BT, build a cloud/invalid mask
    from I05, mask both bands in swath space, resample onto the swath∩AOI
    grid, clip to the AOI polygon, write the GeoTIFFs under
    BASEDIR/YYYY-MM-DD/bt/, and append one row to processing_log.csv.

    A day is skipped (with a message) when raw/ is missing or no pairs are
    found; the bt/ folder is created only when there is something to process.
    """
    daydir = BASEDIR / day.strftime("%Y-%m-%d")
    rawdir = daydir / "raw"
    outbt = daydir / "bt"
    manifest = daydir / "manifest_pairs.csv"
    log_csv = daydir / "processing_log.csv"

    if not rawdir.exists():
        print(f"[SKIP] No raw folder for {day}: {rawdir}")
        return

    pairs = read_pairs_from_manifest(manifest, rawdir)
    if not pairs:
        print(f"[SKIP] No L1B+GEO pairs for {day}.")
        return

    # Created after the skip checks so days without data leave no empty folders.
    outbt.mkdir(parents=True, exist_ok=True)

    tz = ZoneInfo(LOCAL_TZNAME)
    print(f"[INFO] {day} — processing {len(pairs)} pairs …")

    for l1b_p, geo_p in pairs:
        l1b_name = Path(l1b_p).name

        # ---- Read band radiances + BT + validity ----
        # The fourth return value (QC dataset present) is not used below.
        with h5py.File(l1b_p, "r") as f:
            rad_i4, bt_i4, valid_i4, _has_qc_i4 = read_band_rad_bt_and_mask(f, "I04", lam_um=LAM_I4_UM)
            rad_i5, bt_i5, valid_i5, _has_qc_i5 = read_band_rad_bt_and_mask(f, "I05", lam_um=LAM_I5_UM)

        if bt_i4 is None and bt_i5 is None:
            print(f"[WARN] No I04/I05 in {l1b_name}; skipping.")
            continue

        # ---- GEO lat/lon and target grid from swath∩AOI ----
        lat, lon = read_geo(geo_p)
        area = define_area_wgs84_intersection(lat, lon, aoi, res_deg=GRID_RES_DEG)
        if area is None:
            print("[INFO] Swath does not intersect AOI; skipping L1B.")
            continue
        area_def, transform, width, height = area

        # ---- Cloud/invalid mask (from I05); applied to both bands ----
        cloud_mask = None
        if bt_i5 is not None:
            cloud_mask = (bt_i5 < CLOUD_BT_K)
            if valid_i5 is not None:
                cloud_mask |= ~valid_i5

        # ---- AOI mask on the target grid (True inside the AOI) ----
        aoi_mask = geometry_mask(
            [mapping(aoi)], out_shape=(height, width), transform=transform, invert=True
        ).astype(bool)

        stem = Path(l1b_p).with_suffix("").name  # file name without its last extension

        n_valid_i4 = n_valid_i5 = 0
        i4_min = i4_max = i4_mean = np.nan
        i5_min = i5_max = i5_mean = np.nan

        # ---- Per band: mask in swath space, grid, clip, write, collect stats ----
        if rad_i4 is not None:
            n_valid_i4, i4_min, i4_max, i4_mean = _grid_clip_write_band(
                "I04",
                _mask_swath(rad_i4, valid_i4, cloud_mask),
                _mask_swath(bt_i4, valid_i4, cloud_mask),
                lat, lon, area_def, transform, aoi_mask, outbt, stem,
            )
        if rad_i5 is not None:
            n_valid_i5, i5_min, i5_max, i5_mean = _grid_clip_write_band(
                "I05",
                _mask_swath(rad_i5, valid_i5, cloud_mask),
                _mask_swath(bt_i5, valid_i5, cloud_mask),
                lat, lon, area_def, transform, aoi_mask, outbt, stem,
            )

        # ---- Append log record ----
        dt_utc = acquisition_dt_from_name(l1b_name)
        if dt_utc is not None:
            dt_local = dt_utc.astimezone(tz)
            acq_date_utc = dt_utc.strftime("%Y-%m-%d")
            acq_time_utc = dt_utc.strftime("%H:%M")
            acq_date_local = dt_local.strftime("%Y-%m-%d")
            acq_time_local = dt_local.strftime("%H:%M")
        else:
            acq_date_utc = acq_time_utc = acq_date_local = acq_time_local = ""

        append_log_l1b(log_csv, {
            "l1b_file": str(l1b_p), "geo_file": str(geo_p),
            "timestamp_key": timestamp_key(Path(l1b_p)),
            "acq_date_utc": acq_date_utc, "acq_time_utc": acq_time_utc,
            "acq_date_local": acq_date_local, "acq_time_local": acq_time_local, "local_tz": LOCAL_TZNAME,
            # NOTE(review): these two columns record whether the band was present
            # (validity mask returned), not whether a *_quality_flags dataset
            # exists. Kept as-is to preserve the existing log semantics; confirm
            # whether the QC-presence flag was intended instead.
            "has_qc_I04": int(valid_i4 is not None),
            "has_qc_I05": int(valid_i5 is not None),
            "cloud_thresh_K": CLOUD_BT_K,
            "bt_I04_min": i4_min, "bt_I04_max": i4_max, "bt_I04_mean": i4_mean,
            "bt_I05_min": i5_min, "bt_I05_max": i5_max, "bt_I05_mean": i5_mean,
            "valid_px_I04": n_valid_i4, "valid_px_I05": n_valid_i5
        })


# ================================ Main =======================================
def main() -> None:
    """Load the AOI once, then run process_day for every day in START_DATE..END_DATE."""
    aoi_geom = read_aoi_wgs84(AOI_SHP)
    days = date_iter(START_DATE, END_DATE)
    for current_day in days:
        process_day(current_day, aoi_geom)
    print("[DONE] Range processed.")


if __name__ == "__main__":
    # Run the processor only when executed as a script, not when imported.
    main()


In [None]:
# --------------------------- viirs_process_af.py ---------------------------
"""
VIIRS Active Fire Processor (date range; FireMask-only detection)
=================================================================

What this script does
---------------------
• Processes all days in START_DATE..END_DATE (inclusive).
• Expects dated folders created by a downloader:

    BASEDIR/YYYY-MM-DD/
        raw/
        manifest_active_fire.csv   # produced by the downloader
        af/                        # created here

• For each AF granule, outputs to BASEDIR/YYYY-MM-DD/af/:
    *_AF_DetectMask.tif   # float32, 1.0 where FireMask >= 7, NaN elsewhere (AOI-clipped)
    *_AF_FireMask.tif     # float32 FireMask values (AOI-clipped), for reference

Important
---------
• Detection uses FireMask ≥ 7 only.
• When AF lacks internal geolocation, the paired GEO file path from the manifest is used.

Dependencies
------------
pip install h5py numpy rasterio pyresample fiona shapely pyproj tzdata
"""

from __future__ import annotations

# ============================== Standard libs ================================
import os
import re
import csv
from pathlib import Path
from datetime import datetime, timedelta, timezone, date
from typing import Optional, Iterable, Tuple, List

# ============================== Third-party =================================
import numpy as np
import h5py
import rasterio
from rasterio.transform import from_bounds
from rasterio.crs import CRS
from rasterio.features import geometry_mask
from pyresample import geometry, kd_tree

import fiona
from shapely.geometry import shape, mapping
from shapely.ops import unary_union, transform as shp_transform
from pyproj import CRS as PJCRS, Transformer


# =============================== Configuration ===============================
# Root folder holding the dated subfolders (BASEDIR/YYYY-MM-DD/...) to process.
BASEDIR     = Path(r"path\to\base\directory")
# Every day from START_DATE through END_DATE is processed.
START_DATE  = date(2025, 1, 1)
END_DATE    = date(2025, 1, 11)

# AOI shapefile (any CRS); used for intersection window + final clip
AOI_SHP     = Path(r"path\to\F002_L1__IR__L2L1M0__2025-01-10T215412.018348Z_2025-04-10T154832.806087Z_97706189_MWIR_Boundary.shp")

# Output grid resolution in degrees (EPSG:4326)
GRID_RES_DEG = 0.0036  # pixel size in degrees (~400 m at the equator)


# ================================ Utilities ==================================
def date_iter(d0: date, d1: date) -> Iterable[date]:
    """
    Yield every calendar day from d0 through d1, both ends included.

    Nothing is yielded when d0 is later than d1.
    """
    span_days = (d1 - d0).days
    for offset in range(span_days + 1):
        yield d0 + timedelta(days=offset)


def read_aoi_wgs84(shp_path: Path):
    """
    Read an AOI shapefile, union its geometries, and return the result in WGS84.

    Records that carry no geometry (null shapes) are ignored.

    Parameters
    ----------
    shp_path : Path
        Path to the AOI shapefile (any CRS).

    Returns
    -------
    shapely geometry in EPSG:4326 (x = longitude, y = latitude)

    Raises
    ------
    FileNotFoundError
        If 'shp_path' does not exist.
    ValueError
        If the file holds no usable geometry or declares no CRS.
    """
    if not shp_path.exists():
        raise FileNotFoundError(f"AOI not found: {shp_path}")

    with fiona.open(shp_path, "r") as src:
        # A record may have a null geometry; shape(None) would fail, so skip those.
        raw_geoms = (feat["geometry"] for feat in src)
        geoms = [shape(g) for g in raw_geoms if g is not None]
        if not geoms:
            raise ValueError("AOI shapefile has no geometries.")
        src_crs = src.crs
    if not src_crs:
        raise ValueError("AOI shapefile has no CRS.")

    # buffer(0) is applied to the union to clean up the merged geometry.
    aoi = unary_union(geoms).buffer(0)

    src_crs_obj = PJCRS.from_user_input(src_crs)
    wgs84 = PJCRS.from_epsg(4326)
    if src_crs_obj != wgs84:
        # always_xy=True keeps (lon, lat) axis order regardless of the CRS definition.
        transformer = Transformer.from_crs(src_crs_obj, wgs84, always_xy=True)
        # Only x/y are reprojected; any Z coordinate is dropped here.
        aoi = shp_transform(lambda x, y, z=None: transformer.transform(x, y), aoi)
    return aoi


def find_dataset_path_case_insensitive(f: h5py.File, candidates: list[str]) -> Optional[str]:
    """
    Return the first dataset path in 'f' whose basename loosely matches any candidate name.
    Matching ignores case and non-alphanumerics.

    "First" means first in the order in which h5py visits the file, not the order of
    'candidates'. Traversal stops as soon as a match is found.

    Parameters
    ----------
    f : h5py.File
        Open HDF5 file to search.
    candidates : list[str]
        Acceptable dataset names (compared after normalisation).

    Returns
    -------
    Path of the matching dataset as reported by h5py, or None if nothing matches.
    """
    def _norm(s: str) -> str:
        return re.sub(r"[^a-z0-9]", "", s.lower())

    wanted = {_norm(c) for c in candidates}

    def visitor(name, obj):
        # A non-None return value makes visititems() stop and hand that value back.
        if isinstance(obj, h5py.Dataset) and _norm(name.rsplit("/", 1)[-1]) in wanted:
            return name
        return None

    return f.visititems(visitor)


def read_af_latlon(af_path: Path, paired_geo: Optional[Path]) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """
    Read latitude/longitude from AF HDF5 if available; otherwise, from paired GEO.

    Parameters
    ----------
    af_path : Path
        Active Fire file; checked first for /geolocation_data/latitude and longitude.
    paired_geo : Optional[Path]
        GEO file used as fallback when the AF file has no internal geolocation.

    Returns
    -------
    (lat, lon) float32 arrays with out-of-range values set to NaN, or (None, None) if
    neither file provides both datasets (including a missing/absent paired GEO).
    """
    lat_key = "/geolocation_data/latitude"
    lon_key = "/geolocation_data/longitude"

    def _load(h5) -> Optional[Tuple[np.ndarray, np.ndarray]]:
        # Both datasets must be present; otherwise this file cannot geolocate the swath.
        if lat_key not in h5 or lon_key not in h5:
            return None
        return (
            h5[lat_key][...].astype(np.float32),
            h5[lon_key][...].astype(np.float32),
        )

    with h5py.File(af_path, "r") as f:
        coords = _load(f)

    if coords is None:
        if paired_geo is None or not paired_geo.exists():
            return None, None
        with h5py.File(paired_geo, "r") as g:
            coords = _load(g)
        if coords is None:
            # Paired GEO exists but lacks the expected datasets: report "not available"
            # instead of raising KeyError.
            return None, None

    lat, lon = coords

    # Clamp to valid ranges (astype() above returned copies, so in-place edits are safe)
    lat[(lat < -90) | (lat > 90)] = np.nan
    lon[(lon < -180) | (lon > 180)] = np.nan
    return lat, lon


def read_firemask_only(af_path: Path) -> Optional[np.ndarray]:
    """
    Load the FireMask dataset of an AF product as float32.

    The dataset is located by loose name matching, so several spellings are accepted.

    Returns
    -------
    FireMask array (float32) in which cells equal to the dataset's '_FillValue'
    attribute (when present) are NaN, or None if no matching dataset exists.
    """
    accepted_names = ["FireMask", "fire_mask", "fire mask", "FP_Mask", "Mask"]
    with h5py.File(af_path, "r") as h5:
        ds_path = find_dataset_path_case_insensitive(h5, accepted_names)
        if not ds_path:
            return None

        dataset = h5[ds_path]
        mask = dataset[...].astype(np.float32)

        fill = dataset.attrs.get("_FillValue", None)
        if fill is None:
            return mask

        mask[mask == fill] = np.nan
        return mask


def _maybe_transpose_to_match(target_shape: tuple[int, int], arr: Optional[np.ndarray]) -> Optional[np.ndarray]:
    """Return 'arr' transposed when it is 2D and only its transpose has 'target_shape'; else 'arr' unchanged."""
    if arr is None:
        return None
    flipped_fits = (
        arr.shape != target_shape
        and arr.ndim == 2
        and arr.shape[::-1] == target_shape
    )
    return arr.T if flipped_fits else arr


def coerce_same_shape(lat: np.ndarray, lon: np.ndarray, *arrays: np.ndarray) -> Tuple[np.ndarray, np.ndarray, list[np.ndarray]]:
    """
    Bring lat/lon and the extra arrays to one common 2D shape.
    • An extra array whose transpose has lat's shape is transposed first.
    • Everything is then cropped (from the top-left corner) to the smallest
      row count and smallest column count found among the inputs.

    Returns
    -------
    (lat2, lon2, arrays2) — arrays2 is a list in the same order as 'arrays'.

    Raises
    ------
    ValueError
        If any input is not 2D.
    """
    oriented = [_maybe_transpose_to_match(lat.shape, a) for a in arrays]

    present = [lat, lon]
    present.extend(a for a in oriented if a is not None)
    if not all(len(p.shape) == 2 for p in present):
        raise ValueError("All AF arrays must be 2D for resampling.")

    rows = min(p.shape[0] for p in present)
    cols = min(p.shape[1] for p in present)
    common = (rows, cols)

    def _fit(a: Optional[np.ndarray]) -> Optional[np.ndarray]:
        # Leave None and already-matching arrays untouched; slice the rest.
        if a is None or a.shape == common:
            return a
        return a[:rows, :cols]

    return _fit(lat), _fit(lon), [_fit(a) for a in oriented]


def define_area_wgs84_intersection(lat: np.ndarray,
                                   lon: np.ndarray,
                                   aoi_geom,
                                   res_deg: float = GRID_RES_DEG,
                                   pad: float = 0.0):
    """
    Build a WGS84 (EPSG:4326) target grid over the overlap of the swath's bounding
    box and the AOI's bounding box.

    Returns
    -------
    (area_def, transform, width, height), or None when the two boxes do not overlap.
    """
    aoi_west, aoi_south, aoi_east, aoi_north = aoi_geom.bounds

    # Swath extent goes first in max()/min(): that keeps a NaN extent (all-NaN input)
    # as NaN, so the overlap test below fails and None is returned.
    west = max(float(np.nanmin(lon)), aoi_west) - pad
    east = min(float(np.nanmax(lon)), aoi_east) + pad
    south = max(float(np.nanmin(lat)), aoi_south) - pad
    north = min(float(np.nanmax(lat)), aoi_north) + pad

    has_overlap = west < east and south < north
    if not has_overlap:
        return None

    # Round the cell counts up so the grid spans the whole window.
    n_cols = int(np.ceil((east - west) / res_deg))
    n_rows = int(np.ceil((north - south) / res_deg))
    extent = (west, south, east, north)
    affine = from_bounds(*extent, n_cols, n_rows)

    area_def = geometry.AreaDefinition(
        "wgs84",
        "WGS84 latlon",
        "epsg4326",
        {"proj": "longlat", "datum": "WGS84"},  # projection passed to pyresample as a dict
        n_cols,
        n_rows,
        extent,
    )
    return area_def, affine, n_cols, n_rows


def resample_swath_to_grid(lat: np.ndarray,
                           lon: np.ndarray,
                           data: np.ndarray,
                           area_def: geometry.AreaDefinition) -> np.ndarray:
    """
    Project swath samples onto the target grid with nearest-neighbour lookup.

    Grid cells with no swath sample inside the search radius are NaN.
    The result is returned as float32.
    """
    search_radius_m = 5000  # meters
    source_geom = geometry.SwathDefinition(lons=lon, lats=lat)
    gridded = kd_tree.resample_nearest(
        source_geom,
        data,
        area_def,
        radius_of_influence=search_radius_m,
        fill_value=np.nan,
    )
    return gridded.astype(np.float32)


def write_geotiff(path: Path,
                  arr: np.ndarray,
                  transform,
                  crs=CRS.from_epsg(4326),
                  nodata=np.float32(np.nan),
                  band_tags: dict | None = None) -> None:
    """
    Save 'arr' as a one-band float32 GeoTIFF (tiled, LZW-compressed).

    When 'band_tags' is non-empty, its entries are attached as tags of band 1.
    """
    n_rows = arr.shape[0]
    n_cols = arr.shape[1]
    out_dtype = rasterio.float32

    with rasterio.open(
        path,
        "w",
        driver="GTiff",
        height=n_rows,
        width=n_cols,
        count=1,
        dtype=out_dtype,
        crs=crs,
        transform=transform,
        nodata=nodata,
        compress="lzw",
        tiled=True,
    ) as dst:
        dst.write(arr.astype(out_dtype), 1)
        if not band_tags:
            return
        dst.update_tags(1, **band_tags)


# ============================== Per-day worker ================================
def process_day(day: date, aoi) -> None:
    """
    Process one calendar day of AF granules:
    • Read manifest_active_fire.csv
    • For each granule: read geolocation (internal or paired GEO), read FireMask,
      compute detect mask (FireMask>=7), resample to AOI grid, AOI-clip, write GeoTIFFs.

    The day is skipped (with a message) when its raw folder or manifest is missing or
    the manifest is empty; the 'af' output folder is only created once there is
    something to process. Individual granules are skipped when the file is missing,
    has no geolocation, has no FireMask, or does not overlap the AOI.

    Parameters
    ----------
    day : date
        Calendar day; selects BASEDIR/YYYY-MM-DD.
    aoi : shapely geometry in EPSG:4326
        Area of interest used for the grid window and the final clip.
    """
    daydir    = BASEDIR / day.strftime("%Y-%m-%d")
    rawdir    = daydir / "raw"
    afout_dir = daydir / "af"
    manifest_af = daydir / "manifest_active_fire.csv"

    if not rawdir.exists():
        print(f"[SKIP] No raw folder for {day}: {rawdir}")
        return
    if not manifest_af.exists():
        print(f"[SKIP] No AF manifest for {day}: {manifest_af}")
        return

    with manifest_af.open("r", newline="") as f:
        rows = list(csv.DictReader(f))
    if not rows:
        print(f"[SKIP] Empty AF manifest for {day}.")
        return

    # Create the output folder only now, so skipped days do not leave empty
    # 'af' (or dated) folders behind.
    afout_dir.mkdir(parents=True, exist_ok=True)

    print(f"[INFO] {day} — processing {len(rows)} AF granules …")
    for row in rows:
        product  = row["product"]
        af_path  = Path(row["file_path"])
        has_geo  = int(row["has_internal_geo"]) == 1
        geo_path = Path(row["paired_geo_path"]) if row.get("paired_geo_path") else None

        if not af_path.exists():
            print(f"[WARN] AF file missing: {af_path}")
            continue

        # --- Geolocation (internal or paired GEO) ---
        # The paired GEO is only offered when the manifest says the AF file has none.
        lat, lon = read_af_latlon(af_path, geo_path if not has_geo else None)
        if lat is None or lon is None:
            print(f"[WARN] No geolocation for AF {af_path.name}; skipping.")
            continue

        # --- FireMask ---
        firemask = read_firemask_only(af_path)
        if firemask is None:
            print(f"[WARN] No FireMask in {af_path.name}; skipping.")
            continue

        # Detection rule: FireMask >= 7  (binary mask as float32).
        # NaN (fill) cells count as "no detection", i.e. 0.0 in the float mask.
        det_mask = np.isfinite(firemask) & (firemask >= 7)

        # Align shapes (lat/lon & arrays must match before resampling)
        lat2, lon2, (det2, fm2) = coerce_same_shape(
            lat, lon, det_mask.astype(np.float32), firemask
        )

        # --- Define output grid = (swath ∩ AOI) ---
        area = define_area_wgs84_intersection(lat2, lon2, aoi, res_deg=GRID_RES_DEG)
        if area is None:
            print("[INFO] AF swath does not intersect AOI; skipping.")
            continue
        area_def, transform, width, height = area

        # --- Resample to grid ---
        det_grid = resample_swath_to_grid(lat2, lon2, det2, area_def)
        fm_grid  = resample_swath_to_grid(lat2, lon2, fm2,  area_def)

        # --- AOI mask/clip ---
        # invert=True makes the mask True inside the AOI; everything outside becomes NaN.
        mask_aoi = geometry_mask([mapping(aoi)], out_shape=(height, width), transform=transform, invert=True).astype(bool)
        det_grid = np.where(mask_aoi, det_grid, np.nan)
        fm_grid  = np.where(mask_aoi, fm_grid,  np.nan)

        # --- Write outputs ---
        stem    = af_path.with_suffix("").name
        out_det = afout_dir / f"{stem}_AF_DetectMask.tif"
        out_fm  = afout_dir / f"{stem}_AF_FireMask.tif"

        write_geotiff(
            out_det, det_grid, transform,
            band_tags={"units": "binary", "long_name": f"{product} Detection Mask (1=FireMask>=7)"}
        )
        print(f"[OK] wrote {out_det}")

        write_geotiff(
            out_fm, fm_grid, transform,
            band_tags={"units": "class", "long_name": f"{product} FireMask"}
        )
        print(f"[OK] wrote {out_fm}")


# ================================== Main =====================================
def main() -> None:
    """Load the AOI once, then run the per-day worker for each day from START_DATE to END_DATE."""
    aoi_geom = read_aoi_wgs84(AOI_SHP)
    for current_day in date_iter(START_DATE, END_DATE):
        process_day(current_day, aoi_geom)
    print("[DONE] AF range processed.")


# Run the batch only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
