# InfoGroup data

> Specify schema and convert raw CSV to a partitioned parquet dataset.

- Start with original data files in CSV format.
- Upgrade old datapackage.json schema.
- Rename columns and standardize to ALL_CAPS.
- In 2009: pad string fields with zeroes.
- Validate values format, replace errors with missing values.
- Save to disk in parquet format.
  - Excluding POPULATION_CODE and variables that are not present in all years.
- Provide interface to load data as DataFrame, with options to select columns and subsamples by year and state.

## Possible future work

- add indicator variables for different samples (random, WI, FAI, ...) to be used as parquet partitions to allow quick read of data subsets
- put meaningful labels on categoricals ("1-5" instead of "A" etc.).
  - This will be tricky for POPULATION_CODE that changes coding between 2015 and 2016
- make up and add enum constraints for TITLE_CODE and CALL_STATUS_CODE
- if categoricals are worthwhile for fields with a large number of unique values, possibly unknown a priori, such as city or NAICS, then they should be applied. Care should be taken because the set of unique values can vary between years, and this might create problems when merging.
- a few variables have many values like "00000"; those should possibly be replaced with np.nan
  - subsidiary_number, parent_number, site_number, census_tract, csa_code, maybe others

In [None]:
from nbdev import *
%nbdev_default_export infogroup

In [None]:
%nbdev_export
import sys
import logging
import json
import gzip
import shutil
import multiprocessing as mp
import functools

import numpy as np
import pandas as pd
import fastparquet
from IPython import display
from joblib import Memory

from rurec import resources
from rurec.resources import Resource
from rurec import util

# joblib cache object rooted at the project's cache path
memory = Memory(resources.paths.cache)
# years of InfoGroup data handled by this module: 1997 through 2017 inclusive
data_years = range(1997, 2018)

In [None]:
%nbdev_export
# for batch runs: log to a file
# filemode='w' truncates the log file on every configuration call;
# force=True removes any handlers already attached to the root logger first.
logging.basicConfig(filename=resources.paths.root / 'logs/processing.log', 
                    filemode='w', level=logging.INFO, format='%(asctime)s %(levelname)s:\n%(message)s', force=True)

In [None]:
# for interactive use: log to stdout
# force=True replaces whatever root logger configuration was set before this cell ran.
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s %(levelname)s:\n%(message)s', force=True)

In [None]:
# Register the processed outputs (schema file and full parquet dataset) ...
resources.add(Resource('infogroup/schema', '/InfoGroup/data/processed/schema.json', 'Processed InfoGroup data, schema', False))
resources.add(Resource('infogroup/full', '/InfoGroup/data/processed/full.pq', 'Processed InfoGroup data, all years, partitioned parquet', False))
# ... and one resource per year for the original CSV files.
for y in data_years:
    resources.add(Resource(f'infogroup/orig/{y}', f'/InfoGroup/data/original/raw/{y}_Business_Academic_QCQ.csv', f'Original unprocessed InfoGroup data, {y}', False))

In [None]:
# override resource paths for testing and debugging
# NOTE: after this cell runs, everything that uses the 'infogroup/full' resource
# (building and reading the dataset) points at tmp/full.pq under the project root.
resources.get('infogroup/full').path = resources.paths.root / 'tmp/full.pq'

In [None]:
# faster returns for testing and debugging
# NOTE: this monkeypatches pandas globally for the session, so every later
# pd.read_csv call reads at most 10,000 rows unless `nrows` is passed explicitly.
# Re-running the cell wraps the already patched function in another partial.
pd.read_csv = functools.partial(pd.read_csv, nrows=10_000)

In [None]:
%nbdev_export

