# Supervised Machine Learning with PySpark

In [31]:
%matplotlib inline

# Filter warnings
import warnings
warnings.filterwarnings("ignore")

# Data manipulation
import pandas as pd
import numpy as np

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Set font scale and style
plt.rcParams.update({'font.size': 15})

# Normal distribution from scipy
from scipy.stats import norm

# Pyspark modules
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import sql, SparkContext, SparkConf
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Pickle
import joblib

In [3]:
# Create spark session
spark = SparkSession.builder.appName("project").getOrCreate()

# Load data 

In [52]:
df = spark.read.csv('../data/hmeq.csv', inferSchema = True, header = True)  # load data

df = df.toDF(*[c.lower() for c in df.columns]) # column names in lower case

df.createOrReplaceTempView("raw_table") # create table for sql query

print("Data size:", (df.count(), len(df.columns))) # print data size

df.show() # display table

Data size: (5960, 13)
+---+----+--------+--------+-------+------+----+-----+------+------------+----+----+------------+
|bad|loan| mortdue|   value| reason|   job| yoj|derog|delinq|       clage|ninq|clno|     debtinc|
+---+----+--------+--------+-------+------+----+-----+------+------------+----+----+------------+
|  1|1100| 25860.0| 39025.0|HomeImp| Other|10.5|    0|     0|94.366666667|   1|   9|        null|
|  1|1300| 70053.0| 68400.0|HomeImp| Other| 7.0|    0|     2|121.83333333|   0|  14|        null|
|  1|1500| 13500.0| 16700.0|HomeImp| Other| 4.0|    0|     0|149.46666667|   1|  10|        null|
|  1|1500|    null|    null|   null|  null|null| null|  null|        null|null|null|        null|
|  0|1700| 97800.0|112000.0|HomeImp|Office| 3.0|    0|     0|93.333333333|   0|  14|        null|
|  1|1700| 30548.0| 40320.0|HomeImp| Other| 9.0|    0|     0|101.46600191|   1|   8|37.113613558|
|  1|1800| 48649.0| 57037.0|HomeImp| Other| 5.0|    3|     2|        77.1|   1|  17|        null

In [53]:
def df_to_numeric(df, cat_cols):
    """
    Convert numerical columns in a spark dataframe to float
    
    Paramters    
    ---------
    df: spark dataframe        
    cat_cols: list of categorical column names to be 
              removed before conversion
    Returns        
    --------        
    spark dataframe with numerical columns as float  
    """
    # Remove categorical column names        
    col_names = [x for x in df.columns if x not in cat_cols]
    for col_name in col_names:
        df = df.withColumn(col_name, df[col_name].cast(FloatType()))
    return df

In [55]:
# List of categorical columns
cat_cols = [field for (field, dataType) in df.dtypes if dataType == "string"]

# Convert numerical cols to float
df = df_to_numeric(df, cat_cols)

# Data types and missing values

The data shows that there are two categorical variables and some variables have missing values

In [5]:
df.printSchema()

root
 |-- bad: integer (nullable = true)
 |-- loan: integer (nullable = true)
 |-- mortdue: double (nullable = true)
 |-- value: double (nullable = true)
 |-- reason: string (nullable = true)
 |-- job: string (nullable = true)
 |-- yoj: double (nullable = true)
 |-- derog: integer (nullable = true)
 |-- delinq: integer (nullable = true)
 |-- clage: double (nullable = true)
 |-- ninq: integer (nullable = true)
 |-- clno: integer (nullable = true)
 |-- debtinc: double (nullable = true)



In [6]:
# count each column record
df.agg(*[F.count(c).alias(c) for c in df.columns]).show()

+----+----+-------+-----+------+----+----+-----+------+-----+----+----+-------+
| bad|loan|mortdue|value|reason| job| yoj|derog|delinq|clage|ninq|clno|debtinc|
+----+----+-------+-----+------+----+----+-----+------+-----+----+----+-------+
|5960|5960|   5442| 5848|  5708|5681|5445| 5252|  5380| 5652|5450|5738|   4693|
+----+----+-------+-----+------+----+----+-----+------+-----+----+----+-------+



In [7]:
# count each column missing values
df.agg(*(F.sum(F.col(c).isNull().cast("int"))\
            .alias(c) for c in df.columns)
         ).show()

+---+----+-------+-----+------+---+---+-----+------+-----+----+----+-------+
|bad|loan|mortdue|value|reason|job|yoj|derog|delinq|clage|ninq|clno|debtinc|
+---+----+-------+-----+------+---+---+-----+------+-----+----+----+-------+
|  0|   0|    518|  112|   252|279|515|  708|   580|  308| 510| 222|   1267|
+---+----+-------+-----+------+---+---+-----+------+-----+----+----+-------+



