In [None]:
!pip install pyspark --quiet

In [29]:
import numpy as np
import pandas as pd

#Apache Spark Libraries
import pyspark
from pyspark.sql import SparkSession

#Apache Spark ML CLassifier Libraries
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,NaiveBayes

#Apache Spark Evaluation Library
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Apache Spark Features libraries
from pyspark.ml.feature import StandardScaler,StringIndexer, VectorAssembler, VectorIndexer, OneHotEncoder

#Apache Spark Pipelin Library
from pyspark.ml import Pipeline

from tabulate import tabulate

# Apache Spark `DenseVector`
from pyspark.ml.linalg import DenseVector

import warnings
warnings.simplefilter('ignore')


import gc

In [2]:
#Building Spark Session
spark = (SparkSession.builder
         .appName("Apache Spark Beginner Tutorial")
        .config("spark.executor.memory","1G")
        .config("spark.executor.cores","4")
        .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/21 16:05:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.version

'3.5.0'

In [4]:
url = '../input/iris-dataset/iris.csv'

data = spark.read.csv(url, header=True, inferSchema=True)
#for faster re-use
data.cache()

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]

In [5]:
data.count()

150

In [6]:
data.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [7]:
data.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [8]:
data.groupBy("species").count().show()

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   50|
+----------+-----+



In [9]:
data.describe().show()

24/02/21 16:05:40 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|     NULL|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|     NULL|
|    min|               4.3|                2.0|               1.0|               0.1|   setosa|
|    max|               7.9|                4.4|               6.9|               2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+



In [10]:
string_indexer = StringIndexer(inputCol='species', outputCol='species_index')
data = string_indexer.fit(data).transform(data)

data.show(5)

+------------+-----------+------------+-----------+-------+-------------+
|sepal_length|sepal_width|petal_length|petal_width|species|species_index|
+------------+-----------+------------+-----------+-------+-------------+
|         5.1|        3.5|         1.4|        0.2| setosa|          0.0|
|         4.9|        3.0|         1.4|        0.2| setosa|          0.0|
|         4.7|        3.2|         1.3|        0.2| setosa|          0.0|
|         4.6|        3.1|         1.5|        0.2| setosa|          0.0|
|         5.0|        3.6|         1.4|        0.2| setosa|          0.0|
+------------+-----------+------------+-----------+-------+-------------+
only showing top 5 rows



In [11]:
df = data.select("species_index","sepal_length", "sepal_width", "petal_length", "petal_width")
df.show(5)

+-------------+------------+-----------+------------+-----------+
|species_index|sepal_length|sepal_width|petal_length|petal_width|
+-------------+------------+-----------+------------+-----------+
|          0.0|         5.1|        3.5|         1.4|        0.2|
|          0.0|         4.9|        3.0|         1.4|        0.2|
|          0.0|         4.7|        3.2|         1.3|        0.2|
|          0.0|         4.6|        3.1|         1.5|        0.2|
|          0.0|         5.0|        3.6|         1.4|        0.2|
+-------------+------------+-----------+------------+-----------+
only showing top 5 rows



In [16]:
DenseVector([1,5,6])

DenseVector([1.0, 5.0, 6.0])

In [17]:
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

In [18]:
df_indx = spark.createDataFrame(input_data, ["label", "features"])

                                                                                

In [19]:
df_indx.show()

[Stage 19:>                                                         (0 + 1) / 1]

+-----+-----------------+
|label|         features|
+-----+-----------------+
|  0.0|[5.1,3.5,1.4,0.2]|
|  0.0|[4.9,3.0,1.4,0.2]|
|  0.0|[4.7,3.2,1.3,0.2]|
|  0.0|[4.6,3.1,1.5,0.2]|
|  0.0|[5.0,3.6,1.4,0.2]|
|  0.0|[5.4,3.9,1.7,0.4]|
|  0.0|[4.6,3.4,1.4,0.3]|
|  0.0|[5.0,3.4,1.5,0.2]|
|  0.0|[4.4,2.9,1.4,0.2]|
|  0.0|[4.9,3.1,1.5,0.1]|
|  0.0|[5.4,3.7,1.5,0.2]|
|  0.0|[4.8,3.4,1.6,0.2]|
|  0.0|[4.8,3.0,1.4,0.1]|
|  0.0|[4.3,3.0,1.1,0.1]|
|  0.0|[5.8,4.0,1.2,0.2]|
|  0.0|[5.7,4.4,1.5,0.4]|
|  0.0|[5.4,3.9,1.3,0.4]|
|  0.0|[5.1,3.5,1.4,0.3]|
|  0.0|[5.7,3.8,1.7,0.3]|
|  0.0|[5.1,3.8,1.5,0.3]|
+-----+-----------------+
only showing top 20 rows



                                                                                

