

<style>

@font-face {
    font-family: "Computer Modern";
    src: url('http://mirrors.ctan.org/fonts/cm-unicode/fonts/otf/cmunss.otf');
}
#notebook_panel { /* main background */
    background: #888;
    color: #f6f6f6;
}
#notebook li { /* More space between bullet points */
margin-top:0.8em;
}
div.text_cell_render{
    font-family: 'Arvo', sans-serif;
    line-height: 110%;
    font-size: 135%;
    width:1000px;
    margin-left:auto;
    margin-right:auto;
}

</style>


<center>

<p class="gap05"></p>
<h1>Introduction to</h1>
</center>
<center>
<img src="images/spark-logo.png" alt="Spark Logo" >


</center>
<p class="gap05"></p>

<p class="gap05"></p>
<center>
<h3>Darrell Aucoin</h3>
</center>
<center>

<h3>CIBC: Data Engineer Manager's WorkZone Project</h3>

</center>


Source: Learning Spark - O'Reilly Media

# Introduction

There are several computational environments:  
- __Shared CPU, RAM, and hard drive__: Single-core computers
- __Shared RAM and hard drive__: Modern multi-core computers
- __Shared hard drive__: Some server clusters (SHARCNET)
- __Nothing shared__: Hadoop clusters

# MapReduce Overview

__Definition.__ _MapReduce_ is a programming model that uses parallel, distributed algorithms to process or generate data sets. MapReduce is composed of two main functions:

__Map(k,v)__: Filters and sorts data.

__Reduce(k,v)__: Aggregates data according to keys (k).

MapReduce is broken down into several steps:

1. Record Reader
2. Map
3. Combiner (Optional)
4. Partitioner
5. Shuffle and Sort
6. Reduce
7. Output Format

![alt text](images/MapReduce.gif)

## MapReduce Phases

1. __Record Reader__ Translates an input into records, in the form of key-value pairs, to be processed by the user-defined map function on each mapper node.

2. __Map__ _User-defined function_ that outputs intermediate key-value pairs for the reducers.

    - __key__ ($k_{2}$): Later, MapReduce will group and possibly aggregate data according to these keys, so choosing the right keys here is important for a good MapReduce job.

    - __value__ ($v_{2}$): The data to be grouped according to its keys.

3. __Combiner__ _User-defined function_ that aggregates data according to intermediate keys on a mapper node.
    - This usually reduces the amount of data sent over the network, which increases efficiency.
    $$\left.\begin{array}{r}
\left(\mbox{"hello world"},1\right)\\
\left(\mbox{"hello world"},1\right)\\
\left(\mbox{"hello world"},1\right)
\end{array}\right\} \overset{\mbox{combiner}}{\longrightarrow}\left(\mbox{"hello world"},3\right) $$

4. __Partitioner__ Sends each intermediate key-value pair (k,v) to a reducer chosen by
    $$\mbox{Reducer}=\mbox{hash}\left(\mbox{k}\right)\pmod{R}$$  
    where $R$ is the number of reducers.
5. __Shuffle and Sort__ On each reducer node, sorts the incoming pairs by key so that equal keys are grouped together.
6. __Reduce__ _User-defined function_ that aggregates the values (v) for each key (k) and sends the resulting key-value pairs to the output.
7. __Output Format__ Translates the final key-value pairs to a file format (tab-separated by default).
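
The phases above can be sketched in plain Python with a word count. This is only a single-process illustration, not Hadoop's API: the function names, the `crc32`-based hash, and the two input splits are all assumptions made for the example.

```python
from collections import Counter, defaultdict
from zlib import crc32


def record_reader(text):
    """Record reader: turn raw input into (line_number, line) records."""
    return list(enumerate(text.splitlines()))


def map_fn(_key, line):
    """Map: emit an intermediate (word, 1) pair for every word."""
    return [(word.lower(), 1) for word in line.split()]


def combine(pairs):
    """Combiner: pre-aggregate counts on the mapper before anything is sent."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return list(totals.items())


def partition(key, num_reducers):
    """Partitioner: reducer = hash(k) mod R, using a deterministic hash."""
    return crc32(key.encode("utf-8")) % num_reducers


def reduce_fn(key, values):
    """Reduce: aggregate all values that share a key."""
    return key, sum(values)


def word_count(splits, num_reducers=2):
    # One "mapper" per input split: record reader -> map -> combiner.
    mapper_outputs = []
    for split in splits:
        pairs = [p for k, v in record_reader(split) for p in map_fn(k, v)]
        mapper_outputs.append(combine(pairs))

    # Partitioner: route each intermediate pair to one reducer.
    reducer_inputs = [defaultdict(list) for _ in range(num_reducers)]
    for output in mapper_outputs:
        for word, count in output:
            reducer_inputs[partition(word, num_reducers)][word].append(count)

    # Shuffle and sort, then reduce: each reducer walks its keys in order.
    results = {}
    for grouped in reducer_inputs:
        for word in sorted(grouped):
            key, total = reduce_fn(word, grouped[word])
            results[key] = total
    return results


def output_format(results):
    """Output format: one tab-separated key-value pair per line."""
    return "\n".join(f"{key}\t{value}" for key, value in sorted(results.items()))


counts = word_count(["hello world\nhello spark", "hello world"])
print(output_format(counts))
```

Because the combiner runs per split, the first mapper sends `("hello", 2)` rather than two separate `("hello", 1)` pairs, which is the network saving described in step 3.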


# Spark's DAG Model
Spark uses a more flexible form of MapReduce built on __Directed Acyclic Graphs (DAGs)__. 
![alt text](images/DAG.png)  

For a set of operations:  
1. Create a DAG for operations
2. Divide DAG into tasks
3. Assign tasks to nodes
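
As a rough illustration of these three steps, the sketch below uses the standard library's `graphlib` (Python 3.9+). The operation names, the node names, and the round-robin assignment are assumptions for the example. Spark's real scheduler is considerably more involved and also takes data locality into account.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# 1. Create a DAG: each operation maps to the operations it depends on.
#    (Hypothetical operation names.)
dag = {
    "read_events": set(),
    "read_expenses": set(),
    "sum_expenses": {"read_expenses"},
    "join": {"read_events", "sum_expenses"},
    "report": {"join"},
}

# 2. Divide the DAG into tasks, in waves whose tasks do not depend on each other.
sorter = TopologicalSorter(dag)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # everything whose dependencies are done
    waves.append(ready)
    sorter.done(*ready)

# 3. Assign tasks to nodes (round-robin within each wave; hypothetical nodes).
nodes = ["node-0", "node-1"]
assignment = {
    task: nodes[i % len(nodes)]
    for wave in waves
    for i, task in enumerate(wave)
}

for wave in waves:
    print([(task, assignment[task]) for task in wave])
```

The two read operations land in the same wave because neither depends on the other, so they can run on different nodes at the same time.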

# Spark Overview
Apache Spark is an alternative to MapReduce in the Hadoop environment. 

Whereas Hadoop MapReduce writes to the hard drive after every MapReduce operation, Spark keeps intermediate results in memory where it can and only persists output when explicitly told to.

Spark is more flexible and usually faster than Hadoop MapReduce: you describe what is to be done and the Spark engine optimizes the query.

## Popular Use Cases

- Processing in parallel large data sets distributed across a cluster

- Performing ad hoc or interactive queries to explore and visualize data sets

- Building, training, and evaluating machine learning models using MLlib

- Implementing end-to-end data pipelines from myriad streams of data

- Analyzing graph data sets and social networks

## Spark Modules
Modules built on Spark:  
- __Spark SQL and DataFrames/Datasets__: support for structured data and relational queries

- __Spark Streaming__: processing real-time data streams

- __MLlib__: built-in machine learning library

- __GraphX__: Spark’s API for graph processing

## Spark SQL and DataFrames/Datasets

__RDD (Resilient Distributed Dataset)__: An RDD is a fault-tolerant collection of items that is distributed across many compute nodes and can be manipulated in parallel. 

- If part of an RDD is lost, only that part needs to be recomputed from the RDDs it was derived from.

__Spark DataFrames__: Built on top of RDDs, with commonly used, optimized functions for data manipulation. They are designed for structured data like CSV, JSON, and text data.

- DataFrames and Datasets have their APIs unified in the version of Spark we are using, thus I will simply refer to them as DataFrames.

- DataFrames can be named as a table to enable SQL to perform queries on them.

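```python
# Read the JSON file into a DataFrame
table = spark.read.json("s3://..../filename.json")
# createOrReplaceTempView() returns None, so call it separately:
# it registers the DataFrame under a name that SQL can refer to
table.createOrReplaceTempView("table_name")
results = spark.sql("select * from table_name")
```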

## Spark Streaming
Spark Streaming enables data to be continuously appended as new rows to a table, which developers can then query like a regular DataFrame.

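```python
from pyspark.sql.functions import explode, split

# Read lines of text from a socket as an unbounded streaming DataFrame
lines = (spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load())
# Split each line on spaces and emit one row per word
words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()
# Describe the sink; nothing runs until .start() is called on the writer.
# A real Kafka sink also needs kafka.bootstrap.servers, a checkpointLocation,
# and the output packed into a `value` column.
query = (word_counts
        .writeStream
        .format("kafka")
        .option("topic", "output"))
```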

## Machine Learning (MLlib)
MLlib is the machine learning library built into Spark. It enables various machine learning models to be trained on big data in a distributed setting.

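```python
from pyspark.ml.classification import LogisticRegression

# By default LogisticRegression expects a vector column named "features" and a
# numeric column named "label"; columns read straight from CSV would first
# need to be cast and assembled into that shape.
training = spark.read.csv("s3://.../filename_train.csv")
test = spark.read.csv("s3://.../filename_test.csv")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(training)
# transform() appends prediction columns to the test DataFrame
predictions = lrModel.transform(test)
```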

## Graph Processing (GraphX)
GraphX is the Spark library for manipulating graphs (social network graphs, network topology graphs) and performing graph-parallel computations.

Scala
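```scala
val graph = Graph(vertices, edges)
// joinVertices expects an RDD of (VertexId, message) pairs, so the raw
// text lines would first need to be parsed into that shape
val messages = spark.sparkContext.textFile("hdfs://filename")
val graph2 = graph.joinVertices(messages) {
  (id, vertex, msg) => ...
}
```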

# Spark Basic Concepts

## Transformations and Actions

There are two types of RDD operations: transformations and actions.

__Transformations__ construct a new RDD from the previous one (mapping, filtering, etc.).
- A chain of transformations applied to a DataFrame produces a new DataFrame.

__Actions__ compute a result based on an RDD and either return it to the driver program or save it to a file.
- Spark constructs a Directed Acyclic Graph (DAG) from the transformations and only groups them into stages and tasks when an action is called.

__Note__: DataFrames are built on top of RDDs, so everything said here about RDDs applies to DataFrames as well.

- __No computation is performed and no memory is used for the data__ until an action is called (save to hard drive, report back to user, etc.).

- Each time an action uses an RDD, all of the transformations before it have to be recomputed. This is important when feeding a DataFrame into a machine learning model, which may pass over the data many times.
    - Unless it's cached (saved into memory)
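
To make the lazy-evaluation and caching behaviour concrete, here is a toy, single-machine stand-in written in plain Python. `LazyDataset` and its `evaluations` counter are invented for this illustration and are not part of Spark's API. Only the overall pattern (record transformations, run them on an action, skip the rerun when cached) mirrors what is described above.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source          # the original data, never modified
        self._steps = steps            # lineage: transformations to replay
        self._cache_requested = False
        self._cache = None             # filled by the first action after cache()
        self.evaluations = 0           # how many times the lineage was replayed

    # Transformations: return a new dataset and compute nothing.
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, fn):
        return LazyDataset(self._source, self._steps + (("filter", fn),))

    def cache(self):
        self._cache_requested = True
        return self

    def _evaluate(self):
        if self._cache is not None:
            return self._cache
        self.evaluations += 1
        rows = list(self._source)
        for kind, fn in self._steps:
            if kind == "map":
                rows = [fn(r) for r in rows]
            else:
                rows = [r for r in rows if fn(r)]
        if self._cache_requested:
            self._cache = rows
        return rows

    # Actions: trigger the computation.
    def collect(self):
        return list(self._evaluate())

    def count(self):
        return len(self._evaluate())


even_squares = LazyDataset(range(10)).map(lambda x: x * x).filter(lambda x: x % 2 == 0)
runs_before_action = even_squares.evaluations   # still 0: nothing has run yet
even_squares.count()
even_squares.collect()
runs_without_cache = even_squares.evaluations   # each action replayed the lineage

cached = LazyDataset(range(10)).map(lambda x: x * x).cache()
cached.count()
cached.collect()
runs_with_cache = cached.evaluations            # the second action reused the cache

print(runs_before_action, runs_without_cache, runs_with_cache)
```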

### Examples of Transformations and Actions

| Transformations | Actions   |
|-----------------|-----------|
| `orderBy()`       | `show()`    |
| `groupBy()`       | `take()`    |
| `filter()`        | `count()`   |
| `select()`        | `collect()` |
| `join()`          | `write.save()` |

## Distributed Execution Components

__Spark Driver__: Instantiates the SparkSession.

- Communicates with the cluster manager.

- Requests resources from cluster manager for executors.

- Transforms all Spark operations into DAG computations.

- Schedules operations and distributes them as tasks on Spark executors.

__SparkSession__: Conduit for all Spark operations and data.

- creates JVM runtime parameters.

- defines DataFrames.

- reads from data sources.

- accesses catalog metadata.

- issues Spark SQL queries.

__Cluster Manager__: Manages and allocates resources for the cluster of nodes.

- Can use the built-in standalone cluster manager, Apache Hadoop YARN, Apache Mesos, or Kubernetes.

__Spark Executor__: Runs on each worker node in the cluster and executes tasks on that worker.

- Communicates with the driver program.

## Spark Terms

__Application__: User program built on Spark using its APIs.

__SparkSession__: Point of entry to interact with Spark functionality.

__Job__: Parallel computation consisting of multiple tasks that gets spawned in response to a Spark action.

__Stage__: Each job is divided into smaller sets of tasks called stages; the tasks within a stage can each run on a single worker without exchanging data with other workers.

__Task__: Single unit of work or execution that will be sent to a Spark executor.

## Spark Operation
First a `SparkSession` is created and an application is sent to the Spark driver. The Spark driver then converts the application into one or more Spark jobs, and each job into a DAG. 

The DAG is then broken down into stages based on which operations can be performed serially or in parallel.

Each stage is composed of Spark tasks (units of execution). These are distributed across the Spark executors, and each task maps to a single core and a single partition of the data.

- Transformations do not alter the original data: rather than changing the values in a DataFrame, each one creates a new DataFrame from the previous one. They are also evaluated lazily (only executed after an action is issued).
    - This is an important feature that enables fault tolerance and parallelization for Spark jobs
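
The "one task per partition, one core per task" idea can be imitated on a single machine. In this sketch a thread pool stands in for the executors' cores. The helper names and the choice of four partitions are assumptions for illustration, and nothing here is Spark code.

```python
from concurrent.futures import ThreadPoolExecutor


def make_partitions(rows, num_partitions):
    """Split rows into num_partitions slices of roughly equal size."""
    return [rows[i::num_partitions] for i in range(num_partitions)]


def task(partition):
    """One task: the same stage logic applied to a single partition."""
    return sum(x * x for x in partition)


rows = list(range(1, 101))
partitions = make_partitions(rows, num_partitions=4)

# One worker thread per partition stands in for one executor core per task.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(task, partitions))

# The driver combines the per-partition results.
total = sum(partial_sums)
print(partial_sums, total)
```

The input list `rows` is never modified. Each task only reads its own slice, which is what makes it safe to rerun a single failed task.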

### Narrow and Wide Transformations
__Narrow transformations__ are transformations that are row independent: each output row depends only on data in the same partition, so no data has to be exchanged over the network to evaluate them.

- `filter()`, `contains()`

__Wide transformations__, in contrast, require data to be exchanged over the network, because an output row can depend on rows held in other partitions.

- `groupBy()`, `orderBy()`
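
The difference can be seen with two in-memory "partitions" in plain Python. The data values, the function names, and the byte-sum hash are made up for this sketch. The point is only that the filter never looks outside its own partition, while the grouped sum has to move rows between partitions first.

```python
from collections import defaultdict

# Two partitions of (expense, price) rows, as if held on two different workers.
partitions = [
    [("pizza", 87), ("pop", 15), ("pizza", 45)],
    [("pop", 13), ("fries", 21), ("pop", 23)],
]


def narrow_filter(parts, predicate):
    """Narrow: each output partition is computed from exactly one input partition."""
    return [[row for row in part if predicate(row)] for part in parts]


def wide_group_sum(parts, num_partitions):
    """Wide: rows with the same key are first shuffled to the same partition."""
    shuffled = [defaultdict(int) for _ in range(num_partitions)]
    moved = 0
    for source, part in enumerate(parts):
        for key, value in part:
            target = sum(key.encode("utf-8")) % num_partitions  # toy deterministic hash
            if target != source:
                moved += 1  # this row would cross the network
            shuffled[target][key] += value
    return [dict(p) for p in shuffled], moved


filtered = narrow_filter(partitions, lambda row: row[1] > 20)
grouped, rows_moved = wide_group_sum(partitions, num_partitions=2)
print(filtered)
print(grouped, rows_moved)
```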

## Deployment Modes
| Mode           | Spark driver                   | Spark executor                        | Cluster manager                                                                     |
|----------------|--------------------------------|---------------------------------------|-------------------------------------------------------------------------------------|
| Local          | Single JVM                     | Same JVM as driver                    | Same host                                                                           |
| Standalone     | Any node on the cluster        | JVM for each node                     | Allocated to any host in cluster                                                    |
| YARN (client)  | On client, not part of cluster | YARN's NodeManager's container        | YARN's Resource Manager and Application Master allocate containers on NodeManagers  |
| YARN (cluster) | With YARN Application Master   | Same as client                        | Same as client                                                                      |
| Kubernetes     | In Kubernetes pod              | Each worker runs within its own pod   | Kubernetes Master                                                                   |

- Spark uses data locality to minimize network bandwidth usage.

## Initializing a SparkSession
```python
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
```

# Spark Web UI

When Spark is deployed, the Spark driver launches a web UI, running by default on port 4040, where you can view metrics and details of its operations.

Details like:
- A list of scheduler stages and tasks

- A summary of RDD sizes and memory usage

- Information on the environment

- Information about the running executors

- All the Spark SQL queries executed

In local mode: http://localhost:4040

- To use Databricks' Community Edition of Spark: https://databricks.com/try

### Example
Based on the Stats Club presentation example I gave at the University of Waterloo: https://github.com/DarrellAucoin/IntroSQL/wiki

The problem we are trying to solve is to produce a budget report that shows when Stats Club went over budget on its events.

```python
import pandas as pd
pd.read_csv("data/event.csv")
```

| Unnamed: 0 | name | type | presenter | organizer | poster | start_time | end_time | location | budget | Description |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Begining of Term | social | | judith | judith | 2015-01-28 19:00:00 | 2015-01-28 22:00:00 | C & D | 90.0 | Come and play games with your fellow stats Clu... |
| 1 | End of Term | social | | dominick | dominick | | | | 160.0 | End of Term social at a local Pub. A joint eve... |
| 2 | Intro to Hadoop | educational | darrell | darrell | gilberto | 2015-03-25 14:30:00 | 2015-03-25 16:00:00 | M3-3103 | 90.0 | Hadoop is a distributed computing system desig... |
| 3 | Intro to SQL | educational | darrell | darrell | patrick | 2015-02-05 18:00:00 | 2015-02-05 19:30:00 | MC-3003 | 90.0 | SQL is a relational database language and alon... |
| 4 | Prof Talk: Machine Learning | educational | Chong Zhang | patrick | dominick | 2015-03-03 15:00:00 | 2015-03-03 16:00:00 | M3-2134 | 90.0 | Machine Learning and Data Mining: How We can C... |
| 5 | Prof Talk 2 | educational | | judith | judith | | | | 90.0 | |
| 6 | Intro to SQL: Basic Queries | educational | darrell | darrell | darrell | 2015-03-09 18:00:00 | 2015-03-09 19:30:00 | MC-3003 | 100.0 | SQL is a relational database language and alon... |
| 7 | Intro to SQL: Advanced Queries | educational | darell | darrell | darrell | 2015-03-12 18:00:00 | 2015-03-12 19:30:00 | MC-3003 | 100.0 | SQL is a relational database language and alon... |


```python
pd.read_csv("data/expenses.csv")
```

| Unnamed: 0 | event | expense | price |
|---|---|---|---|
| 0 | Intro to SQL | pizza | 87.43 |
| 1 | Intro to SQL | pop | 15.34 |
| 2 | Begining of Term | pop | 13.23 |
| 3 | Begining of Term | pizza | 45.34 |
| 4 | End of Term | pop | 23.23 |
| 5 | End of Term | veggie platter | 25.23 |
| 6 | End of Term | fries | 21.21 |
| 7 | Intro to Hadoop | coffee | 23.12 |
| 8 | Intro to Hadoop | water | 10.23 |
| 9 | Intro to Hadoop | donuts | 53.23 |


First start up the `SparkSession`

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
from pyspark.sql.functions import expr

spark = (SparkSession
        .builder
        .appName("sparkExample")
        .getOrCreate())
```

Now we'll create the DataFrames for event and expenses. 

We will also specify the schemas. For this particular example we don't need to; however, with the schema specified, Spark doesn't have to spend resources inferring the types, and we can troubleshoot potential problems here before they cause trouble later on.

```python
eventSchema = StructType([
    StructField("name",StringType(),False),
    StructField("type",StringType(),False),
    StructField("presenter",StringType(),True),
    StructField("organizer",StringType(),True),
    StructField("poster",StringType(),True),
    StructField("start_time",TimestampType(),True),
    StructField("end_time",TimestampType(),True),
    StructField("location",StringType(),True),
    StructField("budget",DecimalType(precision=10, scale=2),True),
    StructField("description",StringType(),True)])

# The timestamps in the file look like 2015-01-28 19:00:00, so the format uses dashes
eventDF = spark.read.csv("data/event.csv", header = True, schema = eventSchema,
                         timestampFormat = "yyyy-MM-dd HH:mm:ss")
eventDF.createOrReplaceTempView("event")
eventDF.printSchema()
```

```
root
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- presenter: string (nullable = true)
 |-- organizer: string (nullable = true)
 |-- poster: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- location: string (nullable = true)
 |-- budget: decimal(10,2) (nullable = true)
 |-- description: string (nullable = true)
```



```python
expensesSchema = StructType([
    StructField("event",StringType(),False),
    StructField("expense",StringType(),False),
    StructField("price",DecimalType(precision=10, scale=2),False)])

expensesDF = spark.read.csv("data/expenses.csv", header = True, schema = expensesSchema)
expensesDF.createOrReplaceTempView("expenses")
expensesDF.printSchema()
```

```
root
 |-- event: string (nullable = true)
 |-- expense: string (nullable = true)
 |-- price: decimal(10,2) (nullable = true)
```



To solve this we need to sum up all the expenses for each event and then join this with the events table.

```python
expenses = expensesDF.groupBy("event").sum("price") \
            .withColumnRenamed("sum(price)", "expense")
event = eventDF.select("name", "type", "budget")
budget_report = expenses.join(event, expenses.event == event.name, "leftouter") \
                .select("event", "type", "budget", "expense") \
                .withColumn("warning",
                            expr("""CASE WHEN budget - expense < 0 THEN 'Over budget'
                            ELSE NULL END"""))
```
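
The same group, sum, join, and flag logic can be checked by hand in plain Python. This sketch is not Spark code. It uses only the rows for "Intro to SQL" and "Begining of Term" from the event and expenses tables shown earlier, and the variable names are made up for illustration.

```python
from decimal import Decimal

# Budgets and expense rows copied from the tables shown earlier (two events only).
budgets = {
    "Intro to SQL": Decimal("90.00"),
    "Begining of Term": Decimal("90.00"),
}
expense_rows = [
    ("Intro to SQL", "pizza", Decimal("87.43")),
    ("Intro to SQL", "pop", Decimal("15.34")),
    ("Begining of Term", "pop", Decimal("13.23")),
    ("Begining of Term", "pizza", Decimal("45.34")),
]

# Step 1: group by event and sum the prices.
totals = {}
for event, _item, price in expense_rows:
    totals[event] = totals.get(event, Decimal("0")) + price

# Step 2: left-join each total with the event's budget and flag overruns.
report = []
for event, total in totals.items():
    budget = budgets.get(event)  # None if the event has no budget row
    over = budget is not None and budget - total < 0
    report.append((event, budget, total, "Over budget" if over else None))

for row in report:
    print(row)
```

`Decimal` is used instead of `float` for the same reason the schema uses `DecimalType`: sums of currency amounts stay exact.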

```python
budget_report.show()
```

```
+--------------------+-----------+------+-------+-----------+
|               event|       type|budget|expense|    warning|
+--------------------+-----------+------+-------+-----------+
|     Intro to Hadoop|educational| 90.00|  96.81|Over budget|
|Intro to SQL: Bas...|educational|100.00|  62.63|       null|
|         End of Term|     social|160.00| 160.65|Over budget|
|         Prof Talk 2|educational| 90.00|  77.21|       null|
|Intro to SQL: Adv...|educational|100.00|  62.63|       null|
|    Begining of Term|     social| 90.00|  58.57|       null|
|Prof Talk: Machin...|educational| 90.00|  82.87|       null|
|        Intro to SQL|educational| 90.00| 102.77|Over budget|
+--------------------+-----------+------+-------+-----------+
```



```python
budget_report.printSchema()
```

```
root
 |-- event: string (nullable = true)
 |-- type: string (nullable = true)
 |-- budget: decimal(10,2) (nullable = true)
 |-- expense: decimal(20,2) (nullable = true)
```



Since we created a view of the original tables (`eventDF.createOrReplaceTempView("event")`, `expensesDF.createOrReplaceTempView("expenses")`) we can also pass a text SQL query through `spark.sql`.

```python
df = spark.sql("""WITH cost (event, expenses) AS
                    (SELECT event, SUM(price) AS expenses
                    FROM expenses
                    GROUP BY event)
                SELECT e.name, e.type, e.budget, cost.expenses,
                    CASE
                    WHEN e.budget - cost.expenses < 0 THEN 'Over budget'
                    ELSE NULL
                    END AS warning
                FROM event AS e RIGHT OUTER JOIN cost ON e.name = cost.event""")
df.show()
```

```
+--------------------+-----------+------+--------+-----------+
|                name|       type|budget|expenses|    warning|
+--------------------+-----------+------+--------+-----------+
|     Intro to Hadoop|educational| 90.00|   96.81|Over budget|
|Intro to SQL: Bas...|educational|100.00|   62.63|       null|
|         End of Term|     social|160.00|  160.65|Over budget|
|         Prof Talk 2|educational| 90.00|   77.21|       null|
|Intro to SQL: Adv...|educational|100.00|   62.63|       null|
|    Begining of Term|     social| 90.00|   58.57|       null|
|Prof Talk: Machin...|educational| 90.00|   82.87|       null|
|        Intro to SQL|educational| 90.00|  102.77|Over budget|
+--------------------+-----------+------+--------+-----------+
```



When everything is done, be nice to your database administrator and close the session.

```python
spark.stop()
```

# Questions
