### Load data

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("BDPPproject") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [2]:
cc = spark.read.csv("gs://bdppstorage/ccdefault.csv", inferSchema="true",header="true")
cc.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    

### Schema and dimension

In [3]:
cc.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = tru

In [4]:
cc.count()

30000

### Statistical summary

In [5]:
cc.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
ID,30000,15000.5,8660.398374208891,1,30000
LIMIT_BAL,30000,167484.32266666667,129747.66156720246,10000,1000000
SEX,30000,1.6037333333333332,0.4891291960902602,1,2
EDUCATION,30000,1.8531333333333333,0.7903486597207269,0,6
MARRIAGE,30000,1.5518666666666667,0.5219696006132467,0,3
AGE,30000,35.4855,9.217904068090155,21,79
PAY_0,30000,-0.0167,1.1238015279973335,-2,8
PAY_2,30000,-0.13376666666666667,1.1971859730345495,-2,8
PAY_3,30000,-0.1662,1.1968675684465686,-2,8


In [6]:
cc.groupBy('DEFAULT').count().show()

+-------+-----+
|DEFAULT|count|
+-------+-----+
|      1| 6636|
|      0|23364|
+-------+-----+



### Crosstab computation

In [7]:
cc.crosstab('SEX', 'DEFAULT').sort("SEX_DEFAULT").show()

+-----------+-----+----+
|SEX_DEFAULT|    0|   1|
+-----------+-----+----+
|          1| 9015|2873|
|          2|14349|3763|
+-----------+-----+----+



In [8]:
cc.crosstab('EDUCATION', 'DEFAULT').sort("EDUCATION_DEFAULT").show()

+-----------------+-----+----+
|EDUCATION_DEFAULT|    0|   1|
+-----------------+-----+----+
|                0|   14|   0|
|                1| 8549|2036|
|                2|10700|3330|
|                3| 3680|1237|
|                4|  116|   7|
|                5|  262|  18|
|                6|   43|   8|
+-----------------+-----+----+



In [9]:
cc.crosstab('MARRIAGE', 'DEFAULT').sort("MARRIAGE_DEFAULT").show()

+----------------+-----+----+
|MARRIAGE_DEFAULT|    0|   1|
+----------------+-----+----+
|               0|   49|   5|
|               1|10453|3206|
|               2|12623|3341|
|               3|  239|  84|
+----------------+-----+----+



In [10]:
cc.groupby('MARRIAGE').agg({'LIMIT_BAL': 'mean'}).show()

+--------+------------------+
|MARRIAGE|    avg(LIMIT_BAL)|
+--------+------------------+
|       1|182200.89318398127|
|       3| 98080.49535603715|
|       2|156413.66073665748|
|       0|132962.96296296295|
+--------+------------------+



###  Correlation among attributes

In [11]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['SEX', 'DEFAULT'],
    outputCol="Se_De")

creditcard = assembler.transform(cc)
#creditcard.select("Se_De").show(5)
r1 = Correlation.corr(creditcard, "Se_De").head()
print("correlation matrix:\n" + str(r1[0]))

correlation matrix:
DenseMatrix([[ 1.        , -0.03996058],
             [-0.03996058,  1.        ]])


In [12]:
assembler = VectorAssembler(inputCols=["MARRIAGE","DEFAULT"],outputCol="Ma_De")
creditcard2 = assembler.transform(cc)

r2 = Correlation.corr(creditcard2, "Ma_De").head()
print("correlation matrix:\n" + str(r2[0]))

correlation matrix:
DenseMatrix([[ 1.        , -0.02433922],
             [-0.02433922,  1.        ]])


In [13]:
from pyspark.mllib.stat import Statistics
import pandas as pd

# result can be used w/ seaborn's heatmap
def compute_correlation_matrix(cc, method='pearson'):
    cc_rdd = cc.rdd.map(lambda row: row[0:])
    corr_mat = Statistics.corr(cc_rdd, method=method)
    corr_mat_df = pd.DataFrame(corr_mat,
                    columns=cc.columns, 
                    index=cc.columns)
    return corr_mat_df
compute_correlation_matrix(cc)

