In [1]:
from pyspark.sql import SparkSession #entry point for pyspark

#instantiate spark instance
spark = SparkSession.builder.appName('Random Forest Iris').master("local[*]").getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/15 16:01:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Since Spark 2.0, `SparkSession` is the main entry point for Spark. `SparkSession.builder` configures a session, and `getOrCreate()` either returns the active session or creates a new one. The string passed to `appName()` can be anything; it is the name shown in the Spark UI, which helps you identify the job you are currently running. Once the session is instantiated, you can open the Spark UI at `localhost:4040` to view jobs.

In [2]:
df = spark.read.csv('./data/IRIS.csv', header=True, inferSchema=True)
df.printSchema() #to see the schema

                                                                                

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [3]:
df.show() # or df.show(truncate=False) if you'd like to see the full contents of each column

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|
|         4.8|        3.0|         1.4| 

In [4]:
# do more analysis if necessary on the data, and feel free to use pandas library
import pandas as pd
pd.DataFrame(df.take(10), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
sepal_length,5.1,4.9,4.7,4.6,5.0,5.4,4.6,5.0,4.4,4.9
sepal_width,3.5,3.0,3.2,3.1,3.6,3.9,3.4,3.4,2.9,3.1
petal_length,1.4,1.4,1.3,1.5,1.4,1.7,1.4,1.5,1.4,1.5
petal_width,0.2,0.2,0.2,0.2,0.2,0.4,0.3,0.2,0.2,0.1
species,Iris-setosa,Iris-setosa,Iris-setosa,Iris-setosa,Iris-setosa,Iris-setosa,Iris-setosa,Iris-setosa,Iris-setosa,Iris-setosa


Once the exploratory data analysis is done, we can start transforming features to prepare for feature engineering. Feature transformation means scaling, converting, or otherwise modifying features so they can be used for training and testing. Spark ML estimators expect all input features in a single vector column, and we can use `VectorAssembler` in PySpark to build that column.
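To make the idea concrete, here is a plain-Python sketch of what assembling columns into one vector means. It does not use Spark; the `assemble` helper and the sample row are hypothetical and exist only for illustration.

```python
# Plain-Python illustration (not Spark) of packing several numeric
# columns into one "features" vector, which is conceptually what
# VectorAssembler does for each row of a dataframe.

def assemble(row, input_cols):
    """Return a copy of the row with the chosen columns packed into 'features'."""
    out = dict(row)
    out["features"] = [float(row[col]) for col in input_cols]
    return out

# Hypothetical row, shaped like the first row of the iris data.
row = {
    "sepal_length": 5.1,
    "sepal_width": 3.5,
    "petal_length": 1.4,
    "petal_width": 0.2,
    "species": "Iris-setosa",
}
cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

assembled = assemble(row, cols)
print(assembled["features"])

# With no input columns the vector is empty, so a model would have nothing to learn from.
print(assemble(row, [])["features"])
```

Note that an empty column list yields an empty vector for every row, so the list of input columns has to be filled in before training.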

In [5]:
from pyspark.ml.feature import VectorAssembler


numeric_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"] #the four numeric feature columns
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
df = assembler.transform(df) #just use the same dataframe
df.show()

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|
+------------+-----------+------------+-----------+-----------+-----------------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|[4.4,2.9,1.4,0.2]|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|[4.9,3.1,1.5,0.1]|
|         5.4|        3.7|         1.5|        0.2|

This should have created another column in your dataframe called `features`, as specified in `outputCol`. Now we can use `StringIndexer` to encode the string column `species` as numeric label indices. By default, indices are assigned by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent species gets index 0. In a balanced dataset such as this one, where every species is equally frequent, Spark 3.0 and later break ties alphabetically.
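The ordering rule can be sketched in plain Python. This is only an illustration of frequency-descending indexing, assuming ties are broken alphabetically as documented for Spark 3.x; `index_labels` and the sample label lists are hypothetical, not part of the PySpark API.

```python
from collections import Counter

def index_labels(labels):
    """Map each distinct label to an index: most frequent first, ties alphabetical."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Balanced case: every species appears 50 times, so alphabetical order decides.
balanced = ["Iris-virginica"] * 50 + ["Iris-setosa"] * 50 + ["Iris-versicolor"] * 50
balanced_index = index_labels(balanced)
print(balanced_index)

# Imbalanced case (made-up counts): the most frequent label gets index 0.
imbalanced = ["Iris-setosa"] * 10 + ["Iris-virginica"] * 30 + ["Iris-versicolor"] * 20
imbalanced_index = index_labels(imbalanced)
print(imbalanced_index)
```

The indices are floats because the indexed column in Spark is a double column.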

In [6]:
from pyspark.ml.feature import StringIndexer

labeler = StringIndexer(inputCol="species", outputCol="encoded") #index the string label column, not the feature vector
df = labeler.fit(df).transform(df)
df.show()

You should be able to see the new column named `encoded` with new values populated.

In [None]:
# try doing this if you've already imported pandas
# pd.DataFrame(df.take(10), columns=df.columns).transpose()

Now that we have transformed the data as needed, we can split it into training and test datasets.

In [7]:
train, test = df.randomSplit([0.7, 0.3], seed=42) #feel free to change the numbers in the random split or seed
print(f"Train dataset count: {train.count()}")
print(f"Test dataset count: {test.count()}")

Train dataset count: 104
Test dataset count: 46
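The counts are close to, but not exactly, a 70/30 split of 150 rows. A weighted random split assigns each row by an independent random draw, so the weights are targets rather than guarantees. The plain-Python sketch below illustrates that idea; `random_split` is a hypothetical helper, it is not Spark's implementation, and it will not reproduce the same rows or counts as `df.randomSplit`.

```python
import random

def random_split(rows, train_weight, seed):
    """Send each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(150))  # stand-in for 150 dataframe rows
train_rows, test_rows = random_split(rows, 0.7, seed=42)

# The sizes add up to 150, but the train share is only approximately 70%.
print(len(train_rows), len(test_rows))

# The same seed gives the same split, which keeps experiments reproducible.
assert random_split(rows, 0.7, seed=42) == (train_rows, test_rows)
```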


Let's instantiate the `RandomForestClassifier` and run the model. At this point, feel free to pull up the Spark UI from `localhost:4040` and examine the executors tab.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="encoded")
model = rf.fit(train)
predictions = model.transform(test)


In [None]:
# if the column names here are different, run `predictions.printSchema()` to see the correct column names
predictions.select('sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'encoded', 'rawPrediction', 'prediction', 'probability').show()

`featuresCol` is the name of the single vector column that holds the features (the `outputCol` of the `VectorAssembler`), so to include more features, add their column names to the assembler's `inputCols`. We create a model by fitting the training dataset, then predict on the test dataset. `model.transform(test)` adds new columns: `rawPrediction`, `probability`, and `prediction`.
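The relationship between the three output columns can be sketched in plain Python. This assumes the forest sums the class distribution produced by each tree to get the raw prediction, normalizes that sum to get the probability, and picks the largest entry as the prediction; the `forest_predict` helper and the per-tree numbers are made up for illustration.

```python
def forest_predict(tree_class_probs):
    """Combine per-tree class distributions into raw, probability, and prediction."""
    num_classes = len(tree_class_probs[0])
    # Raw prediction: sum of each tree's class distribution.
    raw = [sum(tree[k] for tree in tree_class_probs) for k in range(num_classes)]
    # Probability: the raw vector normalized so it sums to 1.
    total = sum(raw)
    probability = [value / total for value in raw]
    # Prediction: index of the class with the largest raw score.
    prediction = float(max(range(num_classes), key=lambda k: raw[k]))
    return raw, probability, prediction

# Hypothetical output of four trees for one test row with three classes.
trees = [
    [0.0, 1.0, 0.0],
    [0.0, 0.5, 0.5],
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0],
]
raw, probability, prediction = forest_predict(trees)
print(raw)          # [0.0, 2.5, 1.5]
print(probability)  # [0.0, 0.625, 0.375]
print(prediction)   # 1.0
```

The `prediction` value is an index in the same space as the `encoded` label column, which is why the two can be compared directly during evaluation.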

Now that we've built a model, let's evaluate it using `MulticlassClassificationEvaluator`.

In [8]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

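#metricName defaults to "f1", so request accuracy explicitly
evaluator = MulticlassClassificationEvaluator(labelCol='encoded', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions) #evaluate the dataframe returned by model.transform(test)
print(f"Accuracy: {accuracy:.2%}") #accuracy is a fraction between 0 and 1, so format it as a percentage
test_error = 1.0 - accuracy
print(f"Test Error = {test_error}")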

Question: How did the model perform? What other metrics can we use to show whether a classification model performed well?

In [None]:
The model performed well with a test accuracy of 73%. 
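Accuracy alone can hide weak performance on individual classes, so per-class precision, recall, and F1 are common complements. The plain-Python sketch below shows how these are computed from label and prediction pairs; the `classification_report` helper and the six sample pairs are hypothetical and are not results from this notebook. In PySpark, `MulticlassClassificationEvaluator` exposes related aggregate metrics through `metricName`, for example `"f1"`, `"weightedPrecision"`, and `"weightedRecall"`.

```python
def classification_report(labels, predictions):
    """Return overall accuracy and per-class precision, recall, and F1."""
    pairs = list(zip(labels, predictions))
    accuracy = sum(label == pred for label, pred in pairs) / len(pairs)
    per_class = {}
    for cls in sorted(set(labels) | set(predictions)):
        tp = sum(label == cls and pred == cls for label, pred in pairs)
        fp = sum(label != cls and pred == cls for label, pred in pairs)
        fn = sum(label == cls and pred != cls for label, pred in pairs)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        per_class[cls] = {"precision": precision, "recall": recall, "f1": f1}
    return accuracy, per_class

# Made-up encoded labels and predictions for six test rows.
labels = [0.0, 0.0, 1.0, 1.0, 2.0, 2.0]
predictions = [0.0, 0.0, 1.0, 2.0, 2.0, 2.0]

accuracy, per_class = classification_report(labels, predictions)
print(f"accuracy = {accuracy:.3f}")
for cls, metrics in per_class.items():
    print(cls, {name: round(value, 3) for name, value in metrics.items()})
```

In this made-up example the overall accuracy is 5/6, yet class 1.0 has a recall of only 0.5, which is exactly the kind of detail a single accuracy number does not show.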