# Spark practice - Credit Data

## Introduction

The purpose of this notebook is to practice the basics of data transformation and machine learning in PySpark. I implement a simple pipeline for transforming the data, then train and evaluate a logistic regression, a decision tree, and a random forest model.

The data used is the [default of credit card clients data set](https://archive.ics.uci.edu/ml/datasets/default+of+credit+card+clients) by Yeh, I. C., & Lien, C. H. (2009) on payment defaults of Taiwanese customers, downloaded from the UCI Machine Learning Repository. The data set has 30,000 observations with 23 features, plus an ID column and the label column with the default status.

The label and the features are as follows:

* DEFAULT: Default on payment (Yes = 1; No = 0)
* LIMIT_BAL: Credit amount in New Taiwan dollars
* SEX: Gender (1 = male; 2 = female)
* EDUCATION: Education status (1 = graduate school; 2 = university; 3 = high school; 4 = others)
* MARRIAGE: Marital status (1 = married; 2 = single; 3 = others)
* AGE: Age in years
* PAY_0, PAY_2 to PAY_6: Monthly payment records of the past 6 months; the data has no PAY_1 column (-1 = pay duly; 1 = payment delay for one month; 2 = payment delay for two months; ...; 8 = payment delay for eight months; 9 = payment delay for nine months and above). The data also contains values such as 0 that this coding does not cover.
* BILL_AMT1 to BILL_AMT6: Amount of bill statement in New Taiwan dollars for the past 6 months
* PAY_AMT1 to PAY_AMT6: Amount of previous monthly payments in New Taiwan dollars for the past 6 months

## Initialize Spark and import required libraries

In [38]:
# "Find" PySpark and start a Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
# Import required packages
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [26]:
# Possibly unnecessary imports, kept for testing
# (pyspark.sql.functions and the evaluator are already imported above)
from pyspark.sql.types import *

import pyspark

## Load and explore data

The data is provided as an Excel file. Before loading, the file was converted to a CSV file and the first line, which holds generic column names, was deleted. The CSV file is then read into a DataFrame.

In [3]:
# Read csv file
data = spark.read.csv("data/default of credit card clients.csv", sep = ";", header = True, inferSchema = True)

# Give the label column a shorter name
data = data.withColumnRenamed("default payment next month", "DEFAULT")

print("Number of observations:", data.count())
print("Number of columns:", len(data.schema.names))
print("Column names:", data.schema.names)

Number of observations: 30000
Number of columns: 25
Column names: ['ID', 'LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_0', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6', 'DEFAULT']


In [4]:
# show() prints the table itself and returns None, so it is not wrapped in print()
data.select("ID", "LIMIT_BAL", "SEX", "EDUCATION", "MARRIAGE", "AGE", "PAY_0", "PAY_0",
            "BILL_AMT1", "BILL_AMT2", "DEFAULT").show(5)

+---+---------+---+---------+--------+---+-----+-----+---------+---------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_0|BILL_AMT1|BILL_AMT2|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+---------+---------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|     3913|     3102|      1|
|  2|   120000|  2|        2|       2| 26|   -1|   -1|     2682|     1725|      1|
|  3|    90000|  2|        2|       2| 34|    0|    0|    29239|    14027|      0|
|  4|    50000|  2|        2|       1| 37|    0|    0|    46990|    48233|      0|
|  5|    50000|  1|        2|       1| 57|   -1|   -1|     8617|     5670|      0|
+---+---------+---+---------+--------+---+-----+-----+---------+---------+-------+
only showing top 5 rows


The data is imbalanced: only about 22% of all customers defaulted, i.e. have a label of 1:

In [5]:
print("Default == 1:", data.filter(data.DEFAULT == 1).count())
print("Default == 0:", data.filter(data.DEFAULT == 0).count())

Default == 1: 6636
Default == 0: 23364
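
To put the imbalance into numbers, the class shares can be computed directly from the two counts above. The sketch below is plain Python (no Spark needed) and the variable names are my own. It also computes the accuracy that a trivial model predicting 0 for every customer would reach, which is a useful baseline for the accuracy figures later on.

```python
# Counts taken from the output above
n_default = 6636
n_no_default = 23364
n_total = n_default + n_no_default

# Share of customers with label 1
default_share = n_default / n_total

# A model that always predicts 0 is right for every non-defaulting customer
majority_baseline_accuracy = n_no_default / n_total

print("Share of defaults: {:.4f}".format(default_share))
print("Majority-class baseline accuracy: {:.4f}".format(majority_baseline_accuracy))
```

Any model that does not clearly beat this baseline accuracy has learned little about the defaults, which is why precision and recall are reported alongside accuracy below.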


In [6]:
# Same query with Spark SQL: 
data.createOrReplaceTempView("view1")
# show() prints the table itself and returns None, so it is not wrapped in print()
spark.sql("SELECT DEFAULT, count(DEFAULT) FROM view1 GROUP BY DEFAULT").show()

+-------+--------------+
|DEFAULT|count(DEFAULT)|
+-------+--------------+
|      1|          6636|
|      0|         23364|
+-------+--------------+


## Data preparation pipeline

The data is first split into a training and a test sample. The pipeline then indexes the categorical features (SEX, EDUCATION, MARRIAGE) so that they are treated as categorical, and rescales all numerical features to the range [0, 1] with min-max scaling.

The features PAY_0 to PAY_6 are not strictly numerical and, for a proper analysis, should probably be treated as categorical as well. As this notebook is just for practice, they are left as numerical features here.
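
As a reminder of what min-max scaling does to a single column, here is a minimal plain-Python sketch. The function `min_max_scale` is my own helper for illustration and not part of PySpark; the input values are the LIMIT_BAL entries of the five rows shown earlier. In the pipeline itself, the minimum and maximum are learned from the training sample only.

```python
def min_max_scale(values, out_min=0.0, out_max=1.0):
    """Rescale a list of numbers linearly to the range [out_min, out_max]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column: Spark's MinMaxScaler documents the midpoint of the
        # output range, 0.5 * (max + min), for this case
        return [0.5 * (out_max + out_min) for _ in values]
    return [(v - lo) / (hi - lo) * (out_max - out_min) + out_min for v in values]


# LIMIT_BAL values of the first five rows shown above
limits = [20000, 120000, 90000, 50000, 50000]
scaled = min_max_scale(limits)
print(scaled)
```

The smallest value maps to 0 and the largest to 1, so features on very different scales (credit limits versus ages) end up in the same range.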

In [7]:
# Drop ID column
data = data.select(data.schema.names[1:])

# Split data into training and test sample
splits = data.randomSplit([0.75, 0.25])
data_train = splits[0]
data_test = splits[1]

# Get and convert categorical features (SEX, EDUCATION, MARRIAGE)
categorical_features = data.schema.names[1:4]
catVect = VectorAssembler(inputCols = categorical_features, outputCol = "catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")

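# Get and normalize numerical features
# NOTE: names[4:] also picks up the last column, DEFAULT, so the label ends up
# among the features (24 instead of 23); names[4:-1] would exclude it
numerical_features = data.schema.names[0:1] + data.schema.names[4:]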
numVect = VectorAssembler(inputCols = numerical_features, outputCol = "numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol = "normFeatures")

# Define pipeline 
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol = "features")
pipeline = Pipeline(stages = [catVect, catIdx, numVect, minMax, featVect])
pipeline_object = pipeline.fit(data_train)

# Run training and test data through the pipeline
data_train = pipeline_object.transform(data_train).select("features", col("DEFAULT").alias("label"))
data_test = pipeline_object.transform(data_test).select("features", col("DEFAULT").alias("label")) 
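
The column slices used in the cell above can be checked without Spark. The sketch below rebuilds the list of column names as printed earlier (without the ID column) and applies the same slices. The variable `numerical_without_label` is my own addition and only an assumption about what was intended, not what the cell above does.

```python
# Column names as printed earlier, without the dropped ID column
names = ['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE',
         'PAY_0', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6',
         'BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6',
         'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6',
         'DEFAULT']

# Same slices as in the pipeline cell
categorical_features = names[1:4]
numerical_features = names[0:1] + names[4:]

print("Categorical:", categorical_features)
print("Label among numerical features:", "DEFAULT" in numerical_features)
print("Total features:", len(categorical_features) + len(numerical_features))

# Possible alternative that stops before the last column (assumed intent)
numerical_without_label = names[0:1] + names[4:-1]
print("Total features without label:",
      len(categorical_features) + len(numerical_without_label))
```

With the original slices the label column is part of the feature vector, which gives 24 entries instead of the 23 features described in the introduction.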

In [8]:
# show() prints the table itself and returns None, so it is not wrapped in print()
data_train.show(5)
print("Number of observations in training / test data:", data_train.count(), "/", data_test.count())

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,1.0,1.0,0.0,...|    1|
|(24,[1,2,4,5,6,7,...|    1|
|[0.0,1.0,1.0,0.0,...|    0|
|[0.0,1.0,1.0,0.0,...|    0|
|[0.0,1.0,1.0,0.0,...|    0|
+--------------------+-----+
only showing top 5 rows

Number of observations in training / test data: 22668 / 7332


## Train and evaluate classification models

### Define evaluation metrics

In [25]:
accuracy = MulticlassClassificationEvaluator(
    labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
precision = MulticlassClassificationEvaluator(
    labelCol = "label", predictionCol = "prediction", metricName = "weightedPrecision")
recall = MulticlassClassificationEvaluator(
    labelCol = "label", predictionCol = "prediction", metricName = "weightedRecall")

### Logistic regression

https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression

In [78]:
logit = LogisticRegression(labelCol = "label", featuresCol = "features", maxIter = 20, regParam = 0.2)
model = logit.fit(data_train)
predictions_df = model.transform(data_test)

print("Accuracy: {:.4}".format(accuracy.evaluate(predictions_df)))
print("Weighted precision: {:.4}".format(precision.evaluate(predictions_df)))
print("Weighted recall: {:.4}".format(recall.evaluate(predictions_df)))

Accuracy: 0.9857
Weighted precision: 0.9859
Weighted recall: 0.9857


### Decision tree

https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier

In [79]:
tree = DecisionTreeClassifier(labelCol = "label", featuresCol = "features", maxDepth = 4, maxBins = 32, 
                              minInstancesPerNode = 1, minInfoGain = 0.0, impurity = "gini", seed = 123)
model = tree.fit(data_train)
predictions_df = model.transform(data_test)

print("Accuracy: {:.4}".format(accuracy.evaluate(predictions_df)))
print("Weighted precision: {:.4}".format(precision.evaluate(predictions_df)))
print("Weighted recall: {:.4}".format(recall.evaluate(predictions_df)))
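# Always perfect scores: this points to label leakage, since the numerical feature
# slice in the pipeline cell (names[4:]) also includes the DEFAULT column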

Accuracy: 1.0
Weighted precision: 1.0
Weighted recall: 1.0


### Random forest

https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.classification.RandomForestClassifier

In [80]:
rf = RandomForestClassifier(labelCol = "label", featuresCol = "features", maxDepth = 4, maxBins = 32, 
                            minInstancesPerNode = 1, minInfoGain=0.0, impurity = "gini", numTrees = 10, seed = 123) 
model = rf.fit(data_train)
predictions_df = model.transform(data_test)

print("Accuracy: {:.4}".format(accuracy.evaluate(predictions_df)))
print("Weighted precision: {:.4}".format(precision.evaluate(predictions_df)))
print("Weighted recall: {:.4}".format(recall.evaluate(predictions_df)))

Accuracy: 0.9208
Weighted precision: 0.9281
Weighted recall: 0.9208


### Calculating (unweighted) metrics manually

In [81]:
# Confusion matrix counts for the most recent predictions (random forest);
# count() already returns an int
tp = predictions_df.filter("prediction == 1.0 AND label == 1").count()
fp = predictions_df.filter("prediction == 1.0 AND label == 0").count()
tn = predictions_df.filter("prediction == 0.0 AND label == 0").count()
fn = predictions_df.filter("prediction == 0.0 AND label == 1").count()

print("true positives:", tp)
print("false positives:", fp)
print("true negatives:", tn)
print("false negatives:", fn)

print("Accuracy: {:.4}".format((tp+tn)/(tp+fp+tn+fn)))
print("Precision: {:.4}".format((tp)/(tp+fp)))
print("Recall: {:.4}".format((tp)/(tp+fn)))

true positives: 1043
false positives: 0
true negatives: 5708
false negatives: 581
Accuracy: 0.9208
Precision: 1.0
Recall: 0.6422
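
The weighted metrics can also be reproduced from the same four counts. The sketch below is plain Python and assumes that "weighted" means averaging the per-class precision and recall with the share of each true class as weight. The variable names are my own.

```python
# Confusion matrix counts from the output above
tp, fp, tn, fn = 1043, 0, 5708, 581
total = tp + fp + tn + fn

# Per-class precision and recall (class 1 = default, class 0 = no default)
precision_1 = tp / (tp + fp)
recall_1 = tp / (tp + fn)
precision_0 = tn / (tn + fn)
recall_0 = tn / (tn + fp)

# Weights: share of each true class in the test data
weight_1 = (tp + fn) / total
weight_0 = (tn + fp) / total

weighted_precision = weight_1 * precision_1 + weight_0 * precision_0
weighted_recall = weight_1 * recall_1 + weight_0 * recall_0

print("Weighted precision: {:.4}".format(weighted_precision))
print("Weighted recall: {:.4}".format(weighted_recall))
```

Under this assumption the weighted recall is the same number as the accuracy, so it hides the much lower recall of 0.6422 for the default class. For imbalanced data like this, the unweighted values for the positive class are the more informative ones.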
