In [4]:
#Main Script for Feature Engineering and Deriving Master Dataset(storms_data)

import os
import re
from pathlib import Path
import pandas as pd
import geopandas as gpd
from datetime import timedelta
import numpy as np
from scipy.special import expit
from scipy.stats import spearmanr
from sklearn.neighbors import BallTree
from pathlib import Path

In [11]:
# Project layout: every raw input is resolved relative to the directory the
# notebook is started from.
BASE_DIR = Path().resolve()
DATA_RAW = BASE_DIR / "data" / "raw"

# --- Tabular inputs ---------------------------------------------------------
# County list
ne_coastal = pd.read_csv(DATA_RAW / "northeast_counties" / "ne_coastal_counties_fips.csv")

# NOAA Storm Events
storms_data = pd.read_csv(DATA_RAW / "storm_events" / "StormEvents_2014_2022_NE_coastal.csv")

# EAGLE-I power outages
outage_df = pd.read_csv(DATA_RAW / "power_outages" / "eaglei_outages_2014_2022_NE_coastal_raw.csv")

# Housing units (two files; the 2010-2020 one is read as Latin-1)
hu20202024 = pd.read_csv(DATA_RAW / "housing_units" / "to_process_hu2020-2024.csv")
hu20102020 = pd.read_csv(DATA_RAW / "housing_units" / "to_process_hu2010-2020.csv", encoding="latin1")

# --- Directories handed to helper modules -----------------------------------
# ERA5 storm intensity (directory)
ERA5_DIR = DATA_RAW / "storm_intensity" / "era5_NE_coastal_county_hourly"

# US road network / density
roads_file = DATA_RAW / "road_density" / "raw"

# --- County boundaries ------------------------------------------------------
COUNTY_SHP = DATA_RAW / "road_density" / "data_county_boundary" / "cb_2018_us_county_500k.shp"
counties_shape = gpd.read_file(COUNTY_SHP)

# --- EIA 861 (distribution circuits) ----------------------------------------
dist_sys = pd.read_excel(DATA_RAW / "circuit_distribution" / "Distribution_Systems_2023.xlsx")
serv_terr = pd.read_excel(DATA_RAW / "circuit_distribution" / "Service_Territory_2023.xlsx")

# --- RUCC code --------------------------------------------------------------
rucc = pd.read_csv(DATA_RAW / "rucc" / "Ruralurbancontinuumcodes2023.csv", encoding="latin1")

# --- County Business Patterns (CBP) -----------------------------------------
cbp = pd.read_csv(DATA_RAW / "county_business_pattern" / "merged_coastal_counties_data.csv")

# --- Urban area shapefile, reprojected to EPSG:4326 -------------------------
gdf_urban = gpd.read_file(
    DATA_RAW / "shapefiles" / "national urban area shapefile" / "tl_2020_us_uac20.shp"
).to_crs("EPSG:4326")

In [3]:
# Storm-event columns that are removed from the working table.
cols_to_drop = [
    'LOCATION_INDEX', 'RANGE', 'AZIMUTH', 'LOCATION', 'LAT2', 'LON2',
    'EVENT_TYPE', 'INJURIES_DIRECT', 'INJURIES_INDIRECT',
    'DEATHS_DIRECT', 'DEATHS_INDIRECT', 'DAMAGE_PROPERTY', 'DAMAGE_CROPS',
    'MAGNITUDE', 'MAGNITUDE_TYPE', 'BEGIN_RANGE', 'BEGIN_AZIMUTH',
    'END_RANGE', 'END_AZIMUTH', 'BEGIN_LAT', 'BEGIN_LON', 'END_LAT',
    'END_LON', 'key', 'STATE_UP', 'WFO', 'FLOOD_CAUSE', 'CZ_NAME_UP',
    'CZ_TIMEZONE', 'EPISODE_ID_DET',
]

# Connecticut counties whose storm rows are excluded.
drop_counties = {"NEW LONDON", "FAIRFIELD", "MIDDLESEX", "NEW HAVEN"}

