# In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
# import dask_xgboost as dxgb_gpu
# import dask
# import dask_cudf
# from dask.delayed import delayed
# from dask.distributed import Client, wait
import xgboost as xgb
import cudf
from cudf.dataframe import DataFrame
from collections import OrderedDict
import gc
from glob import glob
import os
import pyblazing
import pandas as pd
import time
from chronometer import Chronometer

from pyblazing import FileSystemType, SchemaFrom, DriverType

def initialize_rmm_pool():
    """Switch RMM to its pool allocator and (re)initialize it.

    The initial pool size is set to ``2 << 31`` bytes, i.e. 4 GiB.

    Returns
    -------
    The value returned by ``cudf._gdf.rmm_initialize()``.
    """
    from librmm_cffi import librmm_config

    librmm_config.use_pool_allocator = True
    # 2 << 31 bytes == 4 GiB (2 << 30 would be 2 GiB). According to the
    # original note, leaving the size unset defaults to half of GPU memory.
    librmm_config.initial_pool_size = 2 << 31

    import cudf
    return cudf._gdf.rmm_initialize()


def initialize_rmm_no_pool():
    """Turn the RMM pool allocator off and (re)initialize RMM.

    Returns
    -------
    The value returned by ``cudf._gdf.rmm_initialize()``.
    """
    from librmm_cffi import librmm_config

    librmm_config.use_pool_allocator = False

    import cudf
    return cudf._gdf.rmm_initialize()


def register_hdfs(host="35.203.181.221", port=54310, user="hadoop"):
    """Register an HDFS file system with pyblazing and print the status.

    The defaults reproduce the previously hard-coded cluster, so existing
    zero-argument callers are unaffected.

    Parameters
    ----------
    host : str
        HDFS host; it is also used as the file-system authority.
    port : int
        HDFS port.
    user : str
        User name passed to the HDFS driver.
    """
    print('*** Register a HDFS File System ***')
    fs_status = pyblazing.register_file_system(
        authority=host,
        type=FileSystemType.HDFS,
        root="/",
        params={
            "host": host,
            "port": port,
            "user": user,
            "driverType": DriverType.LIBHDFS3,
            "kerberosTicket": ""
        }
    )
    print(fs_status)


def deregister_hdfs(authority="35.203.181.221"):
    """Deregister an HDFS file system from pyblazing and print the status.

    Parameters
    ----------
    authority : str
        Authority the file system was registered under. The default is the
        previously hard-coded value.
    """
    fs_status = pyblazing.deregister_file_system(authority=authority)
    print(fs_status)

def register_posix(authority="mortgage", root="/"):
    """Register a POSIX file system with pyblazing and print the status.

    The defaults reproduce the previously hard-coded values, so existing
    zero-argument callers are unaffected.

    Parameters
    ----------
    authority : str
        Name the file system is registered under.
    root : str
        Root path handed to pyblazing.
    """
    print('*** Register a POSIX File System ***')
    fs_status = pyblazing.register_file_system(
        authority=authority,
        type=FileSystemType.POSIX,
        root=root
    )
    print(fs_status)

def deregister_posix(authority="mortgage"):
    """Deregister a POSIX file system from pyblazing and print the status.

    Parameters
    ----------
    authority : str
        Authority the file system was registered under. The default is the
        previously hard-coded value.
    """
    fs_status = pyblazing.deregister_file_system(authority=authority)
    print(fs_status)

from libgdf_cffi import ffi, libgdf

def get_dtype_values(dtypes):
    """Translate column type names into libgdf type codes.

    Parameters
    ----------
    dtypes : mapping
        Column name -> type-name string (e.g. ``'int64'``, ``'date'``,
        ``'category'``). Values are read in the mapping's iteration order.

    Returns
    -------
    list
        One libgdf type code per column. Type names that are not in the
        lookup table fall back to ``libgdf.GDF_INT64``.
    """
    type_codes = {
        'str': libgdf.GDF_STRING,
        'date': libgdf.GDF_DATE64,
        'date64': libgdf.GDF_DATE64,
        'date32': libgdf.GDF_DATE32,
        'timestamp': libgdf.GDF_TIMESTAMP,
        'category': libgdf.GDF_CATEGORY,
        'float': libgdf.GDF_FLOAT32,
        'double': libgdf.GDF_FLOAT64,
        'float32': libgdf.GDF_FLOAT32,
        'float64': libgdf.GDF_FLOAT64,
        'short': libgdf.GDF_INT16,
        'long': libgdf.GDF_INT64,
        'int': libgdf.GDF_INT32,
        'int32': libgdf.GDF_INT32,
        'int64': libgdf.GDF_INT64,
    }

    # An explicit default (instead of a truthiness test on the looked-up
    # value) keeps a mapped code from being mistaken for "missing" should it
    # ever be 0.
    values = [type_codes.get(type_name, libgdf.GDF_INT64)
              for type_name in dtypes.values()]

    print('>>>> dtyps for', dtypes.values())
    print(values)
    return values

def get_type_schema(path):
    """Pick the pyblazing schema source from a path's extension.

    The extension is the text after the last ``'.'`` in ``path``.

    Returns
    -------
    ``SchemaFrom.ParquetFile`` for ``parquet``; ``SchemaFrom.CsvFile`` for
    ``csv``, ``psv`` or anything starting with ``txt``; ``None`` otherwise.
    """
    extension = path.rsplit('.', 1)[-1]

    if extension == 'parquet':
        return SchemaFrom.ParquetFile
    if extension in ('csv', 'psv') or extension.startswith("txt"):
        return SchemaFrom.CsvFile
    return None

def null_workaround(df, **kwargs):
    """Replace nulls with -1 in category and numeric columns, in place.

    Category columns are first cast to ``int32``. Columns of any other dtype
    are left untouched.

    Returns
    -------
    The same ``df`` object that was passed in.
    """
    numeric_type_names = ('int8', 'int16', 'int32', 'int64', 'float32', 'float64')
    for column, data_type in df.dtypes.items():
        type_name = str(data_type)
        if type_name == "category":
            df[column] = df[column].astype('int32').fillna(-1)
        elif type_name in numeric_type_names:
            df[column] = df[column].fillna(-1)
    return df

