# Reminder

The preprocessing step was not saved with the MoverV3.2 code. Six columns are imputed differently between Mover_V3 and MoverV3.2.

Here, only the already-preprocessed data are used to retrain MoverV3.2; to satisfy legal requests, three columns are removed:
    1. p_31_50_exp
    2. p_married_exp
    3. borrowerage_bucket

In [1]:
# Load sparkmagic so the %spark / %%spark magics used in the rest of the notebook are available.
%load_ext sparkmagic.magics
from dsx_core_utils import proxy_util,dsxhi_util
# Configure the Livy proxy for sparkmagic, then list the Livy endpoints that can be attached to.
proxy_util.configure_proxy_livy() 
dsxhi_util.list_livy_endpoints()

success configuring sparkmagic livy.
['https://qlawsbidlhe02a.ad.datalake.foc.zone:8445/gateway/dsx/livy2/v1', 'https://qlawsdl001038a.ad.datalake.foc.zone:8443/gateway/dsx/livy/v1']


In [2]:
%%spark config
{"executorCores": 2, "numExecutors": 10, "executorMemory": "15g", 
 "driverMemory": "12g", "proxyUser": "aliu-", "driverCores": 1, 
 "conf": {"spark.yarn.appMasterEnv.THEANO_FLAGS": "base_compiledir=${PWD}/.theano"}}

In [3]:
# Start a Livy session named "inactivePurchaseLeadGen" (language: python) on the first endpoint listed above.
%spark add -s inactivePurchaseLeadGen -k -l python -u https://qlawsbidlhe02a.ad.datalake.foc.zone:8445/gateway/dsx/livy2/v1

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
22880,application_1590030838276_68049,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [4]:
%%spark

import pyspark
import os, sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import lower
import pyspark.sql.functions as F
from pyspark.sql.functions import last_day, rand, dense_rank, last_day, col, size, length, when, upper, unix_timestamp, avg, substring, lower, udf, sum, count, lit, mean, concat, countDistinct, desc, from_unixtime, row_number, year, month, to_date, upper, months_between
from pyspark.sql import DataFrameStatFunctions as statFunc
from pyspark.sql.types import *
from pyspark.sql.window import Window
from functools import reduce

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier

from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, QuantileDiscretizer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from os.path import expanduser, join, abspath
import time
import pandas as pd

# The Livy session already exposes `spark`; getOrCreate() binds the name to the active session.
spark = SparkSession.builder.getOrCreate()

In [5]:
%%spark

# Storage locations for fitted models, train/test data and scoring results.
# NOTE(review): training_data_path and test_data_path are not referenced in the visible cells.
model_save_path = '/dev/projects/retention_models/purchase_payoff/models/'
training_data_path = '/dev/projects/retention_models/purchase_payoff/training/data/train/'
test_data_path = '/dev/projects/retention_models/purchase_payoff/training/data/test/'
result_path = '/dev/projects/retention_models/purchase_payoff/training/results/movermodel/'

# Monthly preprocessed snapshots and the observed (actual) payoff labels.
monthly_prep_path = '/dev/projects/retention_models/monthly_snapshot/monthly_preprocessed/'
actual_path = '/dev/projects/retention_models/actual_value/'

# 1. Load Preprocess Data

### Data for Mover Purchase V3.2 and Refi V1.3

In [6]:
%%spark

