In [26]:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import subprocess

# Bind the Dask scheduler to the first address reported by
# `hostname --all-ip-addresses` (space-separated list; Linux `hostname` only),
# so the scheduler/dashboard URLs use that address instead of localhost.
cmd = "hostname --all-ip-addresses"
# check=True raises CalledProcessError if the command fails, instead of
# silently continuing with whatever (possibly empty) output it produced.
result = subprocess.run(cmd.split(), stdout=subprocess.PIPE, check=True)
ip_addresses = result.stdout.decode().split()
if not ip_addresses:
    raise RuntimeError("`{}` returned no addresses".format(cmd))
IPADDR = ip_addresses[0]

cluster = LocalCUDACluster(ip=IPADDR)
client = Client(cluster)
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


0,1
Client  Scheduler: tcp://192.168.100.33:46787  Dashboard: http://192.168.100.33:45413/status,Cluster  Workers: 1  Cores: 1  Memory: 8.31 GB


In [27]:
import cudf as cf
from collections import OrderedDict
from glob import glob
import numpy as np

## 1.0 Path to Dataset

Initialize the dataset, which consists of three files:
* Acquisition
* Performance
* Names

In [31]:
# Considering 2000Q1 data initially (will change later)

# Location of the mortgage dataset and the quarter to load. Edit these
# instead of the individual paths below to switch machine or quarter.
DATA_DIR = '/home/darren/HDD1000/mortgage2000_2016'
QUARTER = '2000Q1'
# Only the first PERF_NROWS rows of the performance file are read, due to
# memory constraints.
PERF_NROWS = 100000

# Paths for Acquisition, Performance and Names data
acqPath = '{}/acq/Acquisition_{}.txt'.format(DATA_DIR, QUARTER)
perfPath = '{}/perf/Performance_{}.txt_0'.format(DATA_DIR, QUARTER)
namesPath = '{}/names.csv'.format(DATA_DIR)

# Ordered column name -> dtype mappings. Column names are supplied to
# read_csv via `names=`, so the order here must match the column order of
# each '|'-separated file.
acqDtypes = OrderedDict([
    ("loan_id", "int64"),
    ("orig_channel", "category"),
    ("seller_name", "category"),
    ("orig_interest_rate", "float64"),
    ("orig_upb", "int64"),
    ("orig_loan_term", "int64"),
    ("orig_date", "date"),
    ("first_pay_date", "date"),
    ("orig_ltv", "float64"),
    ("orig_cltv", "float64"),
    ("num_borrowers", "float64"),
    ("dti", "float64"),
    ("borrower_credit_score", "float64"),
    ("first_home_buyer", "category"),
    ("loan_purpose", "category"),
    ("property_type", "category"),
    ("num_units", "int64"),
    ("occupancy_status", "category"),
    ("property_state", "category"),
    ("zip", "int64"),
    ("mortgage_insurance_percent", "float64"),
    ("product_type", "category"),
    ("coborrow_credit_score", "float64"),
    ("mortgage_insurance_type", "float64"),
    ("relocation_mortgage_indicator", "category")
])

perfDtypes = OrderedDict([
    ("loan_id", "int64"),
    ("monthly_reporting_period", "date"),
    ("servicer", "category"),
    ("interest_rate", "float64"),
    ("current_actual_upb", "float64"),
    ("loan_age", "float64"),
    ("remaining_months_to_legal_maturity", "float64"),
    ("adj_remaining_months_to_maturity", "float64"),
    ("maturity_date", "date"),
    ("msa", "float64"),
    ("current_loan_delinquency_status", "int32"),
    ("mod_flag", "category"),
    ("zero_balance_code", "category"),
    ("zero_balance_effective_date", "date"),
    ("last_paid_installment_date", "date"),
    ("foreclosed_after", "date"),
    ("disposition_date", "date"),
    ("foreclosure_costs", "float64"),
    ("prop_preservation_and_repair_costs", "float64"),
    ("asset_recovery_costs", "float64"),
    ("misc_holding_expenses", "float64"),
    ("holding_taxes", "float64"),
    ("net_sale_proceeds", "float64"),
    ("credit_enhancement_proceeds", "float64"),
    ("repurchase_make_whole_proceeds", "float64"),
    ("other_foreclosure_proceeds", "float64"),
    ("non_interest_bearing_upb", "float64"),
    ("principal_forgiveness_upb", "float64"),
    ("repurchase_make_whole_proceeds_flag", "category"),
    ("foreclosure_principal_write_off_amount", "float64"),
    ("servicing_activity_indicator", "category")
])

