In [None]:
import logging
import re
import os
from pathlib import Path
from collections import defaultdict, namedtuple, Counter
from datetime import datetime

import pandas as pd
import numpy as np
import s3fs

# Show up to 50 columns when displaying DataFrames in the notebook
pd.set_option('display.max_columns', 50)

# Project support modules (presumably data_utils.py / aws_utils.py alongside this notebook)
import data_utils
import aws_utils

# If you are debugging / editing the support files, you may want to load them this way instead:
# modules imported via %aimport will be reloaded automatically before you execute any cell
# %load_ext autoreload
# %autoreload 1
# %aimport data_utils
# %aimport aws_utils

## First we'll establish some basic logging

In [None]:
# create main logger
logger = logging.getLogger('main')
logger.setLevel(logging.DEBUG)

# Re-running this cell would otherwise stack another pair of handlers on the same named
# logger (duplicating every message) and keep the previous run's log file open. Detach
# and close any existing handlers before recreating them.
for old_handler in list(logger.handlers):
    logger.removeHandler(old_handler)
    old_handler.close()

# One log file per day, e.g. logs/chi-town-scrub_01-31-2020.log
s_tdy = datetime.today().strftime('%m-%d-%Y')
main_log = Path(f'logs/chi-town-scrub_{s_tdy}.log')
main_log.parent.mkdir(parents=True, exist_ok=True)
# Start each session with a fresh log file (safe now that the old file handler is closed)
if main_log.exists():
    os.remove(main_log)

# create console handler and file handler, both at DEBUG level
ch = logging.StreamHandler()
ch_main_log = logging.FileHandler(main_log)
ch.setLevel(logging.DEBUG)
ch_main_log.setLevel(logging.DEBUG)

# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s')

# add formatter to both handlers
ch.setFormatter(formatter)
ch_main_log.setFormatter(formatter)

logger.addHandler(ch)
logger.addHandler(ch_main_log)

## Set up our regex patterns for columns that need to be validated, as well as any general patterns that might be helpful

We are going to load our data as str type into a dataframe, so that no automatic type conversions take place, then 
use regex patterns to validate the expected content of various columns, rejecting or replacing where necessary

In [None]:
ScrubRE = namedtuple('ScrubRE', ['TWO_LETTERS',
                                 'BLOCK', 'IUCR', 'PRIMARY_TYPE', 'DESCRIPTION', 'LOCATION_DESCRIPTION',
                                 'LOCATION', 'ZIP_CODES'])

# Generic and column specific regex patterns for validation and transformation purposes
MY_REGX = ScrubRE(
    # Exactly two letters (applied to the first two characters of case_number)
    TWO_LETTERS=re.compile(r'^[a-z]{2}$', flags=re.I),
    # A masked address like 013XX followed by a street location like W 3RD AVE.
    # Used to validate and, via its two groups, to extract house number / street.
    # The street is ONE capture group of 1-100 alphanumerics, each optionally followed by a
    # space: a repeated capture group such as (...){1,5} would keep only its LAST repetition
    # when extracting (e.g. 'R' for 'N MARTIN LUTHER KING JR DR').
    BLOCK=re.compile(r'^(\d{1,4}X{1,4}) ((?:[a-z\d] ?){1,100})$', flags=re.I),
    # IUCR we just want to confirm it's some 4 length alphanumeric code
    IUCR=re.compile(r'^[a-z\d]{4}$', flags=re.I),
    # Up to five space-separated groups of letters and dashes
    PRIMARY_TYPE=re.compile(r'^(?:[a-z\-]{1,20}(?: |$)){1,5}$', flags=re.I),
    # Up to seven space-separated groups of letters, digits, or any of - / : , . ( ) $
    DESCRIPTION=re.compile(r'^(?:[a-z\-\/\:\,\.\(\)\d\$]{1,25}(?: |$)){1,7}$', flags=re.I),
    # Up to seven space-separated groups of letters, or any of - / . , ( )
    LOCATION_DESCRIPTION=re.compile(r'^(?:[a-z\-\/\.\,\(\)]{1,20}(?: |$)){1,7}$', flags=re.I),
    # '(lat, lon)' as two signed decimals; the two groups are extracted separately
    LOCATION=re.compile(r'^\((-?\d+\.\d+), ?(-?\d+\.\d+)\)$'),
    # Zip codes should be exactly 4 or 5 digits. The alternation is grouped so both anchors
    # apply to each branch (ungrouped, it meant "starts with 5 digits OR ends with 4").
    ZIP_CODES=re.compile(r'^(?:\d{5}|\d{4})$'),
)

