# Creating Classification Model in Spark ML

In this notebook, we will utilize Spark's Machine Learning library to build a logistic regression model that helps to predict default likelihood using the Lending Club loan data.

In [1]:
import sys
print(sys.version)

3.6.4 |Anaconda custom (64-bit)| (default, Mar 13 2018, 01:15:57) 
[GCC 7.2.0]


In [2]:
print(spark.version)

2.2.0-cdh6.0.0


In [3]:
import pandas as pd
import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
# from pyspark.ml.feature import OneHotEncoderEstimator
# OneHotEncoderEstimator is available starting from Spark 2.3
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.sql.functions import *
from pyspark.sql.types import *

###  Step 1.  Read from CSV into Spark DF

In [4]:
!ls -l '/project/msca/kadochnikov/data/spark'

total 8480
-rwxr-xr-x 1 kadochnikov kadochnikov 3974305 Apr 26  2018 adult_data.csv
-rwxr-xr-x 1 kadochnikov kadochnikov 2003153 Apr 26  2018 adult_test.csv
-rwxr-xr-x 1 kadochnikov kadochnikov    3861 Apr 26  2018 Advertising.csv
-rwxr-xr-x 1 kadochnikov kadochnikov   26585 Apr 26  2018 Credit.csv
-rwxr-xr-x 1 kadochnikov kadochnikov 2484483 Apr 26  2018 loan_sub_new.csv
drwxr-xr-x 3 kadochnikov kadochnikov     512 Apr 30  2018 models
-rwxr-xr-x 1 kadochnikov kadochnikov   84143 Apr 26  2018 WineData.csv


In [5]:
#Pull the dataset from Linux directory
df10 = spark.read.csv('file:///project/msca/kadochnikov/data/spark/loan_sub_new.csv', header=True, inferSchema=True)
df10.cache()
df10.limit(100).toPandas().head(5)

Unnamed: 0,ID,MEMBER_ID,LOAN_STATUS,LOAN_AMNT,TERM,INT_RATE,VERIFICATION_STATUS,PURPOSE,POLICY_CODE,APPLICATION_TYPE,...,DELINQ_2YRS,MTHS_SINCE_LAST_DELINQ,TOTAL_ACC,INQ_LAST_6MTHS,MTHS_SINCE_LAST_RECORD,OPEN_ACC,PUB_REC,COLLECTIONS_12_MTHS_EX_MED,ACC_NOW_DELINQ,DEFAULT
0,1068545,1303147,Fully Paid,7000,36 months,11.71,Not Verified,debt_consolidation,1,INDIVIDUAL,...,0,26,26,0,33,8,1,0,0,non-default
1,1062177,1294027,Fully Paid,15000,36 months,17.27,Source Verified,59.83,1,INDIVIDUAL,...,1,18,16,0,93,6,1,0,0,non-default
2,1066424,1291243,Fully Paid,5500,36 months,7.9,Source Verified,car,1,INDIVIDUAL,...,0,35,23,0,52,10,1,0,0,non-default
3,1064908,1298959,Fully Paid,9000,36 months,14.65,Source Verified,debt_consolidation,1,INDIVIDUAL,...,1,20,51,1,85,21,1,0,0,non-default
4,1064623,1298440,Fully Paid,5600,36 months,10.65,Not Verified,debt_consolidation,1,INDIVIDUAL,...,1,4,25,0,90,11,1,0,0,non-default


### Step 2 - Run basic Exploratory Data Analysis

In [6]:
#Lowcase column names
df20 = df10.toDF(*[c.lower() for c in df10.columns])

In [7]:
df20.printSchema()