def _read_monthly_snapshot(yyyymm, drop_gcid=False):
    """Read one monthly preprocessed snapshot from `monthly_prep_path`.

    Parameters
    ----------
    yyyymm : str
        Snapshot month as it appears in the file name, e.g. '201612'.
    drop_gcid : bool
        When True, the 'gcid' column is dropped after reading.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    frame = spark.read.parquet(monthly_prep_path + 'monthly_preprocessed_' + yyyymm + '.parquet')
    return frame.drop('gcid') if drop_gcid else frame


# Snapshots up to 2018-09 have 'gcid' dropped so they line up with the later ones.
df201612 = _read_monthly_snapshot('201612', drop_gcid=True)
df201703 = _read_monthly_snapshot('201703', drop_gcid=True)
df201706 = _read_monthly_snapshot('201706', drop_gcid=True)
df201709 = _read_monthly_snapshot('201709', drop_gcid=True)
df201712 = _read_monthly_snapshot('201712', drop_gcid=True)
df201803 = _read_monthly_snapshot('201803', drop_gcid=True)
df201806 = _read_monthly_snapshot('201806', drop_gcid=True)
df201809 = _read_monthly_snapshot('201809', drop_gcid=True)
df201812 = _read_monthly_snapshot('201812')
df201903 = _read_monthly_snapshot('201903')
df201906 = _read_monthly_snapshot('201906')
df201909 = _read_monthly_snapshot('201909')
df201912 = _read_monthly_snapshot('201912')
df202003 = _read_monthly_snapshot('202003')

quarterly_frames = [
    df201612, df201703, df201706, df201709, df201712,
    df201803, df201806, df201809, df201812,
    df201903, df201906, df201909, df201912,
    df202003,
]

# Stack the snapshots in chronological order.
# NOTE(review): DataFrame.union matches columns by position, not by name — confirm that every
# snapshot has the same column order.
df_preprocessed = quarterly_frames[0]
for frame in quarterly_frames[1:]:
    df_preprocessed = df_preprocessed.union(frame)

# Remove the three columns that must not be used by the retrained model.
df_preprocessed = df_preprocessed.drop('borrowerage_bucket', 'p_married_exp', 'p_31_50_exp')

In [7]:
%%spark

# Inspect the combined schema after the three columns have been dropped.
df_preprocessed.printSchema()

root
 |-- servicecalendardate: date (nullable = true)
 |-- ln_no: string (nullable = true)
 |-- og_note_dt: timestamp (nullable = true)
 |-- ct_age_exp: integer (nullable = true)
 |-- ct_1_exp: string (nullable = true)
 |-- ct_2_exp: string (nullable = true)
 |-- ct_3_exp: string (nullable = true)
 |-- home_value_exp: double (nullable = true)
 |-- p_edu_hs_exp: integer (nullable = true)
 |-- personnum_per_room_exp: double (nullable = true)
 |-- mosaic_group_refi_exp: string (nullable = true)
 |-- mosaic_group_pur_exp: string (nullable = true)
 |-- ratespread_min_exp: double (nullable = true)
 |-- ratespread_min_pur_exp: double (nullable = true)
 |-- ln_ann_int_rt: double (nullable = true)
 |-- loantypedescription_exp: string (nullable = true)
 |-- loanamortizationtype: string (nullable = true)
 |-- ageinmon_exp: double (nullable = true)
 |-- og_mtg_am_exp: double (nullable = true)
 |-- currentcltv_exp: double (nullable = true)
 |-- orig_fico_exp: integer (nullable = true)
 |-- LiveYear

### Actual Payoff Data

In [6]:
%%spark

# Observed payoff labels; only the two join keys and the purchase-payoff flag are kept.
actual = spark.read.parquet(actual_path + 'Actual_payoff_20200610.parquet')\
                .select('servicecalendardate', 'ln_no', 'purchasepayoff')

# Show one row to confirm the column layout.
actual.show(1)

+-------------------+----------+--------------+
|servicecalendardate|     ln_no|purchasepayoff|
+-------------------+----------+--------------+
|         2016-10-31|3219834719|           0.0|
+-------------------+----------+--------------+
only showing top 1 row

In [23]:
%%spark

# Row counts per service date, newest first, to see which months the actuals cover.
actual.groupBy('servicecalendardate').count().sort(col("servicecalendardate").desc()).show()

+-------------------+-------+
|servicecalendardate|  count|
+-------------------+-------+
|         2020-05-31|1889579|
|         2020-04-30|1899099|
|         2020-03-31|1867297|
|         2020-02-29|1846864|
|         2020-01-31|1824706|
|         2019-12-31|1802243|
|         2019-11-30|1807991|
|         2019-10-31|1787277|
|         2019-09-30|1763391|
|         2019-08-31|1856202|
|         2019-07-31|1839005|
|         2019-06-30|1822001|
|         2019-05-31|1806604|
|         2019-04-30|1788444|
|         2019-03-31|1770015|
|         2019-02-28|1752593|
|         2019-01-31|1738794|
|         2018-12-31|1726017|
|         2018-11-30|1713615|
|         2018-10-31|1699510|
+-------------------+-------+
only showing top 20 rows

In [19]:
%%spark

# Row count of the 2020-03 preprocessed snapshot, for comparison with the actuals count for 2020-03-31.
df202003.count()

1867297

### Join actual data with preprocessed data

The model is retrained in 2020-05, so the 2020-03 data cannot be used: it is not yet seasoned enough.

In [9]:
%%spark

# Inner join keeps only loan-months present in both frames; dropDuplicates() removes fully identical rows.
df_all = df_preprocessed.join(actual, on=['servicecalendardate', 'ln_no'], how='inner').dropDuplicates()

In [10]:
%%spark

# Per service date: number of joined loans and total of the purchasepayoff flag, newest first.
# `count` and `sum` here are the pyspark.sql.functions imported at the top, not the builtins.
df_all.groupby('servicecalendardate').agg(count('ln_no'), sum('purchasepayoff')).orderBy(col('servicecalendardate').desc()).show()

+-------------------+------------+-------------------+
|servicecalendardate|count(ln_no)|sum(purchasepayoff)|
+-------------------+------------+-------------------+
|         2020-03-31|     1867297|             4440.0|
|         2019-12-31|     1802243|            13866.0|
|         2019-09-30|     1763391|            14160.0|
|         2019-06-30|     1822001|            19544.0|
|         2018-12-31|     1726017|            14249.0|
|         2018-09-30|     1683583|            12477.0|
|         2018-06-30|     1619695|            17312.0|
|         2018-03-31|     1577779|            19761.0|
|         2017-12-31|     1530004|            12739.0|
|         2017-09-30|     1468441|            11685.0|
|         2017-06-30|     1423608|            15272.0|
|         2017-03-31|     1379094|            16819.0|
|         2016-12-31|     1326375|             9786.0|
+-------------------+------------+-------------------+

In [11]:
# Ratio of payoffs to non-payoffs for 2019-12-31 (1802243 joined loans).
# NOTE(review): 11487 equals the payoff total of the decile table computed further down against
# Actual_payoff_20200505; the summary above (20200610 actuals) shows 13866 for this month, so
# this hard-coded ratio looks stale — confirm.
(11487/(1802243-11487))

0.0064146092488312195

# 2. Build Model

### Under-sample negative values

In [12]:
%%spark

# Helper for under-sampling the negative (non-payoff) class
def data_split(df, train_months, target, rate):
    """Build under-sampled train and validation frames from the given months.

    Rows whose 'servicecalendardate' is in `train_months` are split by label.
    Each label group is split 70/30 into train/validation, and the negative
    part of both splits is then down-sampled to roughly `rate` of its rows.
    All random splits use seed 123. randomSplit treats the weights as
    sampling probabilities, so the resulting sizes are approximate.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain 'servicecalendardate' and the `target` column.
    train_months : list
        Service dates to keep.
    target : str
        Name of the label column; rows equal to 1.0 are positives and rows
        equal to 0.0 are negatives.
    rate : float
        Fraction of negatives to keep in each split.

    Returns
    -------
    (train, validate) : tuple of pyspark.sql.DataFrame
        `train` is marked for caching; `validate` is not.
    """
    in_window = df.where(col('servicecalendardate').isin(train_months))

    # Compare against numeric literals instead of relying on Spark casting the strings '1.'/'0.'.
    positives = in_window.filter(in_window[target] == 1.0)
    negatives = in_window.filter(in_window[target] == 0.0)

    train_pos, validate_pos = positives.randomSplit([0.7, 0.3], seed=123)
    train_neg_all, validate_neg_all = negatives.randomSplit([0.7, 0.3], seed=123)

    # Under-sample the negatives; the discarded remainder is not used.
    train_neg, _ = train_neg_all.randomSplit([rate, 1 - rate], seed=123)
    validate_neg, _ = validate_neg_all.randomSplit([rate, 1 - rate], seed=123)

    # union() replaces the deprecated unionAll(); both append rows without de-duplicating.
    train = train_pos.union(train_neg)
    validate = validate_pos.union(validate_neg)
    train.cache()

    return train, validate

In [13]:
%%spark

# NOTE(review): '2019-03-31' is requested here, but neither the joined-data summary printed above
# nor the df_train counts printed below contain that date — confirm whether the 2019-03 snapshot
# is being lost in the join with the actuals.
train_months = ['2017-12-31', '2018-03-31', '2018-06-30', '2018-09-30', '2018-12-31', '2019-03-31', '2019-06-30']

target = 'purchasepayoff'
# Keep roughly 2.5% of the non-payoff rows in both the train and the validation split.
df_train, df_validate = data_split(df_all, train_months, target, 0.025)

In [14]:
%%spark

# Class balance of the under-sampled training set, per service date.
df_train.groupby(['servicecalendardate', 'purchasepayoff']).count().orderBy('servicecalendardate', 'purchasepayoff').show()

+-------------------+--------------+-----+
|servicecalendardate|purchasepayoff|count|
+-------------------+--------------+-----+
|         2017-12-31|           0.0|26450|
|         2017-12-31|           1.0| 8924|
|         2018-03-31|           0.0|27228|
|         2018-03-31|           1.0|13888|
|         2018-06-30|           0.0|27894|
|         2018-06-30|           1.0|12151|
|         2018-09-30|           0.0|29512|
|         2018-09-30|           1.0| 8783|
|         2018-12-31|           0.0|29996|
|         2018-12-31|           1.0|10015|
|         2019-06-30|           0.0|31876|
|         2019-06-30|           1.0|13483|
+-------------------+--------------+-----+

### Create Pipeline

In [15]:
%%spark

def create_pipeline(target, *arg, df=None):
    """Build unfitted estimators and their preprocessing pipelines.

    Every pipeline consists of one StringIndexer per categorical column
    (unseen labels are kept via handleInvalid='keep'), a VectorAssembler
    producing 'vectFeatures', and the estimator itself.

    Parameters
    ----------
    target : str
        Label column; also excluded from the feature list.
    *arg : str
        Any of "lr", "rf", "gbt". If none of them is given, "gbt" is used.
    df : pyspark.sql.DataFrame, optional
        Frame whose dtypes determine the feature columns. Defaults to the
        notebook-level `df_train`.

    Returns
    -------
    tuple
        The selected estimators followed by their pipelines, both in the
        fixed order lr, rf, gbt, e.g. ``(lr, gbt, pipeline_lr, pipeline_gbt)``.

    Notes
    -----
    Exclusion is suffix based (``str.endswith``): a column is dropped when its
    name ENDS with one of the excluded names. NOTE(review): 'ln_purpose_type'
    therefore does not exclude 'ln_purpose_type_exp', which appears in the
    feature-importance output — confirm this is intended.
    """
    if df is None:
        # Backward-compatible default: the training frame defined at notebook level.
        df = df_train

    exclude_cols = ('servicecalendardate', 'ln_no', 'og_note_dt', 'mosaic_group_pur_exp', 'ratespread_min_pur_exp',
                'ln_purpose_type', 'investornameshort', 'ln_ann_int_rt', target)

    def _is_excluded(name):
        return name.endswith(exclude_cols)

    # Plain boolean logic instead of bitwise `&` / `~` applied to Python bools.
    cat_cols = [name for name, dtype in df.dtypes
                if dtype == 'string' and not _is_excluded(name)]
    num_cols = [name for name, dtype in df.dtypes
                if dtype.startswith(('int', 'double')) and not _is_excluded(name)]

    # Index each categorical column; the loop variable no longer shadows pyspark's `col`.
    stages = [StringIndexer(inputCol=name, outputCol=name + '_idx').setHandleInvalid('keep')
              for name in cat_cols]

    # Indexed categoricals first, then the numeric columns.
    assembler_inputs = [name + '_idx' for name in cat_cols] + num_cols
    stages.append(VectorAssembler(inputCols=assembler_inputs, outputCol='vectFeatures'))

    estimators = {
        'lr': LogisticRegression(maxIter=100, regParam=0.1, elasticNetParam=0.0, fitIntercept=True,
                                 featuresCol='vectFeatures', labelCol=target),
        'rf': RandomForestClassifier(numTrees=250, maxDepth=5, featuresCol='vectFeatures', labelCol=target),
        'gbt': GBTClassifier(maxIter=100, featuresCol='vectFeatures', labelCol=target),
    }

    # Fixed output order lr -> rf -> gbt; fall back to GBT when nothing recognised was requested.
    requested = [key for key in ('lr', 'rf', 'gbt') if key in arg] or ['gbt']
    chosen = [estimators[key] for key in requested]
    pipelines = [Pipeline(stages=stages + [estimator]) for estimator in chosen]

    return tuple(chosen + pipelines)

In [16]:
%%spark

def fit_pipeline(pipeline, training_dataset):
    """Fit `pipeline` on `training_dataset` and return the fitted result."""
    return pipeline.fit(training_dataset)
    
def persist_modelpersist_(pipeline_model, model_name, model_save_path=model_save_path):
    """Save `pipeline_model` at `model_save_path + model_name`.

    The default for `model_save_path` is captured from the notebook-level
    variable of the same name when this function is defined.
    """
    destination = model_save_path + model_name
    pipeline_model.save(destination)

In [17]:
%%spark

# Request all three estimators; they come back first, followed by their pipelines in the same order.
lr, rf, gbt, pipeline_lr, pipeline_rf, pipeline_gbt = create_pipeline(target, "lr", "rf", "gbt")

In [21]:
%%spark

# Logistic Regression
# Fit the full pipeline on the training set and report wall-clock time in seconds.
start = time.time()
model_pipeline_lr = fit_pipeline(pipeline_lr, df_train)
end = time.time()

print('Logistic Regression Training Time:', end - start)

Logistic Regression Training Time: 88.2570116519928

In [22]:
%%spark

# Random Forest
# Fit the full pipeline on the training set and report wall-clock time in seconds.
start = time.time()
model_pipeline_rf = fit_pipeline(pipeline_rf, df_train)
end = time.time()

print('Random Forest Training Time:', end - start)

Random Forest Training Time: 106.32498598098755

In [23]:
%%spark

# Gradient Boosting
# Fit the full pipeline on the training set and report wall-clock time in seconds.
start = time.time()
model_pipeline_gbt = fit_pipeline(pipeline_gbt, df_train)
end = time.time()

print('Gradient Boosting Training Time:', end - start)

Gradient Boosting Training Time: 1286.2877581119537

### Save Models

In [24]:
%%spark

# Persist each fitted pipeline under model_save_path with a dated model name.
persist_modelpersist_(model_pipeline_lr, 'MoverV3_3_LR_20200513')
persist_modelpersist_(model_pipeline_rf, 'MoverV3_3_RF_20200513')
persist_modelpersist_(model_pipeline_gbt, 'MoverV3_3_GBT_20200513')

### Read Models

In [7]:
%%spark

# Reload the persisted pipelines so the cells below can run without retraining.
model_pipeline_lr = PipelineModel.load(path = model_save_path+'MoverV3_3_LR_20200513')
model_pipeline_rf = PipelineModel.load(path = model_save_path+'MoverV3_3_RF_20200513')
model_pipeline_gbt = PipelineModel.load(path = model_save_path+'MoverV3_3_GBT_20200513')

# 3. Variable Importance

In [26]:
%%spark
def extractFeatureImp(model_pipeline, df_train, featuresCol):
    """Return the features of a fitted pipeline ranked by importance.

    Parameters
    ----------
    model_pipeline : fitted pipeline model
        Its last stage must expose `featureImportances`.
    df_train : DataFrame
        Passed through `model_pipeline.transform` only to obtain the
        assembled-vector metadata of `featuresCol`.
    featuresCol : str
        Name of the assembled feature-vector column.

    Returns
    -------
    pandas.DataFrame
        One row per feature attribute found in the column metadata (with its
        'idx' and the other metadata fields), plus a 'score' column holding
        the importance at that index, sorted by 'score' descending.
    """
    featureImp = model_pipeline.stages[-1].featureImportances
    transformed = model_pipeline.transform(df_train)

    # The metadata groups attributes by kind; flatten all groups into one list.
    attr_groups = transformed.schema[featuresCol].metadata["ml_attr"]["attrs"]
    list_extract = []
    for group in attr_groups.values():
        list_extract.extend(group)

    varlist = pd.DataFrame(list_extract)
    # 'idx' is the position of the feature inside the assembled vector.
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])

    return varlist.sort_values('score', ascending=False)

In [27]:
%%spark

# Rank the GBT model's input features by importance.
gbt_feature_importance = extractFeatureImp(model_pipeline_gbt, df_train, "vectFeatures")

# Temporarily lift pandas' row/column display limits so the whole table is printed.
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    print(gbt_feature_importance)

    idx                         name  \
5    13                 ageinmon_exp   
7    15              currentcltv_exp   
4    12           ratespread_min_exp   
10   18           LiveYears_long_exp   
6    14                og_mtg_am_exp   
0     8                   ct_age_exp   
8    16                orig_fico_exp   
17    3    mosaic_group_refi_exp_idx   
1     9               home_value_exp   
13   21                    ln_tr_exp   
20    6      ln_purpose_type_exp_idx   
3    11       personnum_per_room_exp   
9    17          LiveYears_short_exp   
21    7  og_occupy_stat_type_exp_idx   
16    2                 ct_3_exp_idx   
2    10                 p_edu_hs_exp   
19    5     loanamortizationtype_idx   
18    4  loantypedescription_exp_idx   
11   19            LiveYears_grp_exp   
14    0                 ct_1_exp_idx   
15    1                 ct_2_exp_idx   
12   20         issingleborrower_exp   

                                                 vals     score  
5            

# 4. ROC

### Predictions

In [8]:
%%spark

def predict(model_pipeline, data):
    """Score `data` with `model_pipeline` and add a scalar 'pred' column.

    'pred' is the element at index 1 of the model's 'probability' vector,
    converted to a float.
    """
    def _second_component(v):
        return float(v[1])

    scored = model_pipeline.transform(data)
    extract_score = udf(_second_component, DoubleType())
    return scored.withColumn('pred', extract_score(scored['probability']))

### ROC

In [29]:
%%spark

def roc_auc(df, target):
    """Print the area under the ROC curve of the LR, RF and GBT pipelines on `df`.

    Uses the notebook-level `model_pipeline_lr`, `model_pipeline_rf` and
    `model_pipeline_gbt`. The printed lines always start with "Test",
    whichever frame is passed in. Nothing is returned.
    """
    labelled_models = (
        ("Test Area Under ROC - LR: ", model_pipeline_lr),
        ("Test Area Under ROC - RF: ", model_pipeline_rf),
        ("Test Area Under ROC - GBT: ", model_pipeline_gbt),
    )

    # Build the three evaluation frames first; the evaluator expects the label column to be 'label'.
    eval_frames = []
    for prefix, fitted in labelled_models:
        scored = predict(fitted, df)
        eval_frame = scored.select(target, 'rawPrediction', 'prediction', 'probability')\
                           .withColumnRenamed(target, 'label')
        eval_frames.append((prefix, eval_frame))

    # Evaluate and report in LR, RF, GBT order.
    for prefix, eval_frame in eval_frames:
        evaluator = BinaryClassificationEvaluator()
        auc = evaluator.evaluate(eval_frame, {evaluator.metricName: "areaUnderROC"})
        print(prefix + str(auc))
    print("\n")

In [30]:
%%spark

# Months evaluated as the test set; none of them is listed in train_months.
test_months = ['2017-09-30', '2019-09-30', '2019-12-31']
df_test = df_all.where(col('servicecalendardate').isin(test_months))

# roc_auc prefixes every line with "Test"; the three output groups correspond, in order,
# to the train, validation and test frames.
for df in [df_train, df_validate, df_test]:
    roc_auc(df, target)

Test Area Under ROC - LR: 0.6309517205196433
Test Area Under ROC - RF: 0.6587607092973101
Test Area Under ROC - GBT: 0.7061909966675342


Test Area Under ROC - LR: 0.6268778748195674
Test Area Under ROC - RF: 0.6546481944596021
Test Area Under ROC - GBT: 0.6878066684733987


Test Area Under ROC - LR: 0.6276730125751022
Test Area Under ROC - RF: 0.668378382185012
Test Area Under ROC - GBT: 0.6907329097345398

In [16]:
%%spark

# Same test-month definition as in the ROC cell; presumably repeated so that the sections below
# can run in a session where the ROC cell was skipped.
test_months = ['2017-09-30', '2019-09-30', '2019-12-31']
df_test = df_all.where(col('servicecalendardate').isin(test_months))

# 5. Capture Rate

In [9]:
%%spark

def deciles(df, model, target):
    """Build a capture-rate table by score decile.

    `df` is scored with `model`, the 'pred' score is cut into 10 quantile
    buckets, and the bucket index b is relabelled as 10 - b so that the
    highest-score bucket becomes decile 1.

    Returns a DataFrame sorted by 'decile' with:
      decile_cnt - number of 'ln_no' values in the decile
      payoff_cnt - sum of `target` in the decile
      cum_sum    - running sum of payoff_cnt up to this decile, divided by
                   the overall sum of `target`
    """
    scored = predict(model, df)

    discretizer = QuantileDiscretizer(numBuckets=10, inputCol="pred", outputCol="decile",
                                      relativeError=0.00001, handleInvalid="error")
    bucketed = discretizer.fit(scored).transform(scored)
    ranked = bucketed.withColumn('decile', (10 - F.col('decile')).cast('int'))

    # Running window over all deciles up to and including the current one.
    running_window = Window.orderBy('decile').rangeBetween(Window.unboundedPreceding, 0)
    payoff_total = ranked.select(F.sum(target)).collect()[0][0]

    per_decile = ranked.groupBy('decile').agg(
        F.count('ln_no').alias('decile_cnt'),
        F.sum(target).alias('payoff_cnt'),
    )
    with_capture = per_decile.withColumn(
        'cum_sum', F.sum('payoff_cnt').over(running_window) / payoff_total)

    return with_capture.sort('decile')

In [10]:
%%spark

target = 'purchasepayoff'
# Third entry of test_months. NOTE(review): the NameError recorded below presumably comes from
# running this cell in a session where test_months had not been defined yet — re-run in order.
date_label = test_months[2]
print(date_label)

name 'test_months' is not defined
Traceback (most recent call last):
NameError: name 'test_months' is not defined



In [41]:
%%spark
#print(date_label)
#deciles(df_all.filter(col('servicecalendardate') == date_label), model_pipeline_lr, target).show()

In [42]:
%%spark
#print(date_label)
#deciles(df_all.filter(col('servicecalendardate') == date_label), model_pipeline_rf, target).show()

In [43]:
%%spark
# Capture-rate table of the GBT model for the month in date_label.
print(date_label)
deciles(df_all.filter(col('servicecalendardate') == date_label), model_pipeline_gbt, target).show()

2019-12-31
+------+----------+----------+-------------------+
|decile|decile_cnt|payoff_cnt|            cum_sum|
+------+----------+----------+-------------------+
|     1|    180219|    3510.0| 0.2531371700562527|
|     2|    180218|    2206.0|0.41223135727679217|
|     3|    180233|    1796.0| 0.5417568152315015|
|     4|    180213|    1473.0| 0.6479878840328862|
|     5|    180243|    1415.0|  0.750036059425934|
|     6|    180207|    1143.0| 0.8324679071109188|
|     7|    180232|     867.0| 0.8949949516803692|
|     8|    180221|     702.0| 0.9456223856916198|
|     9|    180198|     516.0| 0.9828357132554449|
|    10|    180259|     238.0|                1.0|
+------+----------+----------+-------------------+

In [44]:
%%spark

# Same labels from a different actuals extract (file dated 20200505), to compare capture rates.
actual1 = spark.read.parquet(actual_path + 'Actual_payoff_20200505.parquet')\
                .select('servicecalendardate', 'ln_no', 'purchasepayoff')

# Join as before, but keep only the 2019-12-31 service date.
df_all1 = df_preprocessed.join(actual1, on=['servicecalendardate', 'ln_no'], how='inner').dropDuplicates().where(col('servicecalendardate') == '2019-12-31')

In [45]:
%%spark

# df_all1 is already restricted to 2019-12-31, so this filter only has an effect if date_label differs.
deciles(df_all1.filter(col('servicecalendardate') == date_label), model_pipeline_gbt, target).show()

+------+----------+----------+-------------------+
|decile|decile_cnt|payoff_cnt|            cum_sum|
+------+----------+----------+-------------------+
|     1|    180219|    2934.0|0.25541916949595195|
|     2|    180232|    1876.0| 0.4187342212936363|
|     3|    180219|    1461.0|  0.545921476451641|
|     4|    180229|    1214.0| 0.6516061634891617|
|     5|    180237|    1165.0| 0.7530251588752502|
|     6|    180201|     941.0| 0.8349438495690781|
|     7|    180207|     731.0| 0.8985810046139113|
|     8|    180225|     565.0| 0.9477670410028728|
|     9|    180212|     410.0| 0.9834595629842431|
|    10|    180262|     190.0|                1.0|
+------+----------+----------+-------------------+

# 6. Predictions

In [11]:
%%spark

def fullMonthPrediction(df):
    """Score `df` with the LR, RF and GBT pipelines and combine the scores per loan.

    Uses the notebook-level `model_pipeline_lr`, `model_pipeline_rf` and
    `model_pipeline_gbt`. The result has the columns 'ln_no', 'logRegProb',
    'randForProb' and 'gbtProb'; the RF and GBT scores are left-joined onto
    the LR scores by 'ln_no' and exact duplicate rows are removed.
    """
    def _scores(fitted, score_name):
        # Keep only the loan key and the scalar score, renamed per model.
        return predict(fitted, df).select('ln_no', 'pred')\
                                  .withColumnRenamed('pred', score_name)

    lr_scores = _scores(model_pipeline_lr, 'logRegProb')
    rf_scores = _scores(model_pipeline_rf, 'randForProb')
    gbt_scores = _scores(model_pipeline_gbt, 'gbtProb')

    combined = lr_scores.join(rf_scores, on='ln_no', how='left')
    combined = combined.join(gbt_scores, on='ln_no', how='left')
    return combined.dropDuplicates()

In [12]:
%%spark

def pred_save(df, servicedate, filename, result_path = result_path):
    """Score the rows of `df` for one service date and write them as CSV.

    Rows whose 'servicecalendardate' equals `servicedate` are scored with
    fullMonthPrediction, coalesced to a single partition and written with a
    header to `result_path + filename`.
    """
    month_rows = df.where(col('servicecalendardate') == servicedate)
    predictions = fullMonthPrediction(month_rows)

    single_partition = predictions.coalesce(1)
    single_partition.write.csv(result_path + filename, header=True)

In [13]:
%%spark

def pred_save_newmonth(preprocessed_file, filename, monthly_prep_path=monthly_prep_path, result_path=result_path):
    """Score a whole preprocessed monthly parquet file and write the result as CSV.

    Reads `monthly_prep_path + preprocessed_file`, scores every row with
    fullMonthPrediction, prints the number of input rows, and writes the
    predictions (single partition, with header) to `result_path + filename`.
    """
    source_path = monthly_prep_path + preprocessed_file
    df = spark.read.parquet(source_path)
    predictions = fullMonthPrediction(df)

    # Report the input row count before writing.
    print(df.count())
    predictions.coalesce(1).write.csv(result_path + filename, header=True)

In [29]:
%%spark

# pred_save already filters on the service date, so the outer where() calls are redundant but harmless.
# 2017-9
pred_save(df_test.where(col('servicecalendardate') == '2017-09-30'), '2017-09-30', 'pred_MoverV3_3_sep17_20200514.csv')


# 2019-9
pred_save(df_test.where(col('servicecalendardate') == '2019-09-30'), '2019-09-30', 'pred_MoverV3_3_sep19_20200514.csv')

          
# 2019-12
pred_save(df_test.where(col('servicecalendardate') == '2019-12-31'), '2019-12-31', 'pred_MoverV3_3_dec19_20200514.csv')
       

In [30]:
%%spark

# Sanity check: for each month, print the joined-data row count followed by the saved CSV's row count.
print(df_all.where(col('servicecalendardate') == '2017-09-30').count())
print(spark.read.csv(result_path + 'pred_MoverV3_3_sep17_20200514.csv', header = True).count())


print(df_all.where(col('servicecalendardate') == '2019-09-30').count())
print(spark.read.csv(result_path + 'pred_MoverV3_3_sep19_20200514.csv', header = True).count())


print(df_all.where(col('servicecalendardate') == '2019-12-31').count())
print(spark.read.csv(result_path + 'pred_MoverV3_3_dec19_20200514.csv', header = True).count())



1468441
1468441
1763391
1763391
1802243
1802243

In [14]:
%%spark

# Rebinds df202003, df_preprocessed and df_all to the 2020-03 snapshot only; cells that expect the
# multi-month frames need the load and join cells re-run first.
df202003 = spark.read.parquet(monthly_prep_path + 'monthly_preprocessed_202003.parquet')
df_preprocessed = df202003.drop('borrowerage_bucket','p_married_exp','p_31_50_exp')
df_all = df_preprocessed.join(actual, on=['servicecalendardate', 'ln_no'], how='inner').dropDuplicates()

# Score and save 2020-03, then print the saved row count.
pred_save(df_all.where(col('servicecalendardate') == '2020-03-31'), '2020-03-31', 'pred_MoverV3_3_mar20_20200514.csv')
print(spark.read.csv(result_path + 'pred_MoverV3_3_mar20_20200514.csv', header = True).count())

# Capture-rate table of the GBT model for 2020-03.
deciles(df_all.filter(col('servicecalendardate') == '2020-03-31'), model_pipeline_gbt, target).show()

1867297
+------+----------+----------+------------------+
|decile|decile_cnt|payoff_cnt|           cum_sum|
+------+----------+----------+------------------+
|     1|    186702|    1329.0|0.2993243243243243|
|     2|    186761|     693.0|0.4554054054054054|
|     3|    186723|     561.0|0.5817567567567568|
|     4|    186706|     490.0|0.6921171171171171|
|     5|    186748|     397.0|0.7815315315315315|
|     6|    186739|     314.0|0.8522522522522522|
|     7|    186684|     262.0|0.9112612612612613|
|     8|    186779|     192.0|0.9545045045045045|
|     9|    186715|     143.0|0.9867117117117117|
|    10|    186740|      59.0|               1.0|
+------+----------+----------+------------------+

In [15]:
%%spark

# Loan count and payoff total for whatever month df_all currently holds.
df_all.groupby('servicecalendardate').agg(count('ln_no'), sum(target)).orderBy(col('servicecalendardate').desc()).show()

+-------------------+------------+-------------------+
|servicecalendardate|count(ln_no)|sum(purchasepayoff)|
+-------------------+------------+-------------------+
|         2020-03-31|     1867297|             4440.0|
+-------------------+------------+-------------------+

In [16]:
%%spark

# Rebinds df_preprocessed and df_all to the 2020-04 snapshot only.
df202004 = spark.read.parquet(monthly_prep_path + 'monthly_preprocessed_202004.parquet')
df_preprocessed = df202004.drop('borrowerage_bucket','p_married_exp','p_31_50_exp')
df_all = df_preprocessed.join(actual, on=['servicecalendardate', 'ln_no'], how='inner').dropDuplicates()

# Score and save 2020-04, then print the saved row count.
pred_save(df_all.where(col('servicecalendardate') == '2020-04-30'), '2020-04-30', 'pred_MoverV3_3_apr20_20200514.csv')
print(spark.read.csv(result_path + 'pred_MoverV3_3_apr20_20200514.csv', header = True).count())

# Capture-rate table of the GBT model for 2020-04.
deciles(df_all.filter(col('servicecalendardate') == '2020-04-30'), model_pipeline_gbt, target).show()

1899099
+------+----------+----------+-------------------+
|decile|decile_cnt|payoff_cnt|            cum_sum|
+------+----------+----------+-------------------+
|     1|    189876|     222.0| 0.2971887550200803|
|     2|    189928|     118.0|0.45515394912985274|
|     3|    189893|      80.0| 0.5622489959839357|
|     4|    189933|      83.0| 0.6733601070950469|
|     5|    189926|      73.0| 0.7710843373493976|
|     6|    189900|      57.0| 0.8473895582329317|
|     7|    189901|      52.0| 0.9170013386880856|
|     8|    189915|      26.0| 0.9518072289156626|
|     9|    189918|      24.0| 0.9839357429718876|
|    10|    189909|      12.0|                1.0|
+------+----------+----------+-------------------+

In [17]:
%%spark

# Loan count and payoff total for whatever month df_all currently holds.
df_all.groupby('servicecalendardate').agg(count('ln_no'), sum(target)).orderBy(col('servicecalendardate').desc()).show()

+-------------------+------------+-------------------+
|servicecalendardate|count(ln_no)|sum(purchasepayoff)|
+-------------------+------------+-------------------+
|         2020-04-30|     1899099|              747.0|
+-------------------+------------+-------------------+

In [19]:
%%spark

# Capture-rate table for previously saved predictions (the file name suggests the V3_2 model —
# confirm). Repeats the steps of deciles(), starting from the stored gbtProb score.
# Note: df_all is rebound here to the predictions CSV, not the joined training data.
df_all = spark.read.csv(result_path + 'pred_MoverV3_2_apr20_20200505.csv', header = True)

# The CSV is read without a schema, so gbtProb is cast to double; the service date is attached
# as a literal so the rows can be joined to the actuals.
df = df_all.withColumn('pred', col('gbtProb').cast('double'))\
                .withColumn('servicecalendardate', lit('2020-04-30'))\
                .join(actual, on=['servicecalendardate', 'ln_no'], how='inner').dropDuplicates()
pred_df = QuantileDiscretizer(numBuckets=10, inputCol="pred", outputCol="decile", relativeError=0.00001,
                             handleInvalid="error").fit(df).transform(df)
# Relabel bucket b as 10 - b.
pred_df = pred_df.withColumn('decile', (10 - F.col('decile')).cast('int'))
    
window_cumsum = Window.orderBy('decile').rangeBetween(Window.unboundedPreceding, 0)
total_target = pred_df.select(F.sum(target)).collect()[0][0]
df_out = pred_df\
        .groupBy('decile', )\
        .agg(F.count('ln_no').alias('decile_cnt'), F.sum(target).alias('payoff_cnt'))\
        .withColumn('cum_sum', F.sum('payoff_cnt').over(window_cumsum) / total_target)\
        .sort('decile')
df_out.show()

+------+----------+----------+------------------+
|decile|decile_cnt|payoff_cnt|           cum_sum|
+------+----------+----------+------------------+
|     1|    189895|     213.0| 0.285140562248996|
|     2|    189910|     125.0|0.4524765729585007|
|     3|    189916|      88.0| 0.570281124497992|
|     4|    189895|      83.0|0.6813922356091031|
|     5|    189886|      55.0|0.7550200803212851|
|     6|    189955|      63.0|0.8393574297188755|
|     7|    189914|      41.0|0.8942436412315931|
|     8|    189890|      39.0|0.9464524765729585|
|     9|    189912|      23.0|0.9772423025435074|
|    10|    189926|      17.0|               1.0|
+------+----------+----------+------------------+

In [20]:
%%spark

# Rebinds df_preprocessed and df_all to the 2020-02 snapshot only.
df202002 = spark.read.parquet(monthly_prep_path + 'monthly_preprocessed_202002.parquet')
df_preprocessed = df202002.drop('borrowerage_bucket','p_married_exp','p_31_50_exp')
df_all = df_preprocessed.join(actual, on=['servicecalendardate', 'ln_no'], how='inner').dropDuplicates()

# Score and save 2020-02, then print the saved row count.
pred_save(df_all.where(col('servicecalendardate') == '2020-02-29'), '2020-02-29', 'pred_MoverV3_3_feb20_20200514.csv')
print(spark.read.csv(result_path + 'pred_MoverV3_3_feb20_20200514.csv', header = True).count())

# Capture-rate table of the GBT model for 2020-02.
deciles(df_all.filter(col('servicecalendardate') == '2020-02-29'), model_pipeline_gbt, target).show()

1846864
+------+----------+----------+-------------------+
|decile|decile_cnt|payoff_cnt|            cum_sum|
+------+----------+----------+-------------------+
|     1|    184663|    2568.0|0.28028814669286184|
|     2|    184706|    1413.0|0.43451211525867717|
|     3|    184686|    1208.0| 0.5663610565378738|
|     4|    184684|    1000.0| 0.6755075311067452|
|     5|    184656|     823.0| 0.7653350796769265|
|     6|    184691|     699.0| 0.8416284654005676|
|     7|    184706|     571.0| 0.9039511023793931|
|     8|    184700|     438.0| 0.9517572582405588|
|     9|    184651|     320.0| 0.9866841301025977|
|    10|    184721|     122.0|                1.0|
+------+----------+----------+-------------------+

In [21]:
%%spark

# Capture-rate table for previously saved predictions (the file name suggests the V3_2 model —
# confirm). Repeats the steps of deciles(), starting from the stored gbtProb score.
# Note: df_all is rebound here to the predictions CSV, not the joined training data.
df_all = spark.read.csv(result_path + 'pred_MoverV3_2_feb20_20200305.csv', header = True)

# The CSV is read without a schema, so gbtProb is cast to double; the service date is attached
# as a literal so the rows can be joined to the actuals.
df = df_all.withColumn('pred', col('gbtProb').cast('double'))\
                .withColumn('servicecalendardate', lit('2020-02-29'))\
                .join(actual, on=['servicecalendardate', 'ln_no'], how='inner').dropDuplicates()
pred_df = QuantileDiscretizer(numBuckets=10, inputCol="pred", outputCol="decile", relativeError=0.00001,
                             handleInvalid="error").fit(df).transform(df)
# Relabel bucket b as 10 - b.
pred_df = pred_df.withColumn('decile', (10 - F.col('decile')).cast('int'))
    
window_cumsum = Window.orderBy('decile').rangeBetween(Window.unboundedPreceding, 0)
total_target = pred_df.select(F.sum(target)).collect()[0][0]
df_out = pred_df\
        .groupBy('decile', )\
        .agg(F.count('ln_no').alias('decile_cnt'), F.sum(target).alias('payoff_cnt'))\
        .withColumn('cum_sum', F.sum('payoff_cnt').over(window_cumsum) / total_target)\
        .sort('decile')
df_out.show()

+------+----------+----------+------------------+
|decile|decile_cnt|payoff_cnt|           cum_sum|
+------+----------+----------+------------------+
|     1|    184669|    2299.0|0.2509277450338354|
|     2|    184688|    1441.0|0.4082078148875791|
|     3|    184695|    1177.0|0.5366732154551408|
|     4|    184675|    1021.0|0.6481117659899586|
|     5|    184706|     863.0|0.7423051735428946|
|     6|    184672|     710.0|0.8197991704867933|
|     7|    184693|     632.0|  0.88877974241432|
|     8|    184691|     504.0|0.9437895655970312|
|     9|    184654|     344.0| 0.981335952848723|
|    10|    184721|     171.0|               1.0|
+------+----------+----------+------------------+