## Metadata

Functions for ArcGIS workflows

Author: Adrian Wiegman, adrian.wiegman@usda.gov

Created: 10:05 AM Thursday, March 30, 2023

Updated: April 18, 2023

Notes and Instructions:

This script contains helper functions for arcgis workflow to analyze land use and other datasets within watersheds. 

- prefix all user defined functions with `fn_`
- place working functions in the main program
- place broken functions in the appendix and comment out or convert cell to Raw NBConvert


## Main Program

functions in this section have been checked and debugged

In [2]:
print('type `fn_`+TAB to for autocomplete suggestions')

def fn_get_info(name='fn_get_info'):
    '''
    returns the source information about a given function name
    '''
    ??$name

type `fn_`+TAB to for autocomplete suggestions


In [39]:
# this codeblock appends a path to source codes to the environment variable paths
# then runs a script containing other source codes
import sys
# Insert the path of modules folder 
sys.path.append(r"C:\Users\Adrian.Wiegman\Documents\GitHub\Wiegman_USDA_ARS\MEP\scripts")
# Import the module0 directly since 
# the current path is of modules.
%run -m _FeatureTableToDataFrame 
# the line above runs the script named _FeaturedTableToDataFrame.py from within the path .../scripts

In [28]:
def fn_run_script_w_propy_bat(
    py_script_path=None, # full path to python script includeing the name e.g. "C:\hello.py"
    propy_bat_path="C:\Progra~1\ArcGIS\Pro\\bin\Python\Scripts"
    ):
    '''
    this function can be used to execute standalone python scripts in the ArcGIS python environment using the command line
    the benefit of this is that arcpy can be used without opening arcgis
    read more: https://pro.arcgis.com/en/pro-app/latest/arcpy/get-started/using-conda-with-arcgis-pro.htm
    '''
    import os
    # create temporary file for testing the function
    if py_script_path is None:
        import tempfile
        tmpdir = tempfile.TemporaryDirectory()
        py_script_path = os.path.join(tmpdir.name,"hello.py")
            # Open the file for writing.
        with open(py_script_path, 'w') as f:
            f.write("print('Hello World!')")

    # get current working directory
    wdr = os.getcwd()
    
    # change directory to folder containing propy.bat
    os.chdir(propy_bat_path) 
    
    # construct cmd
    cmd = "propy.bat {}".format(py_script_path)
    
    print("running command:\n")
    print("{}\{}".format(propy_bat_path,cmd))
    # execute cmd
    os.system(cmd)
    
    # change directory back 
    os.chdir(wdr)
#fn_run_script_w_propy_bat()

In [None]:
def fn_try_mkdir(dirname):
    '''
    tries to make a new directory
    uses proper error handling 
    '''
    import os, errno
    try:
        os.mkdir(dirname)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
        pass

In [None]:
def fn_hello(x="world"):
    '''
    prints hello x
    '''
    print("hello %s" %x)

In [None]:
def fn_recursive_glob_search (startDir=None,
                             fileExt="csv"):
    '''returns:
           file paths matching extension 
           within all subdirectories starting directory
       inputs:
           startDir = root or parent directory to start search
           fileExt = file extension, e.g. ".csv" ".xlsx" ".shp"
    '''
    import glob, os
    if startDir is None:
        startDir = os.getcwd
    fileList = []
    glbsearch = os.path.join(startDir,'**/*'+fileExt)
    for f in glob.glob(glbsearch, recursive=True):
        #print(f)
        fileList.append(f)
    return(fileList)

In [27]:
def fn_regex_search_replace(string,pattern,replacement=None):
    '''
    returns the a string with a pattern substituted by a replacement
    '''
    if replacement is None: replacement = ""
    import re
    x = re.sub(pattern,replacement,string)
    return(x)

In [None]:
def fn_regex_search_0 (string,pattern,noneVal="NA"):
    '''
    returns the first match of a regular expression pattern search on a string
    '''
    import re
    x = re.search(pattern,string)
    if x is None: 
        x= [noneVal]    
    return(x[0])

In [276]:
def fn_arcpy_table_to_excel(inFeaturePath,outTablePath,outTableName):
    import os
    arcpy.conversion.TableToExcel(inFeaturePath, os.path.join(outTablePath,outTableName), "ALIAS", "CODE")

In [None]:
def fn_agg_sum_df_on_group(group_cols,df,func=sum):
    '''
    returns data frame aggregated on set of group_cols for given a func
    '''
    import pandas as pd
    import numpy as np
    return df.groupby(group_cols).aggregate(func).reset_index()

