# Address Geocode ETL

This notebook connects to our databases, extracts address and like data, and then passes them through third-party geocoding services in order to extract geometry data 
that can be used for geospatial analysis. 


Geocoding details:
- MapBox API Pricing
    - https://www.mapbox.com/pricing#temporary-geocoding-api
| Monthly requests       | Cost per 1,000 (as of Nov 2024) |
|------------------------|----------------------------|
| Up to 100,000          | Free                       |
| 100,001 to 500,000     | \$0.75                     |
| 500,001 to 1,000,000   | \$0.60                     |
| 1,000,001 to 4,999,999 | \$0.45                     |
| 5,000,000+             | Contact sales for discount |

## User Input

In [1]:
import pandas as pd
import ipywidgets as widgets
import os
import sqlalchemy
from sqlalchemy import inspect
from dotenv import load_dotenv
from tqdm import tqdm
from datetime import datetime
from functools import partial
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import MapBox
from shapely import wkt
from sqlalchemy import text

### Database Schema Selection

Select the database schema.

In [2]:
database_dirpath = os.path.join('..', 'data', 'databases')
database_schemas = []
for r, d, f in os.walk(database_dirpath):
    for file in f:
        if file.endswith('.db'):
            path = os.path.join(r, file)
            database_schemas.append(path)

    
selected_database_schema = widgets.Dropdown(
    options = database_schemas,
    disabled=False,
    layout=widgets.Layout(width='max-content'),
)
selected_database_schema

Dropdown(layout=Layout(width='max-content'), options=('../data/databases/census_bureau/census_acsse_2022_count…

### Database Table Selection

In [3]:
sql_engine = sqlalchemy.create_engine('sqlite:///' + selected_database_schema.value)
insp = inspect(sql_engine)

selected_database_table = widgets.Dropdown(
    options = insp.get_table_names(),
    disabled=False,
    layout=widgets.Layout(width='max-content'),
)
selected_database_table

Dropdown(layout=Layout(width='max-content'), options=('general_2024', 'general_2024_geocoded'), value='general…

### Database Column Selection

In [23]:
df = pd.read_sql_table(selected_database_table.value, sql_engine)
df.shape  # 879630

(879532, 34)

View the dataframe head above and select the column to be geocoded from the dropdown below.

In [11]:
columns = df.columns.tolist()
selected_database_column = widgets.Dropdown(
    options = columns,
    disabled=False,
    layout=widgets.Layout(width='max-content'),
)
selected_database_column

Dropdown(layout=Layout(width='max-content'), options=('index', 'county_code', 'precinct', 'vuid', 'last_name',…

## Extract

In [12]:
# select only rows without geometry data (create geometry column if none exists)
if 'geometry' in df.columns:
    missing_geometries = df[df['geometry'].eq('')]
else:
    df['geometry'] = ''
    missing_geometries = df['geometry']

len(df[df['geometry'] == '']) # 879630

879630

In [13]:
# select only rows without geometry data (create geometry column if none exists)
if 'last_updated' in df.columns:
    last_updated = df[df['last_updated'].eq('')]
else:
    df['last_updated'] = ''
    last_updated = df['last_updated']

len(df[df['last_updated'] == '']) # 879287

879630

In [14]:
# subset dataframe to keep within API limits 
df = df[df['geometry'].eq('')][-99:]
df.shape

(99, 36)

## Transform

In [15]:
# API key string, or import from .env file
load_dotenv()
api_key = os.getenv('mapbox_access_token')

# initializes geocoder class using Mapbox as source
geolocator = MapBox(api_key=api_key)
geocode = RateLimiter(geolocator.geocode)
# creates progress bar
tqdm.pandas()

# begin geocoding selected rows
df['location'] = df[selected_database_column.value].progress_apply(partial(geocode, exactly_one=True))

100%|██████████| 99/99 [00:06<00:00, 14.97it/s]


In [16]:
# removes PERM ADDRESS column since location data is stored in new `location` column
df = df.drop(['perm_address'], axis=1)

# creates column that contains only latitude and longitude data for geospatial analysis
df['geometry'] = df['location'].apply(lambda loc: (loc.latitude, loc.longitude) if loc else None)

# adds string `POINT` to `geometry` field to fulfil geodataframe requirements
df['geometry'] = 'POINT ' + df['geometry'].astype(str)

# replaces extraneous commas in field
df['geometry'] = df['geometry'].str.replace(',', '')

# converts `geometry` field to geometry field
df['geometry'] = df['geometry'].apply(wkt.loads)

# removes extraneous data from `location` (validated address) field
df['location'] = df['location'].apply(lambda loc: loc[0])

# adds update timestamp
df['last_updated'] = datetime.now()

# transforms all fields to str in order to write data to SQLite
df = df.astype(str)

df = df.rename({'index': 'id'}, axis='columns')

### Load
Loads dataframe data into new database table

In [17]:
df.to_sql(f'{selected_database_table.value}_geocoded', sql_engine, if_exists='append', index=True)

99

### Post-ETL
Clean up original database by deleting rows that are in geocoded database from original database.

In [18]:
count_duplicate_sql = text('SELECT id from {}'.format(f'{selected_database_table.value}_geocoded'))

In [19]:
with sql_engine.connect() as conn:
    trans = conn.begin()
    try:
        cursor = conn.execute(count_duplicate_sql)
        result = cursor.mappings().fetchall()
        trans.commit()
    except:
        trans.rollback()
        raise 

In [20]:
database_ids_list = []
for item in result:
    for key, value in item.items():
        database_ids_list.append(value)
        
database_ids = ', '.join(database_ids_list)

In [21]:
delete_duplicate_sql = text('DELETE from {} WHERE rowid IN ({})'.format(f'{selected_database_table.value}', database_ids))

In [22]:
with sql_engine.connect() as conn:
    trans = conn.begin()
    try:
        cursor = conn.execute(delete_duplicate_sql)
        result = cursor
        trans.commit()
    except:
        trans.rollback()
        raise 