def open_perf_table(table_ref):
    """Run ``select *`` against the first table key in ``table_ref``.

    Only the first key is used; its ``table_name`` attribute names the table
    under ``main``. Returns ``None`` when ``table_ref`` has no keys.
    """
    for first_key in table_ref.keys():
        select_all = 'select * from main.%s' % (first_key.table_name,)
        return pyblazing.run_query(select_all, table_ref)
    return None

def run_gpu_workflow(quarter=1, year=2000, perf_file="", **kwargs):
    """Load one quarter of mortgage data and run the full ETL on it.

    Parameters
    ----------
    quarter, year : int
        Select the acquisition file
        ``<acq_data_path>/Acquisition_<year>Q<quarter>.txt``.
    perf_file : str
        Path of the performance file to load.

    Returns
    -------
    list
        ``[final_gdf, load_seconds, etl_seconds]``. ``etl_seconds`` includes
        the time spent deregistering the file system.
    """
    import time

    load_start_time = time.time()

    if use_hdfs:
        register_hdfs()
    else:
        register_posix()

    # Everything after registration runs under try/finally so the file system
    # is deregistered even when loading or the ETL raises.
    try:
        names = gpu_load_names()
        acq_gdf = gpu_load_acquisition_csv(acquisition_path=acq_data_path + "/Acquisition_"
                                                            + str(year) + "Q" + str(quarter) + ".txt")

        gdf = gpu_load_performance_csv(perf_file)

        load_end_time = time.time()

        etl_start_time = time.time()

        # NOTE: the arguments are passed in the opposite order of
        # merge_names' parameter names; merge_names only reads .name and
        # .columns from both, so the call is kept as it was.
        acq_gdf_results = merge_names(acq_gdf, names)

        everdf_results = create_ever_features(gdf)

        delinq_merge_results = create_delinq_features(gdf)

        new_everdf_results = join_ever_delinq_features(everdf_results.columns, delinq_merge_results.columns)

        joined_df_results = create_joined_df(gdf.columns, new_everdf_results.columns)
        del (new_everdf_results)

        testdf_results = create_12_mon_features_union_alt(joined_df_results.columns)
        # testdf_results = create_12_mon_features_union(joined_df_results.columns)

        testdf = testdf_results.columns
        new_joined_df_results = combine_joined_12_mon(joined_df_results.columns, testdf)
        # Intermediate references are dropped as soon as they are no longer
        # needed.
        del (testdf)
        del (joined_df_results)
        perf_df_results = final_performance_delinquency(gdf.columns, new_joined_df_results.columns)
        del (gdf)
        del (new_joined_df_results)

        final_gdf_results = join_perf_acq_gdfs(perf_df_results.columns, acq_gdf_results.columns)
        del (perf_df_results)
        del (acq_gdf_results)

        final_gdf = last_mile_cleaning(final_gdf_results.columns)
    finally:
        if use_hdfs:
            deregister_hdfs()
        else:
            deregister_posix()

    etl_end_time = time.time()

    return [final_gdf, (load_end_time - load_start_time), (etl_end_time - etl_start_time)]




def gpu_load_performance_csv(performance_path, **kwargs):
    """ Creates the 'perf' table from a pipe-delimited performance file

    The first row of the file is skipped and the elapsed time is reported
    through Chronometer.

    Returns
    -------
    The table object returned by ``pyblazing.create_table``
    """
    timer = Chronometer.makeStarted()

    # (column name, type name) in file order; names and dtypes both derive
    # from this single list.
    schema = [
        ("loan_id", "int64"),
        ("monthly_reporting_period", "date"),
        ("servicer", "category"),
        ("interest_rate", "float64"),
        ("current_actual_upb", "float64"),
        ("loan_age", "float64"),
        ("remaining_months_to_legal_maturity", "float64"),
        ("adj_remaining_months_to_maturity", "float64"),
        ("maturity_date", "date"),
        ("msa", "float64"),
        ("current_loan_delinquency_status", "int32"),
        ("mod_flag", "category"),
        ("zero_balance_code", "category"),
        ("zero_balance_effective_date", "date"),
        ("last_paid_installment_date", "date"),
        ("foreclosed_after", "date"),
        ("disposition_date", "date"),
        ("foreclosure_costs", "float64"),
        ("prop_preservation_and_repair_costs", "float64"),
        ("asset_recovery_costs", "float64"),
        ("misc_holding_expenses", "float64"),
        ("holding_taxes", "float64"),
        ("net_sale_proceeds", "float64"),
        ("credit_enhancement_proceeds", "float64"),
        ("repurchase_make_whole_proceeds", "float64"),
        ("other_foreclosure_proceeds", "float64"),
        ("non_interest_bearing_upb", "float64"),
        ("principal_forgiveness_upb", "float64"),
        ("repurchase_make_whole_proceeds_flag", "category"),
        ("foreclosure_principal_write_off_amount", "float64"),
        ("servicing_activity_indicator", "category"),
    ]
    column_names = [name for name, _ in schema]
    dtypes = OrderedDict(schema)

    print(performance_path)
    performance_table = pyblazing.create_table(
        table_name='perf',
        type=get_type_schema(performance_path),
        path=performance_path,
        delimiter='|',
        names=column_names,
        dtypes=get_dtype_values(dtypes),
        skip_rows=1,
    )
    Chronometer.show(timer, 'Read Performance CSV')
    return performance_table

def gpu_load_acquisition_csv(acquisition_path, **kwargs):
    """ Creates the 'acq' table from a pipe-delimited acquisition file

    The first row of the file is skipped and the elapsed time is reported
    through Chronometer.

    Returns
    -------
    The table object returned by ``pyblazing.create_table``
    """
    timer = Chronometer.makeStarted()

    # (column name, type name) in file order; names and dtypes both derive
    # from this single list.
    schema = [
        ("loan_id", "int64"),
        ("orig_channel", "category"),
        ("seller_name", "category"),
        ("orig_interest_rate", "float64"),
        ("orig_upb", "int64"),
        ("orig_loan_term", "int64"),
        ("orig_date", "date"),
        ("first_pay_date", "date"),
        ("orig_ltv", "float64"),
        ("orig_cltv", "float64"),
        ("num_borrowers", "float64"),
        ("dti", "float64"),
        ("borrower_credit_score", "float64"),
        ("first_home_buyer", "category"),
        ("loan_purpose", "category"),
        ("property_type", "category"),
        ("num_units", "int64"),
        ("occupancy_status", "category"),
        ("property_state", "category"),
        ("zip", "int64"),
        ("mortgage_insurance_percent", "float64"),
        ("product_type", "category"),
        ("coborrow_credit_score", "float64"),
        ("mortgage_insurance_type", "float64"),
        ("relocation_mortgage_indicator", "category"),
    ]
    column_names = [name for name, _ in schema]
    dtypes = OrderedDict(schema)

    print(acquisition_path)

    acquisition_table = pyblazing.create_table(
        table_name='acq',
        type=get_type_schema(acquisition_path),
        path=acquisition_path,
        delimiter='|',
        names=column_names,
        dtypes=get_dtype_values(dtypes),
        skip_rows=1,
    )
    Chronometer.show(timer, 'Read Acquisition CSV')
    return acquisition_table