# True for rows that are both in Connecticut and in one of the listed counties.
in_connecticut = storms_data["STATE"].eq("CONNECTICUT")
in_listed_county = storms_data["CZ_NAME"].isin(drop_counties)
mask = in_connecticut & in_listed_county

# Remove the flagged rows, then the unused columns (raises KeyError if any
# listed column is absent, e.g. when this cell is run twice).
storms_data = storms_data.loc[~mask].drop(columns=cols_to_drop)

In [4]:
# Project-local helper (source not shown in this notebook).
# NOTE(review): a later cell reads storms_data["max_outage_after_24h"], so this
# call presumably adds that column by matching outages to storms — confirm.
from storm_outage_after24h import match_max_outage_after24h

storms_data = match_max_outage_after24h(
    storm_df = storms_data,
    outage_df = outage_df,
)

In [5]:
# Project-local helper (source not shown in this notebook).
# NOTE(review): presumably attaches the era_*_max_total_48h columns that a
# later cell median-fills — confirm against the helper module.
from era5_storm_features_max48h import run_all_stream
storms_data = run_all_stream(storms_data)

Housing_units_datasets Overlap

In [6]:
# Project-local helper (source not shown). The next cell expects the result to
# carry a "fips_str" column plus one column per year (digit-named, 2010-2024).
from housing_units_process import housing_units_process
hu = housing_units_process(hu20202024, hu20102020, ne_coastal)

In [7]:
# --- Housing units: wide (one column per year) -> long (county, year) --------
hu["fips_str"] = hu["fips_str"].astype(str).str.zfill(5)
year_cols = [col for col in hu.columns if str(col).isdigit() and 2010 <= int(col) <= 2024]

hu_long = hu.melt(
    id_vars=["fips_str"],
    value_vars=year_cols,
    var_name="year",
    value_name="housing_units",
)
hu_long = hu_long.assign(
    year=hu_long["year"].astype(int),
    housing_units=pd.to_numeric(hu_long["housing_units"], errors="coerce"),
)

# --- Storm table: 5-character county key = 2-char state + 3-char county ------
storms_data["STATE_FIPS"] = storms_data["STATE_FIPS"].astype(str).str.zfill(2)
storms_data["CZ_FIPS"] = storms_data["CZ_FIPS"].astype(str).str.zfill(3)
storms_data["fips_str"] = storms_data["STATE_FIPS"] + storms_data["CZ_FIPS"]

# Nullable integer dtype so that missing years are allowed.
storms_data["YEAR"] = pd.to_numeric(storms_data["YEAR"], errors="coerce").astype("Int64")

# Attach the housing-unit count of the storm's county and year.
housing_by_county_year = hu_long.rename(columns={"year": "YEAR"})
storms_data = storms_data.merge(housing_by_county_year, on=["fips_str", "YEAR"], how="left")

# --- Outage ratio = peak outage count / housing units ------------------------
storms_data["max_outage_after_24h"] = pd.to_numeric(
    storms_data["max_outage_after_24h"], errors="coerce"
)
has_housing = storms_data["housing_units"].notna() & (storms_data["housing_units"] > 0)
raw_ratio = storms_data["max_outage_after_24h"] / storms_data["housing_units"]
# Rows without a positive housing-unit count get NaN instead of inf/garbage.
storms_data["outage_ratio"] = np.where(has_housing, raw_ratio, np.nan)

Road_datasets Overlap

In [8]:
from road_datasets_process import road_datasets_process

# Project-local helper (source not shown); the code below relies on its result
# having "county_fips" and "road_density_km_per_km2" columns.
road_density = road_datasets_process(roads_file, ne_coastal, counties_shape)

# Two-column lookup keyed by the zero-padded 5-character county FIPS.
rd = road_density.copy()
rd["fips_str"] = rd["county_fips"].astype(str).str.zfill(5)
rd = rd.loc[:, ["fips_str", "road_density_km_per_km2"]]

