In [None]:
import os
import sys
import glob
import code
import json
import scipy
import difflib
import datetime
import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
from shapely.geometry import Point
from shapely.ops import unary_union
from scipy.stats.mstats import winsorize


#--- Unity
pous_data_dir = "/POUS"
stormdata_dir = "/USA_storms_data"
counties_shapefile = "cb_2018_us_county_500k.shp"
eia_data_dir = "/EIA_reliability_2016_2021"
output_dir = "/POUS_and_storms"

#----
pd.set_option('display.max_columns', None)

In [None]:
# d = np.load(file=open("/gypsum/eguide/projects/zshah/data/NL_OutageDetection/training_data/USA/outline/USA_48201_outline.npz","rb"))["outline"]
# plt.imshow(d)

In [None]:
"""
Load extra data
"""
# load mapping of county names and FIPS to states
county2fips = pd.read_csv("/gypsum/eguide/projects/zshah/data/USA_fips2county/fips2county.tsv", delimiter="\t")
county2fips = county2fips[["CountyFIPS","CountyName","StateName"]].drop_duplicates()
county2fips["StateName"] = county2fips["StateName"].str.replace(r'[^A-Za-z]+', '', regex=True).str.lower()
county2fips["CountyName"] = county2fips["CountyName"].str.replace(r'[^A-Za-z]+', '', regex=True).str.lower()
county2fips["CountyFIPS"] = county2fips["CountyFIPS"].apply(lambda x: str(x).zfill(5))

# dictionary of state to list of county names
state2county = county2fips.groupby(["StateName"])
state2county = {key: group["CountyName"].tolist() for key, group in state2county}

In [None]:
print("--*--"*30, flush=True)
print("Loading and processing storms data.", flush=True)
#

"""
STEP 0: use county shapefile and NWPS Zones file to map zone numbers to countyFIPS
Reason: some entries in the storm dataset have zone numbers whereas some entries have countyFIPS. For consistency, we convert everything to countyFIPS.
"""
# process zones dataframe (updated z_22mr22.shp to z_08mr23.shp)
crs = "EPSG:4326" # NOTE: works only in usa_env conda environment. Newer versions of geopandas might raise an error
zones = gpd.read_file(stormdata_dir + "/NWPS_zones/z_08mr23.shp")[["STATE","STATE_ZONE","geometry"]]
zones = zones.to_crs(crs)

# sjoin with counties data (one zone can intersect with multiple counties)
counties = gpd.read_file(counties_shapefile)[["STATEFP","COUNTYFP","geometry"]]
counties = counties.to_crs(crs)
counties.loc[:,"CountyFIPS"] = counties.loc[:,"STATEFP"] + counties.loc[:,"COUNTYFP"]
counties["county_geom"] = counties["geometry"]

# Perform spatial join using 'intersects'
z2c = gpd.sjoin(zones, counties, op="intersects", how="inner")

# Calculate the area of the intersection between each zone and county
z2c["intersection_area"] = z2c.geometry.intersection(z2c["county_geom"]).area

# Filter out matches with minimal overlap/intersection (threshold was manually selected looking at Los Angeles county and the corresponding zones)
z2c = z2c[z2c.intersection_area >= 1e-3]

# Add zones that lie within a county but might be eliminated due to small self-area
z2c_within = gpd.sjoin(zones, counties, op="within", how="inner")

# combine the two dataframes
z2c = pd.concat([z2c, z2c_within]).reset_index(drop=True)

# find direct mapping of state shortname to state fips.
# since some zones intersect multiple states, stateFP and state_zone mapping needs to be cleaned first
state2fips = z2c[["STATE","STATEFP"]].drop_duplicates()
state2fips = state2fips.groupby("STATE")["STATEFP"].apply(lambda x: x.mode()[0])

# merge this with the z2c dataframe
z2c = z2c[["STATE","STATE_ZONE","CountyFIPS"]].drop_duplicates()
z2c = pd.merge(z2c, state2fips, on=["STATE"], how="left")

