# CPS_Reader_Feather

##### Updated: April 8, 2018 -- Brian Dew, @bd_econ

This notebook reads the monthly CPS data files and stores them as annual pandas dataframes in feather format.


TO DO:

The reader function should determine each variable's data type (e.g. np.int8) from the field length given in the data dictionary.

Take advantage of pandas categoricals to store information about what values mean.

Clean up code and add annotations. The data dictionary reader is barely used, confusing to read, and doesn't work that well.

Thoroughly test and benchmark output against BLS published estimates.

Adjust weight variables for implied decimal places.

In [1]:
# Import packages and identify datafiles
import re
import os
import struct
import pickle
import numpy as np
import pandas as pd
from calendar import month_abbr

# Location of data
# Every relative path used later in this notebook ('data/<file>' for inputs and
# 'cps_dictionaries.pkl' for the pickled dictionaries) resolves against this directory.
os.chdir('E:/08_Other/Archive/')

In [2]:
# Information about dictionary start and end dates and how to parse the columns
# Grouped by data dictionaries, which change when the underlying raw data changes
# First item is dictionary name. By name we have start and end dates YYYY-MM-DD.
# ixno is the index of the regex group in p1 that holds the start position of
#   each variable of interest
# split_string is the gap between state codes and two letter state abbreviations
# p1 is the main pattern for variable information
# p2 works with the variable name to return the list of values
# p3 is the pattern for taking p2 into a tuple of (value_key, value_name)
#
# The patterns are raw strings so that regex escapes such as \w, \s and \d reach
# the `re` module untouched (in a normal string they are invalid escape
# sequences). \n, \t and \x0c are also understood by `re`, so matching is
# unchanged. split_string is used with str.split, so it stays a normal string.

# Layouts that separate columns with tabs (May 2012 onward)
_P1_TAB = r'\n(\w+)\s+(\d+)\s+(.*?)\t+.*?(\d\d*).*?(\d\d+)'
_P3_TAB = r'(\d+.*?\d*)\t+(.*)'

# Layouts that separate columns with runs of spaces (before May 2012)
_P1_SPACE = r'\n(?:\x0c)?(\w+)\s+(\d+)\s+(.*?) \s+.*?(\d\d*).*?(\d\d+)'
_P3_SPACE = r'(\d+.*?\d*)\s+(.*)'

# Value-list pattern shared by every layout except the 1998 .asc dictionary
_P2_VALID_ENTRIES = r'.*?VALID ENTRIES\n\n\s+(.*?)\n(?:\x0c)?\w+\s+\d+\s+.*? \s+.*?\d\d*.*?\d\d+'

# Key order matches the original entries: ixno, split_string, p1, p2, p3
_TAB_LAYOUT = {'ixno': 3, 'split_string': '\t\t',
               'p1': _P1_TAB, 'p2': _P2_VALID_ENTRIES, 'p3': _P3_TAB}
_SPACE_LAYOUT = {'ixno': 3, 'split_string': '    ',
                 'p1': _P1_SPACE, 'p2': _P2_VALID_ENTRIES, 'p3': _P3_SPACE}
_ASC_LAYOUT = {'ixno': 2, 'split_string': '    ',
               'p1': r'D (\w+)\s+(\d{1,2})\s+(\d+)\s+',
               'p2': r'D {var[0]}.*?(V .*?)\n(?:\x0c)?D ',
               'p3': _P3_SPACE}

