# Introduction

This notebook is primarly a resource for me to refer back to in regards to using Spark aswell as an introduction to spark in general. I will be explaining terms so that theoretically anyone should be able to understand specific details about Spark. This will be heavily based on this notebook: https://www.kaggle.com/code/kkhandekar/apache-spark-beginner-tutorial 

SparkSession is the main interface allowing the user to interact with Spark. The entry point to programming Spark with the Dataset and DataFrame API.

In [1]:
import numpy as np
import pandas as pd
import sklearn
import pyspark
from pyspark.sql import SparkSession

# Classifier Libraries from Spark
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, NaiveBayes

# Evaluation Libraries
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

# Features Libraries
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler, VectorIndexer, OneHotEncoder

# Pipeline Library
from pyspark.ml import Pipeline

# DenseVector
# Type of vector in MLib which stores all its values as a continous array (Basically a list or vector in C++)
# For comparison, sparse vectors stores only non-zero values and their indices to save memory
# A sparse vector holding [4.0,0.0,5.0,0.0,7.0] will represent it like so: Vectors.sparse(5, [0, 2, 4], [4.0, 5.0, 7.0]) 
# Indices 0, 2, 4 are non zero and there are 5 total elements

from pyspark.ml.linalg import DenseVector

from sklearn.model_selection import train_test_split

from tabulate import tabulate

import gc

import os
print(os.environ.get("PYSPARK_PYTHON"))
print(os.environ.get("PYSPARK_DRIVER_PYTHON"))



C:\Users\ldomi\anaconda3\envs\xgboostENV\python.exe
C:\Users\ldomi\anaconda3\envs\xgboostENV\python.exe


In [2]:
# Each config is a key value pair where the key is what is being accessed and the value is the modification amount
spark = (SparkSession.builder
        .appName('First Spark Project') # Set a name for the applcation which is shown in the web UI
        .config("spark.executor.memory", "1G") # 1 GB of memory is allocated for this executor
        .config("spark.executor.cores", "4") # # 4 CPU cores are allocated for this executor
        .getOrCreate() # Either gets an existing SparkSesson or creates one if it doesn't exist
       )
spark.sparkContext.setLogLevel('INFO')
spark.version

'3.5.4'

In [3]:
file = './Datasets/iris.csv'

# Reading the file in csv format and also confirming it has headers
data = spark.read.format("csv").option("header", "true").load(file)

# Faster reloading
data.cache()

DataFrame[Id: string, SepalLengthCm: string, SepalWidthCm: string, PetalLengthCm: string, PetalWidthCm: string, Species: string]

In [4]:
#Total records
data.count()

150

In [5]:
# Data features and target
data.printSchema()

root
 |-- Id: string (nullable = true)
 |-- SepalLengthCm: string (nullable = true)
 |-- SepalWidthCm: string (nullable = true)
 |-- PetalLengthCm: string (nullable = true)
 |-- PetalWidthCm: string (nullable = true)
 |-- Species: string (nullable = true)



In [6]:
# Show the first 5 records
data.show(5)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



In [7]:
# Showing how many of each target we have
data.groupBy('species').count().show()

+---------------+-----+
|        species|count|
+---------------+-----+
| Iris-virginica|   50|
|    Iris-setosa|   50|
|Iris-versicolor|   50|
+---------------+-----+



In [8]:
# Summary Statistics
data.describe().show()

+-------+------------------+------------------+-------------------+------------------+------------------+--------------+
|summary|                Id|     SepalLengthCm|       SepalWidthCm|     PetalLengthCm|      PetalWidthCm|       Species|
+-------+------------------+------------------+-------------------+------------------+------------------+--------------+
|  count|               150|               150|                150|               150|               150|           150|
|   mean|              75.5| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|          NULL|
| stddev|43.445367992456916|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|          NULL|
|    min|                 1|               4.3|                2.0|               1.0|               0.1|   Iris-setosa|
|    max|                99|               7.9|                4.4|               6.9|               2.5|Iris-virginica|
+-------+------------------+----

In [9]:
# Converting the targets into numeric format
# StringIndexer is a Spark MLib transformer used to convert categorical string values into numerical values
# Setosa is 0.0, next is 1.0, next is 2.0
# The values are actually assigned in descending order of frequency but in this case they all have the same frequency
SIndexer = StringIndexer(inputCol='Species', outputCol='species_indx')
data = SIndexer.fit(data).transform(data)


data.show(5)

+---+-------------+------------+-------------+------------+-----------+------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|species_indx|
+---+-------------+------------+-------------+------------+-----------+------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|         0.0|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|         0.0|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|         0.0|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|         0.0|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|         0.0|
+---+-------------+------------+-------------+------------+-----------+------------+
only showing top 5 rows



