In [None]:
# Export NCCL_P2P_DISABLE=1 into the kernel environment before the GPU libraries below are imported.
# NOTE(review): presumably this turns off NCCL peer-to-peer GPU transfers — confirm against the NCCL docs.
%env NCCL_P2P_DISABLE=1

In [None]:
import numpy as np
import dask_xgboost as dxgb_gpu
import dask
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
from dask.distributed import Client, wait
import xgboost as xgb
import cudf
from cudf.dataframe import DataFrame
from collections import OrderedDict
import gc
from glob import glob
import os
from IPython.display import display

In [None]:
import subprocess

# Ask the OS for the IP addresses assigned to this host; the cluster is bound
# to the first address reported.
cmd = "hostname --all-ip-addresses"
# check_output raises CalledProcessError when the command exits non-zero,
# instead of silently carrying on with empty output.
host_addresses = subprocess.check_output(cmd.split()).decode().split()
if not host_addresses:
    raise RuntimeError(
        "'hostname --all-ip-addresses' returned no addresses; "
        "cannot choose an IP for the Dask cluster"
    )
IPADDR = host_addresses[0]

# Start a LocalCUDACluster bound to that address and attach a client to it.
cluster = LocalCUDACluster(ip=IPADDR)
client = Client(cluster)
client

In [None]:
# To download data for this notebook, visit https://docs.rapids.ai/datasets/mortgage-data
# and point `mortgage_data_root` at the directory the dataset was extracted to.
# The three paths below are derived from it, so only one line needs editing.
mortgage_data_root = "/data/kratos/mortgage_1yr"
acq_data_path = mortgage_data_root + "/acq"          # directory of acquisition files
perf_data_path = mortgage_data_root + "/perf"        # directory of performance files
col_names_path = mortgage_data_root + "/names.csv"   # seller-name mapping file
start_year = 2000
end_year = 2003  # end_year is inclusive
part_count = 16  # the number of data files to train against

In [None]:
## REPLACE WITH NEW SETALLOCATOR
def initialize_rmm_pool():
    """Enable the RMM pool allocator and initialize RMM.

    Sets ``use_pool_allocator`` and ``initial_pool_size`` on
    ``librmm_cffi.librmm_config`` and then calls ``cudf.rmm.initialize()``.
    This notebook invokes it through ``client.run``; the imports are
    function-local, presumably so they resolve inside the worker process.

    Returns
    -------
    Whatever ``cudf.rmm.initialize()`` returns.
    """
    from librmm_cffi import librmm_config as rmm_cfg

    rmm_cfg.use_pool_allocator = True
    # 1.4e10 bytes is roughly 14 GB (about 13 GiB), not 2 GiB as an earlier comment said.
    # NOTE(review): the earlier comment also stated the library default is 1/2 of total GPU memory — confirm.
    rmm_cfg.initial_pool_size = int(1.4e10)
    # NOTE(review): cudf is imported only after the config is set; keep this order unless confirmed irrelevant.
    import cudf
    return cudf.rmm.initialize()

def initialize_rmm_no_pool():
    """Disable the RMM pool allocator and initialize RMM.

    Sets ``use_pool_allocator = False`` on ``librmm_cffi.librmm_config`` and
    then calls ``cudf.rmm.initialize()``. This notebook invokes it through
    ``client.run``.

    Returns
    -------
    Whatever ``cudf.rmm.initialize()`` returns.
    """
    from librmm_cffi import librmm_config as rmm_cfg
    
    rmm_cfg.use_pool_allocator = False
    # NOTE(review): cudf is imported only after the config is set; keep this order unless confirmed irrelevant.
    import cudf
    return cudf.rmm.initialize()

def finalize_rmm():
    """Call ``cudf.rmm.finalize()`` and return its result.

    This notebook invokes it through ``client.run`` before re-initializing
    RMM with different allocator settings.
    """
    import cudf
    return cudf.rmm.finalize()

