# A quick summary

* Import Libraries
* Build Spark Session
* Data Load
* Data Exploration & Preparation
* Feature Engineering
* Data Scaling
* Data Split
* Build, Train & Evaluate Model


## Installing Spark Python

**!apt update > /dev/null**

This command refreshes the package index so that `apt` knows which package versions are available. It does not upgrade the installed packages by itself.

**!apt install openjdk-8-jdk-headless -qq > /dev/null**

This installs the headless OpenJDK 8 runtime, which Spark needs because it runs on the JVM. The output is redirected to `/dev/null` to keep the notebook clean.

In [1]:
!apt update > /dev/null
!apt install openjdk-8-jdk-headless -qq > /dev/null







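The next cell downloads Spark 3.1.2 (pre-built for Hadoop 3.2) into the Google Colab environment, unpacks it, installs `pyspark`, and sets the environment variables that point to Java and Spark.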

In [2]:
!wget -q http://apache.osuosl.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz


!tar xf spark-3.1.2-bin-hadoop3.2.tgz

!pip install -q pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

The `JAVA_HOME` and `SPARK_HOME` environment variables now point to the Java and Spark installations. The Spark session itself is created later, in the Build Spark Session section.
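
Before moving on, it can help to confirm that the two environment variables point at directories that actually exist. The following is a minimal sketch; the helper name `check_env_dirs` is made up for this illustration and is not part of Spark or Colab.

```python
import os

def check_env_dirs(var_names):
    """Return {variable name: True/False} telling whether each variable
    is set and points to an existing directory."""
    status = {}
    for name in var_names:
        path = os.environ.get(name)
        status[name] = bool(path) and os.path.isdir(path)
    return status

# Hypothetical usage for the two variables set in the cell above
print(check_env_dirs(["JAVA_HOME", "SPARK_HOME"]))
```

A `False` entry would suggest that the download, the unpacking step, or the path string needs another look before a session is started.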

## Importing Libraries

In [3]:
#Generic Libraries
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

#Apache Spark Libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,regexp_replace

#Apache Spark ML Classifier Libraries
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,NaiveBayes

#Apache Spark Evaluation Library
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Apache Spark Features libraries
from pyspark.ml.feature import StandardScaler,StringIndexer, VectorAssembler, VectorIndexer, OneHotEncoder

#Apache Spark Pipeline Library
from pyspark.ml import Pipeline

# Apache Spark `DenseVector`
from pyspark.ml.linalg import DenseVector

#Data Split Libraries
import sklearn
from sklearn.model_selection import train_test_split


#Tabulating Data
from tabulate import tabulate

#Garbage
import gc

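All the libraries for this notebook are imported above. Not all of them are used later: for example, the data split is done with Spark's `randomSplit`, not with scikit-learn's `train_test_split`.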

## Build Spark Session

In [4]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
                  .appName('Apache Spark Beginner Tutorial')
                  .config("spark.executor.memory", "1G")
                  .config("spark.executor.cores","4")
                  .getOrCreate())

A Spark session has now been created (or an existing one reused, since `getOrCreate()` is called).

In [5]:
spark.sparkContext.setLogLevel('INFO')

In [6]:
spark.version

'3.1.2'

This shows which version of Spark we are using.

## Data Load

In [7]:
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data

We download the Iris dataset, stored in the file `iris.data`, from the UCI Machine Learning Repository.

`wget` fetches the file from the server into the working directory.

In [8]:
data_file = "./iris.data"

The path to the dataset is stored in the variable `data_file`.

In [9]:
data = spark.read.format("csv") \
       .option("header", "true") \
       .option("inferSchema","true")\
       .load(data_file) 

This reads the CSV file into a Spark DataFrame and lets Spark infer the column types.

In [10]:
data = data.withColumnRenamed("5.1","sepal_length").withColumnRenamed("3.5","sepal_width").withColumnRenamed("1.4","petal_length").withColumnRenamed("0.2","petal_width").withColumnRenamed("Iris-setosa","species")

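This renames all the columns of the dataset. The original names (`5.1`, `3.5`, `1.4`, `0.2`, `Iris-setosa`) are the values of the first record: `iris.data` has no header row, so reading it with `header` set to `true` turned that record into column names.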
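
Why do the columns start out with names such as `5.1`? A small analogy using Python's standard `csv` module (not Spark's reader) illustrates the effect of treating the first record of a headerless file as a header. The three sample records are the first rows of the Iris file as they appear in this notebook; the function names are made up for the illustration.

```python
import csv
import io

SAMPLE = (
    "5.1,3.5,1.4,0.2,Iris-setosa\n"
    "4.9,3.0,1.4,0.2,Iris-setosa\n"
    "4.7,3.2,1.3,0.2,Iris-setosa\n"
)
COLUMNS = ["sepal_length", "sepal_width", "petal_length", "petal_width", "species"]

def read_first_row_as_header(text):
    # DictReader without fieldnames consumes the first row as column names
    return list(csv.DictReader(io.StringIO(text)))

def read_with_explicit_names(text, names):
    # Passing fieldnames keeps every row as data
    return list(csv.DictReader(io.StringIO(text), fieldnames=names))

lost = read_first_row_as_header(SAMPLE)
kept = read_with_explicit_names(SAMPLE, COLUMNS)
print(len(lost), list(lost[0].keys()))
print(len(kept), kept[0]["sepal_length"])
```

In Spark, the analogous approach would be to read the file with `header` set to `false` and name the columns afterwards, which would be expected to keep every record. The outputs in this notebook reflect the version in which the first record was read as the header.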

In [11]:
data.cache()

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]

`cache()` marks the DataFrame to be kept in memory once it has been computed, which speeds up repeated use of the data.

## Data Exploration & Preparation

In [12]:
#Total records 
data.count()

149

`count()` returns the total number of records in the dataset. It is 149 rather than the 150 records of the Iris dataset because the first record was read as the header row.

In [13]:
data.show(5)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows



Here we look at the first 5 rows.

In [14]:
data = data.withColumn("species",regexp_replace(data["species"],"Iris-",""))

Here `regexp_replace` removes the `Iris-` prefix from the species names by replacing it with an empty string.
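
The same substitution can be tried on plain strings with Python's `re` module. This is only an analogy to `regexp_replace`, and the helper name `strip_prefix` is made up. Unlike the pattern in the cell above, this sketch anchors the pattern at the start of the string so that only a leading `Iris-` is removed.

```python
import re

def strip_prefix(value, prefix="Iris-"):
    # Replace a leading prefix with an empty string; other text is untouched
    return re.sub("^" + re.escape(prefix), "", value)

print([strip_prefix(s) for s in ["Iris-setosa", "Iris-versicolor", "Iris-virginica"]])
```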

In [15]:
data.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



Here we check the first 5 rows again to confirm the change.

In [16]:
#Records per Species
data.groupBy('species').count().show()

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   49|
+----------+-----+



Here we use `groupBy` to count the number of records per species.

In [34]:
data.describe().show()

+-------+------------------+------------------+------------------+------------------+---------+------------------+
|summary|      sepal_length|       sepal_width|      petal_length|       petal_width|  species|      species_indx|
+-------+------------------+------------------+------------------+------------------+---------+------------------+
|  count|               149|               149|               149|               149|      149|               149|
|   mean| 5.848322147651008| 3.051006711409397|3.7744966442953043|1.2053691275167793|     null|0.9932885906040269|
| stddev|0.8285940572656166|0.4334988777167475|1.7596511617753412|0.7612920413899604|     null|0.8178469120551444|
|    min|               4.3|               2.0|               1.0|               0.1|   setosa|               0.0|
|    max|               7.9|               4.4|               6.9|               2.5|virginica|               2.0|
+-------+------------------+------------------+------------------+------------------+---------+------------------+

Note: as its cell number indicates, this cell was executed after the String Indexing step that follows, which is why the summary already contains the `species_indx` column.

In order for our model to make predictions, the species (label) column must be numeric, because Spark ML classifiers do not accept string labels. To achieve this we apply String Indexing to the `species` column.

In [18]:
#String Indexing the Species column
SIndexer = StringIndexer(inputCol='species', outputCol='species_indx')
data = SIndexer.fit(data).transform(data)

#Inspect the dataset
data.show(5)


+------------+-----------+------------+-----------+-------+------------+
|sepal_length|sepal_width|petal_length|petal_width|species|species_indx|
+------------+-----------+------------+-----------+-------+------------+
|         4.9|        3.0|         1.4|        0.2| setosa|         2.0|
|         4.7|        3.2|         1.3|        0.2| setosa|         2.0|
|         4.6|        3.1|         1.5|        0.2| setosa|         2.0|
|         5.0|        3.6|         1.4|        0.2| setosa|         2.0|
|         5.4|        3.9|         1.7|        0.4| setosa|         2.0|
+------------+-----------+------------+-----------+-------+------------+
only showing top 5 rows



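String indexing is done with `StringIndexer`: we fit it to the data and then transform the data. By default the indices are assigned in order of descending label frequency, which is why setosa, the least frequent species here with 49 records, gets index 2.0.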
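
The index values can be reproduced with a plain-Python sketch of frequency-based indexing. This is an illustration of the idea, not Spark's implementation; the function name `frequency_index` is made up, and the alphabetical tie-break between the two 50-record species is an assumption of the sketch.

```python
from collections import Counter

def frequency_index(labels):
    """Map each label to a float index: the most frequent label gets 0.0.
    Ties are broken alphabetically in this sketch."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

# Species counts taken from the groupBy output in this notebook
labels = ["virginica"] * 50 + ["versicolor"] * 50 + ["setosa"] * 49
mapping = frequency_index(labels)
print(mapping)
```

With these counts setosa ends up with the highest index, matching the `species_indx` value of 2.0 shown for the setosa rows.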

## Feature Engineering

The Spark model needs two columns: “label” and “features”. We are not going to do much feature engineering, because we want to focus on the mechanics of training the model in Spark.

So we create a separate dataframe with re-ordered columns and then define the input data using Dense Vectors. A Dense Vector is a local vector backed by a double array that holds its entry values. In other words, it stores an array of numeric values for use in PySpark.


In [19]:
#creating a separate dataframe with re-ordered columns
df = data.select("species_indx","sepal_length", "sepal_width", "petal_length", "petal_width")

#Inspect the dataframe
df.show(5)

+------------+------------+-----------+------------+-----------+
|species_indx|sepal_length|sepal_width|petal_length|petal_width|
+------------+------------+-----------+------------+-----------+
|         2.0|         4.9|        3.0|         1.4|        0.2|
|         2.0|         4.7|        3.2|         1.3|        0.2|
|         2.0|         4.6|        3.1|         1.5|        0.2|
|         2.0|         5.0|        3.6|         1.4|        0.2|
|         2.0|         5.4|        3.9|         1.7|        0.4|
+------------+------------+-----------+------------+-----------+
only showing top 5 rows



**Note:** Observe that the species index column, which is our label (aka target), is now at the beginning of the dataframe.

In [20]:
# Define the `input_data` as Dense Vector
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

**Note:** Observe the definition of the Dense Vector: the first column of each row is kept as the label (target) and the remaining columns are packed into the vector as features. The new indexed dataframe created below names these two columns accordingly.
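
The row-splitting logic of the `lambda` above can be seen in isolation with plain Python, where a list stands in for the `DenseVector`. This is only a sketch; the function name `to_label_and_features` is made up, and the two sample rows are taken from the dataframe shown earlier.

```python
def to_label_and_features(row):
    """Split a row into (label, feature list): the first entry is the label."""
    return (row[0], [float(v) for v in row[1:]])

rows = [
    (2.0, 4.9, 3.0, 1.4, 0.2),
    (2.0, 4.7, 3.2, 1.3, 0.2),
]
pairs = [to_label_and_features(r) for r in rows]
print(pairs)
```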

In [21]:
# Creating a new Indexed Dataframe
df_indx = spark.createDataFrame(input_data, ["label", "features"])

In [22]:
#view the indexed dataframe
df_indx.show(5)

+-----+-----------------+
|label|         features|
+-----+-----------------+
|  2.0|[4.9,3.0,1.4,0.2]|
|  2.0|[4.7,3.2,1.3,0.2]|
|  2.0|[4.6,3.1,1.5,0.2]|
|  2.0|[5.0,3.6,1.4,0.2]|
|  2.0|[5.4,3.9,1.7,0.4]|
+-----+-----------------+
only showing top 5 rows



## Data Scaling

This is also known as Feature Scaling. It is a method of normalizing the features of the data. Scaling can make a difference between a weak machine learning model and a better one. 

In this tutorial we will use a Standard Scaler to scale our feature data. Apache Spark provides the `StandardScaler` class for this.
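
To make the arithmetic concrete, here is a plain-Python sketch of scaling a column by its standard deviation. It assumes the scaler's default settings, under which values are divided by the sample standard deviation and the mean is not subtracted. The function name `scale_by_std` is made up for this illustration.

```python
import statistics

def scale_by_std(values):
    """Divide each value by the sample standard deviation of the column.
    The mean is not subtracted in this sketch."""
    std = statistics.stdev(values)  # sample (n - 1) standard deviation
    return [v / std for v in values]

print(scale_by_std([1.0, 2.0, 3.0]))

# sepal_length standard deviation as reported by describe() in this notebook
sepal_length_std = 0.8285940572656166
print(4.9 / sepal_length_std)
```

Dividing the first sepal length, 4.9, by the reported standard deviation gives a value close to the 5.9136... that appears in the scaled output. Because no mean is subtracted, the scaled features stay non-negative.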

In [23]:
#Initialize Standard Scaler
stdScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

#Fit the Standard Scaler to the indexed Dataframe
scaler = stdScaler.fit(df_indx)

#Transform the dataframe
df_scaled =scaler.transform(df_indx)

In [24]:
#Viewing the Scaled Data
df_scaled.show(5)

+-----+-----------------+--------------------+
|label|         features|     features_scaled|
+-----+-----------------+--------------------+
|  2.0|[4.9,3.0,1.4,0.2]|[5.91363159925396...|
|  2.0|[4.7,3.2,1.3,0.2]|[5.67225888091706...|
|  2.0|[4.6,3.1,1.5,0.2]|[5.55157252174862...|
|  2.0|[5.0,3.6,1.4,0.2]|[6.03431795842241...|
|  2.0|[5.4,3.9,1.7,0.4]|[6.51706339509620...|
+-----+-----------------+--------------------+
only showing top 5 rows



In [25]:
#Dropping the Features column
df_scaled = df_scaled.drop("features")

## Data Split

Just like always, before building a model we shall split our scaled dataset into training & test sets.

* Training Dataset = 90%
* Test Dataset = 10%
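
A weighted random split of this kind assigns rows by random draws, so the two parts are only approximately 90% and 10% of the data. The plain-Python sketch below illustrates the idea under that assumption. It is not Spark's algorithm and will not reproduce Spark's exact split for the same seed; the function name `random_split` is made up.

```python
import random

def random_split(rows, train_fraction=0.9, seed=12345):
    """Assign each row to train or test by an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

# 149 row ids stand in for the 149 records of this notebook
train, test = random_split(list(range(149)))
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which keeps the accuracy figures comparable between runs.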

In [26]:
train_data, test_data = df_scaled.randomSplit([0.9, 0.1], seed = 12345)

In [27]:
#Inspect Training Data
train_data.show(5)

+-----+--------------------+
|label|     features_scaled|
+-----+--------------------+
|  0.0|[5.91363159925396...|
|  0.0|[6.03431795842241...|
|  0.0|[6.03431795842241...|
|  0.0|[6.15500431759086...|
|  0.0|[6.27569067675931...|
+-----+--------------------+
only showing top 5 rows



## Build, Train & Evaluate Model

In this step we will create multiple models, train them on our scaled dataset and then compare their accuracy.

In [28]:
model = ['Decision Tree','Random Forest','Naive Bayes']
model_results = []

In [29]:
# -- Decision Tree Classifier --

dtc = DecisionTreeClassifier(labelCol="label", featuresCol="features_scaled")          #instantiate the model
dtc_model = dtc.fit(train_data)                                                        #train the model
dtc_pred = dtc_model.transform(test_data)                                              #model predictions

#Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dtc_acc = evaluator.evaluate(dtc_pred)
#print("Decision Tree Classifier Accuracy =", '{:.2%}'.format(dtc_acc))
model_results.extend([[model[0],'{:.2%}'.format(dtc_acc)]])                               #appending to list
    

In [30]:
# -- Random Forest Classifier --

rfc = RandomForestClassifier(labelCol="label", featuresCol="features_scaled", numTrees=10)          #instantiate the model
rfc_model = rfc.fit(train_data)                                                                     #train the model
rfc_pred = rfc_model.transform(test_data)                                                           #model predictions

#Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
rfc_acc = evaluator.evaluate(rfc_pred)
#print("Random Forest Classifier Accuracy =", '{:.2%}'.format(rfc_acc))
model_results.extend([[model[1],'{:.2%}'.format(rfc_acc)]])                                            #appending to list

In [31]:
# -- Naive Bayes Classifier --

nbc = NaiveBayes(smoothing=1.0,modelType="multinomial", labelCol="label",featuresCol="features_scaled")    #instantiate the model
nbc_model = nbc.fit(train_data)                                                                          #train the model
nbc_pred = nbc_model.transform(test_data)                                                                #model predictions

#Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbc_acc = evaluator.evaluate(nbc_pred)
#print("Naive Bayes Accuracy =", '{:.2%}'.format(nbc_acc))
model_results.extend([[model[2],'{:.2%}'.format(nbc_acc)]])                                            #appending to list

In [32]:
#freeing memory
gc.collect()

503

Tabulating the results.

In [33]:
print (tabulate(model_results, headers=["Classifier Models", "Accuracy"]))

Classifier Models    Accuracy
-------------------  ----------
Decision Tree        81.82%
Random Forest        81.82%
Naive Bayes          90.91%


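From this table we can observe that Naive Bayes gives the highest accuracy on this test split (90.91%), compared with 81.82% for both the Decision Tree and the Random Forest. The test set holds only about 10% of the 149 records, so the difference amounts to very few samples and should not be read as a general ranking of the models.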
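
The accuracy metric used above is the fraction of test rows whose prediction equals the label. The sketch below computes it in plain Python on made-up labels and predictions; the function name `accuracy` and the data are hypothetical. That the notebook's test set has 11 rows is an inference from the reported percentages (9/11 and 10/11), not something the notebook prints.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that equal the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Hypothetical 11-row test set: labels and predictions are made up for illustration
labels      = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0]
predictions = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 2.0, 2.0, 2.0]

print("{:.2%}".format(accuracy(labels, predictions)))
```

With two mistakes out of 11 rows the formatted result has the same form as the Decision Tree and Random Forest entries, which shows how much a single extra correct prediction moves the figure on a test set this small.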