###### Student: Katayoun B.
##### Email:   katayounb@gmail.com 
###### Winer 2020 - Big data infrastructure

In [1]:
#pip install findspark   # you need to ru this once 

In [2]:
import findspark

In [3]:
findspark.init()

In [4]:
from pyspark.sql import SparkSession
import pyspark.sql as sparksql
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [5]:

spark = SparkSession.builder.appName('iris').getOrCreate()
train = spark.read.csv('/Users/katy/desktop/Data-infrastructure/project/data/iris.csv', inferSchema=True,header=True)

In [6]:
train.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [7]:
train.describe()

DataFrame[summary: string, sepal_length: string, sepal_width: string, petal_length: string, petal_width: string, species: string]

In [8]:
train.dtypes

[('sepal_length', 'double'),
 ('sepal_width', 'double'),
 ('petal_length', 'double'),
 ('petal_width', 'double'),
 ('species', 'string')]

#### vectorize all numerical columns into one feature column

In [9]:
vector_assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],outputCol="features")

In [10]:
df_temp = vector_assembler.transform(train)

In [11]:
df_temp.show(3)

+------------+-----------+------------+-----------+-------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|species|         features|
+------------+-----------+------------+-----------+-------+-----------------+
|         5.1|        3.5|         1.4|        0.2| setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2| setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2| setosa|[4.7,3.2,1.3,0.2]|
+------------+-----------+------------+-----------+-------+-----------------+
only showing top 3 rows



#### Drop columns that we dont need 

In [12]:
df = df_temp.drop("sepal_length", "sepal_width", "petal_length", "petal_width")
df.show(3)


+-------+-----------------+
|species|         features|
+-------+-----------------+
| setosa|[5.1,3.5,1.4,0.2]|
| setosa|[4.9,3.0,1.4,0.2]|
| setosa|[4.7,3.2,1.3,0.2]|
+-------+-----------------+
only showing top 3 rows



#### conver text label to numeric using StringIndexer - output columns will be "speciesIndex"

In [13]:
from pyspark.ml.feature import StringIndexer
l_indexer = StringIndexer(inputCol="species", outputCol="speciesIndex")
df = l_indexer.fit(df).transform(df)

In [14]:
df.show(3)

+-------+-----------------+------------+
|species|         features|speciesIndex|
+-------+-----------------+------------+
| setosa|[5.1,3.5,1.4,0.2]|         2.0|
| setosa|[4.9,3.0,1.4,0.2]|         2.0|
| setosa|[4.7,3.2,1.3,0.2]|         2.0|
+-------+-----------------+------------+
only showing top 3 rows



#### now divide the data into train and test 

In [15]:
(trainingData, testData) = df.randomSplit([0.7, 0.3])

#### Decision Tree Classifier 

In [16]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Training the model 

In [17]:
dt = DecisionTreeClassifier(labelCol="speciesIndex", featuresCol="features")
model = dt.fit(trainingData)

#### using training model to make prediction and see the result 

In [18]:
predictions = model.transform(testData)

In [19]:
predictions.select("prediction", "speciesIndex").show(5)

+----------+------------+
|prediction|speciesIndex|
+----------+------------+
|       2.0|         2.0|
|       2.0|         2.0|
|       2.0|         2.0|
|       2.0|         2.0|
|       2.0|         2.0|
+----------+------------+
only showing top 5 rows



#### try to see the accuracy of the prediction by calculating the Test Error

In [20]:
evaluator = MulticlassClassificationEvaluator(labelCol="speciesIndex", predictionCol="prediction", metricName="accuracy")


In [21]:
accuracy = evaluator.evaluate(predictions)

In [22]:
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.0512821 


In [23]:
print(model)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_cfccfa052199) of depth 4 with 15 nodes


##### RandomForestClassifier

In [24]:
from pyspark.ml.classification import RandomForestClassifier

#### Training the model 

In [25]:
rf = RandomForestClassifier(labelCol="speciesIndex",featuresCol="features", numTrees=10)
model = rf.fit(trainingData)

#### predict on the test set 

In [26]:
predictions = model.transform(testData)

In [27]:
predictions.select("prediction", "speciesIndex").show(5)

+----------+------------+
|prediction|speciesIndex|
+----------+------------+
|       2.0|         2.0|
|       2.0|         2.0|
|       2.0|         2.0|
|       2.0|         2.0|
|       2.0|         2.0|
+----------+------------+
only showing top 5 rows



#### evaluate the accuracy by calculating Test error 

In [28]:
evaluator =MulticlassClassificationEvaluator(labelCol="speciesIndex",predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.025641


In [29]:
print(model)

RandomForestClassificationModel (uid=RandomForestClassifier_12cdbcef0756) with 10 trees