def convert_schema(datapackage_schema_path):
    """Convert old datapackage schema.json into a new file, to be used for data validation.

    Reads the legacy schema from `datapackage_schema_path` and writes the
    converted schema as JSON to the path of the 'infogroup/schema' resource.

    Every field dictionary in the output has:
    - `name`: original field name in upper case;
    - `years`: sorted list of years in which the field is available;
    - `enum`: allowed values without the empty string
      (only if the source field has an `enum` constraint);
    - `enum_labels`: copy of the source `values` mapping (only if present);
    - `width`: fixed string width (only for fields listed in `field_widths`);
    - `original_name`, `original_description`: copied from the source schema.
    """
    # Context manager guarantees the input file is closed after parsing.
    with open(datapackage_schema_path) as f_in:
        sch0 = json.load(f_in)

    # Fixed widths of code-like string fields, keyed by upper-case field name.
    # Defined once here instead of being rebuilt for every field in the loop.
    field_widths = {
        'ZIP': 5,
        'ZIP4': 4,
        'COUNTY_CODE': 3,
        'AREA_CODE': 3,
        'PHONE': 7,
        'SIC': 6,
        'SIC0': 6,
        'SIC1': 6,
        'SIC2': 6,
        'SIC3': 6,
        'SIC4': 6,
        'NAICS': 8,
        'YP_CODE': 5,
        'ABI': 9,
        'SUBSIDIARY_NUMBER': 9,
        'PARENT_NUMBER': 9,
        'SITE_NUMBER': 9,
        'CENSUS_TRACT': 6,
        'CENSUS_BLOCK': 1,
        'CBSA_CODE': 5,
        'CSA_CODE': 3,
        'FIPS_CODE': 5,
    }

    def get_field_years(field_name):
        """Return sorted list of unique years in which `field_name` is available."""
        years = []
        for fl in sch0['field_lists']:
            if field_name in fl['fields']:
                years += fl['years']
        # Fields available in 2015 are also marked as available in 2016 and 2017
        # (presumably the old schema predates those years - TODO confirm).
        if 2015 in years:
            years += [2016, 2017]
        # Special case: `gender` is additionally marked as available in 2017.
        if field_name == 'gender':
            years += [2017]
        # set() drops duplicates that overlapping field lists or the rules above could produce.
        return sorted(set(years))

    sch = dict()
    sch['info'] = 'Schema for cleaned InfoGroup data.'
    sch['fields'] = list()

    for f0 in sch0['fields']:
        f = dict()
        name = f0['name']
        f['name'] = name.upper()
        f['years'] = get_field_years(name)
        if 'enum' in f0['constraints']:
            # Empty string is not kept as a valid category.
            f['enum'] = [x for x in f0['constraints']['enum'] if x != '']
        if 'values' in f0:
            values = f0['values'].copy()
            if name == 'cbsa_level':
                del values['0']  # code never used
            f['enum_labels'] = values

        if f['name'] in field_widths:
            f['width'] = field_widths[f['name']]

        f['original_name'] = f0['originalName']
        f['original_description'] = f0['originalDescription']
        sch['fields'].append(f)

    # Context manager guarantees the output is flushed and closed.
    with open(resources.get('infogroup/schema').path, 'w') as f_out:
        json.dump(sch, f_out, indent=1)
    

def get_schema(field_name=None, year=None):
    """Flexible function to get dataset schema.

    With no parameters, return full list of field dictionaries.
    With `year`, restrict to fields present in that year.
    With `field_name`, return dictionary for specific field (`year` is ignored in this case).

    Raises IndexError if `field_name` is given and no field has that name.
    The schema file is read from the 'infogroup/schema' resource on every call.
    """
    # Context manager closes the schema file as soon as it is parsed.
    with resources.get('infogroup/schema').path.open() as f:
        sch = json.load(f)['fields']
    if field_name is not None:
        return [x for x in sch if x['name'] == field_name][0]
    if year is not None:
        return [x for x in sch if year in x['years']]
    return sch

    
def pad_with_zeroes(df, schema_fields):
    """Left-pad string columns with zeroes, in place, to their schema width.

    Only fields that have a `width` entry in `schema_fields` are touched.
    """
    fixed_width = [(spec['name'], spec['width']) for spec in schema_fields if 'width' in spec]
    for column, width in fixed_width:
        padded = df[column].str.zfill(width)
        df[column] = padded