# Each entry unpacks a layout into a fresh dict, so entries can be extended
# independently later on. Date ranges are contiguous and do not overlap.
d = {
    'January_2017_Record_Layout.txt': {'start': '2017-01-01', 'end': '2018-12-31', **_TAB_LAYOUT},
    'January_2015_Record_Layout.txt': {'start': '2015-01-01', 'end': '2016-12-31', **_TAB_LAYOUT},
    'January_2014_Record_Layout.txt': {'start': '2014-01-01', 'end': '2014-12-31', **_TAB_LAYOUT},
    'January_2013_Record_Layout.txt': {'start': '2013-01-01', 'end': '2013-12-31', **_TAB_LAYOUT},
    'may12dd.txt': {'start': '2012-05-01', 'end': '2012-12-31', **_TAB_LAYOUT},
    'jan10dd.txt': {'start': '2010-01-01', 'end': '2012-04-30', **_SPACE_LAYOUT},
    'jan09dd.txt': {'start': '2009-01-01', 'end': '2009-12-31', **_SPACE_LAYOUT},
    'jan07dd.txt': {'start': '2007-01-01', 'end': '2008-12-31', **_SPACE_LAYOUT},
    'augnov05dd.txt': {'start': '2005-08-01', 'end': '2006-12-31', **_SPACE_LAYOUT},
    # end date zero-padded (was '2005-7-31') for consistency with the other entries
    'may04dd.txt': {'start': '2004-05-01', 'end': '2005-07-31', **_SPACE_LAYOUT},
    'jan03dd.txt': {'start': '2003-01-01', 'end': '2004-04-30', **_SPACE_LAYOUT},
    'jan98dd.asc': {'start': '1998-01-01', 'end': '2002-12-31', **_ASC_LAYOUT},
    'sep95_dec97_dd.txt': {'start': '1995-09-01', 'end': '1997-12-31', **_SPACE_LAYOUT},
    'jun95_aug95_dd.txt': {'start': '1995-06-01', 'end': '1995-08-31', **_SPACE_LAYOUT},
    'apr94_may95_dd.txt': {'start': '1994-04-01', 'end': '1995-05-31', **_SPACE_LAYOUT},
    # end date was '1995-03-31', which overlapped apr94_may95_dd.txt; this
    # dictionary covers January through March 1994 only
    'jan94_mar94_dd.txt': {'start': '1994-01-01', 'end': '1994-03-31', **_SPACE_LAYOUT},
}

In [3]:
# Series of interest, split into three groups. list_to_df casts the groups to
# different integer widths: s1 -> np.int8, s2 -> np.int16, s3 -> np.int32.

# These series can be stored as categorical later on
s1 = [
    'HRMONTH', 'PESEX', 'PEMLR', 'PENLFRET', 'PENLFACT', 'PRDISC',
    'GESTFIPS', 'HRMIS', 'PRCOW1', 'PRFTLF', 'PREMPNOT', 'PRCIVLF',
    'PEJHRSN', 'PRSJMJ', 'PEEDUCA', 'PRWKSTAT', 'PRMJOCC1', 'GTMETSTA',
    'GEMETSTA', 'PEDWWNTO', 'PRUNTYPE', 'PRMJIND1', 'PERACE', 'PTDTRACE',
    'PRDTRACE', 'PRORIGIN', 'PRDTHSP', 'PRCHLD', 'PRTAGE', 'PEAGE',
    'PULINENO', 'PRWNTJOB', 'PEERNLAB',
]

s2 = [
    'PEHRUSL1',
    'HRYEAR',
    'HRYEAR4',
    'PRUNEDUR',
    'PRERNHLY',
    'PTERNHLY',
]

s3 = [
    'PWORWGT',
    'PWCMPWGT',
    'HRHHID2',
    'PRERNWA',
    'PTERNWA',
    'PWSSWGT',
]

# Every series to pull from the data dictionaries; HRHHID belongs to none of
# the three groups above
s = [*s1, *s2, *s3, 'HRHHID']

def text_repl(string_item):
    """Return string_item with alternate variable names swapped for a common name.

    The substitutions are applied one after another, in the order listed, to
    every occurrence of the old name inside the string.
    """
    renames = (
        ('PEAGE', 'PRTAGE'),
        ('PTERNHLY', 'PRERNHLY'),
        ('PTERNWA', 'PRERNWA'),
        ('GEMETSTA', 'GTMETSTA'),
        ('PERACE', 'PRDTRACE'),
        ('PTDTRACE', 'PRDTRACE'),
    )
    renamed = string_item
    for old_name, new_name in renames:
        renamed = renamed.replace(old_name, new_name)
    return renamed

