In [1]:
!pip install Pyspark


Collecting Pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: Pyspark
  Building wheel for Pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for Pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=542e97cf8900d696eee268abe168050c8e6520215ad258b134de9ed9bcadeb5f
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built Pyspark
Installing collected packages: Pyspark
Successfully installed Pyspark-3.5.1


In [2]:
# start spark session
from pyspark.sql import SparkSession

In [3]:
spark=SparkSession.builder.getOrCreate()


In [4]:
spark


In [5]:
# read sklearn inbuilt data
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
iris = load_iris(as_frame=True)
iris = iris.frame
iris = spark.createDataFrame(iris)

In [6]:
iris.show()


+-----------------+----------------+-----------------+----------------+------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|
+-----------------+----------------+-----------------+----------------+------+
|              5.1|             3.5|              1.4|             0.2|     0|
|              4.9|             3.0|              1.4|             0.2|     0|
|              4.7|             3.2|              1.3|             0.2|     0|
|              4.6|             3.1|              1.5|             0.2|     0|
|              5.0|             3.6|              1.4|             0.2|     0|
|              5.4|             3.9|              1.7|             0.4|     0|
|              4.6|             3.4|              1.4|             0.3|     0|
|              5.0|             3.4|              1.5|             0.2|     0|
|              4.4|             2.9|              1.4|             0.2|     0|
|              4.9|             3.1|              1.

In [7]:
iris.printSchema()


root
 |-- sepal length (cm): double (nullable = true)
 |-- sepal width (cm): double (nullable = true)
 |-- petal length (cm): double (nullable = true)
 |-- petal width (cm): double (nullable = true)
 |-- target: long (nullable = true)



In [8]:
iris.columns


['sepal length (cm)',
 'sepal width (cm)',
 'petal length (cm)',
 'petal width (cm)',
 'target']

In [9]:
from pyspark.ml.feature import VectorAssembler


In [10]:
featureassembler= VectorAssembler(inputCols=['sepal length (cm)',
 'sepal width (cm)',
 'petal length (cm)',
 'petal width (cm)'], outputCol='Features')

In [11]:
output = featureassembler.transform(iris)


In [12]:
output.show()


+-----------------+----------------+-----------------+----------------+------+-----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|         Features|
+-----------------+----------------+-----------------+----------------+------+-----------------+
|              5.1|             3.5|              1.4|             0.2|     0|[5.1,3.5,1.4,0.2]|
|              4.9|             3.0|              1.4|             0.2|     0|[4.9,3.0,1.4,0.2]|
|              4.7|             3.2|              1.3|             0.2|     0|[4.7,3.2,1.3,0.2]|
|              4.6|             3.1|              1.5|             0.2|     0|[4.6,3.1,1.5,0.2]|
|              5.0|             3.6|              1.4|             0.2|     0|[5.0,3.6,1.4,0.2]|
|              5.4|             3.9|              1.7|             0.4|     0|[5.4,3.9,1.7,0.4]|
|              4.6|             3.4|              1.4|             0.3|     0|[4.6,3.4,1.4,0.3]|
|              5.0|           

In [13]:
modeldata=output.select('Features','target')


In [14]:
modeldata.show()


+-----------------+------+
|         Features|target|
+-----------------+------+
|[5.1,3.5,1.4,0.2]|     0|
|[4.9,3.0,1.4,0.2]|     0|
|[4.7,3.2,1.3,0.2]|     0|
|[4.6,3.1,1.5,0.2]|     0|
|[5.0,3.6,1.4,0.2]|     0|
|[5.4,3.9,1.7,0.4]|     0|
|[4.6,3.4,1.4,0.3]|     0|
|[5.0,3.4,1.5,0.2]|     0|
|[4.4,2.9,1.4,0.2]|     0|
|[4.9,3.1,1.5,0.1]|     0|
|[5.4,3.7,1.5,0.2]|     0|
|[4.8,3.4,1.6,0.2]|     0|
|[4.8,3.0,1.4,0.1]|     0|
|[4.3,3.0,1.1,0.1]|     0|
|[5.8,4.0,1.2,0.2]|     0|
|[5.7,4.4,1.5,0.4]|     0|
|[5.4,3.9,1.3,0.4]|     0|
|[5.1,3.5,1.4,0.3]|     0|
|[5.7,3.8,1.7,0.3]|     0|
|[5.1,3.8,1.5,0.3]|     0|
+-----------------+------+
only showing top 20 rows



