# Classification

In [1]:
import findspark

# findspark.init() has to run before the pyspark imports below, which is why
# the imports are not all grouped at the very top of the cell.
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# getOrCreate() reuses a context/session that already exists instead of
# failing when the cell is run a second time.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

# Last expression of the cell: display the session.
spark

In [4]:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [17]:
# The file is read without a header, so Spark names the columns _c0.._c4;
# inferSchema=True asks Spark to infer the column types from the data.
iris_path = "iris.data"
iris_df = spark.read.csv(iris_path, inferSchema=True)
iris_df.show(n=2)

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3|        _c4|
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
+---+---+---+---+-----------+
only showing top 2 rows



In [18]:
# Give the five positional columns (_c0.._c4) meaningful names.
# toDF renames by position, so this cell can be re-run safely: selecting
# "_c0".."_c4" by name would fail on a second run because `iris_df` has
# already been overwritten with the renamed columns.
iris_df = iris_df.toDF(
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    "species",
)



In [19]:
# Preview the first two rows, one field per line.
iris_df.show(n=2, vertical=True)



-RECORD 0-------------------
 sepal_length | 5.1         
 sepal_width  | 3.5         
 petal_length | 1.4         
 petal_width  | 0.2         
 species      | Iris-setosa 
-RECORD 1-------------------
 sepal_length | 4.9         
 sepal_width  | 3.0         
 petal_length | 1.4         
 petal_width  | 0.2         
 species      | Iris-setosa 
only showing top 2 rows



In [20]:
# The four numeric measurement columns are packed into a single vector
# column named "features".
feature_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")




In [21]:
# Adds the assembled "features" column next to the original columns.
viris_df = vectorAssembler.transform(dataset=iris_df)

In [22]:
# Check that the "features" vector was appended.
viris_df.show(n=2, vertical=True)

-RECORD 0-------------------------
 sepal_length | 5.1               
 sepal_width  | 3.5               
 petal_length | 1.4               
 petal_width  | 0.2               
 species      | Iris-setosa       
 features     | [5.1,3.5,1.4,0.2] 
-RECORD 1-------------------------
 sepal_length | 4.9               
 sepal_width  | 3.0               
 petal_length | 1.4               
 petal_width  | 0.2               
 species      | Iris-setosa       
 features     | [4.9,3.0,1.4,0.2] 
only showing top 2 rows



In [23]:
# Maps the string column "species" to a numeric column "label".
indexer = StringIndexer(outputCol="label", inputCol="species")

In [28]:
# Fit the indexer on the data, then apply the fitted model to the same
# DataFrame to append the "label" column.
indexer_model = indexer.fit(viris_df)
iviris_df = indexer_model.transform(viris_df)

In [29]:
# Check that the numeric "label" column was appended.
iviris_df.show(n=2, vertical=True)

-RECORD 0-------------------------
 sepal_length | 5.1               
 sepal_width  | 3.5               
 petal_length | 1.4               
 petal_width  | 0.2               
 species      | Iris-setosa       
 features     | [5.1,3.5,1.4,0.2] 
 label        | 0.0               
-RECORD 1-------------------------
 sepal_length | 4.9               
 sepal_width  | 3.0               
 petal_length | 1.4               
 petal_width  | 0.2               
 species      | Iris-setosa       
 features     | [4.9,3.0,1.4,0.2] 
 label        | 0.0               
only showing top 2 rows



## Naive Bayes

In [30]:
from pyspark.ml.classification import NaiveBayes

In [31]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [57]:
# 60/40 random split with a fixed seed so the split is repeatable.
# All columns of iviris_df are kept in both parts.
splits = iviris_df.randomSplit(weights=[0.6, 0.4], seed=1)

In [58]:
# randomSplit was given two weights, so `splits` holds exactly two
# DataFrames: the 0.6 part first, the 0.4 part second.
train_df, test_df = splits

In [59]:
# Peek at one training row.
train_df.show(n=1, vertical=True)

-RECORD 0-------------------------
 sepal_length | 4.4               
 sepal_width  | 2.9               
 petal_length | 1.4               
 petal_width  | 0.2               
 species      | Iris-setosa       
 features     | [4.4,2.9,1.4,0.2] 
 label        | 0.0               
