In [1]:
from pyspark.sql import SparkSession  # entry point for pyspark

# instantiate spark instance
spark = (
    SparkSession.builder.appName("Random Forest Iris").master("local[*]").getOrCreate()
)

In [2]:
df = spark.read.csv("./data/iris.data", header=False, inferSchema=True)
df.printSchema()  # to see the schema

root
 |-- _c0: double (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: string (nullable = true)



In [3]:
df = df.withColumnRenamed('_c0', 'sepal_length')
df = df.withColumnRenamed('_c1', 'sepal_width')
df = df.withColumnRenamed('_c2', 'petal_length')
df = df.withColumnRenamed('_c3', 'petal_width')
df = df.withColumnRenamed('_c4', 'species')
df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [4]:
df.show()

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|
|         4.8|        3.0|         1.4| 

In [5]:
type(df)

pyspark.sql.dataframe.DataFrame

In [6]:
import pandas as pd
pandas_df = pd.DataFrame(df.take(100), columns=df.columns)
pandas_df.describe()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width
count,100.0,100.0,100.0,100.0
mean,5.471,3.094,2.862,0.785
std,0.641698,0.476057,1.448565,0.566288
min,4.3,2.0,1.0,0.1
25%,5.0,2.8,1.5,0.2
50%,5.4,3.05,2.45,0.8
75%,5.9,3.4,4.325,1.3
max,7.0,4.4,5.1,1.8


In [7]:
pandas_df.dtypes

sepal_length    float64
sepal_width     float64
petal_length    float64
petal_width     float64
species          object
dtype: object

In [8]:
from pyspark.ml.feature import VectorAssembler

numeric_cols = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
]  # insert numeric cols
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
df = assembler.transform(df)  # just use the same dataframe
df.show()

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|
+------------+-----------+------------+-----------+-----------+-----------------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|[4.4,2.9,1.4,0.2]|
|         4.9|  

In [9]:
from pyspark.ml.feature import StringIndexer

labeler = StringIndexer(inputCol="species", outputCol="encoded")
df = labeler.fit(df).transform(df)
df.show()

+------------+-----------+------------+-----------+-----------+-----------------+-------+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|encoded|
+------------+-----------+------------+-----------+-----------+-----------------+-------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|    0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|    0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|    0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|    0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|    0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|    0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|    0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|    0.0|
|         

In [10]:
pd.DataFrame(df.take(10), columns=df.columns)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species,features,encoded
0,5.1,3.5,1.4,0.2,Iris-setosa,"[5.1, 3.5, 1.4, 0.2]",0.0
1,4.9,3.0,1.4,0.2,Iris-setosa,"[4.9, 3.0, 1.4, 0.2]",0.0
2,4.7,3.2,1.3,0.2,Iris-setosa,"[4.7, 3.2, 1.3, 0.2]",0.0
3,4.6,3.1,1.5,0.2,Iris-setosa,"[4.6, 3.1, 1.5, 0.2]",0.0
4,5.0,3.6,1.4,0.2,Iris-setosa,"[5.0, 3.6, 1.4, 0.2]",0.0
5,5.4,3.9,1.7,0.4,Iris-setosa,"[5.4, 3.9, 1.7, 0.4]",0.0
6,4.6,3.4,1.4,0.3,Iris-setosa,"[4.6, 3.4, 1.4, 0.3]",0.0
7,5.0,3.4,1.5,0.2,Iris-setosa,"[5.0, 3.4, 1.5, 0.2]",0.0
8,4.4,2.9,1.4,0.2,Iris-setosa,"[4.4, 2.9, 1.4, 0.2]",0.0
9,4.9,3.1,1.5,0.1,Iris-setosa,"[4.9, 3.1, 1.5, 0.1]",0.0


In [11]:
train, test = df.randomSplit(
    [0.7, 0.3], seed=42
)
print(f"Train dataset count: {str(train.count())}")
print(f"Test dataset count: {str(test.count())}")

Train dataset count: 104
Test dataset count: 46


In [12]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="encoded")
model = rf.fit(train)
predictions = model.transform(test)

In [13]:
# if the columns names here are different, do a `printSchema` on top of predictions to see the correct column names
predictions.select(
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    "encoded",
    "rawPrediction",
    "prediction",
    "probability",
)

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, encoded: double, rawPrediction: vector, prediction: double, probability: vector]

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="encoded", predictionCol="prediction"
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}%")
test_error = 1.0 - accuracy
print(f"Test Error = {test_error}")

Accuracy: 0.978458139351377%
Test Error = 0.021541860648622957
