In [2]:
# Name: fire_vulnerability
# Purpose: Notebook to process customer CSV data containing fire locations by FIPS code, 
#          merge with other data
# Requirements: Python3.x, ArcGIS Notebooks
# Author(s): Jeffrey Holycross, Esri
# Last Update: March 2023
# Copyright: Esri

In [3]:
## USER-DEFINED VARIABLES
## ArcGIS Online Item ID of the Home Fire Fatalities incidents.csv.
## This item is downloaded and parsed by process_fatalities_csv().
## Once set, this only needs to change if the Portal Item is republished.
csv_id = ''
## ArcGIS Online Item ID of the "SVI2020_US_county_Summarize_HFF" item.
## Its first layer (layers[0]) is the layer that gets queried and updated.
## Once set, this only needs to change if the Portal Item is republished.
hfs_id = ''
## ArcGIS Online Item ID for the CDC/ATSDR SVI.
## Once set, this only needs to change if the Portal Item is republished.
## Not currently used
## svi_id = ''

Once the variables above (`csv_id`, `hfs_id`) are defined, all cells can be run as-is using Cell --> Run All.

In [4]:
## Standard library imports
import datetime
import math
import os
import time
## Standard library "from" imports
from collections import Counter
## External library imports
import pandas as pd
## External library "from" imports
from arcgis.features import GeoAccessor, GeoSeriesAccessor
from arcgis.gis import GIS
## Project specific imports
## Project specific "from" imports

