In [68]:
import numpy as np
import os
import shutil
import tempfile

from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [86]:
# `spark` is otherwise only created in the SparkSession cell below, so on a fresh
# kernel (Restart & Run All) this cell would raise NameError. getOrCreate() returns
# the already-running session if there is one, so requesting the same named session
# here is safe and the later cell simply reuses it.
spark = SparkSession.builder.appName("iris_class").getOrCreate()
spark.version


'3.2.0'

In [69]:
# Start the SparkSession, or reuse it if one is already running. It is the entry
# point for all DataFrame and Spark ML work in this notebook.
spark = (
    SparkSession.builder
    .appName("iris_class")
    .getOrCreate()
)

In [70]:
# Load the iris CSV. The first row holds the column names (header=True), and Spark
# infers each column's type from the data (inferSchema=True).
df = spark.read.csv(
    'iris.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

root
 |-- sepal.length: double (nullable = true)
 |-- sepal.width: double (nullable = true)
 |-- petal.length: double (nullable = true)
 |-- petal.width: double (nullable = true)
 |-- variety: string (nullable = true)



In [71]:
# Column names containing '.' are awkward in Spark (the dot is read as struct-field
# access), so switch the measurement columns to underscores.
dotted_to_snake = {
    "sepal.length": "sepal_length",
    "sepal.width": "sepal_width",
    "petal.length": "petal_length",
    "petal.width": "petal_width",
}
df2 = df
for old_name, new_name in dotted_to_snake.items():
    df2 = df2.withColumnRenamed(old_name, new_name)
df2.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)



In [72]:
# Instead of letting Spark infer types, we can declare the schema explicitly:
# four double-valued measurements followed by the string class label.
measurement_cols = ['sepal.length', 'sepal.width', 'petal.length', 'petal.width']
schema = StructType(
    [StructField(col_name, DoubleType()) for col_name in measurement_cols]
    + [StructField('variety', StringType())]
)

In [73]:
# Re-read the same file, this time applying the hand-written schema above.
df3 = spark.read.csv(path='iris.csv', schema=schema, header=True)
df3.printSchema()

root
 |-- sepal.length: double (nullable = true)
 |-- sepal.width: double (nullable = true)
 |-- petal.length: double (nullable = true)
 |-- petal.width: double (nullable = true)
 |-- variety: string (nullable = true)



In [74]:
# Peek at the first few rows read with the explicit schema.
df3.show(n=5)

+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [75]:
from pyspark.ml.feature import VectorAssembler

In [77]:
# The classifier later reads its inputs from a single vector column
# (featuresCol='features'), so pack the four measurements into one `features` column.
input_col = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

vectorizer = VectorAssembler(inputCols=input_col, outputCol='features')
new_df = vectorizer.transform(df2)

# Note: Spark DataFrames are immutable. transform() returns a *new* DataFrame with the
# `features` column appended, and `df2` itself is left unchanged. The work is lazy and
# runs, in parallel across partitions, only when an action such as show() or count()
# is called.

In [78]:
# Confirm the assembled `features` vector lines up with the raw measurements.
new_df.show(n=5)

+------------+-----------+------------+-----------+-------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|         features|
+------------+-----------+------------+-----------+-------+-----------------+
|         5.1|        3.5|         1.4|        0.2| Setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2| Setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2| Setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2| Setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2| Setosa|[5.0,3.6,1.4,0.2]|
+------------+-----------+------------+-----------+-------+-----------------+
only showing top 5 rows



In [79]:
# encode the target column
from pyspark.ml.feature import StringIndexer

In [81]:
# Encode the string class label `variety` as a numeric index, which is what the
# classifier is trained on (labelCol='indexed_variety').
#
# `new_df` is reassigned to its own transformed result, so a plain re-run of this cell
# would feed a frame that already has `indexed_variety` back into StringIndexer, which
# errors when its output column already exists. Dropping that column first (a no-op
# when it is absent) makes the cell safe to re-run.
indexer = StringIndexer(inputCol='variety', outputCol='indexed_variety')
unindexed_df = new_df.drop(indexer.getOutputCol())
new_df = indexer.fit(unindexed_df).transform(unindexed_df)
new_df.show(5)

+------------+-----------+------------+-----------+-------+-----------------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|         features|indexed_variety|
+------------+-----------+------------+-----------+-------+-----------------+---------------+
|         5.1|        3.5|         1.4|        0.2| Setosa|[5.1,3.5,1.4,0.2]|            0.0|
|         4.9|        3.0|         1.4|        0.2| Setosa|[4.9,3.0,1.4,0.2]|            0.0|
|         4.7|        3.2|         1.3|        0.2| Setosa|[4.7,3.2,1.3,0.2]|            0.0|
|         4.6|        3.1|         1.5|        0.2| Setosa|[4.6,3.1,1.5,0.2]|            0.0|
|         5.0|        3.6|         1.4|        0.2| Setosa|[5.0,3.6,1.4,0.2]|            0.0|
+------------+-----------+------------+-----------+-------+-----------------+---------------+
only showing top 5 rows



