# Launch spark session behind the jupyter notebook

In [1]:
!ls -l $SPARK_HOME

total 120
-rw-r--r--@   1 abulbasar  staff  17881 Jul  1 04:39 LICENSE
-rw-r--r--@   1 abulbasar  staff  24645 Jul  1 04:39 NOTICE
drwxr-xr-x@   3 abulbasar  staff    102 Jul  1 04:39 [34mR[m[m
-rw-r--r--@   1 abulbasar  staff   3809 Jul  1 04:39 README.md
-rw-r--r--@   1 abulbasar  staff    128 Jul  1 04:39 RELEASE
drwxr-xr-x@  25 abulbasar  staff    850 Jul  1 04:39 [34mbin[m[m
drwxr-xr-x@  10 abulbasar  staff    340 Oct 23 10:19 [34mconf[m[m
drwxr-xr-x@   5 abulbasar  staff    170 Jul  1 04:39 [34mdata[m[m
-rw-r--r--    1 abulbasar  staff    769 Oct 30 12:25 derby.log
drwxr-xr-x@   4 abulbasar  staff    136 Jul  1 04:39 [34mexamples[m[m
-rw-r--r--    1 abulbasar  staff      0 Oct 21 01:53 hello.txt
drwxr-xr-x@ 211 abulbasar  staff   7174 Jul  1 04:39 [34mjars[m[m
drwxr-xr-x@  38 abulbasar  staff   1292 Jul  1 04:39 [34mlicenses[m[m
drwxr-xr-x   10 abulbasar  staff    340 Oct 30 12:25 [34mmetastore_db[m[m
drwxr-xr-x@  16 abulbasar  staff    544 

In [2]:
# Note: set SPARK_HOME to Spark binaries before launching the Jupyter session.
import os, sys
SPARK_HOME = os.environ['SPARK_HOME']
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.4-src.zip"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print("Spark version: ", spark.version)

Spark version:  2.2.0


In [3]:
spark.sparkContext.uiWebUrl

'http://192.168.1.5:4040'

# Import libararies 

In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.pipeline import Pipeline

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import evaluation

import pandas as pd
import pyspark
import numpy as np

In [5]:
pd.__version__, np.__version__

('0.20.3', '1.12.1')

Check version of the libraries. For this notebook, I am using Spark 2.2.0

# Load Dataset

You can download the dataset from [here](https://github.com/abulbasar/data/blob/master/credit-default.csv)

In [6]:
credit = spark.read.options(header = True, inferSchema = True).csv("/data/credit-default.csv").cache()
print("Total number of records: ", credit.count())
credit.limit(10).toPandas().head()

Total number of records:  1000


Unnamed: 0,checking_balance,months_loan_duration,credit_history,purpose,amount,savings_balance,employment_length,installment_rate,personal_status,other_debtors,...,property,age,installment_plan,housing,existing_credits,default,dependents,telephone,foreign_worker,job
0,< 0 DM,6,critical,radio/tv,1169,unknown,> 7 yrs,4,single male,none,...,real estate,67,none,own,2,1,1,yes,yes,skilled employee
1,1 - 200 DM,48,repaid,radio/tv,5951,< 100 DM,1 - 4 yrs,2,female,none,...,real estate,22,none,own,1,2,1,none,yes,skilled employee
2,unknown,12,critical,education,2096,< 100 DM,4 - 7 yrs,2,single male,none,...,real estate,49,none,own,1,1,2,none,yes,unskilled resident
3,< 0 DM,42,repaid,furniture,7882,< 100 DM,4 - 7 yrs,2,single male,guarantor,...,building society savings,45,none,for free,1,1,2,none,yes,skilled employee
4,< 0 DM,24,delayed,car (new),4870,< 100 DM,1 - 4 yrs,3,single male,none,...,unknown/none,53,none,for free,2,2,2,none,yes,skilled employee


Above I am using .toPandas function to convert the dataframe to pandas dataframe. Pandas dataframe is better for  display in jupyter notebook. 

View the schema 

In [7]:
credit.printSchema()

root
 |-- checking_balance: string (nullable = true)
 |-- months_loan_duration: integer (nullable = true)
 |-- credit_history: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- savings_balance: string (nullable = true)
 |-- employment_length: string (nullable = true)
 |-- installment_rate: integer (nullable = true)
 |-- personal_status: string (nullable = true)
 |-- other_debtors: string (nullable = true)
 |-- residence_history: integer (nullable = true)
 |-- property: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- installment_plan: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- existing_credits: integer (nullable = true)
 |-- default: integer (nullable = true)
 |-- dependents: integer (nullable = true)
 |-- telephone: string (nullable = true)
 |-- foreign_worker: string (nullable = true)
 |-- job: string (nullable = true)



As I can see, there are number of columns of string type - checking_balance, credit_history etc.

Let me define a function that take a catgorical column and pass it through StringIndexer and OneHotEncoder it gives back a dataframe with same column name as the original categorical column. It reurns a new dataframe that contains categorical column replaced by OneHotEncoded vector. 

In [8]:
def encode_categorical(df, colname, one_hot = True):
    string_indexer = StringIndexer(inputCol=colname, outputCol= colname + "_idx").fit(df)
    df = string_indexer.transform(df)
    df = df.drop(colname)
    df = df.withColumnRenamed(colname + "_idx", colname)
    
    one_hot_encoder = None
    if one_hot:
        one_hot_encoder = OneHotEncoder(inputCol=colname, outputCol=colname + "_ohe" , dropLast = True)
        df = one_hot_encoder.transform(df)
        df = df.drop(colname)
        df = df.withColumnRenamed(colname + "_ohe", colname)
    
    return df, string_indexer.labels

In [9]:
df, labels = encode_categorical(credit, "purpose", True)
df.select("purpose").limit(5).toPandas()

Unnamed: 0,purpose
0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
1,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
2,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)"
3,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
4,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"


Find all columns of String datatype

In [10]:
cols = [c for c, t in credit.dtypes if t == "string"]
cols

['checking_balance',
 'credit_history',
 'purpose',
 'savings_balance',
 'employment_length',
 'personal_status',
 'other_debtors',
 'property',
 'installment_plan',
 'housing',
 'telephone',
 'foreign_worker',
 'job']

Transform each string column type into OneHotEncoded value and collect distinct values for each categorical column in list as shown below.

In [11]:
df, labels = encode_categorical(credit, "default", False)
categorical_fields = dict()
for c, t in credit.dtypes:
    if t == "string":
        df, labels = encode_categorical(df, c)
        categorical_fields[c] = labels
df.limit(5).toPandas()

Unnamed: 0,months_loan_duration,amount,installment_rate,residence_history,age,existing_credits,dependents,default,checking_balance,credit_history,...,savings_balance,employment_length,personal_status,other_debtors,property,installment_plan,housing,telephone,foreign_worker,job
0,6,1169,4,4,67,2,1,0.0,"(0.0, 1.0, 0.0)","(0.0, 1.0, 0.0, 0.0)",...,"(0.0, 1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0)","(0.0, 1.0, 0.0)","(1.0, 0.0)","(1.0, 0.0)",(0.0),(1.0),"(1.0, 0.0, 0.0)"
1,48,5951,2,2,22,1,1,1.0,"(0.0, 0.0, 1.0)","(1.0, 0.0, 0.0, 0.0)",...,"(1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0)","(1.0, 0.0)","(0.0, 1.0, 0.0)","(1.0, 0.0)","(1.0, 0.0)",(1.0),(1.0),"(1.0, 0.0, 0.0)"
2,12,2096,2,3,49,1,2,0.0,"(1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)",...,"(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0)","(0.0, 1.0, 0.0)","(1.0, 0.0)","(1.0, 0.0)",(1.0),(1.0),"(0.0, 1.0, 0.0)"
3,42,7882,2,4,45,1,2,0.0,"(0.0, 1.0, 0.0)","(1.0, 0.0, 0.0, 0.0)",...,"(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 1.0)","(0.0, 0.0, 1.0)","(1.0, 0.0)","(0.0, 0.0)",(1.0),(1.0),"(1.0, 0.0, 0.0)"
4,24,4870,3,4,53,2,2,1.0,"(0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)",...,"(1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0)","(0.0, 0.0, 0.0)","(1.0, 0.0)","(0.0, 0.0)",(1.0),(1.0),"(1.0, 0.0, 0.0)"


In [12]:
categorical_fields

{'checking_balance': ['unknown', '< 0 DM', '1 - 200 DM', '> 200 DM'],
 'credit_history': ['repaid',
  'critical',
  'delayed',
  'fully repaid this bank',
  'fully repaid'],
 'employment_length': ['1 - 4 yrs',
  '> 7 yrs',
  '4 - 7 yrs',
  '0 - 1 yrs',
  'unemployed'],
 'foreign_worker': ['yes', 'no'],
 'housing': ['own', 'rent', 'for free'],
 'installment_plan': ['none', 'bank', 'stores'],
 'job': ['skilled employee',
  'unskilled resident',
  'mangement self-employed',
  'unemployed non-resident'],
 'other_debtors': ['none', 'guarantor', 'co-applicant'],
 'personal_status': ['single male', 'female', 'married male', 'divorced male'],
 'property': ['other',
  'real estate',
  'building society savings',
  'unknown/none'],
 'purpose': ['radio/tv',
  'car (new)',
  'furniture',
  'car (used)',
  'business',
  'education',
  'repairs',
  'others',
  'domestic appliances',
  'retraining'],
 'savings_balance': ['< 100 DM',
  'unknown',
  '101 - 500 DM',
  '501 - 1000 DM',
  '> 1000 DM'],
 '

Verify that all columns in df is either of numeric or numeric vector type

In [13]:
df.printSchema()

root
 |-- months_loan_duration: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- installment_rate: integer (nullable = true)
 |-- residence_history: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- existing_credits: integer (nullable = true)
 |-- dependents: integer (nullable = true)
 |-- default: double (nullable = true)
 |-- checking_balance: vector (nullable = true)
 |-- credit_history: vector (nullable = true)
 |-- purpose: vector (nullable = true)
 |-- savings_balance: vector (nullable = true)
 |-- employment_length: vector (nullable = true)
 |-- personal_status: vector (nullable = true)
 |-- other_debtors: vector (nullable = true)
 |-- property: vector (nullable = true)
 |-- installment_plan: vector (nullable = true)
 |-- housing: vector (nullable = true)
 |-- telephone: vector (nullable = true)
 |-- foreign_worker: vector (nullable = true)
 |-- job: vector (nullable = true)



Create a list of columns except the label column

In [14]:
cols = credit.columns
cols.remove("default")
cols

['checking_balance',
 'months_loan_duration',
 'credit_history',
 'purpose',
 'amount',
 'savings_balance',
 'employment_length',
 'installment_rate',
 'personal_status',
 'other_debtors',
 'residence_history',
 'property',
 'age',
 'installment_plan',
 'housing',
 'existing_credits',
 'dependents',
 'telephone',
 'foreign_worker',
 'job']

Use a vector assembler to transform all features into a single feature column

In [15]:
df_vect = VectorAssembler(inputCols = cols, outputCol="features").transform(df).select("features", "default")
df_vect.show(5, False)

+------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|features                                                                                                                                              |default|
+------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|(48,[1,3,5,8,17,19,23,26,27,30,32,34,36,37,39,41,42,44,45],[1.0,6.0,1.0,1.0,1169.0,1.0,1.0,4.0,1.0,1.0,4.0,1.0,67.0,1.0,1.0,2.0,1.0,1.0,1.0])         |0.0    |
|(48,[2,3,4,8,17,18,22,26,28,30,32,34,36,37,39,41,42,43,44,45],[1.0,48.0,1.0,1.0,5951.0,1.0,1.0,2.0,1.0,1.0,2.0,1.0,22.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |1.0    |
|(48,[0,3,5,13,17,18,24,26,27,30,32,34,36,37,39,41,42,43,44,46],[1.0,12.0,1.0,1.0,2096.0,1.0,1.0,2.0,1.0,1.0,3.0,1.0,49.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0])|0.0    |
|(48,[1,3,4,10,17,18,24,26,27,31,3

Let me spot check whether OneHotEncode worked ok.

In [16]:
credit.first()

Row(checking_balance='< 0 DM', months_loan_duration=6, credit_history='critical', purpose='radio/tv', amount=1169, savings_balance='unknown', employment_length='> 7 yrs', installment_rate=4, personal_status='single male', other_debtors='none', residence_history=4, property='real estate', age=67, installment_plan='none', housing='own', existing_credits=2, default=1, dependents=1, telephone='yes', foreign_worker='yes', job='skilled employee')

In [17]:
v = df_vect.select("features").first().features
v.toArray()

array([  0.00000000e+00,   1.00000000e+00,   0.00000000e+00,
         6.00000000e+00,   0.00000000e+00,   1.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   1.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   1.16900000e+03,
         0.00000000e+00,   1.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   1.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   4.00000000e+00,
         1.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         1.00000000e+00,   0.00000000e+00,   4.00000000e+00,
         0.00000000e+00,   1.00000000e+00,   0.00000000e+00,
         6.70000000e+01,   1.00000000e+00,   0.00000000e+00,
         1.00000000e+00,   0.00000000e+00,   2.00000000e+00,
         1.00000000e+00,   0.00000000e+00,   1.00000000e+00,
         1.00000000e+00,   0.00000000e+00,   0.00000000e+00])

In [18]:
dnorm_features = []
for c in cols:
    if c in categorical_fields:
        dnorm_features.extend([c + "::" + v for v in categorical_fields[c][:-1]])
    else:
        dnorm_features.append(c)

In [19]:
pd.DataFrame({"feature": dnorm_features, "value": df_vect.select("features").first().features})

Unnamed: 0,feature,value
0,checking_balance::unknown,0.0
1,checking_balance::< 0 DM,1.0
2,checking_balance::1 - 200 DM,0.0
3,months_loan_duration,6.0
4,credit_history::repaid,0.0
5,credit_history::critical,1.0
6,credit_history::delayed,0.0
7,credit_history::fully repaid this bank,0.0
8,purpose::radio/tv,1.0
9,purpose::car (new),0.0


In [20]:
df_train, df_test = df_vect.randomSplit(weights=[0.7,0.3], seed=1)
df_train.count(), df_test.count()

(704, 296)

# Build a RandomForest Classifier

In [21]:
forest = RandomForestClassifier(labelCol="default", featuresCol="features")
forest_model = forest.fit(df_train)

# Run prediction on the whole dataset

In [22]:
df_test_pred = forest_model.transform(df_test)
df_test_pred.show(5)

+--------------------+-------+--------------------+--------------------+----------+
|            features|default|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(48,[0,3,4,8,17,1...|    0.0|[17.2285594170798...|[0.86142797085399...|       0.0|
|(48,[0,3,4,8,17,1...|    0.0|[15.8562730607405...|[0.79281365303702...|       0.0|
|(48,[0,3,4,8,17,1...|    0.0|[13.7058714474839...|[0.68529357237419...|       0.0|
|(48,[0,3,4,8,17,1...|    0.0|[17.7285683794501...|[0.88642841897250...|       0.0|
|(48,[0,3,4,8,17,1...|    0.0|[16.7747538895373...|[0.83873769447686...|       0.0|
+--------------------+-------+--------------------+--------------------+----------+
only showing top 5 rows



In [23]:
df_test_pred.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  278|
|       1.0|   18|
+----------+-----+



# Evaluate

In [25]:
evaluator = evaluation.MulticlassClassificationEvaluator(labelCol="default", metricName="accuracy")
evaluator.evaluate(df_test_pred)

0.722972972972973

In [26]:
print("Total number of features: ", forest_model.numFeatures, "\nOrder of feature importance: \n")
pd.DataFrame({"importance": forest_model.featureImportances.toArray(), 
              "feature": dnorm_features
             }).sort_values("importance", ascending = False)

Total number of features:  48 
Order of feature importance: 



Unnamed: 0,feature,importance
17,amount,0.126363
3,months_loan_duration,0.110096
0,checking_balance::unknown,0.100432
36,age,0.070377
1,checking_balance::< 0 DM,0.055003
5,credit_history::critical,0.040432
37,installment_plan::none,0.034975
38,installment_plan::bank,0.032252
18,savings_balance::< 100 DM,0.029586
39,housing::own,0.026804
