### DATASCI 207: Final Project - Team 1 ###
## Data Cleaning, Feature Engineering, & Train/Val/Test Split (with Polars) ##

In [1]:
# imports
from pathlib import Path
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import duckdb
import polars as pl
import urllib.request, hashlib

#### 1) Download Data Source Programatically ####
We are using the FPA FOD-Attributes Dataset (https://zenodo.org/records/8381129).<br>
To prevent RAM issues, we download the yearly datasets to avoid ingesting the full 5GB dataset at once. 

In [None]:
# base = "https://zenodo.org/records/8381129/files"
# years = range(1992, 2021)
# raw = Path("../data/raw"); raw.mkdir(parents=True, exist_ok=True)

# def fetch(url, dest):
#     if dest.exists(): return
#     with urllib.request.urlopen(url) as r, open(dest, "wb") as f:
#         f.write(r.read())

# for y in years:
#     url = f"{base}/{y}_FPA_FOD_cons.csv?download=1"
#     dest = raw / f"{y}_FPA_FOD_cons.csv"
#     fetch(url, dest)

## We'll skip the 5GB combined file during development
## NOTE NOTE NOTE: only uncomment the below to download the 5GB combined file
### Only if needed # fetch(f"{base}/FPA_FOD_Plus.csv?download=1", raw/"FPA_FOD_Plus.csv")


#### 2) Ingest Dataset in One Pass, Clean, & Create Parquet

Note: the polars library can create Parquet files from CSVs. We use Parquet to enable later EDA/models to be much faster. It can selectively load only the columns we test, and DuckDB/Polars can query it in place. https://pola.rs/

In [15]:
raw = Path("../data/raw")
interim = Path("../data/interim"); interim.mkdir(parents=True, exist_ok=True)

# Columns we need immediately; add more later as features
KEEP = [
    "FOD_ID","FIRE_YEAR","DISCOVERY_DATE","FIRE_SIZE","FIRE_SIZE_CLASS",
    "LATITUDE","LONGITUDE","STATE","NWCG_CAUSE_CLASSIFICATION",
    "NWCG_GENERAL_CAUSE"
]

scan = pl.concat([
    pl.scan_csv(str(p), infer_schema_length=5000, ignore_errors=True)
      .select([c for c in KEEP if c in pl.scan_csv(str(p)).columns])
    for p in sorted(raw.glob("*_FPA_FOD_cons.csv"))
])

df = (
    scan
    .with_columns([
        pl.col("FIRE_YEAR").cast(pl.Int32),
        pl.col("FIRE_SIZE").cast(pl.Float64),
        pl.col("DISCOVERY_DATE").str.strptime(pl.Date, strict=False, exact=False),
        pl.col("FIRE_SIZE_CLASS").cast(pl.Categorical),
        pl.col("LONGITUDE").cast(pl.Float64),
        pl.col("LATITUDE").cast(pl.Float64),
    ])
    .filter(pl.col("FIRE_SIZE").is_not_null())     # drop missing size
    .filter(pl.col("FIRE_SIZE") > 0)               # drop zeros/non-sense for regression
)

df.sink_parquet(f"{interim}/fpa_core.parquet", compression="zstd", statistics=True)

  .select([c for c in KEEP if c in pl.scan_csv(str(p)).columns])


#### 3) Apply Fire Size Filter (MTBS) & Create Helpful Fields

Will use the Monitoring Trends in Burn Severity (**MTBS**) threshold of **"1,000 acres or greater in the western United States and 500 acres or greater in the eastern Unites States."** https://www.mtbs.gov/<br>

Note: An alternative is the National Incident Management Situation Report (**ISMR**) definition for **"large wildland fires",** which are defined as **100+ acres in timber fuel models** or **300+ acres in grass fuel models**. https://www.nifc.gov/sites/default/files/NICC/1-Incident%20Information/IMSR/Understanding%20the%20IMSR%202024.pdf

In [16]:
core = pl.scan_parquet("../data/interim/fpa_core.parquet")

is_west = pl.col("LONGITUDE") <= -100  # ~100°W proxy for MTBS West/East
outpath = Path("../data/interim/fpa_core_filtered.parquet")

# isolate big / large fires (West >= 1000; East >= 500)
df_big = (
    core
    # stage 1: create helper columns
    .with_columns([
        pl.when(is_west).then(pl.lit(1000.0)).otherwise(500.0).alias("size_threshold"),
        pl.when(is_west).then(pl.lit("west")).otherwise(pl.lit("east")).alias("region"),
        pl.col("FIRE_SIZE").log1p().alias("log_fire_size"),
    ])
    # stage 2: can now use size_threshold
    .with_columns([
        (pl.col("FIRE_SIZE") >= pl.col("size_threshold")).alias("is_large_mtbs")
    ])
    .filter(pl.col("is_large_mtbs"))
    .drop("is_large_mtbs")
)

# stream to Parquet without materializing in RAM
df_big.sink_parquet(str(outpath), compression="zstd")

In [5]:
#print(df_big_lf.explain())
print(core.schema)

Schema([('FOD_ID', Int64), ('FIRE_YEAR', Int32), ('DISCOVERY_DATE', Date), ('FIRE_SIZE', Float64), ('FIRE_SIZE_CLASS', Categorical), ('LATITUDE', Float64), ('LONGITUDE', Float64), ('STATE', String), ('NWCG_CAUSE_CLASSIFICATION', String), ('NWCG_GENERAL_CAUSE', String)])


  print(core.schema)


#### Split 60/20/20, Stratified within Each Year (by size bins) ### 

In [None]:
df = pl.read_parquet("../data/interim/fpa_core_filtered.parquet").to_pandas()

rng = 207  # reproducible seed

parts = []
for year, g in df.groupby("FIRE_YEAR"):
    # make stratification bins (quantiles of log size)
    q = pd.qcut(g["log_fire_size"], q=np.clip(len(g)//50, 3, 10), duplicates="drop")
    # 60 / 40
    g_train, g_temp, q_train, q_temp = train_test_split(
        g, q, test_size=0.4, random_state=rng, stratify=q
    )
    # then 50/50 of the 40 -> 20/20
    q_temp2 = pd.qcut(g_temp["log_fire_size"], q=len(q.unique()), duplicates="drop")
    g_val, g_test = train_test_split(
        g_temp, test_size=0.5, random_state=rng, stratify=q_temp2
    )
    g_train["split"] = "train"; g_val["split"] = "val"; g_test["split"] = "test"
    parts += [g_train, g_val, g_test]

split_df = pd.concat(parts, ignore_index=True)

# sanity checks
assert set(split_df["split"]) == {"train","val","test"}
check = split_df.groupby(["FIRE_YEAR","split"]).size().unstack().fillna(0)
print(check.head()) 
# print(check)

# write out
out = Path("../data/processed"); out.mkdir(parents=True, exist_ok=True)
split_df.query("split=='train'").to_parquet(out/"train.parquet")
split_df.query("split=='val'").to_parquet(out/"val.parquet")
split_df.query("split=='test'").to_parquet(out/"test.parquet")

split      test  train  val
FIRE_YEAR                  
1992         62    184   61
1993         56    167   56
1994        126    376  125
1995         75    223   75
1996        132    394  131


In [None]:
split_df.loc[split_df['split'] == 'train']

Unnamed: 0,FOD_ID,FIRE_YEAR,DISCOVERY_DATE,FIRE_SIZE,FIRE_SIZE_CLASS,LATITUDE,LONGITUDE,STATE,NWCG_CAUSE_CLASSIFICATION,NWCG_GENERAL_CAUSE,size_threshold,region,log_fire_size,split
0,1360316,1992,1992-08-19,1600.0,F,38.918889,-121.293056,CA,Human,Equipment and vehicle use,1000.0,west,7.378384,train
1,19960027,1992,1992-05-14,2160.0,F,33.463406,-103.078529,NM,Missing data/not specified/undetermined,Missing data/not specified/undetermined,1000.0,west,7.678326,train
2,47165,1992,1992-08-18,1095.0,F,32.916667,-116.871667,CA,Human,Missing data/not specified/undetermined,1000.0,west,6.999422,train
3,183324,1992,1992-10-22,1705.0,F,43.026400,-100.901800,SD,Human,Arson/incendiarism,1000.0,west,7.441907,train
4,369399,1992,1992-04-30,1435.0,F,45.400000,-97.316667,SD,Missing data/not specified/undetermined,Missing data/not specified/undetermined,500.0,east,7.269617,train
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
17533,400648603,2020,2020-11-17,747.0,E,30.483061,-86.841116,FL,Human,Missing data/not specified/undetermined,500.0,east,6.617403,train
17534,400628211,2020,2020-12-02,6687.4,G,33.744705,-117.636572,CA,Human,Missing data/not specified/undetermined,1000.0,west,8.808130,train
17535,400640200,2020,2020-07-13,3226.0,F,39.762136,-108.418611,CO,Natural,Natural,1000.0,west,8.079308,train
17536,400633675,2020,2020-07-11,6510.0,G,35.755306,-101.141309,TX,Natural,Natural,1000.0,west,8.781248,train


In [14]:
split_df.dtypes

FOD_ID                                int64
FIRE_YEAR                             int32
DISCOVERY_DATE               datetime64[ms]
FIRE_SIZE                           float64
FIRE_SIZE_CLASS                    category
LATITUDE                            float64
LONGITUDE                           float64
STATE                                object
NWCG_CAUSE_CLASSIFICATION            object
NWCG_GENERAL_CAUSE                   object
size_threshold                      float64
region                               object
log_fire_size                       float64
split                                object
dtype: object