# Using Apache Spark

Apache Spark is one of the best-known multi-language Big Data processing frameworks. With the creation of the Resilient Distributed Dataset (RDD) and the introduction of in-memory computing, it has transformed the way big data is processed. While the base API is in Scala, Apache Spark also provides APIs in Java, Python and R.

<div class="alert alert-info">
    <b> Learning Objectives </b>
    <ul>
        <li>Learn about PySpark's two main datastructure APIs (i.e. RDD and DataFrames)</li>
        <li> Understand how to rewrite tasks into map-reduce form</li>
        <li> Clean data and train a model using PySpark's ML API</li>
    </ul>
</div>

## How to install PySpark

PySpark has already been installed on all the VMs. Elsewhere, it can be installed using `pip install pyspark`.
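To confirm that PySpark is available in the current environment before continuing, a quick check along the following lines can help. This is a minimal sketch; the helper name `is_installed` is ours and is not part of PySpark.

```python
from importlib.util import find_spec


def is_installed(package: str) -> bool:
    """Return True if `package` can be imported in this environment."""
    return find_spec(package) is not None


if is_installed("pyspark"):
    print("PySpark is available")
else:
    print("PySpark is missing; try `pip install pyspark`")
```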

## How to configure PySpark to run in a Jupyter notebook

In [1]:
# some magic to make sure Spark is configured properly
import subprocess as sp

# strip the trailing newline printed by `which` and `dirname`
python_home = sp.check_output(["which", "python3"], encoding="UTF-8").strip()
ipython_home = sp.check_output(["which", "ipython"], encoding="UTF-8").strip()
java_home = sp.check_output("dirname $(dirname $(readlink -f $(which java)))", shell=True, encoding="UTF-8").strip()

%env IPYTHON=1
%env PYSPARK_PYTHON={python_home}
%env PYSPARK_DRIVER_PYTHON={ipython_home}
%env PYSPARK_DRIVER_PYTHON_OPTS=notebook
%env JAVA_HOME={java_home}

env: IPYTHON=1
env: PYSPARK_PYTHON=/home/valeriehayot/ericsson/dask/.venv/bin/python3
env: PYSPARK_DRIVER_PYTHON=/home/valeriehayot/ericsson/dask/.venv/bin/ipython
env: PYSPARK_DRIVER_PYTHON_OPTS=notebook
env: JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.13.0.8-1.fc34.x86_64


## PySpark Data Structure APIs

PySpark has two main data structure APIs to facilitate data processing:
- Resilient Distributed Datasets (RDDs)
- DataFrames

In this section, we will discuss what they are and how to use them efficiently.

### Resilient Distributed Datasets (RDD)

RDDs are the base data structure in Spark. They can be viewed as distributed collections, similar to a list whose elements are split across several workers.
RDDs can be used similarly to the Dask Bags we learned about last week.<br>

RDDs:
- are immutable
- are lazily evaluated
- maintain intermediate results in memory

#### How to create a Spark RDD context

A SparkContext provides information to Spark on how to access the cluster. It uses information provided in the `SparkConf` to inform it of the application requirements.

Below we tell Spark to create a local context (i.e. run Spark locally on this machine rather than on a cluster) with just one core. To use all the available cores, we would specify `local[*]` instead.

In [2]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("My Spark Application").setMaster("local[1]")
sc = SparkContext.getOrCreate(conf=conf)

21/11/24 20:08:17 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 172.31.172.55 instead (on interface wlp2s0)
21/11/24 20:08:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/24 20:08:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Now that our SparkContext has been created, we can access the dashboard through a browser at `<VM IP>:4040` to view our application's status.

In [3]:
import socket
hostname = socket.gethostname()
url = f"http://{socket.gethostbyname(hostname)}:4040"
print("Spark dashboard url:", url)

Spark dashboard url: http://127.0.0.1:4040


Using the above context, we can now create an RDD. Below we will create an RDD from a Python iterable.

In [4]:
rdd = sc.parallelize(range(10))
rdd

PythonRDD[1] at RDD at PythonRDD.scala:53

As we saw last week in Dask, Spark RDDs are also lazily evaluated. To compute the result, we must apply the `collect()` action to our RDD object.

In [5]:
rdd.collect()

                                                                                

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

To split up our list into chunks to be processed separately in Spark, we can provide a second parameter in `parallelize` (numSlices) to break up the list.

In [6]:
rdd = sc.parallelize(range(10), numSlices=5)
rdd.collect() # can now check the dashboard to see that chunks were processed individually

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
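To picture what `numSlices` does, the sketch below splits a list into contiguous slices in plain Python. The helper `split_into_slices` is hypothetical and only approximates how Spark partitions a local collection. On a live context, `rdd.glom().collect()` returns the actual contents of each partition.

```python
from typing import List, Sequence


def split_into_slices(data: Sequence[int], num_slices: int) -> List[List[int]]:
    """Split `data` into `num_slices` contiguous, near-equal slices."""
    n = len(data)
    # Slice i covers positions [i * n // num_slices, (i + 1) * n // num_slices)
    return [
        list(data[i * n // num_slices:(i + 1) * n // num_slices])
        for i in range(num_slices)
    ]


slices = split_into_slices(range(10), num_slices=5)
print(slices)  # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
```

Each inner list stands for one chunk that could be processed as a separate task.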

<div class="alert alert-info">
    <b> Some Spark terminology: </b>
    <ul>
        <li>Functions which create RDDs from existing ones are known as <b>transformations</b>. Examples of transformations include <code>map</code>, <code>filter</code>, <code>reduceByKey</code></li>
        <li>Functions which compute the results and return them to the driver are known as <b>actions</b>. Examples of actions include <code>reduce</code> and <code>collect</code>.</li>
    </ul>
</div>
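The distinction between transformations and actions can be illustrated without Spark. The toy class below (`LazyCollection`, a hypothetical name and not a Spark API) records transformations and only runs them when an action is called. It is a sketch of the idea under simplifying assumptions, not of Spark's implementation.

```python
from typing import Any, Callable, Iterable, List, Tuple


class LazyCollection:
    """Toy stand-in for an RDD: transformations are recorded, actions run them."""

    def __init__(self, data: Iterable[Any], steps: Tuple[Tuple[str, Callable], ...] = ()):
        self._data = list(data)
        self._steps = steps  # pending transformations, in order

    def map(self, func: Callable) -> "LazyCollection":
        # Transformation: returns a NEW object, nothing is computed yet
        return LazyCollection(self._data, self._steps + (("map", func),))

    def filter(self, pred: Callable) -> "LazyCollection":
        # Transformation: also recorded lazily
        return LazyCollection(self._data, self._steps + (("filter", pred),))

    def collect(self) -> List[Any]:
        # Action: apply every recorded step and return the result
        result = self._data
        for kind, func in self._steps:
            if kind == "map":
                result = [func(x) for x in result]
            else:
                result = [x for x in result if func(x)]
        return result


calls = []
numbers = LazyCollection(range(5))
doubled = numbers.map(lambda x: calls.append(x) or x * 2)
print(len(calls))         # 0: the map has not run yet
print(doubled.collect())  # [0, 2, 4, 6, 8]
print(numbers.collect())  # [0, 1, 2, 3, 4]: the original is unchanged
```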

#### The MapReduce Programming model

Many parallel problems can be expressed in map-reduce form. Map and reduce are not unique to Apache Spark or Big Data frameworks. In fact, they were first introduced by functional programming languages.

Map operations apply a common function to all the elements within a collection (e.g. all the elements of an RDD or all the RDD partitions, in the case of Spark). Reduce operations perform an aggregation (e.g. sum, max, count) across all the elements.
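As a small illustration in plain Python, the sketch below squares a list of numbers (map) and then sums the squares (reduce). The two-way split into `partitions` is a made-up example to show that the same reduction can be applied to partial results.

```python
from functools import reduce

numbers = [1, 2, 3, 4, 5, 6]

# Map: apply the same function to every element
squares = list(map(lambda x: x * x, numbers))

# Reduce: aggregate all the elements into a single value
total = reduce(lambda acc, x: acc + x, squares)

# The same reduce can be applied per partition and then across the partial
# results, which is what makes the pattern easy to parallelize
partitions = [numbers[:3], numbers[3:]]
partials = [reduce(lambda acc, x: acc + x, map(lambda x: x * x, p)) for p in partitions]
total_from_partitions = reduce(lambda acc, x: acc + x, partials)

print(squares)                # [1, 4, 9, 16, 25, 36]
print(total)                  # 91
print(total_from_partitions)  # 91
```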

A traditional example of a MapReduce use case is Word Count. The aim of Word Count is to obtain the count of all the unique words in a document.

A sequential implementation of word count could look something like this:

In [7]:
text = """Many parallel problems can be expressed in map-reduce form. 
          That is, apply a general function to all the elements (map)
          and then perform an aggretation of the previous results.
          A traditional example of a MapReduce use case is Word Count.
          The aim of Word Count is to obtain the count of all the unique
          words in a document.
          A sequential implementation of word count could look something like this"""


def word_count(counter, word):
    if word in counter:
        counter[word] += 1
    else:
        counter[word] = 1

# clean all the special characters from the text assuming we don't want them in our word count
# can be omitted if we want special characters to remain
clean_text = "".join([char for char in text if char.isalnum() or char.isspace()])
counter = {}
for word in clean_text.split():
    word_count(counter, word.lower()) # convert word to lowercase. Can be omitted
        
sorted(counter.items(), key=lambda x: [-x[1], x[0]])[0:15]

[('a', 5),
 ('of', 5),
 ('the', 5),
 ('count', 4),
 ('is', 3),
 ('word', 3),
 ('all', 2),
 ('in', 2),
 ('mapreduce', 2),
 ('to', 2),
 ('aggretation', 1),
 ('aim', 1),
 ('an', 1),
 ('and', 1),
 ('apply', 1)]

As we can see in the above example, we first need to assign a count of 1 to each word, and then sum the counts of each word.

Functional programming constructs such as map and reduce also exist within Python. Let's rewrite the above sequential code using Python's `map` and `reduce` functions. (Note: `map` is a Python built-in, whereas `reduce` can be found in the `functools` module.)

In [8]:
from typing import Tuple
from functools import reduce

text = """Many parallel problems can be expressed in map-reduce form. 
          That is, apply a general function to all the elements (map)
          and then perform an aggretation of the previous results.
          A traditional example of a MapReduce use case is Word Count.
          The aim of Word Count is to obtain the count of all the unique
          words in a document.
          A sequential implementation of word count could look something like this"""

def remove_special(word: str) -> Tuple[str, int]:
    """Removes all non-alphanumeric characters from a word and pairs its lowercase form with a count
    
    Keyword arguments:
        word - the word to process
        
    Returns:
        A tuple where the first element is the lowercase word stripped of non-alphanumeric
        characters and the second is the word's initial count (always 1).
    """
    # keep only the alphanumeric characters
    word_chars = [char for char in word if char.isalnum()]
    
    return ("".join(word_chars).lower(), 1)

def reduce_by_key(accumulator: dict, entry: Tuple[str, int]) -> dict:
    """ Adds entry to the dictionary or increments entry value if entry is already in the dictionary
    
    Keyword arguments:
        accumulator - A dictionary containing the counts of all the previously seen entries
        entry - A tuple containing the entry to add
        
    Returns:
        An updated dictionary containing a count for the entry word.
    
    """
    if entry[0] in accumulator:
        accumulator[entry[0]] += entry[1]
    else:
        accumulator[entry[0]] = entry[1]
        
    return accumulator
    
clean_text = map(remove_special, text.split()) # remove special characters
clean_text = list(filter(lambda x: x[0] != "", clean_text)) # drop entries whose cleaned word is empty, since map is 1-to-1
counter = reduce(reduce_by_key, clean_text, {}) # perform a reduce along the keys
sorted(counter.items(), key=lambda x: [-x[1], x[0]])[0:15]

[('a', 5),
 ('of', 5),
 ('the', 5),
 ('count', 4),
 ('is', 3),
 ('word', 3),
 ('all', 2),
 ('in', 2),
 ('mapreduce', 2),
 ('to', 2),
 ('aggretation', 1),
 ('aim', 1),
 ('an', 1),
 ('and', 1),
 ('apply', 1)]

The above functional program maps directly onto Spark RDDs. In fact, it is even simpler in Spark: when the elements of an RDD are two-element tuples, they are naturally treated as key-value pairs.

In [9]:
text = """Many parallel problems can be expressed in map-reduce form. 
          That is, apply a general function to all the elements (map)
          and then perform an aggretation of the previous results.
          A traditional example of a MapReduce use case is Word Count.
          The aim of Word Count is to obtain the count of all the unique
          words in a document.
          A sequential implementation of word count could look something like this"""

# using the Spark Context created earlier
rdd = sc.parallelize(text.split())
cleaned_rdd = rdd.map(remove_special) # creating a new RDD using the function we created earlier
cleaned_rdd = cleaned_rdd.filter(lambda x: x[0] != "") # drop entries whose cleaned word is empty
counter_rdd = cleaned_rdd.reduceByKey(lambda x,y: x + y) # Spark provides a transformation to reduce over a key.
sorted_rdd = counter_rdd.sortBy(lambda x: (-x[1], x[0])) # Note: later transformations that shuffle the data may not preserve this order
sorted_rdd.take(15) # take is a Spark action and therefore computes the result

[('a', 5),
 ('of', 5),
 ('the', 5),
 ('count', 4),
 ('is', 3),
 ('word', 3),
 ('all', 2),
 ('in', 2),
 ('mapreduce', 2),
 ('to', 2),
 ('aggretation', 1),
 ('aim', 1),
 ('an', 1),
 ('and', 1),
 ('apply', 1)]

#### Exercise

Using the provided text, count the number of words that begin with each letter and return the counts sorted in descending order (ties broken alphabetically).

`e.g. "Hello World. hi there. foo bar." -> [('h', 2), ('b', 1), ('f', 1), ('t', 1), ('w', 1)]`

In [10]:
text = """Many parallel problems can be expressed in map-reduce form. 
          That is, apply a general function to all the elements (map)
          and then perform an aggretation of the previous results.
          A traditional example of a MapReduce use case is Word Count.
          The aim of Word Count is to obtain the count of all the unique
          words in a document.
          A sequential implementation of word count could look something like this"""


# <Your code here>

#### Solution

In [11]:
text = """Many parallel problems can be expressed in map-reduce form. 
          That is, apply a general function to all the elements (map)
          and then perform an aggretation of the previous results.
          A traditional example of a MapReduce use case is Word Count.
          The aim of Word Count is to obtain the count of all the unique
          words in a document.
          A sequential implementation of word count could look something like this"""

rdd = sc.parallelize(text.split())
result = (
            rdd.map(remove_special)
               .filter(lambda x: x[0] != "")
               .map(lambda x: (x[0][0], x[1]))
               .reduceByKey(lambda x,y: x+y)
               .sortBy(lambda x: (-x[1], x[0]))
               .collect()
         )
result

[('a', 12),
 ('t', 11),
 ('c', 7),
 ('i', 6),
 ('o', 6),
 ('m', 4),
 ('p', 4),
 ('w', 4),
 ('e', 3),
 ('f', 2),
 ('l', 2),
 ('s', 2),
 ('u', 2),
 ('b', 1),
 ('d', 1),
 ('g', 1),
 ('r', 1)]

#### Shuffling in Spark

Any operation that requires data to be moved around in Spark incurs additional overhead. Actions, which must return their results to the driver, incur this data-movement overhead, as do certain transformations, which must shuffle data between partitions.

Transformations can be subdivided into two categories: those with narrow dependencies and those with wide dependencies. Transformations with narrow dependencies do not produce any shuffling. Examples of such transformations include `map`, `filter` and `flatMap`.

Wide-dependency transformations may require some amount of shuffling, as key-value pairs may need to be communicated to other partitions.
All the transformations ending with `ByKey` may involve some form of shuffling (e.g. `reduceByKey`, `groupByKey`, `foldByKey`). In general, it is best to minimize both the amount of shuffling and the amount of data involved in shuffling (e.g. `reduceByKey` can be more efficient than `groupByKey`, because it combines the values for each key within a partition before shuffling).
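The difference in shuffled data can be sketched in plain Python. The two partitions below are made-up data and the function names are hypothetical. The sketch only counts how many records would leave each partition with and without a per-partition combine step; it does not model Spark's actual shuffle.

```python
from collections import Counter
from typing import List, Tuple

Pair = Tuple[str, int]

# Two hypothetical partitions of (word, 1) pairs
partitions: List[List[Pair]] = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("b", 1), ("a", 1), ("b", 1)],
]


def shuffled_without_combine(parts: List[List[Pair]]) -> int:
    """groupByKey-style: every pair is sent to its target partition as-is."""
    return sum(len(part) for part in parts)


def shuffled_with_combine(parts: List[List[Pair]]) -> int:
    """reduceByKey-style: each partition pre-aggregates per key before sending."""
    total = 0
    for part in parts:
        local = Counter()
        for key, value in part:
            local[key] += value
        total += len(local)  # one record per distinct key in this partition
    return total


print(shuffled_without_combine(partitions))  # 7
print(shuffled_with_combine(partitions))     # 4
```

The gap widens as the number of repeated keys per partition grows.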


#### Overhead of Python in PySpark

Just as Python built-ins are faster than user-defined functions (UDFs) in plain Python, Spark built-ins are significantly faster than equivalent Python UDFs, because of the additional communication overhead between the Python UDFs and the Scala backend. It is therefore best to use PySpark built-ins whenever possible.
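The plain-Python half of this claim can be checked with a short timing sketch. It compares the built-in `sum` with a user-defined `add` function applied through `reduce`. Timings vary by machine, and the sketch does not model the extra serialization costs that PySpark UDFs incur.

```python
from functools import reduce
from timeit import timeit

values = list(range(100_000))


def add(x: int, y: int) -> int:
    """User-defined addition, called once per element."""
    return x + y


builtin_total = sum(values)         # built-in: the loop runs in C
udf_total = reduce(add, values, 0)  # one Python function call per element

builtin_time = timeit(lambda: sum(values), number=20)
udf_time = timeit(lambda: reduce(add, values, 0), number=20)

print(builtin_total == udf_total)  # True
print(f"built-in: {builtin_time:.4f}s, user-defined: {udf_time:.4f}s")
```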

<b>Example: ReduceByKey vs CountByKey on word count</b>

ReduceByKey

In [12]:
%%timeit

text = """Many parallel problems can be expressed in map-reduce form. 
          That is, apply a general function to all the elements (map)
          and then perform an aggretation of the previous results.
          A traditional example of a MapReduce use case is Word Count.
          The aim of Word Count is to obtain the count of all the unique
          words in a document.
          A sequential implementation of word count could look something like this"""

(
    sc.parallelize(text.split(), numSlices=3)
    .map(remove_special)
    .reduceByKey(lambda x,y: x+y)
    .collectAsMap() 
)

259 ms ± 23.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


CountByKey

In [13]:
%%timeit

text = """Many parallel problems can be expressed in map-reduce form. 
          That is, apply a general function to all the elements (map)
          and then perform an aggretation of the previous results.
          A traditional example of a MapReduce use case is Word Count.
          The aim of Word Count is to obtain the count of all the unique
          words in a document.
          A sequential implementation of word count could look something like this"""

(
    sc.parallelize(text.split(), numSlices=3)
    .map(remove_special)
    .countByKey()
)

91.2 ms ± 7.09 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


#### How to terminate a SparkContext

In [14]:
sc.stop()

### DataFrames

The Spark DataFrame is used to process structured data. It is similar to the RDD, except that the additional information on the structure of the data enables Spark to perform further optimizations.

Since it is built on top of the RDD, it inherits all of the RDD's features (i.e. immutability, lazy evaluation, in-memory processing).

#### How to start a PySpark DataFrame session

Similar to the SparkContext, the SparkSession provides Spark with information on application requirements. The main difference between a SparkContext and a SparkSession is that SparkContexts create RDDs and SparkSessions create DataFrames.

Whereas with a SparkContext we had to create a separate SparkConf object, with SparkSession we can set application
properties from within the object.

In [15]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("My Spark DataFrame Program") \
    .master("local[1]") \
    .getOrCreate()

Using the created `SparkSession`, we can now read a CSV file. We will be reading the Iris dataset for this part.

In [16]:
df = spark.read.csv("data/iris.csv", header=True)
df

DataFrame[sepal_length: string, sepal_width: string, petal_length: string, petal_width: string, species: string]

As with RDDs, to get the content of our DataFrame, we must use the `collect` action.

In [17]:
output = df.collect()
output[0:10]

[Row(sepal_length='5.1', sepal_width='3.5', petal_length='1.4', petal_width='0.2', species='setosa'),
 Row(sepal_length='4.9', sepal_width='3.0', petal_length='1.4', petal_width='0.2', species='setosa'),
 Row(sepal_length='4.7', sepal_width='3.2', petal_length='1.3', petal_width='0.2', species='setosa'),
 Row(sepal_length='4.6', sepal_width='3.1', petal_length='1.5', petal_width='0.2', species='setosa'),
 Row(sepal_length='5.0', sepal_width='3.6', petal_length='1.4', petal_width='0.2', species='setosa'),
 Row(sepal_length='5.4', sepal_width='3.9', petal_length='1.7', petal_width='0.4', species='setosa'),
 Row(sepal_length='4.6', sepal_width='3.4', petal_length='1.4', petal_width='0.3', species='setosa'),
 Row(sepal_length='5.0', sepal_width='3.4', petal_length='1.5', petal_width='0.2', species='setosa'),
 Row(sepal_length='4.4', sepal_width='2.9', petal_length='1.4', petal_width='0.2', species='setosa'),
 Row(sepal_length='4.9', sepal_width='3.1', petal_length='1.5', petal_width='0.1',

Calling `collect` returns a list of Rows (similar to NamedTuples) rather than a DataFrame object. Row values can be accessed by attribute name, by column name, or by position.
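For readers unfamiliar with named tuples, the plain-Python sketch below shows the same style of access. `IrisRow` is a hypothetical stand-in with only two of the Iris columns. Note that, unlike a Spark `Row`, a `namedtuple` does not support lookup by column name with square brackets.

```python
from collections import namedtuple

# Hypothetical stand-in for a Spark Row with two of the Iris columns
IrisRow = namedtuple("IrisRow", ["sepal_length", "species"])
row = IrisRow(sepal_length="5.1", species="setosa")

print(row.sepal_length)          # 5.1 (access by attribute name)
print(row[0])                    # 5.1 (access by position)
print(row._asdict()["species"])  # setosa (convert to a dict first)
```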

In [18]:
output[0].sepal_length

'5.1'

In [19]:
output[0]["sepal_length"]

'5.1'

In [20]:
output[0][0]

'5.1'

Should we want formatted output, we can use the `show` function.

In [21]:
df.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1| 

#### How to process a PySpark DataFrame

Spark DataFrames can be manipulated in one of three ways:
1. Using the default DataFrame API
2. Via SQL queries
3. Using pandas-on-spark

We will perform one-hot encoding of the species column using all three methods.
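As a reminder of what one-hot encoding produces, here is a minimal plain-Python sketch. The sample values and the helper `one_hot` are made up for illustration; each value becomes one 0/1 indicator per category.

```python
from typing import Dict, List

species_column = ["setosa", "versicolor", "virginica", "setosa"]  # made-up sample
categories = sorted(set(species_column))


def one_hot(value: str, categories: List[str]) -> Dict[str, int]:
    """Return one 0/1 indicator per category for a single value."""
    return {category: int(value == category) for category in categories}


encoded = [one_hot(value, categories) for value in species_column]
print(encoded[0])  # {'setosa': 1, 'versicolor': 0, 'virginica': 0}
print(encoded[1])  # {'setosa': 0, 'versicolor': 1, 'virginica': 0}
```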

In [22]:
# Get species list
all_species = df.dropDuplicates(["species"]).select("species").collect()
all_species

[Row(species='virginica'), Row(species='versicolor'), Row(species='setosa')]

**1. Using the default DataFrame API**

In [23]:
from pyspark.sql import functions as F

df_api = df
for species in all_species:
    # add one 0/1 indicator column per species
    df_api = df_api.withColumn(species.species, F.when(df.species == species.species, 1).otherwise(0))

df_api.show()

+------------+-----------+------------+-----------+-------+---------+----------+------+
|sepal_length|sepal_width|petal_length|petal_width|species|virginica|versicolor|setosa|
+------------+-----------+------------+-----------+-------+---------+----------+------+
|         5.1|        3.5|         1.4|        0.2| setosa|        0|         0|     1|
|         4.9|        3.0|         1.4|        0.2| setosa|        0|         0|     1|
|         4.7|        3.2|         1.3|        0.2| setosa|        0|         0|     1|
|         4.6|        3.1|         1.5|        0.2| setosa|        0|         0|     1|
|         5.0|        3.6|         1.4|        0.2| setosa|        0|         0|     1|
|         5.4|        3.9|         1.7|        0.4| setosa|        0|         0|     1|
|         4.6|        3.4|         1.4|        0.3| setosa|        0|         0|     1|
|         5.0|        3.4|         1.5|        0.2| setosa|        0|         0|     1|
|         4.4|        2.9|      

**2. Using SQL queries**

In [24]:
df.createOrReplaceTempView("table_iris")

query = "SELECT *"

for species in all_species:
    query += f", CAST(CASE WHEN species LIKE '{species.species}' THEN 1 ELSE 0 END AS int) as {species.species}"
    
query += " FROM table_iris"

df_sql = spark.sql(query)
print("Submitted query:\n\n", query.replace(", ", ",\n"), "\n")
df_sql.show()

Submitted query:

 SELECT *,
CAST(CASE WHEN species LIKE 'virginica' THEN 1 ELSE 0 END AS int) as virginica,
CAST(CASE WHEN species LIKE 'versicolor' THEN 1 ELSE 0 END AS int) as versicolor,
CAST(CASE WHEN species LIKE 'setosa' THEN 1 ELSE 0 END AS int) as setosa FROM table_iris 

+------------+-----------+------------+-----------+-------+---------+----------+------+
|sepal_length|sepal_width|petal_length|petal_width|species|virginica|versicolor|setosa|
+------------+-----------+------------+-----------+-------+---------+----------+------+
|         5.1|        3.5|         1.4|        0.2| setosa|        0|         0|     1|
|         4.9|        3.0|         1.4|        0.2| setosa|        0|         0|     1|
|         4.7|        3.2|         1.3|        0.2| setosa|        0|         0|     1|
|         4.6|        3.1|         1.5|        0.2| setosa|        0|         0|     1|
|         5.0|        3.6|         1.4|        0.2| setosa|        0|         0|     1|
|         5.4|

**3. Using Pandas-on-Spark**<br/>
Pandas-on-Spark is a new API that was introduced in Spark 3.2.0 (the latest version of Spark at the time of writing). It allows users to process Spark DataFrames using
Pandas functionality.

In [25]:
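psdf = df.to_pandas_on_spark()  # deprecated in later Spark releases in favor of `pandas_api()`
for species in all_species:
    psdf[species.species] = 0
    # set the indicator to 1 where the row's species matches
    psdf[species.species] = psdf[species.species].where(psdf.species != species.species, 1)
psdf[0:15]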

21/11/24 20:08:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/11/24 20:08:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/11/24 20:08:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


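|   | sepal_length | sepal_width | petal_length | petal_width | species | virginica | versicolor | setosa |
|---|--------------|-------------|--------------|-------------|---------|-----------|------------|--------|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa | 0 | 0 | 1 |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa | 0 | 0 | 1 |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa | 0 | 0 | 1 |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa | 0 | 0 | 1 |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa | 0 | 0 | 1 |
| 5 | 5.4 | 3.9 | 1.7 | 0.4 | setosa | 0 | 0 | 1 |
| 6 | 4.6 | 3.4 | 1.4 | 0.3 | setosa | 0 | 0 | 1 |
| 7 | 5.0 | 3.4 | 1.5 | 0.2 | setosa | 0 | 0 | 1 |
| 8 | 4.4 | 2.9 | 1.4 | 0.2 | setosa | 0 | 0 | 1 |
| 9 | 4.9 | 3.1 | 1.5 | 0.1 | setosa | 0 | 0 | 1 |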


#### Converting between the RDD and DataFrame APIs
It is very simple to convert between the two APIs. In this example, we will access the RDD API from our existing DataFrame and use a `map` task to drop the species column. We will then convert back to the DataFrame API to display the data neatly with `show`.

**DataFrame to RDD**

In [26]:
from pyspark.sql import Row
rdd_fromdf = df_api.rdd # the RDD API can now be accessed.

# Create a new Row object without the species column
rdd_fromdf = rdd_fromdf.map(lambda x: Row(**{k: v for k, v in x.asDict().items() if k != "species"}))
rdd_fromdf.collect()[0:3]

                                                                                

[Row(sepal_length='5.1', sepal_width='3.5', petal_length='1.4', petal_width='0.2', virginica=0, versicolor=0, setosa=1),
 Row(sepal_length='4.9', sepal_width='3.0', petal_length='1.4', petal_width='0.2', virginica=0, versicolor=0, setosa=1),
 Row(sepal_length='4.7', sepal_width='3.2', petal_length='1.3', petal_width='0.2', virginica=0, versicolor=0, setosa=1)]

**RDD to DataFrame**

In [27]:
df_fromrdd = rdd_fromdf.toDF()
df_fromrdd.show()

+------------+-----------+------------+-----------+---------+----------+------+
|sepal_length|sepal_width|petal_length|petal_width|virginica|versicolor|setosa|
+------------+-----------+------------+-----------+---------+----------+------+
|         5.1|        3.5|         1.4|        0.2|        0|         0|     1|
|         4.9|        3.0|         1.4|        0.2|        0|         0|     1|
|         4.7|        3.2|         1.3|        0.2|        0|         0|     1|
|         4.6|        3.1|         1.5|        0.2|        0|         0|     1|
|         5.0|        3.6|         1.4|        0.2|        0|         0|     1|
|         5.4|        3.9|         1.7|        0.4|        0|         0|     1|
|         4.6|        3.4|         1.4|        0.3|        0|         0|     1|
|         5.0|        3.4|         1.5|        0.2|        0|         0|     1|
|         4.4|        2.9|         1.4|        0.2|        0|         0|     1|
|         4.9|        3.1|         1.5| 

In [28]:
# Terminating a SparkSession

spark.stop()

### Machine Learning With Spark MLlib (DataFrame-based)

Spark's MLlib API provides users with the ability to apply a variety of machine learning algorithms on their data.
Spark provides both an RDD and DataFrame-based API. We will be focusing on the DataFrame API.

In this section, we will be reusing the [palmerpenguins](https://allisonhorst.github.io/palmerpenguins/) dataset to train a Random Forest classifier.

#### Data Preparation with PySpark

The first step is to load the data and remove any rows with missing values.

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


spark = SparkSession \
    .builder \
    .appName("My Spark DataFrame Program") \
    .master("local[1]") \
    .getOrCreate()

df = spark.read.csv("data/penguins.csv", header=True)
df = df.replace("?", None).replace("_", None).dropna()
df = df.select(*(col(c).cast("float").alias(c) 
                 if c != "species" and c != "island" and c != "sex" 
                 else c
                 for c in df.columns ))
df.show()

+-------+---------+----------------+---------------+-----------------+-----------+------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+----------------+---------------+-----------------+-----------+------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|     3750.0|  MALE|
| Adelie|Torgersen|            39.5|           17.4|            186.0|     3800.0|FEMALE|
| Adelie|Torgersen|            40.3|           18.0|            195.0|     3250.0|FEMALE|
| Adelie|Torgersen|            36.7|           19.3|            193.0|     3450.0|FEMALE|
| Adelie|Torgersen|            39.3|           20.6|            190.0|     3650.0|  MALE|
| Adelie|Torgersen|            38.9|           17.8|            181.0|     3625.0|FEMALE|
| Adelie|Torgersen|            39.2|           19.6|            195.0|     4675.0|  MALE|
| Adelie|Torgersen|            41.1|           17.6|            182.0|     3200.0|FEMALE|
| Adelie|T

Next, we will need to one-hot encode both the sex and island columns. To do so, we must first map each categorical value to a numerical index, after which the indices can be one-hot encoded. We index the species column in the same step so that it can serve as the label.
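The two steps can be sketched in plain Python. The sample values and helper names below are hypothetical. The sketch assumes alphabetical indexing and assumes the encoder drops the last category (the default `dropLast=True` behaviour of Spark's `OneHotEncoder`), which is why the last category is encoded as an all-zero vector. Vectors are written in a sparse form `(size, indices, values)`.

```python
from typing import Dict, List, Tuple

islands = ["Torgersen", "Biscoe", "Dream", "Biscoe"]  # made-up sample values

# Step 1: map each category to an index, in ascending alphabetical order
labels = sorted(set(islands))
index_of: Dict[str, int] = {label: i for i, label in enumerate(labels)}

SparseVec = Tuple[int, List[int], List[float]]  # (size, indices, values)


def one_hot_drop_last(index: int, num_categories: int) -> SparseVec:
    """Encode an index as a sparse one-hot vector, dropping the last category."""
    size = num_categories - 1
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])  # last category: all zeros


# Step 2: one-hot encode each index
for island in islands:
    idx = index_of[island]
    print(island, idx, one_hot_drop_last(idx, len(labels)))
# Torgersen 2 (2, [], [])
# Biscoe 0 (2, [0], [1.0])
# Dream 1 (2, [1], [1.0])
# Biscoe 0 (2, [0], [1.0])
```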

In [30]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCols=["island", "sex", "species"], 
                              outputCols=["island_idx", "sex_idx", "species_idx"],
                              stringOrderType="alphabetAsc")
model = stringIndexer.fit(df)
td = model.transform(df)

td.show()

+-------+---------+----------------+---------------+-----------------+-----------+------+----------+-------+-----------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|island_idx|sex_idx|species_idx|
+-------+---------+----------------+---------------+-----------------+-----------+------+----------+-------+-----------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|     3750.0|  MALE|       2.0|    1.0|        0.0|
| Adelie|Torgersen|            39.5|           17.4|            186.0|     3800.0|FEMALE|       2.0|    0.0|        0.0|
| Adelie|Torgersen|            40.3|           18.0|            195.0|     3250.0|FEMALE|       2.0|    0.0|        0.0|
| Adelie|Torgersen|            36.7|           19.3|            193.0|     3450.0|FEMALE|       2.0|    0.0|        0.0|
| Adelie|Torgersen|            39.3|           20.6|            190.0|     3650.0|  MALE|       2.0|    1.0|        0.0|
| Adelie|Torgersen|            3

In [31]:
from pyspark.ml.feature import OneHotEncoder

ohe = OneHotEncoder(inputCols=["island_idx", "sex_idx"], outputCols=["island_enc", "sex_enc"])
model = ohe.fit(td)

enc = model.transform(td)
enc.select('island', 'sex', 'island_enc', 'sex_enc').show()

+---------+------+-------------+-------------+
|   island|   sex|   island_enc|      sex_enc|
+---------+------+-------------+-------------+
|Torgersen|  MALE|    (2,[],[])|    (1,[],[])|
|Torgersen|FEMALE|    (2,[],[])|(1,[0],[1.0])|
|Torgersen|FEMALE|    (2,[],[])|(1,[0],[1.0])|
|Torgersen|FEMALE|    (2,[],[])|(1,[0],[1.0])|
|Torgersen|  MALE|    (2,[],[])|    (1,[],[])|
|Torgersen|FEMALE|    (2,[],[])|(1,[0],[1.0])|
|Torgersen|  MALE|    (2,[],[])|    (1,[],[])|
|Torgersen|FEMALE|    (2,[],[])|(1,[0],[1.0])|
|Torgersen|  MALE|    (2,[],[])|    (1,[],[])|
|Torgersen|  MALE|    (2,[],[])|    (1,[],[])|
|Torgersen|FEMALE|    (2,[],[])|(1,[0],[1.0])|
|Torgersen|FEMALE|    (2,[],[])|(1,[0],[1.0])|
|Torgersen|  MALE|    (2,[],[])|    (1,[],[])|
|Torgersen|FEMALE|    (2,[],[])|(1,[0],[1.0])|
|Torgersen|  MALE|    (2,[],[])|    (1,[],[])|
|   Biscoe|FEMALE|(2,[0],[1.0])|(1,[0],[1.0])|
|   Biscoe|  MALE|(2,[0],[1.0])|    (1,[],[])|
|   Biscoe|FEMALE|(2,[0],[1.0])|(1,[0],[1.0])|
|   Biscoe|  

Next, we need to merge all of our features into a single column of vectors so that it can be passed to the classifier.

In [32]:
from pyspark.ml.feature import VectorAssembler

input_columns = [name for name in enc.columns if "species" not in name and 
                 "idx" not in name and
                 "sex" != name and
                 "island" != name]
assembler = VectorAssembler(inputCols=input_columns, outputCol="features")
va = assembler.transform(enc)

va.select("features").show()

+--------------------+
|            features|
+--------------------+
|[39.0999984741210...|
|[39.5,17.39999961...|
|[40.2999992370605...|
|[36.7000007629394...|
|[39.2999992370605...|
|[38.9000015258789...|
|[39.2000007629394...|
|[41.0999984741210...|
|[38.5999984741210...|
|[34.5999984741210...|
|[36.5999984741210...|
|[38.7000007629394...|
|[42.5,20.70000076...|
|[34.4000015258789...|
|[46.0,21.5,194.0,...|
|[37.7999992370605...|
|[37.7000007629394...|
|[35.9000015258789...|
|[38.2000007629394...|
|[38.7999992370605...|
+--------------------+
only showing top 20 rows



Finally, we can split our data into training and test sets, after which we are ready to train and evaluate our model.
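The idea behind a seeded random split can be sketched in plain Python. The helper `random_split` below is hypothetical and assumes each row is assigned independently at random, so the resulting proportions are only approximately 80/20. It is not a reproduction of Spark's sampling.

```python
import random
from typing import List, Sequence, Tuple


def random_split(rows: Sequence[int], train_fraction: float, seed: int) -> Tuple[List[int], List[int]]:
    """Assign each row independently to train or test using a seeded generator."""
    rng = random.Random(seed)
    train_part: List[int] = []
    test_part: List[int] = []
    for row in rows:
        (train_part if rng.random() < train_fraction else test_part).append(row)
    return train_part, test_part


train_rows, test_rows = random_split(range(100), train_fraction=0.8, seed=0)
print(len(train_rows), len(test_rows))  # sizes are close to 80/20, not exact

# The same seed reproduces the same split
print(random_split(range(100), 0.8, seed=0) == (train_rows, test_rows))  # True
```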

In [33]:
train, test = va.randomSplit([0.8, 0.2], seed=0)
test.show()

+-------+---------+----------------+---------------+-----------------+-----------+------+----------+-------+-----------+-------------+-------------+--------------------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|island_idx|sex_idx|species_idx|   island_enc|      sex_enc|            features|
+-------+---------+----------------+---------------+-----------------+-----------+------+----------+-------+-----------+-------------+-------------+--------------------+
| Adelie|   Biscoe|            38.2|           20.0|            190.0|     3900.0|  MALE|       0.0|    1.0|        0.0|(2,[0],[1.0])|    (1,[],[])|[38.2000007629394...|
| Adelie|   Biscoe|            38.6|           17.2|            199.0|     3750.0|FEMALE|       0.0|    0.0|        0.0|(2,[0],[1.0])|(1,[0],[1.0])|[38.5999984741210...|
| Adelie|   Biscoe|            40.1|           18.9|            188.0|     4300.0|  MALE|       0.0|    1.0|        0.0|(2,[0],[1.0])|    (1,[],[])|[4

#### Random Forest Classification with PySpark

Now that we have encoded our data, we can proceed to train our model.

In [34]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="species_idx")
rf_model = rf.fit(train)
predictions = rf_model.transform(test)

Now that our model is trained, we can evaluate it using Spark's `MulticlassClassificationEvaluator`.

In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="species_idx", predictionCol="prediction",
                                              metricName="accuracy")  # the evaluator's default metric is F1, not accuracy
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 1.0
Test Error = 0.0
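For reference, accuracy is simply the fraction of predictions that match their label. The plain-Python sketch below computes it on made-up labels and predictions. The helper `accuracy_score` is ours and is not part of Spark.

```python
from typing import Sequence


def accuracy_score(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """Fraction of predictions that match their label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, y_hat in zip(labels, predictions) if y == y_hat)
    return correct / len(labels)


# Made-up labels and predictions for illustration
y_true = [0.0, 1.0, 2.0, 2.0, 1.0]
y_pred = [0.0, 1.0, 2.0, 1.0, 1.0]

acc = accuracy_score(y_true, y_pred)
print(f"Accuracy = {acc}")  # Accuracy = 0.8
print(f"Test Error = {1.0 - acc}")
```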


### Further Reading

- [RDD Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [SQL Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- [MLlib Guide](https://spark.apache.org/docs/latest/ml-guide.html)
- [PySpark API](https://spark.apache.org/docs/latest/api/python/index.html)