In [15]:
# split data
train_data,test_data=modeldata.randomSplit([0.8,0.2])

In [16]:
train_data.show()


+-----------------+------+
|         Features|target|
+-----------------+------+
|[4.3,3.0,1.1,0.1]|     0|
|[4.4,2.9,1.4,0.2]|     0|
|[4.4,3.0,1.3,0.2]|     0|
|[4.4,3.2,1.3,0.2]|     0|
|[4.5,2.3,1.3,0.3]|     0|
|[4.6,3.1,1.5,0.2]|     0|
|[4.6,3.2,1.4,0.2]|     0|
|[4.6,3.4,1.4,0.3]|     0|
|[4.6,3.6,1.0,0.2]|     0|
|[4.7,3.2,1.3,0.2]|     0|
|[4.8,3.0,1.4,0.1]|     0|
|[4.8,3.0,1.4,0.3]|     0|
|[4.8,3.1,1.6,0.2]|     0|
|[4.8,3.4,1.9,0.2]|     0|
|[4.9,2.4,3.3,1.0]|     1|
|[4.9,3.0,1.4,0.2]|     0|
|[4.9,3.1,1.5,0.1]|     0|
|[4.9,3.6,1.4,0.1]|     0|
|[5.0,2.0,3.5,1.0]|     1|
|[5.0,3.0,1.6,0.2]|     0|
+-----------------+------+
only showing top 20 rows



In [17]:
# naive bayes model
from pyspark.ml.classification import NaiveBayes

In [18]:
nb = NaiveBayes(featuresCol='Features', labelCol='target')


In [19]:
nb=nb.fit(train_data)


In [20]:
y_pred = nb.transform(test_data)


In [21]:
y_pred.show()


+-----------------+------+--------------------+--------------------+----------+
|         Features|target|       rawPrediction|         probability|prediction|
+-----------------+------+--------------------+--------------------+----------+
|[4.7,3.2,1.6,0.2]|     0|[-11.711945770724...|[0.67427159159554...|       0.0|
|[4.8,3.4,1.6,0.2]|     0|[-11.999678157167...|[0.70401394724019...|       0.0|
|[4.9,3.1,1.5,0.2]|     0|[-11.552402138936...|[0.68615090626687...|       0.0|
|[5.0,3.4,1.5,0.2]|     0|[-11.948380210224...|[0.72724413450924...|       0.0|
|[5.0,3.4,1.6,0.4]|     0|[-12.866354780443...|[0.65448620747553...|       0.0|
|[5.2,3.4,1.4,0.2]|     0|[-11.897082263280...|[0.74927405659588...|       0.0|
|[5.4,3.4,1.7,0.2]|     0|[-12.620904238144...|[0.71235904679819...|       0.0|
|[5.5,2.3,4.0,1.3]|     1|[-19.941452515722...|[0.04887442905957...|       1.0|
|[5.5,3.5,1.3,0.2]|     0|[-12.025271017935...|[0.78370268669890...|       0.0|
|[6.1,2.9,4.7,1.4]|     1|[-22.736929883

In [22]:
# confusion matrix
y_pred.groupBy('target', 'prediction').count().show()

+------+----------+-----+
|target|prediction|count|
+------+----------+-----+
|     0|       0.0|    8|
|     1|       1.0|    8|
|     1|       2.0|    1|
|     2|       2.0|    9|
|     2|       1.0|    1|
+------+----------+-----+



In [23]:
from sklearn.metrics import confusion_matrix
pred=y_pred.select("prediction").collect()
orig=y_pred.select("target").collect()
print(confusion_matrix(orig, pred))

[[8 0 0]
 [0 8 1]
 [0 1 9]]


In [24]:
# evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [25]:
evaluator = MulticlassClassificationEvaluator(
    labelCol='target', predictionCol='prediction')

In [26]:
accuracy = evaluator.evaluate(y_pred)
accuracy

0.9259259259259258

In [27]:
# close connection to spark
spark.stop()