In [20]:
std_scaler = StandardScaler(inputCol='features', outputCol='features_scaled')
scaler = std_scaler.fit(df_indx)
df_scaled =scaler.transform(df_indx)
df_scaled.show(5)

                                                                                

+-----+-----------------+--------------------+
|label|         features|     features_scaled|
+-----+-----------------+--------------------+
|  0.0|[5.1,3.5,1.4,0.2]|[6.15892840883878...|
|  0.0|[4.9,3.0,1.4,0.2]|[5.9174018045706,...|
|  0.0|[4.7,3.2,1.3,0.2]|[5.67587520030241...|
|  0.0|[4.6,3.1,1.5,0.2]|[5.55511189816831...|
|  0.0|[5.0,3.6,1.4,0.2]|[6.03816510670469...|
+-----+-----------------+--------------------+
only showing top 5 rows



In [21]:
df_scaled = df_scaled.drop('features')

In [22]:
train_data, test_data = df_scaled.randomSplit([0.9, 0.1], seed=42)

In [23]:
train_data.show(5)

[Stage 24:>                                                         (0 + 1) / 1]

+-----+--------------------+
|label|     features_scaled|
+-----+--------------------+
|  0.0|[5.19282199176603...|
|  0.0|[5.31358529390013...|
|  0.0|[5.31358529390013...|
|  0.0|[5.31358529390013...|
|  0.0|[5.43434859603422...|
+-----+--------------------+
only showing top 5 rows



                                                                                

In [24]:
model = ['Decision Tree','Random Forest','Naive Bayes']
model_results = []

In [25]:
dtc = DecisionTreeClassifier(labelCol='label', featuresCol='features_scaled')
dtc_model = dtc.fit(train_data)

dtc_pred = dtc_model.transform(test_data) 

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

dtc_acc = evaluator.evaluate(dtc_pred)
model_results.extend([[model[0],'{:.2%}'.format(dtc_acc)]])                               #appending to list


In [26]:
rfc = RandomForestClassifier(labelCol="label", featuresCol="features_scaled", numTrees=10)          #instantiate the model
rfc_model = rfc.fit(train_data)                                                                     #train the model
rfc_pred = rfc_model.transform(test_data)                                                           #model predictions

#Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
rfc_acc = evaluator.evaluate(rfc_pred)
#print("Random Forest Classifier Accuracy =", '{:.2%}'.format(rfc_acc))
model_results.extend([[model[1],'{:.2%}'.format(rfc_acc)]])                                            #appending to list


In [27]:
nbc = NaiveBayes(smoothing=1.0,modelType="multinomial", labelCol="label",featuresCol="features_scaled")    #instantiate the model
nbc_model = nbc.fit(train_data)                                                                          #train the model
nbc_pred = nbc_model.transform(test_data)                                                                #model predictions

#Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbc_acc = evaluator.evaluate(nbc_pred)
#print("Naive Bayes Accuracy =", '{:.2%}'.format(nbc_acc))
model_results.extend([[model[2],'{:.2%}'.format(nbc_acc)]])                                            #appending to list


24/02/21 16:06:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [30]:
#freeing memory
gc.collect()

1630

In [31]:
from tabulate import tabulate
print(tabulate(model_results, headers=["Classifier Models", "Accuracy"]))

Classifier Models    Accuracy
-------------------  ----------
Decision Tree        77.78%
Random Forest        77.78%
Naive Bayes          88.89%