# reformat STATEZONE column from: (statename + zone number) format ---> (statefips + zone number) format
# this is to ensure compatibility with the NOAA storms database
z2c["ZoneFIPS"] = z2c.apply(lambda x: x["STATEFP"] + x["STATE_ZONE"][2:], axis=1)
z2c["CZ_TYPE"] = "Z"

display(z2c)
    
# z2c["State"] = z2c["STATE_ZONE"].apply(lambda x: x[0:2])
# # NV gets 04 (AZ) STATEFP instead of 32; MO Missouri gets 05 (AR) instead of 29 in some cases. Centroids of these confused souls lie in an out-of-state county. We ignore these points for simplicity.
# condition1 = ((z2c["State"]=="NV") & (z2c["STATEFP"]=="04"))
# condition2 = ((z2c["State"]=="MO") & (z2c["STATEFP"]=="05"))
# z2c = z2c[(~condition1) & (~condition2)].reset_index(drop=True)
# #
# """
# STEP 1: merge all the stormevents csvs [cols_of_interest]
# """
# all_files = glob.glob(stormdata_dir + "/*.csv")
# ds = []
# for file in all_files:
#     temp = pd.read_csv(file)
#     ds.append(temp)
#     del(temp)
# ds = pd.concat(ds)
# ds.loc[:,"STATE"] = ds["STATE"].str.replace(r'\W', '', regex=True).str.lower()
# #
# """
# STEP 2: assign countyFIPS to all entries in CZ_TYPE == "z" in the storms dataframe.
# NOTE: we use both, CZ_TYPE == "c" and "z" in this work, where "c" represents county and "z" represents zone.
# """
# # create a CountyFIPS column in storms data using CZ_FIPS and STATE_FIPS
# ds[["CZ_FIPS","STATE_FIPS"]] = ds[["CZ_FIPS","STATE_FIPS"]].astype(str)
# ds.loc[:,"CZ_FIPS"] = ds["CZ_FIPS"].apply(lambda x: x.zfill(3))
# ds.loc[:,"CZ_FIPS"] = (ds["STATE_FIPS"] + ds["CZ_FIPS"]).astype(int)
# ds = pd.merge(ds, z2c[["CountyFIPS","ZoneFIPS"]], left_on="CZ_FIPS", right_on="ZoneFIPS", how="left")
# ds["CountyFIPS"] = ds.apply(lambda x: x["CZ_FIPS"] if x["CZ_TYPE"]=="C" else x["CountyFIPS"] if x["CZ_TYPE"]=="Z" else np.nan, axis=1)

In [None]:
"""
Load storm data and add the mapping of zone to county
"""
cols_of_interest = ["BEGIN_DATE_TIME","END_DATE_TIME","CZ_TIMEZONE","EPISODE_ID","EVENT_ID","STATE_FIPS","CZ_FIPS","STATE","CZ_NAME","CZ_TYPE","EVENT_TYPE","BEGIN_LAT","BEGIN_LON","END_LAT","END_LON"]
all_files = glob.glob(stormdata_dir + "/*_c2022*.csv") #use the most updated files (they were updated in 2023). 
ds = []
for file in all_files:
    temp = pd.read_csv(file)[cols_of_interest]
    ds.append(temp)
    print("File = {} | rows = {}".format(file, len(temp)))
    # display(temp.head())
    del(temp)
ds = pd.concat(ds)
ds.loc[:,"STATE"] = ds["STATE"].str.replace(r'\W', '', regex=True).str.lower()

print("total episodes = {} | total events = {}".format(ds["EPISODE_ID"].nunique(), ds["EVENT_ID"].nunique()))

ds.loc[:,"CZ_FIPS"] = ds.apply(lambda x: str(x["STATE_FIPS"]).zfill(2) + str(x["CZ_FIPS"]).zfill(3), axis=1)

# perform the join only for "Zones"
ds = pd.merge(ds, z2c[["CountyFIPS","ZoneFIPS","CZ_TYPE"]], left_on=["CZ_TYPE","CZ_FIPS"], right_on=["CZ_TYPE","ZoneFIPS"], how="left")

display(ds)