# Impute missing categorical values
All missing values in the categorical columns will be given another level called 'missing'

In [49]:
# Dictionary of labels for missing vlaues in cat_cols
dict_missing = {cat_col: 'missing' for cat_col in cat_cols}

# Fill missing values in cat_cols
df = df.fillna(dict_missing)

# Class distribution

In [8]:
query = """ 
select 
    bad, 
    count(*) as count,
    count(*)*100/(select count(*) from raw_table) as percentage
from raw_table
group by 1
"""
spark.sql(query).show()

+---+-----+------------------+
|bad|count|        percentage|
+---+-----+------------------+
|  1| 1189|19.949664429530202|
|  0| 4771|  80.0503355704698|
+---+-----+------------------+



# Create a test set
We split the data into $80 \%$ training set and $20 \%$ test set

In [56]:
trainDF, testDF = df.randomSplit([.8, .2], seed=42)

In [57]:
print('Training set size:', (trainDF.count(), len(trainDF.columns)))
print('Test set size:', (testDF.count(), len(testDF.columns)))

Training set size: (4817, 13)
Test set size: (1143, 13)


# Transformation pipelines

In [58]:
cat_cols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
num_cols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") \
                                                              & (field != "bad"))]

index_output_cols = [x + "index" for x in cat_cols]
ohe_output_cols = [x + "ohe" for x in cat_cols]
num_imputed_output_cols = [x + "imputed" for x in num_cols]

s_indexer = StringIndexer(inputCols = cat_cols, 
                          outputCols = index_output_cols, 
                          handleInvalid="skip")

cat_encoder = OneHotEncoder(inputCols = index_output_cols,
                           outputCols = ohe_output_cols)

num_imputer = Imputer(inputCols = num_cols, 
                      outputCols = num_imputed_output_cols)

assembler_inputs = ohe_output_cols + num_imputed_output_cols

assembler = VectorAssembler(inputCols = assembler_inputs, outputCol = "unscaled_features")
scaler = StandardScaler(inputCol = 'unscaled_features', outputCol = "features")

# 1. Logistic Regression

In [73]:
# Train a logistic regression model.
lr = LogisticRegression(labelCol = "bad", featuresCol = "features",
                       maxIter=10, regParam=0.3, elasticNetParam=0.8, 
                        family="multinomial")

# Create Pipeline
pipeline = Pipeline(stages = [s_indexer, cat_encoder, num_imputer, assembler, scaler, lr])

# Fit the training set 
lr_model = pipeline.fit(trainDF)

# prediction on the training set
pred_lr = lr_model.transform(trainDF)

In [76]:
# Select example rows to display
pred_lr.select("features", "bad", "prediction").show(10)

+--------------------+---+----------+
|            features|bad|prediction|
+--------------------+---+----------+
|(6,[3],[2.6715770...|0.0|       0.0|
|(6,[3],[2.6715770...|0.0|       0.0|
|(6,[5],[5.5380902...|0.0|       0.0|
|(6,[2],[2.3845409...|0.0|       0.0|
|(6,[3],[2.6715770...|0.0|       0.0|
|(6,[4],[2.9430833...|0.0|       0.0|
|(6,[3],[2.6715770...|0.0|       0.0|
|(6,[4],[2.9430833...|0.0|       0.0|
|(6,[3],[2.6715770...|0.0|       0.0|
|(6,[4],[2.9430833...|0.0|       0.0|
+--------------------+---+----------+
only showing top 10 rows



In [68]:
# lr_trained_model = lr_model.stages[-1] 

In [91]:
evaluator = MulticlassClassificationEvaluator( \
                  labelCol='bad', \
                  predictionCol="prediction", \
                  metricName="accuracy")

lr_accuracy = evaluator.evaluate(pred_lr)
lr_prc = evaluator.evaluate(pred_lr, {evaluator.metricName: "weightedPrecision"})
lr_recall = evaluator.evaluate(pred_lr, {evaluator.metricName: "weightedRecall"})
lr_f1 = evaluator.evaluate(pred_lr, {evaluator.metricName: "f1"})

print("Metric for Logistic Regression")
print('-'*40)
print("Accuracy: %.3f" % lr_accuracy)
print("Weighted Precision: %.3f" % lr_prc)
print("Weighted Recall: %.3f" % lr_recall)
print("F1-Score: %.3f" %lr_f1)

Metric for Logistic Regression
----------------------------------------
Accuracy: 0.796
Weighted Precision: 0.633
Weighted Recall: 0.796
F1-Score: 0.705
