In [191]:
# Census 1 
'''
Select an individual feature by attribute value from one feature class, then select features from another feature class by distance from the original feature. 
Originally used for selecting census block groups within a specified distance of a point feature. 
'''
def select_feature_by_attribute(focus_features, focus_field, focus_value): 
    
    # Construct SQL 
    sql = f"{focus_field} = '{focus_value}'"

    # Apply attribute selection 
    focus_selection, count          = arcpy.management.SelectLayerByAttribute(focus_features, 'NEW_SELECTION', sql)
    if count == '0': 
        print('No focus features selected')
    elif count != '1': 
        pass
    else: 
        print('Multiple features selected')
        
    return focus_selection
    
def export_block_groups_by_distance(geo_features, focus_selection, selection_distance, output_folder, output_name): 
    
    # Select by distance 
    target_selection, merge, count  = arcpy.management.SelectLayerByLocation(geo_features, 'WITHIN_A_DISTANCE', focus_selection, selection_distance, 'NEW_SELECTION')
    print(f'Selected  : {count} features')
    
    # Clip if needed 
    if clip == True: 
        clip_selection = r"memory/clip" 
        arcpy.analysis.Clip(target_selection, clip_features, clip_selection)
    
    # Designate output path 
    geo_output   = os.path.join(output_folder, output_name + '.shp')
    
    # Export target selection 
    if arcpy.Exists(geo_output) and overwrite == True or arcpy.Exists(geo_output) == False: 
        arcpy.conversion.ExportFeatures(clip_selection, geo_output)
    elif arcpy.Exists(geo_output) and overwrite == False: 
        pass 
    
    return geo_output

In [None]:
# Census 2 
'''
Return variables dictionary from JSON url. 
'''
def read_census_json(json_url): 
    
    # Make a GET request to the JSON url 
    response = requests.get(json_url)
    
    # Check if it was successful (status code == 200)
    if response.status_code == 200: 
        
        # This is the initial response, in dictionary form 
        json_dict = response.json()
        
        # There is only one key, so reassign the dictionary variable to that key 
        json_dictionary = json_dict['variables']
    
    else: 
        print("Couldn't read JSON")
    
    return json_dictionary

'''
Parse the dictionary so that it is intelligible. 
This includes all concepts, and will be filtered by an input group code or concept in the next function.
'''
def return_group_dictionary(json_dictionary): 
    
    # Initialize count 
    count = 0 
    
    # Initialize dictionaries 
    group_dictionary            = {} 
    
    # Iterate through JSON dictionary 
    for code, values in json_dictionary.items(): 
        
        try:
            # Identify group variable 
            group   = values['group']
            
            # Initialize dictionary key for group 
            if group not in group_dictionary.keys() and "," not in group: 
                group_dictionary[group]            = {} 
                group_dictionary[group]['concept'] = []
                group_dictionary[group]['codes']   = []
            else: 
                pass
            
        except: 
            pass 
        
        try: 
            # Identify concept variable 
            concept = values['concept']
            
            # Initialize dictionary key for group 
            group_dictionary[group]['concept'] = concept
            group_dictionary[group]['codes'].append(code)

        except KeyError:
            pass
        
    if group_dictionary != {}: 
        pass
    else: 
        print('Group dictionary empty')
        
    return group_dictionary

In [None]:
# Census 3 
'''
Return a list of variable codes based on a designated group code. 
'''
def find_variable_codes(input_group_code, group_dictionary): 
    
    # For group code, concept/codes dictionary
    for group_code, concept_codes_dict in group_dictionary.items(): 
        if group_code == input_group_code: 
            variable_codes = concept_codes_dict['codes']
            
    return variable_codes 

'''
This creates a dictionary of codes and their values. 
'''
def create_code_label_dictionary(variable_codes, json_dictionary): 
    
    code_label_dictionary = {} 
    code_label_dictionary['B01001_001E'] = 'Total Population'

    for code in variable_codes: 
        label = json_dictionary[code]['label'].split('!!')[-1]
        code_label_dictionary[code] = label 

    return code_label_dictionary

In [None]:
# Census 4
'''
Create variable string. 
'''

def create_variable_string(variable_codes): 
    
    # Initialize empty variable string 
    variable_string = r"NAME,B01001_001E"

    # Iterate through filtered dictionary 
    for code in variable_codes: 
        variable_string = variable_string + ',' + code
    
    return variable_string 