In [5]:
def handle_timeouts(f, max_attempts=3, retry_delay=1):
    """
    Helper function to wrap REST API networking calls in retry logic.

    ===============     ====================================================================
    **Argument**        **Description**
    ---------------     --------------------------------------------------------------------
    f                   Required callable taking no arguments (typically a lambda wrapping
                        the networking call) to be executed.
    ---------------     --------------------------------------------------------------------
    max_attempts        Optional integer. Total number of times `f` is tried before giving
                        up on a retryable error. Default is 3.
    ---------------     --------------------------------------------------------------------
    retry_delay         Optional number. Seconds to sleep between attempts. Default is 1.
    ===============     ====================================================================

    :return:
        Returns the return from the function if present (not None), otherwise True in a
        successful execution. Also returns True when the call fails with
        'Column names in each table must be unique'.

    :raises RuntimeError:
        When a timeout or 'Invalid URL' error is still occurring on the final attempt.
    :raises ValueError:
        When `max_attempts` is less than 1.
    :raises Exception:
        Any other exception raised by `f` is re-raised unchanged.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for n in range(1, max_attempts + 1):
        try:
            res = f()
        except Exception as e:
            ## Exceptions may have no args, or a non-string first arg; normalise to text
            ## so that inspecting the message can never mask the original error.
            first_arg = str(e.args[0]) if e.args else ''
            message_attr = str(getattr(e, 'message', '') or '')
            if 'Column names in each table must be unique' in first_arg:
                ## HFS thinks the column is already there; treat as success
                return True
            if (
                    'Your request has timed out' in message_attr or
                    'Your request has timed out' in first_arg or
                    'The wait operation timed out' in first_arg
            ):
                if n >= max_attempts:
                    raise RuntimeError(f"Stopping due to timeout after {max_attempts} attempts") from None
                print(f"Timeout on attempt #{n}")
            elif 'Invalid URL' in first_arg:
                if n >= max_attempts:
                    raise RuntimeError(f"Stopping due to Invalid URL after {max_attempts} attempts") from None
                print(f"Invalid URL on attempt #{n}, retrying")
            else:
                ## Not a known transient failure: let the caller see the real error
                raise
            time.sleep(retry_delay)
        else:
            ## return True if the call succeeded but produced no value
            return True if res is None else res

In [6]:
def batch_edit_features(a_fset, e_fset, d_fset, batch_size):
    """
    Helper function to take lists of features to be added, edited, or deleted (such as from FeatureSet.features)
    and split them into combined chunks of at most `batch_size` items.

    Each chunk is filled with adds first, then edits, then deletes, so a single chunk
    may contain a mix of the three once an earlier list is exhausted.

    ===============     ====================================================================
    **Argument**        **Description**
    ---------------     --------------------------------------------------------------------
    a_fset              Required list. Features to be added.
    ---------------     --------------------------------------------------------------------
    e_fset              Required list. Features to be edited (updated).
    ---------------     --------------------------------------------------------------------
    d_fset              Required list. Items to be deleted.
    ---------------     --------------------------------------------------------------------
    batch_size          Required integer (>= 1). Maximum combined number of adds, edits and
                        deletes per chunk.
    ===============     ====================================================================

    :return:
        Yields tuples of (total number of chunks, adds, edits, deletes), where the three
        slices together hold at most `batch_size` items. Yields nothing when all three
        inputs are empty.

    :raises ValueError:
        When `batch_size` is less than 1 (raised when iteration starts).
    """
    if batch_size < 1:
        ## A zero size cannot be divided by, and a negative size would silently yield nothing
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
    total_features = len(a_fset) + len(e_fset) + len(d_fset)
    num_sets = math.ceil(total_features / batch_size)
    a_pos = e_pos = d_pos = 0
    for _ in range(num_sets):
        ## Start with adds, taking as many as fit
        adds = a_fset[a_pos:a_pos + batch_size]
        a_pos += len(adds)
        ## Remaining room in this chunk goes to edits...
        room = batch_size - len(adds)
        upds = e_fset[e_pos:e_pos + room]
        e_pos += len(upds)
        ## ...and whatever is still left goes to deletes
        room -= len(upds)
        dels = d_fset[d_pos:d_pos + room]
        d_pos += len(dels)
        yield num_sets, adds, upds, dels

In [7]:
def update_dates_to_utc(input_sdf,date_columns):
    """
    Takes a DataFrame and converts a list of specified columns into UTC time.

    The conversion is written back into `input_sdf` itself (the input is modified in
    place) and the same object is returned.

    ===============     ====================================================================
    **Argument**        **Description**
    ---------------     --------------------------------------------------------------------
    input_sdf           Required DataFrame. The DataFrame containing the data to convert.
    ---------------     --------------------------------------------------------------------
    date_columns        Required list. List of columns containing dates to convert to UTC.
    ===============     ====================================================================

    :return:
        DataFrame (Pandas or Spatially-enabled)

    :raises:
        Whatever `pandas.to_datetime` raises for unparseable values (errors='raise').
    """
    for col in date_columns:
        ## `infer_datetime_format` is intentionally not passed: it is deprecated in
        ## pandas 2.x (format inference is the default) and only produces a warning.
        input_sdf[col] = pd.to_datetime(input_sdf[col], errors='raise', utc=True)
    return input_sdf

In [8]:
def edit_features_from_sdf(add_sdf, upd_sdf, del_sdf,layer_or_table,batch_size,date_columns=None):
    """
    Applies additions, edits, and deletions to a specified layer or table, using values from Spatially Enabled DataFrames.

    Progress and any per-batch errors are printed. After all batches are sent, the record
    count of the layer is compared to the expected count and the outcome is printed.

    ===============     ====================================================================
    **Argument**        **Description**
    ---------------     --------------------------------------------------------------------
    add_sdf             Required Spatially Enabled DataFrame of records to add. An empty
                        sequence (e.g. `[]`) means there is nothing to add.
    ---------------     --------------------------------------------------------------------
    upd_sdf             Required Spatially Enabled DataFrame of records to update. An empty
                        sequence (e.g. `[]`) means there is nothing to update.
    ---------------     --------------------------------------------------------------------
    del_sdf             Required iterable of items to delete; converted to a list and passed
                        as `deletes` to edit_features (presumably Object IDs - confirm
                        against the caller).
    ---------------     --------------------------------------------------------------------
    layer_or_table      Required FeatureLayer or Table. Input is assumed to be from 
                        `gis.content.get().layers[x]` or `gis.content.get().tables[x]`.
    ---------------     --------------------------------------------------------------------
    batch_size          Required Integer. The chunk size to send to edit_features. Esri
                        documentation recommends <=250. In practice this will depend on the data
                        being sent.
    ---------------     --------------------------------------------------------------------
    date_columns        Optional list of one of more DataFrame column names that contain Datetimes.
                        These columns will be converted to UTC before sent to edit_features. 
    ===============     ====================================================================

    :return:
        None

    :raises RuntimeError:
        When edit_features does not return a result dictionary for a batch.
    """
    def _to_features(frame):
        ## Empty inputs (including a plain empty list) have nothing to convert
        if len(frame) == 0:
            return []
        if date_columns is not None:
            frame = update_dates_to_utc(frame,date_columns)
        return frame.spatial.to_featureset().features

    add_fset = _to_features(add_sdf)
    upd_fset = _to_features(upd_sdf)
    del_fset = list(del_sdf)
    result_keys = ('addResults','updateResults','deleteResults')
    # Original feature count
    orig_count = handle_timeouts(lambda: layer_or_table.query(return_count_only=True))
    # Update Feature Layer
    for i,(num_sets,adds,upds,dels) in enumerate(batch_edit_features(add_fset, upd_fset, del_fset, batch_size),1):
        start_time = time.time()
        res = handle_timeouts(lambda: layer_or_table.edit_features(adds=adds,updates=upds,deletes=dels,rollback_on_failure=True))
        if not isinstance(res, dict):
            ## handle_timeouts returns True when the call produced no result to inspect
            raise RuntimeError(f"Unexpected response from edit_features for batch {i} of {num_sets}: {res!r}")
        ## Tolerate a missing or null result list for an operation
        outcomes = {key: (res.get(key) or []) for key in result_keys}
        add_num, upd_num, del_num = (
            sum(1 for j in outcomes[key] if j['success']) for key in result_keys
        )
        total_success = add_num + upd_num + del_num
        print(f"Batch {i} of {num_sets}: Added {add_num}/{len(adds)}, edited {upd_num}/{len(upds)}, deleted {del_num}/{len(dels)} in {round(time.time()-start_time,0)} seconds")
        if total_success != (len(adds)+len(upds)+len(dels)):
            ## Not every edit was successful
            errors = [
                (k.get('error') or {}).get('description', 'Unknown error')
                for key in result_keys
                for k in outcomes[key]
                if not k['success']
            ]
            print(f"\tERROR: Batch {i} of {num_sets} had errors:")
            ## Most frequent error description first
            for description, count in Counter(errors).most_common():
                print(f"\t\t{count}: {description}")
    ## Wait 5 seconds to allow any final processing to complete
    time.sleep(5)
    ## Verify that final count matches expected
    final_count = handle_timeouts(lambda: layer_or_table.query(return_count_only=True))
    ## Expected count = Original Count + Adds - Deletes
    expected_count = orig_count + len(add_sdf) - len(del_sdf)
    if final_count == expected_count:
        print("Processing complete! Final count matches expected number of records.")
    else:
        print(f"ERROR: Processing is complete but final record count {final_count} does not match the expected value {expected_count}")

In [23]:
def process_fatalities_csv(gis):
    """
    Function to perform business logic-specific processing: downloads the incidents CSV
    identified by the module-level `csv_id` and counts incidents per FIPS code per year.

    ===============     ====================================================================
    **Argument**        **Description**
    ---------------     --------------------------------------------------------------------
    gis                 Required GIS object
    ===============     ====================================================================

    :return:
        The processed DataFrame: a 'FIPS' column plus one 'Fatalities_<year>' column per
        year found in the CSV 'Date' column, holding the number of CSV rows for that
        FIPS code and year.

    :raises RuntimeError:
        When `csv_id` is not set, or no Portal Item is found for it.
    """
    if not csv_id:
        raise RuntimeError("csv_id is not set. Define the ArcGIS Online Item ID for incidents.csv before running.")
    ## Retrieve existing incidents.csv
    csv_item = gis.content.get(csv_id)
    if csv_item is None:
        raise RuntimeError(f"No Portal Item found for csv_id '{csv_id}'. Verify that csv_id is set to the right Item ID.")
    fp = csv_item.download()
    try:
        ## Hard code FIPS column to string to ensure that we don't lose leading zeros
        hff_df = pd.read_csv(fp,dtype={'FIPS':str})
    finally:
        ## Remove the download, even when the file could not be parsed
        os.remove(fp)
    ## Ensure that all FIPS codes are 5-digits with leading zeros
    hff_df['FIPS'] = hff_df['FIPS'].apply(lambda f: str(f).strip().zfill(5))
    ## For every row in the 'Date' column, apply a conversion to a datetime, and grab the year.
    ## Create a new column with this result
    hff_df['Year'] = hff_df['Date'].apply(lambda dt:str(pd.to_datetime(dt).year))
    ## Create a cross tab of FIPS & fatalities by year
    fatalities = pd.crosstab(index=hff_df['FIPS'],columns=hff_df['Year'])
    ## Change the name of the fatalities columns so they match Fatalities_XXXX convention
    fatalities.columns = [f'Fatalities_{y}' for y in fatalities.columns]
    ## Make the FIPS index a column rather than index
    return fatalities.reset_index()

In [10]:
def add_new_field(to_layer,field_name,field_type):
    """
    Append a single new field to the definition of an existing FeatureLayer.

    The field is created nullable, editable and visible, with no default value, and its
    alias is the same as its name.

    ===============     ====================================================================
    **Argument**        **Description**
    ---------------     --------------------------------------------------------------------
    to_layer            Required FeatureLayer object that receives the field
    ---------------     --------------------------------------------------------------------
    field_name          Required String used as both the field name and its alias
    ---------------     --------------------------------------------------------------------
    field_type          Required esri Field type definition for the new field
    ===============     ====================================================================

    :return:
        Whatever handle_timeouts returns for Layer.manager.add_to_definition: the call's
        own return value if there is one, True otherwise
    """
    definition_update = {
        "fields": [
            {
                "name": field_name,
                "type": field_type,
                "alias": field_name,
                "nullable": True,
                "editable": True,
                "defaultValue": None,
                "visible": True,
            }
        ]
    }

    def _submit():
        return to_layer.manager.add_to_definition(definition_update)

    outcome = handle_timeouts(_submit)
    ## Pause so the service can finish processing the added field
    time.sleep(5)
    return outcome

In [11]:
def merge_new_and_old_data(hfs,new_data,columns_to_update,expected_fips_count=3143):
    """
    Function to take new and old data, process as necessary and perform merging

    ===============     ====================================================================
    **Argument**        **Description**
    ---------------     --------------------------------------------------------------------
    hfs                 Required Hosted Feature Service item. Its first layer (`layers[0]`)
                        is queried for the existing data, and receives any newly added field.
    ---------------     --------------------------------------------------------------------
    new_data            Required DataFrame with new data. Must contain a 'FIPS' column plus
                        the columns listed in `columns_to_update`.
    ---------------     --------------------------------------------------------------------
    columns_to_update   Required list of columns being updated by new_data. Names are
                        expected to look like 'Fatalities_<year>'.
    ---------------     --------------------------------------------------------------------
    expected_fips_count Optional integer. Number of FIPS codes the existing layer must
                        contain. Default is 3143 (maximum FIPS values as of 2023).
    ===============     ====================================================================

    :return:
        The merged Spatially-enabled DataFrame: a copy of the existing data with the
        `columns_to_update` overwritten (0 where new_data has no record for a FIPS code)
        and 'Point_Count' recalculated as the sum of all 'Fatalities_2*' columns.

    :raises RuntimeError:
        When the existing layer does not hold the expected number of FIPS codes, when
        new_data contains FIPS codes missing from the existing layer, or when a column
        for a prior year is missing from the existing layer.
    """
    ## Pull existing data, including SVI fields
    old_sdf = handle_timeouts(lambda: hfs.layers[0].query(return_geometry=False).sdf)
    ## Reset new_data to have all of the potential FIPS values from existing SDF.
    ## NOTE: an outer merge does NOT keep the row order or index of old_sdf, so the result
    ## is only used for validation and as a FIPS -> value lookup further down.
    fatalities = old_sdf[['FIPS']].merge(right=new_data,on='FIPS',how='outer')
    ## FIPS codes in new_data that are not in old_sdf FIPS column
    invalid_fips = new_data['FIPS'].loc[~new_data['FIPS'].isin(old_sdf['FIPS'])].values.tolist()
    if len(fatalities) != expected_fips_count or invalid_fips:
        if len(old_sdf) != expected_fips_count:
            raise RuntimeError(f"Existing data has {len(old_sdf):,} FIPS codes rather than the expected {expected_fips_count:,}. " +
                               f"Please manually review existing layer for duplicates or invalid information.")
        else:
            ## Likely that an invalid FIPS was provided in new_data
            raise RuntimeError(f"Unexpected result from merging old data with new data. " + 
                               f"Combined FIPS count {len(fatalities)} rather than expected {expected_fips_count:,}.\n" +
                               f"Unexpected FIPS value(s): {invalid_fips}")
    ## Reset Fatalities count columns to INT values, fill N/As with 0
    for c in fatalities.columns:
        if not c == 'FIPS':
            fatalities[c] = fatalities[c].fillna(0).astype(int)
    ## Lookup table keyed by FIPS so values are matched to rows by FIPS code,
    ## never by row position
    by_fips = fatalities.drop_duplicates(subset='FIPS').set_index('FIPS')

    ## Processing
    ## Make new_sdf a copy of the existing SDF
    new_sdf = old_sdf.copy(True)
    ## Overwrite the updated column(s) in new_sdf
    for column in columns_to_update:
        ## Are there any columns in columns_to_update that aren't already in old_sdf?
        if column not in new_sdf.columns:
            column_year = int(column.split('_')[1])
            if column_year < datetime.datetime.now().year:
                ## Don't expect prior years to be missing
                raise RuntimeError(f"Field for {column} not present in Hosted Feature Service data. \n" +
                                f"Verify that hfs_id is set to the right Hosted Feature service. \n" +
                                f"If set correctly, please add an Integer field named {column} to the " + 
                                f"{hfs.layers[0].properties['name']} layer in {hfs.title}")
            ## Auto add a field with column name
            print(f"Adding field {column}")
            add_new_field(hfs.layers[0],column,"esriFieldTypeInteger")
            print(f"Successfully added field {column}")
        ## Overwrite (or create) the column, aligned on FIPS
        new_sdf[column] = new_sdf['FIPS'].map(by_fips[column]).fillna(0).astype(int)
    ## Calculate new Point_Count column from all Fatalities_2XXX columns
    fatalities_columns = [c for c in new_sdf.columns if c.startswith('Fatalities_2')]
    ## Generate Point_Count
    new_sdf['Point_Count'] = new_sdf[fatalities_columns].sum(axis=1)
    ## Return the processed SeDF
    return new_sdf

In [12]:
def calculate_fire_rankings(hfs,sdf):
    """
    Function to calculate Fire Vulnerability and other related columns

    The columns are written into `sdf` itself (the input is modified in place) and the
    same object is returned.

    ===============     ====================================================================
    **Argument**        **Description**
    ---------------     --------------------------------------------------------------------
    hfs                 Required Feature Layer (used for name & title attributes in a failure)
    ---------------     --------------------------------------------------------------------
    sdf                 Required Spatially Enabled DataFrame with expected columns:
                        'Point_Count', 'E_TOTPOP' and 'RPL_THEMES' as inputs, plus the five
                        output columns, which must already exist.
    ===============     ====================================================================

    :return:
        The processed Spatially Enabled DataFrame

    :raises RuntimeError:
        When one of the output columns is not present in `sdf`.
    """
    ## Wording used in the error message for each required output column
    required_columns = {
        'Fatalities_per_100000': 'a Double',
        'SVI_Ranking': 'an Integer',
        'HFF_Ranking': 'an Integer',
        'Fire_Vulnerability': 'an Integer',
        'Fire_Vulnerability_Ranking': 'a String',
    }
    for column, col_type in required_columns.items():
        if column not in sdf.columns:
            raise RuntimeError(f"Field for {column} not present in Hosted Feature Service data. \n" +
                               f"Verify that hfs_id is set to the right Hosted Feature service. \n" +
                               f"If set correctly, please add {col_type} field named {column} to the " + 
                               f"{hfs.layers[0].properties['name']} layer in {hfs.title}")
    ## NOTE: rows matching none of the conditions below (e.g. a missing/NaN input value)
    ## keep whatever value the ranking column already held.
    ## Fatalities_per_100000 (double)
    sdf['Fatalities_per_100000'] = (sdf['Point_Count']/sdf['E_TOTPOP'])*100000
    ## HFF_Ranking (long): <10 -> 0, [10,25) -> 1, [25,35) -> 2, >=35 -> 3
    sdf.loc[(sdf['Fatalities_per_100000'] < 10),'HFF_Ranking'] = 0
    sdf.loc[(sdf['Fatalities_per_100000'] >= 10) & (sdf['Fatalities_per_100000'] < 25),'HFF_Ranking'] = 1
    sdf.loc[(sdf['Fatalities_per_100000'] >= 25) & (sdf['Fatalities_per_100000'] < 35),'HFF_Ranking'] = 2
    sdf.loc[(sdf['Fatalities_per_100000'] >= 35),'HFF_Ranking'] = 3
    ## SVI_Ranking (long): <=0.25 -> 0, (0.25,0.5] -> 1, (0.5,0.75] -> 2, >0.75 -> 3
    sdf.loc[(sdf['RPL_THEMES'] <= 0.25),'SVI_Ranking'] = 0
    sdf.loc[(sdf['RPL_THEMES'] > 0.25) & (sdf['RPL_THEMES'] <= 0.5),'SVI_Ranking'] = 1
    sdf.loc[(sdf['RPL_THEMES'] > 0.5) & (sdf['RPL_THEMES'] <= 0.75),'SVI_Ranking'] = 2
    ## Strictly greater than, so that exactly 0.75 stays in the bucket above
    sdf.loc[(sdf['RPL_THEMES'] > 0.75),'SVI_Ranking'] = 3
    ## Fire_Vulnerability (long)
    sdf['Fire_Vulnerability'] = sdf['HFF_Ranking'] + sdf['SVI_Ranking']
    ## Fire_Vulnerability_Ranking (text)
    sdf.loc[(sdf['Fire_Vulnerability'].isin([0])),'Fire_Vulnerability_Ranking'] = 'Extremely Low'
    sdf.loc[(sdf['Fire_Vulnerability'].isin([1,2])),'Fire_Vulnerability_Ranking'] = 'Low'
    sdf.loc[(sdf['Fire_Vulnerability'].isin([3,4])),'Fire_Vulnerability_Ranking'] = 'Medium'
    sdf.loc[(sdf['Fire_Vulnerability'].isin([5,6])),'Fire_Vulnerability_Ranking'] = 'High'
    return sdf
    

In [None]:
## main():
## Fail early with a clear message if the USER-DEFINED VARIABLES were not filled in
if not csv_id or not hfs_id:
    raise RuntimeError("Both csv_id and hfs_id must be set to ArcGIS Online Item IDs before running this cell.")
gis = GIS("home")
new_data = process_fatalities_csv(gis)
## Keep track of the Fatalities_2XXX columns that are being updated
columns_to_update = [c for c in new_data.columns if c.startswith('Fatalities_2')]
svc = handle_timeouts(lambda: gis.content.get(hfs_id))
## handle_timeouts returns True when the wrapped call returned None, i.e. no item was found
if svc is None or svc is True:
    raise RuntimeError(f"No Portal Item found for hfs_id '{hfs_id}'. Verify that hfs_id is set to the right Hosted Feature service.")
sdf = merge_new_and_old_data(svc,new_data,columns_to_update)
## Assume that Point_Count was updated during merge_new_and_old_data
columns_to_update.append('Point_Count')
## Calculate Fatalities per 100k and HFF/SVI/FV rankings
sdf = calculate_fire_rankings(svc,sdf)
## Assume that Fire Rankings columns were updated during calculate_fire_rankings
columns_to_update.extend(['Fatalities_per_100000','SVI_Ranking','HFF_Ranking','Fire_Vulnerability','Fire_Vulnerability_Ranking'])
## Only need to overwrite columns in columns_to_update, plus Object ID field is required for identifying the update row
## Ensure that columns_to_update contains the Object ID field for HFS_ID
columns_to_update.insert(0,svc.layers[0].properties['objectIdField'])
## Pass only columns_to_update columns to publishing functions
print("Editing features...")
## Can go directly to edit_features_from_sdf because we don't need to identify edits (everything is an edit)
edit_features_from_sdf(add_sdf=[],
                       upd_sdf=sdf[columns_to_update],
                       del_sdf=[],
                       layer_or_table=svc.layers[0],
                       batch_size=800)