def gpu_load_names(**kwargs):
    """ Creates the 'names' table used for renaming the banks

    The file location comes from the module-level ``col_names_path``; the
    first row of the file is skipped.

    Returns
    -------
    The table object returned by ``pyblazing.create_table``
    """
    timer = Chronometer.makeStarted()

    schema = [
        ("seller_name", "category"),
        ("new_seller_name", "category"),
    ]
    column_names = [name for name, _ in schema]
    dtypes = OrderedDict(schema)

    names_table = pyblazing.create_table(
        table_name='names',
        type=get_type_schema(col_names_path),
        path=col_names_path,
        delimiter='|',
        names=column_names,
        dtypes=get_dtype_values(dtypes),
        skip_rows=1,
    )
    Chronometer.show(timer, 'Read Names CSV')
    return names_table


def merge_names(names_table, acq_table):
    """Left-join main.acq with main.names to substitute the seller name.

    Both arguments are only used through their ``.name`` and ``.columns``
    attributes; the query itself refers to the tables as ``main.acq`` and
    ``main.names``.

    Returns
    -------
    The result of ``pyblazing.run_query``; ``new_seller_name`` is exposed as
    ``seller_name``.
    """
    timer = Chronometer.makeStarted()
    table_map = {
        names_table.name: names_table.columns,
        acq_table.name: acq_table.columns,
    }

    sql = """SELECT loan_id, orig_channel, orig_interest_rate, orig_upb, orig_loan_term, 
        orig_date, first_pay_date, orig_ltv, orig_cltv, num_borrowers, dti, borrower_credit_score, 
        first_home_buyer, loan_purpose, property_type, num_units, occupancy_status, property_state,
        zip, mortgage_insurance_percent, product_type, coborrow_credit_score, mortgage_insurance_type, 
        relocation_mortgage_indicator, new_seller_name as seller_name 
        FROM main.acq as a LEFT OUTER JOIN main.names as n ON  a.seller_name = n.seller_name"""
    merged = pyblazing.run_query(sql, table_map)
    Chronometer.show(timer, 'Create Acquisition (Merge Names)')
    return merged


def merge_names_old_rapids(acq_gdf, names):
    """DataFrame-API variant of the seller-name substitution.

    Left-merges ``names`` onto ``acq_gdf`` by ``seller_name`` and then
    replaces ``seller_name`` with the merged ``new_seller_name`` column.
    """
    merged = acq_gdf.merge(names, how='left', on=['seller_name'])
    # Drop the old column first so the re-added 'seller_name' carries the
    # values of 'new_seller_name'.
    merged.drop_column('seller_name')
    merged['seller_name'] = merged['new_seller_name']
    merged.drop_column('new_seller_name')

    return merged


def create_ever_features(table, **kwargs):
    """Compute the per-loan ever_30 / ever_90 / ever_180 flags with SQL.

    For each ``loan_id`` in ``main.perf`` the maximum
    ``current_loan_delinquency_status`` is compared against 1, 3 and 6.

    Returns
    -------
    The result of ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()
    sql = """SELECT loan_id,
        max(current_loan_delinquency_status) >= 1 as ever_30, 
        max(current_loan_delinquency_status) >= 3 as ever_90,
        max(current_loan_delinquency_status) >= 6 as ever_180
        FROM main.perf group by loan_id"""
    ever_features = pyblazing.run_query(sql, {table.name: table.columns})
    Chronometer.show(timer, 'Create Ever Features')
    return ever_features


def create_ever_features_old_rapids(gdf, **kwargs):
    """DataFrame-API variant of the ever_30 / ever_90 / ever_180 flags.

    Groups by ``loan_id``, takes the maximum delinquency status and stores
    the comparisons against 1, 3 and 6 as ``int8`` columns.
    """
    status_by_loan = gdf[['loan_id', 'current_loan_delinquency_status']]
    status_by_loan = status_by_loan.groupby('loan_id', method='hash').max()
    del (gdf)

    # The code expects the aggregated column to carry a 'max_' prefix.
    max_column = 'max_current_loan_delinquency_status'
    status_by_loan['ever_30'] = (status_by_loan[max_column] >= 1).astype('int8')
    status_by_loan['ever_90'] = (status_by_loan[max_column] >= 3).astype('int8')
    status_by_loan['ever_180'] = (status_by_loan[max_column] >= 6).astype('int8')
    status_by_loan.drop_column(max_column)
    return status_by_loan


def create_delinq_features(table, **kwargs):
    """Compute the first 30/90/180 delinquency dates per loan with SQL.

    Three queries find, per ``loan_id``, the earliest
    ``monthly_reporting_period`` with a delinquency status of at least 1, 3
    and 6. A fourth query left-joins the 90 and 180 results onto the 30
    result, replacing missing dates with ``DATE '1970-01-01'``.

    Returns
    -------
    The result of the final ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()

    # One template for the three otherwise identical queries:
    # (feature suffix, minimum delinquency status).
    first_month_sql = """SELECT loan_id,
        min(monthly_reporting_period) as delinquency_%d
        FROM main.perf where current_loan_delinquency_status >= %d group by loan_id"""

    first_30 = pyblazing.run_query(first_month_sql % (30, 1), {table.name: table.columns})
    first_90 = pyblazing.run_query(first_month_sql % (90, 3), {table.name: table.columns})
    first_180 = pyblazing.run_query(first_month_sql % (180, 6), {table.name: table.columns})

    partial_tables = {
        "delinq_30": first_30.columns,
        "delinq_90": first_90.columns,
        "delinq_180": first_180.columns,
    }
    merge_sql = """SELECT d30.loan_id, delinquency_30, COALESCE(delinquency_90, DATE '1970-01-01') as delinquency_90,
                COALESCE(delinquency_180, DATE '1970-01-01') as delinquency_180 FROM main.delinq_30 as d30
                LEFT OUTER JOIN main.delinq_90 as d90 ON d30.loan_id = d90.loan_id
                LEFT OUTER JOIN main.delinq_180 as d180 ON d30.loan_id = d180.loan_id"""
    merged = pyblazing.run_query(merge_sql, partial_tables)
    Chronometer.show(timer, 'Create deliquency features')
    return merged