In [None]:
def fn_add_prefix_suffix_to_selected_cols(df,col_names,prefix=None,suffix=None,sep='_'):
    # use list comprehension to add prefix and/or suffix to old names
    _ = [i if prefix is None else prefix+sep+i for i in col_names]
    new_names = [i if suffix is None else i+sep+suffix for i in _]
    df.rename(columns=dict(zip(col_names, new_names)), inplace=True)
    return df

In [None]:
def fn_calc_pct_cover_within_groups(group_cols,x,area_col='Shape_Area'):
    '''
    calculates percent cover normalize metrics by polygon area
    inputs:
        x = pandas dataframe containing the following columns
        group_cols = a list of strings containing group column names
        area_col = string containing the name of the shape area column
    all other columns must be numeric columns containing areas of various attribute types 
    all other columns must have the same units as area_col
    '''
    x
    # copy the numeric columns that are not groups or the selected area column 
    _ = x.loc[:, ~x.columns.isin(group_cols+[area_col])]._get_numeric_data()
    
    # divide the 
    y = _.div(x[area_col],axis=0).mul(100)
    if 'Shape_Length' in y.columns:
        y.rename(columns={'Shape_Length':'Shape_Perim_to_Area'},inplace=True)
    # merge the data back together
    z = pd.merge(x[group_cols],x[area_col],left_index=True,right_index=True).merge(y,left_index=True, right_index=True)
    return z

In [None]:
def fn_buildWhereClauseFromList(table, # name of table
                                field, # name of field to search
                                valueList # list of values for the SQL query
                               ):
    
    """Takes a list of values and constructs a SQL WHERE
    clause to select those values within a given field and table."""

    # Add DBMS-specific field delimiters
    fieldDelimited = arcpy.AddFieldDelimiters(arcpy.Describe(table).path, field)

    # Determine field type
    fieldType = arcpy.ListFields(table, field)[0].type

    # Add single-quotes for string field values
    if str(fieldType) == 'String':
        valueList = ["'%s'" % value for value in valueList]

    # Format WHERE clause in the form of an IN statement
    whereClause = "%s IN(%s)" % (fieldDelimited, ', '.join(map(str, valueList)))
    return whereClause

In [68]:
#Get extract the cell size of raster
def fn_FA_to_Q (rasterpath=None,recharge_rate_in_yr = 27.25):
    _ = arcpy.GetRasterProperties_management(rasterpath, "CELLSIZEX")
    #Get the elevation standard deviation value from geoprocessing result object
    cellsize_x = _.getOutput(0)
    _ = arcpy.GetRasterProperties_management(rasterpath, "CELLSIZEY")
    cellsize_y = _.getOutput(0)
    # calculate cell area in meters
    cell_area_meters = float(cellsize_x) * float(cellsize_y)
    print(cell_area_meters)
    FA_to_Q = cell_area_meters * recharge_rate_in_yr * 2.54 * (1/100) * (1/365.25)
    print(FA_to_Q)

In [None]:
def fn_alter_field_double(intablepath,oldname,newname):
    '''
    renames a field in a table if the field is a double
    '''
    arcpy.management.AlterField(
        in_table=intablepath,
        field=oldname,
        new_field_name=newname,
        new_field_alias="",
        field_type="DOUBLE",
        field_length=16,
        field_is_nullable="NULLABLE",
        clear_field_alias="CLEAR_ALIAS")

In [65]:
def fn_return_float(x):
    """
    returns an object of type float
    """
    try: 
        return(float(x))
    except:
        return(None)

In [2]:
def fn_classify_wetlands(x):
    """
    Consolidates DEP wetlands into fewer categories
    """
    if "MARSH" in x:
        return("MARSH")
    elif "WOOD" in x:
        return("FORESTED SWAMP")
    elif "SHRUB" in x:
        return("SHRUB SWAMP")
    elif len(x) == 0:
        return("NON WETLAND")
    else:
        return(x)

## 2. Broken Functions

place broken functions in this section and set as `Raw NBConvert`

In [11]:
# RESUME HERE 2023-07-05

