# ***PySpark***


First, let’s define what Apache Spark is.
Apache Spark is an open-source distributed processing system used for handling and processing big data.
It provides APIs in Scala, Java, R, and Python; PySpark is the Python API for Spark.
We use this approach to compare the results of the standard models from sklearn with those of the same models executed using PySpark.
The steps we followed are:

**•	Install PySpark in the Colab environment.**

**•	Create a Spark session.**

> The Spark session is the entry point of a Spark application.

**•	Load the dataset**

> Load the CSV data, specifying options such as the delimiter and whether the file has a header.

**•	Perform some exploratory data analysis**

> Make sure that the data is loaded properly and check the number of null values in each column.

**•	Apply the same data preprocessing steps as for the sklearn models**


> Drop the unwanted columns: day and month.

> Encode the categorical columns. In sklearn we used LabelEncoder, while in Spark we used StringIndexer.

> Cast the numeric columns to double instead of string.

**•	Create the feature column**

> This is an additional step: the feature column is a single vector that concatenates all the feature columns, which is the input format Spark ML models expect.

**•	Split the data into training and testing**

> Instead of the train_test_split function from sklearn, we used the DataFrame randomSplit method.

**•	Build the models’ architecture**

> We applied three models:

1.   Random Forest

2.   Support Vector Machine

3.   Logistic Regression


In [36]:
!pip install pyspark




In [37]:
# Create spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("SparkByExamples.com").getOrCreate()

In [38]:
import pyspark.sql.functions as F
import matplotlib.pyplot as plt

In [39]:
# Data Loading and showing first 5 records 
file = '/content/bank-data.csv'
df = spark.read.option("delimiter", ";").option("header",True).csv(file)
df.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown| single|  unknown|     no|      1|     no|  no|unknown|  5|  may|     19

In [41]:
# print the data schema (data types in each column)
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- campaign: string (nullable = true)
 |-- pdays: string (nullable = true)
 |-- previous: string (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)
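Every column is reported as a string because the reader was not given an inferSchema option (adding .option("inferSchema", True) would ask Spark to detect numeric types). Python's standard csv module behaves the same way, which makes the effect easy to see. The minimal sketch below parses a simplified, re-typed sample of the first rows shown above. The sample is an assumption: it uses only a subset of the columns, and the real file may quote its values.

```python
import csv
import io

# Simplified in-memory sample mirroring the first rows of the semicolon-delimited bank data
sample = (
    "age;job;marital;balance;y\n"
    "58;management;married;2143;no\n"
    "44;technician;single;29;no\n"
)

# delimiter=";" plays the role of Spark's .option("delimiter", ";"),
# and DictReader treats the first line as the header, like .option("header", True)
reader = csv.DictReader(io.StringIO(sample), delimiter=";")
rows = list(reader)

# Every parsed value is a str, just as printSchema() reports string for every column
column_types = {name: type(value).__name__ for name, value in rows[0].items()}
print(column_types)  # {'age': 'str', 'job': 'str', 'marital': 'str', 'balance': 'str', 'y': 'str'}
```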



In [42]:
# Dataframe shape

print((df.count(), len(df.columns)))

(45211, 17)


In [43]:
# print data summary (like describe method in pandas)

df.summary().show()

+-------+------------------+-------+--------+---------+-------+------------------+-------+-----+--------+-----------------+-----+------------------+-----------------+------------------+------------------+--------+-----+
|summary|               age|    job| marital|education|default|           balance|housing| loan| contact|              day|month|          duration|         campaign|             pdays|          previous|poutcome|    y|
+-------+------------------+-------+--------+---------+-------+------------------+-------+-----+--------+-----------------+-----+------------------+-----------------+------------------+------------------+--------+-----+
|  count|             45211|  45211|   45211|    45211|  45211|             45211|  45211|45211|   45211|            45211|45211|             45211|            45211|             45211|             45211|   45211|45211|
|   mean| 40.93621021432837|   null|    null|     null|   null|1362.2720576850766|   null| null|    null|15.806418791886

In [44]:
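# Count the number of missing (null) values in each column
# All columns are strings at this point, so isNull() is the relevant check (isnan() only detects numeric NaN)

from pyspark.sql.functions import when, count, col
df.select([count(when(col(column).isNull(), column)).alias(column) for column in df.columns]).show()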

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|  0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+



In [45]:
# For loop to print the unique values in each column

for column in df.columns:
  print(column)
  print(df.select(F.collect_set(column).alias(column)).first()[column])
  print('*' * 50)

age
['87', '66', '94', '76', '81', '82', '58', '56', '38', '92', '51', '78', '90', '41', '21', '52', '49', '45', '62', '72', '37', '42', '22', '33', '25', '74', '73', '84', '75', '77', '79', '69', '43', '60', '18', '55', '29', '88', '30', '47', '32', '85', '71', '86', '83', '44', '46', '31', '68', '80', '48', '61', '19', '23', '26', '39', '95', '65', '67', '93', '24', '28', '59', '35', '57', '70', '36', '20', '89', '40', '27', '63', '53', '54', '34', '64', '50']
**************************************************
job
['housemaid', 'retired', 'services', 'management', 'unknown', 'blue-collar', 'admin.', 'student', 'entrepreneur', 'unemployed', 'self-employed', 'technician']
**************************************************
marital
['married', 'divorced', 'single']
**************************************************
education
['unknown', 'tertiary', 'secondary', 'primary']
**************************************************
default
['no', 'yes']
********************************************
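A null count of zero does not mean that no information is missing: as the unique-value listing above shows, this dataset encodes missing categories as the string 'unknown'. The sketch below counts such placeholder values per column in plain Python, using a few rows re-typed from the df.show(5) output (a subset of the columns). In PySpark, an analogous aggregation would be count(when(col(c) == "unknown", c)) per column, mirroring the null check.

```python
# Rows re-typed from the df.show(5) output above (subset of columns)
rows = [
    {"job": "management", "education": "tertiary", "contact": "unknown", "poutcome": "unknown"},
    {"job": "technician", "education": "secondary", "contact": "unknown", "poutcome": "unknown"},
    {"job": "entrepreneur", "education": "secondary", "contact": "unknown", "poutcome": "unknown"},
    {"job": "blue-collar", "education": "unknown", "contact": "unknown", "poutcome": "unknown"},
    {"job": "unknown", "education": "unknown", "contact": "unknown", "poutcome": "unknown"},
]

def count_placeholder(rows, placeholder="unknown"):
    """Count how often `placeholder` appears in each column (true nulls would be None instead)."""
    counts = {column: 0 for column in rows[0]}
    for row in rows:
        for column, value in row.items():
            if value == placeholder:
                counts[column] += 1
    return counts

print(count_placeholder(rows))  # {'job': 1, 'education': 2, 'contact': 5, 'poutcome': 5}
```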

In [46]:
# Drop unwanted columns (day, month)

print(len(df.columns))
to_drop = ['day','month']
df = df.drop(*to_drop)
len(df.columns)

17


15

In [47]:
df.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|      92|       1|   -1|       0| unknown| no|
| 33|     unknown| single|  unknown|     no|      1|     no|  no|unknown|     198|       1|   -1|       0| unknown| no|
+---+------------+-------+---------+----

In [48]:
# Select the categorical columns that need to be encoded

cat_columns = ['job','marital','education','default','housing','loan','contact','poutcome','y']
cat_df = df.select([column for column in df.columns if column in cat_columns])

In [49]:
# Apply the StringIndexer to convert from textual format to numeric format (numerics are stored in new columns)
# The loop variable is named `column` so it does not shadow pyspark.sql.functions.col imported earlier

from pyspark.ml.feature import StringIndexer
for column in cat_columns:
  indexer_object = StringIndexer(inputCol=column, outputCol="Encoded_" + column)
  # Fit the label-to-index mapping on df, then add the encoded column
  df = indexer_object.fit(df).transform(df)
df.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+
|age|         job|marital|education|default|balance|housing|loan|contact|duration|campaign|pdays|previous|poutcome|  y|Encoded_job|Encoded_marital|Encoded_education|Encoded_default|Encoded_housing|Encoded_loan|Encoded_contact|Encoded_poutcome|Encoded_y|
+---+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|     261|       1|   -1|       0| unknown| no|        1.0|            0.0|              1.0|            0.0|            0.0|         0.0|            1.0|            

In [50]:
# Print the value counts for target column before and after encoding

df.groupBy('y').count().show()

df.groupBy('Encoded_y').count().show()

+---+-----+
|  y|count|
+---+-----+
| no|39922|
|yes| 5289|
+---+-----+

+---------+-----+
|Encoded_y|count|
+---------+-----+
|      0.0|39922|
|      1.0| 5289|
+---------+-----+
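StringIndexer assigns indices by label frequency by default (stringOrderType='frequencyDesc'), so the most frequent label gets 0.0. That is why 'no' (39922 rows) became 0.0 and 'yes' (5289 rows) became 1.0. The plain-Python sketch below reproduces that ordering rule; the alphabetical tie-breaking for equal counts is our assumption. Note that these indices are ordinal codes, so linear models such as LinearSVC and LogisticRegression treat them as ordered numbers, the same caveat as with sklearn's LabelEncoder.

```python
from collections import Counter

def frequency_desc_index(labels):
    """Map each label to a float index, most frequent first (ties assumed to break alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Target column rebuilt from the value counts shown above
y = ["no"] * 39922 + ["yes"] * 5289
mapping = frequency_desc_index(y)
print(mapping)  # {'no': 0.0, 'yes': 1.0}
```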



In [51]:
# Cast the numeric columns from string to double

from pyspark.sql.functions import col

numeric_cols = ['age', 'balance', 'duration', 'pdays', 'campaign', 'previous']
Corrected_Typedf_final = df
for c in numeric_cols:
  # withColumn replaces the existing column in place, so the column order is unchanged
  Corrected_Typedf_final = Corrected_Typedf_final.withColumn(c, col(c).cast("double"))

Corrected_Typedf_final.show(5)

+----+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+
| age|         job|marital|education|default|balance|housing|loan|contact|duration|campaign|pdays|previous|poutcome|  y|Encoded_job|Encoded_marital|Encoded_education|Encoded_default|Encoded_housing|Encoded_loan|Encoded_contact|Encoded_poutcome|Encoded_y|
+----+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+
|58.0|  management|married| tertiary|     no| 2143.0|    yes|  no|unknown|   261.0|     1.0| -1.0|     0.0| unknown| no|        1.0|            0.0|              1.0|            0.0|            0.0|         0.0|            1.0|        

In [53]:
Corrected_Typedf_final.printSchema()

root
 |-- age: double (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: double (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- campaign: double (nullable = true)
 |-- pdays: double (nullable = true)
 |-- previous: double (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)
 |-- Encoded_job: double (nullable = false)
 |-- Encoded_marital: double (nullable = false)
 |-- Encoded_education: double (nullable = false)
 |-- Encoded_default: double (nullable = false)
 |-- Encoded_housing: double (nullable = false)
 |-- Encoded_loan: double (nullable = false)
 |-- Encoded_contact: double (nullable = false)
 |-- Encoded_poutcome: double (nullable = false)
 |-- Encoded_y: double (nullable = false)



In [55]:
# Select only encoded and numeric values to be used in model training

final_cols = ['Encoded_job','Encoded_marital','Encoded_education','Encoded_default','Encoded_housing','Encoded_loan','Encoded_contact','Encoded_poutcome',
              'age','balance','duration','campaign','pdays','previous','Encoded_y']
len(final_cols)

15

In [57]:
final_df = Corrected_Typedf_final.select([c for c in Corrected_Typedf_final.columns if c in final_cols])
final_df.show(5)

+----+-------+--------+--------+-----+--------+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+
| age|balance|duration|campaign|pdays|previous|Encoded_job|Encoded_marital|Encoded_education|Encoded_default|Encoded_housing|Encoded_loan|Encoded_contact|Encoded_poutcome|Encoded_y|
+----+-------+--------+--------+-----+--------+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+
|58.0| 2143.0|   261.0|     1.0| -1.0|     0.0|        1.0|            0.0|              1.0|            0.0|            0.0|         0.0|            1.0|             0.0|      0.0|
|44.0|   29.0|   151.0|     1.0| -1.0|     0.0|        2.0|            1.0|              0.0|            0.0|            0.0|         0.0|            1.0|             0.0|      0.0|
|33.0|    2.0|    76.0|     1.0| -1.0|     0.0|        7.0|            0.0|              0

In [60]:
# Use VectorAssembler to combine all feature columns into a single vector column named "features"

from pyspark.ml.feature import VectorAssembler
feature_cols = ['age','balance','duration','campaign','pdays','previous','Encoded_job','Encoded_marital','Encoded_education','Encoded_default','Encoded_housing',
                'Encoded_loan','Encoded_contact','Encoded_poutcome']

Assembler_Object = VectorAssembler(inputCols=feature_cols, outputCol="features")
final = Assembler_Object.transform(final_df)
final.show(5)

+----+-------+--------+--------+-----+--------+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+--------------------+
| age|balance|duration|campaign|pdays|previous|Encoded_job|Encoded_marital|Encoded_education|Encoded_default|Encoded_housing|Encoded_loan|Encoded_contact|Encoded_poutcome|Encoded_y|            features|
+----+-------+--------+--------+-----+--------+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+--------------------+
|58.0| 2143.0|   261.0|     1.0| -1.0|     0.0|        1.0|            0.0|              1.0|            0.0|            0.0|         0.0|            1.0|             0.0|      0.0|(14,[0,1,2,3,4,6,...|
|44.0|   29.0|   151.0|     1.0| -1.0|     0.0|        2.0|            1.0|              0.0|            0.0|            0.0|         0.0|            1.0|             0.0|      0.0|(14,[0,
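The features column prints as (14,[0,1,2,3,4,6,...: this is Spark's sparse vector notation (size, indices of the non-zero entries, values). As far as we understand, VectorAssembler compresses each assembled row and keeps the sparse form when 1.5 * (nnz + 1) < size, where nnz is the number of non-zeros. Treat that threshold as an assumption. The plain-Python sketch below reproduces the conversion for the first row of final_df, with its values in the order of the assembler's input columns.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values

def prefers_sparse(dense):
    """Assumed compression rule: use the sparse form if 1.5 * (nnz + 1) < size."""
    nnz = sum(1 for v in dense if v != 0.0)
    return 1.5 * (nnz + 1) < len(dense)

# First row of final_df: age, balance, duration, campaign, pdays, previous, then the 8 encoded columns
row = [58.0, 2143.0, 261.0, 1.0, -1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0]
print(to_sparse(row))       # (14, [0, 1, 2, 3, 4, 6, 8, 12], [58.0, 2143.0, 261.0, 1.0, -1.0, 1.0, 1.0, 1.0])
print(prefers_sparse(row))  # True
```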

In [61]:
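# Split the data into training and testing sets
# Note: randomSplit normalizes the weights, so [0.8, 0.1] gives roughly an 89/11 split;
# use [0.8, 0.2] for an 80/20 split

train, test = final.randomSplit([0.8, 0.1], seed = 42)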
print("Number of training samples : {}".format(train.count()))
print("Number of testing samples : {}".format(test.count()))


Number of training samples : 40142
Number of testing samples : 5069
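randomSplit normalizes its weights so that they sum to 1, which means [0.8, 0.1] requests about an 89/11 split rather than 80/20. This matches the observed 40142/5069 counts. Rows are assigned by random sampling, so the actual sizes are only approximately proportional. The small arithmetic check below uses the 45211-row total from the notebook.

```python
def normalized_split(weights, total):
    """Return the normalized weights and the expected number of rows per split."""
    s = sum(weights)
    fractions = [w / s for w in weights]
    return fractions, [round(total * f) for f in fractions]

fractions, expected = normalized_split([0.8, 0.1], 45211)
print([round(f, 3) for f in fractions])  # [0.889, 0.111]
print(expected)                          # [40188, 5023]

# Observed training fraction from the counts printed above
observed_train_fraction = 40142 / 45211
print(round(observed_train_fraction, 3))  # 0.888
```

Passing [0.8, 0.2] instead would give fractions of exactly 0.8 and 0.2.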


# Modeling
## Random Forest

In [62]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Encoded_y')
rfModel = rf.fit(train)
rf_predictions = rfModel.transform(test)

In [63]:
rf_predictions.show(5)

+----+-------+--------+--------+-----+--------+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+--------------------+--------------------+--------------------+----------+
| age|balance|duration|campaign|pdays|previous|Encoded_job|Encoded_marital|Encoded_education|Encoded_default|Encoded_housing|Encoded_loan|Encoded_contact|Encoded_poutcome|Encoded_y|            features|       rawPrediction|         probability|prediction|
+----+-------+--------+--------+-----+--------+-----------+---------------+-----------------+---------------+---------------+------------+---------------+----------------+---------+--------------------+--------------------+--------------------+----------+
|18.0|  156.0|   298.0|     2.0| 82.0|     4.0|       10.0|            1.0|              0.0|            0.0|            1.0|         0.0|            0.0|             2.0|      0.0|[18.0,156.0,298.0...|[13.3879348876142...|[0.669396
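In the predictions, rawPrediction holds per-class scores summed over the trees, and probability is that vector normalized to sum to 1. As we understand Spark's random forest classifier, each tree contributes its leaf's class distribution (class counts divided by their total) rather than a hard vote. With the default numTrees=20, 13.3879... / 20 ≈ 0.6694, which is consistent with the first row above. The sketch below uses three hypothetical trees and made-up leaf counts purely for illustration.

```python
def forest_predict(leaf_class_counts):
    """Combine per-tree leaf class counts as a Spark RF classifier is assumed to:
    sum the normalized class distributions into rawPrediction, then normalize to probability."""
    num_classes = len(leaf_class_counts[0])
    raw = [0.0] * num_classes
    for counts in leaf_class_counts:
        total = sum(counts)
        if total:
            for k in range(num_classes):
                raw[k] += counts[k] / total
    s = sum(raw)
    probability = [r / s for r in raw]
    # The prediction is the index of the most probable class, as a float like Spark's output
    prediction = float(max(range(num_classes), key=lambda k: probability[k]))
    return raw, probability, prediction

# Hypothetical leaf class counts [class 0, class 1] reached by one row in three trees
raw, probability, prediction = forest_predict([[30, 10], [5, 5], [8, 2]])
print(raw, probability, prediction)
```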

In [64]:
rf_predictions.select("Encoded_y", "prediction").show(10)

+---------+----------+
|Encoded_y|prediction|
+---------+----------+
|      0.0|       0.0|
|      1.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
+---------+----------+
only showing top 10 rows



In [65]:
# Random Forest Evaluation
# BinaryClassificationEvaluator reports the area under the ROC curve (areaUnderROC) by default, not accuracy

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="Encoded_y", metricName="areaUnderROC")
rf_auc = evaluator.evaluate(rf_predictions)
print("Random Forest AUC equals: {}".format(rf_auc))


Random Forest AUC equals: 0.874477555165082
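The value printed by BinaryClassificationEvaluator is its default metric, areaUnderROC. This metric measures how well the scores rank positive rows above negative ones; it is not the fraction of correct predictions. The two can differ sharply. With this imbalanced target (39922 'no' vs 5289 'yes'), a model that always predicts 0.0 already reaches about 88.3% accuracy. The plain-Python sketch below computes both metrics on a tiny hypothetical example. In Spark, accuracy itself is available from MulticlassClassificationEvaluator with metricName="accuracy".

```python
def accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the label."""
    return sum(l == p for l, p in zip(labels, predictions)) / len(labels)

def roc_auc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative (ties count 0.5)."""
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical scores: the ranking is fairly good, but no score crosses the 0.5 threshold
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.45]
predictions = [1 if s > 0.5 else 0 for s in scores]
print(accuracy(labels, predictions))  # 0.5
print(roc_auc(labels, scores))        # 0.75

# Majority-class baseline on this dataset: always predict 0.0 ("no")
baseline = 39922 / (39922 + 5289)
print(round(baseline, 3))  # 0.883
```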


## Support Vector Classifier

In [67]:
from pyspark.ml.classification import LinearSVC

svc = LinearSVC(featuresCol = 'features', labelCol = 'Encoded_y')
svcModel = svc.fit(train)
svcModel_predictions = svcModel.transform(test)
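As we understand Spark's LinearSVC, the model learns a weight vector w and an intercept b. For each row it computes the margin w·x + b, stores rawPrediction as [-margin, margin], and predicts 1.0 when the margin exceeds threshold (default 0.0). It produces no probability column, but BinaryClassificationEvaluator reads rawPrediction by default, so the evaluation still works. The weights and row below are hypothetical and only illustrate the decision rule; they are not the trained svcModel.

```python
def svc_predict(weights, intercept, features, threshold=0.0):
    """Linear SVM decision rule: margin = w.x + b, predict 1.0 if margin > threshold."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    raw_prediction = [-margin, margin]
    prediction = 1.0 if margin > threshold else 0.0
    return raw_prediction, prediction

# Hypothetical 3-feature model and row
raw, pred = svc_predict([0.5, -1.0, 2.0], -0.5, [1.0, 0.5, 1.0])
print(raw, pred)  # [-1.5, 1.5] 1.0
```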

In [68]:
svcModel_predictions.select("Encoded_y", "prediction").show(10)

+---------+----------+
|Encoded_y|prediction|
+---------+----------+
|      0.0|       0.0|
|      1.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
+---------+----------+
only showing top 10 rows



In [72]:
# Support Vector Classifier Evaluation (areaUnderROC, the evaluator's default metric)

svc_auc = evaluator.evaluate(svcModel_predictions)
print("Support Vector Classifier AUC equals: {}".format(svc_auc))


Support Vector Classifier AUC equals: 0.8720837789831566


## Logistic Regression

In [73]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'features', labelCol = 'Encoded_y')
lrModel = lr.fit(train)
lrModel_predictions = lrModel.transform(test)
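Binary LogisticRegression computes the same kind of linear margin m, then maps it through the sigmoid: probability = [1 - σ(m), σ(m)] with σ(m) = 1 / (1 + e^(-m)). It predicts 1.0 when σ(m) exceeds threshold (default 0.5). The sketch below applies that rule in plain Python. The coefficients and row are hypothetical, not those of the trained lrModel.

```python
import math

def lr_predict(coefficients, intercept, features, threshold=0.5):
    """Binary logistic regression: margin -> sigmoid -> thresholded prediction."""
    margin = sum(c * x for c, x in zip(coefficients, features)) + intercept
    p1 = 1.0 / (1.0 + math.exp(-margin))
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return [-margin, margin], probability, prediction

# Hypothetical 2-feature model: margin = 0.5*2 - 0.25*2 + 0 = 0.5, so sigma(0.5) is about 0.62
raw, probability, prediction = lr_predict([0.5, -0.25], 0.0, [2.0, 2.0])
print(raw, probability, prediction)
```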

In [74]:
lrModel_predictions.select("Encoded_y", "prediction").show(10)

+---------+----------+
|Encoded_y|prediction|
+---------+----------+
|      0.0|       1.0|
|      1.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
+---------+----------+
only showing top 10 rows



In [75]:
# Logistic Regression Evaluation (areaUnderROC, the evaluator's default metric)

lr_auc = evaluator.evaluate(lrModel_predictions)
print("Logistic Regression AUC equals: {}".format(lr_auc))


Logistic Regression AUC equals: 0.8810703872060314