In [4]:
# Build up dictionary with variable locations and codes
# For each data dictionary named in `d`, read its text from data/<name>, find the
# variables of interest, and store their positions and value codes back into `d`.
# When every dictionary has been processed, `d` is pickled to cps_dictionaries.pkl.
for ddi in d.items():
    # ddi[0] is the dictionary filename, ddi[1] its settings (patterns, 'ixno', ...)
    dd = open(f'data/{ddi[0]}', 'r', encoding='iso-8859-1').read()
    p1 = re.compile(ddi[1]['p1'])
    # vlist: (name after text_repl, start, length) for each p1 match whose raw
    # name is in s; start comes from the regex group numbered 'ixno'
    vlist = [(text_repl(i[0]), int(i[ddi[1]['ixno']]), int(i[1])) 
             for i in p1.findall(dd) if i[0] in s]
    # gclist: the same tuples but with the raw (un-renamed) name, limited to s1
    gclist = [(i[0], int(i[ddi[1]['ixno']]), int(i[1])) 
             for i in p1.findall(dd) if i[0] in s1]
    d[ddi[0]]['variables'] = {}
    d[ddi[0]]['vlist'] = vlist
    p3 = re.compile(ddi[1]['p3'])

    # Get the list of codes and their values for each coded variable
    for var in vlist:
        td = {}
        td['start'] = var[1]
        td['length'] = var[2]
        td['values'] = 'can be used directly'
        # NOTE(review): var[0] went through text_repl but the gclist names did
        # not, so a renamed variable only passes this test when its new name is
        # itself present in gclist -- confirm this is intended.
        if var[0] in [g[0] for g in gclist]:
            p2 = re.compile(f'\n(?:\x0c)?{var[0]}{ddi[1]["p2"]}', 
                            re.MULTILINE|re.DOTALL)
            # Dictionary filenames ending in 'c' (the .asc file) use a
            # hard-coded pattern in place of the one built above
            if ddi[0][-1] == 'c':
                p2 = re.compile(f'D {var[0]}.*?(V .*?)\n(?:\x0c)?D ', 
                                re.MULTILINE|re.DOTALL)
            # Take the first p2 match and split it into stripped, non-empty
            # lines; the [0] raises IndexError when p2 matches nothing
            vals = [[x.strip() for x in i.split('\n') if len(x.strip()) > 0] 
                    for i in p2.findall(dd)][0]
            # Keep the first p3 match of each line as a (value_key, value_name) pair
            val_list = [(i[0], i[1]) for i in 
                        [p3.findall(v)[0] for v in vals 
                         if len(p3.findall(v)) > 0]]
            td['values'] = val_list
            # NOTE(review): this assignment sits inside the `if`, so variables
            # that fail the test above are never added to 'variables' and the
            # 'can be used directly' placeholder is never stored -- confirm.
            d[ddi[0]]['variables'][var[0]] = td

    # Special code to capture state codes, which are stored in two columns
    state_vals = []
    if ddi[0][-1] == 'c':
        # For the .asc dictionary the GESTFIPS value list is replaced by an empty list
        d[ddi[0]]['variables']['GESTFIPS']['values'] = state_vals
    else:
        for state in d[ddi[0]]['variables']['GESTFIPS']['values']:
            # A space or tab in the value name means the line held two
            # code/name pairs side by side; split it on split_string
            if (' ' in state[1]) or ('\t' in state[1]):
                split_string = ddi[1]['split_string']
                stsplit = state[1].split(split_string)
                state1 = tuple([state[0], stsplit[0].strip()])
                # Second pair: first two characters are the code (converted to
                # int, unlike state[0] which stays a string), last two the name
                state2 = tuple([int(stsplit[1].strip()[:2]), 
                                stsplit[1].strip()[-2:]])
                state_vals.append(state1)
                state_vals.append(state2)
            else:
                state_vals.append(state)
        d[ddi[0]]['variables']['GESTFIPS']['values'] = state_vals
        
# Save the completed dictionary of layouts in the working directory
with open('cps_dictionaries.pkl', 'wb') as handle:
    pickle.dump(d, handle)

In [5]:
# CEPR Race and Ethnicity variable that is time consistent
#
# Each entry holds the recode rules for one period, in the form wbhao_map reads:
#   'race': (source variable, [(output code, [source codes]), ...])
#   'hisp': (source variable, (output code, [source codes]))
#   'start' / 'end': first-of-month dates bounding the period
def _wbhao_scheme(race_rules, hisp_var, hisp_codes, start, end):
    """Assemble one period's rule set; output code 3 is assigned via the 'hisp' rule."""
    return {'race': ('PRDTRACE', race_rules),
            'hisp': (hisp_var, (3, hisp_codes)),
            'start': start,
            'end': end}


