## Always install the PySpark module with every runtime!

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=78cb353ffa7e6971f738a846f10fce656c039512de4a4a7967ac02f6734528bf
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Import the necessary libraries

In [None]:
import numpy as np
import pandas as pd

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from sklearn.datasets import load_iris
from sklearn.datasets import load_wine
from sklearn.metrics import confusion_matrix

## Create a spark session

In [None]:
# Reuse the active Spark session if there is one; otherwise start a new one.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f9c73826e30>


## Import your dataset (first as a pandas DataFrame, then as a Spark DataFrame)
Simple alternative: read the CSV with pandas, `df = pd.read_csv("https://github.com/YBIFoundation/Dataset/raw/main/IRIS.csv")`, and then convert it to a Spark DataFrame.

In [None]:
# Load the Iris dataset from scikit-learn as a pandas DataFrame, then hand it to Spark.
# Each stage gets its own name so that `df` only ever refers to the Spark DataFrame.
iris_bunch = load_iris(as_frame=True)
iris_pdf = iris_bunch.frame
df = spark.createDataFrame(iris_pdf)
df.show()

+-----------------+----------------+-----------------+----------------+------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|
+-----------------+----------------+-----------------+----------------+------+
|              5.1|             3.5|              1.4|             0.2|     0|
|              4.9|             3.0|              1.4|             0.2|     0|
|              4.7|             3.2|              1.3|             0.2|     0|
|              4.6|             3.1|              1.5|             0.2|     0|
|              5.0|             3.6|              1.4|             0.2|     0|
|              5.4|             3.9|              1.7|             0.4|     0|
|              4.6|             3.4|              1.4|             0.3|     0|
|              5.0|             3.4|              1.5|             0.2|     0|
|              4.4|             2.9|              1.4|             0.2|     0|
|              4.9|             3.1|              1.

In [None]:
# Preliminary check: print each column's name, data type and nullability
df.printSchema()

root
 |-- sepal length (cm): double (nullable = true)
 |-- sepal width (cm): double (nullable = true)
 |-- petal length (cm): double (nullable = true)
 |-- petal width (cm): double (nullable = true)
 |-- target: long (nullable = true)



## Combine the necessary features into a single vector using the VectorAssembler module!

In [None]:
# Pack the four measurement columns into one vector column named 'Features'
feature_columns = [
    'sepal length (cm)',
    'sepal width (cm)',
    'petal length (cm)',
    'petal width (cm)',
]
featureassembler = VectorAssembler(inputCols=feature_columns, outputCol='Features')
output_features = featureassembler.transform(df)
output_features.show()

+-----------------+----------------+-----------------+----------------+------+-----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|         Features|
+-----------------+----------------+-----------------+----------------+------+-----------------+
|              5.1|             3.5|              1.4|             0.2|     0|[5.1,3.5,1.4,0.2]|
|              4.9|             3.0|              1.4|             0.2|     0|[4.9,3.0,1.4,0.2]|
|              4.7|             3.2|              1.3|             0.2|     0|[4.7,3.2,1.3,0.2]|
|              4.6|             3.1|              1.5|             0.2|     0|[4.6,3.1,1.5,0.2]|
|              5.0|             3.6|              1.4|             0.2|     0|[5.0,3.6,1.4,0.2]|
|              5.4|             3.9|              1.7|             0.4|     0|[5.4,3.9,1.7,0.4]|
|              4.6|             3.4|              1.4|             0.3|     0|[4.6,3.4,1.4,0.3]|
|              5.0|           

In [None]:
# Keep only what the model needs: the assembled feature vector and the class label
model_columns = ['Features', 'target']
model_df = output_features.select(*model_columns)
model_df.show()

+-----------------+------+
|         Features|target|
+-----------------+------+
|[5.1,3.5,1.4,0.2]|     0|
|[4.9,3.0,1.4,0.2]|     0|
|[4.7,3.2,1.3,0.2]|     0|
|[4.6,3.1,1.5,0.2]|     0|
|[5.0,3.6,1.4,0.2]|     0|
|[5.4,3.9,1.7,0.4]|     0|
|[4.6,3.4,1.4,0.3]|     0|
|[5.0,3.4,1.5,0.2]|     0|
|[4.4,2.9,1.4,0.2]|     0|
|[4.9,3.1,1.5,0.1]|     0|
|[5.4,3.7,1.5,0.2]|     0|
|[4.8,3.4,1.6,0.2]|     0|
|[4.8,3.0,1.4,0.1]|     0|
|[4.3,3.0,1.1,0.1]|     0|
|[5.8,4.0,1.2,0.2]|     0|
|[5.7,4.4,1.5,0.4]|     0|
|[5.4,3.9,1.3,0.4]|     0|
|[5.1,3.5,1.4,0.3]|     0|
|[5.7,3.8,1.7,0.3]|     0|
|[5.1,3.8,1.5,0.3]|     0|
+-----------------+------+
only showing top 20 rows



## Split the dataset

In [None]:
# Split the dataset into roughly 80% train / 20% test.
# A fixed seed keeps the split, and therefore every metric computed from it,
# the same each time the notebook is re-run from a fresh kernel.
train_data, test_data = model_df.randomSplit([0.8, 0.2], seed=42)

## Instantiate the model class and fit the dataset

In [None]:
# Intiate the model object
model = NaiveBayes(featuresCol='Features', labelCol='target')

In [None]:
# Fit the dataset
model = model.fit(train_data)

## Generate predictions using the transform() method of the fitted model

In [None]:
# Score the test split with the fitted model; later cells read the
# 'target' and 'prediction' columns of the result
predictions = model.transform(test_data)

## Compute the accuracy and other evaluation metrics

In [None]:
# Count the test rows for every (actual label, predicted label) pair
label_pair_counts = predictions.groupBy('target', 'prediction').count()
label_pair_counts.show()

+------+----------+-----+
|target|prediction|count|
+------+----------+-----+
|     0|       0.0|   10|
|     1|       1.0|   10|
|     2|       2.0|   10|
|     2|       1.0|    1|
+------+----------+-----+



In [None]:
# Get the confusion matrix.
# Both columns are pulled to the driver in a single action, so labels and
# predictions stay row-aligned and reach scikit-learn as plain 1-D columns
# instead of two independently collected lists of Row objects.
label_pairs = predictions.select('target', 'prediction').toPandas()
cm = confusion_matrix(label_pairs['target'], label_pairs['prediction'])
print(cm)

[[10  0  0]
 [ 0 10  0]
 [ 0  1 10]]


In [None]:
# MulticlassClassificationEvaluator reports F1 unless told otherwise, so request
# accuracy explicitly to make the value match what the messages below call it.
evaluator = MulticlassClassificationEvaluator(
    labelCol='target',
    predictionCol='prediction',
    metricName='accuracy',
)
accuracy = evaluator.evaluate(predictions)
print(f"The final accuracy of the model is: {accuracy}")
print(f"Test error of the model is: {1.0 - accuracy}")

The final accuracy of the model is: 0.967741935483871
Test error of the model is: 0.032258064516129004