# Rebuild the storm-side key (state + county) and attach road density.
state_part = storms_data["STATE_FIPS"].astype(str).str.zfill(2)
county_part = storms_data["CZ_FIPS"].astype(str).str.zfill(3)
storms_data["fips_str"] = state_part + county_part
storms_data = storms_data.merge(rd, on="fips_str", how="left")

Circuits_by_road_density Overlap

In [9]:
# Project-local helper (source not shown in this notebook).
# NOTE(review): later cells read "weighted_number_of_circuits" (and "rd_norm")
# from storms_data; presumably they are added here — confirm.
from circuits_distribution_process import circuits_distribution_process
storms_data = circuits_distribution_process(dist_sys, serv_terr, road_density, storms_data)

  county_fips State      County  circuits_total
0       09001    CT   Fairfield      352.809755
1       09007    CT   Middlesex       93.953592
2       09009    CT   New Haven      352.156725
3       09011    CT  New London      136.459881
4       23005    ME  Cumberland       55.569240
(44, 4)


RUCC Overlap

In [10]:
# --- RUCC table: keep only the RUCC_2023 attribute, one numeric code per county
rucc["fips_str"] = rucc["FIPS"].astype(str).str.zfill(5)
is_rucc_2023 = rucc["Attribute"] == "RUCC_2023"
rucc_rucc = rucc.loc[is_rucc_2023, ["fips_str", "Value"]].rename(columns={"Value": "rucc_2023"})
rucc_rucc["rucc_2023"] = pd.to_numeric(rucc_rucc["rucc_2023"], errors="coerce")

# --- Storm table: refresh the 5-character county key before merging ----------
state_part = storms_data["STATE_FIPS"].astype(str).str.zfill(2)
county_part = storms_data["CZ_FIPS"].astype(str).str.zfill(3)
storms_data["STATE_FIPS"] = state_part
storms_data["CZ_FIPS"] = county_part
storms_data["fips_str"] = state_part + county_part

storms_data = storms_data.merge(rucc_rucc, on="fips_str", how="left")

Season_code

In [11]:
# Month name -> meteorological season label.
SEASON_BY_MONTH = {
    'December': "winter", 'January': "winter", 'February': "winter",
    'March': "spring", 'April': "spring", 'May': "spring",
    'June': "summer", 'July': "summer", 'August': "summer",
    'September': "fall", 'October': "fall", 'November': "fall",
}

# Fixed category list so the integer codes never depend on which seasons
# happen to be present in the data: fall=0, spring=1, summer=2, winter=3.
# (This is the same alphabetical order the default category inference yields
# when all four seasons occur.) Months missing from the mapping get code -1.
SEASON_CATEGORIES = ["fall", "spring", "summer", "winter"]

storms_data["season"] = pd.Categorical(
    storms_data['MONTH_NAME'].map(SEASON_BY_MONTH),
    categories=SEASON_CATEGORIES,
)
storms_data["season_code"] = storms_data["season"].cat.codes

UG_ratio_proxy

In [22]:
# Grid search over integer weights (a, b) in 0..4 for
#     ug = expit(a * urban_score + b * rd_norm)
# scoring each pair by the Spearman rank correlation between the observed
# outage ratio and (1 - ug) * weighted_number_of_circuits.
# NOTE(review): "urban_score" and "rd_norm" are not created in any cell shown
# before this one — confirm which helper adds them and that it has already run.
u, r, C, y = (
    storms_data[col].to_numpy(dtype=float)
    for col in ("urban_score", "rd_norm", "weighted_number_of_circuits", "outage_ratio")
)

# Keep only rows where all four inputs are finite.
mask = np.logical_and.reduce([np.isfinite(arr) for arr in (u, r, C, y)])
u, r, C, y = u[mask], r[mask], C[mask], y[mask]

best_a = best_b = best_rho = None
best_err = np.inf

for a in range(5):
    for b in range(5):
        pred = (1 - expit(a * u + b * r)) * C

        rho = spearmanr(y, pred)[0]
        if np.isnan(rho):
            # Correlation undefined for this pair; skip it.
            continue

        # Error is 1 - rho: smaller is better.
        if 1 - rho < best_err:
            best_err = 1 - rho
            best_a, best_b = a, b
            best_rho = rho

