# ML Iris - Spark

### Install missing packages

In [2]:
!pip install pandas

Collecting pandas
  Downloading pandas-1.5.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.2/12.2 MB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting numpy>=1.20.3
  Downloading numpy-1.24.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.3/17.3 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: numpy, pandas
Successfully installed numpy-1.24.2 pandas-1.5.3
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m23.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


### Install dependencies

In [3]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkFiles
import os
import pandas as pd

### Create Spark & SQL context

In [4]:
sc =SparkContext()
sqlContext = SQLContext(sc)

23/03/14 13:28:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Load Iris data

In [6]:
data_dir="data"
file = os.path.join(data_dir,"iris.csv")
panda_df = pd.read_csv(file)

In [7]:
iris_df=sqlContext.createDataFrame(panda_df)
iris_df.printSchema()

  for column, series in pdf.iteritems():


root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)



In [72]:
from pyspark.ml.feature import StringIndexer, StandardScaler, OneHotEncoder
stringIndexer = StringIndexer(inputCol="variety", outputCol="ind_variety")
si_model = stringIndexer.fit(iris_df)
irisNormDf = si_model.transform(iris_df)
irisNormDf.printSchema()
irisNormDf.select("variety","ind_variety").distinct().collect()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)
 |-- ind_variety: double (nullable = false)



[Row(variety='Virginica', ind_variety=2.0),
 Row(variety='Versicolor', ind_variety=1.0),
 Row(variety='Setosa', ind_variety=0.0)]

### Perform Data Analytics

In [10]:
irisNormDf.describe().show()

                                                                                

+-------+------------------+------------------+------------------+------------------+---------+------------------+
|summary|      sepal_length|       sepal_width|      petal_length|       petal_width|  variety|       ind_variety|
+-------+------------------+------------------+------------------+------------------+---------+------------------+
|  count|               150|               150|               150|               150|      150|               150|
|   mean| 5.843333333333334|3.0573333333333323|3.7579999999999996|1.1993333333333336|     null|               1.0|
| stddev|0.8280661279778636|0.4358662849366982| 1.765298233259466|0.7622376689603465|     null|0.8192319205190406|
|    min|               4.3|               2.0|               1.0|               0.1|   Setosa|               0.0|
|    max|               7.9|               4.4|               6.9|               2.5|Virginica|               2.0|
+-------+------------------+------------------+------------------+--------------

### Prepare data for ML

In [21]:
from pyspark.ml.linalg import Vectors
def transformToLabeledPoint(row) :
    lp = ( row["variety"], row["ind_variety"], \
                Vectors.dense([row["sepal_length"],\
                        row["sepal_width"], \
                        row["petal_length"], \
                        row["petal_width"]]))
    return lp

irisLp = irisNormDf.rdd.map(transformToLabeledPoint)
irisLpDf = sqlContext.createDataFrame(irisLp,["species","label", "features"])