'''
Use the geographic field attributes of the feature classes to create a list of urls 
'''
def construct_api_urls(geo_output, api_key, variable_string): 
    
    api_urls = []
    
    with arcpy.da.SearchCursor(geo_output, ['STATE_FIPS', 'COUNTY_FIP', 'TRACT_FIPS', 'BLOCKGROUP']) as cursor: 
        for row in cursor: 
            state       = row[0]
            county      = row[1] 
            tract       = row[2] 
            block_group = row[3] 

            var_clause    = fr"https://api.census.gov/data/2022/acs/acs5?get={variable_string}"
            geo_clause    = fr"&for=block%20group:{block_group}&in=state:{state}%20county:{county}%20tract:{tract}"
            key_clause    = fr"&key={api_key}"

            api_url       = var_clause + geo_clause + key_clause
            api_urls.append(api_url)
        
    return api_urls

'''
Iterate through API URLs and receive dictionary of responses 
'''

def send_api_calls(api_urls): 
    
    # Initialize variables
    time_1  = time.time() 
    json_list = [] 
    url_count = 0

    # Send API call 
    for url in api_urls: 
        url_count += 1 

        print('---------------')
        print(f'Call #{url_count} / {len(api_urls)}')
        print('---------------')
        response = requests.get(url)

        if response.status_code == 200: 
            json_list.append(response.json())
            print(f'>>> Status:       Good')
        else: 
            print('>>> Error: ', response.status_code)

        # Measure and print elapsed time 
        time_2  = time.time() 
        elapsed = round(time_2 - time_1, 2) 
        print(f'>>> Elapsed time: {elapsed}')
        
    return json_list

In [None]:
# Census 5 
'''
Converts output of API calls to dataframe. Index is created from variables, rows each represent a block group. 
'''

def json_list_to_dataframe(json_list):

    output_dictionary = {} 
    
    index_list = json_list[0][0]
    
    for index in index_list: 
        output_dictionary[index] = []

    for json in json_list: 
        values = json[1]

        for i, value in enumerate(values): 
            output_dictionary[index_list[i]].append(value)

    api_dataframe = pd.DataFrame.from_dict(output_dictionary)
    api_dataframe = api_dataframe.rename(columns = code_label_dictionary)
    
    api_dataframe['FIPS'] = api_dataframe['state'] + api_dataframe['county'] + api_dataframe['tract'] + api_dataframe['block group']
    
    return api_dataframe 

'''
Change columns to integers if value is numeric. 
'''
def change_columns_to_integers(dataframe): 
    
    for column in dataframe.columns: 

        try: 
            dataframe[column] = dataframe[column].astype(int)
        except: 
            pass
            
    return dataframe 

'''
Calculate percentage columns. 
'''
def calculate_percentage_columns(dataframe):
    
    # Iterate through columns 
    for column in dataframe.columns:
        
        # If it is in the dictionary
        if column in code_label_dictionary.values(): 
            
            # Identify the column's position  
            column_position = dataframe.columns.get_loc(column) + 1
            # Calculate its value
            percentage_formula = (dataframe[column]/dataframe['Total Population'])*100
            # Insert the new column 
            dataframe.insert(column_position, f'% {column}', round(percentage_formula, 2))
            
    return dataframe 

In [None]:
# Census 6
'''
Merge API data with geometry, drop unnecessary columns, and export to a shapefile. 
'''
def merge_and_export_geodataframe(geo_output, api_dataframe, keep_columns, output_folder, merge_label): 
    
    # Initialize spatial dataframe from feature class 
    geodataframe = gpd.read_file(geo_output)

    # Merge 
    merge_geodataframe = geodataframe.merge(api_dataframe, on = 'FIPS', how = 'left')

    # Drop 
    for column in merge_geodataframe.columns: 
        if column not in keep_columns: 
            merge_geodataframe.drop(columns = column, inplace = True)

    # Export to shapefile 
    merge_shp = os.path.join(output_folder, merge_label) 
    merge_geodataframe.to_file(output_folder + merge_label)
    
    return merge_shp

In [None]:
def merge_shapefile_and_geodataframe(geo_output, api_dataframe, join_field): 
    
    # Initialize spatial dataframe from feature class 
    geodataframe = gpd.read_file(geo_output)

    # Merge 
    merge_geodataframe = geodataframe.merge(api_dataframe, on = join_field, how = 'left')
    
    return merge_geodataframe 

