In [13]:
import ftplib
import functools
import os
import sys
import zipfile
from datetime import datetime, timedelta
from io import BytesIO
from tempfile import mkstemp
from time import sleep

import geopandas as gpd
import pandas as pd
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

from state_dict import us_state_abbrev

### Helper: anonymous connection to the Census FTP server (with retries)

In [2]:
def connect_census(typecheck=None, max_retries=None, retry_delay=0):
    """Decorator factory: run a function inside an anonymous Census FTP session, retrying on failure.

    The decorated function must accept an ``ftp`` keyword argument. Each
    attempt opens a new ``ftplib.FTP`` connection to ftp2.census.gov
    (10 s timeout), logs in anonymously, passes the session as ``ftp=...``
    and always closes the session afterwards, even if the function raises.

    An attempt fails when any ``ftplib.all_errors`` exception is raised (this
    includes ``OSError``, so socket timeouts are retried), or when the result
    is not an instance of ``typecheck``.

    Args:
        typecheck: Required result type, e.g. ``pd.DataFrame``. ``None``
            accepts any result that was produced without an FTP/socket error.
        max_retries: Retries allowed after the first attempt. ``None``
            (default) retries indefinitely.
        retry_delay: Seconds to sleep before each retry (default 0).

    Returns:
        A decorator; the decorated function returns the first accepted result.

    Raises:
        RuntimeError: if ``max_retries`` is set and every attempt failed.
        Exceptions outside ``ftplib.all_errors`` raised by the wrapped
        function propagate unchanged.
    """
    def realdec(fn):
        # Private sentinel, so a failed attempt can never be mistaken for a real result.
        failed = object()

        def wrap(*args, **kwargs):
            # One attempt. The `with` block closes the connection on every exit path.
            try:
                with ftplib.FTP("ftp2.census.gov", timeout=10) as ftp:
                    ftp.login(user='anonymous')
                    print('Login successful \nRunning function')
                    frame = fn(*args, ftp=ftp, **kwargs)
                    print('Frame loaded')
                    return frame
            except ftplib.all_errors:
                print(sys.exc_info())
                return failed

        def succeeded(value):
            if value is failed:
                return False
            return typecheck is None or isinstance(value, typecheck)

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            retries = 0
            var = wrap(*args, **kwargs)
            while not succeeded(var):
                if max_retries is not None and retries >= max_retries:
                    raise RuntimeError(
                        f'{fn.__name__} failed after {retries + 1} attempts')
                retries += 1
                print('Retrying')
                if retry_delay:
                    sleep(retry_delay)
                var = wrap(*args, **kwargs)
            return var
        return wrapper
    return realdec

### Download ACS sequence from FTP, merge with template, save

In [6]:
@connect_census(pd.DataFrame)
def get_us_seq(seq=59, year=2016, timeframe=5, tract=False, ftp=None):
    """Download one national ACS summary-file sequence and label its columns.

    Reads member ``e{year}{timeframe}us{seq:0>4}000.txt`` from the UnitedStates
    ``All_Geographies_Not_Tracts_Block_Groups`` zip, then takes the column
    names from sheet ``"E"`` of ``templates/Seq{seq}.xls`` inside
    ``{year}_{timeframe}yr_Summary_FileTemplates.zip``.

    Args:
        seq: Sequence number; zero-padded to four digits in the file name.
        year: Year component of the FTP paths and file names.
        timeframe: Period component of the paths (e.g. 5 -> ``5_year_seq_by_state``).
        tract: Unused; tract/block-group geographies are not downloaded here.
        ftp: Logged-in ``ftplib.FTP`` session, injected by ``connect_census``.

    Returns:
        DataFrame of the sequence rows with the template's column headers.

    Raises:
        ValueError: if the template's column count does not match the data.
    """
    data_root = f"programs-surveys/acs/summary_file/{year}/data"
    fname = f"{year}{timeframe}us{seq:0>4}000"

    path = (f"{data_root}/{timeframe}_year_seq_by_state/UnitedStates/"
            f"All_Geographies_Not_Tracts_Block_Groups/{fname}.zip")
    print(f'Downloading {path}')
    buf = BytesIO()
    ftp.retrbinary(f"RETR {path}", buf.write)
    with zipfile.ZipFile(buf) as z:
        # Only the "e"-prefixed member of the archive is read.
        with z.open(f"e{fname}.txt") as f:
            est = pd.read_csv(f, header=None)
    print('Sequence table created')

    tpath = f'{data_root}/{year}_{timeframe}yr_Summary_FileTemplates.zip'
    # Log before the transfer, so a stalled download is attributed to the templates.
    print('Downloading templates')
    buf = BytesIO()
    ftp.retrbinary(f'RETR {tpath}', buf.write)
    with zipfile.ZipFile(buf) as z:
        with z.open(f'templates/Seq{seq}.xls') as f:
            template = pd.read_excel(f, sheet_name="E")
    print('Template table created')

    est.columns = template.columns
    print('Sequence and template merged')
    return est

