# LEHD LODES ETL - OD

QC notebook for the LEHD LODES origin-destination (OD) ETL, updated to use Dask for faster processing of the OD source data.

QC checks (LEHD source files compared against the SQL table, per year):
1. Total row count, unique w_geocode, unique h_geocode
2. Sum of job variables

Author: DLE  
Last Updated: 6/16/2023

In [57]:
import pandas as pd
import os
import dask.dataframe as dd
import sqlalchemy as sa

# Data Preparation

In [58]:
# First and last data year to process; both bounds are treated as inclusive
# by the loops (range(year_low, year_high+1)) and the SQL BETWEEN filters.
year_low, year_high = 2002, 2006

## LEHD Source Data

Includes California OD data (in `od` folder) and out-of-state OD data (in `oos` folder) 

**<u>California</u>** OD data includes: 
1. Workers who live in CA and work in San Diego (`w_geocode = 6073*`)
2. Workers who live in San Diego and work in CA/San Diego (`h_geocode = 6073*`)

**<u>Out-of-state</u>** OD data includes:
1. Workers who live in San Diego but work out of state (`h_geocode = 6073*`)

In [59]:
# Root folder of the LEHD source files; the `od` and `oos` subfolders are
# globbed per year when the data is loaded.
path = "R:/DPOE/LEHD LODES/8.0/Source"

In [60]:
# Runtime: < 30 sec per year

# Geocode prefix identifying San Diego County. The geocodes are compared as
# strings after astype(str); the prefix is '6073' rather than '06073',
# presumably because the CSV geocodes are parsed as integers and lose the
# leading zero -- TODO confirm against the source files.
sd_prefix = '6073'

# Maps year -> Dask DataFrame of that year's combined CA + out-of-state records
df = {}

for year in range(year_low, year_high + 1):

    # CALIFORNIA OD
    # Subset data where origin OR destination is in San Diego (6073*)
    df_ca = dd.read_csv(f"{path}/od/*{year}.csv")
    works_in_sd = df_ca['w_geocode'].astype(str).str.startswith(sd_prefix)
    lives_in_sd = df_ca['h_geocode'].astype(str).str.startswith(sd_prefix)
    df_ca = df_ca[works_in_sd | lives_in_sd]

    # OUT-OF-STATE OD
    # Subset data where origin is in San Diego (6073*)
    df_oos = dd.read_csv(f"{path}/oos/*{year}.csv")
    df_oos = df_oos[df_oos['h_geocode'].astype(str).str.startswith(sd_prefix)]

    # Combine California and OOS
    df_concat = dd.concat([df_ca, df_oos])
    df[year] = df_concat

    print(f'---- complete: {year}')

---- complete: 2002
---- complete: 2003
---- complete: 2004
---- complete: 2005
---- complete: 2006


## SQL data

In [61]:
# SQL connection using SQLAlchemy (Windows trusted connection via pyodbc)

server = 'ddamwsql16'
database = 'dpoe_stage'
driver = 'ODBC Driver 17 for SQL Server'

# Build the URL from its components instead of interpolating into a string:
# the driver name contains spaces, and URL.create escapes query values
# correctly rather than embedding them raw in the query string.
# Requires SQLAlchemy >= 1.4.
url = sa.engine.URL.create(
    'mssql+pyodbc',
    host=server,
    database=database,
    query={'trusted_connection': 'yes', 'driver': driver},
)
engine = sa.create_engine(url)

In [89]:
# Per-year summary of the staged OD table: total row count plus the number of
# distinct w_geocode and h_geocode values. The year bounds are interpolated
# from year_low / year_high, and BETWEEN makes both bounds inclusive.
qry_info = f"""
                SELECT  [yr],
                        COUNT(yr) as [count],
                        COUNT(DISTINCT [w_geocode]) as [unique_w],
                        COUNT(DISTINCT [h_geocode]) as [unique_h]
                FROM [dpoe_stage].[lehd_lodes].[od_8_0]
                WHERE [yr] BETWEEN {year_low} AND {year_high}
                GROUP BY [yr]
                ORDER BY [yr]
            """

# Per-year totals of the ten job-count columns (S000, SA01-03, SE01-03,
# SI01-03) from the same table and year range; the output column names match
# the column names summed on the source side.
qry_sum =  f"""
                SELECT  [yr]
                        ,SUM([S000]) as [S000]
                        ,SUM([SA01]) AS [SA01]
                        ,SUM([SA02]) AS [SA02]
                        ,SUM([SA03]) AS [SA03]
                        ,SUM([SE01]) AS [SE01]
                        ,SUM([SE02]) AS [SE02]
                        ,SUM([SE03]) AS [SE03]
                        ,SUM([SI01]) AS [SI01]
                        ,SUM([SI02]) AS [SI02]
                        ,SUM([SI03]) AS [SI03]
                FROM [dpoe_stage].[lehd_lodes].[od_8_0]
                WHERE yr BETWEEN {year_low} AND {year_high}
                GROUP BY yr
                ORDER BY yr
            """