only showing top 1 row



In [49]:
# Row counts of the training and test parts, one per line.
print(train_df.count(), test_df.count(), sep="\n")

98
52


In [50]:
# Row count of the full DataFrame, for comparison with the two split sizes.
iviris_df.count()

150

In [51]:
# Multinomial Naive Bayes; feature and label column names are left at the
# estimator's defaults.
nb = NaiveBayes().setModelType("multinomial")

In [52]:
# Train on the training split only.
nbModel = nb.fit(dataset=train_df)

In [53]:
# Score the held-out split with the fitted model.
predictions_df = nbModel.transform(dataset=test_df)

In [54]:
# NOTE(review): the saved output of this cell lists only features/label and
# the prediction columns, while test_df as currently built also carries the
# sepal/petal/species columns -- the output looks like it comes from an
# earlier run on a narrower split. Re-run on a fresh kernel to refresh it.
predictions_df.show(n=2, vertical=True)

-RECORD 0-----------------------------
 features      | [4.3,3.0,1.1,0.1]    
 label         | 0.0                  
 rawPrediction | [-9.9894495209670... 
 probability   | [0.71183175063155... 
 prediction    | 0.0                  
-RECORD 1-----------------------------
 features      | [4.5,2.3,1.3,0.3]    
 label         | 0.0                  
 rawPrediction | [-10.475670388034... 
 probability   | [0.52746779120113... 
 prediction    | 0.0                  
only showing top 2 rows



In [55]:
# Accuracy evaluator: compares the "prediction" column against "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="label", predictionCol="prediction"
)

In [56]:
# Accuracy of the Naive Bayes predictions on the test split.
evaluator.evaluate(dataset=predictions_df)

0.9807692307692307

## Multilayer Perceptron

In [62]:
from pyspark.ml.classification import MultilayerPerceptronClassifier


In [72]:
# Network topology, one entry per layer:
#   first entry  -> input layer, one node per feature
#   middle       -> hidden layers
#   last entry   -> output layer, one node per distinct label
n_inputs = 4
hidden_sizes = [10, 7]
n_labels = 3
layers = [n_inputs] + hidden_sizes + [n_labels]

In [73]:
# Fixed seed so training is repeatable.
mlp = MultilayerPerceptronClassifier(seed=1, layers=layers)

In [74]:
# Train the perceptron on the same training split used for Naive Bayes.
mlp_model = mlp.fit(dataset=train_df)

In [75]:
# Score the held-out split with the fitted perceptron.
mlp_predictions = mlp_model.transform(dataset=test_df)

In [76]:
# Reuse the accuracy evaluator configured in the Naive Bayes section rather
# than building an identical copy, so every model is scored with exactly the
# same metric and column settings.
mlp_evaluator = evaluator

In [77]:
# Accuracy of the perceptron predictions on the test split.
mlp_evaluator.evaluate(dataset=mlp_predictions)

0.9423076923076923

## Decision Trees

In [78]:
from pyspark.ml.classification import DecisionTreeClassifier

In [81]:
# Decision tree reading the assembled "features" vector and numeric "label";
# fixed seed for repeatability.
dc = DecisionTreeClassifier(featuresCol="features", labelCol="label", seed=1)




In [82]:
# NOTE(review): the "Exception ignored in JavaWrapper.__del__" message in the
# saved output names a DecisionTreeClassifier without a `_java_obj`; it looks
# like cleanup of an object left over from an earlier failed cell rather than
# a failure of this fit -- TODO confirm by re-running on a fresh kernel.
dc_model = dc.fit(dataset=train_df)

Exception ignored in: <function JavaWrapper.__del__ at 0x7fd4ce0bb0d0>
Traceback (most recent call last):
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'DecisionTreeClassifier' object has no attribute '_java_obj'


In [83]:
# Score the held-out split with the fitted decision tree.
dt_predictions = dc_model.transform(dataset=test_df)

In [84]:
# Reuse the accuracy evaluator configured in the Naive Bayes section rather
# than building an identical copy, so every model is scored with exactly the
# same metric and column settings.
dt_evaluator = evaluator

In [85]:
# Accuracy of the decision tree predictions on the test split.
dt_evaluator.evaluate(dataset=dt_predictions)

0.9423076923076923