scaler = StandardScaler(withMean=True, inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(irisLpDf)
irisLpDf_scaled = scaler_model.transform(irisLpDf)

                                                                                

In [32]:
irisLpDf_scaled = irisLpDf_scaled.drop("features")
irisLpDf_scaled.show()
irisLpDf_scaled.cache()

+-------+-----+--------------------+
|species|label|      scaledFeatures|
+-------+-----+--------------------+
| Setosa|  0.0|[-0.8976738791967...|
| Setosa|  0.0|[-1.1392004834649...|
| Setosa|  0.0|[-1.3807270877331...|
| Setosa|  0.0|[-1.5014903898672...|
| Setosa|  0.0|[-1.0184371813308...|
| Setosa|  0.0|[-0.5353839727944...|
| Setosa|  0.0|[-1.5014903898672...|
| Setosa|  0.0|[-1.0184371813308...|
| Setosa|  0.0|[-1.7430169941354...|
| Setosa|  0.0|[-1.1392004834649...|
| Setosa|  0.0|[-0.5353839727944...|
| Setosa|  0.0|[-1.2599637855990...|
| Setosa|  0.0|[-1.2599637855990...|
| Setosa|  0.0|[-1.8637802962695...|
| Setosa|  0.0|[-0.0523307642581...|
| Setosa|  0.0|[-0.1730940663921...|
| Setosa|  0.0|[-0.5353839727944...|
| Setosa|  0.0|[-0.8976738791967...|
| Setosa|  0.0|[-0.1730940663921...|
| Setosa|  0.0|[-0.8976738791967...|
+-------+-----+--------------------+
only showing top 20 rows



DataFrame[species: string, label: double, scaledFeatures: vector]

### Perform Machine Learning

In [33]:
#Split into training and testing data
(trainingData, testData) = irisLpDf_scaled.randomSplit([0.8, 0.2])
trainingData.count()
testData.count()
testData.collect()

[Row(species='Setosa', label=0.0, scaledFeatures=DenseVector([-1.26, 0.7862, -1.2225, -1.3111])),
 Row(species='Setosa', label=0.0, scaledFeatures=DenseVector([-0.5354, 1.9333, -1.3924, -1.0487])),
 Row(species='Versicolor', label=1.0, scaledFeatures=DenseVector([0.793, -0.5904, 0.477, 0.3945])),
 Row(species='Versicolor', label=1.0, scaledFeatures=DenseVector([-1.1392, -1.5081, -0.2594, -0.2615])),
 Row(species='Versicolor', label=1.0, scaledFeatures=DenseVector([0.0684, 0.3273, 0.5903, 0.788])),
 Row(species='Versicolor', label=1.0, scaledFeatures=DenseVector([0.6722, -0.361, 0.307, 0.1321])),
 Row(species='Versicolor', label=1.0, scaledFeatures=DenseVector([-0.5354, -0.1315, 0.4203, 0.3945])),
 Row(species='Virginica', label=2.0, scaledFeatures=DenseVector([0.793, -0.1315, 0.8169, 1.0504]))]

In [42]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [36]:
#Decision Tree
dtClassifer = DecisionTreeClassifier(maxDepth=4, labelCol="label",\
                featuresCol="scaledFeatures")
dtModel = dtClassifer.fit(trainingData)

                                                                                

In [None]:
#Random Forest
rdClassifier = RandomForestClassifier(labelCol="label",
                                      featuresCol="scaledFeatures", numTrees=10)
rdModel = rdClassifier.fit(trainingData)

In [67]:
models = {'DecisionTreeClassifier': dtModel, 'RandomForestClassifier': rdModel}
models

{'DecisionTreeClassifier': DecisionTreeClassificationModel: uid=DecisionTreeClassifier_32315f125cd5, depth=4, numNodes=11, numClasses=3, numFeatures=4,
 'RandomForestClassifier': RandomForestClassificationModel: uid=RandomForestClassifier_2e01cbd75688, numTrees=10, numClasses=3, numFeatures=4}

In [62]:
#Predict on the test data
predictions = [ models[list(models)[i]].transform(testData) for i in range(len(models)) ]
# predictions.select("prediction","species","label").collect()

In [69]:
#Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                    labelCol="label",metricName="accuracy")
evaluators = [ evaluator.evaluate(prediction) for prediction in predictions ]
accuracies = dict()
for i in range(len(list(models))):
    accuracies[list(models)[i]] = evaluators[i]
accuracies

{'DecisionTreeClassifier': 0.75, 'RandomForestClassifier': 0.875}

### Gradient Boosted Tree Classifier: One vs All

In [89]:
#One Hot Encoder
encoder = OneHotEncoder(dropLast=False, inputCol="label", outputCol="label_vec").fit(irisLpDf_scaled)
irisLpDf_encoded = encoder.transform(irisLpDf_scaled)
irisLpDf_encoded = irisLpDf_encoded.select('*', vector_to_array('label_vec').alias('label_col'))

In [90]:
#Binary Classification
num_categories = len(irisLpDf_encoded.first()['label_col']) 
cols_expanded = [(col('label_col')[i].alias(f'{si_model.labels[i]}')) for i in range(num_categories)]
irisLpDf_encoded = irisLpDf_encoded.select('scaledFeatures',*cols_expanded)
irisLpDf_encoded.show()

+--------------------+------+----------+---------+
|      scaledFeatures|Setosa|Versicolor|Virginica|
+--------------------+------+----------+---------+
|[-0.8976738791967...|   1.0|       0.0|      0.0|
|[-1.1392004834649...|   1.0|       0.0|      0.0|
|[-1.3807270877331...|   1.0|       0.0|      0.0|
|[-1.5014903898672...|   1.0|       0.0|      0.0|
|[-1.0184371813308...|   1.0|       0.0|      0.0|
|[-0.5353839727944...|   1.0|       0.0|      0.0|
|[-1.5014903898672...|   1.0|       0.0|      0.0|
|[-1.0184371813308...|   1.0|       0.0|      0.0|
|[-1.7430169941354...|   1.0|       0.0|      0.0|
|[-1.1392004834649...|   1.0|       0.0|      0.0|
|[-0.5353839727944...|   1.0|       0.0|      0.0|
|[-1.2599637855990...|   1.0|       0.0|      0.0|
|[-1.2599637855990...|   1.0|       0.0|      0.0|
|[-1.8637802962695...|   1.0|       0.0|      0.0|
|[-0.0523307642581...|   1.0|       0.0|      0.0|
|[-0.1730940663921...|   1.0|       0.0|      0.0|
|[-0.5353839727944...|   1.0|  

In [104]:
irisLpDf_binary_class = {'Setosa': irisLpDf_encoded.select('scaledFeatures','Setosa'),
                         'Versicolor': irisLpDf_encoded.select('scaledFeatures','Versicolor'),
                         'Virginica': irisLpDf_encoded.select('scaledFeatures','Virginica')}
list(irisLpDf_binary_class)

['Setosa', 'Versicolor', 'Virginica']

In [113]:
from pyspark.ml.classification import GBTClassifier

for i in range(len(list(irisLpDf_binary_class))):
    #Split into training and testing data
    (trainingData, testData) = irisLpDf_binary_class[list(irisLpDf_binary_class)[i]].randomSplit([0.8, 0.2])
    trainingData.count()
    testData.count()
    testData.collect()
    
    gbClassifier = GBTClassifier(labelCol = list(irisLpDf_binary_class)[i], featuresCol = 'scaledFeatures')
    gbModel = gbClassifier.fit(trainingData)
    predictions = gbModel.transform(testData)
    
    #Evaluate accuracy
    multi_evaluator = MulticlassClassificationEvaluator(labelCol = list(irisLpDf_binary_class)[i], metricName = 'accuracy')
    print(list(irisLpDf_binary_class)[i], ' - Accuracy:', multi_evaluator.evaluate(predictions))

Setosa  - Accuracy: 1.0
Versicolor  - Accuracy: 0.918918918918919
Virginica  - Accuracy: 0.92
