In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkSess').getOrCreate()

In [2]:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [3]:
#load iris data
iris_df = spark.read.csv("iris.csv", inferSchema=True)

In [4]:
iris_df.take(1)

[Row(_c0=5.1, _c1=3.5, _c2=1.4, _c3=0.2, _c4='Iris-setosa')]

In [5]:
#rename columns
iris_df = iris_df.select(col("_c0").alias("sepal_length"),
                         col("_c1").alias("sepal_width"),
                         col("_c2").alias("petal_length"),
                         col("_c3").alias("petal_width"),
                         col("_c4").alias("species"))

In [6]:
iris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa')]

In [7]:
#transform into vector structure
vectorAssembler = VectorAssembler(inputCols=["sepal_length",
"sepal_width", "petal_length", "petal_width"], outputCol="features")

In [8]:
viris_df = vectorAssembler.transform(iris_df)

In [9]:
viris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [10]:
indexer = StringIndexer(inputCol = "species", outputCol = "label")

In [11]:
iviris_df = indexer.fit(viris_df).transform(viris_df)

In [12]:
iviris_df.show(1)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 1 row



### Naive Bayes

In [13]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [16]:
splits = iviris_df.randomSplit([0.6,0.4], 1)
train_df = splits[0]
test_df = splits[1]

In [17]:
train_df.count()
test_df.count()
iviris_df.count()

150

In [19]:
nb = NaiveBayes(modelType = "multinomial") 
#more than 2 diff classes, here 3 diff types of iris
nbmodel = nb.fit(train_df)

In [20]:
predictions_df = nbmodel.transform(test_df)

In [22]:
predictions_df.take(1)

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, species='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), label=0.0, rawPrediction=DenseVector([-9.9894, -11.3476, -11.902]), probability=DenseVector([0.7118, 0.183, 0.1051]), prediction=0.0)]

In [25]:
evaluator = MulticlassClassificationEvaluator(labelCol="label",
        predictionCol = "prediction", metricName = "accuracy")

In [27]:
nbaccuracy = evaluator.evaluate(predictions_df)

In [28]:
nbaccuracy

0.9807692307692307

### Multilayer Perceptron

In [29]:
iviris_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [31]:
iviris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [32]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [33]:
#first layer = no. of inputs, i.e., 4
#last layer = types of outputs, i.e., 3
layers = [4,5,5,3] # 4 layer multilayer perceptron

In [34]:
#create perceptron

mlp = MultilayerPerceptronClassifier(layers=layers, seed=1)

In [35]:
mlp_model = mlp.fit(train_df)

In [36]:
mlp_predictions = mlp_model.transform(test_df)

In [37]:
mlp_evaluator = MulticlassClassificationEvaluator(metricName= "accuracy")

In [38]:
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)

In [39]:
mlp_accuracy

0.6923076923076923

### Decision Trees

In [40]:
iviris_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [41]:
iviris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [42]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol = "features")

In [43]:
dt_model = dt.fit(train_df)

In [45]:
dt_predictions = dt_model.transform(test_df)

In [46]:
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label",
                        predictionCol="prediction",
                        metricName="accuracy")

In [47]:
dt_accuracy = dt_evaluator.evaluate(dt_predictions)

In [48]:
dt_accuracy

0.9423076923076923