ACTIVE_CRANBERRY_USECODES = ['017','710','270','71','27','17','7100','2700']
RETIRED_CRANBERRY_USECODES = ['9','20','20','21','29'] # starts with
#df.USECODE == 
def fn_land_use_conditions_dict_general(df):
    '''
    '''
    active_cranberry = '((df.COVERCODE == 21) & (df.USEGENCODE==7))|(np.isin(df.COVERCODE,[6,7])))'
    waterbody = '(np.isin(df.COVERCODE,[22,21]) & np.invert('+active_cranberry+'))'
    GENERAL = {

        # THE FIRST APPEARING CONDITION TAKES PRIORITY IN NP.SELECT
    
        # key (LUC): value (condition string)
    
        # 4: "Agriculture, Active Cranberry, Flowthrough"
    
        4: active_cranberry+' & '+flowthrough,
    
        # 5: "Agriculture, Active Cranberry, Non Flowthrough"
    
        5: active_cranberry+' & (np.invert('+flowthrough+'))',
    
        # 11: "Retired Cranberry, Flowthrough"
    
        11: abandoned_cranberry+' & '+flowthrough,
    
        # 12: "Retired Cranberry, Non flowthrough"
    
        12: abandoned_cranberry+' & np.invert('+flowthrough+')',
    
        # 1: "Natural Uplands"
    
        1: 'df.COVERCODE.between(8, 12, inclusive="both")',
    
        # 6: "Impervious, Roads"
    
        6: '(df.COVERCODE == 2) & (df.USEGENCODE == 55)',
    
        ## 3: "Agriculture, Non Cranberry"
    
        3: 'df.COVERCODE == [False]',
    
        # 8: "Recieving Body (Estuary)"
        
        8: '(df.COVERCODE == 23)|'+waterbody+')', # note that SUB is a string
        
        # 9: "Freshwater Ponds and Lakes"
    
        9: '(df.SUB != "0") &' + waterbody,
    
        # 10: "Wetlands"
    
        10: 'df.COVERCODE.between(13, 18, inclusive="both")',
    
        # 2: "Mowed Areas (Lawns, Sports Fields, and Golf Courses)"
    
        2: 'df.COVERCODE == 5',
    
        # 7: "Impervious, Non-Roads"
    
        7: '(df.COVERCODE == 2) & (df.USEGENCODE != 55)',
    
        # 13: "Other (Bare land or Shoreline)"
    
        13: 'np.isin(df.COVERCODE,[19,20])'

    }
    

def fn_land_use_conditions_dict():
    '''
    GENERATE DICTIONARY OF CONDITIONS FOR RECLASSIFYING LAND USE
    '''

    # specific active cranberry: where other farming makes up a non zero portion of cultivated and hay/pasture cover types
    specific_active_cranberry = '(np.isin(df.CropStatus,"active"))'
    # general active cranberry: use where active cranberry makes up over 99% of COVERCODE 6 and 7. 
    general_active_cranberry = '((np.isin(df.CropStatus,"active"))|((df.COVERCODE == 21) & (df.USEGENCODE==7))|(np.isin(df.COVERCODE,[6,7])))'
    active_cranberry = specific_active_cranberry
    abandoned_cranberry = 'np.isin(df["FID"],df[df.CropStatus == "abandoned"]["FID"])'
    flowthrough = 'np.isin(df["FID"],df[df.Bog_type == "flowthrough"]["FID"])'
    terminus = 'np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"])'
    flowthrough = terminus
    #abandoned_cranberry = '(np.isin(df.CropStatus,"abandoned"))'
    #flowthrough = '(np.isin(df.Bog_type,"flowthrough"))'
    waterbody = '(np.isin(df.COVERCODE,[22,21]) & np.invert('+active_cranberry+'))'
    
    # condition dictionary
    SPECIFIC = {
        # THE FIRST APPEARING CONDITION TAKES PRIORITY IN NP.SELECT
        # key (LUC): value (condition string)
        # 4: "Agriculture, Active Cranberry, Flowthrough"
        4: active_cranberry+' & '+flowthrough,
        # 5: "Agriculture, Active Cranberry, Non Flowthrough"
        5: active_cranberry+' & (np.invert('+flowthrough+'))',
    
        # 11: "Retired Cranberry, Flowthrough"
    
        11: abandoned_cranberry+' & '+flowthrough,
    
        # 12: "Retired Cranberry, Non flowthrough"
    
        12: abandoned_cranberry+' & np.invert('+flowthrough+')',
    
        # 1: "Natural Uplands"
    
        1: '(df.COVERCODE >= 8) & (df.COVERCODE <= 12)',
     
        # 6: "Impervious, Roads"
    
        6: '(df.COVERCODE == 2)',
    
        # 3: "Agriculture, Non Cranberry"
    
        3: '(np.isin(df.COVERCODE,[6,7])) & (np.invert('+active_cranberry+'))',
    
        # 8: "Recieving Body (Estuary)"
    
        8: '(df.SUB == "0") & ((df.COVERCODE == 23)|'+waterbody+')', # note that SUB is a string
    
        # 9: "Freshwater Ponds and Lakes"
    
        9: '(df.SUB != "0") &' + waterbody,
    
        # 10: "Wetlands"
    
        10: '(df.COVERCODE >= 13) & (df.COVERCODE <= 18)',
    
        # 2: "Mowed Areas (Lawns, Sports Fields, and Golf Courses)"
    
        2: 'df.COVERCODE is [False]', # this will be calculated separately
    
        # 7: "Impervious, Non-Roads"
    
        7: 'df.COVERCODE is [False]', # this will be calculated separately
    
        # 13: "Other (Bare land or Shoreline)"
    
        13: 'df.COVERCODE is [False]'}
    
    GENERAL = {
    
        # THE FIRST APPEARING CONDITION TAKES PRIORITY IN NP.SELECT
    
        # key (LUC): value (condition string)
    
        # 4: "Agriculture, Active Cranberry, Flowthrough"
    
        4: active_cranberry+' & '+flowthrough,
    
        # 5: "Agriculture, Active Cranberry, Non Flowthrough"
    
        5: active_cranberry+' & (np.invert('+flowthrough+'))',
    
        # 11: "Retired Cranberry, Flowthrough"
    
        11: abandoned_cranberry+' & '+flowthrough,
    
        # 12: "Retired Cranberry, Non flowthrough"
    
        12: abandoned_cranberry+' & np.invert('+flowthrough+')',
    
        # 1: "Natural Uplands"
    
        1: 'df.COVERCODE.between(8, 12, inclusive="both")',
    
        # 6: "Impervious, Roads"
    
        6: '(df.COVERCODE == 2) & (df.USEGENCODE == 55)',
    
        ## 3: "Agriculture, Non Cranberry"
    
        3: 'df.COVERCODE == [False]',
    
        # 8: "Recieving Body (Estuary)"
        
        8: '(df.USEGENCODE == 0) &' +waterbody, 
        
        # 9: "Freshwater Ponds and Lakes"
    
        9: '(df.USEGENCODE == 0 != 0) &' + waterbody,
    
        # 10: "Wetlands"
    
        10: 'df.COVERCODE.between(13, 18, inclusive="both")',
    
        # 2: "Mowed Areas (Lawns, Sports Fields, and Golf Courses)"
    
        2: 'df.COVERCODE == 5',
    
        # 7: "Impervious, Non-Roads"
    
        7: '(df.COVERCODE == 2) & (df.USEGENCODE != 55)',
    
        # 13: "Other (Bare land or Shoreline)"
    
        13: 'np.isin(df.COVERCODE,[19,20])'

    }
    return()