In [25]:
# Logistic proxy built from fixed weights 3 (urban_score) and 2 (rd_norm).
# NOTE(review): the weights are hard-coded rather than taken from
# best_a / best_b of the grid-search cell — confirm they are the intended values.
ug_ratio = expit(3 * storms_data["urban_score"] + 2 * storms_data["rd_norm"])
storms_data["UG_ratio_proxy"] = ug_ratio

# The complement of the proxy scales the circuit count into "overhead" circuits.
non_ug_share = 1 - storms_data["UG_ratio_proxy"]
storms_data["overhead_circuits"] = non_ug_share * storms_data["weighted_number_of_circuits"]


County Business Patterns (CBP) processing

In [29]:
# --- CBP: build the 5-character county key ----------------------------------
cbp = cbp.assign(
    fipstate=cbp["fipstate"].astype(str).str.zfill(2),
    fipscty=cbp["fipscty"].astype(str).str.zfill(3),
)
cbp["full_fips"] = cbp["fipstate"] + cbp["fipscty"]

# Exclude every county whose FIPS starts with "09".
is_state_09 = cbp["full_fips"].str.startswith("09")
cbp = cbp.loc[~is_state_09].copy()

# Non-numeric / missing employment counts are treated as zero.
cbp["emp"] = pd.to_numeric(cbp["emp"], errors="coerce").fillna(0)

# Total employment per county.
cbp_county = cbp.groupby("full_fips", as_index=False).agg(cbp_emp_total=("emp", "sum"))

# NOTE(review): storms_data["full_fips"] is not created in any cell shown
# before this one — confirm which helper adds it.
storms_data["full_fips"] = storms_data["full_fips"].astype(str).str.zfill(5)

storms_data = storms_data.merge(cbp_county, on="full_fips", how="left")

In [31]:
# Keep only rows with YEARMONTH >= 201501.
# NOTE(review): assumes YEARMONTH is a numeric YYYYMM value, i.e. this keeps
# January 2015 onward — confirm the column dtype.
storms_data = storms_data[storms_data["YEARMONTH"] >= 201501].copy()

In [39]:
# Columns whose missing values are imputed below.
era_cols = [
    "era_i10fg_max_total_48h",
    "era_tp_max_total_48h",
    "era_crr_max_total_48h",
]

# Within each county (full_fips group), replace missing values in each column
# with that county's median for the column.
# NOTE(review): a county whose values are all missing keeps NaN; also check how
# rows with a missing full_fips are treated by groupby/transform.
storms_data[era_cols] = (
    storms_data
    .groupby("full_fips")[era_cols]
    .transform(lambda x: x.fillna(x.median()))
)


ERA5 point overlap for small counties

In [None]:
# Project-local helper (source not shown); it receives the ERA5 hourly data
# directory and the storm table and returns the updated storm table.
from small_county_ERA5_overlap import small_county_ERA5_overlap
storms_data = small_county_ERA5_overlap(ERA5_DIR, storms_data)

npoints, nurban and urban ratio

In [4]:
# Project-local helper (source not shown); it receives the urban-area polygons
# (EPSG:4326) and the storm table and returns the updated storm table.
# NOTE(review): the module/function name is spelled "strom" (sic) — it must
# match the file on disk, so do not correct it here without renaming the module.
from strom_impact_location_exposure import strom_impact_location_exposure
storms_data = strom_impact_location_exposure(gdf_urban, storms_data)

In [69]:
# NE coastal county polygons. The path is built from DATA_RAW like every other
# input, instead of a machine-specific absolute "F:\..." path, so the notebook
# runs from any checkout of the project.
COUNTIES_SHP = DATA_RAW / "shapefiles" / "ne_counties" / "NE_coastal_counties.shp"
gdf = gpd.read_file(COUNTIES_SHP)

# Integer county identifier taken from GEOID.
gdf["CZ_FIPS"] = gdf["GEOID"].astype(int)

