In [18]:
# ------------------ POLARS: positional CFL / Hazard pipeline ------------------
import os
from typing import Dict, Iterable, Optional, Tuple, List
import numpy as np
import polars as pl

# Optional (only for raster step)
try:
    import rasterio
    from rasterio.transform import Affine
except Exception:
    rasterio, Affine = None, None

# Fixed midpoints (feet) for the 6 flame-length bins.
# Used as the default `midpoints_ft` of process_csv_polars_positional, where the
# values are paired in order with the FIL1..FIL6 columns and act as the
# per-bin weights of the CFL_ft sum.
FL_MIDPOINTS_FT = (1.0, 3.0, 5.0, 7.0, 10.0, 14.0)

In [19]:
# ---- helpers ---------------------------------------------------------------

def _header_length(csv_path: str) -> int:
    """Return how many columns the header row of ``csv_path`` declares.

    The file is read with ``n_rows=0`` so only the header is materialised.
    """
    header_only = pl.read_csv(
        csv_path,
        has_header=True,
        n_rows=0,
        ignore_errors=True,
    )
    column_names = header_only.columns
    return len(column_names)

def _positional_names(n_cols: int) -> List[str]:
    base = ["XPos","YPos","PBurn","FIL1","FIL2","FIL3","FIL4","FIL5","FIL6"]
    if n_cols <= len(base):
        return base[:n_cols]
    return base + [f"COL{i}" for i in range(10, 10 + (n_cols - len(base)))]

def _clean_num(colname: str) -> pl.Expr:
    """Build an expression that turns column ``colname`` into Float64.

    The column is cast to text, NBSP/BOM characters are removed, every
    character that is not a digit, ``e``/``E``, sign or dot is dropped, and the
    remainder is cast to Float64. Both casts are non-strict (``strict=False``).
    """
    as_text = pl.col(colname).cast(pl.Utf8, strict=False)
    # NBSP/BOM first, then anything outside the numeric alphabet.
    without_invisibles = as_text.str.replace_all(r"[\u00A0\uFEFF]", "")
    numeric_chars_only = without_invisibles.str.replace_all(r"[^\dEe+\-\.]", "")
    trimmed = numeric_chars_only.str.strip_chars()
    return trimmed.cast(pl.Float64, strict=False)

In [20]:
# ---- core processing -------------------------------------------------------

def process_csv_polars_positional(csv_path: str,
                                  midpoints_ft: Iterable[float] = FL_MIDPOINTS_FT
                                  ) -> pl.DataFrame:
    """Read one CSV by column position and add ``CFL_ft`` and ``hazard`` columns.

    Header names in the file are replaced with the canonical positional names
    from ``_positional_names``. The canonical numeric columns that are present
    are cleaned with ``_clean_num``. ``CFL_ft`` is the horizontal sum of each
    FIL column times its weight from ``midpoints_ft`` (paired in order), and
    ``hazard`` is that same sum multiplied by ``PBurn``. If the file has no FIL
    columns, both derived columns are null literals.

    Parameters
    ----------
    csv_path : path of the CSV file to read.
    midpoints_ft : per-bin weights; only as many as there are FIL columns are used.

    Returns
    -------
    The collected ``pl.DataFrame`` (all original columns plus the two derived ones).
    """
    canonical = _positional_names(_header_length(csv_path))

    # Lazy scan with names forced by position; comma separator, standard quoting.
    frame = pl.scan_csv(
        csv_path,
        has_header=True,
        new_columns=canonical,
        separator=",",
        quote_char='"',
        ignore_errors=True,
    )

    # Only the canonical numeric columns are cleaned; everything else passes through.
    numeric_targets = ("XPos", "YPos", "PBurn",
                       "FIL1", "FIL2", "FIL3", "FIL4", "FIL5", "FIL6")
    cleaned = [_clean_num(name).alias(name) for name in numeric_targets if name in canonical]
    frame = frame.with_columns(cleaned)

    fil_cols = [name for name in ("FIL1", "FIL2", "FIL3", "FIL4", "FIL5", "FIL6")
                if name in canonical]
    bin_weights = list(midpoints_ft)[:len(fil_cols)]

    if fil_cols:
        weighted_bins = [pl.col(name) * weight for name, weight in zip(fil_cols, bin_weights)]
        cfl_expr = pl.sum_horizontal(weighted_bins)
        # Hazard is derived from the expression itself, not from the CFL_ft alias.
        hazard_expr = cfl_expr * pl.col("PBurn")
    else:
        cfl_expr = pl.lit(None)
        hazard_expr = pl.lit(None)

    derived = [cfl_expr.alias("CFL_ft"), hazard_expr.alias("hazard")]
    return frame.with_columns(derived).collect(engine="streaming")

