In [1]:
import numpy as np
import pandas as pd
from sodapy import Socrata
import dask.dataframe as dd

In [2]:
# dfp = dd.read_parquet('cdc_data.parquet.gzip')
# dfh = dd.read_hdf('cdc_data_comp.h5', key='pm')
# dfp

In [3]:
def optimize_cdc_dtypes(df):
    """
    Optimize dtypes for raw data returned from the CDC SODA API call.

    Each known column present in ``df`` is cast to a compact dtype
    (see ``df_dtypes`` below); ``date`` is parsed to datetime64 using
    the ``%d%b%Y`` format (e.g. ``01Jan2014``). Columns not present in
    ``df`` are skipped. Memory usage before and after is printed in MB.

    input:
        - df: pandas DataFrame of raw records from the API. The values are
          presumably strings as decoded from the API's JSON (confirm).
    output is the same pandas DataFrame with optimized dtypes. It is
    modified in place, so no second full copy is held in RAM.
    """

    def _size_mb(frame):
        # Deep usage counts object/string payloads, not just pointers
        return np.round(frame.memory_usage(index=True, deep=True).sum() / (1024**2), 3)

    print(f'start size: {_size_mb(df)} MB')

    # Target dtype per column
    df_dtypes = {
        'year': 'int16', # can be dropped
        'date': 'datetime64[ns]', # parsed with to_datetime below, not astype
        'statefips': 'int8',
        'countyfips': 'int32',
        'ctfips': 'int64',
        'latitude': 'float32',
        'longitude': 'float32',
        'ds_pm_pred': 'float32',
        'ds_pm_stdd': 'float32',
    }

    for col, dtype in df_dtypes.items():
        if col not in df.columns:
            continue
        if col == 'date':
            # Explicit format avoids slow per-element format inference
            df[col] = pd.to_datetime(df[col], format='%d%b%Y')
        else:
            df[col] = df[col].astype(dtype)

    print(f'end size:   {_size_mb(df)} MB')

    return df

In [4]:
def cdc_data_pull(cols=None, filt=None, writeout=True):
    """
    Read PM data from the CDC SODA API into memory. data from:
    https://data.cdc.gov/Environmental-Health-Toxicology/Daily-Census-Tract-Level-PM2-5-Concentrations-2011/fcqm-xrf4

    This always queries the API. To read a previously written local
    copy instead, use ``cdc_data_gen`` with a file path.

    inputs:
        - cols: str or list of str, column names to query (SELECT clause).
          A list is joined into a comma-separated string. Defaults to
          date, statefips, countyfips, ctfips, ds_pm_pred, ds_pm_stdd.
        - filt: str, SQL-like string to pass as WHERE clause to query.
          Defaults to year 2014 for statefips 6, 48, 17
          (California, Texas, Illinois).
        - writeout: bool, whether to write the result to
          'cdc_data_comp.h5' (key 'pm', table format)
    output is dask dataframe
    """

    # Declare SQL-like args to pass to SODA API
    # `is None` rather than pd.isnull: pd.isnull on a list returns an
    # array, which cannot be used directly in an `if`.
    if cols is None:
        cols = "date, statefips, countyfips, ctfips, ds_pm_pred, ds_pm_stdd"
    elif not isinstance(cols, str):
        # SODA's select parameter takes a comma-separated string
        cols = ", ".join(cols)
    if filt is None:
        # State FIPS filter gets data for California, Texas, and Illinois
        filt = "year = '2014' AND statefips IN ('6', '48', '17')"

    # Establish SODA client via sodapy wrapper (using Socrata class)
    # Sodapy notes:
    # "Unauthenticated client only works with public data sets. Note 'None'
    # in place of application token, and no username or password:"
    client = Socrata("data.cdc.gov", None)
    try:
        # Get queried data subset from CDC SODA API
        records = client.get("fcqm-xrf4", select=cols, where=filt, limit=10000000)
    finally:
        # Close the connection even if the request fails
        client.close()

    # Convert to pandas DataFrame
    df = pd.DataFrame.from_records(records)

    # Optimize dtypes to minimize size in RAM
    df = optimize_cdc_dtypes(df)

    # Convert to dask dataframe
    ddf = dd.from_pandas(df, npartitions=60) # of ~100,000 rows each

    if writeout:
        # Write to disk
        # ddf.to_parquet('cdc_data.parquet.gzip', compression='gzip')
        ddf.to_hdf('cdc_data_comp.h5', key='pm', complib='lzo', complevel=1, format='table')

    return ddf

In [5]:
def cdc_data_gen(fpath='', cols=None, filt=None, writeout=True):
    """
    Return the CDC PM data as a dask dataframe, either read from disk or
    freshly pulled from the CDC SODA API.

    inputs:
        - fpath: str, path to a local HDF5 file holding the data under
          key 'pm' (e.g. 'cdc_data_comp.h5' as written by cdc_data_pull).
          If empty or None, the data is pulled from the API instead.
        - cols, filt, writeout: passed through to cdc_data_pull; ignored
          when reading from fpath
    output is dask dataframe
    """
    # `not fpath` covers both '' and None (len(None) would raise)
    if not fpath:
        return cdc_data_pull(cols, filt, writeout)
    # Read the file the caller asked for, not a hard-coded name
    return dd.read_hdf(fpath, key='pm')