In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("Load Data From HDFS") \
    .getOrCreate()

In [2]:
spark

In [3]:
# load hadoop format file parquet from hdfs

#df = spark.read.parquet("hdfs://localhost:9820/proyek/bank-additional.parq")
bank_df = spark.read.parquet("bank-additional.parq")
bank_df.show(5)
bank_df.printSchema()

+---+---------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|      job|marital|  education|housing|loan|  contact|month|day_of_week|duration|campaign|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+---+---------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 56|housemaid|married|   basic.4y|     no|  no|telephone|  may|        mon|     261|       1|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
| 57| services|married|high.school|     no|  no|telephone|  may|        mon|     149|       1|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
| 37| services|married|high.school|    yes|  no|telephone|  may|        mon|    

### Importing needful libraries

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean, col, split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, VectorIndexer
from pyspark.ml.feature import QuantileDiscretizer, OneHotEncoder

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [5]:
# Number of customers in the dataframe
clients_count = bank_df.count()
print("Number of customers is {}".format(clients_count))

Number of customers is 41166


In [6]:
# Number of customers which are subscribed vs. those not subscribed a term deposit
groupBy_clients = bank_df.groupBy("y").count()
groupBy_clients.show()

+---+-----+
|  y|count|
+---+-----+
| no|36526|
|yes| 4640|
+---+-----+



In [7]:
bank_df.describe([t[0] for t in bank_df.dtypes if t[1] == 'int']).show()

+-------+------------------+------------------+------------------+-------------------+
|summary|               age|          duration|          campaign|           previous|
+-------+------------------+------------------+------------------+-------------------+
|  count|             41166|             41166|             41166|              41166|
|   mean| 40.02356313462566| 258.3929456347471|  2.54875382597289|0.17305543409609872|
| stddev|10.421897109457673|259.30296411836315|2.6453043592282746| 0.4950171507468902|
|    min|                17|                 0|                 1|                  0|
|    max|                98|              4918|                32|                  7|
+-------+------------------+------------------+------------------+-------------------+



### Preparing Data for Machine Learning

In [8]:
def get_dummy(df, categoricalCols, continuousCols, labelCol):
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers]
    
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + continuousCols, outputCol="features")
    indexer = StringIndexer(inputCol=labelCol, outputCol='indexedLabel')
    
    pipeline = Pipeline(stages = indexers + encoders + [assembler] + [indexer])
    model=pipeline.fit(df)
    data = model.transform(df)
    
    data = data.withColumn('label', col(labelCol))
    
    return data.select('age', 'job', 'marital', 'education', 'housing', 'loan', 'contact', 'month', 'day_of_week', 'duration', 'campaign', 'previous', 'poutcome', 'emp_var_rate', 'cons_price_idx', 'cons_conf_idx', 'euribor3m', 'nr_employed', 'features', 'indexedLabel', 'label'), StringIndexer(inputCol='label').fit(data)

In [9]:
# transform the data
categoricalColumns = ['job', 'marital', 'education', 'housing', 'loan', 'contact', 'month', 'day_of_week', 'poutcome']
numericCols = ['age', 'duration', 'campaign', 'previous', 'emp_var_rate', 'cons_price_idx', 'cons_conf_idx', 'euribor3m', 'nr_employed']
(bank_df, labelindexer) = get_dummy(bank_df, categoricalColumns, numericCols, 'y')
bank_df.show(5)