In [None]:
# wherever the dataset states "C", the CZ_FIPS column gives us county FIPS --> leave that as is
# wherever the dataset states "Z", the CZ_FIPS gives the zone FIPS --> use mapping of zone FIPS to CountyFIPS
ds["CountyFIPS"] = ds.apply(lambda x: x["CZ_FIPS"] if x["CZ_TYPE"]=="C" else x["CountyFIPS"] if x["CZ_TYPE"]=="Z" else np.nan, axis=1)

# check if any of the "zones" have CZ_FIPS same as county FIPS
countyfips = counties["CountyFIPS"].unique()
ds["CountyFIPS"] = ds.apply(lambda x: x["CZ_FIPS"] if ((x["CZ_TYPE"]=="Z") & (pd.isnull(x["CountyFIPS"]) & (x["CZ_FIPS"] in countyfips))) else x["CountyFIPS"], axis=1)

# many of the rows with missing county FIPS correspond to marine zones, which are not covered in the NWPS zones
# since marine zones are limited to the beach/coastal areas, we would hardly find overlap between a county and marine zone
# so instead we will use BEGIN_LAT/LON and END_LAT/LON
ds_missing_CountyFIPS = ds[(pd.isnull(ds["CountyFIPS"]))].copy().drop(columns=["CountyFIPS"])

ds = (
    ds
    .dropna(subset=["CountyFIPS"])
    .drop(columns=["ZoneFIPS","CZ_TYPE","CZ_FIPS","CZ_NAME","STATE_FIPS","STATE","BEGIN_LAT","BEGIN_LON","END_LAT","END_LON"])
)

print("total episodes = {} | total events = {}".format(ds["EPISODE_ID"].nunique(), ds["EVENT_ID"].nunique()))

In [None]:
ds_missing_CountyFIPS["begin_geometry"] = ds_missing_CountyFIPS.apply(lambda x: Point(x["BEGIN_LON"], x["BEGIN_LAT"]), axis=1)
ds_missing_CountyFIPS["end_geometry"] = ds_missing_CountyFIPS.apply(lambda x: Point(x["END_LON"], x["END_LAT"]), axis=1)
ds_missing_CountyFIPS = ds_missing_CountyFIPS.drop(columns = ["BEGIN_LAT","BEGIN_LON","END_LAT","END_LON"])

#
ds_missing_CountyFIPS_begin = ds_missing_CountyFIPS.drop(columns=["end_geometry"]).rename(columns={"begin_geometry":"geometry"})
ds_missing_CountyFIPS_end = ds_missing_CountyFIPS.drop(columns=["begin_geometry"]).rename(columns={"end_geometry":"geometry"})

#
ds_missing_CountyFIPS = pd.concat([ds_missing_CountyFIPS_begin, ds_missing_CountyFIPS_end]).reset_index(drop=True)
ds_missing_CountyFIPS = gpd.GeoDataFrame(ds_missing_CountyFIPS, geometry="geometry", crs="EPSG:4269")
ds_missing_CountyFIPS = ds_missing_CountyFIPS.to_crs("EPSG:4326")
ds_missing_CountyFIPS = gpd.sjoin(ds_missing_CountyFIPS, counties.drop(columns=["county_geom","STATEFP","COUNTYFP"]), op="within", how="left")

#
ds_other = ds_missing_CountyFIPS[(pd.isnull(ds_missing_CountyFIPS["CountyFIPS"]))].drop(columns=["index_right","CountyFIPS"]).reset_index(drop=True)
ds_missing_CountyFIPS = ds_missing_CountyFIPS[(~pd.isnull(ds_missing_CountyFIPS["CountyFIPS"]))].drop(columns=["STATE_FIPS","STATE","geometry","index_right","ZoneFIPS","CZ_TYPE","CZ_FIPS","CZ_NAME"]).reset_index(drop=True)

In [None]:
"""
What is ds_other?

- it contains points (lon, lat) that do not belong to any county or forecast zone
- it contains points with empty lat, lon, and whose CZ_FIPS don't seem to be associated with any zone or counties. It almost seems like the FIPS are wrong.
    - we cannot do much in this case. If the CZ_NAME contains a county name we use it, else we discard the rows
"""
ds_other_coords = ds_other[(pd.notnull(ds_other["geometry"].x) & pd.notnull(ds_other["geometry"].y))]
ds_other_nocoords = ds_other[(pd.isnull(ds_other["geometry"].x) | pd.isnull(ds_other["geometry"].y))]