# ALAND / 1e6 -> area_km2 (presumably ALAND is land area in square metres —
# TODO confirm against the shapefile's documentation).
gdf["area_km2"] = gdf["ALAND"] / 1e6
county_area_df = gdf[["CZ_FIPS", "area_km2"]].copy()

baseline construction

In [None]:
# Project-local helper (source not shown in this notebook).
# NOTE(review): the next cell computes a per-county baseline inline and drops /
# re-adds "baseline_outage_median"; check whether this helper duplicates that work.
from baseline_outage_construction import baseline_outage_construction
storms_data = baseline_outage_construction(storms_data, outage_df)

In [None]:
# Storm windows keyed by the FULL 5-digit county FIPS (state + county).
# storms_data["CZ_FIPS"] on its own is only the 3-digit county part, while the
# outage table's fips_code is treated as a 5-digit FIPS elsewhere in this
# notebook; comparing the two directly never matches, so storm periods would
# not be excluded from the baseline.
storms = storms_data[["STATE_FIPS", "CZ_FIPS", "BEGIN_DATE_TIME", "END_DATE_TIME"]].copy()
storms["BEGIN_DATE_TIME"] = pd.to_datetime(storms["BEGIN_DATE_TIME"])
storms["END_DATE_TIME"] = pd.to_datetime(storms["END_DATE_TIME"])
storms["CZ_FIPS"] = (
    storms["STATE_FIPS"].astype(str).str.zfill(2)
    + storms["CZ_FIPS"].astype(str).str.zfill(3)
).astype(int)
# Keep the same three columns the rest of the cell expects.
storms = storms.drop(columns="STATE_FIPS")

# Outage time series, with the county column renamed to match the storm key.
outage = outage_df[["fips_code", "run_start_time", "sum"]].copy()
outage = outage.rename(columns={"fips_code": "CZ_FIPS"})
outage["CZ_FIPS"] = outage["CZ_FIPS"].astype(int)
outage["run_start_time"] = pd.to_datetime(outage["run_start_time"])


def compute_baseline_for_county(df_out, df_storm):
    """Median of a county's outage counts observed outside all storm windows.

    Parameters
    ----------
    df_out : DataFrame
        Outage records with "run_start_time" and "sum" columns.
    df_storm : DataFrame
        Storm records with "BEGIN_DATE_TIME" and "END_DATE_TIME" columns.

    Returns
    -------
    float
        Median of "sum" over records whose timestamp is not inside any
        [BEGIN_DATE_TIME, END_DATE_TIME] interval (bounds inclusive). With no
        storms, the median of all records. NaN when every record falls inside
        a storm window.
    """
    if df_storm.empty:
        return df_out["sum"].median()

    stamps = df_out["run_start_time"].values
    in_storm = np.zeros(len(df_out), dtype=bool)

    # Flag every record covered by at least one storm window.
    for begin, end in zip(df_storm["BEGIN_DATE_TIME"], df_storm["END_DATE_TIME"]):
        in_storm |= (stamps >= begin) & (stamps <= end)

    quiet_values = df_out.loc[~in_storm, "sum"]
    return np.nan if quiet_values.empty else quiet_values.median()

# One record per county present in the outage table: the median outage count
# outside that county's storm windows (see compute_baseline_for_county).
baseline_records = []

for cz, df_out_c in outage.groupby("CZ_FIPS"):
    # Storm windows whose county key equals this outage county key.
    df_storm_c = storms.loc[storms["CZ_FIPS"] == cz]
    baseline = compute_baseline_for_county(df_out_c, df_storm_c)

    baseline_records.append(
        {
            "CZ_FIPS": cz,
            "baseline_outage_median": baseline,
        }
    )
baseline_df = pd.DataFrame(baseline_records)
baseline_df = baseline_df.copy()
baseline_df.columns = baseline_df.columns.str.strip()

storms_data = storms_data.copy()
storms_data.columns = storms_data.columns.str.strip()

# Zero-padded 5-character merge key; CZ_FIPS here holds the outage table's
# integer fips_code values.
baseline_df["fips5"] = baseline_df["CZ_FIPS"].astype(str).str.zfill(5)

