This notebook works in tandem with `save_topic_metadata.ipynb`.

The purpose of this notebook is to import the primary SIPP 2018 dataset limited to the columns within a topic area, subset that data and collapse it to the person level, and finally save the smaller dataframe to the data directory.

The second part of this notebook reads all of the CSVs created above, merges them, and saves the final output to CSV. In short, the raw original dataframe becomes a dataframe with fewer rows (collapsed to the person level) and fewer features.

The purpose of this process is to create a manageable file to use for EDA and feature selection.

### Create Person Level CSV's for Each Topic

In [None]:
import time
import numpy as np
import pandas as pd
import dask.dataframe as dd
import os

# Load schema for import ----------
# Translate the schema's type labels into pandas dtype names. 'Int64' and
# 'Float64' are the nullable pandas dtypes.
type_to_dtype = {'integer': 'Int64', 'string': 'object', 'float': 'Float64'}

rd_schema = pd.read_json('../data/raw/sipp_2018/pu2018_schema.json')

# Fail here, with the offending labels named, instead of handing a placeholder
# dtype to read_csv further down.
unmapped_types = set(rd_schema['dtype']) - set(type_to_dtype)
if unmapped_types:
    raise ValueError(
        f'Schema contains type(s) with no dtype mapping: {sorted(map(str, unmapped_types))}'
    )
rd_schema['dtype'] = rd_schema['dtype'].map(type_to_dtype)

# Define core features ----------
# These columns are read for every topic, in addition to the topic's own columns.
core_features = ['SSUID', 'PNUM', 'MONTHCODE',
                 'RIN_UNIV', 'TAGE', 'EOWN_ST',
                ]

# Get list of metadata files from the interim directory ----------
path = '../data/interim/'
prefix = 'feature_import_meta_'
# Keep only files whose name starts with the prefix (a substring test would also
# pick up e.g. lock or backup files); sort so the processing order is reproducible.
file_list = sorted(x for x in os.listdir(path) if x.startswith(prefix))

In [None]:
# The column -> dtype mapping is the same for every topic file, so build it once.
column_dtypes = dict(zip(rd_schema.name, rd_schema.dtype))

# For each topic metadata file: read that topic's columns from the raw file,
# keep one row per person, and save the result next to the metadata files.
for file in file_list:

    start = time.perf_counter()

    ## Column names come from the 'variable' column of the metadata file
    topic_features = pd.read_csv(path + file, usecols=['variable'])['variable'].to_list()

    ## Combine core cols with list from file
    all_features = core_features + topic_features

    ## Remove duplicate features; dict.fromkeys keeps the first occurrence of
    ## each name and preserves order
    use_cols = list(dict.fromkeys(all_features))

    ## Read csv w/ dask (lazy; nothing is loaded until .compute() below)
    dask_topic_df = dd.read_csv("../data/raw/sipp_2018/pu2018.csv",
                                usecols=use_cols,
                                dtype=column_dtypes,
                                sep='|',
                                header=0,
                                names=rd_schema['name'],
                               )

    ## Subset masks: MONTHCODE == 12, TAGE >= 18, RIN_UNIV > 0
    mask_month = dask_topic_df.MONTHCODE == 12
    mask_age = dask_topic_df.TAGE >= 18
    mask_univ = dask_topic_df.RIN_UNIV > 0
    masks = mask_month & mask_age & mask_univ

    ## Output name: same as the metadata file, with the prefix swapped
    filename = file.replace('feature_import_meta', 'sipp2018_person')

    ## Filter lazily, then compute to a pandas DataFrame
    dask_topic_person_df = dask_topic_df[masks].compute()

    ## One row per (SSUID, PNUM); the filter columns are no longer needed
    dask_topic_person_df = (dask_topic_person_df
                            .drop_duplicates(['SSUID', 'PNUM'])
                            .drop(columns=['MONTHCODE', 'RIN_UNIV'])
                           )

    dask_topic_person_df.to_csv(path + filename, index=False)

    end = time.perf_counter()

    print(f'{filename} iteration time: {end - start} seconds.')
    print(f'{filename} saved with {dask_topic_person_df.shape[0]} rows and {dask_topic_person_df.shape[1]} columns')

### Merge Collapsed Person Level CSV's

In [None]:
# Directory holding the collapsed per-topic person-level CSVs.
# NOTE(review): the per-topic loop earlier in this notebook writes its output to
# '../data/interim/' (not to this sub-directory) — confirm the files are moved
# here, or align the two paths, before running this cell.
path = '../data/interim/sipp2018_person'

# The per-topic loop names its files 'sipp2018_person_<topic>...'; the original
# 'sipp_person_' pattern is kept for backward compatibility. Neither pattern
# matches the merged output 'sipp2018_person.csv' written below, so re-running
# this cell does not read its own output back in.
topic_patterns = ('sipp2018_person_', 'sipp_person_')

# read csv's (sorted so the merge order, and therefore column order, is reproducible)
file_list = sorted(os.listdir(path))
df_list = [pd.read_csv(f'{path}/{file}')
           for file in file_list
           if any(pattern in file for pattern in topic_patterns)]

