### Problem Statement:

#### The input data is the iris dataset, which records measurements of flower samples. For each sample, the petal and sepal length and width are recorded along with the species of the flower. The task is to build a decision tree model that predicts the species from the petal and sepal measurements.

### 1. Write PySpark code for a decision tree, like the exercise you completed in Python.

#### Importing all the required Spark libraries

In [1]:
from collections import Counter

from pyspark import SparkConf, SparkContext
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
from pyspark.sql import Row, SparkSession, SQLContext

# Reuse the running SparkContext if there is one, otherwise create it
sc = SparkContext.getOrCreate()

#### Importing all the required Python libraries

In [2]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder

In [3]:
# Building a spark session
ss = SparkSession.builder.appName("DecisionTreeClassifier").getOrCreate()

#### Reading the data first as a pandas DataFrame to perform exploratory data analysis (EDA), a basic step in building a model for any data science project

In [4]:
# Reading the data
dataset=pd.read_csv("iris.csv")

In [5]:
# Summary statistics of the numeric columns
dataset.describe()

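|       | Unnamed: 0 | Sepal.Length | Sepal.Width | Petal.Length | Petal.Width |
|-------|------------|--------------|-------------|--------------|-------------|
| count | 150.0      | 150.0        | 150.0       | 150.0        | 150.0       |
| mean  | 75.5       | 5.843333     | 3.057333    | 3.758        | 1.199333    |
| std   | 43.445368  | 0.828066     | 0.435866    | 1.765298     | 0.762238    |
| min   | 1.0        | 4.3          | 2.0         | 1.0          | 0.1         |
| 25%   | 38.25      | 5.1          | 2.8         | 1.6          | 0.3         |
| 50%   | 75.5       | 5.8          | 3.0         | 4.35         | 1.3         |
| 75%   | 112.75     | 6.4          | 3.3         | 5.1          | 1.8         |
| max   | 150.0      | 7.9          | 4.4         | 6.9          | 2.5         |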


In [6]:
# The first column is just a row index, so we keep only the measurement and species columns
data=dataset[['Sepal.Length','Sepal.Width','Petal.Length','Petal.Width','Species']]

In [7]:
# Checking for the datatype of each variable
data.info()
# There are four float64 variables and one object (categorical) variable

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150 entries, 0 to 149
Data columns (total 5 columns):
Sepal.Length    150 non-null float64
Sepal.Width     150 non-null float64
Petal.Length    150 non-null float64
Petal.Width     150 non-null float64
Species         150 non-null object
dtypes: float64(4), object(1)
memory usage: 5.9+ KB


In [8]:
# Checking for null values
data.isnull().sum()
# No null values present

Sepal.Length    0
Sepal.Width     0
Petal.Length    0
Petal.Width     0
Species         0
dtype: int64

In [9]:
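# Converting the categorical column to integer codes using LabelEncoder.
# Because `data` was selected from `dataset`, pandas emits the copy-of-a-slice
# warning shown below; selecting the columns with `.copy()` would avoid it.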
for col in data.columns:
    if data[col].dtype == 'object':
        lbl = LabelEncoder()
        lbl.fit(data[col])
        data[col] = lbl.transform(data[col])

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  


#### Converting the data into RDD

In [10]:
rdd_data=sc.parallelize(data.values)

In [11]:
rdd_data.take(2)

[array([5.1, 3.5, 1.4, 0.2, 0. ]), array([4.9, 3. , 1.4, 0.2, 0. ])]

In [12]:
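# Converting each RDD element into a Row.
# Note: int() truncates the measurements (e.g. 5.1 -> 5), which is why the Spark
# summary below shows whole numbers; use float() for the four features to keep the decimals.
rdd_row = rdd_data.map(lambda p: Row(sepallength=int(p[0]), sepalwidth=int(p[1]), petallenght=int(p[2]), petalwidth=int(p[3]), species=int(p[4])))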
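The `int(...)` casts in the row conversion drop the fractional part of every measurement. A small sketch using the two rows returned by `rdd_data.take(2)` shows the effect, together with one possible alternative that keeps the features as floats and casts only the label. This is an illustration in plain Python, not a change to the notebook's pipeline.

```python
# The two rows shown by rdd_data.take(2) in this notebook.
rows = [
    [5.1, 3.5, 1.4, 0.2, 0.0],
    [4.9, 3.0, 1.4, 0.2, 0.0],
]

# int() truncates toward zero, so measurements collapse onto whole numbers.
truncated = [tuple(int(v) for v in row) for row in rows]

# Alternative: keep the four features as floats and cast only the label to int.
preserved = [tuple(float(v) for v in row[:4]) + (int(row[4]),) for row in rows]

print(truncated)  # [(5, 3, 1, 0, 0), (4, 3, 1, 0, 0)]
print(preserved)  # [(5.1, 3.5, 1.4, 0.2, 0), (4.9, 3.0, 1.4, 0.2, 0)]
```

After truncation both rows carry a petal width of 0, so the tree has far fewer distinct values to split on than the raw data offers.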

#### Converting the rows into dataframe

In [13]:
iris_data=ss.createDataFrame(rdd_row)

In [14]:
iris_data.describe()

DataFrame[summary: string, petallenght: string, petalwidth: string, sepallength: string, sepalwidth: string, species: string]

In [15]:
iris_data.describe().show()

+-------+------------------+------------------+-----------------+------------------+------------------+
|summary|       petallenght|        petalwidth|      sepallength|        sepalwidth|           species|
+-------+------------------+------------------+-----------------+------------------+------------------+
|  count|               150|               150|              150|               150|               150|
|   mean|3.3066666666666666|              0.86|5.386666666666667|2.6466666666666665|               1.0|
| stddev| 1.772067637999897|0.7144716919918647|0.841751662485759|0.5326477696681351|0.8192319205190405|
|    min|                 1|                 0|                4|                 2|                 0|
|    max|                 6|                 2|                7|                 4|                 2|
+-------+------------------+------------------+-----------------+------------------+------------------+



In [16]:
assembler = VectorAssembler(inputCols=["petallenght","petalwidth","sepallength","sepalwidth"],outputCol="features")

In [17]:
iris_data = assembler.transform(iris_data) 
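Conceptually, `VectorAssembler` takes the listed input columns, in the order given, and packs them into a single numeric vector per row. The plain-Python sketch below illustrates the idea for one row. It is not Spark's implementation, and the row values are the ones that appear in the `trainingData.take(5)` output further down.

```python
# Conceptual sketch of vector assembly for a single row:
# pick the input columns in the given order and pack them into a float vector.
input_cols = ["petallenght", "petalwidth", "sepallength", "sepalwidth"]

# Values copied from the first training row printed in this notebook.
row = {"petallenght": 1, "petalwidth": 0, "sepallength": 4, "sepalwidth": 3, "species": 0}

features = [float(row[name]) for name in input_cols]
print(features)  # [1.0, 0.0, 4.0, 3.0]
```

The order of `inputCols` fixes the meaning of each vector position, so feature index 0 refers to `petallenght` when inspecting the fitted tree.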

#### Splitting the data into training and test sets in a 70:30 ratio

In [18]:
(trainingData, testData) = iris_data.randomSplit([0.7, 0.3])

In [19]:
# number of observations in training dataset
trainingData.count()

104

In [20]:
testData.count()

46
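The split is 104/46 rather than exactly 105/45 because `randomSplit` assigns rows at random according to the weights instead of cutting at a fixed position. The following is a simplified plain-Python illustration of that idea, not Spark's actual implementation; the function name `random_split` and the seed value are made up for the sketch.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(150))                      # stand-in for the 150 iris rows
train, test = random_split(rows, 0.7, seed=42)
print(len(train), len(test))                 # sizes vary with the seed but sum to 150
```

In Spark itself, passing a `seed` argument to `randomSplit` makes the split repeatable across runs.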

In [21]:
trainingData.take(5)

[Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0])),
 Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0])),
 Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0])),
 Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0])),
 Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0]))]

In [22]:
testData.take(5)

[Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=2, species=0, features=DenseVector([1.0, 0.0, 4.0, 2.0])),
 Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0])),
 Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0])),
 Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0])),
 Row(petallenght=1, petalwidth=0, sepallength=4, sepalwidth=3, species=0, features=DenseVector([1.0, 0.0, 4.0, 3.0]))]

In [23]:
# Building the Decision Tree Classifier
dtreeClassifer = DecisionTreeClassifier(maxDepth=4, labelCol="species",featuresCol="features")

In [24]:
# Fitting the model to the training data
dtreeModel = dtreeClassifer.fit(trainingData)
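To give a feel for what fitting does: a decision tree chooses splits that reduce class impurity, and Gini impurity is the default `impurity` setting of Spark ML's `DecisionTreeClassifier`. The sketch below computes the impurity reduction for one hypothetical split. The 50/50/50 class counts and the split are illustrative assumptions, not values read from the fitted model.

```python
def gini(counts):
    """Gini impurity of a node given its per-class sample counts."""
    total = sum(counts)
    if total == 0:
        return 0.0
    return 1.0 - sum((c / total) ** 2 for c in counts)

def weighted_gini(children):
    """Size-weighted average impurity of the child nodes of a split."""
    total = sum(sum(c) for c in children)
    return sum(sum(c) / total * gini(c) for c in children)

parent = [50, 50, 50]                   # illustrative counts for species 0, 1, 2
left, right = [50, 0, 0], [0, 50, 50]   # hypothetical split that isolates species 0

gain = gini(parent) - weighted_gini([left, right])
print(round(gini(parent), 4), round(weighted_gini([left, right]), 4), round(gain, 4))
# 0.6667 0.3333 0.3333
```

A split that isolates one class entirely halves the impurity here; the tree keeps splitting until it reaches `maxDepth` or no split gives a useful reduction.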

In [25]:
#Predict on the test data
predictions = dtreeModel.transform(testData)
predictions.select("prediction","species").show(truncate=False)

+----------+-------+
|prediction|species|
+----------+-------+
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|0.0       |0      |
|1.0       |1      |
+----------+-------+
only showing top 20 rows



In [26]:
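# Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="species", metricName="accuracy")
evaluator.evaluate(predictions)
# The last expression is the evaluator object, so the cell displays its identifier
# rather than the accuracy value; the accuracy itself is printed in cell 28.
evaluator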

MulticlassClassificationEvaluator_458194489626489b5481

In [27]:
# Confusion counts: number of test samples for each (species, prediction) pair
predictions.groupBy("species","prediction").count().show()

+-------+----------+-----+
|species|prediction|count|
+-------+----------+-----+
|      2|       2.0|   11|
|      0|       0.0|   19|
|      1|       1.0|   14|
|      2|       1.0|    2|
+-------+----------+-----+



#### Species 0: all 19 test samples were predicted correctly.
#### Species 1: all 14 test samples were predicted correctly.
#### Species 2: 11 of the 13 test samples were predicted correctly; the other 2 were misclassified as species 1. These 2 errors out of 46 test samples are the only mistakes the decision tree makes on the test set.
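The grouped counts can be rearranged into a conventional confusion matrix, with true species as rows and predicted species as columns. The sketch below does this in plain Python from the four counts printed above and derives per-species recall; the variable names are chosen for illustration.

```python
# (species, prediction) -> count, copied from the grouped output above.
counts = {(2, 2): 11, (0, 0): 19, (1, 1): 14, (2, 1): 2}

labels = [0, 1, 2]
# Rows are true species, columns are predicted species; missing pairs count as 0.
matrix = [[counts.get((t, p), 0) for p in labels] for t in labels]
for t, row in zip(labels, matrix):
    print(t, row)
# 0 [19, 0, 0]
# 1 [0, 14, 0]
# 2 [0, 2, 11]

# Recall per species: correct predictions / all test samples of that species.
recall = {t: matrix[t][t] / sum(matrix[t]) for t in labels}
print({t: round(r, 4) for t, r in recall.items()})  # {0: 1.0, 1: 1.0, 2: 0.8462}
```

All off-diagonal mass sits in the row for species 2, which matches the reading above that the only confusion is between species 2 and species 1.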

In [28]:
# Compute test accuracy (test error = 1 - accuracy)
evaluator = MulticlassClassificationEvaluator(labelCol="species", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Decision Tree accuracy = %2.4f" % accuracy)

Decision Tree accuracy = 0.9565
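As a sanity check, the reported accuracy can be recomputed by hand from the (species, prediction) counts shown earlier in this notebook: 44 correct predictions out of 46 test samples. The short sketch below does the arithmetic and also derives the test error.

```python
# Counts taken from the grouped (species, prediction) output in this notebook.
correct = 19 + 14 + 11       # correctly predicted samples for species 0, 1, 2
total = correct + 2          # plus the 2 species-2 samples predicted as species 1

accuracy = correct / total
test_error = 1.0 - accuracy

print("accuracy   = %.4f" % accuracy)    # accuracy   = 0.9565
print("test error = %.4f" % test_error)  # test error = 0.0435
```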