def validate_raw_strings(df, schema_fields):
    """Validate values in raw InfoGroup data according to string constraints.

    Constraints are assembled from a hard-coded table (restricted to columns
    present in `df`) plus, for every field in `schema_fields`, a `cats`
    constraint taken from its `enum` and an `nchar` constraint taken from
    its `width`. The assembled mapping is passed to `util.validate_values`.

    Return list of dicts of invalid values.
    """

    constraints = {
        'ZIP': {'number': True},
        'ZIP4': {'number': True},
        'COUNTY_CODE': {'number': True},
        'AREA_CODE': {'number': True},
        'PHONE': {'number': True},
        'SIC': {'number': True},
        'SIC0': {'number': True},
        'SIC1': {'number': True},
        'SIC2': {'number': True},
        'SIC3': {'number': True},
        'SIC4': {'number': True},
        'NAICS': {'number': True},
        'YEAR': {'notna': True, 'number': True},
        'YP_CODE': {'number': True},
        'EMPLOYEES': {'number': True},
        'SALES': {'number': True},
        'PARENT_EMPLOYEES': {'number': True},
        'PARENT_SALES': {'number': True},
        'YEAR_EST': {'number': True},
        'ABI': {'unique': True, 'notna': True, 'number': True},
        'SUBSIDIARY_NUMBER': {'number': True},
        'PARENT_NUMBER': {'number': True},
        'SITE_NUMBER': {'number': True},
        'CENSUS_TRACT': {'number': True},
        'CENSUS_BLOCK': {'number': True},
        'LATITUDE': {'number': True},
        'LONGITUDE': {'number': True},
        'CBSA_CODE': {'number': True},
        'CSA_CODE': {'number': True},
        'FIPS_CODE': {'number': True},
    }

    # the above hard coded list of constraints must be consistent with field availability in given year
    constraints = {k: v for k, v in constraints.items() if k in df}

    for field in schema_fields:
        name = field['name']
        # setdefault() is used in both branches so that a field without a
        # hard-coded entry gets one created instead of raising KeyError.
        if 'enum' in field:
            constraints.setdefault(name, dict())['cats'] = field['enum']
        if 'width' in field:
            constraints.setdefault(name, dict())['nchar'] = field['width']

    return util.validate_values(df, constraints)


def convert_dtypes(df, schema_fields):
    """Convert string columns of `df` to numeric or categorical types, in place.

    A fixed set of columns is parsed as numbers; every schema field that has
    an `enum` becomes a categorical restricted to the enum values.
    """
    numeric_columns = (
        'YEAR',
        'EMPLOYEES',
        'SALES',
        'PARENT_EMPLOYEES',
        'PARENT_SALES',
        'YEAR_EST',
        'LATITUDE',
        'LONGITUDE',
    )
    for column in numeric_columns:
        as_number = pd.to_numeric(df[column])
        df[column] = as_number

    enum_specs = [spec for spec in schema_fields if 'enum' in spec]
    for spec in enum_specs:
        column = spec['name']
        df[column] = pd.Categorical(df[column], categories=spec['enum'])
    

def validate_raw_numbers(df, year):
    """Check numeric columns of raw InfoGroup data against range constraints.

    `year` is the expected value of YEAR and the upper bound of YEAR_EST.
    Return list of dicts of invalid values.
    """
    non_negative = ('EMPLOYEES', 'SALES', 'PARENT_EMPLOYEES', 'PARENT_SALES')

    # Insertion order matches: YEAR, non-negative counts, YEAR_EST, coordinates.
    rules = {'YEAR': {'eq': year}}
    rules.update({column: {'ge': 0} for column in non_negative})
    rules['YEAR_EST'] = {'ge': 1000, 'le': year}
    rules['LATITUDE'] = {'ge': 0, 'le': 90}
    rules['LONGITUDE'] = {'ge': -180, 'le': 0}

    return util.validate_values(df, rules)

def replace_invalid(df, invalid_list, replacement=np.nan):
    """Overwrite each invalid cell of `df` with `replacement` and log the change.

    Every item of `invalid_list` is a dict with keys `idx` (row label),
    `col` (column name) and `val` (the offending value, used for logging only).
    """
    for entry in invalid_list:
        row, column = entry['idx'], entry['col']
        df.loc[row, column] = replacement
        message = f'Replace invalid value `{entry["val"]}` with `{replacement}` at .loc[{row}, \'{column}\']'
        logging.info(message)

In [None]:
# Regenerate the processed schema file from the legacy datapackage schema.
# Later cells read the 'infogroup/schema' resource, so this has to run first
# when the schema file does not exist yet.
convert_schema('/InfoGroup/data/original/raw/datapackage_schema.json')

In [None]:
%nbdev_export
# Columns that do not appear in all years, as well as POPULATION_CODE that changes categories,
# are not included in the full parquet dataset.
# A field counts as shared if it is available in 1997, the first data year
# (presumably fields are only added over time, never dropped - TODO confirm).
# YEAR is left out as well: load_validate_convert() writes each year under a `YEAR=<year>` directory.
# get_schema() is reused so the schema file is opened and closed in one place.
shared_cols = [x['name']
               for x in get_schema(year=1997)
               if x['name'] not in ['YEAR', 'POPULATION_CODE']]

