In [1]:
import pyspark

In [2]:
sc = pyspark.SparkContext('local[*]')

In [3]:
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

[41, 79, 17, 825, 758]

In [4]:
from pyspark.sql import SparkSession  # entry point for pyspark

# instantiate spark instance
spark = (
    SparkSession.builder.appName("Random Forest Iris").master("local[*]").getOrCreate()
)

In [6]:
df = spark.read.csv("./data/IRIS.csv", header=True, inferSchema=True)
df.printSchema()  # to see the schema

root
 |-- Id: integer (nullable = true)
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)



In [7]:
df.show()  # or df.show(Truncate=false) if you'd like to see all the contents

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
| 11|          5.4|         3.7|          1.5|         0.2|Iris-

In [8]:
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
!pip install pandas

Collecting pandas
  Downloading pandas-2.0.3-cp38-cp38-win_amd64.whl.metadata (18 kB)
Collecting tzdata>=2022.1 (from pandas)
  Downloading tzdata-2024.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.0.3-cp38-cp38-win_amd64.whl (10.8 MB)
   ---------------------------------------- 0.0/10.8 MB ? eta -:--:--
   ---------------------------------------- 0.0/10.8 MB ? eta -:--:--
   ---------------------------------------- 0.1/10.8 MB 1.1 MB/s eta 0:00:11
    --------------------------------------- 0.3/10.8 MB 2.0 MB/s eta 0:00:06
   - -------------------------------------- 0.4/10.8 MB 2.5 MB/s eta 0:00:05
   -- ------------------------------------- 0.7/10.8 MB 3.2 MB/s eta 0:00:04
   --- ------------------------------------ 1.0/10.8 MB 4.1 MB/s eta 0:00:03
   ----- ---------------------------------- 1.4/10.8 MB 4.4 MB/s eta 0:00:03
   ------- -------------------------------- 1.9/10.8 MB 5.3 MB/s eta 0:00:02
   -------- ------------------------------- 2.4/10.8 MB 5.8 MB/s eta 

In [11]:
import pandas as pd

pandas_df = pd.DataFrame(df.take(100), columns=df.columns)
pandas_df.describe()

Unnamed: 0,Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm
count,100.0,100.0,100.0,100.0,100.0
mean,50.5,5.471,3.094,2.862,0.785
std,29.011492,0.641698,0.476057,1.448565,0.566288
min,1.0,4.3,2.0,1.0,0.1
25%,25.75,5.0,2.8,1.5,0.2
50%,50.5,5.4,3.05,2.45,0.8
75%,75.25,5.9,3.4,4.325,1.3
max,100.0,7.0,4.4,5.1,1.8


In [12]:
pandas_df.dtypes

Id                 int64
SepalLengthCm    float64
SepalWidthCm     float64
PetalLengthCm    float64
PetalWidthCm     float64
Species           object
dtype: object

In [14]:
from pyspark.ml.feature import VectorAssembler

numeric_cols = [
    "SepalLengthCm",
    "SepalWidthCm",
    "PetalLengthCm",
    "PetalWidthCm",
]  # insert numeric cols
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
df = assembler.transform(df)  # just use the same dataframe
df.show()

+---+-------------+------------+-------------+------------+-----------+-----------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|         features|
+---+-------------+------------+-------------+------------+-----------+-----------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|
|  9|     

In [16]:
from pyspark.ml.feature import StringIndexer

labeler = StringIndexer(inputCol="Species", outputCol="encoded")
df = labeler.fit(df).transform(df)
df.show()

+---+-------------+------------+-------------+------------+-----------+-----------------+-------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|         features|encoded|
+---+-------------+------------+-------------+------------+-----------+-----------------+-------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|    0.0|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|    0.0|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|    0.0|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|    0.0|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|    0.0|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|    0.0|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|    0.0|
|  8|          5.0| 

In [17]:
pd.DataFrame(df.take(10), columns=df.columns)

Unnamed: 0,Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species,features,encoded
0,1,5.1,3.5,1.4,0.2,Iris-setosa,"[5.1, 3.5, 1.4, 0.2]",0.0
1,2,4.9,3.0,1.4,0.2,Iris-setosa,"[4.9, 3.0, 1.4, 0.2]",0.0
2,3,4.7,3.2,1.3,0.2,Iris-setosa,"[4.7, 3.2, 1.3, 0.2]",0.0
3,4,4.6,3.1,1.5,0.2,Iris-setosa,"[4.6, 3.1, 1.5, 0.2]",0.0
4,5,5.0,3.6,1.4,0.2,Iris-setosa,"[5.0, 3.6, 1.4, 0.2]",0.0
5,6,5.4,3.9,1.7,0.4,Iris-setosa,"[5.4, 3.9, 1.7, 0.4]",0.0
6,7,4.6,3.4,1.4,0.3,Iris-setosa,"[4.6, 3.4, 1.4, 0.3]",0.0
7,8,5.0,3.4,1.5,0.2,Iris-setosa,"[5.0, 3.4, 1.5, 0.2]",0.0
8,9,4.4,2.9,1.4,0.2,Iris-setosa,"[4.4, 2.9, 1.4, 0.2]",0.0
9,10,4.9,3.1,1.5,0.1,Iris-setosa,"[4.9, 3.1, 1.5, 0.1]",0.0


In [18]:
train, test = df.randomSplit(
    [0.7, 0.3], seed=42
)
print(f"Train dataset count: {str(train.count())}")
print(f"Test dataset count: {str(test.count())}")

Train dataset count: 104
Test dataset count: 46


In [19]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="encoded")
model = rf.fit(train)
predictions = model.transform(test)


In [20]:
# if the columns names here are different, do a `printSchema` on top of predictions to see the correct column names
predictions.select(
    "SepalLengthCm",
    "SepalWidthCm",
    "PetalLengthCm",
    "PetalWidthCm",
    "encoded",
    "rawPrediction",
    "prediction",
    "probability",
)

DataFrame[SepalLengthCm: double, SepalWidthCm: double, PetalLengthCm: double, PetalWidthCm: double, encoded: double, rawPrediction: vector, prediction: double, probability: vector]

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="encoded", predictionCol="prediction"
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}%")
test_error = 1.0 - accuracy
print(f"Test Error = {test_error}")

Accuracy: 0.9335469107551487%
Test Error = 0.06645308924485127
