<center>
    <h1> BL-INFO-I590 - Data Science On Ramp </h1>
    <h1> Machine Learning With Spark, Fall 2018 </h1>
    <h1> Final Project </h1>
<center>

<center>
  <h1> DHIVYA SWAMINATHAN </h1>
  <h1> <i> UID : 2000434729 </i> </h1>
</center>

In [1]:
# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import numpy as np
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, MinMaxScaler, VectorAssembler, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, avg, count, monotonically_increasing_id, sqrt, mean as _mean, stddev as _stddev, explode, udf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

## Data Loading

In [2]:
# Loading data into a spark dataframe
spark = SparkSession.builder.getOrCreate()

path = "/Users/dhivyaslh/Documents/IUB/FALL 2018/Data Science on Ramp/ML with Spark/Project/"
file = "default of credit card clients1.csv"

input_txt = file

columns = ['ID','Credit Limit', 'Gender', 'Education','Marital status', 'Age', 
           'Repayment status - September','Repayment status - August','Repayment status - July', 'Repayment status - June','Repayment status - May','Repayment status - April',
           'Amount of bill statement - September','Amount of bill statement - August','Amount of bill statement - July', 'Amount of bill statement - June','Amount of bill statement - May','Amount of bill statement - April',
           'Amount of previous payment - September','Amount of previous payment - August','Amount of previous payment - July', 'Amount of previous payment - June','Amount of previous payment - May','Amount of previous payment - April',
           'Potential Defaulter']

data = spark.read.load(input_txt, format = "csv")
data = data.toDF(*columns)
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Credit Limit: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital status: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Repayment status - September: string (nullable = true)
 |-- Repayment status - August: string (nullable = true)
 |-- Repayment status - July: string (nullable = true)
 |-- Repayment status - June: string (nullable = true)
 |-- Repayment status - May: string (nullable = true)
 |-- Repayment status - April: string (nullable = true)
 |-- Amount of bill statement - September: string (nullable = true)
 |-- Amount of bill statement - August: string (nullable = true)
 |-- Amount of bill statement - July: string (nullable = true)
 |-- Amount of bill statement - June: string (nullable = true)
 |-- Amount of bill statement - May: string (nullable = true)
 |-- Amount of bill statement - April: string (nullable = true)
 |-- Amount of previous payment - S

## Data Transformation

In [3]:
stringCols = ['Credit Limit', 'Gender', 'Education','Marital status', 'Age', 
              'Repayment status - September','Repayment status - August','Repayment status - July', 
              'Repayment status - June','Repayment status - May','Repayment status - April']

doubleCols = list(set(data.columns) - set(stringCols) - {'ID','Potential Defaulter'})

In [4]:
# cleaning data
for column in data.schema.names:
    if column not in stringCols:
        data = data.withColumn(column, data[column].cast('double'))
data = data.withColumnRenamed("_c0", "Year")
data.printSchema()

root
 |-- ID: double (nullable = true)
 |-- Credit Limit: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital status: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Repayment status - September: string (nullable = true)
 |-- Repayment status - August: string (nullable = true)
 |-- Repayment status - July: string (nullable = true)
 |-- Repayment status - June: string (nullable = true)
 |-- Repayment status - May: string (nullable = true)
 |-- Repayment status - April: string (nullable = true)
 |-- Amount of bill statement - September: double (nullable = true)
 |-- Amount of bill statement - August: double (nullable = true)
 |-- Amount of bill statement - July: double (nullable = true)
 |-- Amount of bill statement - June: double (nullable = true)
 |-- Amount of bill statement - May: double (nullable = true)
 |-- Amount of bill statement - April: double (nullable = true)
 |-- Amount of previous payment - S

In [5]:
# Dropping null values
data = data.na.drop(subset=data.columns)
data.show()