# Two columns: the raw seller_name and a replacement value ("new").
namesDtypes = OrderedDict([
    ("seller_name", "category"),
    ("new", "category"),
])

acqDF = cf.read_csv(acqPath, sep='|', names=list(acqDtypes.keys()), dtype=list(acqDtypes.values()))
perfDF = cf.read_csv(perfPath, sep='|', names=list(perfDtypes.keys()), dtype=list(perfDtypes.values()), nrows=PERF_NROWS)
namesDF = cf.read_csv(namesPath, sep='|', names=list(namesDtypes.keys()), dtype=list(namesDtypes.values()))


# ETL

In [32]:
def null_workaround(df, **kwargs):
    """Replace nulls with -1 in categorical and numeric columns of ``df``.

    * ``category`` columns are cast to int32 and then null-filled with -1.
    * int8/16/32/64 and float32/64 columns are null-filled with -1 of their
      own dtype, so the column dtype is kept.
    * Columns of any other dtype (e.g. dates, strings) are left untouched.

    The frame is modified through column assignment and the same object is
    returned. ``kwargs`` is accepted for call compatibility but ignored.
    """
    numeric_kinds = ('int8', 'int16', 'int32', 'int64', 'float32', 'float64')
    # Snapshot the dtypes up front; columns are reassigned inside the loop.
    column_kinds = list(df.dtypes.items())
    for name, kind in column_kinds:
        kind_name = str(kind)
        if kind_name == "category":
            df[name] = df[name].astype('int32').fillna(-1)
        elif kind_name in numeric_kinds:
            minus_one = np.dtype(kind).type(-1)
            df[name] = df[name].fillna(minus_one)
    return df

In [33]:
%%time
# Replace each loan's seller_name with the "new" value from namesDF.
# The join is a left join on seller_name, so acquisition rows whose seller has
# no match in namesDF keep their row but get a null seller_name.
# The value is copied into the existing seller_name column (rather than
# renaming "new") so seller_name keeps its original column position.
acqDF = acqDF.merge(namesDF, how='left', on=['seller_name'])
acqDF["seller_name"] = acqDF['new']
acqDF = acqDF.drop('new')

## Replaces create_ever_features
# One row per loan_id holding the loan's worst (max) delinquency status,
# turned into int8 flags: ever_30 (status >= 1), ever_90 (>= 3),
# ever_180 (>= 6). The raw max status is dropped afterwards.
# NOTE(review): presumably the status counts months past due, so these
# thresholds correspond to 30/90/180 days — confirm against the dataset docs.
everDF = perfDF[['loan_id','current_loan_delinquency_status']]
everDF = everDF.groupby('loan_id', method='hash', as_index=False).max()
everDF['ever_30'] = (everDF['current_loan_delinquency_status'] >= 1).astype('int8')
everDF['ever_90'] = (everDF['current_loan_delinquency_status'] >= 3).astype('int8')
everDF['ever_180'] = (everDF['current_loan_delinquency_status'] >= 6).astype('int8')
everDF = everDF.drop('current_loan_delinquency_status')

## Replaces create_delinq_features
# For each status threshold, find the earliest monthly_reporting_period at
# which a loan's delinquency status reached it. Loans that never reached a
# threshold get the 1970-01-01 epoch as a sentinel date instead of a null.
DELINQ_EPOCH = np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')