In [None]:
# Run initialize_rmm_pool through the Dask client so RMM uses the pool allocator
# for the ETL cells that follow.
client.run(initialize_rmm_pool)

### IO methods for loading the data

In [None]:
def load_names_df(names_df_path, **kwargs):
    """Read the seller-name mapping file used for renaming the banks.

    Parameters
    ----------
    names_df_path : str
        Location of the pipe-delimited names file. Its first row is skipped.
    **kwargs
        Accepted but not used.

    Returns
    -------
    Dask GPU DataFrame
        The result of ``dask_cudf.read_csv`` with two ``category`` columns,
        ``seller_name`` and ``new``.
    """
    # (column name, dtype) pairs, in file order.
    schema = [
        ("seller_name", "category"),
        ("new", "category"),
    ]
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    return dask_cudf.read_csv(
        names_df_path,
        names=column_names,
        delimiter='|',
        dtype=column_dtypes,
        skiprows=1,
    )


def load_perf_df(performance_path, **kwargs):
    """Read the loan performance files.

    Prints ``performance_path`` and then reads it with ``dask_cudf.read_csv``
    as pipe-delimited text, skipping the first row and applying the column
    names and dtypes listed below.

    Parameters
    ----------
    performance_path : str
        Path or glob of the performance files.
    **kwargs
        Accepted but not used.

    Returns
    -------
    GPU DataFrame
        Whatever ``dask_cudf.read_csv`` returns for the given path.
    """
    # (column name, dtype) pairs, in file order.
    schema = [
        ("loan_id", "int64"),
        ("monthly_reporting_period", "date"),
        ("servicer", "category"),
        ("interest_rate", "float64"),
        ("current_actual_upb", "float64"),
        ("loan_age", "float64"),
        ("remaining_months_to_legal_maturity", "float64"),
        ("adj_remaining_months_to_maturity", "float64"),
        ("maturity_date", "date"),
        ("msa", "float64"),
        ("current_loan_delinquency_status", "int32"),
        ("mod_flag", "category"),
        ("zero_balance_code", "category"),
        ("zero_balance_effective_date", "date"),
        ("last_paid_installment_date", "date"),
        ("foreclosed_after", "date"),
        ("disposition_date", "date"),
        ("foreclosure_costs", "float64"),
        ("prop_preservation_and_repair_costs", "float64"),
        ("asset_recovery_costs", "float64"),
        ("misc_holding_expenses", "float64"),
        ("holding_taxes", "float64"),
        ("net_sale_proceeds", "float64"),
        ("credit_enhancement_proceeds", "float64"),
        ("repurchase_make_whole_proceeds", "float64"),
        ("other_foreclosure_proceeds", "float64"),
        ("non_interest_bearing_upb", "float64"),
        ("principal_forgiveness_upb", "float64"),
        ("repurchase_make_whole_proceeds_flag", "category"),
        ("foreclosure_principal_write_off_amount", "float64"),
        ("servicing_activity_indicator", "category"),
    ]
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    print(performance_path)

    return dask_cudf.read_csv(
        performance_path,
        names=column_names,
        delimiter='|',
        dtype=column_dtypes,
        skiprows=1,
    )