# Stop here rather than overwrite the merged file with an empty frame.
if not df_list:
    raise FileNotFoundError(
        f'No per-topic person files matching {topic_patterns} found in {path!r}'
    )

# merge dataframes
# The first frame is the base and keeps every column. Each later frame first
# drops the non-key columns the merged frame already has (EOWN_ST and TAGE are
# in every per-topic file), so the merge never creates _x/_y duplicates. Errors
# are allowed to surface: swallowing them would silently discard earlier merges.
merge_keys = ['SSUID', 'PNUM']
sipp_2018 = df_list[0]
for df in df_list[1:]:
    already_present = [col for col in df.columns
                       if col in sipp_2018.columns and col not in merge_keys]
    sipp_2018 = pd.merge(sipp_2018, df.drop(columns=already_present), on=merge_keys)

# save merged df to csv
sipp_2018.to_csv(f'{path}/sipp2018_person.csv', index=False)

sipp_2018.shape

In [None]:
# Weights columns added after the fact. This was run independently on July 13
import time
import numpy as np
import pandas as pd
import dask.dataframe as dd
import os

# Load schema for import ----------
# Translate the schema's type labels into pandas dtype names. 'Int64' and
# 'Float64' are the nullable pandas dtypes.
type_to_dtype = {'integer': 'Int64', 'string': 'object', 'float': 'Float64'}

rd_schema = pd.read_json('../data/raw/sipp_2018/pu2018_schema.json')

# Fail here, with the offending labels named, instead of handing a placeholder
# dtype to read_csv.
unmapped_types = set(rd_schema['dtype']) - set(type_to_dtype)
if unmapped_types:
    raise ValueError(
        f'Schema contains type(s) with no dtype mapping: {sorted(map(str, unmapped_types))}'
    )
rd_schema['dtype'] = rd_schema['dtype'].map(type_to_dtype)

# Read only the person keys, the filter columns and the weight column (WPFINWGT)
# from the raw file. header=0 together with names= replaces the file's own
# header row with the schema's column names.
weights = pd.read_csv("../data/raw/sipp_2018/pu2018.csv",
                      names=rd_schema['name'],
                      dtype=dict(zip(rd_schema.name, rd_schema.dtype)),
                      sep='|',
                      header=0,
                      usecols=['SSUID', 'PNUM', 'MONTHCODE',
                               'RIN_UNIV', 'TAGE', 'EOWN_ST', 'WPFINWGT',
                              ],
                     )


In [None]:
# Collapse the monthly weight rows to one row per person.
# Sorting MONTHCODE in descending order within each (SSUID, PNUM) puts the
# highest month first, so drop_duplicates keeps that row. The RIN_UNIV / TAGE
# filter is applied after the collapse, i.e. to the retained row only.
weights_person = (
    weights
    .sort_values(['SSUID', 'PNUM', 'MONTHCODE'], ascending=[True, True, False])
    .drop_duplicates(subset=['SSUID', 'PNUM'], keep='first')
    .query('RIN_UNIV > 0 and TAGE >= 18')
    .drop(columns=['MONTHCODE', 'RIN_UNIV'])
)
weights_person

In [None]:
# Keep only the keys and the weight column. errors='ignore' makes the cell safe
# to re-run: on a second run the columns are already gone and a plain drop
# would raise KeyError.
weights_person = weights_person.drop(columns=['EOWN_ST', 'TAGE'], errors='ignore')


In [21]:
# Cast SSUID to an integer so it can be used as a merge key against the frame
# read back from CSV. Use an explicit 64-bit type: plain `int` maps to a 32-bit
# integer on some platforms (Windows with NumPy < 2), which large IDs overflow.
# NOTE(review): assumes SSUID in the reloaded person CSV is inferred as int64 — confirm.
weights_person['SSUID'] = weights_person['SSUID'].astype('int64')

In [None]:
# Reload the merged person-level table written by the merge step.
sipp_person = pd.read_csv(
    filepath_or_buffer='../data/interim/sipp2018_person/sipp2018_person.csv',
)

In [22]:
# Attach the person weights (inner join on the person keys).
# The last cell of this notebook writes the result back over the CSV that
# sipp_person is read from, so on a re-run sipp_person may already contain the
# weight column(s). Drop them first so the merge replaces them instead of
# producing _x/_y duplicates.
weight_keys = ['SSUID', 'PNUM']
stale_columns = [col for col in weights_person.columns
                 if col in sipp_person.columns and col not in weight_keys]
sipp_person = pd.merge(sipp_person.drop(columns=stale_columns), weights_person, on=weight_keys)

In [25]:
# Sanity check: display the columns whose name contains 'WPFIN'.
sipp_person.filter(like='WPFIN', axis='columns')

Unnamed: 0,WPFINWGT
0,5989.598574
1,3904.848315
2,4082.881038
3,3994.372476
4,3994.372476
...,...
49699,3690.255482
49700,3340.674724
49701,4522.314071
49702,3347.512844


In [27]:
# Write the weighted person-level table back over the merged CSV (no index column).
sipp_person.to_csv(
    path_or_buf='../data/interim/sipp2018_person/sipp2018_person.csv',
    index=False,
)