In [23]:
import numpy as np 
import pandas as pd 
import os 
import time 

## Import Test Data

In [117]:
# Load the full feature table, then keep only the ten judges with the most appeals
# to build a smaller test set for comparing methods.
DATAFOLDER = "~/Documents/data-science-coursework/nyu-ml/project/"
fp = os.path.join(DATAFOLDER, 'data_for_model/data_with_trends_all_features_2018-05-08_v2.csv')
appeals_with_trends = pd.read_csv(fp)

test_judges = appeals_with_trends['ij_code'].value_counts().index[:10].tolist()
appeals_with_trends = appeals_with_trends.loc[appeals_with_trends['ij_code'].isin(test_judges)].copy()

# Parse the filing and decision dates so they can be compared chronologically.
for date_col in ['datAppealFiled_dt', 'datBIADecision_dt']:
    appeals_with_trends[date_col] = pd.to_datetime(appeals_with_trends[date_col])

## Break Into Data and Ref Tables

In [87]:
# Reference table: every decision, newest first within each judge, so `.head(n)` on a
# filtered slice gives the n most recent decisions. sort_values returns a new frame,
# so no extra .copy() is needed.
ref = appeals_with_trends[['ij_code', 'datBIADecision_dt', 'granted']].sort_values(
    by=['ij_code', 'datBIADecision_dt'], ascending=[True, False])
# Query table: one row per appeal, with the filing date the features are computed as of.
data = appeals_with_trends[['idnAppeal', 'ij_code', 'datAppealFiled_dt']].copy()
# DataFrame.info() prints its own report and returns None; wrapping it in print()
# only adds a stray "None" line to the output.
ref.info()
data.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 15308 entries, 223820 to 8653
Data columns (total 3 columns):
ij_code              15308 non-null object
datBIADecision_dt    15308 non-null datetime64[ns]
granted              15308 non-null int64
dtypes: datetime64[ns](1), int64(1), object(1)
memory usage: 478.4+ KB
None
<class 'pandas.core.frame.DataFrame'>
Int64Index: 15308 entries, 52 to 247234
Data columns (total 3 columns):
idnAppeal            15308 non-null int64
ij_code              15308 non-null object
datAppealFiled_dt    15308 non-null datetime64[ns]
dtypes: datetime64[ns](1), int64(1), object(1)
memory usage: 478.4+ KB
None


## Test Various Methods

In [88]:
# use multiprocessing to generate average grant rate of last n appeals with the same feature 

from multiprocessing import Pool
from functools import partial 

def apply_func_to_row(row, feature, last_n, ref_table=None):
    """ Returns the average 'granted' over the `last_n` most recent reference decisions that
        share this row's `feature` value and were decided before the row's filing date.

        Meant to be used as df.apply(func, axis=1); since that is slow on its own, it is also
        run in parallel via get_recent_decisions.

        row : Series with `feature` and 'datAppealFiled_dt' entries.
        feature : column to match on; it must also exist in the reference table.
        last_n : number of most recent prior decisions to average.
        ref_table : reference DataFrame with `feature`, 'datBIADecision_dt' and 'granted'
            columns, assumed sorted newest-first within each `feature` value so that
            .head(last_n) picks the most recent ones. Defaults to the notebook-level `ref`.

        Returns the mean as a float, or NaN when there is no earlier decision.
    """
    if ref_table is None:
        ref_table = ref
    # Match on ref_table[feature] (previously hard-coded to 'ij_code', which silently
    # compared the wrong column whenever feature != 'ij_code').
    is_prior = ((ref_table[feature] == row[feature]) &
                (ref_table['datBIADecision_dt'] < row['datAppealFiled_dt']))
    return ref_table.loc[is_prior, 'granted'].head(last_n).mean()

def apply_func_to_data(data, row_func, col_name='judge_last_10_decisions'):
    """ Applies `row_func` to every row of (a subset of) `data` and returns a new DataFrame
        with columns ('idnAppeal', col_name), indexed like `data`.

        `data` itself is left unmodified. Writing the new column into the input, as before,
        mutated the caller's chunk and may trigger SettingWithCopyWarning on sliced chunks.
    """
    if len(data) == 0:
        # DataFrame.apply on an empty frame returns a DataFrame rather than a Series,
        # which cannot be stored as a single column; produce an empty float column instead.
        values = pd.Series([], index=data.index, dtype=float)
    else:
        values = data.apply(row_func, axis=1)
    out = data[['idnAppeal']].copy()
    out[col_name] = values
    return out