r_e_d = {
    're1': _wbhao_scheme(
        [(1, [1]), (2, [2]), (4, [4]), (5, [3, 5])],
        'PRORIGIN', [1, 2, 3, 4, 5, 6, 7],
        '1994-01-01', '2002-12-01'),
    're2': _wbhao_scheme(
        [(1, [1]),
         (2, [2, 6, 10, 11, 12, 15, 16, 19]),
         (4, [4, 5, 8, 9, 13, 14, 17, 18]),
         (5, [3, 7, 20, 21])],
        'PRDTHSP', [1, 2, 3, 4, 5],
        '2003-01-01', '2012-04-01'),
    're3': _wbhao_scheme(
        [(1, [1]),
         (2, [2, 6, 10, 11, 12, 16, 17, 18, 22, 23]),
         (4, [4, 5, 8, 9, 13, 14, 15, 19, 20, 21, 24]),
         (5, [3, 7, 25, 26])],
        'PRDTHSP', [1, 2, 3, 4, 5],
        '2012-05-01', '2013-12-01'),
    're4': _wbhao_scheme(
        [(1, [1]),
         (2, [2, 6, 10, 11, 12, 16, 17, 18, 22, 23]),
         (4, [4, 5, 8, 9, 13, 14, 15, 19, 20, 21, 24]),
         (5, [3, 7, 25, 26])],
        'PRDTHSP', [1, 2, 3, 4, 5, 6, 7, 8],
        '2014-01-01', '2018-12-01'),
}


def wbhao_map(row, r_e):
    """Return the recoded race/ethnicity value for one record.

    row is indexed by variable name. r_e supplies a 'hisp' rule of the form
    (variable, (code, [values])) and a 'race' rule of the form
    (variable, [(code, [values]), ...]). The 'hisp' rule is checked first;
    otherwise the first matching 'race' rule wins. Returns None when nothing
    matches.
    """
    hisp_var, hisp_rule = r_e['hisp'][0], r_e['hisp'][1]
    if row[hisp_var] in hisp_rule[1]:
        return hisp_rule[0]

    race_var, race_rules = r_e['race'][0], r_e['race'][1]
    matches = (rule[0] for rule in race_rules if row[race_var] in rule[1])
    return next(matches, None)
            
def wbhao_mapper(df, year):
    """Add a 'WBHAO' column to df using the r_e_d rule set for the given year.

    Years before 2003 use 're1', 2003-2011 use 're2', 2013 uses 're3' and
    later years use 're4'. For 2012 the rows are split on HRMONTH: months
    before 5 use 're2', months 5 and later use 're3'. The dataframe is
    modified in place and also returned.
    """
    def recode(frame, scheme):
        # Row-wise application of wbhao_map with the named rule set
        return frame.apply(lambda rec: wbhao_map(rec, r_e_d[scheme]), axis=1)

    if year < 2003:
        df['WBHAO'] = recode(df, 're1')

    if 2002 < year < 2012:
        df['WBHAO'] = recode(df, 're2')

    if year == 2012:
        # The rule set changes part-way through 2012
        early_rows = df[df['HRMONTH'] < 5].index
        df.loc[early_rows, 'WBHAO'] = recode(df.loc[early_rows, :], 're2')
        late_rows = df[df['HRMONTH'] >= 5].index
        df.loc[late_rows, 'WBHAO'] = recode(df.loc[late_rows, :], 're3')

    if year == 2013:
        df['WBHAO'] = recode(df, 're3')

    if year > 2013:
        df['WBHAO'] = recode(df, 're4')

    return df

In [6]:
# Set of functions for parsing raw data

# Use struct to read files faster 
def struct_constr(fieldspecs):
    """Build an unpacker that slices the requested fields out of a fixed-width record.

    Each fieldspec is indexed as (name, start, length), with start counted
    from 1. Bytes lying between the end of one field and the start of the next
    are skipped with 'x' pad codes; every field is read as a '<length>s' byte
    string. Returns the bound unpack_from method of the compiled struct.
    """
    pieces = []
    cursor = 0  # offset just past the previous field
    for spec in fieldspecs:
        offset, width = spec[1] - 1, spec[2]
        gap = offset - cursor
        if gap > 0:
            pieces.append(f'{gap}x')
        pieces.append(f'{width}s')
        cursor = offset + width
    return struct.Struct(''.join(pieces)).unpack_from