def load_acq_df(acquisition_path, **kwargs):
    """Read the loan acquisition files.

    Prints ``acquisition_path`` and then reads it with ``dask_cudf.read_csv``
    as pipe-delimited text, skipping the first row and applying the column
    names and dtypes listed below.

    Parameters
    ----------
    acquisition_path : str
        Path or glob of the acquisition files.
    **kwargs
        Accepted but not used.

    Returns
    -------
    GPU DataFrame
        Whatever ``dask_cudf.read_csv`` returns for the given path.
    """
    # (column name, dtype) pairs, in file order.
    schema = [
        ("loan_id", "int64"),
        ("orig_channel", "category"),
        ("seller_name", "category"),
        ("orig_interest_rate", "float64"),
        ("orig_upb", "int64"),
        ("orig_loan_term", "int64"),
        ("orig_date", "date"),
        ("first_pay_date", "date"),
        ("orig_ltv", "float64"),
        ("orig_cltv", "float64"),
        ("num_borrowers", "float64"),
        ("dti", "float64"),
        ("borrower_credit_score", "float64"),
        ("first_home_buyer", "category"),
        ("loan_purpose", "category"),
        ("property_type", "category"),
        ("num_units", "int64"),
        ("occupancy_status", "category"),
        ("property_state", "category"),
        ("zip", "int64"),
        ("mortgage_insurance_percent", "float64"),
        ("product_type", "category"),
        ("coborrow_credit_score", "float64"),
        ("mortgage_insurance_type", "float64"),
        ("relocation_mortgage_indicator", "category"),
    ]
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    print(acquisition_path)
    return dask_cudf.read_csv(
        acquisition_path,
        names=column_names,
        delimiter='|',
        dtype=column_dtypes,
        skiprows=1,
    )

### Feature creation methods

In [None]:
def create_ever_features(gdf, **kwargs):
    """Derive per-loan "ever delinquent" flags.

    Takes the maximum ``current_loan_delinquency_status`` for each
    ``loan_id`` and turns it into three int8 indicator columns:
    ``ever_30`` (max >= 1), ``ever_90`` (max >= 3) and ``ever_180``
    (max >= 6). The max-status column itself is dropped before returning.

    Parameters
    ----------
    gdf
        Performance frame with ``loan_id`` and
        ``current_loan_delinquency_status`` columns.
    **kwargs
        Accepted but not used.
    """
    status_col = 'current_loan_delinquency_status'
    worst_status = gdf.groupby('loan_id').current_loan_delinquency_status.max().to_frame()

    # (flag column, minimum status that sets the flag)
    for flag_name, min_status in (('ever_30', 1), ('ever_90', 3), ('ever_180', 6)):
        worst_status[flag_name] = (worst_status[status_col] >= min_status).astype('int8')

    return worst_status.map_partitions(cudf.DataFrame.drop, status_col)

def create_delinq_features(gdf, **kwargs):
    """Find, per loan, the first reporting period at each delinquency level.

    Produces ``delinquency_30``, ``delinquency_90`` and ``delinquency_180``:
    the minimum ``monthly_reporting_period`` among rows whose
    ``current_loan_delinquency_status`` is >= 1, >= 3 and >= 6 respectively.
    The 90 and 180 frames are left-merged onto the 30 frame by index; no
    null filling is applied to loans without a match.

    Note: ``gdf['monthly_reporting_period']`` is reassigned to
    ``datetime64[ns]`` on the frame that was passed in.

    Parameters
    ----------
    gdf
        Performance frame with ``loan_id``, ``monthly_reporting_period`` and
        ``current_loan_delinquency_status`` columns.
    **kwargs
        Accepted but not used.
    """
    gdf['monthly_reporting_period'] = gdf['monthly_reporting_period'].astype("datetime64[ns]")

    def first_period_at_or_above(min_status, feature_name):
        # Earliest reporting period per loan among rows at or above
        # min_status, stored under feature_name.
        earliest = (
            gdf.query('current_loan_delinquency_status >= {}'.format(min_status))
            .groupby('loan_id')
            .monthly_reporting_period.min()
            .to_frame()
        )
        earliest[feature_name] = earliest['monthly_reporting_period']
        return earliest.map_partitions(cudf.DataFrame.drop, 'monthly_reporting_period')

    since_30 = first_period_at_or_above(1, 'delinquency_30')
    since_90 = first_period_at_or_above(3, 'delinquency_90')
    since_180 = first_period_at_or_above(6, 'delinquency_180')

    merged = since_30.merge(since_90, how='left', left_index=True, right_index=True)
    return merged.merge(since_180, how='left', left_index=True, right_index=True)