# ds_other = gpd.sjoin_nearest(ds_marine, counties.drop(columns=["county_geom","STATEFP","COUNTYFP"]), how="left")

In [None]:
# Deal with ds_other_nocoords by using county name-based regex expressions
ds_other_nocoords["counties"] = ds_other_nocoords["STATE"].apply(lambda x: state2county[x] if x in state2county else np.nan)

# drop rows with states like atlanticsouth or gulfofmexico
ds_other_nocoords = ds_other_nocoords.dropna(subset=["counties"])

# obtain the actual county names from the CZ_NAME statements. 
ds_other_nocoords["CZ_NAME_regex"] = ds_other_nocoords["CZ_NAME"].str.replace(r'[^A-Za-z]+', '', regex=True).str.lower()
ds_other_nocoords["CountyName"] = ds_other_nocoords.apply(lambda x: [y for y in x["counties"] if y in x["CZ_NAME_regex"]], axis=1)

# some events would be spread across multiple counties --> we find multiple matches --> we explode each county it into an individual row.
ds_other_nocoords = ds_other_nocoords.explode(column="CountyName").reset_index(drop=True)

# we will be left with entries where no match was found, and so we eliminate those rows
ds_other_nocoords = ds_other_nocoords[(~pd.isnull(ds_other_nocoords["CountyName"]))]

# add countyfips corresponding to that county
ds_other_nocoords = pd.merge(ds_other_nocoords, county2fips, left_on=["STATE","CountyName"], right_on=["StateName","CountyName"], how="left")
ds_other_nocoords = ds_other_nocoords.dropna(subset=["CountyFIPS"]).reset_index(drop=True)

# only extract the columns of interest
ds_other_nocoords = ds_other_nocoords[["BEGIN_DATE_TIME","END_DATE_TIME","CZ_TIMEZONE","EPISODE_ID","EVENT_ID","EVENT_TYPE","CountyFIPS"]].drop_duplicates().reset_index(drop=True)

display(ds_other_nocoords)

In [None]:
# Deal with ds_other_coords by finding the closest county to the lat lon of the event

def find_nearest_county(points_df, polygons_df):
    # Build the spatial index for polygons_df
    polygons_index = polygons_df.sindex

    # Create an empty list to store the nearest polygon ID for each point
    nearest_polygon_ids = []

    # Iterate over each point in points_df
    for index, point in points_df.iterrows():
        # Get the bounds of the point geometry
        point_bounds = point.geometry.bounds

        # Find the nearest polygon using the spatial index
        nearest_index = polygons_index.nearest(point_bounds)

        # Get the ID of the nearest polygon
        nearest_polygon_id = polygons_df.iloc[nearest_index]["CountyFIPS"].values
        # print(nearest_polygon_id)

        # Add the nearest polygon ID to the list
        nearest_polygon_ids.append(nearest_polygon_id)

    # # Add the nearest polygon ID column to points_df
    points_df["CountyFIPS"] = nearest_polygon_ids
    points_df = points_df.explode(column="CountyFIPS").reset_index(drop=True)
    return points_df

    
ds_other_coords = find_nearest_county(points_df=ds_other_coords.copy(), polygons_df=counties[["CountyFIPS","geometry"]].drop_duplicates().reset_index(drop=True))

# only extract the columns of interest
ds_other_coords = ds_other_coords[["BEGIN_DATE_TIME","END_DATE_TIME","CZ_TIMEZONE","EPISODE_ID","EVENT_ID","EVENT_TYPE","CountyFIPS"]].drop_duplicates().reset_index(drop=True)

display(ds_other_coords)

In [None]:
display(ds)

display(ds_missing_CountyFIPS)

display(ds_other_coords)

display(ds_other_nocoords)

In [None]:
ds = pd.concat([ds, ds_missing_CountyFIPS, ds_other_coords, ds_other_nocoords]).reset_index(drop=True)

