In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
import pandas as pd

In [2]:
df = spark.read.option('header',True).option('sep',';').option('inferSchema',True).csv('/storage/business_zone/users/phongbd5/applied_ds/bank-full.csv')

In [3]:
df.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown| single|  unknown|     no|      1|     no|  no|unknown|  5|  may|     19

In [4]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [5]:
df.dtypes

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('y', 'string')]

In [6]:
num_cols = [_[0] for _ in df.dtypes if _[1] == 'int']
char_cols = [_[0] for _ in df.dtypes if _[1] == 'string']

In [7]:
num_cols

['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']

In [8]:
char_cols

['job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'poutcome',
 'y']

In [9]:
df.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'balance',
 'housing',
 'loan',
 'contact',
 'day',
 'month',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'y']

In [10]:
indexers = [StringIndexer(inputCol = _, outputCol= _ + '_index' , handleInvalid = "keep") for _ in char_cols]

In [11]:
pipeline = Pipeline(stages=indexers)

In [12]:
df_transformed = pipeline.fit(df).transform(df).cache()

In [13]:
df_transformed.count()

45211

In [14]:
df_transformed.show(10)

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+--------------+-------+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|job_index|marital_index|education_index|default_index|housing_index|loan_index|contact_index|month_index|poutcome_index|y_index|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+--------------+-------+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|      1.0|          0.0|            1.0|          0.0|          0.0|       0.0|          1.0|

In [15]:
df_transformed = df_transformed.drop(*char_cols)

In [16]:
for _ in df_transformed.columns:
    if _.endswith('_index'):
        df_transformed = df_transformed.withColumnRenamed(_, _[0:-6])

In [17]:
df_transformed.printSchema()

root
 |-- age: integer (nullable = true)
 |-- balance: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- job: double (nullable = false)
 |-- marital: double (nullable = false)
 |-- education: double (nullable = false)
 |-- default: double (nullable = false)
 |-- housing: double (nullable = false)
 |-- loan: double (nullable = false)
 |-- contact: double (nullable = false)
 |-- month: double (nullable = false)
 |-- poutcome: double (nullable = false)
 |-- y: double (nullable = false)



In [18]:
assembler = VectorAssembler(inputCols = df_transformed.columns[0:-1], outputCol='features')

In [19]:
assembler.transform(df_transformed).show(10)