In [97]:
# Hold out roughly 20% of the rows for testing. The split is seeded so it is
# reproducible, and a third weight could be added to carve out a validation set.
# The weights must be floats; integer weights throw an error, see
# https://stackoverflow.com/questions/68313877/i-get-py4jjavaerror-when-using-randomsplit-on-google-colab
split_weights = [80.0, 20.0]
df_train, df_test = new_df.randomSplit(split_weights, seed=13)
df_train.show(5)

+------------+-----------+------------+-----------+-------+-----------------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|         features|indexed_variety|
+------------+-----------+------------+-----------+-------+-----------------+---------------+
|         4.3|        3.0|         1.1|        0.1| Setosa|[4.3,3.0,1.1,0.1]|            0.0|
|         4.4|        3.0|         1.3|        0.2| Setosa|[4.4,3.0,1.3,0.2]|            0.0|
|         4.4|        3.2|         1.3|        0.2| Setosa|[4.4,3.2,1.3,0.2]|            0.0|
|         4.5|        2.3|         1.3|        0.3| Setosa|[4.5,2.3,1.3,0.3]|            0.0|
|         4.6|        3.2|         1.4|        0.2| Setosa|[4.6,3.2,1.4,0.2]|            0.0|
+------------+-----------+------------+-----------+-------+-----------------+---------------+
only showing top 5 rows



In [98]:
# Report (rows, columns) for the full frame and for each split.
shape_report = [
    ('Total rows and columns in complete dataframe', new_df),
    ('Total rows and columns in complete Train', df_train),
    ('Total rows and columns in complete Test', df_test),
]
for caption, frame in shape_report:
    print(caption, (frame.count(), len(frame.columns)))

Total rows and columns in complete dataframe (150, 7)
Total rows and columns in complete Train (123, 7)
Total rows and columns in complete Test (27, 7)


In [99]:
from pyspark.ml.classification import RandomForestClassifier

In [100]:
# Train a random forest on the training split.
# - The unfitted estimator gets its own name, so `rf_clf` only ever refers to the
#   fitted model. Downstream cells call rf_clf.transform().
# - The seed is pinned explicitly (same value as the randomSplit above), so the
#   forest's random sampling is reproducible and does not depend on a library default.
rf_estimator = RandomForestClassifier(
    featuresCol='features',
    labelCol='indexed_variety',
    seed=13,
)
rf_clf = rf_estimator.fit(df_train)

In [101]:
# Add the model's outputs (rawPrediction, probability, prediction) to the test split.
# `df_test` is overwritten with its own transformed result. Spark ML raises an error
# when an output column already exists, so drop any prediction columns left by an
# earlier run first; drop() ignores names that are absent. This keeps the cell
# re-runnable.
prediction_cols = [
    rf_clf.getRawPredictionCol(),
    rf_clf.getProbabilityCol(),
    rf_clf.getPredictionCol(),
]
df_test = rf_clf.transform(df_test.drop(*prediction_cols))
df_test.show(5)

+------------+-----------+------------+-----------+----------+-----------------+---------------+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|         features|indexed_variety|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+----------+-----------------+---------------+--------------------+--------------------+----------+
|         4.4|        2.9|         1.4|        0.2|    Setosa|[4.4,2.9,1.4,0.2]|            0.0|      [20.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         4.6|        3.1|         1.5|        0.2|    Setosa|[4.6,3.1,1.5,0.2]|            0.0|      [20.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         4.8|        3.0|         1.4|        0.3|    Setosa|[4.8,3.0,1.4,0.3]|            0.0|      [20.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.0|        2.3|         3.3|        1.0|Versicolor|[5.0,2.3,3.3,1.0]|            1.0|[0.0

In [102]:
# Narrower view: the label (raw and indexed), the inputs, and the model outputs only.
preview_cols = ['variety', 'features', 'indexed_variety', 'rawPrediction', 'probability', 'prediction']
df_test.select(*preview_cols).show(5)

+----------+-----------------+---------------+--------------------+--------------------+----------+
|   variety|         features|indexed_variety|       rawPrediction|         probability|prediction|
+----------+-----------------+---------------+--------------------+--------------------+----------+
|    Setosa|[4.4,2.9,1.4,0.2]|            0.0|      [20.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|    Setosa|[4.6,3.1,1.5,0.2]|            0.0|      [20.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|    Setosa|[4.8,3.0,1.4,0.3]|            0.0|      [20.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|Versicolor|[5.0,2.3,3.3,1.0]|            1.0|[0.0,19.979166666...|[0.0,0.9989583333...|       1.0|
|    Setosa|[5.0,3.0,1.6,0.2]|            0.0|      [20.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
+----------+-----------------+---------------+--------------------+--------------------+----------+
only showing top 5 rows



In [103]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [108]:
# Compare the model's predictions with the indexed label.
# metricName is set explicitly: MulticlassClassificationEvaluator defaults to F1,
# but the score computed from this evaluator is stored and reported as accuracy (`acc`).
criterion = MulticlassClassificationEvaluator(
    labelCol='indexed_variety',
    predictionCol='prediction',
    metricName='accuracy',
)

In [109]:
# Score the held-out predictions. The value is whichever metric `criterion` was
# configured with (its metricName).
acc = criterion.evaluate(dataset=df_test)
acc

1.0