In [21]:
def batch_process_folder_polars_positional(
    root_dir: str,
    pattern_filename: str = "FLP_English.csv",
    save_parquet: bool = False,
    parquet_out_dir: Optional[str] = None,
) -> Dict[str, pl.DataFrame]:
    """Process every file named ``pattern_filename`` found under ``root_dir``.

    Each match is passed to ``process_csv_polars_positional``. A file whose
    processing raises is reported with a ``[WARN]`` line and skipped, so one
    bad folder does not stop the batch.

    Parameters
    ----------
    root_dir : directory tree to walk.
    pattern_filename : exact file name to look for in each directory.
    save_parquet : when True, also write each result as a Parquet file.
    parquet_out_dir : Parquet destination; defaults to ``<root_dir>/_CFL_parquet``.

    Returns
    -------
    Dict mapping each processed CSV path to its resulting DataFrame.
    """
    results: Dict[str, pl.DataFrame] = {}
    for current_dir, _, files in os.walk(root_dir):
        # File names are unique within a directory, so there is at most one match.
        if pattern_filename not in files:
            continue

        csv_path = os.path.join(current_dir, pattern_filename)
        print(f"[RUN] Processing folder: {current_dir}")
        try:
            df_pl = process_csv_polars_positional(csv_path)
        except Exception as e:
            # Deliberate best-effort: report and move on to the next folder.
            print(f"[WARN] Skipping {csv_path}: {e}")
            continue
        results[csv_path] = df_pl

        if not save_parquet:
            continue

        if parquet_out_dir is None:
            parquet_out_dir = os.path.join(root_dir, "_CFL_parquet")
        os.makedirs(parquet_out_dir, exist_ok=True)

        # relpath() yields "." (os.curdir), never "", when the CSV sits directly
        # in root_dir, so test for that explicitly instead of relying on falsiness.
        rel = os.path.relpath(current_dir, root_dir)
        tag = "root" if rel == os.curdir else rel.replace(os.sep, "_")
        out_path = os.path.join(parquet_out_dir, f"CFL_hazard_{tag}.parquet")
        df_pl.write_parquet(out_path)
        print(f"[OK] Wrote {out_path}")
    return results

In [22]:
# ---- rasterization ------------------------------------------

def rasterize_from_polars(
    df_pl: pl.DataFrame,
    out_path: str,
    crs_epsg: Optional[int] = None,
    x_col: str = "XPos",
    y_col: str = "YPos",
    value_col: str = "hazard",   # default: rasterize hazard
    nodata: float = np.nan,
) -> str:
    """
    Convert a Polars result (XPos, YPos, and value_col) to a single-band GeoTIFF.

    Points are treated as cell centres of a regular, north-up grid. The cell
    size along each axis is the most common step between sorted unique
    coordinates, and each point is placed at the cell given by its offset from
    the minimum X / maximum Y divided by that step. Missing rows or columns of
    points therefore stay as empty cells instead of shifting later cells.

    Parameters
    ----------
    df_pl : frame holding ``x_col``, ``y_col`` and ``value_col``.
    out_path : destination GeoTIFF path; its parent directory is created if needed.
    crs_epsg : EPSG code written as the raster CRS, or None for no CRS.
    x_col, y_col : coordinate column names.
    value_col : column whose values fill the raster band.
    nodata : value written to empty cells and to cells whose value is missing;
        it is also recorded as the band's nodata value.

    Returns
    -------
    ``out_path``.

    Raises
    ------
    RuntimeError : rasterio is not installed.
    ValueError : fewer than two distinct X or Y positions, or no positive
        grid spacing could be inferred.
    """
    if rasterio is None or Affine is None:
        raise RuntimeError("Install 'rasterio' to write rasters.")

    def _column_as_float(name: str) -> np.ndarray:
        # Nulls come out as NaN, which lets them be filtered with np.isfinite.
        series = df_pl.get_column(name).cast(pl.Float64, strict=False)
        return np.asarray(series.to_numpy(), dtype=np.float64)

    def _modal_step(steps: np.ndarray) -> float:
        # Most frequent step (robust to occasional gaps in the coordinates).
        vals, counts = np.unique(np.round(steps, 6), return_counts=True)
        return float(vals[np.argmax(counts)])

    x_vals = _column_as_float(x_col)
    y_vals = _column_as_float(y_col)
    z_vals = _column_as_float(value_col)

    # Rows without a usable position cannot be placed on the grid.
    located = np.isfinite(x_vals) & np.isfinite(y_vals)
    x_vals, y_vals, z_vals = x_vals[located], y_vals[located], z_vals[located]

    xs = np.unique(x_vals)   # sorted unique coordinates
    ys = np.unique(y_vals)
    if xs.size < 2 or ys.size < 2:
        raise ValueError("Not enough unique X/Y positions to form a grid.")

    dx = _modal_step(np.diff(xs))
    dy = _modal_step(np.diff(ys))
    if dx <= 0 or dy <= 0:
        raise ValueError("Could not infer a positive grid spacing from X/Y positions.")

    min_x, max_y = float(xs[0]), float(ys[-1])

    # Index by distance from the grid origin so the array layout agrees with
    # the regular-spacing affine transform below; top row = max Y.
    col_idx = np.rint((x_vals - min_x) / dx).astype(np.intp)
    row_idx = np.rint((max_y - y_vals) / dy).astype(np.intp)
    ncols = int(col_idx.max()) + 1
    nrows = int(row_idx.max()) + 1

    # Honour the caller's nodata for empty cells and for missing values.
    fill_value = np.nan if nodata is None else nodata
    arr = np.full((nrows, ncols), fill_value, dtype=np.float32)
    # If several rows land in the same cell, only one of them is kept.
    arr[row_idx, col_idx] = np.where(np.isnan(z_vals), fill_value, z_vals)

    # Coordinates are cell centres, so shift by half a cell to the upper-left corner.
    transform = Affine.translation(min_x - dx/2.0, max_y + dy/2.0) * Affine.scale(dx, -dy)

    profile = dict(
        driver="GTiff", height=nrows, width=ncols, count=1, dtype="float32",
        transform=transform, compress="lzw", nodata=nodata, tiled=True, interleave="band",
        crs=(f"EPSG:{crs_epsg}" if crs_epsg is not None else None),
    )

    # dirname is "" for a bare file name, and os.makedirs("") raises.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with rasterio.open(out_path, "w", **profile) as dst:
        dst.write(arr, 1)
    return out_path

