In [None]:
###############################################################################

!pip install -q jupyternotify
!pip install -q --upgrade pandas
!pip install -q tabulate

from IPython.core.interactiveshell import InteractiveShell
import jupyternotify
import numpy as np
import pandas as pd
import warnings

# Display the value of every top-level expression in a cell, not only the last.
InteractiveShell.ast_node_interactivity = "all"
# Register the jupyternotify magics (later cells start with `%%notify`).
get_ipython().register_magics(jupyternotify.JupyterNotifyMagics)
# NOTE: this filter silences every warning for the session, not only pandas'.
warnings.filterwarnings("ignore") # For extraneous pd warnings

In [None]:
%%time
%%notify

## Download raw data (15min)
# url = (
#     'https://data.iowa.gov/api/views/m3tr-qhgy/rows.csv?accessType=DOWNLOAD')
# %time !noglob wget -c {url}
# !mv rows.csv?accessType=DOWNLOAD raw.csv

## Read in raw data (3min 30s)
%time df = pd.read_csv('raw.csv', dtype='object')

## Rename columns (1ms)
columns = (
    'Invoice Date Store_num Store Address City Zip Location County_num County '
    'Subcategory_num Subcategory Vendor_num Vendor Item_num Item Bottle_pack '
    'Bottle_mL Bottle_cost Bottle_retail Bottle_count Dollars Liters Gallons '
).split()
%time df.columns = columns

## Drop rows missing vendor, item, or sale amount (1min)
subset = 'Vendor_num Item_num Bottle_cost Bottle_retail Dollars'.split()
%time df = df.dropna(subset=subset)

## Fix anomilies that will prevent int-casting (4s)
df.loc[df.Zip=='712-2', 'Zip'] = '51529'
df.loc[df.Item_num=='x904631', 'Item_num'] = '904631'

## Format `Invoice` column (2min)
%time df.Invoice = df.Invoice.str.replace('INV-','1')
%time df.Invoice = df.Invoice.str.replace('S|-','')
%time df[['Invoice']].describe().T # Check injectivity

## Format location data (11min 30s)
%time df.Location = df.Location.str.replace('\(|\)|POINT ', '')
%time df[['Longitude','Latitude']] = df.Location.str.split(expand=True)
%time df = df.drop(columns=['Location'])

## Remove case distinctions (9min 30s)
for c in df.columns:
    %time df[c] = df[c].str.title()

## Engineer `Category` column, which groups `Subcategory` (6s)
def category_dict_update(category, string=None, pool=None, mapping=None):
    """Assign `category` to every unassigned subcategory that contains `string`.

    Parameters
    ----------
    category : str
        Category label; it is stored title-cased.
    string : str, optional
        Substring searched for (case-insensitively) in each subcategory name.
        Defaults to `category` when omitted. An empty string matches every
        remaining subcategory, so it can serve as a catch-all.
    pool : pd.Series, optional
        Subcategory names that have not been assigned yet. Matched entries are
        dropped from it in place. Defaults to the notebook-level
        `subcategories`.
    mapping : dict, optional
        Subcategory -> category dict, updated in place. Defaults to the
        notebook-level `category_dict`.

    Returns
    -------
    None
    """
    # Only a *missing* search string falls back to the category name; a
    # truthiness test would also throw away an explicit '' catch-all.
    if string is None:
        string = category
    if pool is None:
        pool = subcategories
    if mapping is None:
        mapping = category_dict
    label = category.title()
    needle = string.upper()
    # Collect the matches first so `pool` is not mutated while being iterated.
    matched = [name for name in pool if needle in name.upper()]
    for name in matched:
        mapping[name] = label
    # Remove the claimed names so a later, broader pattern cannot re-assign them.
    pool.drop(index=pool[pool.isin(matched)].index, inplace=True)