def create_delinq_features_old_rapids(gdf, **kwargs):
    """DataFrame-API variant of the first 30/90/180 delinquency dates.

    For each threshold (status >= 1, 3, 6) the earliest
    ``monthly_reporting_period`` per ``loan_id`` is computed; the 90 and 180
    results are left-merged onto the 30 result and missing dates are filled
    with 1970-01-01.
    """
    def first_delinquent_month(source, min_status, feature_name):
        # Earliest reporting period per loan at or above `min_status`.
        rows = source.query('current_loan_delinquency_status >= %d' % min_status)
        earliest = rows[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash').min()
        earliest[feature_name] = earliest['min_monthly_reporting_period']
        earliest.drop_column('min_monthly_reporting_period')
        return earliest

    status_gdf = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]
    del (gdf)
    first_30 = first_delinquent_month(status_gdf, 1, 'delinquency_30')
    first_90 = first_delinquent_month(status_gdf, 3, 'delinquency_90')
    first_180 = first_delinquent_month(status_gdf, 6, 'delinquency_180')
    del (status_gdf)

    # Fill value for loans that never reached the threshold.
    epoch = np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')

    merged = first_30.merge(first_90, how='left', on=['loan_id'], type='hash')
    merged['delinquency_90'] = merged['delinquency_90'].fillna(epoch)
    merged = merged.merge(first_180, how='left', on=['loan_id'], type='hash')
    merged['delinquency_180'] = merged['delinquency_180'].fillna(epoch)
    del (first_30)
    del (first_90)
    del (first_180)
    return merged


# everdf_tmp has loan_id, ever_30, ever_90, ever_180
# delinq_merge has loan_id, delinquency_30, delinquency_90, delinquency_180
def join_ever_delinq_features(everdf_tmp, delinq_merge, **kwargs):
    """Left-join the delinquency dates onto the ever_* flags with SQL.

    ``everdf_tmp`` is registered as ``main.everdf`` and ``delinq_merge`` as
    ``main.delinq``; missing delinquency dates become ``DATE '1970-01-01'``.

    Returns
    -------
    The result of ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()
    sql = """SELECT everdf.loan_id as loan_id, ever_30, ever_90, ever_180,
                  COALESCE(delinquency_30, DATE '1970-01-01') as delinquency_30,
                  COALESCE(delinquency_90, DATE '1970-01-01') as delinquency_90,
                  COALESCE(delinquency_180, DATE '1970-01-01') as delinquency_180 FROM main.everdf as everdf
                  LEFT OUTER JOIN main.delinq as delinq ON everdf.loan_id = delinq.loan_id"""
    joined = pyblazing.run_query(sql, {"everdf": everdf_tmp, "delinq": delinq_merge})
    Chronometer.show(timer, 'Create ever deliquency features')
    return joined


def join_ever_delinq_features_old_rapids(everdf_tmp, delinq_merge, **kwargs):
    """DataFrame-API variant: left-merge delinquency dates onto ever_* flags.

    Missing values in the three delinquency date columns are filled with
    1970-01-01.
    """
    merged = everdf_tmp.merge(delinq_merge, on=['loan_id'], how='left', type='hash')
    del (everdf_tmp)
    del (delinq_merge)

    epoch = np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')
    for date_column in ('delinquency_30', 'delinquency_90', 'delinquency_180'):
        merged[date_column] = merged[date_column].fillna(epoch)
    return merged


def create_joined_df(gdf, everdf, **kwargs):
    """Left-join the per-loan features onto every performance row with SQL.

    ``gdf`` is registered as ``main.perf`` and ``everdf`` as ``main.everdf``.
    The query also extracts month and year from
    ``monthly_reporting_period`` and substitutes defaults for missing
    delinquency status (-1), UPB (999999999.9) and delinquency dates
    (``DATE '1970-01-01'``).

    Returns
    -------
    The result of ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()

    sql = """SELECT perf.loan_id as loan_id, 
                perf.monthly_reporting_period as mrp_timestamp,
                EXTRACT(MONTH FROM perf.monthly_reporting_period) as timestamp_month,
                EXTRACT(YEAR FROM perf.monthly_reporting_period) as timestamp_year,
                COALESCE(perf.current_loan_delinquency_status, -1) as delinquency_12,
                COALESCE(perf.current_actual_upb, 999999999.9) as upb_12,
                everdf.ever_30 as ever_30,
                everdf.ever_90 as ever_90, 
                everdf.ever_180 as ever_180, 
                COALESCE(everdf.delinquency_30, DATE '1970-01-01') as delinquency_30, 
                COALESCE(everdf.delinquency_90, DATE '1970-01-01') as delinquency_90, 
                COALESCE(everdf.delinquency_180, DATE '1970-01-01') as delinquency_180
                FROM main.perf as perf 
                LEFT OUTER JOIN main.everdf as everdf ON perf.loan_id = everdf.loan_id"""

    joined = pyblazing.run_query(sql, {"perf": gdf, "everdf": everdf})
    Chronometer.show(timer, 'Create Joined DF')
    return joined