## The following two functions define how we want to process our dataset

### First, we focus on columns that can't be null, or that require the entire row to be excluded if they fail validation
### Second, we focus on everything else

This is where we will pre-process, validate, post-process and / or generate any derived columns

<font color=blue>__The following two functions rely heavily on data_utils.process_field to do the heavy lifting, so you should review what we can do with that function__</font>

In [None]:
def process_hard_rejects(df):
    '''
    Validate the columns that must be present and well-formed, and drop every row that fails

    Any column that can't be null, or for which a validation error means the entire row must be
    excluded, is handled here via data_utils.process_field. The date field also generates 'year'
    and 'month' columns from the parsed date values.

    Parameters:
    df (DataFrame): our raw data (left untouched; a copy is processed)

    Returns:
    df_filt (DataFrame): a DataFrame where any hard rejects have been excluded entirely
    hard_rejects (dict of column name, indices): column names and row indices of anything that can't be null or failed some validation and
                                                 had to be excluded entirely because of it
    '''
    df_filt = df.copy()

    logger.info(f'Length of records before hard rejects {len(df_filt):,}')

    hard_rejects = defaultdict(set)

    # Column specific logic
    # id -> Not null, must be digits
    id_val = lambda series: series.str.isdigit()
    # case_number -> Not null, first two must be alpha
    case_number_val = lambda series: series.str.slice(0,2).str.match(MY_REGX.TWO_LETTERS)

    def as_int_str(values):
        # dt.year / dt.month come back as float64 whenever a NaT is present, in which case a plain
        # astype(str) would turn every value into e.g. '2019.0'; format the valid values as
        # integers and leave missing dates as NaN
        return values.fillna(0).astype(int).astype(str).where(values.notna())

    # date -> Not null, try to parse known formats, also want to extract year, month
    def date_yr_mo(df, dates):
        df['year'] = as_int_str(dates.dt.year)
        df['month'] = as_int_str(dates.dt.month)

    nonnull_fields = {}
    nonnull_fields['id'] = {'validation': id_val}
    nonnull_fields['case_number'] = {'validation': case_number_val}
    nonnull_fields['date'] = {'date_field': True, 'other_nulls': ['0000-00-00'], 'generated_cols': [date_yr_mo]}

    for field, params in nonnull_fields.items():
        logger.debug(f'Processing {field}')
        data_utils.process_field(df_filt, field, hard_rejects, **params)

    # Drop every row rejected by any column in a single pass. The explicit copy gives later
    # in-place column assignments (e.g. process_soft_rejects) an independent frame rather than
    # a boolean-indexed slice, which pandas flags with SettingWithCopyWarning.
    rejected_ids = set().union(*hard_rejects.values())
    if rejected_ids:
        df_filt = df_filt.loc[~df_filt.index.isin(rejected_ids)].copy()

    logger.info(f'Length of records after hard rejects {len(df_filt):,}')

    return df_filt, hard_rejects