def drop_dataframe_columns(merge_geodataframe, keep_columns):
    
    # Drop 
    for column in merge_geodataframe.columns: 
        if column not in keep_columns: 
            merge_geodataframe.drop(columns = column, inplace = True)
            
    return merge_geodataframe 

def export_gdf_to_shapefile(merge_geodataframe, output_folder, merge_shp_label): 
    
    # Export to shapefile 
    merge_shp = os.path.join(output_folder, merge_shp_label + '.shp') 
    merge_geodataframe.to_file(merge_shp, 'ESRI Shapefile')
    
    return merge_shp

In [None]:
# Census 7
'''
Add to map
'''
def identify_layer(mp_name, output_shp): 

    # Identify layer 
    aprx = arcpy.mp.ArcGISProject('CURRENT')
    mp   = aprx.listMaps(mp_name)[0]
    lyr  = mp.addDataFromPath(output_shp)

    return lyr 

def identify_color_ramp(color_ramp_name): 
    
    # Find color ramp 
    aprx = arcpy.mp.ArcGISProject('CURRENT')
    
    try: 
        color_ramp = aprx.listColorRamps(color_ramp_name)[0]
    except: 
        print('Color ramp not found')
        
    return color_ramp 

'''
Apply unclassed symbology 
'''
def apply_unclassed_symbology(layer, sym_field, color_ramp, lower_label, upper_label): 

    # Identify map 
    aprx = arcpy.mp.ArcGISProject("CURRENT")

    # Initialize weighted_symbology 
    symbology = layer.symbology 

    # Define weighted Colors Renderer
    if symbology.renderer.type != "UnclassedColorsRenderer": 
        symbology.updateRenderer("UnclassedColorsRenderer")
    else: 
        pass
    
    symbology.renderer.field               = sym_field 
    symbology.renderer.colorRamp           = color_ramp 
    symbology.renderer.lowerLabel          = lower_label
    symbology.renderer.upperLabel          = upper_label
    
    # Outline color 
    symbology.renderer.symbolTemplate.outlineColor = {'RGB': [0, 0, 0, 100]}
    symbology.renderer.symbolTemplate.outlineWidth = 1

    # Apply symbology 
    layer.symbology = symbology
    
    return symbology 

In [198]:
'''
Composite 
'''

# Census 2 
def retrieve_acs_json(json_url): 
    
    json_dictionary  = read_census_json(json_url)
    group_dictionary = return_group_dictionary(json_dictionary)
    
    return json_dictionary, group_dictionary

# Census 3 
def retrieve_acs_dictionary(input_group_code, group_dictionary, json_dictionary): 
    
    variable_codes        = find_variable_codes(input_group_code, group_dictionary)
    code_label_dictionary = create_code_label_dictionary(variable_codes, json_dictionary)
    
    return variable_codes, code_label_dictionary 

# Census 4 
def census_api(variable_codes, geo_output, api_key): 
    
    variable_string = create_variable_string(variable_codes)
    api_urls        = construct_api_urls(geo_output, api_key, variable_string)
    json_list       = send_api_calls(api_urls)
    
    return variable_string, json_list 

# Census 5
def create_api_dataframe(json_list): 

    api_dataframe = json_list_to_dataframe(json_list)
    api_dataframe = change_columns_to_integers(api_dataframe)
    api_dataframe = calculate_percentage_columns(api_dataframe)
    
    return api_dataframe

# Census 6 
def create_output_geodataframe(geo_output, api_dataframe, output_folder, output_label, keep_columns):
    
    output_shp = merge_and_export_geodataframe(geo_output, api_dataframe, output_folder, output_label, keep_columns)
    
    return output_shp 

# Census 7 
def visualize_acs_data(mp_name, output_shp, color_ramp_name, lower_label, upper_label): 
    
    layer     = identify_layer(mp_name, output_shp)
    symbology = apply_unclassed_symbology(layer, color_ramp_name, lower_label, upper_label)
    
    return layer 

In [None]:
#####################################################################################################################################################
#####################################################################################################################################################
#####################################################################################################################################################

In [199]:
'''
Master 
'''

# Imports 
import os 
import requests 
import pandas as pd
import geopandas as gpd 