def create_joined_df_old_rapids(gdf, everdf, **kwargs):
    """DataFrame-API variant of the performance / per-loan feature join.

    Renames and derives the per-row columns (``timestamp``,
    ``timestamp_month``, ``timestamp_year``, ``delinquency_12``, ``upb_12``),
    left-merges ``everdf`` by ``loan_id`` and fills nulls in the merged
    feature columns with -1.
    """
    perf_rows = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status', 'current_actual_upb']]
    del (gdf)

    # Each rename is a copy-then-drop; the statement order is kept as-is so
    # the resulting column order is unchanged.
    perf_rows['timestamp'] = perf_rows['monthly_reporting_period']
    perf_rows.drop_column('monthly_reporting_period')
    perf_rows['timestamp_month'] = perf_rows['timestamp'].dt.month
    perf_rows['timestamp_year'] = perf_rows['timestamp'].dt.year
    perf_rows['delinquency_12'] = perf_rows['current_loan_delinquency_status']
    perf_rows.drop_column('current_loan_delinquency_status')
    perf_rows['upb_12'] = perf_rows['current_actual_upb']
    perf_rows.drop_column('current_actual_upb')
    perf_rows['upb_12'] = perf_rows['upb_12'].fillna(999999999)
    perf_rows['delinquency_12'] = perf_rows['delinquency_12'].fillna(-1)

    merged = perf_rows.merge(everdf, how='left', on=['loan_id'], type='hash')
    del (everdf)
    del (perf_rows)

    feature_columns = (
        'ever_30', 'ever_90', 'ever_180',
        'delinquency_30', 'delinquency_90', 'delinquency_180',
    )
    for feature in feature_columns:
        merged[feature] = merged[feature].fillna(-1)

    merged['timestamp_year'] = merged['timestamp_year'].astype('int32')
    merged['timestamp_month'] = merged['timestamp_month'].astype('int32')

    return merged


# joined_df has loan_id, mrp_timestamp, timestamp_month, timestamp_year, delinquency_12, upb_12, ever_30, ever_90, ever_180, delinquency_30, delinquency_90, delinquency_180
def create_12_mon_features(joined_df, **kwargs):
    """Build the 12-month features with two SQL queries per month offset.

    For each offset 1..12 the rows of ``main.joined_df`` are grouped by loan
    and a 12-month bucket, then a second query turns the bucket back into
    ``timestamp_year`` / ``timestamp_month``. The twelve outputs are
    concatenated with ``cudf.concat``.

    Returns
    -------
    The concatenation of the per-offset ``.columns`` objects (unlike the
    union variants, not a pyblazing result).
    """
    timer = Chronometer.makeStarted()
    kept_results = []
    month_frames = []
    source_tables = {"joined_df": joined_df}
    n_months = 12
    for month in range(1, n_months + 1):
        bucket_expr = "floor((timestamp_year * 12 + timestamp_month - 24000.0 - {})/12.0)".format(month)
        group_sql = (
            "SELECT loan_id, {0} as josh_mody_n, max(delinquency_12) > 3 as max_d12_gt3, "
            "min(upb_12) = 0 as min_upb_12_eq0, min(upb_12) as upb_12  "
            "FROM main.joined_df as joined_df GROUP BY loan_id, {0}"
        ).format(bucket_expr)
        grouped = pyblazing.run_query(group_sql, source_tables)

        feature_sql = (
            "SELECT loan_id, max_d12_gt3 + min_upb_12_eq0 as delinquency_12, upb_12, "
            "floor(((josh_mody_n * 12) + {})/12) as timestamp_year, "
            "josh_mody_n * 0 + {} as timestamp_month from main.temp"
        ).format(24000 + (month - 1), month)
        month_result = pyblazing.run_query(feature_sql, {"temp": grouped.columns})
        kept_results.append(month_result)  # this is to keep them in scope
        month_frames.append(month_result.columns)

    combined = cudf.concat(month_frames)
    Chronometer.show(timer, 'Create 12 month features')
    return combined

    # joined_df has loan_id, mrp_timestamp, timestamp_month, timestamp_year, delinquency_12, upb_12, ever_30, ever_90, ever_180, delinquency_30, delinquency_90, delinquency_180
def create_12_mon_features_union(joined_df, **kwargs):
    """Build the 12-month features with one UNION ALL over twelve queries.

    Twelve grouping queries (one per month offset) run against
    ``main.joined_df``; their results are registered as ``temp1`` ..
    ``temp12`` and combined by a single UNION ALL query.

    Returns
    -------
    The result of the final ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()
    source_tables = {"joined_df": joined_df}
    n_months = 12

    grouped_results = []
    for month in range(1, n_months + 1):
        bucket_expr = "floor((timestamp_year * 12 + timestamp_month - 24000.0 - {})/12.0)".format(month)
        group_sql = (
            "SELECT loan_id, {0} as josh_mody_n, max(delinquency_12) > 3 as max_d12_gt3, "
            "min(upb_12) = 0 as min_upb_12_eq0, min(upb_12) as upb_12  "
            "FROM main.joined_df as joined_df GROUP BY loan_id, {0}"
        ).format(bucket_expr)
        grouped_results.append(pyblazing.run_query(group_sql, source_tables))

    # One parenthesised SELECT per month, glued together with UNION ALL.
    select_template = (
        "(SELECT loan_id, max_d12_gt3 + min_upb_12_eq0 as delinquency_12, upb_12, "
        "floor(((josh_mody_n * 12) + {offset})/12) as timestamp_year, "
        "josh_mody_n * 0 + {month} as timestamp_month from main.temp{month})"
    )
    union_tables = {}
    selects = []
    for month, grouped in enumerate(grouped_results, 1):
        union_tables["temp" + str(month)] = grouped.columns
        selects.append(select_template.format(offset=24000 + (month - 1), month=month))
    union_sql = " UNION ALL ".join(selects)

    combined = pyblazing.run_query(union_sql, union_tables)
    Chronometer.show(timer, 'Create 12 month features union')

    return combined

        # joined_df has loan_id, mrp_timestamp, timestamp_month, timestamp_year, delinquency_12, upb_12, ever_30, ever_90, ever_180, delinquency_30, delinquency_90, delinquency_180
def create_12_mon_features_union_alt(joined_df, **kwargs):
    """Build the 12-month features from a pre-aggregated monthly table.

    ``main.joined_df`` is first reduced to one row per loan and month index.
    Twelve grouping queries (one per month offset) are then submitted
    against that reduced table via ``run_query_get_token`` before any result
    is fetched, and the fetched results are combined with one UNION ALL
    query.

    Returns
    -------
    The result of the final ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()

    month_index_expr = "timestamp_year * 12 + timestamp_month - 24000.0"
    reduce_sql = (
        "SELECT loan_id, {0} as josh_mody_n, max(delinquency_12) as max_d12, "
        "min(upb_12) as min_upb_12  "
        "FROM main.joined_df as joined_df GROUP BY loan_id, {0}"
    ).format(month_index_expr)
    monthly = pyblazing.run_query(reduce_sql, {"joined_df": joined_df})

    # The reduced table is registered under the same "joined_df" name.
    reduced_tables = {"joined_df": monthly.columns}
    n_months = 12

    # Submit all twelve queries first, then collect their results in order.
    pending_tokens = []
    for month in range(1, n_months + 1):
        bucket_expr = "floor((josh_mody_n - {})/12.0)".format(month)
        group_sql = (
            "SELECT loan_id, {0} as josh_mody_n, max(max_d12) > 3 as max_d12_gt3, "
            "min(min_upb_12) = 0 as min_upb_12_eq0, min(min_upb_12) as upb_12  "
            "FROM main.joined_df as joined_df GROUP BY loan_id, {0}"
        ).format(bucket_expr)
        pending_tokens.append(pyblazing.run_query_get_token(group_sql, reduced_tables))

    grouped_results = [pyblazing.run_query_get_results(token) for token in pending_tokens]

    select_template = (
        "(SELECT loan_id, max_d12_gt3 + min_upb_12_eq0 as delinquency_12, upb_12, "
        "floor(((josh_mody_n * 12) + {offset})/12) as timestamp_year, "
        "josh_mody_n * 0 + {month} as timestamp_month from main.temp{month})"
    )
    union_tables = {}
    selects = []
    for month, grouped in enumerate(grouped_results, 1):
        union_tables["temp" + str(month)] = grouped.columns
        selects.append(select_template.format(offset=24000 + (month - 1), month=month))
    union_sql = " UNION ALL ".join(selects)

    combined = pyblazing.run_query(union_sql, union_tables)
    Chronometer.show(timer, 'Create 12 month features once')
    return combined


