# Expungement Eligibility Classification - Parallel Featurization

## Dask Transformations

Start a Dask distributed client so that computations are spread across multiple workers.

In [None]:
import os
from pathlib import Path
from urllib.parse import quote

import dask.dataframe as dd
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import (
    Column,
    DateTime,
    Integer,
    MetaData,
    String,
    Table,
    or_
)
from sqlalchemy.sql import select

In [None]:
from distributed import Client

# Start a Dask distributed client with default settings (no scheduler address
# is given, so presumably a local cluster is started). Leaving `client` as the
# last expression displays its summary in the notebook.
client = Client()
client

### Data Loading

In [None]:
USER = 'jupyter'
PASSWORD = os.environ['POSTGRES_PASS']
HOST = 'localhost'
PORT = '5432'
DB = 'expunge'

# Percent-encode the credentials: a password containing characters such as
# '@', ':', '/' or '%' would otherwise corrupt the URI. SQLAlchemy decodes
# them again when it parses the URI (the same URI is reused by %sql and Dask).
DATABASE_URI = (
    f"postgresql://{quote(USER, safe='')}:{quote(PASSWORD, safe='')}"
    f"@{HOST}:{PORT}/{DB}"
)
engine = sa.create_engine(DATABASE_URI)

Load the `sql` IPython extension. Its `%sql` / `%%sql` magics are used below to spot-check the database.

In [None]:
# Enable the SQL magics and connect them to the same database as `engine`.
%load_ext sql
%sql {DATABASE_URI}

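Dask's `read_sql_table` does not accept a raw SQL string, but it does accept a SQLAlchemy Core `select()` expression. We use one to read from the `expunge` table object, sorted by `person_id` and then `HearingDate`. This ordering matters for the per-partition features below (previous/next hearing dates, last felony conviction), which assume each person's rows are in date order. **TODO:** verify that this row order survives Dask's per-partition reads.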

In [None]:
# Table to featurize. 'expunge_clean' is the full dataset. For quick test runs
# swap in one of the smaller materialized views built in the "Tables for
# Testing" section at the end of this notebook:
#   'expunge_10k_clean' (~26K records) or 'expunge_1k_clean' (1k person_ids).
SOURCE_TABLE = 'expunge_clean'

metadata_obj = MetaData()
expunge = Table(
    SOURCE_TABLE, metadata_obj,
    Column('person_id', Integer),
    Column('HearingDate', DateTime),
    Column('CodeSection', String),
    Column('ChargeType', String),
    Column('Class', String),
    Column('DispositionCode', String),
    Column('Plea', String),
    Column('Race', String),
    Column('Sex', String),
    Column('fips', Integer),
)

Build the query and print the SQL it compiles to. **Note:** the query is currently restricted to a handful of test `person_id`s; disable that filter for a full run.

In [None]:
# person_ids to restrict the run to while testing.
# Set to None (or an empty list) to process every person -- the results are
# written to expunge_features, so a filtered run replaces the full table.
TEST_PERSON_IDS = [127051000000102, 224010000000817, 1000000000362]

query = select(expunge)
if TEST_PERSON_IDS:
    query = query.where(
        or_(*(expunge.c.person_id == person_id for person_id in TEST_PERSON_IDS))
    )
# Order by person, then hearing date: the shift-based features computed
# later depend on each person's rows being in date order.
query = query.order_by(expunge.c.person_id, expunge.c.HearingDate)
print(str(query))

In [None]:
# Column dtypes for the Dask DataFrame (person_id is the index, not a column).
meta_dict = {
    'HearingDate': 'datetime64[ns]',
    'CodeSection': str,
    'ChargeType': str,
    'Class': str,
    'DispositionCode': str,
    'Plea': str,
    'Race': str,
    'Sex': str,
    'fips': 'int64'
}

# Empty frame passed to Dask as `meta`. Its index mirrors the real partitions,
# which are read with index_col='person_id' (an int64 id). This makes the
# collection report the correct index. NOTE(review): Dask substitutes `meta`
# for partitions with no rows, so the named index also keeps the
# groupby('person_id') calls below from failing on empty partitions.
meta_frame = (
    pd.DataFrame(
        columns=list(meta_dict),
        index=pd.Index([], dtype='int64', name='person_id'),
    )
    .astype(meta_dict)
)

meta_frame.dtypes

In [None]:
%%time
df = dd.read_sql_table(
    table=query,
    index_col='person_id',
    uri=DATABASE_URI,
#     npartitions=32,
#     npartitions=8
    meta=meta_frame
)

In [None]:
# Show the Dask DataFrame's repr (structure of the lazy collection).
df

In [None]:
# Show every column in DataFrame previews. Use the fully qualified key: the
# short 'max_columns' pattern also matches 'styler.render.max_columns' on
# newer pandas and raises OptionError.
pd.set_option('display.max_columns', None)

In [None]:
# Compute and preview the first few rows.
df.head()

The number of partitions the data is split into. Each partition is a separate pandas DataFrame, so `npartitions` is the number of pandas DataFrames Dask operates on under the hood.

In [None]:
# Number of underlying pandas partitions.
df.npartitions

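These divisions are the `person_id` cutoffs between partitions. Dask computes them from the range of `person_id` values and the number of partitions. When `npartitions` is not supplied, `read_sql_table` picks the partition count to target roughly `bytes_per_chunk` (256 MiB by default) per partition.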

Because `person_id` is the index and each partition covers a contiguous range of it, all records for a given `person_id` fall within a single partition. This lets us compute per-person aggregations with `map_partitions`, without shuffling records between workers.

In [None]:
# First few partition boundaries (person_id values).
df.divisions[:5]

### Data Cleaning & Featurization

In [None]:
# Replace missing code sections with an explicit 'MISSING' sentinel.
df = df.assign(CodeSection=df['CodeSection'].fillna('MISSING'))

In [None]:
# Disposition codes kept for featurization; every other code is dropped.
VALID_DISPOSITIONS = [
    'Guilty',
    'Guilty In Absentia',
    'Dismissed',
    'Nolle Prosequi',
    'Not Guilty',
    'Not Guilty/Acquitted',
    'No Indictment Presented',
    'Not True Bill',
    'Dismissed/Other'
]

# isin() evaluates to False for missing values, so rows without a
# DispositionCode are dropped by this same filter.
is_valid_disposition = df['DispositionCode'].isin(VALID_DISPOSITIONS)
df = df[is_valid_disposition]

In [None]:
%%time
DISPOSITION_MAP = {
    'Nolle Prosequi': 'Dismissed',
    'No Indictment Presented': 'Dismissed',
    'Not True Bill': 'Dismissed',
    'Dismissed/Other': 'Dismissed',
    'Not Guilty': 'Dismissed',
    'Not Guilty/Acquitted': 'Dismissed',
    'Guilty In Absentia': 'Conviction',
    'Guilty': 'Conviction',
}

df['disposition'] = df['DispositionCode'].replace(DISPOSITION_MAP)

df.head()

In [None]:
%%time
deferral_pleas = [
    'Alford',
    'Guilty',
    'Nolo Contendere'
]

deferral_conditions = (
    (df['Plea'].isin(deferral_pleas))
    & (df['disposition']=='Dismissed')
)

df['disposition'] = df['disposition'].mask(deferral_conditions, 'Deferral Dismissal')

df[df['disposition']=='Deferral Dismissal'].head()

In [None]:
%%time
df['chargetype'] = df['ChargeType']

df.head()

In [None]:
# Code-section lookup lists used by assign_code_section below.
# '4.1-305' and '18.2-250.1' appear in both A and B: assign_code_section checks
# A first, so those sections are bucketed as A only when the disposition is
# 'Deferral Dismissal', and as B otherwise.
COVERED_SECTIONS_A = [
    '4.1-305', 
    '18.2-250.1'
]

COVERED_SECTIONS_B = [
    '4.1-305',
    '18.2-96',
    '18.2-103',
    '18.2-119',
    '18.2-120',
    '18.2-134',
    '18.2-250.1',
    '18.2-415'
]

# Sections that count as B only when the charge is a misdemeanor.
COVERED_SECTIONS_B_MISDEMEANOR = [
    '18.2-248.1'
]

# Sections bucketed as 'excluded by 19.2-392.12'.
EXCLUDED_SECTIONS_TWELVE = [
    '18.2-36.1',
    '18.2-36.2',
    '18.2-51.4',
    '18.2-51.5',
    '18.2-57.2',
    '18.2-266',
    '46.2-341.24'
]

In [None]:
def assign_code_section(row):
    """Classify a single charge row into a statutory code-section bucket.

    Rules are tried in priority order and the first match wins:

    1. section in COVERED_SECTIONS_A with a 'Deferral Dismissal' disposition
    2. section in COVERED_SECTIONS_B, or in COVERED_SECTIONS_B_MISDEMEANOR
       when the charge type is 'Misdemeanor'
    3. section in EXCLUDED_SECTIONS_TWELVE
    4. anything else

    Parameters
    ----------
    row : mapping
        Needs 'CodeSection'; 'disposition' and 'chargetype' are read only
        when the matching rule needs them.

    Returns
    -------
    str
        The bucket label.
    """
    section = row['CodeSection']

    if section in COVERED_SECTIONS_A and row['disposition'] == 'Deferral Dismissal':
        return 'covered in 19.2-392.6 - A'

    if section in COVERED_SECTIONS_B or (
        section in COVERED_SECTIONS_B_MISDEMEANOR
        and row['chargetype'] == 'Misdemeanor'
    ):
        return 'covered in 19.2-392.6 - B'

    if section in EXCLUDED_SECTIONS_TWELVE:
        return 'excluded by 19.2-392.12'

    return 'covered elsewhere'

In [None]:
%%time
df['codesection'] = df.map_partitions(
    lambda df: df.apply(assign_code_section, axis=1),
    meta=pd.Series(dtype=str)
)

df.head()

In [None]:
def has_conviction(df):
    """Flag every row whose person has at least one conviction.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition, indexed by ``person_id``, with a ``disposition``
        column. Missing dispositions count as non-convictions.

    Returns
    -------
    pandas.Series of bool
        Aligned to ``df.index`` (as the declared Dask meta expects): True for
        every row of a person who has any ``disposition == 'Conviction'``
        row in this partition.
    """
    is_conviction = df['disposition'].eq('Conviction')
    # transform() broadcasts the per-person result back onto every row while
    # keeping df's (duplicated) person_id index, so the column assigns
    # cleanly -- no Python-level apply or index.map lookup needed.
    return is_conviction.groupby(level='person_id').transform('any')

In [None]:
%%time
df['convictions'] = df.map_partitions(
    has_conviction,
    meta=pd.Series(dtype=bool)
)

df.head()

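**Question** — how should same-day hearings be handled? With the row-shift approach below, a person's second hearing on a given date gets that same date as `last_hearing_date`, i.e. `days_since_last_hearing` of 0.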

In [None]:
def shift_hearing_date(df, shift_by):
    """Return each row's neighbouring HearingDate within the same person.

    Relies on rows already being ordered by HearingDate within each
    ``person_id``.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition indexed by ``person_id`` with a ``HearingDate`` column.
    shift_by : int
        Row offset passed to ``shift``: 1 gives the previous hearing's date,
        -1 the next hearing's date. Rows with no such neighbour for the same
        person get NaT.

    Returns
    -------
    pandas.Series
        Aligned to ``df.index``.
    """
    per_person_dates = df.groupby(level='person_id')['HearingDate']
    return per_person_dates.shift(periods=shift_by)

In [None]:
%%time
df['last_hearing_date'] = df.map_partitions(
    shift_hearing_date,
    shift_by=1,
    meta=pd.Series(dtype='datetime64[ns]')
)

df.head()

In [None]:
%%time
df['next_hearing_date'] = df.map_partitions(
    shift_hearing_date,
    shift_by=-1,
    meta=pd.Series(dtype='datetime64[ns]')
)

df.head()

In [None]:
def get_felony_conviction_dates(df):
    """Return HearingDate for felony convictions and NaT for every other row.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition with ``disposition``, ``chargetype`` and
        ``HearingDate`` columns.

    Returns
    -------
    pandas.Series
        Aligned to ``df.index`` with HearingDate's dtype, matching the Series
        meta declared by the caller. (np.where previously returned a bare
        ndarray and dropped the index.)
    """
    is_felony_conviction = (
        (df['disposition'] == 'Conviction')
        & (df['chargetype'] == 'Felony')
    )
    # where() keeps values where the mask is True and fills NaT elsewhere.
    return df['HearingDate'].where(is_felony_conviction)

In [None]:
%%time
df['felony_conviction_date'] = df.map_partitions(
    get_felony_conviction_dates,
    meta=pd.Series(dtype='datetime64[ns]')
)

df.head()

In [None]:
def get_last_felony_conviction_date(df):
    """Most recent felony conviction date strictly before each row, per person.

    Looks one row back within the person (so the current row's own conviction
    is excluded), then carries the latest non-missing date forward through
    that person's later rows. Relies on rows being in date order within each
    ``person_id``.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition indexed by ``person_id`` with a
        ``felony_conviction_date`` column (NaT where not a felony conviction).

    Returns
    -------
    pandas.Series
        Aligned to ``df.index``; NaT when no earlier felony conviction exists.
    """
    prior = df['felony_conviction_date'].groupby(level='person_id').shift(1)
    carried_forward = prior.groupby(level='person_id').ffill()
    return carried_forward.fillna(pd.NaT)

In [None]:
%%time
df['last_felony_conviction_date'] = df.map_partitions(
    get_last_felony_conviction_date,
    meta=pd.Series(dtype='datetime64[ns]')
)
df = df.drop('felony_conviction_date', axis='columns')

df.head()

In [None]:
%%time
df['days_since_last_hearing'] = df['HearingDate'] - df['last_hearing_date']
df['days_until_next_hearing'] = df['next_hearing_date'] - df['HearingDate']
df['days_since_last_felony_conviction'] = df['HearingDate'] - df['last_felony_conviction_date']

df.head()

In [None]:
# Spot-check the felony-conviction features on the first 20 rows.
FELONY_CHECK_COLUMNS = [
    'disposition',
    'chargetype',
    'HearingDate',
    'last_felony_conviction_date',
    'days_since_last_felony_conviction',
]
df.head(20)[FELONY_CHECK_COLUMNS]

## To Do - Features
1. `days_until_next_conviction`
2. `class_3_or_4_last_20`
3. `class_1_or_2`

### Writing and Loading Data
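1. Write the data to CSV files in the `/tmp` directory
2. Load the data into PostgreSQL via `COPY` statements

This is *much* faster than `df.to_sql`: `COPY` bulk-loads each file in a single statement rather than through individual `INSERT` statements. Note that `COPY ... FROM '<file>'` is executed by the database server, so the CSV files must be readable by the PostgreSQL server process.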

In [None]:
target_dir = '/tmp/expunge_data'
target_glob = f'{target_dir}/expunge_features-*.csv'

# Remove CSVs left over from previous runs. Done in pure Python (pathlib)
# rather than shelling out to `rm -rf` with an interpolated path; a missing
# directory simply yields no matches.
for stale_csv in Path(target_dir).glob('expunge_features-*.csv'):
    stale_csv.unlink()

In [None]:
%%time
# Write the features to CSV files matching target_glob and keep the returned
# list of written paths for the COPY step below.
# NOTE(review): the person_id index is presumably written as the first column
# (to_csv's default), which the expunge_features column order expects -- confirm.
file_paths = df.to_csv(target_glob)

file_paths[:5]

Useful pandas functionality to approximate the SQL statement to create a table

In [None]:
from pandas.io.sql import get_schema

In [None]:
# Print pandas' approximate CREATE TABLE for a computed sample of rows,
# as a starting point for the hand-written DDL below.
print(get_schema(df.head(), 'expunge_features'))

We drop the target table, then (re)create and truncate it before loading, so re-runs don't produce duplicate rows.

In [None]:
%%sql
DROP TABLE expunge_features;

In [None]:
# Create the target table if needed and empty it, so the COPY step below
# never duplicates rows. engine.begin() commits on success; engine.execute()
# is deprecated in SQLAlchemy 1.4 and removed in 2.0.
# Column order must match the CSV column order: COPY ... CSV HEADER only
# skips the header line and maps columns by position.
with engine.begin() as conn:
    conn.execute(sa.text("""
        CREATE TABLE IF NOT EXISTS expunge_features (
            person_id BIGINT,
            "HearingDate" DATE,
            "CodeSection" TEXT,
            "ChargeType" TEXT,
            "Class" TEXT,
            "DispositionCode" TEXT,
            "Plea" TEXT,
            "Race" TEXT,
            "Sex" TEXT,
            "fips" INTEGER,
            "disposition" TEXT,
            "chargetype" TEXT,
            "codesection" TEXT,
            "convictions" BOOLEAN,
            "last_hearing_date" DATE,
            "next_hearing_date" DATE,
            "last_felony_conviction_date" DATE,
            "days_since_last_hearing" TEXT,
            "days_until_next_hearing" TEXT,
            "days_since_last_felony_conviction" TEXT
        );

        TRUNCATE TABLE expunge_features;
    """))

These `COPY` statements do all of the data loading from CSVs

In [None]:
# Load every CSV file in one transaction: engine.begin() commits once on
# success and rolls back on error, so a failure can't leave a partially
# loaded table. (engine.execute() is deprecated in SQLAlchemy 1.4 and removed
# in 2.0, and the old per-file `commit;` made the load non-atomic.)
# NOTE: COPY ... FROM '<file>' is executed by the database server. It assumes
# the DB runs on this machine (HOST is 'localhost') and that the role may read
# server files (superuser or pg_read_server_files).
with engine.begin() as conn:
    for path in file_paths:
        # Escape single quotes so the path is a valid SQL string literal.
        sql_path = str(path).replace("'", "''")
        conn.execute(sa.text(
            f"COPY expunge_features FROM '{sql_path}' WITH CSV HEADER"
        ))

Make sure the data made it to the database

In [None]:
%%sql
-- Sanity check: number of rows loaded into expunge_features.
SELECT COUNT(*)
FROM expunge_features

### Notes/Questions

- `ChargeType` and `chargetype` appear the same in `expunge` - is that because of cleaning done post-load?

### Added Columns
- `last_hearing_date`
- `last_felony_conviction_date`
- `next_hearing_date`
- `days_since_last_hearing`
- `days_since_last_felony_conviction`
- `days_until_next_hearing`

In [None]:
%%sql
-- Preview a few loaded rows.
SELECT *
FROM expunge_features
LIMIT 10

## Tables for Testing

Copy all records for 10k `person_id`s from `expunge_clean` into a materialized view for testing.

In [None]:
%%sql
CREATE MATERIALIZED VIEW expunge_10k_clean AS
WITH ids AS (
    SELECT 
        DISTINCT person_id
    FROM expunge_clean
    LIMIT 10000
)
SELECT e.*
FROM expunge_clean e
WHERE EXISTS (
    SELECT 1
    FROM ids i
    WHERE i.person_id = e.person_id
)
ORDER BY e.person_id, e."HearingDate"

In [None]:
%%sql
CREATE MATERIALIZED VIEW expunge_1k_clean AS
WITH ids AS (
    SELECT 
        DISTINCT person_id
    FROM expunge_clean
    LIMIT 1000
)
SELECT e.*
FROM expunge_clean e
WHERE EXISTS (
    SELECT 1
    FROM ids i
    WHERE i.person_id = e.person_id
)
ORDER BY e.person_id, e."HearingDate"

In [None]:
%%sql
-- Spot-check one person's records in the 1k test view.
SELECT *
FROM expunge_1k_clean
WHERE person_id = 1000000000003

In [None]:
%%sql
-- Preview the 10k test view in (person_id, HearingDate) order.
SELECT *
FROM expunge_10k_clean
ORDER BY person_id, "HearingDate"
LIMIT 10