def load_validate_convert(year):
    """Run full processing pipeline on one year of data:
    load CSV, validate values, convert dtypes and save as parquet partition.

    Steps:
    1. read the original CSV of `year` with all columns as strings;
    2. rename columns from original names to schema names;
    3. for 2009 only, pad fixed-width string fields with zeroes;
    4. validate string values, then convert dtypes, then validate numeric values;
       invalid values are replaced with missing values;
    5. keep only `shared_cols` and write a hive-style parquet dataset partitioned
       by STATE under `YEAR=<year>` inside the 'infogroup/full' resource path.

    Return path of the written partition as a string, or None if processing
    was aborted because 100 or more invalid values were found at either
    validation step (details are logged as errors).
    """
    logging.info(f'Processing started for year {year}\n' + '-'*80)
    # get_schema() loads a fresh copy from disk, so the in-place edit below
    # does not leak into other calls.
    sch = get_schema(year=year)

    # POPULATION_CODE has different values in 2016 and 2017
    if year >= 2016:
        for f in sch:
            if f['name'] == 'POPULATION_CODE':
                f['enum'] = list('0123456789')
                break

    # Everything is read as string; type conversion happens after string validation.
    df = pd.read_csv(resources.get(f'infogroup/orig/{year}').path, dtype='str', encoding='latin-1')

    df.rename(columns={x['original_name']: x['name'] for x in sch}, inplace=True)

    if year == 2009:
        pad_with_zeroes(df, sch)

    invalid_str = validate_raw_strings(df, sch)
    if not _replace_invalid_or_abort(df, invalid_str, 'invalid_str'):
        return

    convert_dtypes(df, sch)

    invalid_num = validate_raw_numbers(df, year)
    if not _replace_invalid_or_abort(df, invalid_num, 'invalid_num'):
        return

    df = df[shared_cols]
    partition_path = str(resources.get('infogroup/full').path / f'YEAR={year}')
    fastparquet.write(partition_path, df, file_scheme='hive', write_index=False, partition_on=['STATE'])

    logging.info(f'Processing finished for year {year}\n' + '-'*80)
    return partition_path


def _replace_invalid_or_abort(df, invalid, label, max_invalid=100):
    """Replace invalid values in `df` if there are fewer than `max_invalid` of them.

    Return True after replacing. Otherwise log the count and the first five
    items under `label` as errors and return False, leaving `df` untouched.
    """
    if len(invalid) < max_invalid:
        replace_invalid(df, invalid)
        return True
    logging.error(f'Very many {label} values: {len(invalid)}, processing aborted')
    logging.error(invalid[:5])
    return False


def build_parquet_dataset(n_cpus=1):
    """Create full parquet dataset from yearly CSV files.

    Any existing dataset directory at the 'infogroup/full' resource path is
    removed first. Every year in `data_years` is processed with
    `load_validate_convert`, in a process pool when `n_cpus` > 1, and the
    resulting yearly partitions are merged into one dataset.

    Years for which processing was aborted (`load_validate_convert` returned
    None) are reported in the log and left out of the merge.
    """

    logging.info('build_parquet_dataset() started.')

    p = resources.get('infogroup/full').path
    # Remove dataset files if they exist from before
    if p.exists():
        shutil.rmtree(p)
    # parents=True: do not fail when the parent directory does not exist yet
    p.mkdir(parents=True)

    years = list(data_years)
    if n_cpus > 1:
        with mp.Pool(n_cpus) as pool:
            results = pool.map(load_validate_convert, years)
    else:
        results = [load_validate_convert(y) for y in years]

    # Aborted years come back as None and must not be passed to merge().
    failed_years = [y for y, path in zip(years, results) if path is None]
    if failed_years:
        logging.error(f'Processing was aborted for years: {failed_years}')
    partition_paths = [path for path in results if path is not None]
    if not partition_paths:
        logging.error('No yearly partitions were created, nothing to merge.')
        return

    fastparquet.writer.merge(partition_paths)
    logging.info('build_parquet_dataset() finished.')

In [None]:
%nbdev_export
def get_df(years=None, cols=None, states=None):
    """Return InfoGroup data from the full parquet dataset as a DataFrame.

    `years`: collection of years to load; None loads all years.
    `states`: collection of STATE values to load; None loads all states.
    `cols`: list of columns to load; None loads all columns.

    YEAR is converted to int when it is part of the result.
    """
    filters = []
    if years is not None:
        filters.append(('YEAR', 'in', years))
    if states is not None:
        filters.append(('STATE', 'in', states))
    res = resources.get('infogroup/full')
    df = pd.read_parquet(res.path, engine='fastparquet', columns=cols, filters=filters)
    # YEAR comes from the partition directory names, not from the data files;
    # it can be absent when `cols` does not request it, so convert only if present.
    if 'YEAR' in df:
        df['YEAR'] = df['YEAR'].astype(int)
    return df