In [1]:
import os, re, struct
import pandas as pd
import numpy as np
from io import BytesIO
from zipfile import ZipFile
from urllib.request import urlopen
from collections import Counter
import time

## 01. Download data dictionary to extract variable positions

In [2]:
data_dict_url = 'https://www2.census.gov/programs-surveys/cps/datasets/2020/basic/2020_Basic_CPS_Public_Use_Record_Layout_plus_IO_Code_list.txt'

fp = urlopen(data_dict_url)
mybytes = fp.read()
data_dict = mybytes.decode("ISO-8859-1")
fp.close()

## 02. Download data

`parse_raw()` uses the data dictionary to find the column in question

`get_data()` downloads the raw data and calls `parse_raw`

In [3]:
def parse_raw(col, raw_data):
    p = f'({col})\s+(\d+)\s+.*?\t+.*?(\d\d*).*?(\d\d+)'
    find_result = re.findall(p, data_dict)[0]
    temp_start = int(find_result[2]) - 1
    temp_end = int(find_result[3])
    return [row.decode("ISO-8859-1")[temp_start: temp_end].rstrip().strip() for row in raw_data]

In [21]:
def get_data(month, col_list, t = 20):
    result_dict = {}
    print(f"pause for {t} seconds")
    time.sleep(t)
    print(f"done pausing")
    print(f"loading data for {month}")
    
    resp = urlopen(f'https://www2.census.gov/programs-surveys/cps/datasets/2020/basic/{month}20pub.zip')
    zipfile = ZipFile(BytesIO(resp.read()))
    info = zipfile.infolist()
    raw_data = zipfile.open(info[0].filename).readlines()
    for col in col_list:
        try:
            result_dict[col] = parse_raw(col, raw_data)
        except:
            print(f"column for {month}, {col} did not work")
            time.sleep(t)
    return result_dict

In [30]:
months = ['jan', 'feb', 'mar', 'apr', 'may']

In [24]:
col_list = ['PWCMPWGT','PWSSWGT','PREMPNOT','PESEX','PEEDUCA','PTDTRACE']
results_list = []
start_time = time.time()
for month in months:
    results_list.append(get_data(month, col_list))
print("--- %s seconds ---" % (time.time() - start_time))

pause for 20 seconds
done pausing
loading data for jan
pause for 20 seconds
done pausing
loading data for feb
pause for 20 seconds
done pausing
loading data for mar
pause for 20 seconds
done pausing
loading data for apr
pause for 20 seconds
done pausing
loading data for may
--- 127.55020904541016 seconds ---


In [40]:
results_df = pd.DataFrame(columns = col_list)
for data_month_pair in zip(results_list, months):
    temp_df = pd.DataFrame(data_month_pair[0])
    temp_df["month"] = data_month_pair[1]
    results_df = results_df.append(temp_df)
results_df.head()

Unnamed: 0,PWCMPWGT,PWSSWGT,PREMPNOT,PESEX,PEEDUCA,PTDTRACE,month
0,17347552,17713809,4,1,38,2,jan
1,16756084,16864805,4,2,40,2,jan
2,21463402,20481802,4,1,40,1,jan
3,30966041,30137016,1,1,39,1,jan
4,17590812,17309391,1,2,43,1,jan


In [41]:
for col in col_list:
    results_df[col] = pd.to_numeric(results_df[col])
results_df.dtypes

PWCMPWGT     int64
PWSSWGT      int64
PREMPNOT     int64
PESEX        int64
PEEDUCA      int64
PTDTRACE     int64
month       object
dtype: object

## 03. Store data in postgres

https://naysan.ca/2020/05/09/pandas-to-postgresql-using-psycopg2-bulk-insert-performance-benchmark/

In [2]:
import psycopg2
from config import *

In [51]:
from sqlalchemy import create_engine
connect = "postgresql+psycopg2://%s:%s@%s:5432/%s" % (
    param_dic['user'],
    param_dic['password'],
    param_dic['host'],
    param_dic['database']
)

def to_alchemy(table_name, df):
    """
    Using a dummy table to test this call library
    """
    engine = create_engine(connect)
    df.to_sql(
        table_name, 
        con=engine, 
        index=False, 
        if_exists='replace'
    )
    print("to_sql() done")

In [None]:
to_alchemy('unemp_raw', results_df)