def step1():
    """Airflow task 1: fetch the national ACS sequence table and pickle it to ``step1_output.pkl``."""
    get_us_seq().to_pickle('step1_output.pkl')

step1()
frame = pd.read_pickle('step1_output.pkl')
frame.head()

(<class 'socket.timeout'>, timeout('timed out',), <traceback object at 0x0000017B56474648>)
Retrying
Login successful 
Running function
Downloading programs-surveys/acs/summary_file/2016/data/5_year_seq_by_state/UnitedStates/All_Geographies_Not_Tracts_Block_Groups/20165us0059000.zip
Sequence table created
Downloading templates
Template table created
Sequence and template merged
Frame loaded


Unnamed: 0,FILEID,FILETYPE,STUSAB,CHARITER,SEQUENCE,LOGRECNO,B19001_001,B19001_002,B19001_003,B19001_004,...,B19025_001,B19025A_001,B19025B_001,B19025C_001,B19025D_001,B19025E_001,B19025F_001,B19025G_001,B19025H_001,B19025I_001
0,ACSSF,201600000.0,us,0,59,1,117716237,8243664,6000362,5958713,...,9166046176400,7455138526400,745932129000,44105842400,535929539900.0,10582556400,218055427800,156302154500,6843444155500,879243313400
1,ACSSF,201600000.0,us,0,59,2,95047298,6835554,4843053,4782794,...,7535235469200,5939616862200,689614123300,29615345800,520038954800.0,9854728700,207513111300,138982343100,5368909529600,825186743800
2,ACSSF,201600000.0,us,0,59,3,22668939,1408110,1157309,1175919,...,1630810707200,1515521664200,56318005800,14490496600,15890585100.0,727827600,10542316500,17319811300,1474534625900,54056569600
3,ACSSF,201600000.0,us,0,59,4,331277,42690,24386,21581,...,18313116600,10252127400,263864100,6389423100,396675000.0,55837500,358662600,596526800,9443728400,1376802600
4,ACSSF,201600000.0,us,0,59,5,370,68,46,21,...,16215000,2275200,976100,11584300,,.,.,1010600,2127500,147700


### Download ACS geography file and the template, clean, and save

In [8]:
@connect_census(pd.DataFrame)
def get_us_geo(year=2016, timeframe=5, tract=False, ftp=None):
    """Download the national ACS geography file and label its columns.

    Reads the headerless ``g{year}{timeframe}us.csv`` (latin-1) from the
    UnitedStates ``All_Geographies_Not_Tracts_Block_Groups`` directory. Column
    names are taken from ``templates/{year}_SFGeoFileTemplate.xls`` inside
    ``{year}_{timeframe}yr_Summary_FileTemplates.zip``.

    Args:
        year: Year component of the FTP paths and file names.
        timeframe: Period component of the paths (e.g. 5 -> ``5_year_seq_by_state``).
        tract: Unused; tract/block-group geographies are not downloaded here.
        ftp: Logged-in ``ftplib.FTP`` session, injected by ``connect_census``.

    Returns:
        DataFrame of geography records with the template's column headers.

    Raises:
        ValueError: if the template's column count does not match the data.
    """
    data_root = f'/programs-surveys/acs/summary_file/{year}/data'

    geo_path = (f'{data_root}/{timeframe}_year_seq_by_state/UnitedStates/'
                f'All_Geographies_Not_Tracts_Block_Groups/g{year}{timeframe}us.csv')
    print(f'Downloading {geo_path}')
    buf = BytesIO()
    ftp.retrbinary(f'RETR {geo_path}', buf.write)
    buf.seek(0)  # read_csv reads from the current position; rewind after the download
    geographies = pd.read_csv(buf, low_memory=False, encoding='latin-1', header=None)

    template_path = f'{data_root}/{year}_{timeframe}yr_Summary_FileTemplates.zip'
    print('Downloading templates')
    buf = BytesIO()
    ftp.retrbinary(f'RETR {template_path}', buf.write)
    with zipfile.ZipFile(buf) as z:
        with z.open(f'templates/{year}_SFGeoFileTemplate.xls') as f:
            # .xls is a binary workbook; current pandas read_excel has no `encoding` argument.
            geo_template = pd.read_excel(f)

    geographies.columns = geo_template.columns
    return geographies

def clean_geographies(geo):
    """Return a copy of ``geo`` whose GEOID keeps only the text after its first 'US'.

    For example, ``'04000US01'`` becomes ``'01'`` and ``'01000US'`` becomes ``''``.
    Values that contain no 'US' become NaN. The input frame is not modified.

    Args:
        geo: DataFrame with a string ``GEOID`` column.

    Returns:
        A new DataFrame with the cleaned ``GEOID`` column.
    """
    # n=1 keeps any later 'US' occurrences as part of the code instead of cutting them off.
    cleaned_ids = geo['GEOID'].str.split('US', n=1).str.get(1)
    return geo.assign(GEOID=cleaned_ids)