# Pick the value column: anything that is not a key column, preferring names
# containing "baseline" or "median". With the records built above this
# resolves to "baseline_outage_median".
candidate_cols = [c for c in baseline_df.columns if c not in {"CZ_FIPS", "fips5"}]


preferred = [c for c in candidate_cols if "baseline" in c.lower() or "median" in c.lower()]
baseline_col = preferred[0] if preferred else candidate_cols[0]


# Build the matching 5-character key on the storm table.
# NOTE(review): in the full_fips branch a missing value becomes the text "<NA>"
# before zfill, i.e. a non-null key string — confirm no rows have missing FIPS.
if "full_fips" in storms_data.columns:
    storms_data["fips5"] = pd.to_numeric(storms_data["full_fips"], errors="coerce").astype("Int64").astype(str).str.zfill(5)
elif "fips_str" in storms_data.columns:
    storms_data["fips5"] = storms_data["fips_str"].astype(str).str.extract(r"(\d+)")[0].str.zfill(5)
else:
    raise KeyError("no full_fips or fips_str")

tmp = baseline_df[["fips5", baseline_col]].rename(columns={baseline_col: "baseline_outage_median"})

# Drop any earlier baseline column so the merge does not create _x/_y suffixes.
if "baseline_outage_median" in storms_data.columns:
    storms_data = storms_data.drop(columns=["baseline_outage_median"])

# validate="many_to_one" raises if a county key appears more than once in tmp.
storms_data = storms_data.merge(
    tmp,
    on="fips5",
    how="left",
    validate="many_to_one",
)

In [5]:
import numpy as np
import pandas as pd

def _first_stable_off_time(times: np.ndarray,
                           outage: np.ndarray,
                           baseline: float,
                           start_idx: int,
                           stable_steps: int):

    if start_idx >= len(outage):
        return pd.NaT

    if stable_steps <= 1:
        j = np.where(outage[start_idx:] <= baseline)[0]
        if len(j) == 0:
            return pd.NaT
        return pd.Timestamp(times[start_idx + j[0]])

    ok = (outage <= baseline)
    ok2 = ok[start_idx:]

    if len(ok2) < stable_steps:
        return pd.NaT

    sums = np.convolve(ok2.astype(int), np.ones(stable_steps, dtype=int), mode="valid")
    k = np.where(sums == stable_steps)[0]
    if len(k) == 0:
        return pd.NaT

    off_idx = start_idx + int(k[0])
    return pd.Timestamp(times[off_idx])


def _normalize_fips5(values: pd.Series) -> pd.Series:
    """Zero-pad numeric FIPS codes to 5 characters, keeping unparseable entries missing."""
    numeric = pd.to_numeric(values, errors="coerce").astype("Int64")
    padded = numeric.astype(str).str.zfill(5)
    # Without this mask a missing code would become the literal key "0<NA>".
    return padded.where(numeric.notna())