def parallelize_dataframe(df, func, n_splits=8, n_processes=4):
    """ Splits `df` into `n_splits` row blocks, applies `func` to each block in a pool of
        `n_processes` worker processes, and concatenates the results in block order.

        func : called once per block; must return a DataFrame or Series, since the results
            are joined with pd.concat. When a process pool is used it must be picklable,
            e.g. a module-level function or a functools.partial of one.
        n_splits : number of row blocks (default 8, as before).
        n_processes : pool size (default 4, as before). A value of 1 or less runs the blocks
            serially in this process, which is useful for debugging.
    """
    # Split by row position and slice with iloc: calling np.array_split directly on a
    # DataFrame relies on DataFrame.swapaxes, which recent pandas deprecates.
    positions = np.array_split(np.arange(len(df)), n_splits)
    blocks = [df.iloc[pos] for pos in positions]
    if n_processes <= 1:
        results = [func(block) for block in blocks]
    else:
        pool = Pool(n_processes)
        try:
            results = pool.map(func, blocks)
        finally:
            # Release the workers even if func raises in one of them.
            pool.close()
            pool.join()
    return pd.concat(results)

def get_recent_decisions(df, feature, last_n):
    """ Computes, in parallel over `df`, the average grant rate of the `last_n` most recent
        earlier appeals sharing each row's `feature`; returns the
        ('idnAppeal', 'judge_last_10_decisions') frame built by apply_func_to_data. """
    per_row = partial(apply_func_to_row, feature=feature, last_n=last_n)
    per_chunk = partial(apply_func_to_data, row_func=per_row)
    return parallelize_dataframe(df, per_chunk)

#### Regular Apply

In [89]:
# with regular apply: one pandas filter per row, in a single process
judge_row_func = partial(apply_func_to_row, feature='ij_code', last_n=10)
start_time = time.time()
judge_last_10_decisions = data.apply(judge_row_func, axis=1)
elapsed = time.time() - start_time
print("Computed judge_last_10_decisions in {} seconds".format(elapsed))

Computed judge_last_10_decisions in 44.3526830673 seconds


#### Multiprocessing 

In [90]:
# with multiprocessing: the same per-row function, fanned out over a worker pool
start_time = time.time()
judge_last_10_decisions = get_recent_decisions(df=data, feature='ij_code', last_n=10)
elapsed = time.time() - start_time
print("Computed judge_last_10_decisions in {} seconds".format(elapsed))

Computed judge_last_10_decisions in 23.5769910812 seconds


#### Chunk Self-Join

In [91]:
# chunk self-join 
def chunk_compute(data_chunk, ref_chunk, last_n=10, on='ij_code'):
    """ Average 'granted' over the `last_n` most recent earlier decisions for each appeal.

        Self-joins `data_chunk` (appeals: 'idnAppeal', 'datAppealFiled_dt', `on`) with
        `ref_chunk` (decisions: 'datBIADecision_dt', 'granted', `on`) on the `on` column,
        keeps only decisions made before each appeal was filed, and averages 'granted' over
        the `last_n` most recent of them. Prints the elapsed time.

        Returns a Series indexed by 'idnAppeal' (sorted ascending), with NaN for appeals
        that have no earlier decision, matching what the per-row apply approach gives.
    """
    start_time = time.time()
    merged = data_chunk.merge(ref_chunk, how='left', on=on)
    prior = merged[merged['datBIADecision_dt'] < merged['datAppealFiled_dt']]
    # Order newest-first within each appeal explicitly instead of relying on ref_chunk
    # having been pre-sorted by the caller.
    prior = prior.sort_values(by=['idnAppeal', 'datBIADecision_dt'], ascending=[True, False])
    # Vectorized "first last_n rows of each group, then mean" instead of a Python lambda
    # per group.
    results = prior.groupby('idnAppeal').head(last_n).groupby('idnAppeal')['granted'].mean()
    # The date filter drops appeals with no earlier decision (including unmatched left-join
    # rows, whose NaT dates compare False); add them back as NaN.
    all_appeals = pd.Index(data_chunk['idnAppeal'].unique(), name='idnAppeal').sort_values()
    results = results.reindex(all_appeals)
    results.name = None
    print("Completed in {} seconds".format(time.time() - start_time))
    return results

In [95]:
# One chunk: self-join all test judges' appeals against the full reference table at once.
results = chunk_compute(data_chunk=data, ref_chunk=ref)

Completed in 13.7086651325 seconds


## Proceed with Chunk Compute 