root
 |-- id: integer (nullable = true)
 |-- member_id: integer (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- policy_code: integer (nullable = true)
 |-- application_type: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- mths_since_last_delinq: integer (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- inq_last_6mths: integer (nullable = true)
 |-- mths_since_last_record: integer (nullable = true)
 |-- open_acc: integer (nullable = true)
 |-- pub_rec: integer (nullable = true)
 |-- collections_12_mths_ex_med: integer (nullable = true)
 |-- acc_now_delinq: intege

In [8]:
df20.describe(['loan_amnt']).show()

+-------+------------------+
|summary|         loan_amnt|
+-------+------------------+
|  count|             16733|
|   mean|11754.677882029522|
| stddev| 7157.199683129913|
|    min|               500|
|    max|             35000|
+-------+------------------+



In [9]:
pd.set_option('max_rows',10)
pd.set_option('max_columns',100)

df20.limit(5).toPandas().head()

Unnamed: 0,id,member_id,loan_status,loan_amnt,term,int_rate,verification_status,purpose,policy_code,application_type,home_ownership,annual_inc,addr_state,dti,delinq_2yrs,mths_since_last_delinq,total_acc,inq_last_6mths,mths_since_last_record,open_acc,pub_rec,collections_12_mths_ex_med,acc_now_delinq,default
0,1068545,1303147,Fully Paid,7000,36 months,11.71,Not Verified,debt_consolidation,1,INDIVIDUAL,OWN,39120.0,FL,21.01,0,26,26,0,33,8,1,0,0,non-default
1,1062177,1294027,Fully Paid,15000,36 months,17.27,Source Verified,59.83,1,INDIVIDUAL,MORTGAGE,44400.0,FL,3.59,1,18,16,0,93,6,1,0,0,non-default
2,1066424,1291243,Fully Paid,5500,36 months,7.9,Source Verified,car,1,INDIVIDUAL,OWN,59000.0,PA,6.65,0,35,23,0,52,10,1,0,0,non-default
3,1064908,1298959,Fully Paid,9000,36 months,14.65,Source Verified,debt_consolidation,1,INDIVIDUAL,RENT,45000.0,NY,17.01,1,20,51,1,85,21,1,0,0,non-default
4,1064623,1298440,Fully Paid,5600,36 months,10.65,Not Verified,debt_consolidation,1,INDIVIDUAL,RENT,60000.0,CA,16.36,1,4,25,0,90,11,1,0,0,non-default


### Step 2: Preprocess Data

#### 2.1. Convert categorical variables into dummy variables

Since we are going to try algorithms like Logistic Regression, we will have to convert the categorical variables in the dataset into numeric variables. We will use One-Hot Encoding to convert categories into binary vectors with at most one nonzero value (eg: (Blue: [1, 0]), (Green: [0, 1]), (Red: [0, 0])).

In [10]:
categorical_all = [item[0] for item in df20.dtypes if item[1].startswith('string')]
categorical_exclude = ['loan_status', 'application_type', 'default']
categoricalColumns = [c for c in categorical_all if c not in categorical_exclude]
categoricalColumns

['term', 'verification_status', 'purpose', 'home_ownership', 'addr_state']

One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.  
https://spark.apache.org/docs/2.2.0/ml-features.html#onehotencoder

In [11]:
###One-Hot Encoding

stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
    stages += [stringIndexer, encoder] 

Spark MLlib represents ML workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order.  They are specified as an ordered array and work as Directed Acyclic Graph (DAG)  
https://spark.apache.org/docs/2.2.0/ml-pipeline.html

In [12]:
stages

[StringIndexer_428db9e4d9b18d1a6a6a,
 OneHotEncoder_4ee8bc75fbb3e067786e,
 StringIndexer_4fa095dde910e42d3946,
 OneHotEncoder_46eeaebf7ca685257901,
 StringIndexer_4b519a30654166ed4454,
 OneHotEncoder_4084ad7a4a86696c54b2,
 StringIndexer_46929b84f28c4fd824d6,
 OneHotEncoder_4206b4d52ed11745a173,
 StringIndexer_4b2889ad12e8a50aeb7d,
 OneHotEncoder_4d948859f9ab929dcc68]

#### 2.2 Convert target variable into label indices
StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0  
https://spark.apache.org/docs/2.2.0/ml-features.html#stringindexer

In [13]:
# Convert label into label indices using the StringIndexer

label_stringIdx = StringIndexer(inputCol = "default", outputCol = "label")
stages += [label_stringIdx]

In [14]:
stages

[StringIndexer_428db9e4d9b18d1a6a6a,
 OneHotEncoder_4ee8bc75fbb3e067786e,
 StringIndexer_4fa095dde910e42d3946,
 OneHotEncoder_46eeaebf7ca685257901,
 StringIndexer_4b519a30654166ed4454,
 OneHotEncoder_4084ad7a4a86696c54b2,
 StringIndexer_46929b84f28c4fd824d6,
 OneHotEncoder_4206b4d52ed11745a173,
 StringIndexer_4b2889ad12e8a50aeb7d,
 OneHotEncoder_4d948859f9ab929dcc68,
 StringIndexer_4cd7ad963f7464c078ce]

#### 2.3 Transform all features into a vector using VectorAssembler
VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.  

In [15]:
numeric_all = [item[0] for item in df20.dtypes if not item[1].startswith('string')]
numeric_exclude = ['id', 'member_id', 'default', 'policy_code']
numericCols = [c for c in numeric_all if c not in numeric_exclude]
numericCols

['loan_amnt',
 'int_rate',
 'annual_inc',
 'dti',
 'delinq_2yrs',
 'mths_since_last_delinq',
 'total_acc',
 'inq_last_6mths',
 'mths_since_last_record',
 'open_acc',
 'pub_rec',
 'collections_12_mths_ex_med',
 'acc_now_delinq']

In simple terms: we are combining the values from all predictors / variables into a single variable called "features"
https://spark.apache.org/docs/2.2.0/ml-features.html#vectorassembler

In [16]:
assemblerInputs = list(map(lambda c: c + "classVec", categoricalColumns)) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [17]:
stages

[StringIndexer_428db9e4d9b18d1a6a6a,
 OneHotEncoder_4ee8bc75fbb3e067786e,
 StringIndexer_4fa095dde910e42d3946,
 OneHotEncoder_46eeaebf7ca685257901,
 StringIndexer_4b519a30654166ed4454,
 OneHotEncoder_4084ad7a4a86696c54b2,
 StringIndexer_46929b84f28c4fd824d6,
 OneHotEncoder_4206b4d52ed11745a173,
 StringIndexer_4b2889ad12e8a50aeb7d,
 OneHotEncoder_4d948859f9ab929dcc68,
 StringIndexer_4cd7ad963f7464c078ce,
 VectorAssembler_4381af31fcdf46de7fea]

#### 2.4 Run Pipeline

In [18]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)

# Run the feature transformations.
#  - fit() computes feature statistics as needed.
pipelineModel = pipeline.fit(df20)

#  - transform() actually transforms the features.
df30 = pipelineModel.transform(df20)

In [19]:
p_df30 = df30.limit(10).toPandas()
p_df30.head()

Unnamed: 0,id,member_id,loan_status,loan_amnt,term,int_rate,verification_status,purpose,policy_code,application_type,home_ownership,annual_inc,addr_state,dti,delinq_2yrs,mths_since_last_delinq,total_acc,inq_last_6mths,mths_since_last_record,open_acc,pub_rec,collections_12_mths_ex_med,acc_now_delinq,default,termIndex,termclassVec,verification_statusIndex,verification_statusclassVec,purposeIndex,purposeclassVec,home_ownershipIndex,home_ownershipclassVec,addr_stateIndex,addr_stateclassVec,label,features
0,1068545,1303147,Fully Paid,7000,36 months,11.71,Not Verified,debt_consolidation,1,INDIVIDUAL,OWN,39120.0,FL,21.01,0,26,26,0,33,8,1,0,0,non-default,0.0,(1.0),2.0,"(0.0, 0.0)",0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",2.0,"(0.0, 0.0, 1.0)",2.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,1062177,1294027,Fully Paid,15000,36 months,17.27,Source Verified,59.83,1,INDIVIDUAL,MORTGAGE,44400.0,FL,3.59,1,18,16,0,93,6,1,0,0,non-default,0.0,(1.0),0.0,"(1.0, 0.0)",14.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 0.0, 0.0)",2.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,1066424,1291243,Fully Paid,5500,36 months,7.9,Source Verified,car,1,INDIVIDUAL,OWN,59000.0,PA,6.65,0,35,23,0,52,10,1,0,0,non-default,0.0,(1.0),0.0,"(1.0, 0.0)",7.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...",2.0,"(0.0, 0.0, 1.0)",10.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,1064908,1298959,Fully Paid,9000,36 months,14.65,Source Verified,debt_consolidation,1,INDIVIDUAL,RENT,45000.0,NY,17.01,1,20,51,1,85,21,1,0,0,non-default,0.0,(1.0),0.0,"(1.0, 0.0)",0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 1.0, 0.0)",1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,1064623,1298440,Fully Paid,5600,36 months,10.65,Not Verified,debt_consolidation,1,INDIVIDUAL,RENT,60000.0,CA,16.36,1,4,25,0,90,11,1,0,0,non-default,0.0,(1.0),2.0,"(0.0, 0.0)",0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(0.0, 1.0, 0.0)",0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [20]:
p_df30[['home_ownership','home_ownershipIndex', 'home_ownershipclassVec']].head(10)

Unnamed: 0,home_ownership,home_ownershipIndex,home_ownershipclassVec
0,OWN,2.0,"(0.0, 0.0, 1.0)"
1,MORTGAGE,0.0,"(1.0, 0.0, 0.0)"
2,OWN,2.0,"(0.0, 0.0, 1.0)"
3,RENT,1.0,"(0.0, 1.0, 0.0)"
4,RENT,1.0,"(0.0, 1.0, 0.0)"
5,MORTGAGE,0.0,"(1.0, 0.0, 0.0)"
6,RENT,1.0,"(0.0, 1.0, 0.0)"
7,RENT,1.0,"(0.0, 1.0, 0.0)"
8,MORTGAGE,0.0,"(1.0, 0.0, 0.0)"
9,RENT,1.0,"(0.0, 1.0, 0.0)"


In [21]:
pd.set_option('display.max_colwidth', -1)

p_df30[['purpose','purposeIndex', 'purposeclassVec']].head(10)

Unnamed: 0,purpose,purposeIndex,purposeclassVec
0,debt_consolidation,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
1,59.83,14.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
2,car,7.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
3,debt_consolidation,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
4,debt_consolidation,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
5,medical,6.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
6,debt_consolidation,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
7,debt_consolidation,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
8,debt_consolidation,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
9,debt_consolidation,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"


In [22]:
p_df30[['default', 'label']].head(10)

Unnamed: 0,default,label
0,non-default,0.0
1,non-default,0.0
2,non-default,0.0
3,non-default,0.0
4,non-default,0.0
5,non-default,0.0
6,non-default,0.0
7,non-default,0.0
8,non-default,0.0
9,non-default,0.0


#### 2.5 Keep relevant columns for data modeling
SparkML models like logistic regression and decision trees will only use two columns for modeling: "label" as target and "features" as predictors

In [23]:
#cols = numericCols + categoricalColumns
selectedcols = ["label", "features"] #+ cols
df40 = df30.select(selectedcols)
#df40.printSchema()

#### 2.6 Split Data into training and testing sets

In [24]:
(trainingData, testData) = df40.randomSplit([0.7, 0.3], seed = 123)

print('Training Records: {}.  Test Records: {}'.format(trainingData.count(), testData.count()))

Training Records: 11669.  Test Records: 5064


In [25]:
df40.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



#### Balance sample between classes

In [26]:
ratio_adjust = 1.0 ## ratio of pos to neg in the df_subsample

 
#counts = trainingData.select('binary_response').groupBy('label').count().collect()
counts = trainingData.groupBy('label').count().collect()

if counts[0][1] > counts[1][1]:
    down_class = counts[0][0]
else:
    down_class = counts[1][0]
    

higherBound = counts[0][1]
treshold_to_filter = int(ratio_adjust * float(counts[1][1]) / counts[0][1] * higherBound)
 
randGen = lambda x: np.random.randint(0, higherBound) if x == down_class else -1
 
udfRandGen = udf(randGen, IntegerType())

trainingData = trainingData.withColumn("randIndex", udfRandGen("label"))
trainingData = trainingData.filter(trainingData['randIndex'] < treshold_to_filter).drop('randIndex')
 
print("Distribution of Pos and Neg cases of the down-sampled training data are: \n", trainingData.groupBy("label").count().take(3))

Distribution of Pos and Neg cases of the down-sampled training data are: 
 [Row(label=0.0, count=2910), Row(label=1.0, count=2925)]


### Step 3: Data Modeling - Logistic Regression

#### 3.1 Fit model with Training Data

In [27]:
trainingData.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [28]:
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

%time lrModel = lr.fit(trainingData)

CPU times: user 9.24 ms, sys: 1.76 ms, total: 11 ms
Wall time: 3.28 s


#### 3.2 Make predictions on test data

In [29]:
predictions = lrModel.transform(testData)
#selected = predictions.select("label", "prediction", "probability")

#  - Let's take a look at the output
predictions.select(["label", "prediction", "probability"]).show(5, truncate=False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0.0  |0.0       |[0.7116845268508744,0.2883154731491256] |
|0.0  |0.0       |[0.6346407540230323,0.3653592459769677] |
|0.0  |0.0       |[0.5163503227070855,0.4836496772929146] |
|0.0  |1.0       |[0.48967973604290754,0.5103202639570925]|
|0.0  |0.0       |[0.5964436763604236,0.4035563236395764] |
+-----+----------+----------------------------------------+
only showing top 5 rows



In [30]:
predictions.select(["label", "prediction", "probability"]).limit(10).toPandas().head(5)

Unnamed: 0,label,prediction,probability
0,0.0,0.0,"[0.7116845268508744, 0.2883154731491256]"
1,0.0,0.0,"[0.6346407540230323, 0.3653592459769677]"
2,0.0,0.0,"[0.5163503227070855, 0.4836496772929146]"
3,0.0,1.0,"[0.48967973604290754, 0.5103202639570925]"
4,0.0,0.0,"[0.5964436763604236, 0.4035563236395764]"


#### 3.3 Create confusion matrix

In [31]:
predictions.crosstab("label", "prediction").show()

+----------------+----+----+
|label_prediction| 0.0| 1.0|
+----------------+----+----+
|             1.0| 516| 747|
|             0.0|2451|1350|
+----------------+----+----+



In [32]:
selected = predictions.select("label", "prediction", "probability")

selected = selected.withColumn("label", selected["label"].cast(IntegerType()))
selected = selected.withColumn("prediction", selected["prediction"].cast(IntegerType()))

selected.crosstab("label", "prediction").show()

+----------------+----+----+
|label_prediction|   0|   1|
+----------------+----+----+
|               1| 516| 747|
|               0|2451|1350|
+----------------+----+----+



#### 3.4 Evaluate the model and print areaUnderROC

The default metric for the BinaryClassificationEvaluator is areaUnderROC

In [33]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.getMetricName()
evaluator.evaluate(predictions)
print(evaluator.getMetricName() + ": " + str(evaluator.evaluate(predictions)))

areaUnderROC: 0.6597582458922808


##### 3.5. Print Intercept and Coefficients

In [34]:
print ('Model Intercept: ', lrModel.intercept)
print ('Model coefficients:', lrModel.coefficients)

Model Intercept:  -0.2292310452345045
Model coefficients: [-0.2987902892992814,0.2234992324668268,0.03300403473560674,-0.4254522750014258,-0.37586734006110706,-0.5069650895799332,-0.31504654304244156,-0.4530272865740959,0.26710929733468336,-0.19515305175108605,-0.6079856298317105,0.4675437501548968,-0.26921245792725085,-0.21903134811022756,-0.9394242500205409,-2.12564734738114,5.048976823890919,-0.521571823031849,-0.2234221608542267,-0.3107711560115854,-0.24188504684332623,-0.0961272213039713,-0.25503840350210305,-0.3843760648345451,-0.1912869005330703,-0.43223791697369407,-0.3529921623641833,-0.334488943509195,-0.1758113129995602,-0.034489625174876554,-0.13452708630913238,-0.13625854475353794,-0.2712474713283907,0.1204762311743691,-0.09520757030040129,-0.5835218764955971,-0.2584561430329766,-0.16464790319601535,0.001530744664665917,-0.40670616284523714,-0.4212787865726386,-0.2946428628301351,-0.10542046700218047,-0.12017614096741021,-0.33335941855847206,0.06830244465894045,-0.22907339

### Step 4. Save trained model

#### 4.1 Save the model in Linux file system

In [35]:
save(name='Spark Logistic Regression Model', model=lrModel, test_data=testData, algorithm_type='Classification') 

'model=lrModel, test_data=testData, algorithm_type=Classification)' was not found in history, as a file, url, nor in the user namespace.


In [36]:
print ("Training count: {}; Test count: {}".format(trainingData.count(), testData.count()))

Training count: 5882; Test count: 5064


In [37]:
lrModel.write().overwrite().save('hdfs:///user/kadochnikov/spark/models/loan_logistic_regression')
#  - can use shorthand is overwrite is not needed
#lrModel.save("../models/loan_logistic_regression")

In [38]:
!hadoop fs -ls -R '/user/kadochnikov/spark/models/loan_logistic_regression'

drwxr-xr-x   - kadochnikov kadochnikov          0 2018-11-10 08:51 /user/kadochnikov/spark/models/loan_logistic_regression/data
-rw-r--r--   3 kadochnikov kadochnikov          0 2018-11-10 08:51 /user/kadochnikov/spark/models/loan_logistic_regression/data/_SUCCESS
-rw-r--r--   3 kadochnikov kadochnikov       4658 2018-11-10 08:51 /user/kadochnikov/spark/models/loan_logistic_regression/data/part-00000-30285a0c-d6f3-4782-add2-7917b05229a3-c000.snappy.parquet
drwxr-xr-x   - kadochnikov kadochnikov          0 2018-11-10 08:51 /user/kadochnikov/spark/models/loan_logistic_regression/metadata
-rw-r--r--   3 kadochnikov kadochnikov          0 2018-11-10 08:51 /user/kadochnikov/spark/models/loan_logistic_regression/metadata/_SUCCESS
-rw-r--r--   3 kadochnikov kadochnikov        488 2018-11-10 08:51 /user/kadochnikov/spark/models/loan_logistic_regression/metadata/part-00000


#### 4.2 Retrieve saved model

In [39]:
samelrModel = LogisticRegressionModel.load('hdfs:///user/kadochnikov/spark/models/loan_logistic_regression')

### Step 5.  Score the new data and store in DashDB

#### Step 5.2 Prepare features (same steps as 2.1 - 2.4)

In [40]:
# Create and execute a Pipeline.
pipeline = Pipeline(stages=stages)
df50 = pipelineModel.transform(df20)

#### Step 5.3 Score data

In [41]:
scored_data = lrModel.transform(df50).select("id","member_id","prediction","probability")
scored_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- member_id: integer (nullable = true)
 |-- prediction: double (nullable = true)
 |-- probability: vector (nullable = true)



In [42]:
scored_data.limit(10).toPandas().head()

Unnamed: 0,id,member_id,prediction,probability
0,1068545,1303147,0.0,"[0.6152221386102559, 0.38477786138974407]"
1,1062177,1294027,1.0,"[0.4341078933459012, 0.5658921066540988]"
2,1066424,1291243,0.0,"[0.7433780501161734, 0.2566219498838266]"
3,1064908,1298959,1.0,"[0.4954618705096305, 0.5045381294903695]"
4,1064623,1298440,0.0,"[0.6032347964081389, 0.396765203591861]"


In [43]:
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

scored_data_to_db = scored_data.withColumn("prob_", to_array(col("probability"))).select(["id","member_id","prediction",] + [col("prob_")[i] for i in range(2)])

In [44]:
scored_data_to_db.limit(10).toPandas().head(5)

Unnamed: 0,id,member_id,prediction,prob_[0],prob_[1]
0,1068545,1303147,0.0,0.615222,0.384778
1,1062177,1294027,1.0,0.434108,0.565892
2,1066424,1291243,0.0,0.743378,0.256622
3,1064908,1298959,1.0,0.495462,0.504538
4,1064623,1298440,0.0,0.603235,0.396765


#### Step 5.4 Save scored dataframe as Parquet file

In [45]:
!hadoop fs -rm -r -f 'hdfs:///user/kadochnikov/spark/scored/'

scored_data.write.parquet('hdfs:///user/kadochnikov/spark/scored/')

!hadoop fs -ls -R 'hdfs:///user/kadochnikov/spark/scored/'

2018-11-10 08:51:42,117 INFO  [main] fs.TrashPolicyDefault (TrashPolicyDefault.java:moveToTrash(168)) - Moved: 'hdfs://nameservice1/user/kadochnikov/spark/scored' to trash at: hdfs://nameservice1/user/kadochnikov/.Trash/Current/user/kadochnikov/spark/scored
-rw-r--r--   3 kadochnikov kadochnikov          0 2018-11-10 08:51 hdfs:///user/kadochnikov/spark/scored/_SUCCESS
-rw-r--r--   3 kadochnikov kadochnikov     406179 2018-11-10 08:51 hdfs:///user/kadochnikov/spark/scored/part-00000-217db7d6-e1d7-4202-a8e1-a64c67b09a39-c000.snappy.parquet


## With pipeline defined, we can easily build other classifiers

### Decision tree model

In [46]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
%time dtModel = dt.fit(trainingData)

CPU times: user 9.55 ms, sys: 1.25 ms, total: 10.8 ms
Wall time: 4.67 s


In [47]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

numNodes =  15
depth =  3


In [48]:
predictions = dtModel.transform(testData)

In [49]:
predictions.select(["label", "prediction", "probability"]).show(5, truncate=False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0.0  |0.0       |[0.6571285140562249,0.3428714859437751] |
|0.0  |0.0       |[0.6571285140562249,0.3428714859437751] |
|0.0  |0.0       |[0.6571285140562249,0.3428714859437751] |
|0.0  |1.0       |[0.41598360655737704,0.5840163934426229]|
|0.0  |0.0       |[0.6571285140562249,0.3428714859437751] |
+-----+----------+----------------------------------------+
only showing top 5 rows



In [50]:
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

0.43488607719392086

In [51]:
selected = predictions.select("label", "prediction", "probability")

selected = selected.withColumn("label", selected["label"].cast(IntegerType()))
selected = selected.withColumn("prediction", selected["prediction"].cast(IntegerType()))

selected.crosstab("label", "prediction").show()

+----------------+----+----+
|label_prediction|   0|   1|
+----------------+----+----+
|               1| 584| 679|
|               0|2466|1335|
+----------------+----+----+



Now we will try tuning the model with the ParamGridBuilder and the CrossValidator.

As we indicate 3 values for maxDepth and 3 values for maxBin, this grid will have 3 x 3 = 9 parameter settings for CrossValidator to choose from. We will create a 5-fold CrossValidator.


In [52]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

In [53]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
%time cvModel = cv.fit(trainingData)

CPU times: user 1.19 s, sys: 321 ms, total: 1.52 s
Wall time: 2min 48s


In [54]:
predictions = cvModel.transform(testData)

In [55]:
predictions.select(["label", "prediction", "probability"]).show(5, truncate=False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0.0  |0.0       |[0.839541547277937,0.16045845272206305] |
|0.0  |0.0       |[0.7489177489177489,0.2510822510822511] |
|0.0  |0.0       |[0.7692307692307693,0.23076923076923078]|
|0.0  |0.0       |[0.5071428571428571,0.4928571428571429] |
|0.0  |0.0       |[1.0,0.0]                               |
+-----+----------+----------------------------------------+
only showing top 5 rows



In [56]:
evaluator.evaluate(predictions)

0.5291521191968692

### Random Forest Model

In [57]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
%time rfModel = rf.fit(trainingData)

CPU times: user 12.4 ms, sys: 3.55 ms, total: 15.9 ms
Wall time: 3.28 s


In [58]:
predictions = rfModel.transform(testData)

In [59]:
selected = predictions.select("label", "prediction", "probability")

selected = selected.withColumn("label", selected["label"].cast(IntegerType()))
selected = selected.withColumn("prediction", selected["prediction"].cast(IntegerType()))

selected.crosstab("label", "prediction").show()

+----------------+----+----+
|label_prediction|   0|   1|
+----------------+----+----+
|               1| 534| 729|
|               0|2551|1250|
+----------------+----+----+



In [60]:
evaluator.evaluate(predictions)

0.6695865758541989

Now we will try tuning the model with the ParamGridBuilder and the CrossValidator.

As we indicate 3 values for maxDepth, 2 values for maxBin, and 2 values for numTrees, this grid will have 3 x 2 x 2 = 12 parameter settings for CrossValidator to choose from. We will create a 5-fold CrossValidator.


In [61]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [62]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations.  This can take about 6 minutes since it is training over 20 trees!
%time cvModel = cv.fit(trainingData)

CPU times: user 1.47 s, sys: 392 ms, total: 1.86 s
Wall time: 3min 11s


In [63]:
predictions = cvModel.transform(testData)

In [64]:
evaluator.evaluate(predictions)

0.6694923180402268

In [65]:
selected = predictions.select("label", "prediction", "probability")

selected = selected.withColumn("label", selected["label"].cast(IntegerType()))
selected = selected.withColumn("prediction", selected["prediction"].cast(IntegerType()))

selected.crosstab("label", "prediction").show()

+----------------+----+----+
|label_prediction|   0|   1|
+----------------+----+----+
|               1| 517| 746|
|               0|2527|1274|
+----------------+----+----+