def create_12_mon_features_old_rapids(joined_df, **kwargs):
    """DataFrame-API variant of the 12-month features.

    For each month offset 1..12 the rows are grouped by loan and a 12-month
    bucket, ``delinquency_12`` and ``upb_12`` are recomputed from the group
    maximum / minimum, and the bucket is turned back into
    ``timestamp_year`` / ``timestamp_month``. The twelve frames are
    concatenated with ``cudf.concat``.
    """
    n_months = 12
    month_frames = []
    for month in range(1, n_months + 1):
        frame = joined_df[['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']]
        print(frame.dtypes)
        frame['josh_months'] = frame['timestamp_year'] * 12 + frame['timestamp_month']
        shifted_months = frame['josh_months'].astype('float64') - 24000 - month
        frame['josh_mody_n'] = (shifted_months / 12).floor()
        frame = frame.groupby(['loan_id', 'josh_mody_n'], method='hash').agg(
            {'delinquency_12': 'max', 'upb_12': 'min'})

        # The code expects the aggregates to be named 'max_...' / 'min_...'.
        frame['delinquency_12'] = (frame['max_delinquency_12'] > 3).astype('int32')
        frame['delinquency_12'] += (frame['min_upb_12'] == 0).astype('int32')
        frame.drop_column('max_delinquency_12')
        frame['upb_12'] = frame['min_upb_12']
        frame.drop_column('min_upb_12')

        year_months = (frame['josh_mody_n'] * n_months) + 24000 + (month - 1)
        frame['timestamp_year'] = (year_months / 12).floor().astype('int16')
        frame['timestamp_month'] = np.int8(month)
        frame.drop_column('josh_mody_n')
        month_frames.append(frame)
        del (frame)
    del (joined_df)

    return cudf.concat(month_frames)


# joined_df has loan_id, mrp_timestamp, timestamp_month, timestamp_year, delinquency_12, upb_12, ever_30, ever_90, ever_180, delinquency_30, delinquency_90, delinquency_180
# testdf has loan_id delinquency_12 upb_12 timestamp_year timestamp_month
def combine_joined_12_mon(joined_df, testdf, **kwargs):
    """Left-join the 12-month features onto the joined table with SQL.

    Rows are matched on ``loan_id``, ``timestamp_year`` and
    ``timestamp_month``; ``delinquency_12`` and ``upb_12`` come from
    ``testdf``.

    Returns
    -------
    The result of ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()
    sql = """SELECT j.loan_id, j.mrp_timestamp, j.timestamp_month, j.timestamp_year, 
                j.ever_30, j.ever_90, j.ever_180, j.delinquency_30, j.delinquency_90, j.delinquency_180,
                t.delinquency_12, t.upb_12 
                FROM main.joined_df as j LEFT OUTER JOIN main.testdf as t 
                ON j.loan_id = t.loan_id and j.timestamp_year = t.timestamp_year and j.timestamp_month = t.timestamp_month"""
    combined = pyblazing.run_query(sql, {"joined_df": joined_df, "testdf": testdf})
    Chronometer.show(timer, 'Combine joind 12 month')
    return combined


def combine_joined_12_mon_old_rapids(joined_df, testdf, **kwargs):
    """DataFrame-API variant: left-merge the 12-month features.

    Drops ``delinquency_12`` and ``upb_12`` from ``joined_df`` (in place),
    casts its year/month columns to ``int16`` / ``int8`` and left-merges
    ``testdf`` on ``loan_id``, ``timestamp_year`` and ``timestamp_month``.
    """
    for replaced_column in ('delinquency_12', 'upb_12'):
        joined_df.drop_column(replaced_column)
    joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int16')
    joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int8')

    merge_keys = ['loan_id', 'timestamp_year', 'timestamp_month']
    return joined_df.merge(testdf, how='left', on=merge_keys, type='hash')


# joined_df has loan_id, mrp_timestamp, timestamp_month, timestamp_year, ever_30, ever_90, ever_180, delinquency_30, delinquency_90, delinquency_180, delinquency_12, upb_12
def final_performance_delinquency(gdf, joined_df, **kwargs):
    """Left-join ``delinquency_12`` onto the performance rows with SQL.

    ``gdf`` is registered as ``main.gdf`` and ``joined_df`` as
    ``main.joined_df``; rows are matched on ``loan_id`` and on the year and
    month extracted from ``monthly_reporting_period``.

    Returns
    -------
    The result of ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()
    sql = """SELECT g.loan_id, current_actual_upb, current_loan_delinquency_status, delinquency_12, interest_rate, loan_age, mod_flag, msa, non_interest_bearing_upb 
        FROM main.gdf as g LEFT OUTER JOIN main.joined_df as j
        ON g.loan_id = j.loan_id and EXTRACT(YEAR FROM g.monthly_reporting_period) = j.timestamp_year and EXTRACT(MONTH FROM g.monthly_reporting_period) = j.timestamp_month """
    perf_features = pyblazing.run_query(sql, {"gdf": gdf, "joined_df": joined_df})
    Chronometer.show(timer, 'Final performance delinquency')
    return perf_features