# Flow control 
create_geo_output   = True 
make_api_call       = True 
export_geodataframe = True 

# Focus point 
focus_features      = r'C:\DALE\Personal\Minnesota\MyProject\ACS.gdb\Focus_Points'
focus_field         = 'NAME'
focus_value         = 'SMU Minneapolis'

# Block groups      
geo_features        = r'C:\DALE\Personal\Minnesota\GIS Projects\ACS\Block_Groups.gdb\MN'
selection_distance  = '20 Miles'
output_folder       = r'C:\DALE\Personal\Minnesota\GIS Projects\ACS\Output Data\Block Group Shapefiles'
output_name         = 'White_Alone_Part2'
overwrite           = True 
clip                = True 
clip_features       = r'C:\DALE\Personal\Minnesota\GIS Projects\ACS\Output Data\ACS Data\Metro_White_Alone.shp'

# API 
json_url            = r'https://api.census.gov/data/2022/acs/acs5/variables.json'
input_group_code    = 'B02001'
api_key             = r'1749dc1d87964116107a80cd7d76fca300dda59f'

# Dataframe 
output_folder       = r'C:\DALE\Personal\Minnesota\GIS Projects\ACS\Output Data\ACS Data'    
merge_shp_label     = 'Metro_White_Alone'
join_field          = 'FIPS'
keep_columns        = ['FIPS', 'geometry', '% White alone']

# Map and Layer
mp_name             = 'Census'
sym_field           = '% White al'
color_ramp_name     = 'White_to_Brown'
lower_label         = '100'
upper_label         = '0'

In [200]:
'''
Master Process 
'''

### Create geo output ### 
if create_geo_output == True: 
    
    focus_selection = select_feature_by_attribute(focus_features, focus_field, focus_value)
    geo_output      = export_block_groups_by_distance(geo_features, focus_selection, selection_distance, output_folder, output_name, overwrite, clip, clip_features)

else: 
    geo_output                        = os.path.join(output_folder, output_name)
    print('1: Block groups shapefile already exists:')
    print(output_folder, output_name)

Multiple features selected
Selected  : 2086 features


In [None]:
### Call API ### 
if make_api_call == True: 
    
    # Retrieve concepts JSON 
    json_dictionary, group_dictionary     = retrieve_acs_json(json_url)
    print("2: Retrieved ACS json")

    # Retrieve list of codes based on concept 
    variable_codes, code_label_dictionary = retrieve_acs_dictionary(input_group_code, group_dictionary, json_dictionary)
    print("3: Retrieved ACS codes")
    
    # Create variable string 
    variable_string = create_variable_string(variable_codes)
    
    # Create list of API urls 
    api_urls        = construct_api_urls(geo_output, api_key, variable_string)
    
    # Call API if designated
    print('URLs returned: ', len(api_urls))
    print('Continue?')
    x = input() 
    
    if x != "": 
        pass 
    else: 
        # Call the API 
        json_list       = send_api_calls(api_urls)
        print("4: Called Census API and retrieved data")

    # Create dataframe
    api_dataframe                         = create_api_dataframe(json_list)
    print("5: Converted API response to dataframe")

### Export geodataframe ### 
if export_geodataframe == True: 
    
    # Merge API dataframe with block groups, drop unnecessary columns, and export to shapefile 
    merge_geodataframe = merge_shapefile_and_geodataframe(geo_output, api_dataframe, join_field)
    merge_geodataframe = drop_dataframe_columns(merge_geodataframe, keep_columns)
    merge_shp          = export_gdf_to_shapefile(merge_geodataframe, output_folder, merge_shp_label)

    print("6: Export dataframe to shapefile")
    print(f" {merge_shp}")

In [None]:
#####################################################################################################################################################
#####################################################################################################################################################
#####################################################################################################################################################

In [None]:
for d in dir(symbology.renderer.symbolTemplate): 
    if d.startswith("_") == False: 
        print(d)

In [None]:
symbology.renderer.symbolTemplate.outlineColor

In [None]:
# Find color ramp 
for cr in aprx.listColorRamps(): 
    if cr.name == 'White_to_Brown': 
        print(cr.name)

In [None]:
### Add to map ### 
layer      = identify_layer(mp_name, merge_shp)
color_ramp = identify_color_ramp(color_ramp_name)
symbology  = apply_unclassed_symbology(layer, sym_field, color_ramp, lower_label, upper_label)