# Spark Preparation
We first check whether we are running in Google Colab. If so, we install all the necessary packages.

To run Spark in Colab, we first install its dependencies in the Colab environment: Apache Spark 3.3.2 with Hadoop 3.3, Java 8, and findspark, which locates the Spark installation on the system. All of these can be installed from inside the Colab notebook.
Learn more from [A Must-Read Guide on How to Work with PySpark on Google Colab for Data Scientists!](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)

In [1]:
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

In [2]:
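if IN_COLAB:
    # Install Java 8 (headless), Spark 3.3.2 prebuilt for Hadoop 3, and findspark.
    # Older Spark releases are removed from dlcdn.apache.org, so download from the archive.
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
    !tar xf spark-3.3.2-bin-hadoop3.tgz
    !mv spark-3.3.2-bin-hadoop3 spark
    !pip install -q findspark
    import os
    # Point findspark and pyspark at the Java and Spark installations
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark"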

# Start a Local Cluster
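Use findspark.init() to locate the Spark installation (via SPARK_HOME) and make pyspark importable. With spark_url set to 'local', Spark then runs on this machine with a single worker thread ('local[*]' would use all available cores). If you plan to use a remote cluster, skip findspark.init() and change spark_url accordingly.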
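The value passed to .master() is a Spark master URL. As a sketch, the hypothetical helper master_url below builds a few common forms: 'local', 'local[N]', 'local[*]', and 'spark://HOST:PORT' for a standalone cluster, where 7077 is the standalone master's default port. Other cluster managers use other URL forms, so check the Spark documentation for your setup.

```python
def master_url(threads=None, host=None, port=7077):
    """Build a Spark master URL (hypothetical helper for illustration).

    - no arguments      -> 'local'     (one worker thread)
    - threads=4         -> 'local[4]'  (four worker threads)
    - threads='*'       -> 'local[*]'  (one thread per available core)
    - host='spark-host' -> 'spark://spark-host:7077' (standalone cluster)
    """
    if host is not None:
        return f"spark://{host}:{port}"
    if threads is None:
        return "local"
    return f"local[{threads}]"

print(master_url(threads="*"))  # local[*]
```

The result could then be assigned to spark_url before building the SparkSession.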

In [3]:
import findspark
findspark.init()

In [4]:
spark_url = 'local'

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark ML')\
        .getOrCreate()

24/10/22 14:52:02 WARN Utils: Your hostname, Natawuts-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.203.212.239 instead (on interface en0)
24/10/22 14:52:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/22 14:52:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Spark SQL Data Preparation

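First, we read the CSV file, passing options for the delimiter (';') and the header row. We then rename the columns to replace the dots ('.') in their names with underscores, since Spark interprets a dot in a column reference as access to a nested field. Because schema inference is not enabled, every column is read as a string, so we next cast the numeric columns and derive an integer 'label' column (1 for 'yes', 0 for 'no') from the target column 'y'.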
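To make the read-and-rename step concrete, here is a plain-Python sketch on a two-line sample. It assumes the file layout is semicolon-delimited with quoted text fields, and it shows only four of the columns; the values come from the first row displayed further below. Note that every parsed value is a string, just as in the Spark DataFrame before casting.

```python
import csv
import io

# A two-line sample in the assumed layout of bank-additional-full.csv
# (semicolon-delimited, quoted header); only a few columns are shown.
sample = '"age";"job";"emp.var.rate";"y"\n56;"housemaid";1.1;"no"\n'

reader = csv.reader(io.StringIO(sample), delimiter=';')
header = next(reader)
# Replace dots so the names can be used in Spark without backticks
columns = [c.replace('.', '_') for c in header]
rows = [dict(zip(columns, row)) for row in reader]

print(columns)  # ['age', 'job', 'emp_var_rate', 'y']
print(rows[0])  # {'age': '56', 'job': 'housemaid', 'emp_var_rate': '1.1', 'y': 'no'}
```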

In [7]:
from pyspark.sql.functions import col

In [8]:
path = 'bank-additional-full.csv'

In [9]:
df = spark.read.option("delimiter", ";").option("header", True).csv(path)
cols = [c.replace('.', '_') for c in df.columns]
df = df.toDF(*cols)

In [10]:
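# The CSV was read without schema inference, so every column is a string
cols = ['age', 'duration', 'campaign', 'pdays', 'previous', 'nr_employed']
# Note: nr_employed has fractional values in the raw file (e.g. 5228.1);
# casting to 'int' truncates them, whereas 'double' would keep them intact
for c in cols:
    df = df.withColumn(c, col(c).cast('int'))

cols = ['emp_var_rate', 'cons_price_idx', 'cons_conf_idx', 'euribor3m']
for c in cols:
    df = df.withColumn(c, col(c).cast('double'))

# Spark casts the strings 'yes'/'no' to true/false, which then become 1/0
df = df.withColumn('label', df.y.cast('boolean').cast('int'))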
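The 'label' column relies on Spark's string-to-boolean cast. As we understand it, with ANSI mode off (the default in Spark 3.3), strings such as 'yes' and 'no' (after trimming and lower-casing) become true and false, and unrecognised strings become null. The sketch below mimics those rules in plain Python to show why 'yes' ends up as 1 and 'no' as 0. It is an approximation for illustration, not Spark's implementation, and cast_to_boolean and to_label are hypothetical names.

```python
# Pure-Python mimic of Spark's (non-ANSI) string -> boolean cast, for illustration.
TRUE_STRINGS = {"t", "true", "y", "yes", "1"}
FALSE_STRINGS = {"f", "false", "n", "no", "0"}

def cast_to_boolean(s):
    """Return True/False for recognised strings, None (Spark's null) otherwise."""
    if s is None:
        return None
    v = s.strip().lower()
    if v in TRUE_STRINGS:
        return True
    if v in FALSE_STRINGS:
        return False
    return None

def to_label(y):
    """boolean -> int, keeping null as None (like .cast('boolean').cast('int'))."""
    b = cast_to_boolean(y)
    return None if b is None else int(b)

print([to_label(v) for v in ["yes", "no", "maybe"]])  # [1, 0, None]
```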

In [11]:
df.show(10)

+---+-----------+-------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+
|age|        job|marital|          education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|
+---+-----------+-------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+
| 56|  housemaid|married|           basic.4y|     no|     no|  no|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|    0|
| 57|   services|married|        high.school|unknown|     no|  no|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|     

# Split data
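We randomly split the data into roughly 80% training and 20% testing data. The proportions are approximate because each row is assigned at random; in this run, 33,114 of the 41,188 rows went to training and 8,074 to testing. Passing a seed, as in df.randomSplit([0.8, 0.2], seed=42), makes the split repeatable.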
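Conceptually, a weighted random split draws a uniform random number for every row and assigns the row to the split whose cumulative-weight interval contains it, so split sizes only approximate the weights. The plain-Python sketch below illustrates this idea; random_split is a hypothetical name, and Spark's own implementation differs in detail (it samples each partition separately).

```python
import random

def random_split(rows, weights, seed=None):
    """Conceptual sketch of a weighted random split.

    Each row draws u in [0, 1) and goes to the first split whose
    cumulative normalized weight exceeds u.
    """
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point round-off
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(41188), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 32950 and 8238
```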

In [12]:
train_df, test_df = df.randomSplit([0.8,0.2])

In [13]:
train_df.count()


33114

In [14]:
test_df.count()

8074

# Spark ML Pipeline
A pipeline is a sequence of stages that transform data for training and inference. A column can contain categorical or numerical data:
- For categorical data, we convert each distinct value to a numeric index with **'StringIndexer'** and then encode the index with **'OneHotEncoder'**.
- Numerical data can be used as is.

Once we transform all features, we vectorize them into a single column.
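Vectorizing means concatenating, in a fixed column order, each row's numeric values and encoded vectors into one feature vector. A minimal pure-Python sketch of the idea follows; assemble is a hypothetical helper that returns a dense list, whereas Spark's VectorAssembler may store the result as a sparse vector.

```python
def assemble(*parts):
    """Concatenate scalars and lists of numbers into one flat feature list
    (a dense, pure-Python stand-in for VectorAssembler's output)."""
    features = []
    for part in parts:
        if isinstance(part, (int, float)):
            features.append(float(part))
        else:
            features.extend(float(v) for v in part)
    return features

# Two numeric columns followed by a one-hot vector of size 3
print(assemble(56, 1.1, [0.0, 1.0, 0.0]))  # [56.0, 1.1, 0.0, 1.0, 0.0]
```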

In [15]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

We first set up the stages of the data transformation pipeline.
- For each selected string column, we
  + use *StringIndexer* (an estimator) to encode the labels in the column as indices, stored in a column named columnName + 'Index'
  + apply one-hot encoding to map each label index to a binary vector, stored in a column named columnName + 'Vec'
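The two encoding steps can be mimicked in plain Python. The sketch below follows the defaults as we understand them: StringIndexer's stringOrderType='frequencyDesc' gives the most frequent label index 0 and breaks ties alphabetically, and OneHotEncoder's dropLast=True produces vectors with one fewer entry than there are categories, so the last category is encoded as all zeros. string_index and one_hot are hypothetical helpers, and the marital values are toy data.

```python
from collections import Counter

def string_index(values):
    """Label -> index, most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Binary vector for an index; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

marital = ["married", "single", "married", "divorced", "married", "single"]
mapping = string_index(marital)
print(mapping)  # {'married': 0, 'single': 1, 'divorced': 2}
print([one_hot(mapping[m], len(mapping)) for m in ["married", "divorced"]])
# [[1.0, 0.0], [0.0, 0.0]]
```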

In [16]:
stages = []

In [17]:
categoricalAttributes = ['job', 'marital', 'education', 'default', 
                         'housing', 'loan', 'contact', 
                         'month', 'day_of_week', 'poutcome']
for columnName in categoricalAttributes:
    stringIndexer = StringIndexer(inputCol=columnName, outputCol=columnName+ "Index")
    stages.append(stringIndexer)
    oneHotEncoder = OneHotEncoder(inputCol=columnName+ "Index", outputCol=columnName + "Vec")
    stages.append(oneHotEncoder)
    
categoricalCols = [s + "Vec" for s in categoricalAttributes]

In [18]:
numericColumns = ['age', 'campaign', 'pdays', 'previous',
                  'emp_var_rate', 'cons_price_idx', 'cons_conf_idx',
                  'euribor3m', 'nr_employed']

In [19]:
# Combine all the feature columns into a single column in the dataframe

allFeatureCols =  numericColumns + categoricalCols
vectorAssembler = VectorAssembler(
    inputCols=allFeatureCols,
    outputCol="features")
stages.append(vectorAssembler)

In [20]:
stages

[StringIndexer_ffda055fc968,
 OneHotEncoder_f01dd5c1f638,
 StringIndexer_af40fc88937a,
 OneHotEncoder_d902a6dcca16,
 StringIndexer_7a687dfc6c75,
 OneHotEncoder_93f526a26441,
 StringIndexer_ab9c789a4a2d,
 OneHotEncoder_709cd86bec4d,
 StringIndexer_7731bc85e958,
 OneHotEncoder_e104a753a6f7,
 StringIndexer_c9ae083d764b,
 OneHotEncoder_3ed117ab8bf2,
 StringIndexer_5e4cc95bedde,
 OneHotEncoder_d229e73e270a,
 StringIndexer_97f5da83f206,
 OneHotEncoder_b3e234e05319,
 StringIndexer_069c072a16fb,
 OneHotEncoder_24f0244f036b,
 StringIndexer_c32e815f20df,
 OneHotEncoder_5cfac4997319,
 VectorAssembler_e769a3c42ebd]

# Feature Extraction Pipeline
We build two pipelines: a feature extraction pipeline and an ML pipeline. Keeping them separate lets us reuse the feature extraction pipeline with several ML algorithms. Calling **'fit'** on a pipeline produces a fitted model (a PipelineModel), and calling **'transform'** on that model transforms the data or makes predictions.
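The fit/transform split matters because estimators such as StringIndexer learn state (here, the label-to-index mapping) from the data passed to fit. Fitting on the training data only and reusing the same fitted model for the test data keeps the two encodings consistent and avoids leaking information from the test set. The plain-Python classes below are hypothetical stand-ins that mirror this pattern; the handle_invalid options loosely follow StringIndexer's handleInvalid parameter ('error', 'skip', 'keep').

```python
from collections import Counter

class IndexerModel:
    """Result of fit(): holds the label -> index mapping learned from training data."""
    def __init__(self, mapping, handle_invalid="error"):
        self.mapping = mapping
        self.handle_invalid = handle_invalid

    def transform(self, values):
        out = []
        for v in values:
            if v in self.mapping:
                out.append(float(self.mapping[v]))
            elif self.handle_invalid == "keep":
                out.append(float(len(self.mapping)))  # extra index for unseen labels
            elif self.handle_invalid == "skip":
                continue  # drop rows with unseen labels
            else:
                raise ValueError(f"Unseen label: {v!r}")
        return out

class Indexer:
    """Estimator: fit() learns state from data and returns a model (a transformer)."""
    def __init__(self, handle_invalid="error"):
        self.handle_invalid = handle_invalid

    def fit(self, values):
        counts = Counter(values)
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        return IndexerModel({label: i for i, label in enumerate(ordered)},
                            self.handle_invalid)

train = ["cellular", "telephone", "cellular"]
test = ["telephone", "cellular"]
model = Indexer().fit(train)   # learn the mapping from the training data only
print(model.transform(test))   # [1.0, 0.0]
```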

In [21]:
# Build pipeline for feature extraction

featurePipeline = Pipeline(stages=stages)
featureOnlyModel = featurePipeline.fit(train_df)

When we 'fit' the pipeline to the training data, we obtain a model.
Because the pipeline contains only feature transformation stages (the StringIndexer and OneHotEncoder estimators and the VectorAssembler transformer), the resulting model performs feature extraction only.

We apply the feature extraction model to both the training and the testing data with 'transform',
creating new DataFrames with a 'features' column that the next pipeline can use.

In [22]:
trainingFeaturesDf = featureOnlyModel.transform(train_df)
testFeaturesDf = featureOnlyModel.transform(test_df)

trainingFeaturesDf and testFeaturesDf are the training and testing DataFrames with the added feature columns.

In [23]:
trainingFeaturesDf.show(1)

24/10/22 14:55:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---+-------+-------+---------+-------+-------+-------+--------+-----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+-----+--------+---------------+------------+-------------+--------------+-------------+------------+-------------+------------+----------+---------+---------+------------+-------------+----------+-------------+----------------+--------------+-------------+-------------+--------------------+
|age|    job|marital|education|default|housing|   loan| contact|month|day_of_week|duration|campaign|pdays|previous|poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|jobIndex|         jobVec|maritalIndex|   maritalVec|educationIndex| educationVec|defaultIndex|   defaultVec|housingIndex|housingVec|loanIndex|  loanVec|contactIndex|   contactVec|monthIndex|     monthVec|day_of_weekIndex|day_of_weekVec|poutcomeIndex|  poutcomeVec|            features|
+---+-------+-------+---------+-

In [24]:
set(trainingFeaturesDf.columns) - set(train_df.columns)

{'contactIndex',
 'contactVec',
 'day_of_weekIndex',
 'day_of_weekVec',
 'defaultIndex',
 'defaultVec',
 'educationIndex',
 'educationVec',
 'features',
 'housingIndex',
 'housingVec',
 'jobIndex',
 'jobVec',
 'loanIndex',
 'loanVec',
 'maritalIndex',
 'maritalVec',
 'monthIndex',
 'monthVec',
 'poutcomeIndex',
 'poutcomeVec'}

In [25]:
# Peek into training features

trainingFeaturesDf.select("features", "label").rdd.take(5)

                                                                                

[Row(features=SparseVector(52, {0: 17.0, 1: 2.0, 2: 999.0, 3: 1.0, 4: -2.9, 5: 92.201, 6: -31.4, 7: 0.869, 8: 5076.0, 19: 1.0, 21: 1.0, 25: 1.0, 30: 1.0, 36: 1.0, 39: 1.0, 51: 1.0}), label=1),
 Row(features=SparseVector(52, {0: 17.0, 1: 3.0, 2: 4.0, 3: 2.0, 4: -2.9, 5: 92.201, 6: -31.4, 7: 0.869, 8: 5076.0, 19: 1.0, 21: 1.0, 25: 1.0, 30: 1.0, 32: 1.0, 34: 1.0, 36: 1.0, 39: 1.0}), label=0),
 Row(features=SparseVector(52, {0: 17.0, 1: 2.0, 2: 999.0, 3: 2.0, 4: -2.9, 5: 92.201, 6: -31.4, 7: 0.869, 8: 5076.0, 19: 1.0, 21: 1.0, 25: 1.0, 30: 1.0, 32: 1.0, 34: 1.0, 36: 1.0, 39: 1.0, 51: 1.0}), label=0),
 Row(features=SparseVector(52, {0: 17.0, 1: 1.0, 2: 2.0, 3: 2.0, 4: -3.4, 5: 92.431, 6: -26.9, 7: 0.742, 8: 5017.0, 19: 1.0, 21: 1.0, 29: 1.0, 30: 1.0, 33: 1.0, 35: 1.0, 36: 1.0, 43: 1.0, 49: 1.0}), label=1),
 Row(features=SparseVector(52, {0: 17.0, 1: 3.0, 2: 4.0, 3: 2.0, 4: -2.9, 5: 92.201, 6: -31.4, 7: 0.884, 8: 5076.0, 19: 1.0, 21: 1.0, 29: 1.0, 30: 1.0, 32: 1.0, 34: 1.0, 36: 1.0, 39: 1.0,
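Each 'features' value above is printed as SparseVector(size, {index: value}): only the non-zero entries are stored, which suits vectors that are mostly zeros after one-hot encoding. In these rows, indices 0 to 8 hold the nine numeric columns in the order of numericColumns, and the remaining indices come from the one-hot vectors. The plain-Python sketch below expands such a vector; sparse_to_dense is a hypothetical helper (pyspark's own SparseVector.toArray() returns a NumPy array instead).

```python
def sparse_to_dense(size, entries):
    """Expand a sparse vector given as (size, {index: value}) into a dense list;
    indices that are not listed are zero."""
    dense = [0.0] * size
    for index, value in entries.items():
        dense[index] = value
    return dense

# A small vector in the same (size, {index: value}) form as the output above
print(sparse_to_dense(6, {0: 17.0, 2: 999.0, 5: 1.0}))
# [17.0, 0.0, 999.0, 0.0, 0.0, 1.0]
```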

# Machine Learning Pipeline

Spark ML supports several standard ML algorithms. In this example, we demonstrate logistic regression and decision tree classifiers.

In [26]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

We define a helper function that calculates accuracy: the fraction of test rows whose prediction matches the label.

In [27]:
def calculateAccuracy(results):
    correct = results.filter(results['label'] == results['prediction']).count()
    total = results.count()
    return 1.0*correct/total
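Accuracy alone can be misleading for this dataset because the classes are imbalanced: far more clients answered 'no' than 'yes'. Comparing against the majority-class baseline (always predicting the most common label) puts an accuracy figure in context. The plain-Python sketch below uses made-up labels purely for illustration, and accuracy and majority_baseline are hypothetical helpers. Within Spark, MulticlassClassificationEvaluator with metricName='accuracy', or BinaryClassificationEvaluator for the area under the ROC curve, are alternatives to a hand-written function.

```python
def accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the label (same idea as calculateAccuracy)."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def majority_baseline(labels):
    """Accuracy of always predicting the most frequent label."""
    counts = {}
    for y in labels:
        counts[y] = counts.get(y, 0) + 1
    return max(counts.values()) / len(labels)

labels = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
predictions = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0]
print(accuracy(labels, predictions))  # 0.8
print(majority_baseline(labels))      # 0.8
```

Here the model's accuracy is no better than always predicting 0, which a single accuracy number would hide.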

## Logistic Regression Model
Configure a machine learning pipeline that consists of a single stage: a classification estimator (logistic regression in this case).

In [28]:
lr = LogisticRegression(maxIter=10, regParam=0.01)
lrPipeline = Pipeline(stages=[lr])

Fit the pipeline to the training data to create a model. By default, the logistic regression estimator reads the 'features' and 'label' columns.
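Logistic regression models the probability of label 1 as the sigmoid of a weighted sum of the features plus an intercept, and predicts 1 when that probability exceeds a threshold (0.5 by default in Spark). With the default elasticNetParam=0.0, regParam adds an L2 penalty on the weights. The sketch below fits such a model with plain batch gradient descent on toy data. Spark uses different optimizers (L-BFGS or OWL-QN) and standardizes features by default, so treat this as an illustration of the model, not of Spark's training procedure; fit_logistic and predict are hypothetical names.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def fit_logistic(X, y, reg_param=0.01, lr=0.5, iters=1000):
    """Batch gradient descent on the mean log loss plus an L2 penalty
    (reg_param / 2) * ||w||^2; the intercept b is not penalized."""
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(iters):
        grad_w, grad_b = [0.0] * d, 0.0
        for xi, yi in zip(X, y):
            # Error for this row: predicted probability minus label
            err = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b) - yi
            for j in range(d):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n
        w = [wj - lr * (gj + reg_param * wj) for wj, gj in zip(w, grad_w)]
        b -= lr * grad_b
    return w, b

def predict(w, b, x, threshold=0.5):
    """Return 1.0 if P(label = 1) exceeds the threshold, else 0.0."""
    p = sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b)
    return 1.0 if p > threshold else 0.0

# Toy data: one centred feature, label 1 for the positive values
X = [[-1.5], [-0.5], [0.5], [1.5]]
y = [0, 0, 1, 1]
w, b = fit_logistic(X, y)
print([predict(w, b, x) for x in X])  # [0.0, 0.0, 1.0, 1.0]
```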

In [29]:
lrPipelineModel = lrPipeline.fit(trainingFeaturesDf)

24/10/22 15:01:46 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/10/22 15:01:46 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

With the trained model, we transform testFeaturesDf to predict the results. The predictions are stored in the 'prediction' column (alongside 'rawPrediction' and 'probability'). We then use our calculateAccuracy function to compute the test accuracy.

In [30]:
results = lrPipelineModel.transform(testFeaturesDf)
print('LogisticRegression Model test accuracy = ', calculateAccuracy(results))

LogisticRegression Model test accuracy =  0.9030220460738172


In [31]:
results.select('label', 'prediction').rdd.take(5)

[Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=0, prediction=0.0),
 Row(label=1, prediction=1.0),
 Row(label=1, prediction=1.0)]

In [32]:
results.show(5)

+---+-------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+--------+---------------+------------+-------------+--------------+-------------+------------+-------------+------------+-------------+---------+-------------+------------+-------------+----------+-------------+----------------+--------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|age|    job|marital|  education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|jobIndex|         jobVec|maritalIndex|   maritalVec|educationIndex| educationVec|defaultIndex|   defaultVec|housingIndex|   housingVec|loanIndex|      loanVec|contactIndex|   contactVec|monthIndex|     monthVec|day_of_weekIndex|day_of_weekVec|poutcomeI

## DecisionTree Model

Once again, we create an ML pipeline containing only an estimator. We fit the pipeline to trainingFeaturesDf to train a model, and then transform testFeaturesDf to predict the results.
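A decision tree is grown by repeatedly choosing the split that best separates the labels. DecisionTreeClassifier uses Gini impurity by default (impurity='gini'); the sketch below shows how candidate thresholds on a single numeric feature are scored. It is a simplification: Spark considers at most maxBins candidate thresholds per continuous feature (32 by default) and grows the tree up to maxDepth (5 by default). gini and best_split are hypothetical helpers, and the ages and labels are toy values.

```python
def gini(labels):
    """Gini impurity: 1 - sum over classes of p_k^2."""
    n = len(labels)
    if n == 0:
        return 0.0
    counts = {}
    for y in labels:
        counts[y] = counts.get(y, 0) + 1
    return 1.0 - sum((c / n) ** 2 for c in counts.values())

def best_split(values, labels):
    """Try each threshold 'value <= t' and return (t, weighted Gini impurity)
    for the threshold whose two child nodes are purest."""
    n = len(labels)
    best = None
    for t in sorted(set(values))[:-1]:
        left = [y for v, y in zip(values, labels) if v <= t]
        right = [y for v, y in zip(values, labels) if v > t]
        score = (len(left) * gini(left) + len(right) * gini(right)) / n
        if best is None or score < best[1]:
            best = (t, score)
    return best

ages = [25, 30, 35, 60, 65, 70]
subscribed = [0, 0, 0, 1, 1, 1]
print(best_split(ages, subscribed))  # (35, 0.0)
```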

In [33]:
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')
dtPipeline = Pipeline(stages=[dt])

In [34]:
dtPipelineModel = dtPipeline.fit(trainingFeaturesDf)

In [35]:
results = dtPipelineModel.transform(testFeaturesDf)
print('DecisionTree Model test accuracy = ', calculateAccuracy(results))

DecisionTree Model test accuracy =  0.9035174634629676


In [36]:
testFeaturesDf.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'day_of_week',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'emp_var_rate',
 'cons_price_idx',
 'cons_conf_idx',
 'euribor3m',
 'nr_employed',
 'y',
 'label',
 'jobIndex',
 'jobVec',
 'maritalIndex',
 'maritalVec',
 'educationIndex',
 'educationVec',
 'defaultIndex',
 'defaultVec',
 'housingIndex',
 'housingVec',
 'loanIndex',
 'loanVec',
 'contactIndex',
 'contactVec',
 'monthIndex',
 'monthVec',
 'day_of_weekIndex',
 'day_of_weekVec',
 'poutcomeIndex',
 'poutcomeVec',
 'features']

In [37]:
results.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'day_of_week',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'emp_var_rate',
 'cons_price_idx',
 'cons_conf_idx',
 'euribor3m',
 'nr_employed',
 'y',
 'label',
 'jobIndex',
 'jobVec',
 'maritalIndex',
 'maritalVec',
 'educationIndex',
 'educationVec',
 'defaultIndex',
 'defaultVec',
 'housingIndex',
 'housingVec',
 'loanIndex',
 'loanVec',
 'contactIndex',
 'contactVec',
 'monthIndex',
 'monthVec',
 'day_of_weekIndex',
 'day_of_weekVec',
 'poutcomeIndex',
 'poutcomeVec',
 'features',
 'rawPrediction',
 'probability',
 'prediction']