print("total episodes = {} | total events = {}".format(ds["EPISODE_ID"].nunique(), ds["EVENT_ID"].nunique()))

display(ds)

In [None]:
"""
STEP 3: 
- convert CZ_TIMEZONE to native Python format. 
- convert local ts to UTC (POUS is in UTC).
"""
timezones = {"EST-5":"US/Eastern","EDT-4":"US/Eastern","CST-6":"US/Central","CDT-5":"US/Central","MST-7":"US/Mountain","PST-8":"US/Pacific","PDT-7":"US/Pacific","AKST-9":"US/Alaska","HST-10":"US/Hawaii"}
ds.loc[:,"tz"] = ds["CZ_TIMEZONE"].apply(lambda x: timezones[x] if x in timezones.keys() else np.nan)
# we drop nans and remove virgin islands
ds = ds.dropna().reset_index(drop=True)

In [None]:
"""
STEP 4: process string timestamps and convert tz to UTC
"""
ds.loc[:,"BEGIN_DATE_TIME"] = pd.to_datetime(ds["BEGIN_DATE_TIME"], format='%d-%b-%y %H:%M:%S')
ds.loc[:,"END_DATE_TIME"] = pd.to_datetime(ds["END_DATE_TIME"], format='%d-%b-%y %H:%M:%S')

# ds.loc[:,["BEGIN_DATE_TIME","END_DATE_TIME"]] = ds.loc[:,["BEGIN_DATE_TIME","END_DATE_TIME"]].apply(pd.to_datetime, format='%d-%b-%y %H:%M:%S')
ds = ds.sort_values(by=["CountyFIPS","BEGIN_DATE_TIME"])

In [None]:
# ds.groupby(["CountyFIPS"]).tz.nunique().reset_index().sort_values(by="tz").tail(60)

In [None]:
ds

In [None]:
# iterating using groupby because some countyFIPS have two tz. County-wise iteration also helps with nonexistent and ambigous timestamps due to DST.
ds_utc = []
for idx, grouped in ds.groupby(["CountyFIPS","tz"]):
    grouped.loc[:,"BEGIN_DATE_TIME"] = grouped.apply(lambda row: row["BEGIN_DATE_TIME"].tz_localize(row['tz'], nonexistent='shift_forward').tz_convert('UTC').tz_localize(None), axis=1)
    grouped.loc[:,"END_DATE_TIME"] = grouped.apply(lambda row: row["END_DATE_TIME"].tz_localize(row['tz'], nonexistent='shift_forward', ambiguous='NaT').tz_convert('UTC').tz_localize(None), axis=1)
    ds_utc.append(grouped)
ds_utc = pd.concat(ds_utc).dropna().reset_index(drop=True) # we lose only 1 ambiguous reading here
ds_utc = (
    ds_utc[["BEGIN_DATE_TIME","END_DATE_TIME","EPISODE_ID","EVENT_ID","EVENT_TYPE","CountyFIPS"]]
    .rename(columns={"BEGIN_DATE_TIME":"event_start", "END_DATE_TIME":"event_end", "EVENT_ID":"event_id","EVENT_TYPE":"event_type","EPISODE_ID":"episode_id"}) 
)

In [None]:
ds_utc

In [None]:
"""
For each (event_id, event_type, CountyFIPS) tuple, create one row per day of the event. 
"""
db = []
for _, row in ds_utc.iterrows():
    event_start = row["event_start"].date()
    event_end = row["event_end"].date()
    # print(row.T)
    #
    date_range = pd.date_range(event_start, event_end, freq="1D")
    #
    temp = pd.DataFrame({
        "episode_id":row["episode_id"],
        "event_id":row["event_id"],
        "event_type":row["event_type"],
        "CountyFIPS":row["CountyFIPS"],
        "event_date":date_range,
    })
    db.append(temp)
#
db = pd.concat(db).reset_index(drop=True)

display(db)

In [None]:
display(db)

In [None]:
#save the storms dataframe
output_dir = "/StormEvents_processed"
output_filename = "StormEvents_c2022.pkl" #2022 version

db.to_pickle(os.path.join(output_dir, output_filename))