# Preprocessing

In [5]:
import pandas as pd;
import numpy as np;
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName('BankDataPractice').getOrCreate()

In [7]:
df = spark.read.csv('bank.csv', header=True, inferSchema=True)

In [8]:
df.show()

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| 59|     admin.| married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|
| 56|     admin.| married|secondary|     no|     45|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|    yes|
| 41| technician| married|secondary|     no|   1270|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|    yes|
| 55|   services| married|secondary|     no|   2476|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| unknown|    yes|
| 54|     admin.| married| tertiary|     no|    184|     no|  

Check cache so we can sure that we will read from disk once

In [9]:
df.cache()
df.is_cached

True

Check schema

In [10]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



Check null values

In [11]:
from pyspark.sql.functions import col, count, when

df.select([count(when(col(c).isNull() , c)).alias(c) for c in df.columns]).show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|      0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+



Counts: 

In [12]:
df.count()

11162

Categorical Columns: 

In [13]:
df[[item[0] for item in df.dtypes if item[1].startswith('string')]].columns

['job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'poutcome',
 'deposit']

In [14]:
catCols = ['job', 'marital', 'education', 'default','housing', 'loan', 'contact', 'poutcome']

Create pipeline to transform each categorical columns using StringIndexer and OneHotEncoded

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler

# Define stages for the pipeline
stages = []
for col in catCols:
    # StringIndexer for converting categorical values to numerical indices
    indexer = StringIndexer(inputCol=col, outputCol=col + "_index")
    
    # OneHotEncoder for converting numerical indices to binary vectors
    encoder = OneHotEncoder(dropLast=False,inputCols=[indexer.getOutputCol()], outputCols=[col + "_onehot"])
    
    stages += [indexer, encoder]

# Assemble all OneHotEncoded columns into a single feature vector
assembler = VectorAssembler(inputCols=[col + "_onehot" for col in catCols],
                            outputCol="features")

# Add the VectorAssembler stage to the pipeline
stages.append(assembler)

# Create the pipeline
pipeline = Pipeline(stages=stages)

# Fit the pipeline to the DataFrame
pipelineModel = pipeline.fit(df)

# Apply the transformation to the DataFrame
transformed_df = pipelineModel.transform(df)




After transform: 

In [16]:
transformed_df.show(5)   

+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+--------------+-------------+--------------+----------+-------------+-------------+--------------+--------------+---------------+--------------------+
|age|       job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_index|    job_onehot|marital_index|marital_onehot|education_index|education_onehot|default_index|default_onehot|housing_index|housing_onehot|loan_index|  loan_onehot|contact_index|contact_onehot|poutcome_index|poutcome_onehot|            features|
+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+--

In [17]:
transformed_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)
 |-- job_index: double (nullable = false)
 |-- job_onehot: vector (nullable = true)
 |-- marital_index: double (nullable = false)
 |-- marital_onehot: vector (nullable = true)
 |-- education_index: double (nullable = false)
 |-- education_onehot: vector (nullable = true)
 |-- default_index: double (nullable = false)
 |-- default_onehot:

# Machine learning:
Split the data in to 7 - 3

In [18]:
(train, test) = transformed_df.randomSplit([0.7, 0.3],seed = 11)

In [19]:
train.show(5)

+---+-------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+--------------+-------------+--------------+----------+-------------+-------------+--------------+--------------+---------------+--------------------+
|age|    job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_index|    job_onehot|marital_index|marital_onehot|education_index|education_onehot|default_index|default_onehot|housing_index|housing_onehot|loan_index|  loan_onehot|contact_index|contact_onehot|poutcome_index|poutcome_onehot|            features|
+---+-------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+--------

In [20]:
test.show(5)

+---+-------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+--------------+-------------+--------------+----------+-------------+-------------+--------------+--------------+---------------+--------------------+
|age|    job|marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_index|    job_onehot|marital_index|marital_onehot|education_index|education_onehot|default_index|default_onehot|housing_index|housing_onehot|loan_index|  loan_onehot|contact_index|contact_onehot|poutcome_index|poutcome_onehot|            features|
+---+-------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+-----

## Modeling: 

### Logistic Regression
Building logistic regression model on Loans 

In [21]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
lr = LogisticRegression(labelCol="loan_index", featuresCol="features")

lrModel = lr.fit(train)
lrPrediction = lrModel.transform(test)


In [22]:
lrPrediction.show()

+---+----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+--------------+-------------+--------------+----------+-------------+-------------+--------------+--------------+---------------+--------------------+--------------------+--------------------+----------+
|age|       job|marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_index|    job_onehot|marital_index|marital_onehot|education_index|education_onehot|default_index|default_onehot|housing_index|housing_onehot|loan_index|  loan_onehot|contact_index|contact_onehot|poutcome_index|poutcome_onehot|            features|       rawPrediction|         probability|prediction|
+---+----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------

In [23]:
evaluator = MulticlassClassificationEvaluator(labelCol="loan_index", predictionCol="prediction", metricName="accuracy")
lrAccuracy = evaluator.evaluate(lrPrediction)
print("Accuracy of LogisticRegression is = %g"% (lrAccuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lrAccuracy))

Accuracy of LogisticRegression is = 1
Test Error of LogisticRegression = 0 


### DecisionTreeClassifier

In [24]:
from pyspark.ml.classification import DecisionTreeClassifier

dtc = DecisionTreeClassifier(labelCol="loan_index", featuresCol="features")

dtcModel = dtc.fit(train)
dtcPrediction = dtcModel.transform(test)


In [25]:
dtcPrediction.show()

+---+----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+--------------+-------------+--------------+----------+-------------+-------------+--------------+--------------+---------------+--------------------+-------------+-----------+----------+
|age|       job|marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_index|    job_onehot|marital_index|marital_onehot|education_index|education_onehot|default_index|default_onehot|housing_index|housing_onehot|loan_index|  loan_onehot|contact_index|contact_onehot|poutcome_index|poutcome_onehot|            features|rawPrediction|probability|prediction|
+---+----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+---------+-------------

In [26]:
dtcAccuracy = evaluator.evaluate(dtcPrediction)
print("Accuracy of LogisticRegression is = %g"% (dtcAccuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - dtcAccuracy))

Accuracy of LogisticRegression is = 1
Test Error of LogisticRegression = 0 


### RandomForestClassifier
 

In [27]:
from pyspark.ml.classification import RandomForestClassifier

rfc = RandomForestClassifier(labelCol="loan_index", featuresCol="features")

rfcModel = dtc.fit(train)
rfcPrediction = dtcModel.transform(test)


In [28]:
rfcPrediction.show()

+---+----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+---------+--------------+-------------+--------------+---------------+----------------+-------------+--------------+-------------+--------------+----------+-------------+-------------+--------------+--------------+---------------+--------------------+-------------+-----------+----------+
|age|       job|marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|job_index|    job_onehot|marital_index|marital_onehot|education_index|education_onehot|default_index|default_onehot|housing_index|housing_onehot|loan_index|  loan_onehot|contact_index|contact_onehot|poutcome_index|poutcome_onehot|            features|rawPrediction|probability|prediction|
+---+----------+-------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+-------+---------+-------------

In [29]:
rfcAccuracy = evaluator.evaluate(rfcPrediction)
print("Accuracy of LogisticRegression is = %g"% (rfcAccuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - rfcAccuracy))

Accuracy of LogisticRegression is = 1
Test Error of LogisticRegression = 0 