+---+-------+---+--------+--------+-----+--------+----+-------+---------+-------+-------+----+-------+-----+--------+---+--------------------+
|age|balance|day|duration|campaign|pdays|previous| job|marital|education|default|housing|loan|contact|month|poutcome|  y|            features|
+---+-------+---+--------+--------+-----+--------+----+-------+---------+-------+-------+----+-------+-----+--------+---+--------------------+
| 58|   2143|  5|     261|       1|   -1|       0| 1.0|    0.0|      1.0|    0.0|    0.0| 0.0|    1.0|  0.0|     0.0|0.0|(16,[0,1,2,3,4,5,...|
| 44|     29|  5|     151|       1|   -1|       0| 2.0|    1.0|      0.0|    0.0|    0.0| 0.0|    1.0|  0.0|     0.0|0.0|(16,[0,1,2,3,4,5,...|
| 33|      2|  5|      76|       1|   -1|       0| 7.0|    0.0|      0.0|    0.0|    0.0| 1.0|    1.0|  0.0|     0.0|0.0|(16,[0,1,2,3,4,5,...|
| 47|   1506|  5|      92|       1|   -1|       0| 0.0|    0.0|      3.0|    0.0|    0.0| 0.0|    1.0|  0.0|     0.0|0.0|(16,[0,1,2,3,4,5,...|

# Linear Regression 

In [20]:
from pyspark.ml.regression import LinearRegression

In [21]:
linear_df = df_transformed.select(*num_cols)

In [22]:
linear_df

DataFrame[age: int, balance: int, day: int, duration: int, campaign: int, pdays: int, previous: int]

In [23]:
features_list = linear_df.columns.copy()

In [24]:
features_list.remove('balance')

In [25]:
assembler = VectorAssembler(inputCols=features_list, outputCol='features')

In [26]:
linear_df = assembler.transform(df_transformed)

In [27]:
reg = LinearRegression(featuresCol='features', labelCol='balance')

In [28]:
reg_model = reg.fit(linear_df) # fit model

In [29]:
reg_model

LinearRegression_4e57ac40288ce7d273a9

In [30]:
# view the coefficients and intercepts for each variable
import pandas as pd
for k, v in linear_df.schema["features"].metadata["ml_attr"]["attrs"].items():
    features_df = pd.DataFrame(v)

In [31]:
print(reg_model.coefficients, reg_model.intercept)

[28.08397290892997,3.3055463619496286,0.24882841970901756,-14.142676297161454,-0.08248810233032043,23.462992800762525] 124.92130092818479


In [32]:
features_df['coefficients'] = reg_model.coefficients

In [33]:
features_df

Unnamed: 0,idx,name,coefficients
0,0,age,28.083973
1,1,day,3.305546
2,2,duration,0.248828
3,3,campaign,-14.142676
4,4,pdays,-0.082488
5,5,previous,23.462993


In [34]:
# prediction result
reg_model.transform(linear_df).show(10)

+---+-------+---+--------+--------+-----+--------+----+-------+---------+-------+-------+----+-------+-----+--------+---+--------------------+------------------+
|age|balance|day|duration|campaign|pdays|previous| job|marital|education|default|housing|loan|contact|month|poutcome|  y|            features|        prediction|
+---+-------+---+--------+--------+-----+--------+----+-------+---------+-------+-------+----+-------+-----+--------+---+--------------------+------------------+
| 58|   2143|  5|     261|       1|   -1|       0| 1.0|    0.0|      1.0|    0.0|    0.0| 0.0|    1.0|  0.0|     0.0|0.0|[58.0,5.0,261.0,1...|1821.2034908050935|
| 44|     29|  5|     151|       1|   -1|       0| 2.0|    1.0|      0.0|    0.0|    0.0| 0.0|    1.0|  0.0|     0.0|0.0|[44.0,5.0,151.0,1...| 1400.656743912082|
| 33|      2|  5|      76|       1|   -1|       0| 7.0|    0.0|      0.0|    0.0|    0.0| 1.0|    1.0|  0.0|     0.0|0.0|[33.0,5.0,76.0,1....| 1073.070910435676|
| 47|   1506|  5|      92|  

# Multi-collinearity with VIF (Variance Inflation Factor)

In [35]:
def vif_calculator(df, features_list):
    vif_list = []
    for i in features_list:
        temp_features_list = features_list.copy()
        temp_features_list.remove(i)
        temp_target = i
        assembler = VectorAssembler(inputCols = temp_features_list, outputCol = 'features')
        temp_df = assembler.transform(df)
        reg = LinearRegression(featuresCol = 'features', labelCol = i)
        reg_model = reg.fit(temp_df) # fit model
        temp_vif = 1/(1 - reg_model.summary.r2)
        vif_list.append(temp_vif)
    return vif_list

In [36]:
features_list

['age', 'day', 'duration', 'campaign', 'pdays', 'previous']

In [37]:
features_df['vif'] = vif_calculator(linear_df.drop('features'), features_list)

In [38]:
print(features_df)

   idx      name  coefficients       vif
0    0       age     28.083973  1.000917
1    1       day      3.305546  1.034350
2    2  duration      0.248828  1.007627
3    3  campaign    -14.142676  1.039907
4    4     pdays     -0.082488  1.276182
5    5  previous     23.462993  1.261321


# Logistic Regression

In [39]:
from pyspark.ml.classification import LogisticRegression

In [40]:
df_transformed.columns

['age',
 'balance',
 'day',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'poutcome',
 'y']

In [41]:
logistic_df = df_transformed.select(*num_cols, 'y')

In [42]:
logistic_df.columns

['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'y']

In [43]:
assembler = VectorAssembler(inputCols = logistic_df.columns[0:-1], outputCol = 'features')

In [44]:
logistic_df = assembler.transform(logistic_df)

In [45]:
logistic_df.show(10,False)

+---+-------+---+--------+--------+-----+--------+---+------------------------------------+
|age|balance|day|duration|campaign|pdays|previous|y  |features                            |
+---+-------+---+--------+--------+-----+--------+---+------------------------------------+
|58 |2143   |5  |261     |1       |-1   |0       |0.0|[58.0,2143.0,5.0,261.0,1.0,-1.0,0.0]|
|44 |29     |5  |151     |1       |-1   |0       |0.0|[44.0,29.0,5.0,151.0,1.0,-1.0,0.0]  |
|33 |2      |5  |76      |1       |-1   |0       |0.0|[33.0,2.0,5.0,76.0,1.0,-1.0,0.0]    |
|47 |1506   |5  |92      |1       |-1   |0       |0.0|[47.0,1506.0,5.0,92.0,1.0,-1.0,0.0] |
|33 |1      |5  |198     |1       |-1   |0       |0.0|[33.0,1.0,5.0,198.0,1.0,-1.0,0.0]   |
|35 |231    |5  |139     |1       |-1   |0       |0.0|[35.0,231.0,5.0,139.0,1.0,-1.0,0.0] |
|28 |447    |5  |217     |1       |-1   |0       |0.0|[28.0,447.0,5.0,217.0,1.0,-1.0,0.0] |
|42 |2      |5  |380     |1       |-1   |0       |0.0|[42.0,2.0,5.0,380.0,1.0,-1

In [46]:
binary_clf = LogisticRegression(featuresCol='features', labelCol='y', family='binomial')

In [47]:
logistic_df = logistic_df.withColumn('y', col('y').cast(IntegerType()))

In [48]:
binary_clf_model = binary_clf.fit(logistic_df) # fit binary model

In [49]:
binary_clf_model.transform(logistic_df).show(10,False)

+---+-------+---+--------+--------+-----+--------+---+------------------------------------+----------------------------------------+-----------------------------------------+----------+
|age|balance|day|duration|campaign|pdays|previous|y  |features                            |rawPrediction                           |probability                              |prediction|
+---+-------+---+--------+--------+-----+--------+---+------------------------------------+----------------------------------------+-----------------------------------------+----------+
|58 |2143   |5  |261     |1       |-1   |0       |0  |[58.0,2143.0,5.0,261.0,1.0,-1.0,0.0]|[2.1176813936656,-2.1176813936656]      |[0.8926098759774228,0.10739012402257708] |0.0       |
|44 |29     |5  |151     |1       |-1   |0       |0  |[44.0,29.0,5.0,151.0,1.0,-1.0,0.0]  |[2.7078044172889126,-2.7078044172889126]|[0.9374855970560074,0.06251440294399248] |0.0       |
|33 |2      |5  |76      |1       |-1   |0       |0  |[33.0,2.0,5.0,76

In [50]:
print(binary_clf_model.coefficients)

[0.007959289992554713,3.7181275567506194e-05,-0.0016500733138635486,0.0036371977016167118,-0.128043283553752,0.0021135713490607728,0.0859380108519198]


In [51]:
print(binary_clf_model.intercept) #model intercept for binary model

-3.4699010654247706


In [52]:
multinomial_clf = LogisticRegression(featuresCol = 'features', labelCol = 'y', family = 'multinomial')

In [53]:
multinomial_clf_model = multinomial_clf.fit(logistic_df)

In [54]:
print(multinomial_clf_model.coefficientMatrix)  # coefficient of Class 0 and Class 1

DenseMatrix([[-3.97962982e-03, -1.85907216e-05,  8.24926374e-04,
              -1.81859931e-03,  6.40216709e-02, -1.05678834e-03,
              -4.29688093e-02],
             [ 3.97962982e-03,  1.85907216e-05, -8.24926374e-04,
               1.81859931e-03, -6.40216709e-02,  1.05678834e-03,
               4.29688093e-02]])


In [55]:
print(multinomial_clf_model.interceptVector) # Intercepts of Class 0 and Class 1

[1.7349520795817952,-1.7349520795817952]


In [56]:
multinomial_clf_model.transform(logistic_df).show(10,False)

+---+-------+---+--------+--------+-----+--------+---+------------------------------------+----------------------------------------+-----------------------------------------+----------+
|age|balance|day|duration|campaign|pdays|previous|y  |features                            |rawPrediction                           |probability                              |prediction|
+---+-------+---+--------+--------+-----+--------+---+------------------------------------+----------------------------------------+-----------------------------------------+----------+
|58 |2143   |5  |261     |1       |-1   |0       |0  |[58.0,2143.0,5.0,261.0,1.0,-1.0,0.0]|[1.0588423047295485,-1.0588423047295485]|[0.8926101842349116,0.10738981576508844] |0.0       |
|44 |29     |5  |151     |1       |-1   |0       |0  |[44.0,29.0,5.0,151.0,1.0,-1.0,0.0]  |[1.3539038318422276,-1.3539038318422276]|[0.9374857873151383,0.0625142126848616]  |0.0       |
|33 |2      |5  |76      |1       |-1   |0       |0  |[33.0,2.0,5.0,76

# Decision Tree

In [57]:
from pyspark.ml.classification import DecisionTreeClassifier

In [58]:
clf = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'y', impurity = 'gini') #gini based model

In [61]:
clf_model = clf.fit(logistic_df)

In [63]:
clf_model.transform(logistic_df).show(10,False) #future predictions

+---+-------+---+--------+--------+-----+--------+---+------------------------------------+---------------+----------------------------------------+----------+
|age|balance|day|duration|campaign|pdays|previous|y  |features                            |rawPrediction  |probability                             |prediction|
+---+-------+---+--------+--------+-----+--------+---+------------------------------------+---------------+----------------------------------------+----------+
|58 |2143   |5  |261     |1       |-1   |0       |0  |[58.0,2143.0,5.0,261.0,1.0,-1.0,0.0]|[8533.0,755.0] |[0.9187123169681309,0.08128768303186908]|0.0       |
|44 |29     |5  |151     |1       |-1   |0       |0  |[44.0,29.0,5.0,151.0,1.0,-1.0,0.0]  |[19932.0,373.0]|[0.9816301403595173,0.01836985964048264]|0.0       |
|33 |2      |5  |76      |1       |-1   |0       |0  |[33.0,2.0,5.0,76.0,1.0,-1.0,0.0]    |[19932.0,373.0]|[0.9816301403595173,0.01836985964048264]|0.0       |
|47 |1506   |5  |92      |1       |-1   

In [64]:
print(clf_model.featureImportances)

(7,[0,2,3,4,5],[0.07029188881576587,0.015298882843227956,0.7092650456120744,0.0029936718415604106,0.2021505108873715])


# Regression Tree

In [65]:
from pyspark.ml.regression import DecisionTreeRegressor

In [67]:
reg = DecisionTreeRegressor(featuresCol='features', labelCol='balance', impurity='variance')

In [68]:
reg_model = reg.fit(linear_df)

In [69]:
print(reg_model.featureImportances) #feature importance

(6,[0,1,2,3,4,5],[0.3698438457163591,0.30424559689419733,0.1507290562998903,0.028890629842263742,0.12374645056225508,0.02254442068503449])


In [70]:
reg_model.transform(linear_df).show(10,False) #future predictions

+---+-------+---+--------+--------+-----+--------+----+-------+---------+-------+-------+----+-------+-----+--------+---+-----------------------------+------------------+
|age|balance|day|duration|campaign|pdays|previous|job |marital|education|default|housing|loan|contact|month|poutcome|y  |features                     |prediction        |
+---+-------+---+--------+--------+-----+--------+----+-------+---------+-------+-------+----+-------+-----+--------+---+-----------------------------+------------------+
|58 |2143   |5  |261     |1       |-1   |0       |1.0 |0.0    |1.0      |0.0    |0.0    |0.0 |1.0    |0.0  |0.0     |0.0|[58.0,5.0,261.0,1.0,-1.0,0.0]|1937.1573236889692|
|44 |29     |5  |151     |1       |-1   |0       |2.0 |1.0    |0.0      |0.0    |0.0    |0.0 |1.0    |0.0  |0.0     |0.0|[44.0,5.0,151.0,1.0,-1.0,0.0]|1348.3228379513014|
|33 |2      |5  |76      |1       |-1   |0       |7.0 |0.0    |0.0      |0.0    |0.0    |1.0 |1.0    |0.0  |0.0     |0.0|[33.0,5.0,76.0,1.0,-1.0,

# Random Forest

In [71]:
from pyspark.ml.classification import RandomForestClassifier # Classification

In [72]:
clf = RandomForestClassifier(featuresCol='features', labelCol='y')
clf_model = clf.fit(logistic_df)

In [74]:
print(clf_model.featureImportances)
# print(clf_model.toDebugString)

(7,[0,1,2,3,4,5,6],[0.0712224558282332,0.021323757857823398,0.03467989305640842,0.6594593977720731,0.010004203459231179,0.15888984057433286,0.04442045145189777])


In [75]:
from pyspark.ml.regression import RandomForestRegressor # Regression

In [76]:
from pyspark.ml.regression import RandomForestRegressor

In [77]:
reg = RandomForestRegressor(featuresCol='features', labelCol='balance')
reg_model = reg.fit(linear_df)

In [78]:
print(reg_model.featureImportances)
# print(reg_model.toDebugString)

(6,[0,1,2,3,4,5],[0.3809530638699519,0.20958069504046067,0.0970537166707919,0.07602969796561258,0.15345908416837026,0.08292374228481265])


# One-vs-Rest

In [None]:
# instantiate the base classifier.
clf = RandomForestClassifier(featuresCol = 'features', labelCol = 'education')

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier = clf, featuresCol = 'features', labelCol = 'education')

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName = "accuracy", labelCol = 'education')

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))