Unnamed: 0,ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,...,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,DEFAULT
ID,1.0,0.026179,0.018497,0.039177,-0.029079,0.018678,-0.030575,-0.011215,-0.018494,-0.002735,...,0.040351,0.016705,0.01673,0.009742,0.008406,0.039151,0.007793,0.000652,0.003,-0.013952
LIMIT_BAL,0.026179,1.0,0.024755,-0.219161,-0.108139,0.144713,-0.271214,-0.296382,-0.286123,-0.26746,...,0.293988,0.295562,0.290389,0.195236,0.178408,0.210167,0.203242,0.217202,0.219595,-0.15352
SEX,0.018497,0.024755,1.0,0.014232,-0.031389,-0.090874,-0.057643,-0.070771,-0.066096,-0.060173,...,-0.02188,-0.017005,-0.016733,-0.000242,-0.001391,-0.008597,-0.002229,-0.001667,-0.002766,-0.039961
EDUCATION,0.039177,-0.219161,0.014232,1.0,-0.143464,0.175061,0.105364,0.121566,0.114025,0.108793,...,-0.000451,-0.007567,-0.009099,-0.037456,-0.030038,-0.039943,-0.038218,-0.040358,-0.0372,0.028006
MARRIAGE,-0.029079,-0.108139,-0.031389,-0.143464,1.0,-0.41417,0.019917,0.024199,0.032688,0.033122,...,-0.023344,-0.025393,-0.021207,-0.005979,-0.008093,-0.003541,-0.012659,-0.001205,-0.006641,-0.024339
AGE,0.018678,0.144713,-0.090874,0.175061,-0.41417,1.0,-0.039447,-0.050148,-0.053048,-0.049722,...,0.051353,0.049345,0.047613,0.026147,0.021785,0.029247,0.021379,0.02285,0.019478,0.01389
PAY_0,-0.030575,-0.271214,-0.057643,0.105364,0.019917,-0.039447,1.0,0.672164,0.574245,0.538841,...,0.179125,0.180635,0.17698,-0.079269,-0.070101,-0.070561,-0.064005,-0.05819,-0.058673,0.324794
PAY_2,-0.011215,-0.296382,-0.070771,0.121566,0.024199,-0.050148,0.672164,1.0,0.766552,0.662067,...,0.222237,0.221348,0.219403,-0.080701,-0.05899,-0.055901,-0.046858,-0.037093,-0.0365,0.263551
PAY_3,-0.018494,-0.286123,-0.066096,0.114025,0.032688,-0.053048,0.574245,0.766552,1.0,0.777359,...,0.227202,0.225145,0.222327,0.001295,-0.066793,-0.053311,-0.046067,-0.035863,-0.035861,0.235253
PAY_4,-0.002735,-0.26746,-0.060173,0.108793,0.033122,-0.049722,0.538841,0.662067,0.777359,1.0,...,0.245917,0.242902,0.239154,-0.009362,-0.001944,-0.069235,-0.043461,-0.03359,-0.026565,0.216614


### Data Cleaning

In [14]:
from pyspark.sql.functions import *
from pyspark.sql.functions import when, count, col
cc.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in cc.columns]).show()

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  0|        0|  0|        0|       0|  0|    0|    0|    0|    0|    0|    0|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0|       0|       0|      0|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+----

In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

assembler = VectorAssembler(inputCols=['ID','LIMIT_BAL','SEX','EDUCATION','MARRIAGE','AGE','PAY_0','PAY_2','PAY_3','PAY_4',
'PAY_5','PAY_6','BILL_AMT1','BILL_AMT2','BILL_AMT3','BILL_AMT4','BILL_AMT5','BILL_AMT6','PAY_AMT1','PAY_AMT1',
'PAY_AMT2','PAY_AMT3','PAY_AMT4','PAY_AMT5','PAY_AMT6'], outputCol="features")

featuredcredit = assembler.transform(cc)

scaler = StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(featuredcredit)

# Normalize each feature to have unit standard deviation.
scaledcredit = scalerModel.transform(featuredcredit)
scaledcredit.select(["features", "scaledFeatures"]).show(5)