In [None]:
def process_soft_rejects(df_filt):
    '''
    Validate the nullable columns, nulling out (rather than dropping) any value that fails

    Each column is handed to data_utils.process_field in turn. Afterwards the original column names
    hold the cleaned data, and '[col_name]_orig' columns hold the original value wherever it failed
    validation (see data_utils.process_field for the details). block also generates house_num /
    street_addr, and location generates latitude / longitude and is requested to be dropped.

    Parameters:
    df_filt (DataFrame): our data after processing hard rejects (modified in place)

    Returns:
    soft_rejects (dict of column name, indices): column names and row indices of anything that failed some validation and
                                                 had to be set to null because of it
    '''
    soft_rejects = defaultdict(set)

    # Cosmetic post-processing for the free-text columns
    title_case = lambda series: series.str.title()

    # Regex validation combined with a 50 character maximum
    description_ok = lambda series: series.str.match(MY_REGX.DESCRIPTION, na=False) & series.str.len().le(50)
    location_description_ok = lambda series: (series.str.match(MY_REGX.LOCATION_DESCRIPTION, na=False)
                                              & series.str.len().le(50))

    # beat, district, ward and community_area all look to be integer fields
    all_digits = lambda series: series.str.isdigit()

    # arrest / domestic look to be all true / false, so confirm it with a valid values constraint
    true_false = ['true', 'false']

    def block_num_addr(df, blocks):
        # Split the hidden house number (e.g. 013XX) from the street location so crime can be analyzed by street
        df[['house_num', 'street_addr']] = blocks.str.extract(MY_REGX.BLOCK, expand=True)

    def location_lat_lon(df, locations):
        # Pull latitude / longitude into their own columns; the location field itself isn't needed afterwards
        df[['latitude','longitude']] = locations.str.extract(MY_REGX.LOCATION, expand=True)

    def zip_to_five(zips):
        # 4 length zips get a leading '0', 5 length zips are kept, anything else becomes NaN
        lengths = zips.str.len()
        padded = np.select([lengths.eq(4), lengths.eq(5)], ['0' + zips, zips], default=np.nan)
        return pd.Series(index=zips.index, data=padded)

    nullable_fields = {
        'block': {'validation': MY_REGX.BLOCK, 'generated_cols': [block_num_addr]},
        'iucr': {'validation': MY_REGX.IUCR},
        'primary_type': {'validation': MY_REGX.PRIMARY_TYPE, 'post_process': [(None, title_case)]},
        'description': {'validation': description_ok, 'post_process': [(None, title_case)]},
        'location_description': {'validation': location_description_ok, 'post_process': [(None, title_case)]},
        'arrest': {'valid_values': true_false},
        'domestic': {'valid_values': true_false},
        'beat': {'validation': all_digits},
        'district': {'validation': all_digits},
        'ward': {'validation': all_digits},
        'community_area': {'validation': all_digits},
        'location': {'validation': MY_REGX.LOCATION, 'generated_cols': [location_lat_lon], 'drop_field': True},
        'zip_codes': {'validation': MY_REGX.ZIP_CODES, 'post_process': [(None, lambda series: zip_to_five(series))]},
    }

    for field_name, field_params in nullable_fields.items():
        logger.debug(f'Processing {field_name}')
        data_utils.process_field(df_filt, field_name, soft_rejects, **field_params)

    return soft_rejects

## Now, let's test our scrubbing process on a single file

First, we'll get a list of the files we want to process via Lambda

<font color=red>Make sure to change the name of the S3_BUCKET to your bucket</font>

In [None]:
s3 = s3fs.S3FileSystem()
# Raw string: '\d' and '\.' in a plain string literal are invalid escape sequences
# (DeprecationWarning, and SyntaxWarning on Python 3.12+). The compiled pattern is the same;
# it captures the digits just before '.csv'.
FILE_PATTERN = re.compile(r'.*?(\d+)\.csv')
S3_BUCKET = 'chi-town-scrub-data'

# Element [1] of each entry is used as the S3 key when loading a file below
s3_files = aws_utils.get_s3_files_to_process(s3, FILE_PATTERN, S3_BUCKET, '')
s3_files

## Pick one of the files above to process

In [None]:
%%time

s3_bucket_key = s3_files[0][1]
print(f'Loading {s3_bucket_key}')
# Everything is read as str so no automatic type conversion happens before validation
with s3.open(s3_bucket_key, 'r') as file:
    df = pd.read_csv(file, low_memory=False, encoding='utf-8', dtype=str)

# I like column names lower case and with spaces replaced with underscores
data_utils.clean_col_names(df)

keep_cols = ['id', 'case_number', 'date', 'block', 'iucr', 'primary_type', 'description', 'location_description',
             'arrest', 'domestic', 'beat', 'district', 'ward', 'community_area', 'location', 'zip_codes']

# Filter out the data to the columns we're interested in. The explicit copy matters because the
# whitespace cleanup below appears to modify df in place (its return value is unused); on a plain
# column selection pandas would raise SettingWithCopyWarning.
df = df[keep_cols].copy()

# This will trim data as well as replace multiple white spaces inside text with a single white space
data_utils.remove_excess_white_space(df)

### We're not focusing much on exploring the data in this example, but it's useful to take a quick peek at the columns to get an idea of their contents and to help visualize what the regex is doing

In [None]:
df.head(5)  # first five rows of the trimmed raw data

In [None]:
print(f'Number of crimes: {len(df):,}')
# eq() gives one boolean per row, so count the True values (len() of that mask would just be
# the total number of rows again)
print(f'Number of crimes resulting in arrest: {int(df.arrest.eq("true").sum()):,}')

print('\nMost common crimes (primary type):\n')
crime_cnt = Counter(df.primary_type)

for primary_type, cnt in crime_cnt.most_common(5):
    print(f'{primary_type} => {cnt:,}')

# Combined key; a row missing either part produces NaN here
crime_cnt_spec = Counter(df['primary_type'] + ' => ' + df['description'])
print('\nMost common crimes (primary type => description):\n')
for specific_type, cnt in crime_cnt_spec.most_common(10):
    print(f'{specific_type} => {cnt:,}')