# Distinct, non-null subcategory names that still need a category.
subcategories = pd.Series(df['Subcategory'].unique()).dropna()
category_dict = {}
# Each entry is "<Category> [<search string>]"; with one word the category
# name doubles as the search string. Entries are applied in order, and every
# match is removed from `subcategories`, so earlier entries claim names first.
pairs = [
    'Liqueur', 'Liqueur Schnapps', 'Liqueur Creme', 'Liqueur Amaretto',
    'Liqueur Anisette', 'Liqueur Triple', 'Whiskey Whisk', 'Whiskey Rye',
    'Whiskey Bourbon', 'Whiskey Scotch', 'Whiskey Iowa', 'Vodka', 'Rum', 'Gin',
    'Tequila', 'Tequila Mezcal', 'Brandy', 'Brandy Brandies', 'Cocktail',
    'Neutral', 'Neutral Alcohol'] # <========== THIS LINE NEW
for args in map(str.split, pairs):
    category_dict_update(*args)
# category_dict_update('Other', '') # <========== DOESNT DO ANYTHING
# Subcategories absent from the dict are left without a `Category` value.
df['Category'] = df.Subcategory.map(category_dict)

## Save to temp (8min 30s - 12min)
## NOTE: Passing thru csv "repairs" the data structure
##       after pd operations above and will speed up later steps.
## The next stage re-reads `temp.csv` from disk, so `index=False` keeps the
## row index out of the file.
%time df.to_csv('temp.csv', index=False)

## Clear namespace
## `%reset -f` removes all user-defined names without prompting, so the
## imports and configuration have to be repeated before the next stage.
%reset -f

In [None]:
###############################################################################

from IPython.core.interactiveshell import InteractiveShell
import jupyternotify
import numpy as np
import pandas as pd
import warnings

# Display the value of every top-level expression in a cell, not only the last.
InteractiveShell.ast_node_interactivity = "all"
# Register the jupyternotify magics (later cells start with `%%notify`).
get_ipython().register_magics(jupyternotify.JupyterNotifyMagics)
# NOTE: this filter silences every warning for the session.
warnings.filterwarnings("ignore")

def mysummary(df):
    """Return a one-row-per-column overview of `df`.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to summarize.

    Returns
    -------
    pd.DataFrame
        Indexed by the column names of `df`, with the columns
        'Type' (dtype), 'Number of Nulls' (count of missing values) and
        'First Row of Data' (first row rendered as strings; blank strings
        when `df` has no rows).
    """
    # An empty frame has no first row; show blanks instead of raising
    # IndexError from `iloc[0]`.
    if len(df):
        first_row = df.iloc[0].astype('str')
    else:
        first_row = pd.Series('', index=df.columns, dtype='object')
    return pd.DataFrame({
        'Type': df.dtypes,
        'Number of Nulls': df.isna().sum(),
        'First Row of Data': first_row})

In [None]:
%%time
%%notify

## Load from temp (3min)
%time df = pd.read_csv('temp.csv', dtype='object')

## Format dates (30s)
%time df.Date = pd.to_datetime(df.Date)

## Cast float columns (4min)
for c in 'Bottle_cost Bottle_retail Dollars Liters Gallons Longitude Latitude'.split():
    %time df[c] = pd.to_numeric(df[c], errors='coerce', downcast='float')

## Cast int columns (2min)
## NOTE: Should work even without `round(df[c])`
for c in 'Bottle_pack Bottle_mL Bottle_count'.split():
    %time df[c] = pd.to_numeric(df[c], errors='coerce', downcast='integer')
%time mysummary(df) # (20s)

## Convert dollars to cents (30s)
for c in 'Bottle_cost Bottle_retail Dollars'.split():
    %time df[c] = df[c]*100
%time df = df.rename(columns={'Dollars': 'Cents'})

## Convert Liters to mL (30s)
%time df.Liters = df.Liters*1000
%time df = df.rename(columns={'Liters': 'mL'})

## Cast newly-integer columns (4s 30s)
for c in 'Bottle_cost Bottle_retail Cents mL'.split():
    df[c] = pd.to_numeric(round(df[c]), errors='coerce', downcast='integer')
%time mysummary(df) # (30s)

## Describe data (10s)
## The float format is switched before each table so that the integer summary
## shows no decimals and the float32 summary shows two; the option is restored
## to its default afterwards.
pd.set_option('float_format', '{:.0f}'.format)
%time df.describe(include='integer').T.fillna('')
pd.set_option('float_format', '{:.2f}'.format)
%time df.describe(include='float32').T.fillna('')
pd.reset_option('float_format')

In [None]:
%%time
%%notify