In [94]:
def break_into_chunks(data, dimension, max_chunk):
    """ Groups the values of `dimension` into chunks for the chunked self-join.

        A value with r rows contributes r**2 rows to the self-join. Values are taken
        largest first and packed into consecutive chunks, starting a new chunk whenever
        adding the next value would push the chunk's total above `max_chunk`. Every chunk
        therefore stays within `max_chunk`, except when a single value exceeds it on its
        own; that value gets a chunk to itself.

        The previous floor(cumsum / max_chunk) bucketing could put values straddling a
        boundary into an over-full chunk.

        Returns a dict mapping chunk number (0, 1, 2, ...) to the list of `dimension`
        values in that chunk (empty dict for empty `data`).
    """
    sizes = data.groupby(dimension).size().sort_values(ascending=False)
    chunk_assignments = {}
    chunk_id, chunk_load = 0, 0
    for value, n_rows in sizes.items():
        cost = int(n_rows) ** 2
        if chunk_load and chunk_load + cost > max_chunk:
            chunk_id += 1
            chunk_load = 0
        chunk_assignments.setdefault(chunk_id, []).append(value)
        chunk_load += cost
    return chunk_assignments

In [124]:
def compute_recent_decisions(data, dimension, max_chunk=50000000):
    """ Average grant rate of the most recent earlier decisions sharing each appeal's
        `dimension` value, computed chunk by chunk to bound the self-join size.

        data : DataFrame with 'idnAppeal', 'datAppealFiled_dt', 'datBIADecision_dt',
            'granted' and `dimension` columns. It serves as both the query table and the
            reference table.
        dimension : column to self-join on (e.g. 'ij_code').
        max_chunk : cap on the self-join size per chunk, passed to break_into_chunks.

        Returns the concatenated chunk_compute results, or an empty float Series when
        `data` has no rows.
    """
    data_variables = ['idnAppeal', 'datAppealFiled_dt', dimension]
    ref_variables = ['datBIADecision_dt', 'granted', dimension]
    # chunk_compute joins on a column named 'ij_code'; expose `dimension` under that name
    # so dimensions other than 'ij_code' work too (a no-op when dimension == 'ij_code').
    join_rename = {dimension: 'ij_code'}

    results = []
    # .values() works on both Python 2 and 3; dict.iteritems() is Python 2 only.
    for selected in break_into_chunks(data, dimension, max_chunk).values():
        in_chunk = data[data[dimension].isin(selected)]
        data_chunk = in_chunk[data_variables].rename(columns=join_rename)
        ref_chunk = in_chunk[ref_variables].rename(columns=join_rename).sort_values(
            by=['ij_code', 'datBIADecision_dt'], ascending=[True, False])
        results.append(chunk_compute(data_chunk, ref_chunk))

    if not results:
        # pd.concat raises on an empty list.
        return pd.Series([], dtype=float)
    return pd.concat(results)

In [125]:
# Chunked self-join over all test judges, capping each chunk's join at 50 million rows.
results = compute_recent_decisions(appeals_with_trends, dimension='ij_code', max_chunk=5 * 10 ** 7)

Completed in 15.0160229206 seconds
Completed in 7.08671593666 seconds


#### Check: Recompute One Appeal (idnAppeal 86) by Hand

In [130]:
# Spot-check one appeal: this computed value is compared against a manual lookup of the
# same judge's earlier decisions in the cells below.
results.loc[86]

0.22222222222222221

In [38]:
# The appeal being checked: its judge and filing date.
data.loc[data['idnAppeal'].eq(86)]

Unnamed: 0,idnAppeal,ij_code,datAppealFiled_dt
40173,86,RJF,1994-04-08


In [39]:
# Recompute appeal 86's feature by hand from the reference table. The judge and filing
# date are taken from the appeal itself rather than copied from the previous output.
appeal_86 = data.loc[data['idnAppeal'] == 86].iloc[0]
prior_decisions_86 = ref[(ref['ij_code'] == appeal_86['ij_code']) &
                         (ref['datBIADecision_dt'] < appeal_86['datAppealFiled_dt'])]
# `ref` is sorted newest-first per judge, so head(10) is the 10 most recent prior decisions.
assert np.isclose(prior_decisions_86['granted'].head(10).mean(), results.loc[86])
prior_decisions_86

Unnamed: 0,ij_code,datBIADecision_dt,granted
57329,RJF,1994-03-28,0
70861,RJF,1994-03-23,1
70766,RJF,1994-02-25,0
70730,RJF,1994-02-15,0
70688,RJF,1994-02-14,1
14770,RJF,1994-02-10,0
70760,RJF,1993-11-05,0
70695,RJF,1993-11-04,0
70868,RJF,1993-10-28,0