+---+---------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+--------------------+------------+-----+
|age|      job|marital|  education|housing|loan|  contact|month|day_of_week|duration|campaign|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|            features|indexedLabel|label|
+---+---------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+--------------------+------------+-----+
| 56|housemaid|married|   basic.4y|     no|  no|telephone|  may|        mon|     261|       1|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0|(48,[8,11,18,22,2...|         0.0|   no|
| 57| services|married|high.school|     no|  no|telephone|  may|        mon|     149|       1|       0|nonexistent|     

In [10]:
# fit the following featureIndexer model on the whole of the bank_df dataframe
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(bank_df)

featureIndexer.transform(bank_df).show(5)

+---+---------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+--------------------+------------+-----+--------------------+
|age|      job|marital|  education|housing|loan|  contact|month|day_of_week|duration|campaign|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|            features|indexedLabel|label|     indexedFeatures|
+---+---------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+--------------------+------------+-----+--------------------+
| 56|housemaid|married|   basic.4y|     no|  no|telephone|  may|        mon|     261|       1|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0|(48,[8,11,18,22,2...|         0.0|   no|(48,[8,11,18,22,2...|
| 57| services|married|high.school| 

In [11]:
bank_df.show(5, False)

+---+---------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----+
|age|job      |marital|education  |housing|loan|contact  |month|day_of_week|duration|campaign|previous|poutcome   |emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|features                                                                                                                                                                              |indexedLabel|label|
+---+---------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+---------------------------------------------------------

### Data Splitting

In [12]:
# split dataset to trainingData and testData

(trainingData, testData) = bank_df.randomSplit([0.7, 0.3], seed=10)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 28758
Test Dataset Count: 12408


In [13]:
# print sample of trainingData and testData

print("The first 5 samples of the Training Dataset:")
trainingData.show(5, False)
print("The first 5 samples of the Test Dataset:")
testData.show(5, False)

The first 5 samples of the Training Dataset:
+---+-------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----+
|age|job    |marital|education  |housing|loan|contact  |month|day_of_week|duration|campaign|previous|poutcome   |emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|features                                                                                                                                                                                               |indexedLabel|label|
+---+-------+-------+-----------+-------+----+---------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+------

# Fit and Evaluate Models

### Logistic Regression

In [14]:
# build logistic regression model
lr = LogisticRegression(labelCol="indexedLabel", featuresCol="features")

### Pipeline Architecture

In [15]:
#build Pipeline architecture

#convert indexed labels back to original labels
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelindexer.labels) 

#chain indexers and tree in Pipeline
pipeline = Pipeline(stages=[featureIndexer, lr, labelConverter])

#train model
lrModel = pipeline.fit(trainingData)

### Predictions

In [16]:
# Make predictions on the test data using the transform() method.
predictions = lrModel.transform(testData)
predictions.show(5)

+---+-------+-------+-----------+-------+----+--------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+--------------------+------------+-----+--------------------+--------------------+--------------------+----------+--------------+
|age|    job|marital|  education|housing|loan| contact|month|day_of_week|duration|campaign|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|            features|indexedLabel|label|     indexedFeatures|       rawPrediction|         probability|prediction|predictedLabel|
+---+-------+-------+-----------+-------+----+--------+-----+-----------+--------+--------+--------+-----------+------------+--------------+-------------+---------+-----------+--------------------+------------+-----+--------------------+--------------------+--------------------+----------+--------------+
| 18|student| single|   basic.4y|    yes| yes|cellular|  apr|        thu|     184|

In [17]:
#select some columns from predictions dataframe
predictions.select("features", "label", "probability", "predictedLabel").show(5)

+--------------------+-----+--------------------+--------------+
|            features|label|         probability|predictedLabel|
+--------------------+-----+--------------------+--------------+
|(48,[10,12,18,21,...|   no|[0.89808906089028...|            no|
|(48,[10,12,15,21,...|   no|[0.75292475818045...|            no|
|(48,[10,12,16,22,...|   no|[0.90942266587252...|            no|
|(48,[10,12,16,22,...|   no|[0.94317192199486...|            no|
|(48,[10,12,16,21,...|  yes|[0.81292751406678...|            no|
+--------------------+-----+--------------------+--------------+
only showing top 5 rows



In [18]:
# save bank_addtional_prediction to csv
import pandas as pd
bank_additional_prediction = predictions.select('age', 'job', 'marital', 'education', 'housing', 'loan', 'contact', 'month', 'day_of_week', 'duration', 'campaign', 'previous', 'poutcome', 'emp_var_rate', 'cons_price_idx', 'cons_conf_idx', 'euribor3m', 'nr_employed', 'features', 'label', 'probability', 'predictedLabel')
bank_additional_prediction.toPandas().to_csv('bank_additional_prediction.csv', header = True)

In [19]:
# get the model from stages
model = lrModel.stages[1]
model

LogisticRegressionModel: uid=LogisticRegression_4146bfaacc88, numClasses=2, numFeatures=48

In [20]:
# get features names from encoded feature
from itertools import chain

attrs = sorted(
    (attr["idx"], attr["name"]) for attr in (chain(*predictions
        .schema[model.summary.featuresCol]
        .metadata["ml_attr"]["attrs"].values())))

[(name, model.coefficients[idx]) for idx, name in attrs]

[('job_indexed_encoded_admin.', 0.08924866289698549),
 ('job_indexed_encoded_blue-collar', -0.24583594553044985),
 ('job_indexed_encoded_technician', -0.02380195005234879),
 ('job_indexed_encoded_services', -0.10140805567917176),
 ('job_indexed_encoded_management', -0.10213303493281291),
 ('job_indexed_encoded_retired', 0.24856611859861943),
 ('job_indexed_encoded_entrepreneur', -0.040485407906996715),
 ('job_indexed_encoded_self-employed', -0.13684451567532407),
 ('job_indexed_encoded_housemaid', 0.04176295724993643),
 ('job_indexed_encoded_unemployed', 0.03186790174864883),
 ('job_indexed_encoded_student', 0.12259598114110253),
 ('marital_indexed_encoded_married', -0.04413899022758612),
 ('marital_indexed_encoded_single', 0.04813474416240916),
 ('marital_indexed_encoded_divorced', -0.041962393822326964),
 ('education_indexed_encoded_university.degree', 0.10944950804034131),
 ('education_indexed_encoded_high.school', -0.05192568576235654),
 ('education_indexed_encoded_basic.9y', -0.11

### Compute the model accuracy

In [21]:
cm = predictions.select("label", "predictedLabel")          
cm.groupby('label').agg({'label': 'count'}).show()  
cm.groupby('predictedLabel').agg({'predictedLabel': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|   no|       11028|
|  yes|        1380|
+-----+------------+

+--------------+---------------------+
|predictedLabel|count(predictedLabel)|
+--------------+---------------------+
|            no|                11582|
|           yes|                  826|
+--------------+---------------------+



In [22]:
#group predictions by label and predictedLabel
predictions.groupBy('label', 'predictedLabel').count().show()

+-----+--------------+-----+
|label|predictedLabel|count|
+-----+--------------+-----+
|   no|            no|10753|
|   no|           yes|  275|
|  yes|           yes|  551|
|  yes|            no|  829|
+-----+--------------+-----+



In [23]:
print("The Accuracy for test set is {}".format(cm.filter(cm.label == cm.predictedLabel).count()/cm.count()))

The Accuracy for test set is 0.9110251450676983


In [24]:
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

The Accuracy for test set is 0.9110251450676983