NameError: name 'GENERAL' is not defined

In [12]:
# RESUME HERE 2023-07-05

# specific active cranberry: where other farming makes up a non zero portion of cultivated and hay/pasture cover types
specific_active_cranberry = '(np.isin(df.CropStatus,"active"))'
# general active cranberry: use where active cranberry makes up over 99% of COVERCODE 6 and 7. 
general_active_cranberry = '((np.isin(df.CropStatus,"active"))|((df.COVERCODE == 21) & (df.USEGENCODE==7))|(np.isin(df.COVERCODE,[6,7])))'
active_cranberry = specific_active_cranberry
abandoned_cranberry = 'np.isin(df["FID"],df[df.CropStatus == "abandoned"]["FID"])'
flowthrough = 'np.isin(df["FID"],df[df.Bog_type == "flowthrough"]["FID"])'
terminus = 'np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"])'
flowthrough = terminus
#abandoned_cranberry = '(np.isin(df.CropStatus,"abandoned"))'
#flowthrough = '(np.isin(df.Bog_type,"flowthrough"))'
waterbody = '(np.isin(df.COVERCODE,[22,21]))'# & np.invert('+active_cranberry+'))'
#waterbody = '(np.isin(df.COVERCODE,[22,21]) & np.invert('+active_cranberry+'))'
BASELINE = {
    
        # THE FIRST APPEARING CONDITION TAKES PRIORITY IN NP.SELECT
    
        # key (LUC): value (condition string)
    
        # 4: "Agriculture, Active Cranberry, Flowthrough"
    
        4: active_cranberry+' & '+flowthrough,
    
        # 5: "Agriculture, Active Cranberry, Non Flowthrough"
    
        5: active_cranberry+' & (np.invert('+flowthrough+'))',
    
        # 11: "Retired Cranberry, Flowthrough"
    
        11: abandoned_cranberry+' & '+flowthrough,
    
        # 12: "Retired Cranberry, Non flowthrough"
    
        12: abandoned_cranberry+' & np.invert('+flowthrough+')',
    
        # 1: "Natural Uplands"
    
        1: 'df.COVERCODE.between(8, 12, inclusive="both")',
    
        # 6: "Impervious, Roads"
    
        6: '(df.COVERCODE == 2) & (df.USEGENCODE == 55)',
    
        ## 3: "Agriculture, Non Cranberry"
    
        3: 'df.COVERCODE == [False]',
    
        # 8: "Recieving Body (Estuary)"
        
        8: '(df.USEGENCODE == 0) &' +waterbody, 
        
        # 9: "Freshwater Ponds and Lakes"
    
        9: '(df.USEGENCODE != 0) &' + waterbody,
    
        # 10: "Wetlands"
    
        10: 'df.COVERCODE.between(13, 18, inclusive="both")',
    
        # 2: "Mowed Areas (Lawns, Sports Fields, and Golf Courses)"
    
        2: 'df.COVERCODE == 5',
    
        # 7: "Impervious, Non-Roads"
    
        7: '(df.COVERCODE == 2) & (df.USEGENCODE != 55)',
    
        # 13: "Other (Bare land or Shoreline)"
    
        13: 'np.isin(df.COVERCODE,[19,20])'

    }