def first_delinquency(df, min_status, out_col):
    """Return ``loan_id`` plus ``out_col``, the earliest
    ``monthly_reporting_period`` per loan among rows whose
    ``current_loan_delinquency_status`` is >= ``min_status``.

    Loans with no qualifying row are absent from the result.
    """
    hits = df.query('current_loan_delinquency_status >= {}'.format(min_status))
    first = hits[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash', as_index=False).min()
    first[out_col] = first['monthly_reporting_period']
    return first.drop('monthly_reporting_period')


delinqDF = perfDF[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]
# Start from the loans that reached status >= 1 and left-join the later
# thresholds onto them, filling non-matches with the epoch sentinel.
delinqDF_merge = first_delinquency(delinqDF, 1, 'delinquency_30')
for min_status, out_col in [(3, 'delinquency_90'), (6, 'delinquency_180')]:
    delinqDF_merge = delinqDF_merge.merge(first_delinquency(delinqDF, min_status, out_col), how='left', on=['loan_id'], type='hash')
    delinqDF_merge[out_col] = delinqDF_merge[out_col].fillna(DELINQ_EPOCH)
del(delinqDF)

## Replaces join_ever_delinq_features
# Attach the first-delinquency dates to every loan; loans that never reached
# status >= 1 have no row in delinqDF_merge, so all three dates get the sentinel.
everDF = everDF.merge(delinqDF_merge, on=['loan_id'], how='left', type='hash')
for out_col in ['delinquency_30', 'delinquency_90', 'delinquency_180']:
    everDF[out_col] = everDF[out_col].fillna(DELINQ_EPOCH)

del(delinqDF_merge)

## Replaces create_joined_df
# Per-(loan, month) frame: the reporting date becomes `timestamp` plus its
# month/year parts; status and unpaid balance become `delinquency_12` and
# `upb_12`. Columns are copied-then-dropped one at a time so the resulting
# column order is: loan_id, timestamp, timestamp_month, timestamp_year,
# delinquency_12, upb_12.
testDF = perfDF[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status', 'current_actual_upb']]

testDF['timestamp'] = testDF['monthly_reporting_period']
testDF = testDF.drop('monthly_reporting_period')
testDF['timestamp_month'] = testDF['timestamp'].dt.month
testDF['timestamp_year'] = testDF['timestamp'].dt.year

for src, dst in (('current_loan_delinquency_status', 'delinquency_12'),
                 ('current_actual_upb', 'upb_12')):
    testDF[dst] = testDF[src]
    testDF = testDF.drop(src)

# Sentinels for missing values.
testDF['upb_12'] = testDF['upb_12'].fillna(999999999)
testDF['delinquency_12'] = testDF['delinquency_12'].fillna(-1)

joinedDF = testDF.merge(everDF, how='left', on=['loan_id'], type='hash')
del(testDF)

# Loans missing from everDF come out of the left join with nulls here.
for col in ('ever_30', 'ever_90', 'ever_180',
            'delinquency_30', 'delinquency_90', 'delinquency_180'):
    joinedDF[col] = joinedDF[col].fillna(-1)
for col in ('timestamp_year', 'timestamp_month'):
    joinedDF[col] = joinedDF[col].astype('int32')


## Replaces create_12_mon_features
# For each offset y = 1..12, group every loan's monthly rows into consecutive
# 12-month windows shifted by y months, and per (loan, window) compute:
#   * delinquency_12 = (max status in window > 3) + (min upb in window == 0),
#     as int32, i.e. 0, 1 or 2
#   * upb_12 = minimum unpaid balance in the window
# Each window is keyed by (timestamp_year, timestamp_month) of its FIRST month
# (see the arithmetic notes below). Across all 12 offsets, every calendar month
# therefore starts one window, so each monthly row can later be joined to the
# aggregate over the 12 months starting at that row's month.
testdfs = []
n_months = 12
for y in range(1, n_months + 1):
    tmpDF = joinedDF[['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']]
    # Absolute month index m = year * 12 + month (month is 1..12).
    tmpDF['josh_months'] = tmpDF['timestamp_year'] * 12 + tmpDF['timestamp_month']
    # Window number: months counted from 24000 (= 2000 * 12) minus the offset y,
    # in steps of 12. Window n covers m in [24000 + y + 12n, 24000 + y + 12n + 11].
    tmpDF['josh_mody_n'] = ((tmpDF['josh_months'].astype('float64') - 24000 - y) / 12).floor()
    tmpDF = tmpDF.groupby(['loan_id', 'josh_mody_n'], method='hash', as_index=False).agg({'delinquency_12': 'max','upb_12': 'min'})
    # The aggregated columns are read back as '<agg>_<column>' below; this naming
    # is what this cuDF version produces for dict aggregations.
    tmpDF['delinquency_12'] = (tmpDF['max_delinquency_12']>3).astype('int32')
    tmpDF['delinquency_12'] +=(tmpDF['min_upb_12']==0).astype('int32')
    tmpDF = tmpDF.drop('max_delinquency_12')
    tmpDF['upb_12'] = tmpDF['min_upb_12']
    tmpDF = tmpDF.drop('min_upb_12')
    # Window start m0 = 24000 + y + 12n; its year is floor((m0 - 1) / 12) and its
    # month works out to y, which is why timestamp_month is simply y.
    tmpDF['timestamp_year'] = (((tmpDF['josh_mody_n'] * n_months) + 24000 + (y - 1)) / 12).floor().astype('int16')
    tmpDF['timestamp_month'] = np.int8(y)
    tmpDF = tmpDF.drop('josh_mody_n')
    testdfs.append(tmpDF)
    del(tmpDF)
testDF = cf.concat(testdfs)

## Replaces combine_joined_12_mon
# Replace the per-month delinquency_12/upb_12 values with the 12-month window
# aggregates from testDF, joined on (loan_id, timestamp_year, timestamp_month).
# The key columns are cast to int16/int8 to match the dtypes the window loop
# produced for the same keys (astype('int16') and np.int8).
joinedDF = joinedDF.drop(['delinquency_12','upb_12'])
joinedDF['timestamp_year'] = joinedDF['timestamp_year'].astype('int16')
joinedDF['timestamp_month'] = joinedDF['timestamp_month'].astype('int8')
joinedDF = joinedDF.merge(testDF, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'], type='hash')

del(testDF)

## Replaces final_performance_delinquency
# Attach the engineered per-(loan, month) features in joinedDF to every raw
# performance row.
# null_workaround assigns columns on the frame it is given and returns that
# same frame, so `merged` and `perfDF` refer to the same object here.
merged = null_workaround(perfDF)
joinedDF = null_workaround(joinedDF)
# NOTE(review): these keys were already cast to int8/int16 before the previous
# merge; presumably re-cast here in case null filling changed the dtype — confirm.
joinedDF['timestamp_month'] = joinedDF['timestamp_month'].astype('int8')
joinedDF['timestamp_year'] = joinedDF['timestamp_year'].astype('int16')
# Derive matching int8/int16 join keys from the raw reporting date.
merged['timestamp_month'] = merged['monthly_reporting_period'].dt.month
merged['timestamp_month'] = merged['timestamp_month'].astype('int8')
merged['timestamp_year'] = merged['monthly_reporting_period'].dt.year
merged['timestamp_year'] = merged['timestamp_year'].astype('int16')
merged = merged.merge(joinedDF, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'], type='hash')
# The temporary join keys are not kept in the performance frame.
perfDF = merged.drop(['timestamp_year','timestamp_month'])

del(joinedDF, merged)


## Replaces join_perf_acq_gdfs
# Fill nulls again: the left merge above can leave nulls in perfDF's newly
# added feature columns. Then attach the per-loan acquisition attributes to
# every performance row (left join on loan_id).
perfDF = null_workaround(perfDF)
acqDF = null_workaround(acqDF)
finalDF = perfDF.merge(acqDF, how='left', on=['loan_id'], type='hash')

del(perfDF, acqDF)

## Replaces last_mile_cleaning
# Columns excluded from the final feature table: identifiers, raw dates and
# intermediate features.
drop_list = [
    'loan_id', 'orig_date', 'first_pay_date', 'seller_name',
    'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180',
    'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12',
    'zero_balance_effective_date', 'foreclosed_after', 'disposition_date', 'timestamp'
]
# drop() accepts a list of labels (as used for the 12-month merge above).
finalDF = finalDF.drop(drop_list)

# Encode categoricals as their integer codes, then cast every column to float32.
# Use Series.items(): iteritems() was deprecated and removed in pandas 2.0,
# and items() is what null_workaround already uses.
for col, dtype in finalDF.dtypes.items():
    if str(dtype) == 'category':
        finalDF[col] = finalDF[col].cat.codes
    finalDF[col] = finalDF[col].astype('float32')

# Collapse the 0/1/2 window score into an int32 0/1 column (nulls -> 0).
# NOTE(review): presumably this is the model label — confirm.
finalDF['delinquency_12'] = finalDF['delinquency_12'] > 0
finalDF['delinquency_12'] = finalDF['delinquency_12'].fillna(False).astype('int32')

# Any remaining nulls become -1 of the column's own dtype.
for column in finalDF.columns:
    finalDF[column] = finalDF[column].fillna(np.dtype(str(finalDF[column].dtype)).type(-1))
print(finalDF)
finalDF = finalDF.to_arrow(preserve_index=False)

         servicer  interest_rate  current_actual_upb  loan_age  remaining_months_to_legal_maturity  adj_remaining_months_to_maturity      msa ...  relocation_mortgage_indicator
0   1240500900.0          7.875           184799.27      24.0                               336.0                               0.0  42660.0 ...                      2313200.0
1           -1.0          7.875           186109.36      14.0                               346.0                             346.0  42660.0 ...                      2313200.0
2           -1.0          7.875           186250.22      13.0                               347.0                             347.0  42660.0 ...                      2313200.0
3           -1.0          7.875           185967.56      15.0                               345.0                             345.0  42660.0 ...                      2313200.0
4           -1.0          7.875           185244.55      20.0                               340.0                      