crime_loc = Counter(df.location_description)
print('\nMost common crime location:\n')
for location, cnt in crime_loc.most_common(10):
    print(f'{location} => {cnt:,}')

## Now let's clean the data

This is essentially what we want the Lambda function to do for us

1. Process hard rejects (and filter our DataFrame)
2. Process soft rejects
3. Analyze and log the rejects (so we know where to focus improvements in our scrubbing regex)
4. Upload the reject data to s3, as well as our clean data

In [None]:
%%time

df_filt, hard_rejects = process_hard_rejects(df)
soft_rejects = process_soft_rejects(df_filt)
unq_hard_rejects, unq_soft_rejects = data_utils.analyze_hard_and_soft_rejects(hard_rejects, soft_rejects)

# Every output is named after the source file, i.e. the last component of its S3 key
file_name = s3_bucket_key.rsplit('/', 1)[-1]
hard_rej_df, soft_rej_df = data_utils.generate_reject_dfs(df, df_filt, file_name,
                                                          hard_rejects, unq_hard_rejects,
                                                          soft_rejects, unq_soft_rejects)

# The clean output leaves out the '*_orig' columns that hold the original value of soft rejects
clean_cols = [col for col in df_filt.columns if '_orig' not in col]

with s3.open(f'{S3_BUCKET}/rejects/hard_rejects_{file_name}', 'w') as hard_up, \
        s3.open(f'{S3_BUCKET}/rejects/soft_rejects_{file_name}', 'w') as soft_up, \
        s3.open(f'{S3_BUCKET}/clean_data/clean_{file_name}', 'w') as clean_up:
    # Reject files keep their index, written as 'file_index' (presumably the row's position in the source file)
    hard_rej_df.to_csv(hard_up, index_label='file_index', encoding='utf-8')
    soft_rej_df.to_csv(soft_up, index_label='file_index', encoding='utf-8')
    df_filt[clean_cols].to_csv(clean_up, index=False, encoding='utf-8')

<font color=blue>You can go check your S3 bucket now to see the data is there</font>

## Let's take a peek at our hard rejects and soft rejects

You will probably be looking at these, tweaking your process, and re-running the above code over and over until you're happy
with the results; then set up your Lambda function to do the heavy lifting

Notice that we store the source file_name with the rejects, and that we uploaded them to S3. After Lambda does its work, we can easily list all the reject
files under the rejects/ prefix in our S3 bucket, load them, concatenate the results into a single DataFrame, and analyze everything
in one place

We are also storing the column(s) that caused the reject

In [None]:
hard_rej_df.head(5)  # first five hard rejects, including the column(s) that caused them

### For example, our hard reject validation states that all "valid" case numbers start with two letters, so let's check our rejects

In [None]:
# Rows rejected (at least partly) because of case_number, and the distinct two-character prefixes they had
case_number_rejected = hard_rej_df['cols'].str.contains('case_number')
hard_rej_df.loc[case_number_rejected, 'case_number'].str[:2].unique()

In [None]:
# And for the "clean" data
df_filt['case_number'].str.slice(0,2).unique()

## Soft rejects, as they are not totally excluded, include the original value, so you can see what is being set to null

In [None]:
soft_rej_df.head(5)  # first five soft rejects, with their original values alongside

### Similar to the function we used above, analyze_hard_and_soft_rejects, we can look at them this way

In [None]:
# Number of soft-rejected (original) values recorded per column
orig_cols = list(filter(lambda col: '_orig' in col, soft_rej_df.columns))
soft_rej_df[orig_cols].notna().sum()

### We can see what's being set to null and then tweak our regex

Here, it looks like our description validation is probably too strict, so there's room for improvement

In [None]:
# Display the compiled DESCRIPTION validation pattern so it can be compared with the rejects below
MY_REGX.DESCRIPTION

In [None]:
# Rows whose description failed validation, shown next to the original value.
# sample() raises ValueError when asked for more rows than exist, so cap the sample size.
description_rejects = soft_rej_df.loc[soft_rej_df.description_orig.notnull(), ['description', 'description_orig']]
description_rejects.sample(min(20, len(description_rejects)))

### It's also useful to sample the "clean" DataFrame for the columns you need to clean up, check whether anything weird is still in there, and tweak your regex accordingly

In [None]:
# Spot-check the clean columns; cap the sample size so a small file doesn't make sample() raise ValueError
df_filt[clean_cols].sample(min(10, len(df_filt)))

## Now let's turn this into a Lambda!