## Spark

Just as "hello world" is the usual first program in many languages, computing pi is the usual first program in Spark.


In [28]:
# import random
# NUM_SAMPLES = 100000000
# def inside(p):
#  x, y = random.random(), random.random()
#  return x*x + y*y < 1
# count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
# pi = 4 * count / NUM_SAMPLES
# print("Pi is roughly", pi)
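
The cell above is commented out, and it relies on a SparkContext named sc that is not defined in this section. As a rough illustration of what the filter and count steps compute, here is a plain-Python sketch of the same Monte Carlo estimate. The smaller sample size and the fixed seed are assumptions made to keep it fast and repeatable; nothing here runs in parallel.

```python
import random

NUM_SAMPLES = 200_000  # assumed: far smaller than the 100,000,000 in the Spark cell

def inside(_):
    # Draw a random point in the unit square and test whether it lies
    # inside the quarter circle of radius 1.
    x, y = random.random(), random.random()
    return x * x + y * y < 1

random.seed(0)  # assumed seed, only for repeatability
# Plain-Python stand-in for sc.parallelize(...).filter(inside).count()
count = sum(1 for i in range(NUM_SAMPLES) if inside(i))
pi_estimate = 4 * count / NUM_SAMPLES
print("Pi is roughly", pi_estimate)
```

The quarter circle covers pi/4 of the unit square, so the fraction of points that pass the filter, multiplied by 4, approximates pi.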

## SQL and DataFrames

There are two approaches to Spark: the DataFrame (SQL) approach and the RDD approach. We are going to learn the DataFrame approach, since it works the way Spark intends.

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Session

Spark needs a session in order to process data in parallel.

A session can be built in many different ways. On most local machines, the important detail is to tell Spark to either get the existing session or create a new one.

We will use this session to define our Spark DataFrames.

When creating DataFrames, we can let Spark infer the schema.

In [30]:
spark = SparkSession.builder.appName('iris').getOrCreate()
df = spark.read.csv('data/iris.csv',inferSchema=True,header=True)
df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- type: string (nullable = true)



Or we can create our own schema:

In [31]:
schema = StructType([
    StructField('sepal_length',DoubleType()),
    StructField('sepal_width',DoubleType()),
    StructField('petal_length',DoubleType()),
    StructField('petal_width',DoubleType()),
    StructField('type',StringType()),
])

In [32]:
df = spark.read.csv('data/iris.csv',schema=schema,header=True)
df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- type: string (nullable = true)
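
Both routes end in the same schema, but they differ in cost: inferring the schema requires an extra pass over the data to guess each column's type, while an explicit schema skips that pass. The plain-Python sketch below illustrates the idea only; it is not Spark's inference algorithm, and the three-row sample and the helper names infer_types and load are hypothetical.

```python
import csv
import io

# Hypothetical three-row sample in the same layout as data/iris.csv
SAMPLE = """sepal_length,sepal_width,petal_length,petal_width,type
5.1,3.5,1.4,0.2,Setosa
7.0,3.2,4.7,1.4,Versicolor
6.3,3.3,6.0,2.5,Virginica
"""

def infer_types(text):
    """Extra pass: call a column 'double' if every value parses as float."""
    rows = list(csv.DictReader(io.StringIO(text)))
    types = {}
    for name in rows[0]:
        try:
            for row in rows:
                float(row[name])
            types[name] = "double"
        except ValueError:
            types[name] = "string"
    return types

def load(text, types):
    """Single pass: convert each value using an already-known schema."""
    casts = {"double": float, "string": str}
    return [
        {name: casts[types[name]](value) for name, value in row.items()}
        for row in csv.DictReader(io.StringIO(text))
    ]

inferred = infer_types(SAMPLE)
explicit = {
    "sepal_length": "double",
    "sepal_width": "double",
    "petal_length": "double",
    "petal_width": "double",
    "type": "string",
}
records = load(SAMPLE, explicit)
print(inferred)
print(records[0])
```

On a small file the extra pass is negligible; on large inputs, declaring the schema up front avoids reading the data twice.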



## Features

Spark doesn't need the X and Y separated in the standard format.

As you will see throughout the notebook, Spark operates directly on the DataFrame once an input column and an output column are specified.

By default, every Spark classifier takes a column called features as its input, and the Y column is called label.

We can create the feature column by using a VectorAssembler.

In [33]:
numericCols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
assembler = VectorAssembler(inputCols=numericCols,outputCol='feature')
df = assembler.transform(df)
df.show()

+------------+-----------+------------+-----------+------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|  type|          feature|
+------------+-----------+------------+-----------+------+-----------------+
|         5.1|        3.5|         1.4|        0.2|Setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2|Setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2|Setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2|Setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2|Setosa|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4|Setosa|[5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3|Setosa|[4.6,3.4,1.4,0.3]|
|         5.0|        3.4|         1.5|        0.2|Setosa|[5.0,3.4,1.5,0.2]|
|         4.4|        2.9|         1.4|        0.2|Setosa|[4.4,2.9,1.4,0.2]|
|         4.9|        3.1|         1.5|        0.1|Setosa|[4.9,3.1,1.5,0.1]|
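
As a rough picture of what the assembler does to each row, the plain-Python sketch below packs the listed columns, in order, into one list under a new key. The assemble helper is hypothetical, and a list stands in for Spark's vector type.

```python
numeric_cols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

# Two rows copied from the table above, as plain dicts
rows = [
    {'sepal_length': 5.1, 'sepal_width': 3.5, 'petal_length': 1.4,
     'petal_width': 0.2, 'type': 'Setosa'},
    {'sepal_length': 4.9, 'sepal_width': 3.0, 'petal_length': 1.4,
     'petal_width': 0.2, 'type': 'Setosa'},
]

def assemble(row, input_cols, output_col):
    """Return a copy of row with input_cols packed, in order, into output_col."""
    out = dict(row)
    out[output_col] = [row[c] for c in input_cols]
    return out

assembled = [assemble(r, numeric_cols, 'feature') for r in rows]
for r in assembled:
    print(r['type'], r['feature'])
```

The original columns are kept alongside the new one, and the order of the input column list fixes the order of the values inside the vector.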

## Encoding

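We can use a StringIndexer in the same way as the VectorAssembler to ordinally encode our types. Unlike the assembler, the indexer has to be fitted first, because it needs to see every distinct value before it can assign indices.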

In [34]:
indexer = StringIndexer(inputCol='type',outputCol='target')
df = indexer.fit(df).transform(df)
df.show()

+------------+-----------+------------+-----------+------+-----------------+------+
|sepal_length|sepal_width|petal_length|petal_width|  type|          feature|target|
+------------+-----------+------------+-----------+------+-----------------+------+
|         5.1|        3.5|         1.4|        0.2|Setosa|[5.1,3.5,1.4,0.2]|   2.0|
|         4.9|        3.0|         1.4|        0.2|Setosa|[4.9,3.0,1.4,0.2]|   2.0|
|         4.7|        3.2|         1.3|        0.2|Setosa|[4.7,3.2,1.3,0.2]|   2.0|
|         4.6|        3.1|         1.5|        0.2|Setosa|[4.6,3.1,1.5,0.2]|   2.0|
|         5.0|        3.6|         1.4|        0.2|Setosa|[5.0,3.6,1.4,0.2]|   2.0|
|         5.4|        3.9|         1.7|        0.4|Setosa|[5.4,3.9,1.7,0.4]|   2.0|
|         4.6|        3.4|         1.4|        0.3|Setosa|[4.6,3.4,1.4,0.3]|   2.0|
|         5.0|        3.4|         1.5|        0.2|Setosa|[5.0,3.4,1.5,0.2]|   2.0|
|         4.4|        2.9|         1.4|        0.2|Setosa|[4.4,2.9,1.4,0.2]|
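
By default, StringIndexer orders labels by descending frequency, so the most frequent label gets index 0.0. The plain-Python sketch below shows that rule on a small, deliberately unbalanced list. The fit_string_indexer helper and the sample labels are hypothetical, and the alphabetical tie-break is an assumption of this sketch, not a statement about Spark.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Assumption: ties are broken alphabetically here; Spark's own
    # tie-breaking may differ between versions.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical, deliberately unbalanced labels so the ordering is unambiguous
labels = ['Setosa', 'Virginica', 'Virginica', 'Versicolor', 'Virginica', 'Setosa']
mapping = fit_string_indexer(labels)
encoded = [mapping[v] for v in labels]
print(mapping)
print(encoded)
```

The fit step builds the mapping and the transform step applies it, which mirrors the indexer.fit(df).transform(df) call in the cell above.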

## Train/Test split

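Spark DataFrames come with a randomSplit function that returns as many portions as you specify.

The weight of each portion is passed in a list. The weights are normalized if they do not sum to 1, and the resulting sizes are approximate because rows are assigned at random.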

In [36]:
df_train,df_test  = df.randomSplit([0.7,0.3])

In [37]:
df_test.show()

+------------+-----------+------------+-----------+----------+-----------------+------+
|sepal_length|sepal_width|petal_length|petal_width|      type|          feature|target|
+------------+-----------+------------+-----------+----------+-----------------+------+
|         4.6|        3.6|         1.0|        0.2|    Setosa|[4.6,3.6,1.0,0.2]|   2.0|
|         4.7|        3.2|         1.3|        0.2|    Setosa|[4.7,3.2,1.3,0.2]|   2.0|
|         4.8|        3.0|         1.4|        0.1|    Setosa|[4.8,3.0,1.4,0.1]|   2.0|
|         4.8|        3.0|         1.4|        0.3|    Setosa|[4.8,3.0,1.4,0.3]|   2.0|
|         4.8|        3.1|         1.6|        0.2|    Setosa|[4.8,3.1,1.6,0.2]|   2.0|
|         4.8|        3.4|         1.6|        0.2|    Setosa|[4.8,3.4,1.6,0.2]|   2.0|
|         5.0|        3.0|         1.6|        0.2|    Setosa|[5.0,3.0,1.6,0.2]|   2.0|
|         5.0|        3.4|         1.5|        0.2|    Setosa|[5.0,3.4,1.5,0.2]|   2.0|
|         5.0|        3.6|      
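
To see why the split sizes are only approximately 70/30, here is a plain-Python sketch of a weighted random split in which each row is assigned independently. It illustrates the idea and is not Spark's implementation; the random_split helper, the 150-row stand-in dataset, and the seed are all assumptions.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability
    proportional to that split's weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Normalize the weights and turn them into cumulative upper bounds
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point round-off
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(150))  # hypothetical 150-row dataset
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Every row lands in exactly one split, but the counts vary from run to run unless a seed is fixed. Spark's randomSplit also accepts an optional seed argument for the same purpose.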

## Classifiers

Unfortunately, many Spark classifiers do not handle multiclass classification well, so be careful which one you choose. For example, GBTClassifier and LinearSVC are documented as supporting binary labels only.

They can all be found here: https://spark.apache.org/docs/latest/ml-classification-regression.html

## Specifying input and target

As mentioned above, the default names are features and label, but we can also specify our own.

In [38]:
clf = RandomForestClassifier(featuresCol="feature",labelCol='target')

## Fitting

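Fitting is done the same way as in scikit-learn, with one difference: fit returns a new fitted model rather than modifying the classifier in place, which is why the result is assigned back to clf.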

In [39]:
clf = clf.fit(df_train)

## Predictions
    
This part is a little different. Spark will not output a prediction vector; it will directly add columns to the DataFrame.

To predict, we call the method 'transform' from the fitted classifier.

In [40]:
prediction = clf.transform(df_test)
prediction.show(5)

+------------+-----------+------------+-----------+------+-----------------+------+--------------+-------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|  type|          feature|target| rawPrediction|  probability|prediction|
+------------+-----------+------------+-----------+------+-----------------+------+--------------+-------------+----------+
|         4.6|        3.6|         1.0|        0.2|Setosa|[4.6,3.6,1.0,0.2]|   2.0|[0.0,0.0,20.0]|[0.0,0.0,1.0]|       2.0|
|         4.7|        3.2|         1.3|        0.2|Setosa|[4.7,3.2,1.3,0.2]|   2.0|[0.0,0.0,20.0]|[0.0,0.0,1.0]|       2.0|
|         4.8|        3.0|         1.4|        0.1|Setosa|[4.8,3.0,1.4,0.1]|   2.0|[0.0,0.0,20.0]|[0.0,0.0,1.0]|       2.0|
|         4.8|        3.0|         1.4|        0.3|Setosa|[4.8,3.0,1.4,0.3]|   2.0|[0.0,0.0,20.0]|[0.0,0.0,1.0]|       2.0|
|         4.8|        3.1|         1.6|        0.2|Setosa|[4.8,3.1,1.6,0.2]|   2.0|[0.0,0.0,20.0]|[0.0,0.0,1.0]|       2.0|
+-------
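
The three added columns are related. In the rows shown, a rawPrediction of [0.0,0.0,20.0] goes with a probability of [0.0,0.0,1.0] and a prediction of 2.0. The plain-Python sketch below reproduces that relationship under the assumption that probability is rawPrediction scaled to sum to 1 and prediction is the index of the largest entry; the helper names are hypothetical.

```python
def to_probability(raw):
    """Scale a vector of non-negative scores so that it sums to 1."""
    total = sum(raw)
    return [score / total for score in raw]

def to_prediction(probability):
    """Return the index of the largest probability as a float class label."""
    return float(max(range(len(probability)), key=probability.__getitem__))

raw = [0.0, 0.0, 20.0]  # rawPrediction from the first row shown above
probability = to_probability(raw)
prediction = to_prediction(probability)
print(probability, prediction)
```

A raw total of 20 would be consistent with a forest of 20 trees, which is the default numTrees for RandomForestClassifier, with each tree contributing a vote that sums to 1.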

## Selecting

These structures are built to be processed in parallel, so we cannot access them in the standard fashion.

To get a subset of columns, we need to use select.

In [41]:
prediction.select(['type','target','probability','prediction']).show()

+----------+------+--------------------+----------+
|      type|target|         probability|prediction|
+----------+------+--------------------+----------+
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|    Setosa|   2.0|       [0.0,0.0,1.0]|       2.0|
|Versicolor|   1.0|     [0.05,0.95,0.0]|       1.0|
|    Setosa|   2.0|     [0.0,0.05,0.95]|       2.0|
|Versicolor|   1.0|       [0.0,1.0,0.0]|       1.0|
|Versicolor|   1.0|       [0.0,1.0,0.0]|       1.0|
|Versicolor|

## Evaluating

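Evaluation follows a very familiar format. Note that MulticlassClassificationEvaluator reports the weighted F1 score by default, not accuracy; pass metricName='accuracy' to the evaluator to get accuracy instead.

In [42]:
evaler = MulticlassClassificationEvaluator(labelCol='target')  # default metricName is 'f1'
f1 = evaler.evaluate(prediction)
print(f'F1 = {f1}')
print(f'1 - F1 = {1-f1}')

F1 = 0.9768746534459702
1 - F1 = 0.023125346554029758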
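
The evaluator's metricName parameter selects which metric is computed; 'f1' (weighted F1, the default) and 'accuracy' are two of the options. To make the difference between them concrete, here is a plain-Python sketch that computes both from (label, prediction) pairs. The pairs are hypothetical and are not taken from the run above, and the two helper functions are illustrative rather than Spark's code.

```python
from collections import Counter

def accuracy(pairs):
    """Fraction of (label, prediction) pairs that agree."""
    return sum(1 for y, p in pairs if y == p) / len(pairs)

def weighted_f1(pairs):
    """Per-class F1, averaged with each class weighted by its label count."""
    label_counts = Counter(y for y, _ in pairs)
    pred_counts = Counter(p for _, p in pairs)
    hits = Counter(y for y, p in pairs if y == p)
    total = 0.0
    for cls, n in label_counts.items():
        precision = hits[cls] / pred_counts[cls] if pred_counts[cls] else 0.0
        recall = hits[cls] / n
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        total += f1 * n
    return total / len(pairs)

# Hypothetical (label, prediction) pairs, not taken from the run above
pairs = [(2.0, 2.0), (2.0, 2.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0)]
print("accuracy    =", accuracy(pairs))
print("weighted F1 =", weighted_f1(pairs))
```

The two numbers are usually close on a balanced dataset but are not the same quantity, so it is worth stating which one a reported score refers to.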