# QC: Summary characteristics

- Number of records
- Unique w_geocode
- Unique h_geocode

In [63]:
# LEHD SOURCE
# Runtime: ~5 min per year was measured when each metric triggered its own
# pass over the CSVs; the three metrics are now evaluated in a single
# dd.compute() call so shared work (reading/filtering) is done once.

df_year_data = []

for year in range(year_low, year_high + 1):
    ddf = df[year]

    # Build the three lazy results first, then evaluate them together.
    # nunique() counts distinct non-null values, which is the same rule the
    # SQL side applies with COUNT(DISTINCT ...).
    df_rows, df_w_geocodes, df_h_geocodes = dd.compute(
        ddf.index.size,
        ddf['w_geocode'].nunique(),
        ddf['h_geocode'].nunique(),
    )

    # Cast to plain ints so the summary frame has integer columns
    year_data = [year, int(df_rows), int(df_w_geocodes), int(df_h_geocodes)]
    df_year_data.append(year_data)

df_info = pd.DataFrame(df_year_data, columns=['year', 'count', 'unique_w', 'unique_h'])
df_info

Unnamed: 0,year,count,unique_w,unique_h
0,2002,4336770,41316,100781
1,2003,4433531,43124,100609
2,2004,4543000,45425,102109
3,2005,4651598,46654,105907
4,2006,4747969,47778,108165


In [87]:
# SQL DATABASE
# Fetch the per-year row / distinct-geocode counts produced by qry_info

db_info = pd.read_sql(sql=qry_info, con=engine)
db_info

Unnamed: 0,yr,count,unique_w,unique_h
0,2002,4336770,41316,100781
1,2003,4433531,43124,100609
2,2004,4543000,45425,102109
3,2005,4651598,46654,105907
4,2006,4747969,47778,108165


In [88]:
# Source minus database, matched on year; all zeros means the counts agree
source_counts = df_info.set_index('year')
database_counts = db_info.set_index('yr')
diff_count = source_counts.sub(database_counts)
diff_count.reset_index()

Unnamed: 0,year,count,unique_w,unique_h
0,2002,0,0,0
1,2003,0,0,0
2,2004,0,0,0
3,2005,0,0,0
4,2006,0,0,0


# QC: Sum of columns

In [67]:
# LEHD SOURCE
# Job-count columns to total for each year
cols = ['S000','SA01','SA02','SA03','SE01','SE02','SE03','SI01','SI02','SI03']

# Collect one pandas Series of column totals per year and build the frame
# once at the end. Concatenating onto an initially empty DataFrame inside the
# loop re-copies the data on every iteration and lets the empty frame's dtype
# leak into the result (the totals were displayed as floats).
yearly_totals = []

for year in range(year_low, year_high + 1):
    # .compute() turns the lazy Dask result into a pandas Series
    total = df[year][cols].sum().compute()
    # The Series name becomes the row label (the year) in df_total
    total.name = year
    yearly_totals.append(total)

    print('---- complete: ' + str(year))

# columns=cols keeps the column order fixed and still yields an empty,
# correctly-labelled frame when the year range is empty
df_total = pd.DataFrame(yearly_totals, columns=cols)
df_total

Unnamed: 0,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03
2002,4829597.0,1447870.0,2822629.0,559098.0,1385251.0,1938338.0,1506008.0,896232.0,971467.0,2961898.0
2003,4933686.0,1500950.0,2844866.0,587870.0,1387821.0,1967390.0,1578475.0,863680.0,979196.0,3090810.0
2004,5040895.0,1545347.0,2870191.0,625357.0,1371617.0,1973854.0,1695424.0,912648.0,997464.0,3130783.0
2005,5157468.0,1603084.0,2892210.0,662174.0,1353565.0,1990251.0,1813652.0,936029.0,1028555.0,3192884.0
2006,5258258.0,1657397.0,2899110.0,701751.0,1341864.0,2009405.0,1906989.0,935651.0,1043205.0,3279402.0


In [90]:
# SQL DATABASE
# Year becomes the (unnamed) index so the frame is labelled like df_total

db_total = (
    pd.read_sql(qry_sum, con=engine)
    .set_index('yr')
    .rename_axis(None)
)
db_total

Unnamed: 0,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03
2002,4829597,1447870,2822629,559098,1385251,1938338,1506008,896232,971467,2961898
2003,4933686,1500950,2844866,587870,1387821,1967390,1578475,863680,979196,3090810
2004,5040895,1545347,2870191,625357,1371617,1973854,1695424,912648,997464,3130783
2005,5157468,1603084,2892210,662174,1353565,1990251,1813652,936029,1028555,3192884
2006,5258258,1657397,2899110,701751,1341864,2009405,1906989,935651,1043205,3279402


In [91]:
# Source totals minus database totals, aligned on year and column name;
# all zeros means the sums agree
diff_tot = df_total.sub(db_total)
diff_tot

Unnamed: 0,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03
2002,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2003,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2004,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2005,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2006,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