def add_outage_duration_by_baseline(
    storms_data: pd.DataFrame,
    outage_df: pd.DataFrame,
    baseline_df: pd.DataFrame,
    *,
    # key / columns
    storm_begin_col: str = "BEGIN_DATE_TIME",
    storm_end_col: str = "END_DATE_TIME",
    outage_time_col: str = "run_start_time",
    outage_value_col: str = "sum",
    outage_fips_col: str = "fips_code",
    baseline_fips_col: str = "CZ_FIPS",
    baseline_value_col: str = "baseline_outage_median",
    # search / stability params
    post_hours: int = 72,
    pre_hours: int = 0,
    stable_steps: int = 4,
) -> pd.DataFrame:
    """Add outage onset, recovery time and duration to every storm row.

    For each storm, the county's outage series is inspected in the window
    ``[begin - pre_hours, end + post_hours]``:

    * ``t_on``  - first sample whose value is strictly above the county baseline;
    * ``t_off`` - first later sample that starts ``stable_steps`` consecutive
      samples at or below the baseline (see ``_first_stable_off_time``);
    * ``duration_hours`` - ``t_off - t_on`` in hours.

    Parameters
    ----------
    storms_data : DataFrame
        Storm rows. The county key is taken from ``full_fips``, else
        ``fips_str``, else ``CZ_FIPS`` (first one present).
    outage_df : DataFrame
        Outage time series with the columns named by ``outage_fips_col``,
        ``outage_time_col`` and ``outage_value_col``.
    baseline_df : DataFrame
        One baseline per county: ``baseline_fips_col`` and ``baseline_value_col``.
    storm_begin_col, storm_end_col : str
        Storm start / end timestamp columns.
    post_hours, pre_hours : int
        Hours added after the storm end / before the storm start.
    stable_steps : int
        Consecutive at-or-below-baseline samples required to declare recovery.

    Returns
    -------
    DataFrame
        Copy of ``storms_data`` with ``fips5``, ``baseline_outage_median``,
        ``t_on``, ``t_off`` and ``duration_hours`` columns. ``duration_hours``
        is 0.0 when the series never exceeds the baseline inside the window and
        NaN when the county key, baseline, storm times or outage data are
        missing, or when no stable recovery is found.

    Raises
    ------
    KeyError
        If no county FIPS column exists in ``storms_data`` or
        ``baseline_value_col`` is absent from ``baseline_df``.
    pandas.errors.MergeError
        If ``baseline_df`` holds more than one row for a county.
    """
    s = storms_data.copy()
    o = outage_df.copy()
    b = baseline_df.copy()

    s.columns = s.columns.str.strip()
    o.columns = o.columns.str.strip()
    b.columns = b.columns.str.strip()

    # --- county key on the storm table ---------------------------------------
    if "full_fips" in s.columns:
        s["fips5"] = _normalize_fips5(s["full_fips"])
    elif "fips_str" in s.columns:
        s["fips5"] = (
            s["fips_str"].astype(str).str.extract(r"(\d+)")[0].str.zfill(5)
        )
    elif "CZ_FIPS" in s.columns:
        s["fips5"] = _normalize_fips5(s["CZ_FIPS"])
    else:
        raise KeyError("no county FIPS")

    # --- outage table --------------------------------------------------------
    o[outage_time_col] = pd.to_datetime(o[outage_time_col])
    o["fips5"] = _normalize_fips5(o[outage_fips_col])
    o[outage_value_col] = pd.to_numeric(o[outage_value_col], errors="coerce")

    # --- baseline table ------------------------------------------------------
    if baseline_value_col not in b.columns:
        raise KeyError(f"baseline_df 缺少列 {baseline_value_col}")

    b["fips5"] = _normalize_fips5(b[baseline_fips_col])
    # pandas merges match missing keys to each other; drop key-less baselines
    # so storms with an unknown county never inherit one.
    b = b.dropna(subset=["fips5"])

    if "baseline_outage_median" in s.columns:
        s = s.drop(columns=["baseline_outage_median"])

    s = s.merge(
        b[["fips5", baseline_value_col]].rename(columns={baseline_value_col: "baseline_outage_median"}),
        on="fips5",
        how="left",
        validate="many_to_one",
    )

    s[storm_begin_col] = pd.to_datetime(s[storm_begin_col])
    s[storm_end_col] = pd.to_datetime(s[storm_end_col])

    # Per-county outage series, time-ordered; rows without key or time are dropped.
    o = o.dropna(subset=["fips5", outage_time_col]).sort_values(["fips5", outage_time_col])
    outage_groups = {k: df for k, df in o.groupby("fips5", sort=False)}

    pre_delta = pd.Timedelta(hours=pre_hours)
    post_delta = pd.Timedelta(hours=post_hours)

    t_on_list, t_off_list, dur_hours_list = [], [], []
    storm_rows = zip(
        s["fips5"], s["baseline_outage_median"], s[storm_begin_col], s[storm_end_col]
    )
    for fips5, O0, t0, t1 in storm_rows:
        if (
            pd.isna(fips5)
            or pd.isna(O0)
            or pd.isna(t0)
            or pd.isna(t1)
            or (fips5 not in outage_groups)
        ):
            t_on_list.append(pd.NaT)
            t_off_list.append(pd.NaT)
            dur_hours_list.append(np.nan)
            continue

        df_out = outage_groups[fips5]

        win_start = t0 - pre_delta
        win_end = t1 + post_delta
        in_window = (df_out[outage_time_col] >= win_start) & (df_out[outage_time_col] <= win_end)
        df_win = df_out.loc[in_window]

        if df_win.empty:
            t_on_list.append(pd.NaT)
            t_off_list.append(pd.NaT)
            dur_hours_list.append(np.nan)
            continue

        times = df_win[outage_time_col].to_numpy()
        vals = df_win[outage_value_col].to_numpy()

        # t_on: first sample strictly above the baseline
        on = np.where(vals > float(O0))[0]
        if len(on) == 0:
            # Never above baseline inside the window: no outage, zero duration.
            t_on_list.append(pd.NaT)
            t_off_list.append(pd.NaT)
            dur_hours_list.append(0.0)
            continue

        on_idx = int(on[0])
        t_on = pd.Timestamp(times[on_idx])

        # t_off: first stable return to baseline after onset
        t_off = _first_stable_off_time(
            times=times,
            outage=vals,
            baseline=float(O0),
            start_idx=on_idx + 1,
            stable_steps=stable_steps,
        )

        if pd.isna(t_off):
            duration_hours = np.nan
        else:
            duration_hours = (t_off - t_on) / pd.Timedelta(hours=1)

        t_on_list.append(t_on)
        t_off_list.append(t_off)
        dur_hours_list.append(duration_hours)

    s["t_on"] = t_on_list
    s["t_off"] = t_off_list
    s["duration_hours"] = dur_hours_list

    return s