In [None]:
# Root folder that is walked for per-folder WFM outputs. The default is a
# machine-specific path; set the CFL_ROOT_DIR environment variable to run the
# notebook against another location without editing this cell.
root_dir = os.environ.get(
    "CFL_ROOT_DIR",
    r"C:\Users\bsf31\Documents\data\NL060\WFM Outputs\run_97th_percentiles",
)


In [23]:
# 1) Walk root_dir and process every matching CSV (progress is printed).
#    Results stay in memory, keyed by CSV path; no Parquet files are written here.
results = batch_process_folder_polars_positional(
    root_dir=root_dir,
    save_parquet=False,
)

[RUN] Processing folder: C:\Users\bsf31\Documents\data\NL060\WFM Outputs\run_97th_percentiles\Central_Coast
[RUN] Processing folder: C:\Users\bsf31\Documents\data\NL060\WFM Outputs\run_97th_percentiles\Cuyama
[RUN] Processing folder: C:\Users\bsf31\Documents\data\NL060\WFM Outputs\run_97th_percentiles\Santa_Ynez
[RUN] Processing folder: C:\Users\bsf31\Documents\data\NL060\WFM Outputs\run_97th_percentiles\South_Coast_E
[RUN] Processing folder: C:\Users\bsf31\Documents\data\NL060\WFM Outputs\run_97th_percentiles\South_Coast_W


In [None]:
# Peek at one result: the first frame stored in `results`
# (raises StopIteration if `results` is empty).
# Columns in the displayed output: XPos, YPos, PBurn, FIL1..FIL6, CFL_ft, hazard
any_df = next(iter(results.values()))
any_df

XPos,YPos,PBurn,FIL1,FIL2,FIL3,FIL4,FIL5,FIL6,CFL_ft,hazard
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
164481.34375,3.882655e6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
164511.34375,3.882655e6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
164541.34375,3.882655e6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
164571.34375,3.882655e6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
164601.34375,3.882655e6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
…,…,…,…,…,…,…,…,…,…,…
217611.34375,3.820255e6,0.007917,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
217641.34375,3.820255e6,0.007917,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
217671.34375,3.820255e6,0.007917,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
217701.34375,3.820255e6,0.007917,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [27]:


# 2) Rasterize HAZARD per folder
for csv_path, df_pl in results.items():
    # Name the output after the CSV's folder, relative to root_dir.
    rel = os.path.relpath(os.path.dirname(csv_path), root_dir).replace(os.sep, "_")
    # relpath() returns "." (os.curdir), never "", for a CSV directly in root_dir.
    if rel == os.curdir:
        rel = "root"
    out_tif = os.path.join(root_dir, "_CFL_rasters", f"hazard_{rel}.tif")
    # EPSG:26911 = NAD83 / UTM zone 11N
    rasterize_from_polars(df_pl, out_tif, crs_epsg=26911)