def final_performance_delinquency_old_rapids(gdf, joined_df, **kwargs):
    """DataFrame-API variant of the final performance / delinquency merge.

    Both inputs go through ``null_workaround``; temporary year/month key
    columns are derived from ``monthly_reporting_period``, used for the left
    merge and dropped again.
    """
    perf = null_workaround(gdf)
    joined_df = null_workaround(joined_df)

    reporting_period = 'monthly_reporting_period'
    perf['timestamp_month'] = perf[reporting_period].dt.month
    perf['timestamp_month'] = perf['timestamp_month'].astype('int8')
    perf['timestamp_year'] = perf[reporting_period].dt.year
    perf['timestamp_year'] = perf['timestamp_year'].astype('int16')

    merge_keys = ['loan_id', 'timestamp_year', 'timestamp_month']
    perf = perf.merge(joined_df, how='left', on=merge_keys, type='hash')

    # The key columns were only needed for the merge.
    perf.drop_column('timestamp_year')
    perf.drop_column('timestamp_month')
    return perf


# from perf we need: current_actual_upb, current_loan_delinquency_status, delinquency_12, interest_rate, loan_age, mod_flag, msa, non_interest_bearing_upb

# from acq we need: borrower_credit_score, dti, first_home_buyer, loan_purpose, mortgage_insurance_percent, num_borrowers, num_units, occupancy_status, 
# orig_channel, orig_cltv, orig_date, orig_interest_rate, orig_loan_term, orig_ltv, orig_upb, product_type, property_state, property_type, relocation_mortgage_indicator, seller_name, zip
def join_perf_acq_gdfs(perf, acq, **kwargs):
    """Left-join the acquisition columns onto the performance rows with SQL.

    ``perf`` is registered as ``main.perf`` and ``acq`` as ``main.acq``;
    rows are matched on ``loan_id``.

    Returns
    -------
    The result of ``pyblazing.run_query``.
    """
    timer = Chronometer.makeStarted()
    sql = """SELECT p.loan_id, current_actual_upb, current_loan_delinquency_status, delinquency_12, interest_rate, loan_age, mod_flag, msa, non_interest_bearing_upb,
     borrower_credit_score, dti, first_home_buyer, loan_purpose, mortgage_insurance_percent, num_borrowers, num_units, occupancy_status, 
     orig_channel, orig_cltv, orig_date, orig_interest_rate, orig_loan_term, orig_ltv, orig_upb, product_type, property_state, property_type, 
     relocation_mortgage_indicator, seller_name, zip FROM main.perf as p LEFT OUTER JOIN main.acq as a ON p.loan_id = a.loan_id"""
    joined = pyblazing.run_query(sql, {"perf": perf, "acq": acq})
    Chronometer.show(timer, 'Join performance acquitistion gdfs')
    return joined


def join_perf_acq_gdfs_old_rapids(perf, acq, **kwargs):
    """DataFrame-API variant: left-merge acquisition onto performance rows.

    Both frames go through ``null_workaround`` before being merged on
    ``loan_id``.
    """
    cleaned_perf = null_workaround(perf)
    cleaned_acq = null_workaround(acq)
    return cleaned_perf.merge(cleaned_acq, how='left', on=['loan_id'], type='hash')


def last_mile_cleaning(df, **kwargs):
    """Convert every column of ``df`` to a numeric, null-free form.

    Steps, in order:
      1. category columns are replaced by their category codes;
      2. every column is cast to float32;
      3. ``delinquency_12`` becomes a 0/1 int32 column (1 where the value
         is greater than 0, nulls counted as 0);
      4. nulls in every column are filled with -1.

    Args:
        df: dataframe to clean; its columns are reassigned in place.
        **kwargs: accepted but not used.

    Returns:
        The same ``df`` object, after cleaning.
    """
    # drop_list = [
    #     'loan_id', 'orig_date', 'first_pay_date', 'seller_name',
    #     'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180',
    #     'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12',
    #     'zero_balance_effective_date','foreclosed_after', 'disposition_date','mrp_timestamp', 'loan_id0'
    # ]
    # for column in drop_list:
    #     df.drop_column(column)
    chronometer = Chronometer.makeStarted()
    # Series.iteritems() was deprecated in pandas 1.5 and removed in 2.0;
    # items() yields the same (column, dtype) pairs on old and new versions.
    # NOTE(review): assumes df.dtypes is a pandas Series - confirm for the
    # dataframe type actually passed in.
    for col, dtype in df.dtypes.items():
        if str(dtype) == 'category':
            df[col] = df[col].cat.codes
        df[col] = df[col].astype('float32')
    # Binarise the label column: any positive value counts as delinquent.
    df['delinquency_12'] = df['delinquency_12'] > 0
    df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32')
    # -1 is the fill value used for every remaining null.
    for column in df.columns:
        df[column] = df[column].fillna(-1)
    Chronometer.show(chronometer, 'Last mile cleaning')
    return df
  

# Selects which set of dataset locations is used below:
# True -> the hdfs:// URLs, False -> the absolute local paths.
use_hdfs = True

# to download data for this notebook, visit https://rapidsai.github.io/demos/datasets/mortgage-data and update the following paths accordingly

if use_hdfs:
    acq_data_path = "hdfs://35.203.181.221/mortgage_2000-2001/acq"
    perf_data_path = "hdfs://35.203.181.221/mortgage_2000-2001/perf"
    col_names_path = "hdfs://35.203.181.221/mortgage_2000-2001/names.csv"
else:
    acq_data_path = "/home/william/repos/blazingsql-mortgage-pipeline/data/acq"
    perf_data_path = "/home/william/repos/blazingsql-mortgage-pipeline/data/perf"
    col_names_path = "/home/william/repos/blazingsql-mortgage-pipeline/data/names.csv"

# Span of data to process; both year and quarter bounds are inclusive.
start_year = 2000
end_year = 2000  # end_year is inclusive
start_quarter = 1
end_quarter = 1
part_count = 1  # the number of data files to train against

import time

# initialize_rmm_pool()