## Add feature `Anomalous`
df['Anomalous'] = False

## Fix stores with incorrect Long-Lat (2s)
## THEN flag stores not in Iowa ( )
for store_num, cols, vals in [
        (4722, 'Longitude Latitude Zip', [-91.113432,40.807126,52601]),
        (5876, 'Longitude Latitude', [-94.445151,42.959583])]:
    %time df.loc[df['Store_num']==store_num, cols.split()] = vals
long_outliers = (df.Longitude<-97)|(df.Longitude>-90)
lat_outliers = (df.Latitude<40)|(df.Latitude>44)
%time df.loc[long_outliers|lat_outliers,'Anomalous'] = True

## Flag mL/Gallons != about 4000
%time df.loc[round(df.mL/df.Gallons,-3)!=4000,'Anomalous'] = True

## Flag mL != Bottle_count * Bottle_mL
%time volume_anomalies = round(df.mL - df.Bottle_mL*df.Bottle_count)!=0
%time df.loc[volume_anomalies,'Anomalous'] = True

## Flag retail markup != about 1.5
markup_inliers = round(df.Bottle_retail/df.Bottle_cost).isin([1.0,2.0])
%time df.loc[~markup_inliers,'Anomalous'] = True

## Check Bottle_count * Bottle_retail = Cents
price_anomalies = round(df.Bottle_count*df.Bottle_retail-df.Cents,-2)!=0
%time df.loc[price_anomalies,'Anomalous'] = True

## Per unmapped subcategory: number of sales and of distinct items, plus totals
check_cat1 = df[df.Category.isna()].pivot_table(
    index='Subcategory',
    values='Item',
    aggfunc=['count', 'nunique'],
    margins=True)
check_cat1.columns = ['Number of Sales', 'Number of Unique Items']
check_cat1

## Up to 3 distinct items from each subcategory that has no category
check_cat2 = pd.concat(
    df.loc[df.Subcategory == name, ['Item', 'Subcategory']].drop_duplicates().head(3)
    for name in df.loc[df.Category.isna(), 'Subcategory'].unique()
).reset_index()
check_cat2

## Single purchases over 100k (10mil cents), largest first -> No obvious anomalies
df.loc[df.Cents > 10_000_000].sort_values('Cents', ascending=False).T

In [None]:
%%time
%%notify

## Serialize full dataset (2min)
%time df.to_parquet('data.parquet')

## Survey the final dataset data.parquet (3min)
%time mysummary(df)
%time df.describe(include='object', datetime_is_numeric=True).T.fillna('')
%time df.describe(include='datetime64').T.fillna('')

pd.set_option('float_format', '{:.0f}'.format)
%time df.describe(include='integer').T.fillna('')

pd.set_option('float_format', '{:.2f}'.format)
%time df.describe(include='float32').T.fillna('')
pd.reset_option('float_format')

## Downsample to 5% (40s)
np.random.seed(12345)
SAMPLE_RATE = 0.05
%time df['Random'] = np.random.random(size=len(df))
%time downsample = df[df.Random < SAMPLE_RATE]
%time downsample.to_parquet('downsample.parquet')

In [None]:
## Summarize data files
## Capture the output lines of `du -h` for the three data files.
filesizes = !! du -h raw.csv data.parquet downsample.parquet
## Split every line at the tab into [size, filename].
filesizes = [n.split('\t') for n in filesizes]
## Print one line per file: filename right-aligned to 20 characters, a fixed
## label and the size. The labels are matched by position, which presumably
## relies on `du` reporting the files in the order given -- TODO confirm.
print(f'''
{filesizes[0][1]:>20}   (original)     =   {filesizes[0][0]}
{filesizes[1][1]:>20}   (cleaned)      =   {filesizes[1][0]}
{filesizes[2][1]:>20}   (sampled 5%)   =   {filesizes[2][0]}
''')

                 raw.csv   (original)     =   4.5G
            data.parquet   (cleaned)      =   848M
      downsample.parquet   (sampled 5%)   =    47M

In [None]:
### TO DO: describe etc .to_markdown()
###        then to README

### TO DO: try passing thru csv ? no we will lose all the numeric data...
###        try dealing with the anomalies before to_numeric?