In [10]:
# Creating a separate dataframe to drop the Species Label and reorder
df = data.select("species_indx","SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "petalWidthCm")

#Inspect the dataframe
df.show(5)



+------------+-------------+------------+-------------+------------+
|species_indx|SepalLengthCm|SepalWidthCm|PetalLengthCm|petalWidthCm|
+------------+-------------+------------+-------------+------------+
|         0.0|          5.1|         3.5|          1.4|         0.2|
|         0.0|          4.9|         3.0|          1.4|         0.2|
|         0.0|          4.7|         3.2|          1.3|         0.2|
|         0.0|          4.6|         3.1|          1.5|         0.2|
|         0.0|          5.0|         3.6|          1.4|         0.2|
+------------+-------------+------------+-------------+------------+
only showing top 5 rows



In [11]:
# We are converting the datafram to an rdd using df.rdd
# RDD stands for Resilient Distributed Dataset and it is an immutable distributed collection of elements
# Basically RDDs are split into partitions allowing parallel ocmputation

# .map applies the lambda function to each row of the  RDD
# Spark MLlib expects data to be in the format (label, features) where the feature is a feature vector as a DenseVector type
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
# Creating a new data fram to hold the inputs with headers
df_indx = spark.createDataFrame(input_data, ["label", "features"])
df_indx.show(5)

+-----+-----------------+
|label|         features|
+-----+-----------------+
|  0.0|[5.1,3.5,1.4,0.2]|
|  0.0|[4.9,3.0,1.4,0.2]|
|  0.0|[4.7,3.2,1.3,0.2]|
|  0.0|[4.6,3.1,1.5,0.2]|
|  0.0|[5.0,3.6,1.4,0.2]|
+-----+-----------------+
only showing top 5 rows



In [12]:
# Creating the standard scaler, specifying to scale the features DenseVector
stdScaler = StandardScaler(inputCol = "features", outputCol = "features_scaled")

# Fitting the standard scaler onto the dataframe
scaler = stdScaler.fit(df_indx)

# Transforming and outputting the scaled dataframe
df_scaled = scaler.transform(df_indx)


df_scaled.show(5)

+-----+-----------------+--------------------+
|label|         features|     features_scaled|
+-----+-----------------+--------------------+
|  0.0|[5.1,3.5,1.4,0.2]|[6.15892840883878...|
|  0.0|[4.9,3.0,1.4,0.2]|[5.9174018045706,...|
|  0.0|[4.7,3.2,1.3,0.2]|[5.67587520030241...|
|  0.0|[4.6,3.1,1.5,0.2]|[5.55511189816831...|
|  0.0|[5.0,3.6,1.4,0.2]|[6.03816510670469...|
+-----+-----------------+--------------------+
only showing top 5 rows



In [13]:
# As we can see this adds a new columns therefore we must also drop it
df_scaled = df_scaled.drop("features")
df_scaled.show(5)

+-----+--------------------+
|label|     features_scaled|
+-----+--------------------+
|  0.0|[6.15892840883878...|
|  0.0|[5.9174018045706,...|
|  0.0|[5.67587520030241...|
|  0.0|[5.55511189816831...|
|  0.0|[6.03816510670469...|
+-----+--------------------+
only showing top 5 rows



In [14]:
training_Set, testing_Set = df_scaled.randomSplit([0.9,0.1], seed = 42)
training_Set.show(5)

+-----+--------------------+
|label|     features_scaled|
+-----+--------------------+
|  0.0|[5.19282199176603...|
|  0.0|[5.31358529390013...|
|  0.0|[5.31358529390013...|
|  0.0|[5.31358529390013...|
|  0.0|[5.43434859603422...|
+-----+--------------------+
only showing top 5 rows



In [15]:
model = ['Decision Tree','Random Forest','Naive Bayes']
model_results = []

In [16]:
# Decision Tree

dtc = DecisionTreeClassifier(labelCol = "label", featuresCol= "features_scaled")
dtc_model = dtc.fit(training_Set)

dtc_pred = dtc_model.transform(testing_Set)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dtc_acc = evaluator.evaluate(dtc_pred)
model_results.extend([[model[0],'{:.2%}'.format(dtc_acc)]])   

In [17]:
# -- Random Forest Classifier --

rfc = RandomForestClassifier(labelCol="label", featuresCol="features_scaled", numTrees=10)          #instantiate the model
rfc_model = rfc.fit(training_Set)                                                                     #train the model
rfc_pred = rfc_model.transform(testing_Set)                                                           #model predictions

#Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
rfc_acc = evaluator.evaluate(rfc_pred)
#print("Random Forest Classifier Accuracy =", '{:.2%}'.format(rfc_acc))
model_results.extend([[model[1],'{:.2%}'.format(rfc_acc)]])                                            #appending to list

In [18]:
# -- Naive Bayes Classifier --

nbc = NaiveBayes(smoothing=1.0,modelType="multinomial", labelCol="label",featuresCol="features_scaled")    #instantiate the model
nbc_model = nbc.fit(training_Set)                                                                          #train the model
nbc_pred = nbc_model.transform(testing_Set)                                                                #model predictions

#Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbc_acc = evaluator.evaluate(nbc_pred)
#print("Naive Bayes Accuracy =", '{:.2%}'.format(nbc_acc))
model_results.extend([[model[2],'{:.2%}'.format(nbc_acc)]])                                            #appending to list

In [19]:
gc.collect()

499

In [20]:
print (tabulate(model_results, headers=["Classifier Models", "Accuracy"]))

Classifier Models    Accuracy
-------------------  ----------
Decision Tree        77.78%
Random Forest        77.78%
Naive Bayes          88.89%