In [13]:
import pandas as pd
import numpy as np
# checking condtions 
df = pd.DataFrame({"FID":[0,1,2,3],
                   "COVERCODE":[21,21]*2,
                   "SUB":["0","1"]*2,
                   "CropStatus":["active"]*2 + ["abandoned"]*2,
                   "Bog_type":["","flowthrough"]*2,
                   'ele5pct':["","LE5%"]*2,
                   "USEGENCODE":[0,0,0,0],
                   "USE_CODE":[0,0,0,0]})
'''
df = pd.DataFrame({"FID":[4,5,6,7],
                   "COVERCODE":[2,5]*2,
                   "SUB":["0","1"]*2,
                   "CropStatus":[None]*2 + [None]*2,
                   "Bog_type":[None,None]*2,
                   'ele5pct':["","LE5%"]*2,
                   "USEGENCODE":[55,55,0,0],
                   "USE_CODE":[0,0,0,0]})
df = pd.DataFrame({"FID":[8,9,10,11],
                   "COVERCODE":[9,15,19,22],
                   "SUB":["0","1"]*2,
                   "CropStatus":[None]*2 + [None]*2,
                   "Bog_type":[None,None]*2,
                   'ele5pct':["","LE5%"]*2,
                   "USEGENCODE":[55,55,0,0],
                   "USE_CODE":[0,0,0,0]})
                   '''
for k in BASELINE.keys():
    v = BASELINE[k]
    print(k,v,"\n")
    df[str(k)] = eval(v)
print(df)

4 (np.isin(df.CropStatus,"active")) & np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"]) 

5 (np.isin(df.CropStatus,"active")) & (np.invert(np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"]))) 

11 np.isin(df["FID"],df[df.CropStatus == "abandoned"]["FID"]) & np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"]) 

12 np.isin(df["FID"],df[df.CropStatus == "abandoned"]["FID"]) & np.invert(np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"])) 

1 df.COVERCODE.between(8, 12, inclusive="both") 

6 (df.COVERCODE == 2) & (df.USEGENCODE == 55) 

3 df.COVERCODE == [False] 



ValueError: ('Lengths must match to compare', (4,), (1,))

In [31]:
for k in BASELINE.keys():
    v = BASELINE[k]
    print(k,v,"\n")
    eval(v)

4 (np.isin(df.CropStatus,"active")) & np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"]) 

5 (np.isin(df.CropStatus,"active")) & (np.invert(np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"]))) 

11 np.isin(df["FID"],df[df.CropStatus == "abandoned"]["FID"]) & np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"]) 

12 np.isin(df["FID"],df[df.CropStatus == "abandoned"]["FID"]) & np.invert(np.isin(df["FID"],df[df.ele5pct == "LE5%"]["FID"])) 

1 df.COVERCODE.between(8, 12, inclusive="both") 

6 (df.COVERCODE == 2) & (df.USEGENCODE == 55) 

3 df.COVERCODE == [False] 



ValueError: ('Lengths must match to compare', (4,), (1,))

In [27]:
condlist = "["+", ".join(list(BASELINE.values()))+"]"
choicelist = list(BASELINE.keys())
print(choicelist)

list(BASELINE.values())

df["LUC"] = np.select(eval(condlist),choicelist,13)
print(df)

[4, 5, 11, 12, 1, 6, 3, 8, 9, 10, 2, 7, 13]


ValueError: ('Lengths must match to compare', (4,), (1,))