## SQL and DataFrames

There are two approaches to Spark: the DataFrame (SQL) approach and the RDD approach. We are going to learn the DataFrame approach, since it works the way Spark is intended to be used.

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Session

Spark needs a session in order to process data in parallel.

A session can be built in many different ways. What matters most on a local machine is telling Spark to either reuse the existing session or create a new one, which is what getOrCreate does.

We will use this session to define our Spark DataFrames.

When creating DataFrames, we can let Spark infer the schema.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [2]:
spark = SparkSession.builder.appName("iris_clf").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/21 20:12:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.csv('data/iris.csv', header=True, inferSchema=True)
df.show(5)

+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width|  type|
+------------+-----------+------------+-----------+------+
|         5.1|        3.5|         1.4|        0.2|Setosa|
|         4.9|        3.0|         1.4|        0.2|Setosa|
|         4.7|        3.2|         1.3|        0.2|Setosa|
|         4.6|        3.1|         1.5|        0.2|Setosa|
|         5.0|        3.6|         1.4|        0.2|Setosa|
+------------+-----------+------------+-----------+------+
only showing top 5 rows



Or we can define our own schema.

In [4]:
schema = StructType([
    StructField("sepal_length", DoubleType()),
    StructField("sepal_width", DoubleType()),
    StructField("petal_length", DoubleType()),
    StructField("petal_width", DoubleType()),
    StructField("type", StringType(), True)
])

In [8]:
df2 = spark.read.csv('data/iris.csv', header=True, schema=schema)
df2.show(5)

+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width|  type|
+------------+-----------+------------+-----------+------+
|         5.1|        3.5|         1.4|        0.2|Setosa|
|         4.9|        3.0|         1.4|        0.2|Setosa|
|         4.7|        3.2|         1.3|        0.2|Setosa|
|         4.6|        3.1|         1.5|        0.2|Setosa|
|         5.0|        3.6|         1.4|        0.2|Setosa|
+------------+-----------+------------+-----------+------+
only showing top 5 rows



## Features

Spark doesn't need X and y separated in the usual way.

As you will see throughout the notebook, Spark operates directly on the DataFrame once an input column and an output column have been specified.

By default, Spark classifiers take a column called features as the input, and the target column is called label.

We can create the features column by using a VectorAssembler.

In [10]:
from pyspark.ml.feature import VectorAssembler

In [11]:
input_col = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
vectorizer = VectorAssembler(inputCols = input_col, outputCol='features')

df = vectorizer.transform(df)

df.show(5)


+------------+-----------+------------+-----------+------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|  type|         features|
+------------+-----------+------------+-----------+------+-----------------+
|         5.1|        3.5|         1.4|        0.2|Setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2|Setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2|Setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2|Setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2|Setosa|[5.0,3.6,1.4,0.2]|
+------------+-----------+------------+-----------+------+-----------------+
only showing top 5 rows



## Encoding

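We can use a StringIndexer in much the same way as the VectorAssembler to encode our types as numeric indices. Unlike the assembler, it has to be fitted first so that it can learn the distinct labels.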

In [12]:
from pyspark.ml.feature import StringIndexer

In [15]:
indexer = StringIndexer(inputCol='type', outputCol='indexed_type')
df = indexer.fit(df).transform(df)
df.show(5)

+------------+-----------+------------+-----------+------+-----------------+------------+
|sepal_length|sepal_width|petal_length|petal_width|  type|         features|indexed_type|
+------------+-----------+------------+-----------+------+-----------------+------------+
|         5.1|        3.5|         1.4|        0.2|Setosa|[5.1,3.5,1.4,0.2]|         0.0|
|         4.9|        3.0|         1.4|        0.2|Setosa|[4.9,3.0,1.4,0.2]|         0.0|
|         4.7|        3.2|         1.3|        0.2|Setosa|[4.7,3.2,1.3,0.2]|         0.0|
|         4.6|        3.1|         1.5|        0.2|Setosa|[4.6,3.1,1.5,0.2]|         0.0|
|         5.0|        3.6|         1.4|        0.2|Setosa|[5.0,3.6,1.4,0.2]|         0.0|
+------------+-----------+------------+-----------+------+-----------------+------------+
only showing top 5 rows
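
The indices are not arbitrary. By default, StringIndexer is documented to order labels by descending frequency, and recent Spark releases break ties alphabetically. The plain-Python sketch below imitates that rule to show why Setosa ends up as 0.0 here. It is an illustration only, not Spark's code, and `frequency_index` is a hypothetical helper name.

```python
from collections import Counter


def frequency_index(labels):
    """Map each distinct label to a float index, most frequent first.

    Ties are broken alphabetically, imitating the documented default
    ordering of StringIndexer (assumed: a recent Spark release).
    """
    counts = Counter(labels)
    # Sort by descending count, then by label name for equal counts.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Balanced classes, as in iris: ties fall back to alphabetical order.
balanced = ["Virginica", "Setosa", "Versicolor"] * 2
mapping = frequency_index(balanced)
print(mapping)

# Unbalanced classes: the most frequent label gets index 0.0.
skewed = ["Virginica"] * 3 + ["Setosa"]
skewed_mapping = frequency_index(skewed)
print(skewed_mapping)
```

The practical consequence is that the index carries no meaningful order between species, so it should be treated as a class identifier and not as an ordinal value.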



## Train/Test split

Spark DataFrames come with a randomSplit function that returns as many portions as there are weights.

The weight of each portion is passed in a list. The resulting sizes are approximate, not exact.

In [17]:
df_train, df_test = df.randomSplit([0.7, 0.3], seed=1)

df_train.show()

+------------+-----------+------------+-----------+----------+-----------------+------------+
|sepal_length|sepal_width|petal_length|petal_width|      type|         features|indexed_type|
+------------+-----------+------------+-----------+----------+-----------------+------------+
|         4.3|        3.0|         1.1|        0.1|    Setosa|[4.3,3.0,1.1,0.1]|         0.0|
|         4.4|        2.9|         1.4|        0.2|    Setosa|[4.4,2.9,1.4,0.2]|         0.0|
|         4.4|        3.0|         1.3|        0.2|    Setosa|[4.4,3.0,1.3,0.2]|         0.0|
|         4.4|        3.2|         1.3|        0.2|    Setosa|[4.4,3.2,1.3,0.2]|         0.0|
|         4.6|        3.2|         1.4|        0.2|    Setosa|[4.6,3.2,1.4,0.2]|         0.0|
|         4.6|        3.4|         1.4|        0.3|    Setosa|[4.6,3.4,1.4,0.3]|         0.0|
|         4.6|        3.6|         1.0|        0.2|    Setosa|[4.6,3.6,1.0,0.2]|         0.0|
|         4.7|        3.2|         1.3|        0.2|    Setos
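
To see why the split sizes are only approximate, here is a plain-Python sketch of the idea: the weights are normalized, turned into cumulative boundaries, and each row is assigned by an independent random draw. This is a simplified illustration and not Spark's implementation; `random_split` is a hypothetical helper, and the rows are stand-in integers.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one part with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. [7, 3] -> [0.0, 0.7, 1.0].
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)
    bounds[-1] = 1.0  # guard against floating-point drift
    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i in range(len(weights)):
            if bounds[i] <= draw < bounds[i + 1]:
                parts[i].append(row)
                break
    return parts


# Weights need not sum to 1; they are normalized first.
train, test = random_split(range(150), [7, 3], seed=1)
print(len(train), len(test))
```

Because every row is drawn independently, a 70/30 split of 150 rows will usually land near 105/45 but rarely exactly on it.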

## Classifiers

Unfortunately, many Spark classifiers do not handle multiclass classification well, so be very careful about which one you choose.

They can all be found here: https://spark.apache.org/docs/latest/ml-classification-regression.html

In [18]:
from pyspark.ml.classification import RandomForestClassifier

## Specifying input and target

As mentioned above, the default names are features and label, but we can also specify them explicitly.

In [19]:
rf_clf = RandomForestClassifier(featuresCol='features', labelCol='indexed_type')

## Fitting

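Fitting is done in much the same way as in scikit-learn, except that fit returns a new fitted model instead of modifying the classifier in place.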

In [20]:
rf_clf = rf_clf.fit(df_train)

## Predictions
    
This part is a little different. Spark will not output a prediction vector; it will directly add columns (rawPrediction, probability and prediction) to the DataFrame.

To predict, we call the classifier's 'transform' method.

In [21]:
df_test = rf_clf.transform(df_test)
df_test.show(5)

+------------+-----------+------------+-----------+------+-----------------+------------+--------------+---------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|  type|         features|indexed_type| rawPrediction|    probability|prediction|
+------------+-----------+------------+-----------+------+-----------------+------------+--------------+---------------+----------+
|         4.5|        2.3|         1.3|        0.3|Setosa|[4.5,2.3,1.3,0.3]|         0.0|[20.0,0.0,0.0]|  [1.0,0.0,0.0]|       0.0|
|         4.6|        3.1|         1.5|        0.2|Setosa|[4.6,3.1,1.5,0.2]|         0.0|[20.0,0.0,0.0]|  [1.0,0.0,0.0]|       0.0|
|         4.8|        3.1|         1.6|        0.2|Setosa|[4.8,3.1,1.6,0.2]|         0.0|[20.0,0.0,0.0]|  [1.0,0.0,0.0]|       0.0|
|         4.8|        3.4|         1.6|        0.2|Setosa|[4.8,3.4,1.6,0.2]|         0.0|[20.0,0.0,0.0]|  [1.0,0.0,0.0]|       0.0|
|         4.8|        3.4|         1.9|        0.2|Setosa|[4.8,3.4,1.9,0.2]|
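
The three added columns are related. For a random forest, rawPrediction appears to hold the per-class votes summed over the trees (the values above sum to 20, which matches the default of 20 trees), probability is that vector normalized to sum to 1, and prediction is the index of the largest probability. A plain-Python sketch of that relationship follows. The helper names are hypothetical, and the second raw vector is made up for illustration.

```python
def to_probability(raw_prediction):
    """Normalize a vector of per-class votes so that it sums to 1."""
    total = sum(raw_prediction)
    return [value / total for value in raw_prediction]


def to_prediction(probability):
    """Return the index of the most probable class as a float label."""
    best = max(range(len(probability)), key=probability.__getitem__)
    return float(best)


# First row of the output above: all 20 trees vote for class 0.
unanimous = to_probability([20.0, 0.0, 0.0])
print(unanimous, to_prediction(unanimous))

# Hypothetical row where 5 of the 20 votes go to class 1.
divided = to_probability([15.0, 5.0, 0.0])
print(divided, to_prediction(divided))
```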

## Selecting

These structures are built to be processed in parallel, so we cannot access them in the standard fashion, for example by indexing rows directly.

To get a subset of columns, we need to use select.

In [24]:
df_test.select('type','probability','indexed_type','prediction').show(10)


+----------+---------------+------------+----------+
|      type|    probability|indexed_type|prediction|
+----------+---------------+------------+----------+
|    Setosa|  [1.0,0.0,0.0]|         0.0|       0.0|
|    Setosa|  [1.0,0.0,0.0]|         0.0|       0.0|
|    Setosa|  [1.0,0.0,0.0]|         0.0|       0.0|
|    Setosa|  [1.0,0.0,0.0]|         0.0|       0.0|
|    Setosa|[0.75,0.25,0.0]|         0.0|       0.0|
|Versicolor|  [0.0,1.0,0.0]|         1.0|       1.0|
| Virginica|[0.0,0.85,0.15]|         2.0|       1.0|
|    Setosa|  [1.0,0.0,0.0]|         0.0|       0.0|
|Versicolor|  [0.0,1.0,0.0]|         1.0|       1.0|
|    Setosa|[0.95,0.05,0.0]|         0.0|       0.0|
+----------+---------------+------------+----------+
only showing top 10 rows



## Evaluating

Evaluation follows a very familiar format.

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [26]:
criterion = MulticlassClassificationEvaluator(labelCol='indexed_type')

In [28]:
# No metricName was passed to the evaluator, so it uses its default metric: F1, not accuracy
f1 = criterion.evaluate(df_test)
print(f'The F1 score of the predictions is : {f1*100} %')

The F1 score of the predictions is : 92.53373313343329 %
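
The evaluator above is created without a metric name, and its documented default metric is F1 (weighted by class support), not accuracy. Passing `metricName='accuracy'` when constructing the evaluator is the documented way to get accuracy instead. The plain-Python sketch below shows how the two numbers can differ on the same predictions. The labels and predictions are a small made-up example, not the notebook's data, and the functions are illustrative helpers, not Spark's implementation.

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def weighted_f1(labels, predictions):
    """Per-class F1 averaged with weights equal to each class's support."""
    support = Counter(labels)
    total = 0.0
    for cls, n in support.items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        predicted = sum(1 for p in predictions if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / n
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        total += f1 * n
    return total / len(labels)


# Hypothetical data: one Virginica (2.0) row is predicted as Versicolor (1.0).
labels = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0]
predictions = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0]

acc = accuracy(labels, predictions)
f1 = weighted_f1(labels, predictions)
print(f"accuracy = {acc:.4f}, weighted F1 = {f1:.4f}")
```

The two metrics are usually close on a balanced dataset like iris, but they are not interchangeable, so the printed label should name the metric that was actually computed.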