def step2():
    """Airflow task 2: fetch and clean the national geography file, then pickle it to ``step2_output.pkl``."""
    cleaned = clean_geographies(get_us_geo())
    cleaned.to_pickle('step2_output.pkl')
    
step2()
frame = pd.read_pickle('step2_output.pkl')
frame.head()

Login successful 
Running function
RETR /programs-surveys/acs/summary_file/2016/data/5_year_seq_by_state/UnitedStates/All_Geographies_Not_Tracts_Block_Groups/g20165us.csv
Frame loaded


Unnamed: 0,FILEID,STUSAB,SUMLEVEL,COMPONENT,LOGRECNO,US,REGION,DIVISION,STATECE,STATE,...,PCI,BLANK.3,BLANK.4,PUMA5,BLANK.5,GEOID,NAME,BTTR,BTBG,BLANK.6
0,ACSSF,US,10,0,1,1.0,,,,,...,,,,,,,United States,,,
1,ACSSF,US,10,1,2,1.0,,,,,...,,,,,,,United States -- Urban,,,
2,ACSSF,US,10,43,3,1.0,,,,,...,,,,,,,United States -- Rural,,,
3,ACSSF,US,10,89,4,1.0,,,,,...,,,,,,,United States -- American Indian Reservation a...,,,
4,ACSSF,US,10,90,5,1.0,,,,,...,,,,,,,United States -- American Indian Reservation a...,,,


### Download the TIGER/Line Urban Areas (UAC) file and save

In [12]:
@connect_census(gpd.GeoDataFrame)
def get_uac_df(year=2017, ftp=None):
    """Download the TIGER/Line urban-area zip for ``year`` and load it with geopandas.

    The archive ``/geo/tiger/TIGER{year}/UAC/tl_{year}_us_uac10.zip`` is streamed
    into a temporary ``.zip`` file and read from there. The temporary file is
    removed even if the download or the read fails.

    Args:
        year: TIGER release year used in the FTP path and file name.
        ftp: Logged-in ``ftplib.FTP`` session, injected by ``connect_census``.

    Returns:
        GeoDataFrame read from the archive.
    """
    fd, name = mkstemp(suffix='.zip')
    try:
        # A buffered file object's write() writes the whole chunk;
        # a bare os.write() may write fewer bytes than requested.
        with os.fdopen(fd, 'wb') as fh:
            ftp.retrbinary(f'RETR /geo/tiger/TIGER{year}/UAC/tl_{year}_us_uac10.zip', fh.write)
        # NOTE(review): `vfs` is deprecated in Fiona; newer stacks expect
        # gpd.read_file(f'zip://{name}') instead — confirm against installed versions.
        gdf = gpd.read_file('', vfs=f'zip://{name}')
    finally:
        os.remove(name)
    return gdf

def step3():
    """Airflow task 3: fetch the urban-area GeoDataFrame and pickle it to ``step3_output.pkl``."""
    get_uac_df().to_pickle('step3_output.pkl')

step3()
frame = pd.read_pickle('step3_output.pkl')
frame.head()

Login successful 
Running function
Frame loaded


Unnamed: 0,UACE10,GEOID10,NAME10,NAMELSAD10,LSAD10,MTFCC10,UATYP10,FUNCSTAT10,ALAND10,AWATER10,INTPTLAT10,INTPTLON10,geometry
0,24310,24310,"Dixon, IL","Dixon, IL Urban Cluster",76,G3500,C,S,25525003,938058,41.8529507,-89.4817439,"POLYGON ((-89.498589 41.854668, -89.498538 41...."
1,27847,27847,"Escanaba, MI","Escanaba, MI Urban Cluster",76,G3500,C,S,46648248,283456,45.8704839,-87.0638396,"(POLYGON ((-87.063103 45.866083, -87.062210999..."
2,18100,18100,"Clintonville, WI","Clintonville, WI Urban Cluster",76,G3500,C,S,5854683,502563,44.6232203,-88.7611283,"POLYGON ((-88.78650499999999 44.629957, -88.78..."
3,6166,6166,"Bedford, IN","Bedford, IN Urban Cluster",76,G3500,C,S,30402519,2314,38.856653,-86.5012383,"(POLYGON ((-86.518316 38.79547, -86.518253 38...."
4,75270,75270,"Riverdale, CA","Riverdale, CA Urban Cluster",76,G3500,C,S,2306823,0,36.431071,-119.8620544,"POLYGON ((-119.869132 36.430832, -119.870931 3..."


### Airflow Pipeline

In [None]:
dag = DAG('ACS Tiger')

s1 = PythonOperator(step1, 
                    task_id='step1', 
                    dag=dag)

s2 = PythonOperator(step2,
                   task_id='step2', 
                   dag=dag)

s3 = PythonOperator(step3,
                   task_id='step3'
                   dag=dag)

s2.set_upstream(s1)
s3.set_upstream(s2)