+--------------------+--------------------+
|            features|      scaledFeatures|
+--------------------+--------------------+
|[1.0,20000.0,2.0,...|[-1.7319642067123...|
|[2.0,120000.0,2.0...|[-1.7318487385830...|
|[3.0,90000.0,2.0,...|[-1.7317332704536...|
|[4.0,50000.0,2.0,...|[-1.7316178023242...|
|[5.0,50000.0,1.0,...|[-1.7315023341948...|
+--------------------+--------------------+
only showing top 5 rows



In [16]:
renamecredit = scaledcredit.withColumnRenamed("DEFAULT", "label")
dataset = renamecredit.withColumn('features', renamecredit.scaledFeatures).select("features","label")
dataset.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-1.7319642067123...|    1|
|[-1.7318487385830...|    1|
|[-1.7317332704536...|    0|
|[-1.7316178023242...|    0|
|[-1.7315023341948...|    0|
+--------------------+-----+
only showing top 5 rows



### Model Training
splitting data for train and test

In [17]:
trainSet, testSet = dataset.randomSplit([0.8, 0.2], seed=12345)
print("Training Data Count: " + str(trainSet.count()))
print("Test Data Count: " + str(testSet.count()))

Training Data Count: 23861
Test Data Count: 6139


In [18]:
trainSet.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|    1|        5223|
|    0|       18638|
+-----+------------+



In [19]:
testSet.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|    1|        1413|
|    0|        4726|
+-----+------------+



### Logistic Regression

In [20]:
import matplotlib.pyplot as plt
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(trainSet)
trainingSummary = lrModel.summary
trainaccuracy = trainingSummary.accuracy
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
trainingSummary = lrModel.summary
print("Training accuracy: ",trainaccuracy)

## ROC
train_roc = trainingSummary.roc.toPandas()
#plt.plot(train_roc['FPR'],train_roc['TPR'])
#plt.ylabel('False Positive Rate')
#plt.xlabel('True Positive Rate')
#plt.title('ROC Curve')
#plt.show()
#trainingSummary.roc.show()
print('Training set areaUnderROC for LR model: ' + str(trainingSummary.areaUnderROC))

Coefficients: [-0.014882308932141649,-0.10423533268075222,-0.04399276772820826,-0.0761765173576866,-0.09160106292155615,0.06562702327214429,0.6311601999462618,0.07814532056869669,0.1194173752643531,0.029330430094685136,0.016095109075138208,0.028697120946969853,-0.21873225248682548,-0.021898642225123135,0.012380036227823552,0.03386184159589726,0.06235662771304676,0.047096786967934796,-0.09940875681586178,-0.09940875681586178,-0.25284318318087234,-0.05494400827182839,-0.10818973104339918,-0.03249949117192905,-0.047589094682328005]
Intercept: -1.4706375434773504
Training accuracy:  0.8109886425547965
Training set areaUnderROC for LR model: 0.7226037947790382


In [21]:
predictions = lrModel.transform(testSet)
predictions.select('label', 'prediction', 'probability').show(5)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.86966239763636...|
|    0|       0.0|[0.82254479922521...|
|    0|       0.0|[0.74479529855262...|
|    0|       0.0|[0.71997203818794...|
|    1|       0.0|[0.67956090930478...|
+-----+----------+--------------------+
only showing top 5 rows



In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = lrModel.evaluate(testSet)
bc_evaluator = BinaryClassificationEvaluator()
test_roc = bc_evaluator.evaluate(predictions)
print("Test areaUnderROC for LR model: ",test_roc)

testaccuracy = evaluator.accuracy
print("Test accuracy for LR model: ",testaccuracy)

Test areaUnderROC for LR model:  0.7297743670930633
Test accuracy for LR model:  0.8035510669490145


### Decision tree

In [23]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(trainSet)

predictions = dtModel.transform(testSet)
predictions.select('label', 'prediction', 'probability').show(5)

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC for DT Model: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
test = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
testaccuracy = test.evaluate(predictions)
print("Test accuracy for DT Model: ",testaccuracy )

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.85783340965755...|
|    0|       0.0|[0.85783340965755...|
|    0|       0.0|[0.85783340965755...|
|    0|       0.0|[0.63099921321793...|
|    1|       0.0|[0.85783340965755...|
+-----+----------+--------------------+
only showing top 5 rows

Test Area Under ROC for DT Model: 0.3142486385563711
Test accuracy for DT Model:  0.8156051474181463


### Random Forest Classifier

In [24]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(trainSet)

predictions = rfModel.transform(testSet)
predictions.select('label', 'prediction', 'probability').show(5)

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC for RF Model: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
test = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
testaccuracy = test.evaluate(predictions)
print("Test accuracy for RF Model: ",testaccuracy )

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.84517838311200...|
|    0|       0.0|[0.83100455750613...|
|    0|       0.0|[0.81113254009174...|
|    0|       0.0|[0.68017136483636...|
|    1|       0.0|[0.61369940231470...|
+-----+----------+--------------------+
only showing top 5 rows

Test Area Under ROC for RF Model: 0.7731641588190671
Test accuracy for RF Model:  0.8032252809903894


### Gradient-boosted tree classifier

In [25]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
gbtModel = gbt.fit(trainSet)

predictions = gbtModel.transform(testSet)
predictions.select('label', 'prediction', 'probability').show(5)

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC for GBT model: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
test = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
testaccuracy = test.evaluate(predictions)
print("Test accuracy for GBT model: ",testaccuracy )

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.84505419499510...|
|    0|       0.0|[0.80797958430034...|
|    0|       0.0|[0.75627550631435...|
|    0|       0.0|[0.66030479155415...|
|    1|       1.0|[0.47891818750627...|
+-----+----------+--------------------+
only showing top 5 rows

Test Area Under ROC for GBT model: 0.7854768264818648
Test accuracy for GBT model:  0.8152793614595211


### Data Cleaning

In [26]:
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F
## Modifiying the Flaws in the MARRIAGE column
print("Before modifying" )
cc.select(countDistinct('MARRIAGE')).show()
print("After modifying")
res = cc.withColumn("MARRIAGE", F.when(F.col("MARRIAGE")==0, 3).otherwise(F.col("MARRIAGE")))
res.select(countDistinct('MARRIAGE')).show()

Before modifying
+------------------------+
|count(DISTINCT MARRIAGE)|
+------------------------+
|                       4|
+------------------------+

After modifying
+------------------------+
|count(DISTINCT MARRIAGE)|
+------------------------+
|                       3|
+------------------------+



In [27]:
## Modifiying the Flaws in the EDUCATION column
print("Before modifying" )
cc.select(countDistinct('EDUCATION')).show()
print("After modifying")
res1 = res.withColumn('EDUCATION', F.when(F.col("EDUCATION")==5, 4).otherwise(F.col("EDUCATION"))).withColumn('EDUCATION', F.when(F.col("EDUCATION")==6, 4).otherwise(F.col("EDUCATION"))).withColumn('EDUCATION', F.when(F.col("EDUCATION")==0, 4).otherwise(F.col("EDUCATION")))
res1.select(countDistinct('EDUCATION')).show()

Before modifying
+-------------------------+
|count(DISTINCT EDUCATION)|
+-------------------------+
|                        7|
+-------------------------+

After modifying
+-------------------------+
|count(DISTINCT EDUCATION)|
+-------------------------+
|                        4|
+-------------------------+



In [28]:
##Modifiying the Flaws in the PAY_0 to PAY_6 column
print("Before modifying" )
res1.groupBy('PAY_6').count().show()
res2 = res1.withColumn('PAY_0', F.when(F.col("PAY_0")==-2, 0).otherwise(F.col("PAY_0"))).withColumn('PAY_0', F.when(F.col("PAY_0")==-1, 0).otherwise(F.col("PAY_0")))
res3 = res2.withColumn('PAY_2', F.when(F.col("PAY_2")==-2, 0).otherwise(F.col("PAY_2"))).withColumn('PAY_2', F.when(F.col("PAY_2")==-1, 0).otherwise(F.col("PAY_2")))
res4 = res3.withColumn('PAY_3', F.when(F.col("PAY_3")==-2, 0).otherwise(F.col("PAY_3"))).withColumn('PAY_3', F.when(F.col("PAY_3")==-1, 0).otherwise(F.col("PAY_3")))
res5 = res4.withColumn('PAY_4', F.when(F.col("PAY_4")==-2, 0).otherwise(F.col("PAY_4"))).withColumn('PAY_4', F.when(F.col("PAY_4")==-1, 0).otherwise(F.col("PAY_4")))
res6 = res5.withColumn('PAY_5', F.when(F.col("PAY_5")==-2, 0).otherwise(F.col("PAY_5"))).withColumn('PAY_5', F.when(F.col("PAY_5")==-1, 0).otherwise(F.col("PAY_5")))
res7 = res6.withColumn('PAY_6', F.when(F.col("PAY_6")==-2, 0).otherwise(F.col("PAY_6"))).withColumn('PAY_6', F.when(F.col("PAY_6")==-1, 0).otherwise(F.col("PAY_6")))
print("After modifying")
res7.groupBy('PAY_6').count().show()
#res7.show(2)

Before modifying
+-----+-----+
|PAY_6|count|
+-----+-----+
|   -1| 5740|
|    6|   19|
|    3|  184|
|    5|   13|
|    4|   49|
|    8|    2|
|    7|   46|
|   -2| 4895|
|    2| 2766|
|    0|16286|
+-----+-----+

After modifying
+-----+-----+
|PAY_6|count|
+-----+-----+
|    6|   19|
|    3|  184|
|    5|   13|
|    4|   49|
|    8|    2|
|    7|   46|
|    2| 2766|
|    0|26921|
+-----+-----+



### Upsampling

In [75]:
res7.groupby('DEFAULT').agg({'DEFAULT': 'count'}).show()

+-------+--------------+
|DEFAULT|count(DEFAULT)|
+-------+--------------+
|      1|          6636|
|      0|         23364|
+-------+--------------+



In [95]:
from pyspark.sql.functions import col

res8 = res7.where(col('DEFAULT')==1).sample(True, 3.552, seed = 1000)

In [96]:
res8.groupby('DEFAULT').agg({'DEFAULT': 'count'}).show()

+-------+--------------+
|DEFAULT|count(DEFAULT)|
+-------+--------------+
|      1|         23617|
+-------+--------------+



In [103]:
res9= res7.where(col('DEFAULT')==0)
resUp = res9.union(res8)

In [126]:
resUp.groupBy('DEFAULT').count().show()

+-------+-----+
|DEFAULT|count|
+-------+-----+
|      1|23617|
|      0|23364|
+-------+-----+



In [107]:
resUp.count()

46981

In [129]:
resUp.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  3|    90000|  2|        2|       2| 34|    0|    0|    0|    0|    0|    0|    29239|    14027|    13559|    14331|    14948|    15549|    1518|    1500|    1000|    1000|    1000|    5000|      0|
|  4|    50000|  2|        2|       1| 37|    0|    0|    0|    0|    0|    0|    46990|    48233|    49291|    28314|    28959|    29547|    2000|    2019|    1200|    1100|    1069|    1000|    

### Scaling

In [109]:
assembler1 = VectorAssembler(inputCols=['LIMIT_BAL','SEX','EDUCATION','MARRIAGE','AGE','PAY_0','PAY_2','PAY_3','PAY_4',
'PAY_5','PAY_6','BILL_AMT1','BILL_AMT2','BILL_AMT3','BILL_AMT4','BILL_AMT5','BILL_AMT6','PAY_AMT1','PAY_AMT1',
'PAY_AMT2','PAY_AMT3','PAY_AMT4','PAY_AMT5','PAY_AMT6'], outputCol="features")

new_featuredcredit = assembler1.transform(resUp)

new_scaler = StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics by fitting the StandardScaler
new_scalerModel = new_scaler.fit(new_featuredcredit)

# Normalize each feature to have unit standard deviation.
new_scaledcredit = new_scalerModel.transform(new_featuredcredit)
new_scaledcredit.select(["features", "scaledFeatures"]).show(5)

+--------------------+--------------------+
|            features|      scaledFeatures|
+--------------------+--------------------+
|[90000.0,2.0,2.0,...|[-0.5084569171350...|
|[50000.0,2.0,2.0,...|[-0.8253636123811...|
|[50000.0,1.0,2.0,...|[-0.8253636123811...|
|[50000.0,1.0,1.0,...|[-0.8253636123811...|
|[500000.0,1.0,1.0...|[2.73983670913821...|
+--------------------+--------------------+
only showing top 5 rows



In [110]:
rename1credit = new_scaledcredit.withColumnRenamed("DEFAULT", "label")
new_dataset = rename1credit.withColumn('features', rename1credit.scaledFeatures).select("features","label")
new_dataset.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-0.5084569171350...|    0|
|[-0.8253636123811...|    0|
|[-0.8253636123811...|    0|
|[-0.8253636123811...|    0|
|[2.73983670913821...|    0|
+--------------------+-----+
only showing top 5 rows



In [111]:
trainData, testData = new_dataset.randomSplit([0.8, 0.2], seed=12345)
print("Training Data Count: " + str(trainData.count()))
print("Test Data Count: " + str(testData.count()))

Training Data Count: 37486
Test Data Count: 9495


In [127]:
trainData.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|    1|       18861|
|    0|       18625|
+-----+------------+



In [128]:
testData.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|    1|        4756|
|    0|        4739|
+-----+------------+



### Logistic Regression

In [120]:
from time import *
start_time= time()
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(trainData)
trainaccuracy = trainingSummary.accuracy
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
trainingSummary = lrModel.summary
print("Training accuracy: ",trainaccuracy)


train_roc = trainingSummary.roc.toPandas()
#plt.plot(train_roc['FPR'],train_roc['TPR'])
#plt.ylabel('False Positive Rate')
#plt.xlabel('True Positive Rate')
#plt.title('ROC Curve')
#plt.show()
#trainingSummary.roc.show()

print('Training set areaUnderROC for Lr Model: ' + str(trainingSummary.areaUnderROC))

predictions = lrModel.transform(testData)
predictions.select('label', 'prediction', 'probability').show(5)


evaluator = lrModel.evaluate(testData)
bc_evaluator = BinaryClassificationEvaluator()
test_roc = bc_evaluator.evaluate(predictions)
print("Test areaUnderROC for Lr Model: ",test_roc)

testaccuracy = evaluator.accuracy
print("Test accuracy for Lr Model: ",testaccuracy)
end_time= time()
elapsed_time= end_time - start_time
print("total time to train_model =", elapsed_time)

Coefficients: [-0.18402566768206807,-0.045263511891231345,-0.0499133441399052,-0.061679278181662356,0.05645264902547603,0.832075589083554,0.08915184829920111,0.12051287618677695,0.09229375296997767,0.09394689555621283,0.1533515456879479,-0.01969513232957178,0.04237145302050489,0.019365917839077665,0.014667586243851134,-0.00821441302037695,-0.0025385876417177073,-0.06569602729814221,-0.06569602729814221,-0.13451367101327508,-0.03638774672882685,-0.05141082358634954,-0.04372935701121444,-0.014382689750708163]
Intercept: 0.14108793168512
Training accuracy:  0.701141759590247
Training set areaUnderROC for Lr Model: 0.7607091583819316
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.58333139055280...|
|    0|       0.0|[0.58561830672272...|
|    0|       1.0|[0.20043934776592...|
|    0|       1.0|[0.39504787453140...|
|    0|       0.0|[0.58201600120321...|
+-----+----------+--------------------+
on

### Decision tree

In [121]:
from time import *
start_time= time()
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(trainData)

predictions = dtModel.transform(testData)
predictions.select('label', 'prediction', 'probability').show(5)

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC for Dt Model: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
test = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
testaccuracy = test.evaluate(predictions)
print("Test accuracy for Dt Model: ",testaccuracy )
end_time= time()
elapsed_time= end_time - start_time
print("total time to train_model =", elapsed_time)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.57038961038961...|
|    0|       0.0|[0.57038961038961...|
|    0|       1.0|[0.39007960808328...|
|    0|       1.0|[0.39007960808328...|
|    0|       0.0|[0.57038961038961...|
+-----+----------+--------------------+
only showing top 5 rows

Test Area Under ROC for Dt Model: 0.6326805061023083
Test accuracy for Dt Model:  0.6934175882043181
total time to train_model = 6.606166124343872


### Random Forest Classifier

In [122]:
from time import *
start_time= time()
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(trainData)

predictions = rfModel.transform(testData)
predictions.select('label', 'prediction', 'probability').show(5)

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC for RF Model: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
test = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
testaccuracy = test.evaluate(predictions)
print("Test accuracy for RF Model: ",testaccuracy )
end_time= time()
elapsed_time= end_time - start_time
print("total time to train_model =", elapsed_time)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.55245233091023...|
|    0|       0.0|[0.58397082491319...|
|    0|       1.0|[0.28273130902425...|
|    0|       1.0|[0.29257624088055...|
|    0|       0.0|[0.58397082491319...|
+-----+----------+--------------------+
only showing top 5 rows

Test Area Under ROC for RF Model: 0.7696167398238515
Test accuracy for RF Model:  0.7057398630858347
total time to train_model = 8.496789693832397


### Gradient-boosted tree classifier

In [123]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from time import *
start_time= time()
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
gbtModel = gbt.fit(trainData)

predictions = gbtModel.transform(testData)
predictions.select('label', 'prediction', 'probability').show(5)

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC for GBT Model: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
test = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
testaccuracy = test.evaluate(predictions)
print("Test accuracy for GBT Model: ",testaccuracy )
end_time= time()
elapsed_time= end_time - start_time
print("total time to train_model =", elapsed_time)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.51900917223155...|
|    0|       0.0|[0.52788728329705...|
|    0|       1.0|[0.19602352228908...|
|    0|       1.0|[0.21026664781771...|
|    0|       0.0|[0.53620297633958...|
+-----+----------+--------------------+
only showing top 5 rows

Test Area Under ROC for GBT Model: 0.7870400729696544
Test accuracy for GBT Model:  0.7080568720379147
total time to train_model = 13.787724494934082


### Model selection

In [124]:
## MOdel selection for random forest

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from time import *
start_time= time()

rf1 = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees= 8)
pipeline = Pipeline(stages=[rf1])
paramGrid = ParamGridBuilder()\
                .addGrid(rf1.maxDepth, [1, 2, 4, 5])\
                .addGrid(rf1.minInstancesPerNode, [1, 2, 4, 5])\
                .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5) 

cvModel = crossval.fit(trainData)
#evaluator.evaluate(cvModel.transform(trainData))

predictions1 = cvModel.transform(testData)
predictions1.select('label', 'prediction', 'probability').show(5)

bc_evaluator = BinaryClassificationEvaluator()
test_roc = bc_evaluator.evaluate(predictions1, {bc_evaluator.metricName: "areaUnderROC"})
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
testaccuracy = evaluator.evaluate(predictions1)
print("Test_roc for RF Model:",test_roc)
print("Test accuracy for RF Model: ",testaccuracy )
end_time= time()
elapsed_time= end_time - start_time
print("total time to train_model =", elapsed_time)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.53971904483744...|
|    0|       0.0|[0.58221902640167...|
|    0|       1.0|[0.20564538962397...|
|    0|       1.0|[0.27223174106691...|
|    0|       0.0|[0.58221902640167...|
+-----+----------+--------------------+
only showing top 5 rows

Test_roc for RF Model: 0.7648114459566495
Test accuracy for RF Model:  0.7025803054239074
total time to train_model = 140.38608646392822


In [125]:
# Model selection for Logistic regression
from time import *
start_time= time()
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(trainData)

predictions = cvModel.transform(testData)
predictions.select('label', 'prediction', 'probability').show(5)


bc_evaluator = BinaryClassificationEvaluator()
test_roc = bc_evaluator.evaluate(predictions, {bc_evaluator.metricName: "areaUnderROC"})
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
testaccuracy = evaluator.evaluate(predictions)
print("Test_roc For LR Model:",test_roc)
print("Test accuracy For LR Model: ",testaccuracy )
end_time= time()
elapsed_time= end_time - start_time
print("total time to train_model =", elapsed_time)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.58164445982028...|
|    0|       0.0|[0.58417912501184...|
|    0|       1.0|[0.20150973215139...|
|    0|       1.0|[0.38431413820671...|
|    0|       0.0|[0.58093667101223...|
+-----+----------+--------------------+
only showing top 5 rows

Test_roc For LR Model: 0.7620161851508245
Test accuracy For LR Model:  0.6983675618746709
total time to train_model = 26.41662573814392
