# 0) Overview: Loan Prediction Based on Customer Behavior


- Goal: predict which customers are likely to default on a consumer loan product.

An organization wants to predict likely defaulters for its consumer loan product. It has historical data on observed customer behavior and wants to use it to estimate which newly acquired customers are riskier.

In [0]:
spark

## Import Packages

In [0]:
!pip install xgboost

#Transforming Data
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import mean, col, split, count, regexp_extract, when, lit, isnan, monotonically_increasing_id, round
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType
from pyspark.ml.feature import OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler

# Modeling
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier,RandomForestClassifier,GBTClassifier, LinearSVC
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



## Reading Data

In [0]:
df_train = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/hailim101@gmail.com/Training_Data.csv")
df_test = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/hailim101@gmail.com/Test_Data.csv")
df_result = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/hailim101@gmail.com/Sample_Prediction_Dataset.csv")

In [0]:
display(df_train)

Id,Income,Age,Experience,Married/Single,House_Ownership,Car_Ownership,Profession,CITY,STATE,CURRENT_JOB_YRS,CURRENT_HOUSE_YRS,Risk_Flag
1,1303834,23,3,single,rented,no,Mechanical_engineer,Rewa,Madhya_Pradesh,3,13,0
2,7574516,40,10,single,rented,no,Software_Developer,Parbhani,Maharashtra,9,13,0
3,3991815,66,4,married,rented,no,Technical_writer,Alappuzha,Kerala,4,10,0
4,6256451,41,2,single,rented,yes,Software_Developer,Bhubaneswar,Odisha,2,12,1
5,5768871,47,11,single,rented,no,Civil_servant,Tiruchirappalli[10],Tamil_Nadu,3,14,1
6,6915937,64,0,single,rented,no,Civil_servant,Jalgaon,Maharashtra,0,12,0
7,3954973,58,14,married,rented,no,Librarian,Tiruppur,Tamil_Nadu,8,12,0
8,1706172,33,2,single,rented,no,Economist,Jamnagar,Gujarat,2,14,0
9,7566849,24,17,single,rented,yes,Flight_attendant,Kota[6],Rajasthan,11,11,0
10,8964846,23,12,single,rented,no,Architect,Karimnagar,Telangana,5,13,0


In [0]:
#Checking Schema of our dataset
df_train.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Married/Single: string (nullable = true)
 |-- House_Ownership: string (nullable = true)
 |-- Car_Ownership: string (nullable = true)
 |-- Profession: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CURRENT_JOB_YRS: string (nullable = true)
 |-- CURRENT_HOUSE_YRS: string (nullable = true)
 |-- Risk_Flag: string (nullable = true)



In [0]:
#Checking Data size
train_count = df_train.count()
test_count = df_test.count()

print("Train data size: ", train_count, "rows")
print("Test data size: ", test_count, "rows")

Train data size:  252000 rows
Test data size:  28000 rows


In [0]:
# Checking statistic summary of the dataset
df_train.describe().show()

+-------+-----------------+-----------------+------------------+------------------+--------------+---------------+-------------+--------------------+-----------+--------------+------------------+------------------+------------------+
|summary|               Id|           Income|               Age|        Experience|Married/Single|House_Ownership|Car_Ownership|          Profession|       CITY|         STATE|   CURRENT_JOB_YRS| CURRENT_HOUSE_YRS|         Risk_Flag|
+-------+-----------------+-----------------+------------------+------------------+--------------+---------------+-------------+--------------------+-----------+--------------+------------------+------------------+------------------+
|  count|           252000|           252000|            252000|            252000|        252000|         252000|       252000|              252000|     252000|        252000|            252000|            252000|            252000|
|   mean|         126000.5|4997116.665325397| 49.95407142857143|

# 1) Exploratory Data Analysis (EDA)

In [0]:
# Checking the proportion of each target value
df_train.groupBy("Risk_Flag").count().show()
groupby_output = df_train.groupBy("Risk_Flag").count()
display(groupby_output)

+---------+------+
|Risk_Flag| count|
+---------+------+
|        0|221004|
|        1| 30996|
+---------+------+



Risk_Flag,count
0,221004
1,30996



Of the 252,000 rows in the training data, 30,996 (about 12.3%) are flagged as risky.
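
With a class split this uneven, a model that always predicts `0` already scores well on accuracy. The sketch below uses only the counts printed above to compute that majority-class baseline; the variable names are illustrative and not part of the notebook.

```python
# Counts copied from the groupBy("Risk_Flag") output above.
counts = {0: 221004, 1: 30996}

total = sum(counts.values())
positive_rate = counts[1] / total
# Accuracy of a trivial model that always predicts the most frequent class.
majority_baseline = max(counts.values()) / total

print(f"Total rows: {total}")
print(f"Share of Risk_Flag = 1: {positive_rate:.4f}")
print(f"Majority-class baseline accuracy: {majority_baseline:.4f}")
```

Any accuracy reported later is easier to interpret when compared against this baseline.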

In [0]:
# Checking the effect of house ownership on Risk_Flag
df_train.groupBy("House_Ownership","Risk_Flag").count().show()
groupby_output = df_train.groupBy("House_Ownership","Risk_Flag").count()
display(groupby_output)

+---------------+---------+------+
|House_Ownership|Risk_Flag| count|
+---------------+---------+------+
|         rented|        0|202777|
|   norent_noown|        1|   715|
|         rented|        1| 29121|
|          owned|        0| 11758|
|   norent_noown|        0|  6469|
|          owned|        1|  1160|
+---------------+---------+------+



House_Ownership,Risk_Flag,count
rented,0,202777
norent_noown,1,715
rented,1,29121
owned,0,11758
norent_noown,0,6469
owned,1,1160



In [0]:
# Checking the effect of car ownership on Risk_Flag
df_train.groupBy("Car_Ownership","Risk_Flag").count().show()
groupby_output = df_train.groupBy("Car_Ownership","Risk_Flag").count()
display(groupby_output)

+-------------+---------+------+
|Car_Ownership|Risk_Flag| count|
+-------------+---------+------+
|          yes|        0| 67565|
|           no|        0|153439|
|           no|        1| 22561|
|          yes|        1|  8435|
+-------------+---------+------+



Car_Ownership,Risk_Flag,count
yes,0,67565
no,0,153439
no,1,22561
yes,1,8435



In [0]:
# Checking the effect of marital status on Risk_Flag
df_train.groupBy("Married/Single","Risk_Flag").count().show()
groupby_output = df_train.groupBy("Married/Single","Risk_Flag").count()
display(groupby_output)

+--------------+---------+------+
|Married/Single|Risk_Flag| count|
+--------------+---------+------+
|        single|        1| 28360|
|       married|        0| 23092|
|       married|        1|  2636|
|        single|        0|197912|
+--------------+---------+------+



Married/Single,Risk_Flag,count
single,1,28360
married,0,23092
married,1,2636
single,0,197912
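
Raw counts are hard to compare across groups of very different sizes, so it can help to turn them into per-group risk rates. The following is a minimal plain-Python sketch that reuses the counts printed above; `tables` and `risk_rates` are hypothetical names introduced only for this illustration.

```python
# (category, Risk_Flag) -> count, copied from the three groupBy outputs above.
tables = {
    "House_Ownership": {
        ("rented", 0): 202777, ("rented", 1): 29121,
        ("owned", 0): 11758, ("owned", 1): 1160,
        ("norent_noown", 0): 6469, ("norent_noown", 1): 715,
    },
    "Car_Ownership": {
        ("yes", 0): 67565, ("yes", 1): 8435,
        ("no", 0): 153439, ("no", 1): 22561,
    },
    "Married/Single": {
        ("single", 0): 197912, ("single", 1): 28360,
        ("married", 0): 23092, ("married", 1): 2636,
    },
}

def risk_rates(table):
    """Return {category: share of rows with Risk_Flag == 1}."""
    totals, risky = {}, {}
    for (category, flag), n in table.items():
        totals[category] = totals.get(category, 0) + n
        if flag == 1:
            risky[category] = risky.get(category, 0) + n
    return {c: risky.get(c, 0) / totals[c] for c in totals}

rates = {feature: risk_rates(table) for feature, table in tables.items()}
for feature, by_category in rates.items():
    for category, rate in sorted(by_category.items()):
        print(f"{feature:16s} {category:13s} {rate:.4f}")
```

On these counts, renters, customers without a car, and single customers each show a somewhat higher share of risky rows than their counterparts.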



# 2) Transforming Data

## 2.1) Missing Value

In [0]:
# Checking for missing values
# This function prints, for every column, the number of rows that are NaN, null, or the literal strings "null" / "unknown"
def null_value_count(df):
    df.select([count(when(isnan(c) | col(c).isNull() | (col(c) == "null") | (col(c) == "unknown"), c)).alias(c) for c in df.columns]).show()

# Calling function
null_value_count(df_train)

+---+------+---+----------+--------------+---------------+-------------+----------+----+-----+---------------+-----------------+---------+
| Id|Income|Age|Experience|Married/Single|House_Ownership|Car_Ownership|Profession|CITY|STATE|CURRENT_JOB_YRS|CURRENT_HOUSE_YRS|Risk_Flag|
+---+------+---+----------+--------------+---------------+-------------+----------+----+-----+---------------+-----------------+---------+
|  0|     0|  0|         0|             0|              0|            0|         0|   0|    0|              0|                0|        0|
+---+------+---+----------+--------------+---------------+-------------+----------+----+-----+---------------+-----------------+---------+



There are no missing values in the dataset.

## 2.2) Data Cleaning

### Step1: Cleaning some categorical data

In [0]:
# check unique values of city column
unique_val = df_train.select("CITY").distinct().collect()
print(f"unique values of city (Before): {len(unique_val)}")

dropDisDF = df_train.dropDuplicates(["CITY"]).select("CITY")
dropDisDF.show(truncate=False)

unique values of city (Before): 317
+-------------------+
|CITY               |
+-------------------+
|Anantapuram[24]    |
|Bhubaneswar        |
|Kota[6]            |
|Jalgaon            |
|Kamarhati          |
|Sirsa              |
|Hajipur[31]        |
|Adoni              |
|Rewa               |
|Kollam             |
|Tiruppur           |
|Tiruchirappalli[10]|
|Parbhani           |
|Amaravati          |
|Madurai            |
|Secunderabad       |
|Erode[17]          |
|Jamnagar           |
|Bhusawal           |
|Alappuzha          |
+-------------------+
only showing top 20 rows



In [0]:
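# Clean the CITY column by removing the bracketed suffix after the city name (e.g. "Kota[6]" -> "Kota").
# Note: the pattern keeps only the first run of letters, so a name that contains "_" or another non-letter is also cut at that point.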

df_tmp = df_train.withColumn("CITY",regexp_extract(col("CITY"),"([A-Za-z]+)",1))

unique_val = df_tmp.select("CITY").distinct().collect()
print(f"unique values of city (After): {len(unique_val)}")

dropDisDF = df_tmp.dropDuplicates(["CITY"]).select("CITY")
dropDisDF.show(truncate=False)

unique values of city (After): 316
+---------------+
|CITY           |
+---------------+
|Bhubaneswar    |
|Jalgaon        |
|Kamarhati      |
|Sirsa          |
|Tiruchirappalli|
|Anantapuram    |
|Adoni          |
|Rewa           |
|Kollam         |
|Tiruppur       |
|Parbhani       |
|Amaravati      |
|Kota           |
|Hajipur        |
|Madurai        |
|Secunderabad   |
|Jamnagar       |
|Erode          |
|Bhusawal       |
|Alappuzha      |
+---------------+
only showing top 20 rows



In [0]:
# check unique values of state column
unique_val = df_train.select("STATE").distinct().collect()
print(f"unique values of state (Before): {len(unique_val)}")

dropDisDF = df_train.dropDuplicates(["STATE"]).select("STATE")
dropDisDF.show(truncate=False)

unique values of state (Before): 29
+----------------+
|STATE           |
+----------------+
|Karnataka       |
|Odisha          |
|Kerala          |
|Punjab          |
|Mizoram         |
|Andhra_Pradesh  |
|Puducherry      |
|Haryana         |
|Madhya_Pradesh  |
|Jharkhand       |
|Gujarat         |
|West_Bengal     |
|Tamil_Nadu      |
|Uttar_Pradesh   |
|Rajasthan       |
|Himachal_Pradesh|
|Maharashtra     |
|Telangana       |
|Bihar           |
|Tripura         |
+----------------+
only showing top 20 rows



In [0]:
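# Clean the STATE column by removing any bracketed suffix after the state name.
# Note: the pattern keeps only the first run of letters, so "Madhya_Pradesh" becomes "Madhya" (see the output below).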

df_tmp = df_train.withColumn("STATE",regexp_extract(col("STATE"),"([A-Za-z]+)",1))

unique_val = df_tmp.select("STATE").distinct().collect()
print(f"unique values of state (After): {len(unique_val)}")

dropDisDF = df_tmp.dropDuplicates(["STATE"]).select("STATE")
dropDisDF.show(truncate=False)

unique values of state (After): 28
+-----------+
|STATE      |
+-----------+
|Madhya     |
|Karnataka  |
|Odisha     |
|Kerala     |
|Punjab     |
|Himachal   |
|Mizoram    |
|Puducherry |
|Haryana    |
|Tamil      |
|Jharkhand  |
|Gujarat    |
|Andhra     |
|Uttar      |
|Rajasthan  |
|West       |
|Maharashtra|
|Telangana  |
|Bihar      |
|Tripura    |
+-----------+
only showing top 20 rows



In [0]:
# Wrap the CITY/STATE cleaning in a custom Transformer so it can be used as a pipeline stage

class Convert_categorical_data(Transformer):
    def __init__(self):
        super(Convert_categorical_data, self).__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        df = df.withColumn("CITY",regexp_extract(col("CITY"),"([A-Za-z]+)",1))
        df = df.withColumn("STATE",regexp_extract(col("STATE"),"([A-Za-z]+)",1))
        return df
      
clean1 = Convert_categorical_data()
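
The pattern `([A-Za-z]+)` removes the bracketed suffix but also truncates names at the first underscore, as the STATE output above shows. The sketch below compares it with one possible alternative, `^([^\[]+)`, which keeps everything before the first `[`. It uses Python's `re` module rather than Spark (whose `regexp_extract` uses Java regular expressions), so treat it as an illustration of the pattern logic only; the helper `extract` and the alternative pattern are assumptions, not part of the notebook.

```python
import re

FIRST_LETTERS = re.compile(r"([A-Za-z]+)")  # pattern used in the notebook
BEFORE_BRACKET = re.compile(r"^([^\[]+)")   # hypothetical alternative

def extract(pattern, value):
    """Return capture group 1, or an empty string when there is no match."""
    match = pattern.search(value)
    return match.group(1) if match else ""

# Sample values taken from the CITY and STATE outputs shown earlier.
samples = ["Kota[6]", "Tiruchirappalli[10]", "Madhya_Pradesh", "Rewa"]
for value in samples:
    print(f"{value:22s} {extract(FIRST_LETTERS, value):18s} {extract(BEFORE_BRACKET, value)}")
```

Switching patterns would change the cleaned values used in later cells, so the outputs shown in this notebook correspond to the original pattern.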

### Step2: Change the data type of each column

In [0]:
#Check the data type of each column
df_train.printSchema()
df_tmp = df_train.alias('df_train')

#Change the data type of each column
df_tmp= df_tmp.withColumn("Id",col("Id").cast(StringType()))
df_tmp= df_tmp.withColumn("Income",col("Income").cast(IntegerType()))
df_tmp= df_tmp.withColumn("Age",col("Age").cast(IntegerType()))
df_tmp= df_tmp.withColumn("Experience",col("Experience").cast(IntegerType()))
df_tmp= df_tmp.withColumn("Married/Single",col("Married/Single").cast(StringType()))
df_tmp= df_tmp.withColumn("House_Ownership",col("House_Ownership").cast(StringType()))
df_tmp= df_tmp.withColumn("Car_Ownership",col("Car_Ownership").cast(StringType()))
df_tmp= df_tmp.withColumn("Profession",col("Profession").cast(StringType()))
df_tmp= df_tmp.withColumn("CITY",col("CITY").cast(StringType()))
df_tmp= df_tmp.withColumn("STATE",col("STATE").cast(StringType()))
df_tmp= df_tmp.withColumn("CURRENT_JOB_YRS",col("CURRENT_JOB_YRS").cast(IntegerType()))
df_tmp= df_tmp.withColumn("CURRENT_HOUSE_YRS",col("CURRENT_HOUSE_YRS").cast(IntegerType()))
df_tmp= df_tmp.withColumn("Risk_Flag",col("Risk_Flag").cast(IntegerType()))

df_tmp.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Married/Single: string (nullable = true)
 |-- House_Ownership: string (nullable = true)
 |-- Car_Ownership: string (nullable = true)
 |-- Profession: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CURRENT_JOB_YRS: string (nullable = true)
 |-- CURRENT_HOUSE_YRS: string (nullable = true)
 |-- Risk_Flag: string (nullable = true)

root
 |-- Id: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Married/Single: string (nullable = true)
 |-- House_Ownership: string (nullable = true)
 |-- Car_Ownership: string (nullable = true)
 |-- Profession: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CURRENT_JOB_YRS: integer (nullable = tru

In [0]:
# Wrap the type conversion in a custom Transformer so it can be used as a pipeline stage
class Convert_data_types(Transformer):
    def __init__(self):
        super(Convert_data_types, self).__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        df= df.withColumn("Id",col("Id").cast(StringType()))
        df= df.withColumn("Income",col("Income").cast(IntegerType()))
        df= df.withColumn("Age",col("Age").cast(IntegerType()))
        df= df.withColumn("Experience",col("Experience").cast(IntegerType()))
        df= df.withColumn("Married/Single",col("Married/Single").cast(StringType()))
        df= df.withColumn("House_Ownership",col("House_Ownership").cast(StringType()))
        df= df.withColumn("Car_Ownership",col("Car_Ownership").cast(StringType()))
        df= df.withColumn("Profession",col("Profession").cast(StringType()))
        df= df.withColumn("CITY",col("CITY").cast(StringType()))
        df= df.withColumn("STATE",col("STATE").cast(StringType()))
        df= df.withColumn("CURRENT_JOB_YRS",col("CURRENT_JOB_YRS").cast(IntegerType()))
        df= df.withColumn("CURRENT_HOUSE_YRS",col("CURRENT_HOUSE_YRS").cast(IntegerType()))
        return df
      

clean2 = Convert_data_types()

### Step3: Drop useless columns

In [0]:
# Drop columns which are not required - Id, CITY
df_tmp = df_train.drop("Id","CITY")

In [0]:
# Wrap the column drop in a custom Transformer so it can be used as a pipeline stage
class Drop_columns(Transformer):
    def __init__(self):
        super(Drop_columns, self).__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        df=df.drop("Id","CITY")
        return df
      
clean3 = Drop_columns()

In [0]:
#Define the mini-pipeline for data_preprocessing:
pipe_clean = Pipeline(stages=[clean1, clean2, clean3])

#Transform by data cleaning stages:
cleaned_data = pipe_clean.fit(df_train).transform(df_train)
cleaned_data.show(15)
     

+-------+---+----------+--------------+---------------+-------------+-------------------+-----------+---------------+-----------------+---------+
| Income|Age|Experience|Married/Single|House_Ownership|Car_Ownership|         Profession|      STATE|CURRENT_JOB_YRS|CURRENT_HOUSE_YRS|Risk_Flag|
+-------+---+----------+--------------+---------------+-------------+-------------------+-----------+---------------+-----------------+---------+
|1303834| 23|         3|        single|         rented|           no|Mechanical_engineer|     Madhya|              3|               13|        0|
|7574516| 40|        10|        single|         rented|           no| Software_Developer|Maharashtra|              9|               13|        0|
|3991815| 66|         4|       married|         rented|           no|   Technical_writer|     Kerala|              4|               10|        0|
|6256451| 41|         2|        single|         rented|          yes| Software_Developer|     Odisha|              2|       

In [0]:
cleaned_data.printSchema()

root
 |-- Income: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Married/Single: string (nullable = true)
 |-- House_Ownership: string (nullable = true)
 |-- Car_Ownership: string (nullable = true)
 |-- Profession: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CURRENT_JOB_YRS: integer (nullable = true)
 |-- CURRENT_HOUSE_YRS: integer (nullable = true)
 |-- Risk_Flag: string (nullable = true)



## 2.3) Preprocessing

In [0]:
# Divide columns into categorical columns and numerical columns.
cat_col = ["Married/Single", "House_Ownership", "Car_Ownership","Profession","STATE"]
num_col = ["Income", "Age","Experience","CURRENT_JOB_YRS","CURRENT_HOUSE_YRS"]
target_col = "Risk_Flag"

### Step1: Categorical features

In [0]:
# Preprocessing for categorical features
# (the loop variable is named c to avoid confusion with the imported col function)
CAT_COLS_INDEXER = [f"{c}_indexer" for c in cat_col]
CAT_COLS_ONEHOT = [f"{c}_vec" for c in cat_col]

# Set the pipeline
cat_pre = [
    StringIndexer( # convert string to index
        inputCols=cat_col,
        outputCols=CAT_COLS_INDEXER,),
    OneHotEncoder( # onehot encoding
        inputCols=CAT_COLS_INDEXER,
        outputCols=CAT_COLS_ONEHOT,
        dropLast=True,),
]

# Show the effect of the categorical preprocessing
Pipeline(stages=cat_pre).fit(cleaned_data).transform(cleaned_data).show(2)

+-------+---+----------+--------------+---------------+-------------+-------------------+-----------+---------------+-----------------+---------+----------------------+-----------------------+---------------------+------------------+-------------+------------------+-------------------+-----------------+---------------+--------------+
| Income|Age|Experience|Married/Single|House_Ownership|Car_Ownership|         Profession|      STATE|CURRENT_JOB_YRS|CURRENT_HOUSE_YRS|Risk_Flag|Married/Single_indexer|House_Ownership_indexer|Car_Ownership_indexer|Profession_indexer|STATE_indexer|Married/Single_vec|House_Ownership_vec|Car_Ownership_vec| Profession_vec|     STATE_vec|
+-------+---+----------+--------------+---------------+-------------+-------------------+-----------+---------------+-----------------+---------+----------------------+-----------------------+---------------------+------------------+-------------+------------------+-------------------+-----------------+---------------+--------
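
To make the two categorical stages concrete, here is a plain-Python sketch of the idea behind them: labels are indexed by descending frequency (the default ordering of Spark's `StringIndexer`), and one-hot encoding with `dropLast=True` represents the last index as an all-zeros vector. The functions and the toy sample are hypothetical and only imitate the behavior; Spark itself produces sparse vectors.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda label: (-freq[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot_drop_last(index, n_categories):
    """Encode an index as a vector of length n_categories - 1.

    The last category maps to the all-zeros vector.
    """
    vector = [0.0] * (n_categories - 1)
    if index < n_categories - 1:
        vector[index] = 1.0
    return vector

# Toy sample (made up for illustration, not taken from the dataset).
house = ["rented", "rented", "owned", "rented", "norent_noown", "owned"]
mapping = fit_string_indexer(house)
encoded = {label: one_hot_drop_last(i, len(mapping)) for label, i in mapping.items()}

print(mapping)
print(encoded)
```

With `dropLast=True`, a column with k categories contributes k - 1 entries to the final feature vector.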

### Step2: Numerical features

In [0]:
# Preprocessing for numerical features
# Set the pipeline
num_pre = [
    VectorAssembler( #assemble numerical data
        inputCols=num_col,
        outputCol="assembled_num",
    ),
    StandardScaler(  #scaling - standardscaler
        inputCol="assembled_num",
        outputCol="scaled_num",
    ),
]

# Show the effect of the numerical preprocessing
Pipeline(stages=num_pre).fit(cleaned_data).transform(cleaned_data).show(20,truncate=False)

+-------+---+----------+--------------+---------------+-------------+----------------------+-----------+---------------+-----------------+---------+-------------------------------+-------------------------------------------------------------------------------------------------+
|Income |Age|Experience|Married/Single|House_Ownership|Car_Ownership|Profession            |STATE      |CURRENT_JOB_YRS|CURRENT_HOUSE_YRS|Risk_Flag|assembled_num                  |scaled_num                                                                                       |
+-------+---+----------+--------------+---------------+-------------+----------------------+-----------+---------------+-----------------+---------+-------------------------------+-------------------------------------------------------------------------------------------------+
|1303834|23 |3         |single        |rented         |no           |Mechanical_engineer   |Madhya     |3              |13               |0        |[1303834.0,23.0
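
The scaling step can be sketched in plain Python as follows. With its default settings, Spark's `StandardScaler` divides each feature by its sample standard deviation without centering it (`withStd=True`, `withMean=False`). The function `standard_scale` is a hypothetical stand-in, and it is applied here to only the first five `Age` values shown earlier, so the numbers will not match the Spark output computed on the full column.

```python
import statistics

def standard_scale(column, with_mean=False, with_std=True):
    """Scale a list of numbers the way the defaults above describe."""
    mean = statistics.fmean(column) if with_mean else 0.0
    std = statistics.stdev(column) if with_std else 1.0  # sample std (n - 1)
    return [(x - mean) / std for x in column]

ages = [23, 40, 66, 41, 47]  # first five Age values from display(df_train)

scaled = standard_scale(ages)                    # unit std, not centered
centered = standard_scale(ages, with_mean=True)  # unit std, zero mean

print([f"{x:.3f}" for x in scaled])
print([f"{x:.3f}" for x in centered])
```

Because the default does not subtract the mean, the scaled values stay positive for positive inputs.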

### Step3: Feature assembling

In [0]:
# Preprocessing for all features
# Set the pipeline
feature_assembler = [
    VectorAssembler(
        inputCols=CAT_COLS_ONEHOT + ["scaled_num"],
        outputCol="features",
    )
]


### Step4: Target feature

In [0]:
# Preprocessing for target feature
# Set the pipeline
target_pre = [StringIndexer(inputCol=target_col, outputCol="label")]

# Show the effect of the target preprocessing
Pipeline(stages=target_pre).fit(cleaned_data).transform(cleaned_data).show(2)

+-------+---+----------+--------------+---------------+-------------+-------------------+-----------+---------------+-----------------+---------+-----+
| Income|Age|Experience|Married/Single|House_Ownership|Car_Ownership|         Profession|      STATE|CURRENT_JOB_YRS|CURRENT_HOUSE_YRS|Risk_Flag|label|
+-------+---+----------+--------------+---------------+-------------+-------------------+-----------+---------------+-----------------+---------+-----+
|1303834| 23|         3|        single|         rented|           no|Mechanical_engineer|     Madhya|              3|               13|        0|  0.0|
|7574516| 40|        10|        single|         rented|           no| Software_Developer|Maharashtra|              9|               13|        0|  0.0|
+-------+---+----------+--------------+---------------+-------------+-------------------+-----------+---------------+-----------------+---------+-----+
only showing top 2 rows



## 2.4) Stack Pipeline

In [0]:
pipe_clean = Pipeline(stages=[clean1, clean2, clean3])

pipe_pre = Pipeline(stages=cat_pre + num_pre + feature_assembler)

transforming_data = Pipeline(stages=[pipe_clean,
                                     pipe_pre])

# Build the features and the label in a single pass so that every row keeps its own label.
# (Joining two separately computed DataFrames on monotonically_increasing_id() is not guaranteed to line the rows up.)
train_pipeline = Pipeline(stages=[transforming_data] + target_pre)
train_df = train_pipeline.fit(df_train).transform(df_train).select("features", "label")
train_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(86,[0,1,3,15,60,...|  0.0|
|(86,[0,1,3,24,55,...|  0.0|
|(86,[1,3,17,68,81...|  0.0|
|(86,[0,1,24,71,81...|  1.0|
|(86,[0,1,3,52,59,...|  1.0|
+--------------------+-----+
only showing top 5 rows



# 3) Modeling

In [0]:
# Split train dataset
(trainData, testData) = train_df.randomSplit([0.7, 0.3],seed = 11)

## 3.1) Baseline modeling

In [0]:
#LogisticRegression
lr = LogisticRegression(labelCol="label", featuresCol="features")

#Training algorithm
lrModel = lr.fit(trainData)
lr_prediction = lrModel.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Evaluating accuracy of LogisticRegression
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression : %g" % (lr_accuracy))
print("Test Error of LogisticRegression : %g" % (1.0 - lr_accuracy))

Accuracy of LogisticRegression = 0.877329
Test Error of LogisticRegression = 0.122671 


In [0]:
#DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

#Training algorithm
dt_Model = dt.fit(trainData)
dt_prediction = dt_Model.transform(testData)

#Evaluating accuracy of DecisionTreeClassifier
dt_accuracy = evaluator.evaluate(dt_prediction)
print("Accuracy of DecisionTreeClassifier : %g" % (dt_accuracy))
print("Test Error of DecisionTreeClassifier : %g" % (1.0 - dt_accuracy))


Accuracy of DecisionTreeClassifier = 0.878278
Test Error of DecisionTreeClassifier = 0.121722 


In [0]:
#RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

#Training algorithm
rf_Model = rf.fit(trainData)
rf_prediction = rf_Model.transform(testData)

#Evaluating accuracy of RandomForestClassifier
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForestClassifier : %g" % (rf_accuracy))
print("Test Error of RandomForestClassifier : %g" % (1.0 - rf_accuracy))

Accuracy of RandomForestClassifier = 0.877329
Test Error of RandomForestClassifier  = 0.122671 


In [0]:
#Gradient-boosted tree classifier
gbt = GBTClassifier(labelCol="label", featuresCol="features",maxIter=10)

#Training algorithm
gbt_Model = gbt.fit(trainData)
gbt_prediction = gbt_Model.transform(testData)

#Evaluating accuracy of GBTClassifier
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Accuracy of Gradient-boosted tree classifier : %g" % (gbt_accuracy))
print("Test Error of Gradient-boosted tree classifier : %g" % (1.0 - gbt_accuracy))

Accuracy of Gradient-boosted tree classifier = 0.878225
Test Error of Gradient-boosted tree classifier = 0.121775


In [0]:
#Support Vector Machine
svm = LinearSVC(labelCol="label", featuresCol="features")

#Training algorithm
svm_Model = svm.fit(trainData)
svm_prediction = svm_Model.transform(testData)

#Evaluating accuracy of Support Vector Machine
svm_accuracy = evaluator.evaluate(svm_prediction)
print("Accuracy of Support Vector Machine : %g" % (svm_accuracy))
print("Test Error of Support Vector Machine : %g" % (1.0 - svm_accuracy))

Accuracy of Support Vector Machine = 0.877329
Test Error of Support Vector Machine = 0.122671 


In [0]:
#XGBoost Classifier
xgb = SparkXGBClassifier(label_col="label", features_col="features")

#Training algorithm
xgb_Model = xgb.fit(trainData)
xgb_prediction = xgb_Model.transform(testData)

#Evaluating accuracy of XGBoost Classifier
xgb_accuracy = evaluator.evaluate(xgb_prediction)
print("Accuracy of XGBoost Classifier : %g" % (xgb_accuracy))
print("Test Error of XGBoost Classifier : %g" % (1.0 - xgb_accuracy))

Accuracy of XGBoost Classifier = 0.887376
Test Error of XGBoost Classifier = 0.112624 


## 3.2) Hyperparameter Tuning

The baseline results above show that XGBoost has the highest accuracy (0.8874).
I therefore tuned the hyperparameters of the XGBoost model and compared the tuned model with the XGBoost baseline.

In [0]:
#Grid search for the best parameters (the code is wrapped in a string literal, so this cell does not run the search)

'''#XGBoost Classifier
xgb = SparkXGBClassifier(label_col="label", features_col="features")
grid = (ParamGridBuilder()
        .addGrid(xgb.n_estimators, [100, 1000])
        .addGrid(xgb.max_depth, [2, 5])
        .addGrid(xgb.learning_rate, [0.01, 0.1])
        .build()
       )
print(grid)

evaluator = MulticlassClassificationEvaluator()
cv2 = CrossValidator(estimator=xgb,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    parallelism=6
                    )

cvModel2=cv2.fit(trainData)
print(cvModel2.avgMetrics) # result of cross validation for each combination of parameters

print(
    f"Result of the best XGBoost model on the test set:\
    {evaluator.evaluate(cvModel2.transform(testData))}"
)'''

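
The cost of the grid search above grows with the product of the candidate lists. The sketch below enumerates the same grid in plain Python and estimates the number of model fits, assuming `CrossValidator` runs with its default of 3 folds and refits the best combination once on the full training data; `param_grid` and `num_folds` are names introduced for this illustration.

```python
from itertools import product

# Candidate values taken from the grid in the cell above.
param_grid = {
    "n_estimators": [100, 1000],
    "max_depth": [2, 5],
    "learning_rate": [0.01, 0.1],
}
num_folds = 3  # assumption: CrossValidator's default numFolds

names = list(param_grid)
combinations = [dict(zip(names, values)) for values in product(*param_grid.values())]

# One fit per combination per fold, plus one final refit of the best combination.
total_fits = len(combinations) * num_folds + 1

print(f"{len(combinations)} combinations, {total_fits} model fits")
for combination in combinations:
    print(combination)
```

The combination used in the next cell (`n_estimators=1000`, `max_depth=5`, `learning_rate=0.1`) is one of the eight candidates.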

In [0]:
#XGBoost Classifier with hyperparameter tuning
xgb2 = SparkXGBClassifier(label_col="label", features_col="features", 
                          n_estimators=1000, 
                          max_depth=5,
                          learning_rate=0.1)

#Training algorithm
xgb_Model2 = xgb2.fit(trainData)
xgb_prediction2 = xgb_Model2.transform(testData)

#Evaluating accuracy of XGBoost Classifier with hyperparameter tuning
xgb_accuracy2 = evaluator.evaluate(xgb_prediction2)
print("Accuracy of XGBoost Classifier : %g" % (xgb_accuracy2))
print("Test Error of XGBoost Classifier : %g" % (1.0 - xgb_accuracy2))

Accuracy of XGBoost Classifier 0.893203
Test Error of XGBoost Classifier 0.106797 


## 3.3) Evaluation

The accuracy of each model above is summarized below.

The XGBoost model with tuned hyperparameters has the highest accuracy.

In [0]:
res1 = [["LogisticRegression", lr_accuracy], 
        ["DecisionTree", dt_accuracy],
        ["RandomForest", rf_accuracy],
        ["GBT", gbt_accuracy],
        ["SVM", svm_accuracy],
        ["XGBoost", xgb_accuracy]]

res2 = [['XGBoost(Baseline)', xgb_accuracy], 
        ['XGBoost(with tuning)', xgb_accuracy2]]

res1 = spark.createDataFrame(res1,["Model", "Accuracy"])
res1 = res1.withColumn("Accuracy",col("Accuracy").cast(FloatType()))
res1 = res1.withColumn("Accuracy", round(res1["Accuracy"], 4))
res1.show()

res2 = spark.createDataFrame(res2,["Model", "Accuracy"])
res2 = res2.withColumn("Accuracy",col("Accuracy").cast(FloatType()))
res2 = res2.withColumn("Accuracy", round(res2["Accuracy"], 4))
res2.show()

+------------------+--------+
|             Model|Accuracy|
+------------------+--------+
|LogisticRegression|  0.8773|
|      DecisionTree|  0.8783|
|      RandomForest|  0.8773|
|               GBT|  0.8782|
|               SVM|  0.8773|
|           XGBoost|  0.8874|
+------------------+--------+

+--------------------+--------+
|               Model|Accuracy|
+--------------------+--------+
|   XGBoost(Baseline)|  0.8874|
|XGBoost(with tuning)|  0.8932|
+--------------------+--------+
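
Several of the accuracies above (0.8773) are very close to the share of `Risk_Flag = 0` rows in the training data (221,004 of 252,000, about 0.877), so accuracy alone may not separate these models well. The sketch below shows, on made-up toy labels, how precision, recall, and F1 for the risky class can differ between two sets of predictions with the same accuracy. `binary_metrics` is a hypothetical helper; in Spark a comparable check could start from `MulticlassClassificationEvaluator` with `metricName="f1"`.

```python
def binary_metrics(labels, predictions, positive=1):
    """Return accuracy, precision, recall, and F1 for the positive class."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    tn = len(pairs) - tp - fp - fn
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "accuracy": (tp + tn) / len(pairs),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }

# Toy data (illustrative only): 8 safe rows followed by 2 risky rows.
labels = [0] * 8 + [1] * 2
always_zero = [0] * 10            # predicts the majority class for every row
mixed = [0] * 7 + [1] + [1, 0]    # one false alarm, one hit, one miss

print(binary_metrics(labels, always_zero))
print(binary_metrics(labels, mixed))
```

Both prediction sets reach the same accuracy, but only the second one identifies any risky customer.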



# 4) Prediction

Based on the results above, the best model is used to predict the test data, and the predictions are compared with the actual labels to measure prediction accuracy.

In [0]:
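#Transform the test dataset with the pipeline.
#Note: this call refits the indexers and the scaler on df_test, so the category indices and scaling can differ from those learned on the training data.
test_df = transforming_data.fit(df_test).transform(df_test).select("features")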

#predict by the best model
pred = xgb_Model2.transform(test_df)

#Attach a row id to each prediction so it can be joined to the actual labels
pred = pred.withColumn("id", monotonically_increasing_id()+1).select("id","prediction")
pred_df = df_result.join(pred, ["id"])
pred_df = pred_df.withColumn("risk_flag",col("risk_flag").cast(DoubleType()))
pred_df = pred_df.withColumn("prediction",col("prediction").cast(DoubleType()))

#Evaluating accuracy of the final model with actual label
evaluator = MulticlassClassificationEvaluator(labelCol="risk_flag", predictionCol="prediction", metricName="accuracy")
final_accuracy = evaluator.evaluate(pred_df)
print("Accuracy of the final model with actual data : %g" % (final_accuracy))
print("Test Error of the final model with actual data : %g" % (1.0 - final_accuracy))

Accuracy of the final model with actual data : 0.865571
Test Error of the final model with actual data : 0.134429
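
The join above relies on `monotonically_increasing_id() + 1` reproducing the `id` column of `df_result`. Spark documents these IDs as unique and increasing but not consecutive: the partition ID goes into the upper 31 bits and the record number within the partition into the lower 33 bits. The plain-Python sketch below simulates that layout to show when the IDs line up with 1..n; `monotonic_ids` is a hypothetical helper, and the partition sizes are made up.

```python
def monotonic_ids(partition_sizes):
    """Simulate the ID layout: partition index in the upper bits, row offset in the lower 33 bits."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        base = partition_index << 33
        ids.extend(base + offset for offset in range(size))
    return ids

single_partition = monotonic_ids([6])    # all rows in one partition
two_partitions = monotonic_ids([3, 3])   # same rows split across two partitions

print([i + 1 for i in single_partition])
print([i + 1 for i in two_partitions])
```

With a single partition the shifted IDs are 1 to n, but with more partitions they jump, so rows in later partitions would find no matching `id` in the join. Checking that the row count of `pred_df` equals that of `df_test` is one way to detect this.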