+----+------------+------+---------+--------------+---+----------------------------+-------------------------+-----------------------+-----------------------+----------------------+------------------------+------------------------------------+---------------------------------+-------------------------------+-------------------------------+------------------------------+--------------------------------+--------------------------------------+-----------------------------------+---------------------------------+---------------------------------+--------------------------------+----------------------------------+-------------------+
|  ID|Credit Limit|Gender|Education|Marital status|Age|Repayment status - September|Repayment status - August|Repayment status - July|Repayment status - June|Repayment status - May|Repayment status - April|Amount of bill statement - September|Amount of bill statement - August|Amount of bill statement - July|Amount of bill statement - June|Amount of bill stateme

In [6]:
# Tranforming categorical variables using one hot encoding techniques and using a vector assembler to  transform data
stages = []

for categoricalCol in stringCols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'Potential Defaulter', outputCol = 'label')
stages += [label_stringIdx]

assemblerInputs = [c + "classVec" for c in stringCols] + doubleCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [7]:
cols = data.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(data)
cleanedData = pipelineModel.transform(data)
selectedCols = ['label', 'features'] + cols
cleanedData = cleanedData.select(selectedCols)
cleanedData = cleanedData.drop('ID')
cleanedData.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- Credit Limit: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital status: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Repayment status - September: string (nullable = true)
 |-- Repayment status - August: string (nullable = true)
 |-- Repayment status - July: string (nullable = true)
 |-- Repayment status - June: string (nullable = true)
 |-- Repayment status - May: string (nullable = true)
 |-- Repayment status - April: string (nullable = true)
 |-- Amount of bill statement - September: double (nullable = true)
 |-- Amount of bill statement - August: double (nullable = true)
 |-- Amount of bill statement - July: double (nullable = true)
 |-- Amount of bill statement - June: double (nullable = true)
 |-- Amount of bill statement - May: double (nullable = true)
 |-- Amount of bill statement - April: double (nullable

In [8]:
# Splitting dataset into train, validation and test
train, Val, test = cleanedData.randomSplit([0.7, 0.1, 0.2], seed=12345)
print("Training Dataset Count: " + str(train.count()))
print("Validation Dataset Count: " + str(Val.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 20827
Validation Dataset Count: 3034
Test Dataset Count: 6139


## Model Learning and Evaluation

### LOGISTIC REGRESSION

In [9]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

In [10]:
predictions = lrModel.transform(Val)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Logistic Regression Validation Accuracy', evaluator.evaluate(predictions))

Logistic Regression Validation Accuracy 0.7525270025626745


In [11]:
predictions = lrModel.transform(test)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Logistic Regression Test Accuracy', evaluator.evaluate(predictions))

Logistic Regression Test Accuracy 0.764108009525811


### DECISION TREE CLASSIFIER

In [12]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(Val)
evaluator = BinaryClassificationEvaluator()
print("Decision Tree Classifier Validation Accuracy: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Decision Tree Classifier Validation Accuracy: 0.3462319326651485


In [13]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
evaluator = BinaryClassificationEvaluator()
print("Decision Tree Classifier Test Accuracy: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Decision Tree Classifier Test Accuracy: 0.3290220962831218


### RANDOM FOREST CLASSIFIER

In [14]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(Val)
evaluator = BinaryClassificationEvaluator()
print("Random Forest Classifier Validation Accuracy: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Random Forest Classifier Validation Accuracy: 0.7414295005660864


In [15]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
evaluator = BinaryClassificationEvaluator()
print("Random Forest Classifier Test Accuracy: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Random Forest Classifier Test Accuracy: 0.7566521719470646


### GRADIENT BOOSTING CLASSIFIER

In [16]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(Val)
evaluator = BinaryClassificationEvaluator()
print("Gradient Boosting Classifier Validation Accuracy: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Gradient Boosting Classifier Validation Accuracy: 0.7494655694549698


In [17]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
evaluator = BinaryClassificationEvaluator()
print("Gradient Boosting Classifier Test Accuracy: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Gradient Boosting Classifier Test Accuracy: 0.7715802761281753


## Out of the classifiers tried, Gradient Boosting Classifier gives the best accuracy of 77.2%