def fwf_to_list(file, unpacker, fieldspecs):
    """Return the selected fields of data/<file> as a list of integer tuples.

    Parameters
    ----------
    file : str
        Name of the fixed-width file inside the 'data/' directory.
    unpacker : callable
        Takes one raw line (bytes) and returns a tuple of byte-string fields,
        e.g. the result of struct_constr.
    fieldspecs : list
        Field specifications that produced `unpacker`. Accepted for interface
        compatibility; not needed to read the file.

    Returns
    -------
    list of tuple of int
        One tuple per line whose last unpacked field is greater than zero.

    Raises
    ------
    ValueError
        If a field of a retained line cannot be converted with int().
    """
    # NOTE(review): rows are kept when the LAST unpacked field is positive.
    # The original code looked up the PWSSWGT field spec here but never used
    # it, so the filter may have been meant to test PWSSWGT specifically --
    # confirm before changing, since that would alter which rows are kept.
    with open(f'data/{file}', 'rb') as raw:
        # Unpack each line once and reuse the fields for both the filter and
        # the integer conversion; the context manager closes the file.
        unpacked = map(unpacker, raw)
        return [tuple(map(int, fields))
                for fields in unpacked
                if int(fields[-1]) > 0]

# Convert list of lists to pandas df
def list_to_df(row_list, fieldspecs):
    """Turn the parsed rows into a dataframe with compact integer columns.

    Columns are named after the first element of each fieldspec. Columns that
    appear in s1, s2 or s3 are cast to np.int8, np.int16 and np.int32
    respectively; all other columns keep the dtype pandas inferred.
    """
    column_names = [spec[0] for spec in fieldspecs]
    df = pd.DataFrame(row_list, columns=column_names)

    for group, dtype in ((s1, np.int8), (s2, np.int16), (s3, np.int32)):
        present = [name for name in group if name in df]
        df[present] = df[present].astype(dtype)

    # A person identifier (GESTFIPS concatenated with HRHHID, HRHHID2 and
    # PULINENO as strings) was sketched here but is not currently built:
    #    id_vars = ['HRHHID', 'HRHHID2', 'PULINENO']
    #    df['per_id'] = df['GESTFIPS'].astype(str).str.cat(
    #        others=[df[i].astype(str) for i in id_vars if i in df.keys()])
    return df

# Manages the other functions
def monthly_to_annual(year, d, path):
    """Read monthly files and store as one annual file.

    Parameters
    ----------
    year : int
        Year to process. The monthly files are those in 'data/' whose names
        end with '<yy>pub.dat'; the first three characters of each name are
        parsed as the month.
    d : dict
        Data-dictionary settings keyed by dictionary name. Each value needs
        'start' and 'end' dates plus a 'vlist' of field specifications.
    path : str
        Output prefix; the annual file is written to f'{path}cps_{year}.ft'.

    Raises
    ------
    KeyError
        If a monthly file's date is not covered by any dictionary in `d`.
    """
    suffix = f'{str(year)[2:]}pub.dat'
    file_dates = {f: pd.to_datetime(f'{year}-{f[:3]}-01')
                  for f in os.listdir('data/')
                  if f.endswith(suffix)}

    # Parse each dictionary's date span once. The file dates are first-of-month
    # timestamps, so a plain comparison is equivalent to testing membership in
    # a daily date_range without building one per dictionary/file pair.
    spans = [(dd, pd.Timestamp(vals['start']), pd.Timestamp(vals['end']))
             for dd, vals in d.items()]

    # Map every monthly file to the first dictionary (in `d` order) that covers
    # its date. Dates and dictionary names are kept in separate mappings so one
    # is never mistaken for the other.
    file_layouts = {}
    for f, t in file_dates.items():
        for dd, start, end in spans:
            if start <= t <= end:
                file_layouts[f] = dd
                break
        else:
            raise KeyError(f'no data dictionary covers {f} ({t:%Y-%m})')

    frames = []
    for rd, dd in file_layouts.items():
        fieldspecs = d[dd]['vlist']
        rows = fwf_to_list(rd, struct_constr(fieldspecs), fieldspecs)
        frames.append(list_to_df(rows, fieldspecs))
    df = pd.concat(frames).reset_index(drop=True)

    # Additional functions to be applied here, for example cepr_wbhao
    df = wbhao_mapper(df, year)
    df.to_feather(f'{path}cps_{year}.ft')

In [7]:
# Read data, year by year and feather
# path is the output prefix: monthly_to_annual appends 'cps_<year>.ft' directly
# to it, so it needs to end with a path separator.
path = 'C:/Working/econ_data/micro/data/'
# range(1994, 2019) processes 1994 through 2018 inclusive, one feather file per year
for year in range(1994, 2019):
    monthly_to_annual(year, d, path)