# Adds t_on, t_off and duration_hours using the per-county baselines in
# baseline_df (built in the baseline-construction cell) and default parameters.
storms_data = add_outage_duration_by_baseline(storms_data, outage_df, baseline_df)

In [None]:
# County areas from the county boundary polygons (counties_shape).
# ne_coastal is a plain pandas DataFrame loaded with pd.read_csv, so it has no
# CRS or geometry; the polygons have to come from the GeoDataFrame instead.
counties = counties_shape.copy()
print(counties.crs)

# Reproject to EPSG:5070 before measuring, so that geometry.area is in the
# projection's units (assumed metres, hence / 1e6 for km² — TODO confirm).
counties_eq = counties.to_crs("EPSG:5070")
counties_eq["county_area_km2"] = counties_eq.geometry.area / 1e6
area_df = (
    counties_eq[["GEOID", "county_area_km2"]]
    .rename(columns={"GEOID": "CZ_FIPS"})
)

# The following cells divide by storms_data["county_area_km2"]; attach it here
# unless an earlier step already provided the column.
if "county_area_km2" not in storms_data.columns:
    area_lookup = pd.DataFrame(
        {
            "fips_str": area_df["CZ_FIPS"].astype(str).str.zfill(5),
            "county_area_km2": area_df["county_area_km2"],
        }
    )
    storms_data = storms_data.merge(
        area_lookup,
        on="fips_str",
        how="left",
        validate="many_to_one",
    )

In [117]:
# Housing-unit density: housing units divided by county area (km²).
# NOTE(review): requires storms_data["county_area_km2"], which no cell shown
# above attaches to storms_data — confirm where it is merged in.
storms_data['housing_units_by_area'] = storms_data['housing_units']/storms_data['county_area_km2']

In [119]:
# Overwrites cbp_emp_total with employment per km² (total / county area).
# NOTE(review): not idempotent — re-running this cell divides by the area
# again; the column name also no longer reflects its content.
storms_data['cbp_emp_total'] = storms_data['cbp_emp_total']/storms_data['county_area_km2']

In [120]:
# Write the master dataset next to the project's data directory. BASE_DIR
# replaces the machine-specific absolute "F:\..." path so the export works on
# any checkout; the DataFrame index is still written as the first column.
storms_data.to_csv(BASE_DIR / "storms_data.csv")