In [1]:
from pyspark import SparkContext
sc = SparkContext("local","myproj")
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myproj').getOrCreate()
data = spark.read.csv('UCI_Credit_Card.csv',inferSchema=True,header=True)
data.printSchema()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: double (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: double (nullable = true)
 |-- BILL_AMT2: double (nullable = true)
 |-- BILL_AMT3: double (nullable = true)
 |-- BILL_AMT4: double (nullable = true)
 |-- BILL_AMT5: double (nullable = true)
 |-- BILL_AMT6: double (nullable = true)
 |-- PAY_AMT1: double (nullable = true)
 |-- PAY_AMT2: double (nullable = true)
 |-- PAY_AMT3: double (nullable = true)
 |-- PAY_AMT4: double (nullable = true)
 |-- PAY_AMT5: double (nullable = true)
 |-- PAY_AMT6: double (nullable = true)
 |-- default.payment.next.month: integer (nullable

# Data Understanding
Data Dictionary
1.	ID: ID of each client
2.	LIMIT_BAL: Amount of given credit in NT dollars (includes individual and family/supplementary credit)
3.	SEX: Gender (1=male, 2=female)
4.	EDUCATION: (1=graduate school, 2=university, 3=high school, 4=others, 5=unknown, 6=unknown)
5.	MARRIAGE: Marital status (1=married, 2=single, 3=others)
6.	Age: Age in years
7.	PAY_0 ------ PAY_6 (6 features): Repayment status from April, 2005 to September, 2005(-1=pay duly, 0=not delay, other number = the number of months for payment delay)
8.	BILL_AMT1 ------ BILL_AMT6 (6 features): Amount of bill statement from April, 2005 to September 2005(NT dollar)
9.	PAY_AMT1: Amount of previous payment from April, 2005 to September 2005(NT dollar)
10.	Default. Payment. next. Month: default payment (1=yes, 2=no)

In [4]:
data = data.withColumnRenamed("PAY_0", "PAY_1")
data = data.withColumnRenamed("default.payment.next.month", "Default")

## Change data types

In [5]:
from pyspark.sql.types import StructField,StringType,IntegerType,DoubleType,StructType
# if the second element is True means it could have null cells
df_schema = StructType([
    StructField('ID', StringType(), True),
    StructField('LIMIT_BAL', DoubleType(), True),
    StructField('SEX', StringType(), True),
    StructField('EDUCATION', StringType(), True),
    StructField('MARRIAGE', StringType(), True),
    StructField('AGE', IntegerType(), True),
    StructField('PAY_1', IntegerType(), True),
    StructField('PAY_2', IntegerType(), True),
    StructField('PAY_3', IntegerType(), True),
    StructField('PAY_4', IntegerType(), True),
    StructField('PAY_5', IntegerType(), True),
    StructField('PAY_6', IntegerType(), True),
    StructField('BILL_AMT1', DoubleType(), True),
    StructField('BILL_AMT2', DoubleType(), True),
    StructField('BILL_AMT3', DoubleType(), True),
    StructField('BILL_AMT4', DoubleType(), True),
    StructField('BILL_AMT5', DoubleType(), True),
    StructField('BILL_AMT6', DoubleType(), True),
    StructField('PAY_AMT1', DoubleType(), True),
    StructField('PAY_AMT2', DoubleType(), True),
    StructField('PAY_AMT3', DoubleType(), True),
    StructField('PAY_AMT4', DoubleType(), True),
    StructField('PAY_AMT5', DoubleType(), True),
    StructField('PAY_AMT6', DoubleType(), True),
    StructField('Default', IntegerType(), True),
    ])
data = spark.createDataFrame(data.collect(),schema = df_schema)
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LIMIT_BAL: double (nullable = true)
 |-- SEX: string (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- MARRIAGE: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_1: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: double (nullable = true)
 |-- BILL_AMT2: double (nullable = true)
 |-- BILL_AMT3: double (nullable = true)
 |-- BILL_AMT4: double (nullable = true)
 |-- BILL_AMT5: double (nullable = true)
 |-- BILL_AMT6: double (nullable = true)
 |-- PAY_AMT1: double (nullable = true)
 |-- PAY_AMT2: double (nullable = true)
 |-- PAY_AMT3: double (nullable = true)
 |-- PAY_AMT4: double (nullable = true)
 |-- PAY_AMT5: double (nullable = true)
 |-- PAY_AMT6: double (nullable = true)
 |-- Default: integer (nullable = true)



In [6]:
from pyspark.sql.functions import *
from pyspark.sql.functions import when, count, col
df = data
na_report=df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])# dimensions of the dataframe
print("Number of Rows: ",df.count() ,"   Number of Columns: ", len(df.columns))
# we should handle the missing values by imputation unless too many of them are emtpy
# na.drop() method is not recommended for avoiding biased except for the dependent variable
na_report.show()

Number of Rows:  30000    Number of Columns:  25
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|Default|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  0|        0|  0|        0|       0|  0|    0|    0|    0|    0|    0|    0|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0|       0|       0|      0|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+

In [7]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+------------------+--------------------+------------------+-----------------+-----------------+-----------------+-----------------+------------------+-----------------+----------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+-------------------+
|summary|               ID|         LIMIT_BAL|               SEX|         EDUCATION|          MARRIAGE|              AGE|             PAY_1|               PAY_2|             PAY_3|               PAY_4|             PAY_5|            PAY_6|        BILL_AMT1|        BILL_AMT2|        BILL_AMT3|         BILL_AMT4|        BILL_AMT5|       BILL_AMT6|         PAY_AMT1|          PAY_AMT2|         PAY_AMT3|          PAY_AMT4|          PAY_AMT5|         PAY_AMT6|            Default|
+-------+-----------------+-----------------

# Data Preparation
Drop abnormal data in "EDUCATION" and "MARRIAGE" comlumns

In [8]:
df.select('MARRIAGE','EDUCATION').describe().show()

+-------+------------------+------------------+
|summary|          MARRIAGE|         EDUCATION|
+-------+------------------+------------------+
|  count|             30000|             30000|
|   mean|1.5518666666666667|1.8531333333333333|
| stddev|0.5219696006132467|0.7903486597207269|
|    min|                 0|                 0|
|    max|                 3|                 6|
+-------+------------------+------------------+



In [9]:
df = df.filter((df.MARRIAGE != '3')&(df.MARRIAGE != '0') &(df.EDUCATION != '0') &  (df.EDUCATION != '5') & (df.EDUCATION != '6'))

In [10]:
df.select('MARRIAGE','EDUCATION').describe().show()

+-------+-------------------+------------------+
|summary|           MARRIAGE|         EDUCATION|
+-------+-------------------+------------------+
|  count|              29283|             29283|
|   mean| 1.5397671003653997|   1.8114605743947|
| stddev|0.49842457944014473|0.7094951994131714|
|    min|                  1|                 1|
|    max|                  2|                 4|
+-------+-------------------+------------------+



# Visualizations
## We created some visualizations to explore story behind data based on the Databricks platform.

In [11]:
display(df.select("EDUCATION","Default","ID").groupBy("EDUCATION","Default").agg(count("ID")).orderBy('EDUCATION'))

EDUCATION,Default,count(ID)
1,0,8508
1,1,2023
2,0,10577
2,1,3285
3,1,1206
3,0,3564
4,1,7
4,0,113


In [12]:
display(df.select("SEX","Default","ID").groupBy("SEX","Default").agg(count("ID")))

SEX,Default,count(ID)
1,0,8795
1,1,2821
2,0,13967
2,1,3700


In [13]:
display(df.select("AGE","Default","ID").groupBy("AGE","Default").agg(count("ID")).orderBy('AGE'))

AGE,Default,count(ID)
21,1,14
21,0,50
22,0,383
22,1,164
23,1,246
23,0,669
24,0,815
24,1,300
25,0,873
25,1,298


In [14]:
display(df.select("EDUCATION","ID").groupBy("EDUCATION").agg(count("ID")).orderBy('EDUCATION'))

EDUCATION,count(ID)
1,10531
2,13862
3,4770
4,120


In [15]:
display(df.select("MARRIAGE","ID").groupBy("MARRIAGE").agg(count("ID")))

MARRIAGE,count(ID)
1,13477
2,15806


In [16]:
display(df.select("MARRIAGE","ID","Default").groupBy("Default","MARRIAGE").agg(count("id")).orderBy('MARRIAGE'))

Default,MARRIAGE,count(id)
1,1,3192
0,1,10285
0,2,12477
1,2,3329


In [17]:
display(df.select("LIMIT_BAL","ID","Default").groupBy("Default","LIMIT_BAL").agg(count("id")).orderBy('LIMIT_BAL'))

Default,LIMIT_BAL,count(id)
0,10000.0,284
1,10000.0,189
0,16000.0,1
1,20000.0,679
0,20000.0,1231
1,30000.0,554
0,30000.0,1003
1,40000.0,92
0,40000.0,132
1,50000.0,864


In [18]:
display(df.select("Default","ID").groupBy("Default").agg(count("ID")))

Default,count(ID)
1,6521
0,22762


## Check outliers

In [19]:
quantiles = {
    c: dict(
        zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in ["LIMIT_BAL", "AGE","PAY_1","PAY_2","PAY_3","PAY_4","PAY_5","PAY_6","BILL_AMT1","BILL_AMT2","BILL_AMT3","BILL_AMT4","BILL_AMT5","BILL_AMT6","PAY_AMT1","PAY_AMT2","PAY_AMT3","PAY_AMT4","PAY_AMT5","PAY_AMT6"]
}
quantiles

{'LIMIT_BAL': {'q1': 50000.0, 'q3': 240000.0},
 'AGE': {'q1': 28.0, 'q3': 41.0},
 'PAY_1': {'q1': -1.0, 'q3': 0.0},
 'PAY_2': {'q1': -1.0, 'q3': 0.0},
 'PAY_3': {'q1': -1.0, 'q3': 0.0},
 'PAY_4': {'q1': -1.0, 'q3': 0.0},
 'PAY_5': {'q1': -1.0, 'q3': 0.0},
 'PAY_6': {'q1': -1.0, 'q3': 0.0},
 'BILL_AMT1': {'q1': 3491.0, 'q3': 66866.0},
 'BILL_AMT2': {'q1': 2946.0, 'q3': 63774.0},
 'BILL_AMT3': {'q1': 2618.0, 'q3': 60005.0},
 'BILL_AMT4': {'q1': 2313.0, 'q3': 54574.0},
 'BILL_AMT5': {'q1': 1770.0, 'q3': 50290.0},
 'BILL_AMT6': {'q1': 1271.0, 'q3': 49327.0},
 'PAY_AMT1': {'q1': 995.0, 'q3': 5007.0},
 'PAY_AMT2': {'q1': 817.0, 'q3': 5000.0},
 'PAY_AMT3': {'q1': 390.0, 'q3': 4508.0},
 'PAY_AMT4': {'q1': 292.0, 'q3': 4038.0},
 'PAY_AMT5': {'q1': 260.0, 'q3': 4086.0},
 'PAY_AMT6': {'q1': 134.0, 'q3': 4003.0}}

In [20]:
for i in quantiles:
    iqr = quantiles[i]['q3'] - quantiles[i]['q1']
    quantiles[i]['lower_bound'] = quantiles[i]['q1'] - (iqr * 1.5)
    quantiles[i]['upper_bound'] = quantiles[i]['q3'] + (iqr * 1.5)
print(quantiles)

{'LIMIT_BAL': {'q1': 50000.0, 'q3': 240000.0, 'lower_bound': -235000.0, 'upper_bound': 525000.0}, 'AGE': {'q1': 28.0, 'q3': 41.0, 'lower_bound': 8.5, 'upper_bound': 60.5}, 'PAY_1': {'q1': -1.0, 'q3': 0.0, 'lower_bound': -2.5, 'upper_bound': 1.5}, 'PAY_2': {'q1': -1.0, 'q3': 0.0, 'lower_bound': -2.5, 'upper_bound': 1.5}, 'PAY_3': {'q1': -1.0, 'q3': 0.0, 'lower_bound': -2.5, 'upper_bound': 1.5}, 'PAY_4': {'q1': -1.0, 'q3': 0.0, 'lower_bound': -2.5, 'upper_bound': 1.5}, 'PAY_5': {'q1': -1.0, 'q3': 0.0, 'lower_bound': -2.5, 'upper_bound': 1.5}, 'PAY_6': {'q1': -1.0, 'q3': 0.0, 'lower_bound': -2.5, 'upper_bound': 1.5}, 'BILL_AMT1': {'q1': 3491.0, 'q3': 66866.0, 'lower_bound': -91571.5, 'upper_bound': 161928.5}, 'BILL_AMT2': {'q1': 2946.0, 'q3': 63774.0, 'lower_bound': -88296.0, 'upper_bound': 155016.0}, 'BILL_AMT3': {'q1': 2618.0, 'q3': 60005.0, 'lower_bound': -83462.5, 'upper_bound': 146085.5}, 'BILL_AMT4': {'q1': 2313.0, 'q3': 54574.0, 'lower_bound': -76078.5, 'upper_bound': 132965.5}, 'B

In [21]:

import pyspark.sql.functions as f
df_clean=df.select(
    "*",
    *[
        f.when(
            f.col(c).between(quantiles[c]['lower_bound'], quantiles[c]['upper_bound']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in ["LIMIT_BAL", "AGE","PAY_1","PAY_2","PAY_3","PAY_4","PAY_5","PAY_6","BILL_AMT1","BILL_AMT2","BILL_AMT3","BILL_AMT4","BILL_AMT5","BILL_AMT6","PAY_AMT1","PAY_AMT2","PAY_AMT3","PAY_AMT4","PAY_AMT5","PAY_AMT6"]
    ]
)

In [22]:
from pyspark.sql.functions import col
df_clean=df_clean.withColumn("outliers", col("LIMIT_BAL_out")+col("AGE_out")+col("PAY_1_out")+col("BILL_AMT1_out")+col("PAY_AMT1_out")+col("PAY_2_out")+col("BILL_AMT2_out")+col("PAY_AMT2_out")+col("PAY_3_out")+col("BILL_AMT3_out")+col("PAY_AMT3_out")+col("PAY_4_out")+col("BILL_AMT4_out")+col("PAY_AMT4_out")+col("PAY_5_out")+col("BILL_AMT5_out")+col("PAY_AMT5_out")+col("PAY_6_out")+col("BILL_AMT6_out")+col("PAY_AMT6_out"))

In [23]:
display(df_clean.select("outliers","ID").groupBy("outliers").agg(count("id")).orderBy('outliers'))

outliers,count(id)
0,12827
1,5797
2,3040
3,2001
4,1397
5,1044
6,1635
7,454
8,259
9,188


# Data Preparation
# Data processing

# Dealing with imbalanced labels（Try to improve the performance---Oversampling）

In [24]:
majority = df.filter((df.Default == 0))
minority = df.filter((df.Default == 1))
 
majority_count = majority.count()
minority_count = minority.count() 
ratio = majority_count / minority_count
print("ratio: {}".format(ratio))
sampled_minority_df = minority.sample(withReplacement=True, fraction=ratio)
 
df = sampled_minority_df.unionAll(majority)
df.show()

ratio: 3.4905689311455297
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|Default|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  1|  20000.0|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|   3913.0|   3102.0|    689.0|      0.0|      0.0|      0.0|     0.0|   689.0|     0.0|     0.0|     0.0|     0.0|      1|
|  1|  20000.0|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|   3913.0|   3102.0|    689.0|      0.0|      0.0|      0.0|     0.0|   689.0|     0.0|     

In [25]:
df.select('Default').describe().show()

+-------+------------------+
|summary|           Default|
+-------+------------------+
|  count|             45464|
|   mean|0.4993401372514517|
| stddev|0.5000050635231267|
|    min|                 0|
|    max|                 1|
+-------+------------------+



In [26]:
display(df.select("Default","ID").groupBy("Default").agg(count("ID")))

Default,count(ID)
1,22702
0,22762


## For statistical imputation we convert the pyspark dataframe into pandas to use functions of this library

In [27]:
from sklearn.impute import SimpleImputer
import pandas as pd

In [28]:
sample=df.toPandas()

In [29]:
samplecopy=sample
samplecopy

Unnamed: 0,ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_1,PAY_2,PAY_3,PAY_4,...,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,Default
0,1,20000.0,2,2,1,24,2,2,-1,-1,...,0.0,0.0,0.0,0.0,689.0,0.0,0.0,0.0,0.0,1
1,1,20000.0,2,2,1,24,2,2,-1,-1,...,0.0,0.0,0.0,0.0,689.0,0.0,0.0,0.0,0.0,1
2,1,20000.0,2,2,1,24,2,2,-1,-1,...,0.0,0.0,0.0,0.0,689.0,0.0,0.0,0.0,0.0,1
3,2,120000.0,2,2,2,26,-1,2,0,0,...,3272.0,3455.0,3261.0,0.0,1000.0,1000.0,1000.0,0.0,2000.0,1
4,2,120000.0,2,2,2,26,-1,2,0,0,...,3272.0,3455.0,3261.0,0.0,1000.0,1000.0,1000.0,0.0,2000.0,1
5,2,120000.0,2,2,2,26,-1,2,0,0,...,3272.0,3455.0,3261.0,0.0,1000.0,1000.0,1000.0,0.0,2000.0,1
6,2,120000.0,2,2,2,26,-1,2,0,0,...,3272.0,3455.0,3261.0,0.0,1000.0,1000.0,1000.0,0.0,2000.0,1
7,14,70000.0,1,2,2,30,1,2,2,0,...,66782.0,36137.0,36894.0,3200.0,0.0,3000.0,3000.0,1500.0,0.0,1
8,17,20000.0,1,1,2,24,0,0,2,2,...,18338.0,17905.0,19104.0,3200.0,0.0,1500.0,0.0,1650.0,0.0,1
9,17,20000.0,1,1,2,24,0,0,2,2,...,18338.0,17905.0,19104.0,3200.0,0.0,1500.0,0.0,1650.0,0.0,1


# Feature preprocessing
## Working with Categorical Columns

In [30]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

In [31]:
# OneHot encoding of categorical variable to be able to enter into machine learning algorithms
# we vectorize them to be able to use it
# Indexer return the numrical equivalent it's could be used for label encoding
# OneHotEncoder is for using returning dummy variables equivalent of the categorical variables
SEX_indexer = StringIndexer(inputCol='SEX',outputCol='SEXIndex')
SEX_encoder = OneHotEncoder(inputCol='SEXIndex',outputCol='SEXVec')

EDUCATION_indexer = StringIndexer(inputCol='EDUCATION',outputCol='EDUCATIONIndex')
EDUCATION_encoder = OneHotEncoder(inputCol='EDUCATIONIndex',outputCol='EDUCATIONVec')

MARRIAGE_indexer = StringIndexer(inputCol='MARRIAGE',outputCol='MARRIAGEIndex')
MARRIAGE_encoder = OneHotEncoder(inputCol='MARRIAGEIndex',outputCol='MARRIAGEVec')

In [32]:
input_cols_OneHot= ['LIMIT_BAL', 'SEXVec', 'EDUCATIONVec', 'MARRIAGEVec', 'AGE', 'PAY_1', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6']
assembler_OneHot = VectorAssembler(inputCols= input_cols_OneHot ,outputCol='features')

input_cols_Label= ['LIMIT_BAL', 'SEXIndex', 'EDUCATIONIndex', 'MARRIAGEIndex', 'AGE', 'PAY_1', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6']
assembler_Label = VectorAssembler(inputCols=input_cols_Label,outputCol='features')

# Model Building（LogisticRegression）
## - LogisticRegression

In [33]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier

# Defining a Pipeline 
## with pipeline we define the steps that we want to do

In [34]:
from pyspark.ml import Pipeline

In [35]:
log_reg_df = LogisticRegression(featuresCol='features',labelCol='Default')
rf_df = RandomForestClassifier(featuresCol='features',labelCol='Default')

In [36]:
# a pipeline for logistic regression
pipeline_lr_OneHot = Pipeline(stages=[SEX_indexer,EDUCATION_indexer,MARRIAGE_indexer,
                            SEX_encoder,EDUCATION_encoder,MARRIAGE_encoder,
                           assembler_OneHot,log_reg_df])

pipeline_lr_Label = Pipeline(stages=[SEX_indexer,EDUCATION_indexer,MARRIAGE_indexer,
                           assembler_Label,log_reg_df])

pipeline_rf_Label = Pipeline(stages=[SEX_indexer,EDUCATION_indexer,MARRIAGE_indexer,
                           assembler_Label,rf_df])

In [37]:
# random split with a seed number to reproduce the results
train_df, test_df = df.randomSplit([0.8,.2],110)

In [38]:
fit_model_OneHot = pipeline_lr_OneHot.fit(train_df)
fit_model_Label = pipeline_lr_Label.fit(train_df)
fit_rf_Label = pipeline_rf_Label.fit(train_df)

In [39]:
train_OneHot_results = fit_model_OneHot.transform(train_df)
test_OneHot_results = fit_model_OneHot.transform(test_df)

train_Label_results = fit_model_Label.transform(train_df)
test_Label_results = fit_model_Label.transform(test_df)

# Testing and Evaluation
- AUC
- Accuracy

In [40]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [41]:
AUC_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Default')
Accu_evaluator = MulticlassClassificationEvaluator()
Accu_evaluator.setPredictionCol('prediction')
Accu_evaluator.setLabelCol('Default')

MulticlassClassificationEvaluator_a0e9f4579143

In [42]:
test_Label_results.select('Default','prediction').show(5)

+-------+----------+
|Default|prediction|
+-------+----------+
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       0.0|
|      1|       0.0|
+-------+----------+
only showing top 5 rows



In [43]:
AUC_test_Label = AUC_eval.evaluate(test_Label_results)
AUC_train_Label = AUC_eval.evaluate(train_Label_results)

AUC_test_OneHot = AUC_eval.evaluate(test_OneHot_results)
AUC_train_OneHot = AUC_eval.evaluate(train_OneHot_results)

In [44]:
Accu_test_Label = Accu_evaluator.evaluate(test_Label_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_Label = Accu_evaluator.evaluate(train_Label_results, {Accu_evaluator.metricName: "accuracy"})

Accu_test_OneHot = Accu_evaluator.evaluate(test_OneHot_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_OneHot = Accu_evaluator.evaluate(train_OneHot_results, {Accu_evaluator.metricName: "accuracy"})

In [45]:
print("Test AUC with OneHot Encoding", AUC_test_OneHot)
print("\n")
print("Train AUC with OneHot Encoding", AUC_train_OneHot)

Test AUC with OneHot Encoding 0.672794660125935


Train AUC with OneHot Encoding 0.6744880710701873


In [46]:
print("Accuracy: Test with OneHot Encoding", Accu_test_OneHot)
print("\n")
print("Accuracy: Train with OneHot Encoding", Accu_train_OneHot)

Accuracy: Test with OneHot Encoding 0.6728313518273888


Accuracy: Train with OneHot Encoding 0.6745189664650907


In [47]:
print("Test AUC with Label Encoding", AUC_test_Label)
print("\n")
print("Train AUC with Label Encoding", AUC_train_Label)

Test AUC with Label Encoding 0.6710368061581119


Train AUC with Label Encoding 0.6744896003457685


In [48]:
print("Accuracy: Test with Label Encoding", Accu_test_Label)
print("\n")
print("Accuracy: Train with Label Encoding", Accu_train_Label)

Accuracy: Test with Label Encoding 0.6710700132100397


Accuracy: Train with Label Encoding 0.6745189664650907


# 1. Next Potential Steps -- try to improve the performance（Feature Selection）
## Feature Selection
## - More generalizable more 
## - avoid over fitting
## - prediction in train set and test set would be closer but it's not guaranteed

In [49]:
fit_rf_Label.stages[-1].featureImportances

SparseVector(23, {0: 0.0153, 1: 0.0003, 2: 0.0005, 4: 0.001, 5: 0.3009, 6: 0.2204, 7: 0.0506, 8: 0.076, 9: 0.1496, 10: 0.028, 11: 0.0073, 12: 0.0038, 13: 0.0025, 14: 0.0053, 15: 0.0049, 16: 0.0016, 17: 0.0477, 18: 0.0304, 19: 0.0145, 20: 0.0245, 21: 0.0044, 22: 0.0104})

In [50]:
input_cols_Label

['LIMIT_BAL',
 'SEXIndex',
 'EDUCATIONIndex',
 'MARRIAGEIndex',
 'AGE',
 'PAY_1',
 'PAY_2',
 'PAY_3',
 'PAY_4',
 'PAY_5',
 'PAY_6',
 'BILL_AMT1',
 'BILL_AMT2',
 'BILL_AMT3',
 'BILL_AMT4',
 'BILL_AMT5',
 'BILL_AMT6',
 'PAY_AMT1',
 'PAY_AMT2',
 'PAY_AMT3',
 'PAY_AMT4',
 'PAY_AMT5',
 'PAY_AMT6']

In [51]:
df2=fit_rf_Label.transform(train_df)

In [52]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [53]:
#with this you can extract the features, since the outcome is a pandas dataframe, we can use pandas related functions and methods
# you can check it with this command: type()
ExtractFeatureImp(fit_rf_Label.stages[-1].featureImportances, df2, "features")

Unnamed: 0,idx,name,vals,score
2,5,PAY_1,,0.300939
3,6,PAY_2,,0.220381
6,9,PAY_5,,0.149564
5,8,PAY_4,,0.075981
4,7,PAY_3,,0.05062
14,17,PAY_AMT1,,0.047747
15,18,PAY_AMT2,,0.030381
7,10,PAY_6,,0.027997
17,20,PAY_AMT4,,0.024517
0,0,LIMIT_BAL,,0.015326


In [54]:
features=ExtractFeatureImp(fit_rf_Label.stages[-1].featureImportances, df2, "features")

features=features[features.score>0.00036].iloc[:,1].tolist()
vars=features+['Default']
vars

['PAY_1',
 'PAY_2',
 'PAY_5',
 'PAY_4',
 'PAY_3',
 'PAY_AMT1',
 'PAY_AMT2',
 'PAY_6',
 'PAY_AMT4',
 'LIMIT_BAL',
 'PAY_AMT3',
 'PAY_AMT6',
 'BILL_AMT1',
 'BILL_AMT4',
 'BILL_AMT5',
 'PAY_AMT5',
 'BILL_AMT2',
 'BILL_AMT3',
 'BILL_AMT6',
 'AGE',
 'EDUCATIONIndex',
 'Default']

In [55]:
train_df_selected=fit_model_Label.transform(train_df).select(vars)
test_df_selected=fit_model_Label.transform(test_df).select(vars)

In [56]:
# we don't need to apply whole of pipeline just the last step which training the model is enough
assembler = VectorAssembler(inputCols= features, outputCol="features")
datatrain = assembler.transform(train_df_selected)
datatest = assembler.transform(test_df_selected)

log_reg_df = LogisticRegression(featuresCol='features',labelCol='Default')

fit_new = log_reg_df.fit(datatrain)

In [57]:
train_new_results = fit_new.transform(datatrain)
test_new_results = fit_new.transform(datatest)

In [58]:
AUC_test_new = AUC_eval.evaluate(test_new_results)
AUC_train_new = AUC_eval.evaluate(train_new_results)

Accu_test_new = Accu_evaluator.evaluate(test_new_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_new = Accu_evaluator.evaluate(train_new_results, {Accu_evaluator.metricName: "accuracy"})

In [59]:
print("Test AUC with Label Encoding & feature selection", AUC_test_new)
print("Train AUC with Label Encoding & feature selection", AUC_train_new)
print("\n")
print("Accuracy: Test with Label Encoding & feature selection", Accu_test_new)
print("Accuracy: Train with Label Encoding & feature selection", Accu_train_new)

Test AUC with Label Encoding & feature selection 0.6765270459580119
Train AUC with Label Encoding & feature selection 0.6725869336953496


Accuracy: Test with Label Encoding & feature selection 0.6765741963892559
Accuracy: Train with Label Encoding & feature selection 0.6726223199560198


# 2. Next Potential Steps -- try to improve the performance（Tuning the algorithms --CrossValidator）
## - Tuning the algorithms (CrossValidator) for LogisticRegression
## - As we indicate 3 values for regParam, 3 values for maxIter, and 3 values for elasticNetParam, this grid will have 3 x 3 x 3 = 27 parameter settings for CrossValidator to choose from. We will create a 5-fold cross validator.

In [60]:
import pyspark.ml.tuning as tune
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [61]:
lc_df = LogisticRegression(featuresCol='features',labelCol='Default')

In [62]:
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lc_df.regParam, [0.01, 0.5, 2.0])
             .addGrid(lc_df.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lc_df.maxIter, [1, 5, 10])
             .build())

In [63]:
cv = CrossValidator(estimator=lc_df, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Default'),numFolds=5)

In [64]:
cvModel = cv.fit(datatrain)

In [65]:
train_new0_results = cvModel.transform(datatrain)
test_new0_results = cvModel.transform(datatest)

AUC_test_new0 = AUC_eval.evaluate(test_new0_results)
AUC_train_new0 = AUC_eval.evaluate(train_new0_results)

Accu_test_new0 = Accu_evaluator.evaluate(test_new0_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_new0 = Accu_evaluator.evaluate(train_new0_results, {Accu_evaluator.metricName: "accuracy"})

In [66]:
print("Test AUC with Label Encoding & feature selection", AUC_test_new0)
print("Train AUC with Label Encoding & feature selection", AUC_train_new0)
print("\n")
print("Accuracy: Test with Label Encoding & feature selection", Accu_test_new0)
print("Accuracy: Train with Label Encoding & feature selection", Accu_train_new0)

Test AUC with Label Encoding & feature selection 0.6871444083045317
Train AUC with Label Encoding & feature selection 0.6841121710219951


Accuracy: Test with Label Encoding & feature selection 0.6874724790841039
Accuracy: Train with Label Encoding & feature selection 0.6843320505772402


# 3. Next Potential Steps -- try to improve the performance（Trying other algorithms）
## Trying other algorithms
## - Decision tree
## - Random forest
## - Gradient boosting tree
## - SVM

# Decision tree

In [67]:
from pyspark.ml.classification import DecisionTreeClassifier

In [68]:
dt_df = DecisionTreeClassifier(featuresCol='features',labelCol='Default')

fit_new1 = dt_df.fit(datatrain)

In [69]:
train_new1_results = fit_new1.transform(datatrain)
test_new1_results = fit_new1.transform(datatest)

In [70]:
AUC_test_new1 = AUC_eval.evaluate(test_new1_results)
AUC_train_new1 = AUC_eval.evaluate(train_new1_results)

Accu_test_new1 = Accu_evaluator.evaluate(test_new1_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_new1 = Accu_evaluator.evaluate(train_new1_results, {Accu_evaluator.metricName: "accuracy"})

In [71]:
print("Test AUC with Label Encoding & feature selection", AUC_test_new1)
print("Train AUC with Label Encoding & feature selection", AUC_train_new1)
print("\n")
print("Accuracy: Test with Label Encoding & feature selection", Accu_test_new1)
print("Accuracy: Train with Label Encoding & feature selection", Accu_train_new1)

Test AUC with Label Encoding & feature selection 0.7097990760893275
Train AUC with Label Encoding & feature selection 0.710971926078711


Accuracy: Test with Label Encoding & feature selection 0.7100396301188904
Accuracy: Train with Label Encoding & feature selection 0.7111324903793294


# Random forest

In [72]:
from pyspark.ml.classification import RandomForestClassifier

In [73]:
rf_df = RandomForestClassifier(featuresCol='features',labelCol='Default')
fit_new2 = rf_df.fit(datatrain)

In [74]:
train_new2_results = fit_new2.transform(datatrain)
test_new2_results = fit_new2.transform(datatest)

In [75]:
AUC_test_new2 = AUC_eval.evaluate(test_new2_results)
AUC_train_new2 = AUC_eval.evaluate(train_new2_results)

Accu_test_new2 = Accu_evaluator.evaluate(test_new2_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_new2 = Accu_evaluator.evaluate(train_new2_results, {Accu_evaluator.metricName: "accuracy"})

In [76]:
print("Test AUC with Label Encoding & feature selection", AUC_test_new2)
print("Train AUC with Label Encoding & feature selection", AUC_train_new2)
print("\n")
print("Accuracy: Test with Label Encoding & feature selection", Accu_test_new2)
print("Accuracy: Train with Label Encoding & feature selection", Accu_train_new2)

Test AUC with Label Encoding & feature selection 0.716123307658376
Train AUC with Label Encoding & feature selection 0.7187003227218774


Accuracy: Test with Label Encoding & feature selection 0.7163143989431968
Accuracy: Train with Label Encoding & feature selection 0.718829026937878


# Gradient boosting tree

In [77]:
from pyspark.ml.classification import GBTClassifier

In [78]:
gbt_df = GBTClassifier(featuresCol='features',labelCol='Default')
fit_new3 = gbt_df.fit(datatrain)

In [79]:
train_new3_results = fit_new3.transform(datatrain)
test_new3_results = fit_new3.transform(datatest)

AUC_test_new3 = AUC_eval.evaluate(test_new3_results)
AUC_train_new3 = AUC_eval.evaluate(train_new3_results)

Accu_test_new3 = Accu_evaluator.evaluate(test_new3_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_new3 = Accu_evaluator.evaluate(train_new3_results, {Accu_evaluator.metricName: "accuracy"})

In [80]:
print("Test AUC with Label Encoding & feature selection", AUC_test_new3)
print("Train AUC with Label Encoding & feature selection", AUC_train_new3)
print("\n")
print("Accuracy: Test with Label Encoding & feature selection", Accu_test_new3)
print("Accuracy: Train with Label Encoding & feature selection", Accu_train_new3)

Test AUC with Label Encoding & feature selection 0.7230400345133472
Train AUC with Label Encoding & feature selection 0.7326079658889098


Accuracy: Test with Label Encoding & feature selection 0.723139586085425
Accuracy: Train with Label Encoding & feature selection 0.7326827927432655


# SVM

In [81]:
from pyspark.ml.classification import LinearSVC

In [82]:
svm_df = LinearSVC(featuresCol='features',labelCol='Default')
fit_new4 = svm_df.fit(datatrain)

In [83]:
train_new4_results = fit_new4.transform(datatrain)
test_new4_results = fit_new4.transform(datatest)

AUC_test_new4 = AUC_eval.evaluate(test_new4_results)
AUC_train_new4 = AUC_eval.evaluate(train_new4_results)

Accu_test_new4 = Accu_evaluator.evaluate(test_new4_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_new4 = Accu_evaluator.evaluate(train_new4_results, {Accu_evaluator.metricName: "accuracy"})

In [84]:
print("Test AUC with Label Encoding & feature selection", AUC_test_new4)
print("Train AUC with Label Encoding & feature selection", AUC_train_new4)
print("\n")
print("Accuracy: Test with Label Encoding & feature selection", Accu_test_new4)
print("Accuracy: Train with Label Encoding & feature selection", Accu_train_new4)

Test AUC with Label Encoding & feature selection 0.6831246212984191
Train AUC with Label Encoding & feature selection 0.6841975988946856


Accuracy: Test with Label Encoding & feature selection 0.6833993835314839
Accuracy: Train with Label Encoding & feature selection 0.6843870258383727


# Now we will try tuning the model（Decision tree） with the ParamGridBuilder and the CrossValidator.

## As we indicate 4 values for maxDepth and 3 values for maxBin, this grid will have 4 x 3 = 12 parameter settings for CrossValidator to choose from. We will create a 5-fold CrossValidator.

In [85]:
# Create ParamGrid for Cross Validation
paramGrid1 = (ParamGridBuilder()
             .addGrid(dt_df.maxDepth, [1, 2, 6, 10])
             .addGrid(dt_df.maxBins, [20, 40, 80])
             .build())

In [86]:
# Create 5-fold CrossValidator
cv1 = CrossValidator(estimator=dt_df, estimatorParamMaps=paramGrid1, evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Default'),numFolds=5)

In [87]:
# Run cross validations
cvModel1 = cv1.fit(datatrain)

In [88]:
train_new01_results = cvModel1.transform(datatrain)
test_new01_results = cvModel1.transform(datatest)

AUC_test_new01 = AUC_eval.evaluate(test_new01_results)
AUC_train_new01 = AUC_eval.evaluate(train_new01_results)

Accu_test_new01 = Accu_evaluator.evaluate(test_new01_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_new01 = Accu_evaluator.evaluate(train_new01_results, {Accu_evaluator.metricName: "accuracy"})

In [89]:
print("Test AUC with Label Encoding & feature selection", AUC_test_new01)
print("Train AUC with Label Encoding & feature selection", AUC_train_new01)
print("\n")
print("Accuracy: Test with Label Encoding & feature selection", Accu_test_new01)
print("Accuracy: Train with Label Encoding & feature selection", Accu_train_new01)

Test AUC with Label Encoding & feature selection 0.7346890163211293
Train AUC with Label Encoding & feature selection 0.7673559178201877


Accuracy: Test with Label Encoding & feature selection 0.7346983707617789
Accuracy: Train with Label Encoding & feature selection 0.767372182517867


# Now we will try tuning the model（Random forest） with the ParamGridBuilder and the CrossValidator.

## As we indicate 4 values for maxDepth, 3 values for maxBin, and 2 values for numTrees, this grid will have 4 x 3 x 2 = 24 parameter settings for CrossValidator to choose from. We will create a 5-fold CrossValidator.

In [90]:
# Create ParamGrid for Cross Validation
paramGrid2 = (ParamGridBuilder()
             .addGrid(rf_df.maxDepth, [1, 2, 6, 10])
             .addGrid(rf_df.maxBins, [20, 40, 80])
             .addGrid(rf_df.numTrees, [5, 20])
             .build())

In [91]:
# Create 5-fold CrossValidator
cv2 = CrossValidator(estimator=rf_df, estimatorParamMaps=paramGrid2, evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Default'),numFolds=5)

In [92]:
# Run cross validations
cvModel2 = cv2.fit(datatrain)

In [93]:
train_new02_results = cvModel2.transform(datatrain)
test_new02_results = cvModel2.transform(datatest)

AUC_test_new02 = AUC_eval.evaluate(test_new02_results)
AUC_train_new02 = AUC_eval.evaluate(train_new02_results)

Accu_test_new02 = Accu_evaluator.evaluate(test_new02_results, {Accu_evaluator.metricName: "accuracy"})
Accu_train_new02 = Accu_evaluator.evaluate(train_new02_results, {Accu_evaluator.metricName: "accuracy"})

In [94]:
print("Test AUC with Label Encoding & feature selection", AUC_test_new02)
print("Train AUC with Label Encoding & feature selection", AUC_train_new02)
print("\n")
print("Accuracy: Test with Label Encoding & feature selection", Accu_test_new02)
print("Accuracy: Train with Label Encoding & feature selection", Accu_train_new02)

Test AUC with Label Encoding & feature selection 0.7546095192853022
Train AUC with Label Encoding & feature selection 0.7767887918340891


Accuracy: Test with Label Encoding & feature selection 0.7547335975341259
Accuracy: Train with Label Encoding & feature selection 0.7768829026937878


# Conclusion

# The best model is Random forest（tuning with the ParamGridBuilder and the CrossValidator.）
# - Test AUC with Label Encoding & feature selection 0.7546095192853022

# - Accuracy: Test with Label Encoding & feature selection 0.7547335975341259