def create_12_mon_features(joined_df, **kwargs):
    """Build 12-month-window delinquency features, one frame per month offset.

    For each offset ``y`` in 1..12 the rows are bucketed per loan by
    ``(timestamp_year * 12 + timestamp_month - 24000 - y) // 12``. Within
    each bucket the max ``delinquency_12`` and min ``upb_12`` are taken;
    ``delinquency_12`` is then rewritten as ``int32(max > 3)`` plus
    ``int32(min upb == 0)``. ``timestamp_year`` is recomputed from the bucket
    number (int16) and ``timestamp_month`` is set to ``y`` (int8). The twelve
    resulting frames are concatenated.

    Parameters
    ----------
    joined_df
        Frame with ``loan_id``, ``timestamp_year``, ``timestamp_month``,
        ``delinquency_12`` and ``upb_12`` columns.
    **kwargs
        Accepted but not used.
    """
    n_months = 12
    source_cols = ['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']

    def features_for_offset(y):
        # One pass of the windowing described above, for a single offset y.
        window = joined_df[source_cols]
        window['josh_months'] = window['timestamp_year'] * 12 + window['timestamp_month']
        window['josh_mody_n'] = ((window['josh_months'].astype('float64') - 24000 - y) // 12)
        window = window.groupby(['loan_id', 'josh_mody_n']).agg({'delinquency_12': 'max', 'upb_12': 'min'})
        window = window.map_partitions(cudf.DataFrame.reset_index)
        window['delinquency_12'] = (window['delinquency_12'] > 3).astype('int32')
        window['delinquency_12'] += (window['upb_12'] == 0).astype('int32')
        window['timestamp_year'] = (((window['josh_mody_n'] * n_months) + 24000 + (y - 1)) // 12).astype('int16')
        window['timestamp_month'] = np.int8(y)
        return window.map_partitions(cudf.DataFrame.drop, 'josh_mody_n')

    per_offset_frames = [features_for_offset(y) for y in range(1, n_months + 1)]
    return dask.dataframe.concat(per_offset_frames)

In [None]:
def create_joined_df(perf_df, ever_df, **kwargs):
    """Join per-row performance fields with the per-loan delinquency features.

    From ``perf_df`` this keeps ``loan_id`` and renames
    ``monthly_reporting_period`` -> ``timestamp``,
    ``current_loan_delinquency_status`` -> ``delinquency_12`` and
    ``current_actual_upb`` -> ``upb_12`` (copy the column, then drop the
    source). It adds ``timestamp_month`` / ``timestamp_year`` from
    ``timestamp`` and left-joins ``ever_df`` on ``loan_id``. No null filling
    is applied.

    Parameters
    ----------
    perf_df
        Performance frame with the four source columns named above.
    ever_df
        Per-loan feature frame indexed by ``loan_id``; its index is reset
        before the merge.
    **kwargs
        Accepted but not used.

    Returns
    -------
    The joined frame, with ``timestamp_year`` and ``timestamp_month`` cast
    to int32.
    """
    temp = perf_df[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status', 'current_actual_upb']]

    temp['timestamp'] = temp['monthly_reporting_period']
    temp = temp.map_partitions(cudf.DataFrame.drop, 'monthly_reporting_period')
    temp['timestamp_month'] = temp['timestamp'].dt.month
    temp['timestamp_year'] = temp['timestamp'].dt.year

    temp['delinquency_12'] = temp['current_loan_delinquency_status']
    temp = temp.map_partitions(cudf.DataFrame.drop, 'current_loan_delinquency_status')

    temp['upb_12'] = temp['current_actual_upb']
    # map_partitions returns a new collection, so the result must be
    # re-assigned. Previously it was discarded and 'current_actual_upb'
    # was never actually dropped.
    temp = temp.map_partitions(cudf.DataFrame.drop, 'current_actual_upb')

    # ever_df carries loan_id as its index; expose it as a column for the merge.
    ever_df = ever_df.map_partitions(cudf.DataFrame.reset_index)
    joined_df = temp.merge(ever_df, how='left', on='loan_id')

    joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int32')
    joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int32')

    return joined_df

def combine_joined_12_mon(joined_df, testdf, **kwargs):
    """Swap the raw 12-month columns in ``joined_df`` for those in ``testdf``.

    Drops ``delinquency_12`` and ``upb_12`` from ``joined_df``, casts
    ``timestamp_year`` to int16 and ``timestamp_month`` to int8, then
    left-merges ``testdf`` on ``loan_id``, ``timestamp_year`` and
    ``timestamp_month``.

    Parameters
    ----------
    joined_df
        Frame containing the merge keys plus ``delinquency_12`` / ``upb_12``.
    testdf
        Frame containing the merge keys and the replacement columns.
    **kwargs
        Accepted but not used.
    """
    merge_keys = ['loan_id', 'timestamp_year', 'timestamp_month']

    trimmed = joined_df.map_partitions(cudf.DataFrame.drop, ['delinquency_12', 'upb_12'])
    # NOTE(review): presumably narrowed so the key dtypes line up with testdf's — confirm.
    for key_col, narrow_dtype in (('timestamp_year', 'int16'), ('timestamp_month', 'int8')):
        trimmed[key_col] = trimmed[key_col].astype(narrow_dtype)

    return trimmed.merge(testdf, how='left', on=merge_keys)

def final_performance_delinquency(joined_df, perf_df):
    """Attach the engineered delinquency features to the performance rows.

    Adds ``timestamp_month`` (int8) and ``timestamp_year`` (int16) derived
    from ``monthly_reporting_period``, casts the remaining date columns to
    ``datetime64[ns]``, left-merges ``joined_df`` on ``loan_id``,
    ``timestamp_year`` and ``timestamp_month``, and finally drops the two
    helper timestamp columns.

    Note: the column assignments made before the merge are applied to the
    ``perf_df`` object that was passed in.

    Parameters
    ----------
    joined_df
        Feature frame keyed by ``loan_id``, ``timestamp_year`` and
        ``timestamp_month``.
    perf_df
        Performance frame.

    Returns
    -------
    The merged frame without ``timestamp_year`` / ``timestamp_month``.
    """
    perf_df['timestamp_month'] = perf_df['monthly_reporting_period'].dt.month
    perf_df['timestamp_month'] = perf_df['timestamp_month'].astype('int8')
    perf_df['timestamp_year'] = perf_df['monthly_reporting_period'].dt.year
    perf_df['timestamp_year'] = perf_df['timestamp_year'].astype('int16')

    # Workaround for issue rapidsai/cudf#2705
    for date_col in (
        'maturity_date',
        'zero_balance_effective_date',
        'last_paid_installment_date',
        'foreclosed_after',
        'disposition_date',
    ):
        perf_df[date_col] = perf_df[date_col].astype("datetime64[ns]")

    perf_df = perf_df.merge(joined_df, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'])

    # map_partitions returns a new collection, so the result must be
    # re-assigned. Previously both drops were discarded and the helper
    # columns stayed in the output.
    perf_df = perf_df.map_partitions(cudf.DataFrame.drop, ['timestamp_year', 'timestamp_month'])
    return perf_df

In [None]:
# Load the three inputs: the seller-name mapping file, plus every file under
# the performance and acquisition directories.
names_df = load_names_df("file://"+col_names_path)
perf_df = load_perf_df("file://"+perf_data_path+"/*")
perf_df = perf_df.repartition(npartitions=5)
acq_df = load_acq_df("file://"+acq_data_path+"/*")
# Repartition the acquisition frame itself. Previously this line repartitioned
# perf_df into an unused variable `acq`, leaving acq_df untouched.
acq_df = acq_df.repartition(npartitions=5)

In [None]:
# Replace each raw seller_name with the `new` value from names_df:
# merge on seller_name, drop the raw column, then rename `new` -> seller_name.
acq_df = acq_df.merge(names_df, how='left', on='seller_name')
acq_df = acq_df.map_partitions(cudf.DataFrame.drop,'seller_name')
acq_df = acq_df.rename(columns={'new':'seller_name'})
# Per-loan features. create_delinq_features reassigns
# perf_df['monthly_reporting_period'] to datetime64[ns] on perf_df itself,
# so the order of these calls matters.
ever_df = create_ever_features(perf_df)
delinq_merged_df = create_delinq_features(perf_df)
ever_df = ever_df.merge(delinq_merged_df, how='left', left_index=True, right_index= True)
del(delinq_merged_df)
# Per-row features: join, build the 12-month windows, and fold them back in.
joined_df = create_joined_df(perf_df, ever_df)
joined_df_2 = create_12_mon_features(joined_df)
joined_df = combine_joined_12_mon(joined_df, joined_df_2)
del(joined_df_2)
perf_df_final = final_performance_delinquency(joined_df, perf_df)
del(joined_df)
# Cast the acquisition date columns to datetime64[ns] before the final merge.
acq_df['orig_date'] = acq_df['orig_date'].astype("datetime64[ns]")
acq_df['first_pay_date'] = acq_df['first_pay_date'].astype("datetime64[ns]")

# One row per performance record, enriched with acquisition attributes.
df_final = perf_df_final.merge(acq_df, how='left', on=['loan_id'])


# Columns removed from the final frame before it is converted to Arrow.
drop_list = [
        'loan_id', 'orig_date', 'first_pay_date', 'seller_name',
        'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180',
        'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12',
        'zero_balance_effective_date','foreclosed_after', 'disposition_date','timestamp'
    ]
df_final = df_final.map_partitions(cudf.DataFrame.drop, drop_list)
# Convert each partition with cudf.DataFrame.to_arrow, without the index.
df_arrow = df_final.map_partitions(cudf.DataFrame.to_arrow,preserve_index=False)

In [None]:
# Time the evaluation of len(df_arrow).
# NOTE(review): presumably this is the step that triggers execution of the lazily built pipeline above — confirm.
%time len(df_arrow)

In [None]:
# Run finalize_rmm (which calls cudf.rmm.finalize()) through the Dask client.
client.run(finalize_rmm)

In [None]:
# Run initialize_rmm_no_pool through the Dask client: RMM is initialized again
# with use_pool_allocator = False.
client.run(initialize_rmm_no_pool)

In [None]:
# Parameter dictionary for the dask-xgboost GPU training run.
# Note that 'eta' and 'learning_rate' are both present and both set to 0.1.
dxgb_gpu_params = dict(
    nround=100,
    max_depth=8,
    max_leaves=2**8,
    alpha=0.9,
    eta=0.1,
    gamma=0.1,
    learning_rate=0.1,
    subsample=1,
    reg_lambda=1,
    scale_pos_weight=2,
    min_child_weight=30,
    tree_method='gpu_hist',
    n_gpus=1,
    distributed_dask=True,
    loss='ls',
    objective='gpu:reg:linear',
    max_features='auto',
    criterion='friedman_mse',
    grow_policy='lossguide',
    verbose=True,
)

In [None]:
# Wrap a cudf.DataFrame.from_arrow call in dask.delayed for each item produced
# by iterating df_arrow.
# NOTE(review): iterating df_arrow directly may not yield one item per partition — confirm; df_arrow.to_delayed() may be what is intended.
gpu_dfs = [delayed(cudf.DataFrame.from_arrow)(gpu_df) for gpu_df in df_arrow]
# NOTE(review): gpu_dfs holds Delayed objects at this point, not futures — confirm wait() actually blocks on any work here.
wait(gpu_dfs)