# Parameter dictionary handed to the XGBoost training call.
# 'nround' is also read back by the training call as the number of boosting rounds.
# NOTE(review): 'eta' and 'learning_rate' are documented aliases in XGBoost and
# are both set to 0.1 here; several other keys ('loss', 'max_features',
# 'criterion', 'distributed_dask', 'verbose') do not look like XGBoost
# booster parameters - confirm whether they are still needed.
dxgb_gpu_params = {
    'nround': 100,
    'max_depth': 8,
    'max_leaves': 2 ** 8,
    'alpha': 0.9,
    'eta': 0.1,
    'gamma': 0.1,
    'learning_rate': 0.1,
    'subsample': 1,
    'reg_lambda': 1,
    'scale_pos_weight': 2,
    'min_child_weight': 30,
    'tree_method': 'gpu_hist',
    'n_gpus': 1,
    'distributed_dask': True,
    'loss': 'ls',
    'objective': 'reg:linear',
    'max_features': 'auto',
    'criterion': 'friedman_mse',
    'grow_policy': 'lossguide',
    # 'nthread', ncores[worker],  # WSM may want to set this
    'verbose': True,
}


def range1(start, end):
    """Return a range from ``start`` up to and including ``end``."""
    stop = end + 1  # range() excludes its stop value, so step one past ``end``
    return range(start, stop)

def use_file_type_suffix(year, quarter):
    """Return True only for year 2001 with quarter 2 or later, else False."""
    return year == 2001 and quarter >= 2

def getChunks(year, quarter):
    """Return the chunk indices for the given year and quarter.

    Two chunks (0 and 1) when ``use_file_type_suffix(year, quarter)`` is
    true, otherwise the single chunk 0.
    """
    chunk_count = 2 if use_file_type_suffix(year, quarter) else 1
    return range(chunk_count)

# Host-side label and feature frames accumulated over every processed file.
final_cpu_df_label = None
final_cpu_df_data = None

# Per-file timings; one entry is appended per processed file.
all_load_times = []
all_etl_times = []
all_xgb_convert_times = []

for year in range1(start_year, end_year):
    for quarter in range1(start_quarter, end_quarter):
        for chunk in getChunks(year, quarter):
            # Only some (year, quarter) files carry a "_<chunk>" suffix after ".txt".
            chunk_sufix = "_{}".format(chunk) if use_file_type_suffix(year, quarter) else ""
            perf_file = perf_data_path + "/Performance_" + str(year) + "Q" + str(quarter) + ".txt" + chunk_sufix

            [gpu_df, load_time, etl_time] = run_gpu_workflow(quarter=quarter, year=year, perf_file=perf_file)
            all_load_times.append(load_time)
            all_etl_times.append(etl_time)

            xgb_convert_start_time = time.time()

            # Split into (label frame, feature frame): 'delinquency_12' is the
            # label, every other column is a feature.
            gpu_df = (gpu_df[['delinquency_12']], gpu_df[list(gpu_df.columns.difference(['delinquency_12']))])

            cpu_df_label = gpu_df[0].to_pandas()
            cpu_df_data = gpu_df[1].to_pandas()

            del (gpu_df)

            # Seed the accumulators with the first frame and append afterwards.
            # Testing for None (instead of year == start_year) keeps every
            # quarter and chunk of the first year; the year test overwrote the
            # accumulated frames on each of them and kept only the last one.
            if final_cpu_df_label is None:
                final_cpu_df_label = cpu_df_label
                final_cpu_df_data = cpu_df_data
            else:
                final_cpu_df_label = pd.concat([final_cpu_df_label, cpu_df_label])
                final_cpu_df_data = pd.concat([final_cpu_df_data, cpu_df_data])

            xgb_convert_end_time = time.time()

            all_xgb_convert_times.append(xgb_convert_end_time - xgb_convert_start_time)
# Hold out 20% of the rows for testing; the fixed random_state pins the split.
data_train, data_test, label_train, label_test = train_test_split(final_cpu_df_data, final_cpu_df_label, test_size=0.20, random_state=42)

xgdf_train = xgb.DMatrix(data_train, label_train)
xgdf_test = xgb.DMatrix(data_test, label_test)

# labels = None
# bst = dxgb_gpu.train(client, dxgb_gpu_params, gpu_dfs, labels, num_boost_round=dxgb_gpu_params['nround'])

# BEGIN predict 1
chronometerTrain1 = Chronometer.makeStarted()
startTime = time.time()
bst = xgb.train(dxgb_gpu_params, xgdf_train, num_boost_round=dxgb_gpu_params['nround'])
Chronometer.show(chronometerTrain1, 'Train 1')

chronometerPredict1 = Chronometer.makeStarted()
preds = bst.predict(xgdf_test)
Chronometer.show(chronometerPredict1, 'Predict 1')

labels = xgdf_test.get_label()
# Threshold predictions at 0.5 and count disagreements with the labels in one
# vectorised pass; this runs inside the TRAIN/PREDICT timing window, so a
# per-element Python loop here would inflate the reported time.
mismatch_count = np.count_nonzero((preds > 0.5) != labels)
print('prediction error=%f' % (mismatch_count / float(len(preds))))

endTime = time.time()
trainPredict_time = (endTime - startTime)

print("TIMES SUMMARY")
print('LOAD Time: %fs' % sum(all_load_times))
print('ETL Time: %fs' % sum(all_etl_times))
print('CONVERT Time: %fs' % sum(all_xgb_convert_times))
print('TRAIN/PREDICT Time: %fs' % trainPredict_time)


# END predict 1

# BEGIN predict 2
# def feval(preds, dtrain):
#     labels = dtrain.get_label()
#     error = float(sum(labels != (preds > 0.5))) / len(labels)
#     return 'bsql-error', error
# startTime = time.time()
# bst = xgb.train(dxgb_gpu_params, xgdf_train, dxgb_gpu_params['nround'], [(xgdf_test, 'eval'), (xgdf_train, 'train')], feval=feval)
# endTime = time.time()
# print(bst)
# chronometerPredict2 = Chronometer.makeStarted()
# preds = bst.predict(xgdf_test)
# Chronometer.show(chronometerPredict2, 'Predict 2')
# print(preds)
# # END predict 2
# NOTE(review): presumably reports a summary of the chronometer timings
# recorded during the run - confirm against the chronometer module.
Chronometer.show_resume()
