# Income Inequality

In [None]:
import clickhouse_connect
import polars as pl
import matplotlib.pyplot as plt
import numpy as np
import geopandas as gp
import pandas as pd
import topojson
import json

In [None]:
client = clickhouse_connect.get_client(host='hub.publichealthhq.xyz', port=18123, username='default', password='Password123!')

## Get RUCC

In [14]:
res = client.query('SELECT FIPS as STATE_COUNTY_FIPS, RUCC FROM cps_00004.rural_urban_codes')
rucc_df = pl.from_dicts(res.named_results(), infer_schema_length=400)
rucc_df = rucc_df.to_pandas()

In [15]:
rucc_df['STATE_COUNTY_FIPS'] = rucc_df['STATE_COUNTY_FIPS'].astype(int)

## Get Geospatial Data

In [16]:

with open('./counties-albers-10m.json', 'r') as f:
    data = json.load(f)
topo = topojson.Topology(data, object_name='counties')

# May be useful for making the background/border
# topo.simplify(4)

gdf = topo.to_gdf()

gdf['STATE_COUNTY_FIPS'] = gdf.index
gdf['STATE_COUNTY_FIPS'] = gdf['STATE_COUNTY_FIPS'].astype(int)

In [17]:
gdf.set_crs(crs='EPSG:3857', inplace=True)
gdf_buf = gdf.copy()
gdf_buf['geometry'] = gdf.buffer(100)


  return lib.buffer(


In [18]:
gdf_buf

Unnamed: 0,geometry,name,STATE_COUNTY_FIPS
04015,"POLYGON ((57.835 341.969, 56.777 348.801, 56.2...",Mohave,4015
22105,"POLYGON ((503.465 483.833, 503.923 491.789, 50...",Tangipahoa,22105
16063,"POLYGON ((95.926 152.846, 94.383 160.622, 92.9...",Lincoln,16063
27119,"POLYGON ((383.996 115.363, 384.615 117.654, 38...",Polk,27119
38017,"POLYGON ((371.614 108.442, 371.512 114.348, 37...",Cass,38017
...,...,...,...
31101,"POLYGON ((295.711 238.939, 295.457 243.914, 29...",Keith,31101
28001,"POLYGON ((648.670 382.554, 646.848 381.080, 63...",Adams,28001
36069,"POLYGON ((700.849 187.677, 701.165 193.186, 70...",Ontario,36069
54053,"POLYGON ((639.869 300.935, 641.024 308.982, 64...",Mason,54053


In [19]:
joined_rucc = pd.merge(gdf_buf, rucc_df, on=['STATE_COUNTY_FIPS'], how='inner', validate='1:1')
joined_rucc

Unnamed: 0,geometry,name,STATE_COUNTY_FIPS,RUCC
0,"POLYGON ((57.835 341.969, 56.777 348.801, 56.2...",Mohave,4015,3
1,"POLYGON ((503.465 483.833, 503.923 491.789, 50...",Tangipahoa,22105,3
2,"POLYGON ((95.926 152.846, 94.383 160.622, 92.9...",Lincoln,16063,8
3,"POLYGON ((383.996 115.363, 384.615 117.654, 38...",Polk,27119,3
4,"POLYGON ((371.614 108.442, 371.512 114.348, 37...",Cass,38017,3
...,...,...,...,...
3128,"POLYGON ((295.711 238.939, 295.457 243.914, 29...",Keith,31101,9
3129,"POLYGON ((648.670 382.554, 646.848 381.080, 63...",Adams,28001,7
3130,"POLYGON ((700.849 187.677, 701.165 193.186, 70...",Ontario,36069,1
3131,"POLYGON ((639.869 300.935, 641.024 308.982, 64...",Mason,54053,6


## Spatial Join
The single line below is really critical since it is performing the spatial join that will match up each county with the its neighboring counties.

The projected/selected spatial join results will be saved in Clickhouse to be used as a precomputed table. This will give us the ability to make some parts of the analysis interactive without having to wait for the expensive joins.

In [None]:
cross_joined = gp.sjoin(joined_rucc, joined_rucc, how='inner', predicate='intersects')
cross_joined = cross_joined[ cross_joined['STATE_COUNTY_FIPS_left'] !=  cross_joined['STATE_COUNTY_FIPS_right']]

In [None]:
!pip install shapely

In [None]:
for (i, group) in enumerate(cross_joined.groupby('STATE_COUNTY_FIPS_right')):
    if i > 10:
        break
    print(group[1])

In [None]:
selected = cross_joined[['name_left', 'STATE_COUNTY_FIPS_left', 'RUCC_left', 'name_right', 'STATE_COUNTY_FIPS_right', 'RUCC_right']]

In [None]:
selected[ ]

### Save as CSV
We'll use this as a pre-computed table in the database since I never got turf to work for the spatial joins.

In [None]:
import csv
with open('neighboring_counties.csv', 'w') as f:
    writer = csv.DictWriter(f, fieldnames=['ROWNUM', 'STATE_COUNTY_FIPS_left', 'GINI_left', 'AVG_AGI_left', 'STATE_COUNTY_FIPS_right', 'GINI_right', 'AVG_AGI_right'])
    for (i, (_, row)) in enumerate(selected.iterrows()):
        writer.writerow({
            'ROWNUM': i,
            'STATE_COUNTY_FIPS_left': row['STATE_COUNTY_FIPS_left'],
            'GINI_left': row['GINI_left'],
            'AVG_AGI_left': row['avg_agi_left'],
            'STATE_COUNTY_FIPS_right': row['STATE_COUNTY_FIPS_right'],
            'GINI_right': row['GINI_right'],
            'AVG_AGI_right': row['avg_agi_right']
        })

## Group counties
Here we group the counties into four groups:

1. Poor counties neighbored only by poor counties
2. Rich counties neighbored only by rich counties
3. Poor counties neighbored by at least one rich county
4. Rich counties neighbored by at least one poor county

In [None]:
poor = selected['avg_agi_right'].quantile(0.10)
rich = selected['avg_agi_right'].quantile(0.90)


selected_pl = pl.from_pandas(selected)

# Just showing the same thing with polars
# poor_pl = selected_pl['avg_agi_left'].quantile(0.10)

print(f'poor: ${poor:.2f}')
print(f'rich: ${rich:.2f}')

poor_near_poor = selected_pl\
.filter(pl.col('avg_agi_left') <= poor )\
.group_by('STATE_COUNTY_FIPS_left')\
    .agg( avg_agi_right_max=pl.col('avg_agi_right').max() )\
.filter( pl.col('avg_agi_right_max') <= poor )

rich_near_rich = selected_pl\
.filter( pl.col('avg_agi_left') >= rich )\
.group_by('STATE_COUNTY_FIPS_left')\
    .agg( avg_agi_right_max=pl.col('avg_agi_right').max() )\
.filter( pl.col('avg_agi_right_max') >= rich )

poor_near_rich = selected_pl\
.filter(pl.col('avg_agi_left') <= poor )\
.group_by('STATE_COUNTY_FIPS_left')\
    .agg( avg_agi_right_max=pl.col('avg_agi_right').max() )\
.filter( pl.col('avg_agi_right_max') >= rich )

# Notice that this uses min in the agg, not max
rich_near_poor = selected_pl\
.filter(pl.col('avg_agi_left') >= rich )\
.group_by('STATE_COUNTY_FIPS_left')\
    .agg( avg_agi_right_max=pl.col('avg_agi_right').min() )\
.filter( pl.col('avg_agi_right_max') <= poor )

In [None]:
print(f'poor_near_poor: {len(poor_near_poor)}')
print(f'rich_near_rich: {len(rich_near_rich)}')
print(f'poor_near_rich: {len(poor_near_rich)}')
print(f'rich_near_poor: {len(rich_near_poor)}')

In [None]:
pnp_list = list(poor_near_poor['STATE_COUNTY_FIPS_left'])
rnr_list = list(rich_near_rich['STATE_COUNTY_FIPS_left'])
pnr_list = list(poor_near_rich['STATE_COUNTY_FIPS_left'])
rnp_list = list(rich_near_poor['STATE_COUNTY_FIPS_left'])

In [None]:
pnp_list

In [None]:
pnp_fips = poor_near_poor.select(['STATE_COUNTY_FIPS_left'])
renamed_pnp = selected_pl\
.join(pnp_fips, on='STATE_COUNTY_FIPS_left', how='inner')\
.rename({
    'STATE_left': 'STATE_FIPS_focus',
    'STATE_COUNTY_FIPS_left': 'STATE_COUNTY_FIPS_focus',
    'GINI_left': 'GINI_focus',
    'avg_agi_left': 'AVG_AGI_focus',
    'STATE_right': 'STATE_FIPS_adj',
    'STATE_COUNTY_FIPS_right': 'STATE_COUNTY_FIPS_adj',
    'GINI_right': 'GINI_adj',
    'avg_agi_right': 'AVG_AGI_adj'
})

In [None]:
rnr_fips = rich_near_rich.select(['STATE_COUNTY_FIPS_left'])
renamed_rnr = selected_pl\
.join(rnr_fips, on='STATE_COUNTY_FIPS_left', how='inner')\
.rename({
    'STATE_left': 'STATE_FIPS_focus',
    'STATE_COUNTY_FIPS_left': 'STATE_COUNTY_FIPS_focus',
    'GINI_left': 'GINI_focus',
    'avg_agi_left': 'AVG_AGI_focus',
    'STATE_right': 'STATE_FIPS_adj',
    'STATE_COUNTY_FIPS_right': 'STATE_COUNTY_FIPS_adj',
    'GINI_right': 'GINI_adj',
    'avg_agi_right': 'AVG_AGI_adj'
})

In [None]:
pnr_fips = poor_near_poor.select(['STATE_COUNTY_FIPS_left'])
renamed_pnr = selected_pl\
.join(pnr_fips, on='STATE_COUNTY_FIPS_left', how='inner')\
.rename({
    'STATE_left': 'STATE_FIPS_focus',
    'STATE_COUNTY_FIPS_left': 'STATE_COUNTY_FIPS_focus',
    'GINI_left': 'GINI_focus',
    'avg_agi_left': 'AVG_AGI_focus',
    'STATE_right': 'STATE_FIPS_adj',
    'STATE_COUNTY_FIPS_right': 'STATE_COUNTY_FIPS_adj',
    'GINI_right': 'GINI_adj',
    'avg_agi_right': 'AVG_AGI_adj'
})

In [None]:
rnp_fips = poor_near_poor.select(['STATE_COUNTY_FIPS_left'])
renamed_rnp = selected_pl\
.join(rnp_fips, on='STATE_COUNTY_FIPS_left', how='inner')\
.rename({
    'STATE_left': 'STATE_FIPS_focus',
    'STATE_COUNTY_FIPS_left': 'STATE_COUNTY_FIPS_focus',
    'GINI_left': 'GINI_focus',
    'avg_agi_left': 'AVG_AGI_focus',
    'STATE_right': 'STATE_FIPS_adj',
    'STATE_COUNTY_FIPS_right': 'STATE_COUNTY_FIPS_adj',
    'GINI_right': 'GINI_adj',
    'avg_agi_right': 'AVG_AGI_adj'
})

In [None]:
res = client.query('''
        SELECT STATE_COUNTY_FIPS_left, GINI_left, AVG_AGI_left, STATE_COUNTY_FIPS_right, GINI_right, AVG_AGI_right 
        FROM cps_00004.neighboring_counties
    ''')
df = pl.from_dicts(res.named_results(), infer_schema_length=400)
df = df.select(
    pl.col('STATE_COUNTY_FIPS_left'), 
    pl.col('GINI_left'), 
    pl.col('AVG_AGI_left').cast(pl.Float64), 
    pl.col('STATE_COUNTY_FIPS_right'), 
    pl.col('GINI_right'), 
    pl.col('AVG_AGI_right').cast(pl.Float64)
)
poor = df['AVG_AGI_left'].quantile(0.10)
rich = df['AVG_AGI_left'].quantile(0.90)

poor_near_poor = df\
.filter(pl.col('AVG_AGI_left') <= poor )\
.group_by('STATE_COUNTY_FIPS_left')\
    .agg( AVG_AGI_right_max=pl.col('AVG_AGI_right').max() )\
.filter( pl.col('AVG_AGI_right_max') <= poor )\


rich_near_rich = df\
.filter( pl.col('AVG_AGI_left') >= rich )\
.group_by('STATE_COUNTY_FIPS_left')\
    .agg( AVG_AGI_right_max=pl.col('AVG_AGI_right').max() )\
.filter( pl.col('AVG_AGI_right_max') >= rich )\

poor_near_rich = df\
.filter(pl.col('AVG_AGI_left') <= poor )\
.group_by('STATE_COUNTY_FIPS_left')\
    .agg( AVG_AGI_right_max=pl.col('AVG_AGI_right').max() )\
.filter( pl.col('AVG_AGI_right_max') >= rich )\

# Notice that this uses min in the agg, not max
rich_near_poor = df\
.filter(pl.col('AVG_AGI_left') >= rich )\
.group_by('STATE_COUNTY_FIPS_left')\
    .agg( AVG_AGI_right_max=pl.col('AVG_AGI_right').min() )\
.filter( pl.col('AVG_AGI_right_max') <= poor ) \


pnp_fips = poor_near_poor.select(['STATE_COUNTY_FIPS_left'])
renamed_pnp = df\
.join(pnp_fips, on='STATE_COUNTY_FIPS_left', how='inner')\
.filter( pl.col('STATE_COUNTY_FIPS_left').ne(pl.col('STATE_COUNTY_FIPS_right')) )\
.rename({
    'STATE_COUNTY_FIPS_left': 'STATE_COUNTY_FIPS_focus',
    'GINI_left': 'GINI_focus',
    'AVG_AGI_left': 'AVG_AGI_focus',
    'STATE_COUNTY_FIPS_right': 'STATE_COUNTY_FIPS_adj',
    'GINI_right': 'GINI_adj',
    'AVG_AGI_right': 'AVG_AGI_adj'
})

rnr_fips = rich_near_rich.select(['STATE_COUNTY_FIPS_left'])
renamed_rnr = df\
.join(rnr_fips, on='STATE_COUNTY_FIPS_left', how='inner')\
.filter( pl.col('STATE_COUNTY_FIPS_left').ne(pl.col('STATE_COUNTY_FIPS_right')) )\
.rename({
    'STATE_COUNTY_FIPS_left': 'STATE_COUNTY_FIPS_focus',
    'GINI_left': 'GINI_focus',
    'AVG_AGI_left': 'AVG_AGI_focus',
    'STATE_COUNTY_FIPS_right': 'STATE_COUNTY_FIPS_adj',
    'GINI_right': 'GINI_adj',
    'AVG_AGI_right': 'AVG_AGI_adj'
})

pnr_fips = poor_near_rich.select(['STATE_COUNTY_FIPS_left'])
renamed_pnr = df\
.join(pnr_fips, on='STATE_COUNTY_FIPS_left', how='inner')\
.filter( pl.col('STATE_COUNTY_FIPS_left').ne(pl.col('STATE_COUNTY_FIPS_right')) )\
.rename({
    'STATE_COUNTY_FIPS_left': 'STATE_COUNTY_FIPS_focus',
    'GINI_left': 'GINI_focus',
    'AVG_AGI_left': 'AVG_AGI_focus',
    'STATE_COUNTY_FIPS_right': 'STATE_COUNTY_FIPS_adj',
    'GINI_right': 'GINI_adj',
    'AVG_AGI_right': 'AVG_AGI_adj'
})

rnp_fips = rich_near_poor.select(['STATE_COUNTY_FIPS_left'])
renamed_rnp = df\
.join(rnp_fips, on='STATE_COUNTY_FIPS_left', how='inner')\
.filter( pl.col('STATE_COUNTY_FIPS_left').ne(pl.col('STATE_COUNTY_FIPS_right')) )\
.rename({
    'STATE_COUNTY_FIPS_left': 'STATE_COUNTY_FIPS_focus',
    'GINI_left': 'GINI_focus',
    'AVG_AGI_left': 'AVG_AGI_focus',
    'STATE_COUNTY_FIPS_right': 'STATE_COUNTY_FIPS_adj',
    'GINI_right': 'GINI_adj',
    'AVG_AGI_right': 'AVG_AGI_adj'
})

In [None]:
renamed_rnp.join(renamed_pnr, left_on='STATE_COUNTY_FIPS_adj', right_on='STATE_COUNTY_FIPS_focus', how='inner')