### Introduction

We often set out to learn or use a particular API or library because we need to get something done quickly to meet a business objective.  Being time crunched, we commonly fall into the mindset of treating things as ***black boxes***.  In other words, we just make sure that we give the black box the right input and check that it gives us a reasonable output.

<img src='00_black_box.png'>

This hacking mentality works fine for most APIs, but applying this approach to **Spark** can quickly rob you of the performance improvements it can provide.  It is common to implement Spark in an analysis or application and not see any significant performance improvement.  This is often because the developer isn't using the API in the way it was designed to be used.


The goal of this presentation is to give you enough understanding of how **Spark** works that, when you start using it, you'll know how to get the significant performance gains that motivated you to use Spark in the first place.

### Acknowledgement

Most of the content of this presentation came from lecture materials from this class:

https://www.edx.org/course/big-data-analytics-using-spark



## Spark - What problems does it try to solve?

Two big problems with "big" data:

### 1. Memory capacity

How does data that's larger than the memory capacity of your local system get processed?

+ buy more memory
+ abstract memory to use hard drive space: virtual memory
+ distributed computing with specialized hardware (the early and expensive solution)
+ distributed computing with commodity hardware (the modern, cost-effective solution)
  - This is what **Hadoop** and **Spark** enable us to do.

### 2. Computational throughput

Even after addressing the memory issue, what about speed?  Even with hardware capable of distributed computing, making good use of this hardware to run computations efficiently has typically required abandoning high-level languages like Java, Python, R, etc. for lower-level languages that control how the hardware does its job, e.g. machine language.

### What Spark brings to the table

+ Spark provides APIs for high-level languages to implement parallel processing on large datasets.
+ It's generally much faster than **Hadoop**, which uses disks for storage and depends on disk read and write speeds.
+ Spark does much of its processing in memory and tries to minimize disk reads and writes.
+ See the chart in the Appendix for a detailed comparison.

## Power of Spark comes from minimizing latencies

What are sources of latency?

<img src="./01.02b Latencies and Storage Types_small.png">



## Memory Locality

+ Access locality refers to the ability of software to make good use of the cache and memory
  - cache = super fast, but small memory (L1 and L2) in CPU holding commands and data from prior processing requests
+ Memory is broken up into pages.
+ Software that uses the same or neighboring pages repeatedly has good access locality.
+ Hardware is designed to speed up such software

## Two types of Locality

<img src="01.03b slide - temporal locality_small.png">

Temporal locality means that you're using the same elements again and again in quick succession.  The degree of this reuse is the degree to which we have temporal locality.

<img src="01.03c slide - spatial locality_small.png">

Spatial locality is minimizing the physical distance between elements used in a calculation.

Linked lists (shown below) make good use of space, but have poor spatial locality.  Indexed arrays are typically laid out in consecutive memory locations, which gives them good spatial locality.  A similar array to the one shown below would be allocated on two pages instead of four.

<img src="01.03d slide - linked list vs array_small.png">

<img src="./01.04b slide - basic idea behind cache_small.png">

### Cache Hits are fast

If CPU needs something that is already in cache, it doesn't need to go off and grab it from storage:

<img src="01.04c slide - cache hit_small.png">

### Cache Misses are slow

Three steps in processing a cache miss

1. choose byte in cache to drop
2. delete the item in the cache
3. read in the requested byte

<img src="01.04e slide - cache miss1,2and3_small.png">

### Cache is effective if most accesses are hits
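The hit and miss bookkeeping described above can be sketched in a few lines. This is only an illustration: it assumes a least-recently-used (LRU) eviction policy, which the slides do not specify, and `simulate_cache` is a hypothetical helper rather than a model of any real CPU.

```python
from collections import OrderedDict

def simulate_cache(accesses, capacity):
    """Count hits and misses for a sequence of addresses (LRU eviction assumed)."""
    cache = OrderedDict()          # keys are cached addresses, oldest first
    hits = misses = 0
    for addr in accesses:
        if addr in cache:
            hits += 1
            cache.move_to_end(addr)        # mark as most recently used
        else:
            misses += 1
            if len(cache) >= capacity:
                cache.popitem(last=False)  # steps 1 and 2: choose and drop the oldest item
            cache[addr] = True             # step 3: read in the requested item
    return hits, misses

# Good temporal locality: the same three addresses are reused.
good = simulate_cache([1, 2, 3, 1, 2, 3, 1, 2, 3], capacity=3)
# Poor locality: cycling through four addresses with room for only three.
poor = simulate_cache([1, 2, 3, 4, 1, 2, 3, 4, 1], capacity=3)
print(good)  # (6, 3)
print(poor)  # (0, 9)
```

With the same number of accesses, the first pattern is mostly hits while the second never hits at all, which is the sense in which a cache only pays off when most accesses are hits.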

## Word count example

### Unsorted list of words

<img src="./01.04h slide - unsorted = poor locality_small.png">

+ temporal locality for very common words like "the"
+ unsorted list has no spatial locality - lots of jumping around to find words

### Sort list of words / good locality

<img src="./01.04i slide - sorted = good locality_small.png">
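As a rough sketch of why sorting helps: once the list is sorted, all copies of a word sit next to each other, so each count can be produced in one sequential pass. The helper name `count_sorted` and the sample sentence are made up for illustration.

```python
from itertools import groupby

def count_sorted(words):
    """Count words by sorting first, then scanning each run of equal words once."""
    return {word: sum(1 for _ in run) for word, run in groupby(sorted(words))}

words = "the cat and the dog and the bird".split()
counts = count_sorted(words)
print(counts)  # {'and': 2, 'bird': 1, 'cat': 1, 'dog': 1, 'the': 3}
```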

## Row vs. Column Major Layout

The way you traverse a 2D array affects speed.

+ numpy arrays are in row-major order by default
+ consider the following array:

<img src="01.05b slide - row major_small.png">

+ in the real world, column elements may be 1000's of locations apart
+ scanning the array row by row is more local than scanning column by column
+ locality implies speed
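To make the row-major idea concrete, here is a small sketch of how a 2D index maps to a position in the flat block of memory. The function `row_major_offset` is a hypothetical helper; it mirrors the usual row-major formula, not NumPy's internal code.

```python
def row_major_offset(i, j, n_cols):
    """Position of element (i, j) in the flat memory block of a row-major array."""
    return i * n_cols + j

n_rows, n_cols = 3, 4
# Walking along a row: neighbouring elements are 1 location apart.
row_scan = [row_major_offset(0, j, n_cols) for j in range(n_cols)]
# Walking down a column: neighbouring elements are n_cols locations apart.
col_scan = [row_major_offset(i, 0, n_cols) for i in range(n_rows)]
print(row_scan)  # [0, 1, 2, 3]
print(col_scan)  # [0, 4, 8]
```

With thousands of columns, each step down a column jumps thousands of locations, which is why the column scan has worse spatial locality.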

This is an example provided by my instructor:

<img src="01.05d slide - example row vs col scan speed diff_small.png">

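I saw a much smaller difference when I tried something similar below.  The lecture is a few years old, so maybe the numpy project has done some optimizations?  Another possible factor is that Python's built-in `sum` visits the array one element at a time, so interpreter overhead may hide most of the memory-access difference.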

In [13]:
import numpy as np

In [14]:
%%time
n = 2000
a_row = np.arange(n) / n
a = np.arange(n)

# make a big matrix
for i in range(n-1):
    a = np.vstack((a, a_row))
    
print(a.shape)

(2000, 2000)
Wall time: 17.4 s


In [15]:
a[:10, :10]

array([[0.0e+00, 1.0e+00, 2.0e+00, 3.0e+00, 4.0e+00, 5.0e+00, 6.0e+00,
        7.0e+00, 8.0e+00, 9.0e+00],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.5e-03, 2.0e-03, 2.5e-03, 3.0e-03,
        3.5e-03, 4.0e-03, 4.5e-03],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.5e-03, 2.0e-03, 2.5e-03, 3.0e-03,
        3.5e-03, 4.0e-03, 4.5e-03],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.5e-03, 2.0e-03, 2.5e-03, 3.0e-03,
        3.5e-03, 4.0e-03, 4.5e-03],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.5e-03, 2.0e-03, 2.5e-03, 3.0e-03,
        3.5e-03, 4.0e-03, 4.5e-03],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.5e-03, 2.0e-03, 2.5e-03, 3.0e-03,
        3.5e-03, 4.0e-03, 4.5e-03],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.5e-03, 2.0e-03, 2.5e-03, 3.0e-03,
        3.5e-03, 4.0e-03, 4.5e-03],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.5e-03, 2.0e-03, 2.5e-03, 3.0e-03,
        3.5e-03, 4.0e-03, 4.5e-03],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.5e-03, 2.0e-03, 2.5e-03, 3.0e-03,
        3.5e-03, 4.0e-03, 4.5e-03],
       [0.0e+00, 5.0e-04, 1.0e-03, 1.

In [16]:
%%time

# scan column by column
s=0
for i in range(n): s += sum(a[:,i])

print(s)

3997000.499999999
Wall time: 881 ms


In [17]:
%%time

# scan row by row
s=0
for i in range(n): s += sum(a[i,:])

print(s)

3997000.5
Wall time: 855 ms


## Memory vs SSD Experiments

<img src="01.06a slide - latency experiments table of result_small.png">

## The Memory Hierarchy

<img src="01.07b slide - the memory hierarchy_small.png">

+ Real systems have several storage types:
  - Top of hierarchy: small and fast storage close to CPU
  - Bottom of hierarchy: large and slow storage further from CPU
+ Caching is used to transfer data between different levels of the hierarchy.
+ The programmer / compiler does not need to know about the caching
  - The hardware provides an <font style="color: red; font-weight: bold">abstraction</font>: memory looks like a single large array
+ But performance depends on program's access pattern.

## From Memory Locality in a Single Machine to Many Machines

<img src="01.07c slide - clusters extend the memory hierarchy_small.png">

## Resilient Distributed Dataset (RDD) - Introduction

+ The special thing about data processing clusters is that they share their memory.
+ Locality in this context means that the data the CPU needs for its computation resides on the same computer as the CPU.
+ Since this is not always the case, we need a mechanism for transferring data between computers.
+ Preferably, this mechanism should be hidden from the developer.
+ In Spark, this system abstraction is called a <font style="color: red; font-weight: bold">resilient distributed dataset, or RDD</font>

## Sizes and latencies in a typical memory hierarchy

<img src="01.07d slide - sizes of latencies in a typical memory hierarchy_small.png">

## Making History: Google 2003

+ Google, founded by Larry Page and Sergey Brin, develops the Google File System (GFS): a method for storing very large files on multiple <font style="color: red; font-weight: bold">commodity</font> computers.
+ Each file is broken into fixed-size **chunks**.
+ Each chunk is stored on <mark>multiple</mark> **chunk servers**.
+ The location of the chunks is managed by the **master**.

<img src="01.08f slide - large files broken into chunks_small.png">
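The chunking scheme above can be sketched roughly as follows. The function names, server names, and the round-robin placement are all assumptions made for a reproducible illustration; they are not taken from GFS or HDFS.

```python
def split_into_chunks(data, chunk_size):
    """Break a byte string into fixed-size chunks (the last one may be shorter)."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def place_chunks(n_chunks, servers, replicas=2):
    """Return the master's table: chunk index -> list of servers holding a copy.

    Round-robin placement is an assumption made so the output is reproducible.
    """
    return {
        c: [servers[(c + r) % len(servers)] for r in range(replicas)]
        for c in range(n_chunks)
    }

chunks = split_into_chunks(b"abcdefghij", chunk_size=4)
master = place_chunks(len(chunks), servers=["s0", "s1", "s2"])
print(chunks)  # [b'abcd', b'efgh', b'ij']
print(master)  # {0: ['s0', 's1'], 1: ['s1', 's2'], 2: ['s2', 's0']}
```

Only the master's table knows where each chunk lives; a reader asks the master for locations and then fetches the chunks from the chunk servers.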

## HDFS: Distributing Chunks

<img src="01.08g slide - chunks are distributed randomly_small.png" width="500" height="600">

<img src="01.08h slide - chunks are distributed randomly_small.png" width="500" height="600">

## Properties of GFS/HDFS

+ <font style="color: red; font-weight: bold">Commodity Hardware:</font> Low cost per byte of storage.
+ <font style="color: red; font-weight: bold">Locality:</font> data stored close to CPU.
+ <font style="color: red; font-weight: bold">Redundancy:</font> can recover from server failures.
+ <font style="color: red; font-weight: bold">Simple abstraction:</font> looks to user like standard file system (files, directories, etc.) Chunk mechanism is hidden.

## Locality

+ tempting to process on just the top or middle server shown below
+ better solution is to process the two chunks on two different servers
  - Map-Reduce allows us to do this which we'll discuss shortly

<img src="01.08k slide - locality with HDFS_small.png">

## Redundancy

If the middle server goes down, we can recover without any loss of information.

<img src="01.08j slide - redundancy with HDFS_small.png">
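A minimal sketch of the recovery argument, assuming a hypothetical layout in which each of two chunks is stored on two of three servers (the server and chunk names are invented to mirror the picture, not read from it):

```python
# Hypothetical master table: chunk id -> servers holding a replica.
locations = {
    "chunk-A": ["top", "middle"],
    "chunk-B": ["middle", "bottom"],
}

def surviving_replicas(locations, failed_server):
    """Drop the failed server from every chunk's replica list."""
    return {
        chunk: [s for s in servers if s != failed_server]
        for chunk, servers in locations.items()
    }

after_failure = surviving_replicas(locations, failed_server="middle")
print(after_failure)  # {'chunk-A': ['top'], 'chunk-B': ['bottom']}
# No information is lost as long as every chunk still has at least one replica.
no_data_lost = all(after_failure.values())
print(no_data_lost)   # True
```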

## Map-Reduce

+ HDFS is a <font style="color: red; font-weight: bold">storage</font> **abstraction**
+ <font style="color: blue; font-weight: bold">Map-Reduce</font> is a <font style="color: red; font-weight: bold">computation</font> **abstraction** that works well with HDFS
+ Allows developers to specify parallel computation without knowing how the hardware is organized.
+ Foundation for Hadoop and Spark

## Two ways to do the same thing

Here's a simple example of computing the squares of numbers in a list.  They give the same result, but the difference in **how** the result is computed is important.

In [18]:
L = [0,1,2,3]

In [19]:
S=[]
# usual way to compute squares
for i in L:
    S.append(i*i)
    
# or similarly, using list comprehension
# S = [i * i for i in L]
print(S)

[0, 1, 4, 9]


In [20]:
S=[]
# other way - map(fun, iter)
S = map(lambda x: x*x, L)
print(list(S))

[0, 1, 4, 9]


## An Unnecessary Constraint

Doing this simple operation in the **usual way** tells the computer in what order to do the computation.  But this order constraint is not necessary.  By specifying the computation the **other way**, we give the system running the code an opportunity to take advantage of parallel processing.
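Python's built-in `map` still runs sequentially, so here is a hedged sketch of what "leaving the order to the system" can look like, using the standard library's `concurrent.futures`. The thread pool and its size are assumptions for illustration; for CPU-bound work like this, threads in CPython will not give a real speedup, but the shape of the code is the point.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

numbers = [0, 1, 2, 3]

# Executor.map may run the calls concurrently, in whatever order the pool
# schedules them, but still yields results in the order of the input.
with ThreadPoolExecutor(max_workers=4) as pool:
    squares = list(pool.map(square, numbers))

print(squares)  # [0, 1, 4, 9]
```

The calling code only says what to compute for each element; how the work is scheduled is left to the executor.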

## Getting a feel for Map-Reduce

In [21]:
# sum of squares the usual way

sum_squares = sum([i*i for i in L])
print(sum_squares)

14


In [22]:
# sum of squares as an execution plan

import functools as ft
# ft.reduce(function, iterable) combines the items of the iterable pairwise
# with the two-argument function until a single value is left.
#     function ---------------------v,              v------- generates the squares as shown above
sum_squares_ex_plan = ft.reduce((lambda x,y: x+y), map(lambda i: i*i, L))
print(sum_squares_ex_plan)

14


## Q1. Why is this a wrong way?

In [None]:
## THE WRONG WRONG WRONG WAY!!!

sum_squares_wrong_way = ft.reduce(lambda x,y: x + y*y, L)
print(sum_squares_wrong_way)

## Why Order Independence?

To compute things in parallel, we have to organize our computation such that order doesn't matter.  For example, look at a sum:

<img src="./Order independence, sum example.png">

+ Computation order can be chosen by compiler/interpreter/optimizer
+ Allows for **parallel computation** on subsets of data
  - Modern hardware calls for parallel computation, but parallel computation is very hard to program
+ Using map-reduce, developer **exposes** to the compiler opportunities for parallel computation.
  - telling the compiler:  _"I don't care what order you do the sum (or whatever calculation). I just want the correct final result."_
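The sketch below illustrates order independence for a sum: the data is split into partitions, each partition is reduced on its own, and the partial results are combined. The helper `partitioned_sum`, the partition counts, and the shuffle seed are arbitrary choices for illustration.

```python
import functools as ft
import random

def partitioned_sum(values, n_parts):
    """Sum each partition independently, then combine the partial sums."""
    parts = [values[i::n_parts] for i in range(n_parts)]
    partials = [ft.reduce(lambda x, y: x + y, p, 0) for p in parts]
    return ft.reduce(lambda x, y: x + y, partials, 0)

values = list(range(1, 101))
shuffled = values[:]
random.Random(0).shuffle(shuffled)

total = sum(values)
by_four = partitioned_sum(values, 4)
by_seven_shuffled = partitioned_sum(shuffled, 7)
print(total, by_four, by_seven_shuffled)  # 5050 5050 5050
```

Because addition is commutative and associative, neither the order of the items nor the way they are grouped changes the answer, so each partition could be summed on a different machine.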


## Distributed Computing Evolution

<img src="./Google FS 2003.png" width="700" height="850">

+ Master knows where all the data is
+ data itself is chopped up into chunks
+ and distributed across chunk servers as described earlier

## Google Map-Reduce, 2004

<img src="01.10b slide - Google Map-Reduce 2004_small.png" width="700" height="850">

## Apache Hadoop, 2006

<img src="01.10c slide - Apache Hadoop 2006_small.png">

## Apache Spark, 2014

+ Matei Zaharia, AMPLab, Berkeley (later at MIT)
+ Main difference from Hadoop: maximizes memory use instead of just using disk

## Spark, Java, Scala & Python

+ The native language of the Hadoop eco-system (predecessor to Spark) is Java
+ Spark can be programmed in Java, but code tends to be long
+ **Scala** (another JVM-based language) allows parallel programming to be abstracted. It's the core language of Spark
  - The main problem with Scala is that it has a small user base.
  - Need to learn Scala if you want to extend Spark.
+ **PySpark** provides the Python API for Spark programming
  - Does not always achieve the same efficiencies but is easier to learn

## Spark Context

+ The pyspark program runs on the main node
+ cont w/ 01.10f content

## Resilient Distributed Dataset (RDD) - Main Spark Data Structure

+ A list whose elements are distributed over several computers
+ When in RDD form, the elements of the list can be manipulated only through RDD specific methods
  - Why we need to do this will become clear shortly
+ RDDs can be created from a list on the master node or from a file.
+ RDDs can be translated back to a local list using the `collect()` method.

## Basic Examples

Sum of squares:  
<img src="01.10h slide - basic sum of squares example_small.png">

Squares of a list:  
<img src="01.10j slide - RDD to RDD transform example_small.png">

Both of these examples **materialize** the results.  More on materialization shortly.

## Cheap Checks

As we develop with Spark, we need to adopt a _computationally-frugal_ mindset.  Instead of displaying the entire result, consider displaying the first item, the first `n` items, or a sample using:

+ `RDD.first()`
+ `RDD.take(n)`
+ `RDD.sample(withReplacement, fraction, seed=None)`

## Hardware Organization

<img src="0111a_hardware_organization.png">

### In a local installation, cores serve as master & slaves

## Software Organization

<img src="0111b_spatial_software_organization.png">

+ Each RDD is partitioned among the workers
+ Workers manage **partitions** and **executors**
+ Executors execute **tasks** on their partitions and are myopic (focused only on their own chunk of processing)

## Spatial Organization (more detail)

<img src="0111d_spatial_organization_details_small.png">

+ SparkContext (sc) is the abstraction that encapsulates the cluster for the driver node (and the developer).
+ Worker nodes manage resources in a single slave machine.
+ Worker nodes communicate with the cluster manager.
+ Executors are the processes that can perform **tasks**.
+ Cache refers to the local memory on the slave machine.

## Materialization 

+ An RDD is essentially an execution plan until a result needs to be stored in memory.
+ Results that need to be stored in memory are said to be **materialized**.

## RDD Processing

<img src="01.11f slide - RDD Processing_small.png">

+ By default, RDDs are not materialized.
+ They do materialize if cached or otherwise persisted.
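To get a feel for "execution plan until materialized", here is a toy stand-in written in plain Python. `LazyList` is a hypothetical class invented for this sketch; it only imitates the idea and is not how Spark implements RDDs.

```python
class LazyList:
    """Toy stand-in for an RDD: records steps, runs them only on collect()."""

    def __init__(self, data, steps=()):
        self.data = data
        self.steps = steps            # the "execution plan": functions to apply

    def map(self, fn):
        # No computation here: return a new plan with one more step.
        return LazyList(self.data, self.steps + (fn,))

    def collect(self):
        # Materialize: run every recorded step and build the result in memory.
        result = list(self.data)
        for fn in self.steps:
            result = [fn(x) for x in result]
        return result

calls = []

def square(x):
    calls.append(x)                   # record that real work happened
    return x * x

plan = LazyList([0, 1, 2, 3]).map(square)
calls_before = len(calls)
result = plan.collect()
calls_after = len(calls)
print(calls_before)  # 0: nothing has been computed yet
print(result)        # [0, 1, 4, 9]
print(calls_after)   # 4: the work happened during collect()
```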

## Temporal Organization - RDD Graph and Physical Plan

<img src="01.11h slide - Temporal vs Spatial organization_small.png">

## Summary of Terms and concepts of execution - _keep the earlier pictures in mind_

+ RDDs are **partitioned** across workers.
+ RDD graph defines the **lineage** of the RDDs.
+ SparkContext divides the RDD graph into **stages** which defines the execution plan (corresponding to physical plan)
+ A **task** corresponds to _one stage_ and is restricted to _one partition_.
+ An **executor** is a process that performs tasks.

## Three groups of Spark commands

+ **Creation:** RDD from files, databases, or data on driver node.
+ **Transformations:** RDD to RDD (no materialization)
+ **Actions:** RDD to data on driver node, databases, files (causes materialization)

### _"Nobody works with RDDs"_

+ Spark Dataframe = RDD + schema (meta-data)
+ Spark SQL = query large Spark data structures

## Whale Classification

+ Problem Description and EDA notebook
+ Classification notebook
+ Boosted Trees vs. Random Forest
  - which is more suited for Spark?

## Prediction Function Logic

### Correction to image below: subscript on test features should go to 12 (not 10)

### Predictions are made using the following procedure:

1. generate bootstrap predictions (columns on right of the image)
2. filter out the values below the 10th percentile and values above the 90th percentile (minR=0.10, maxR=0.90)
3. use logic defined in the **Calculating predictions** section of the notebook:
  - If respective minimum score and maximum score values are both less than 0, predict -1 (Cuvier's)
  - If respective minimum score value is less than 0 and maximum score value is greater than 0, predict 0 (Unsure)
  - If respective minimum score and maximum score values are both greater than 0, predict 1 (Gervais)

<img src="./whale_classification_logic.jpg">

After accumulating the bootstrap margin scores:

+ sort each row low to high from left to right: line 35 of cell 14
+ filter out the lowest 10% (below minR=0.1) and the highest 10% (above maxR=0.9): line 36 of cell 14
+ predict on the test set based on criteria described in the **Calculating predictions** and **Computing the score ranges** sections
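The procedure above can be sketched for a single test example as follows. This is an illustration only: the function name `predict`, the parameter names `min_r` and `max_r` (standing in for minR and maxR), the index arithmetic used for trimming, and the choice to treat a range that touches zero as "unsure" are all assumptions, since the notebook's cell 14 is not reproduced here.

```python
def predict(scores, min_r=0.10, max_r=0.90):
    """Return -1 (Cuvier's), 0 (unsure) or 1 (Gervais) from bootstrap scores."""
    ordered = sorted(scores)                      # sort low to high
    n = len(ordered)
    # Keep only the scores between the min_r and max_r quantiles.
    kept = ordered[int(round(min_r * n)):int(round(max_r * n))]
    low, high = kept[0], kept[-1]
    if high < 0:
        return -1      # whole trimmed range is negative
    if low > 0:
        return 1       # whole trimmed range is positive
    return 0           # the range straddles (or touches) zero

# The single outlier (5) is trimmed away, so the range stays negative.
print(predict([-3, -2, -1.5, -1, -0.5, -0.4, -0.3, -0.2, -0.1, 5]))   # -1
print(predict([-5, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]))     # 1
print(predict([-2, -1, -0.5, -0.2, -0.1, 0.1, 0.2, 0.5, 1, 2]))       # 0
```

Trimming both tails before looking at the minimum and maximum keeps a few extreme bootstrap scores from flipping a prediction to "unsure".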

## References

+ http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=rdd#pyspark.RDD 
+ tbd

## Answers to discussion questions

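Q1. The operations given to map and reduce should not depend on the order of items in the list (commutativity) or the grouping of operations (associativity).  It is this independence that allows parallel computation.  The function in the wrong way, `lambda x,y: x + y*y`, treats its two arguments differently, so it is neither commutative nor associative.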
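A small sketch makes the problem visible. It pretends the list is split across two workers (the split point is an arbitrary choice) and compares the wrong function with the map-then-add version.

```python
import functools as ft

def wrong(x, y):
    return x + y * y          # treats the accumulator and the item differently

def add(x, y):
    return x + y

def sq(i):
    return i * i

L = [0, 1, 2, 3]
left, right = L[:2], L[2:]    # pretend each half lives on a different worker

# Wrong way: run sequentially it happens to give 14, but combining partial
# results squares the second partial result again.
sequential_wrong = ft.reduce(wrong, L)
split_wrong = wrong(ft.reduce(wrong, left), ft.reduce(wrong, right))

# Map first, then reduce with plain addition: the grouping does not matter.
split_right = add(ft.reduce(add, map(sq, left)), ft.reduce(add, map(sq, right)))

print(sequential_wrong)  # 14
print(split_wrong)       # 122
print(split_right)       # 14
```

The sequential run hides the bug only because the first element is 0; as soon as the work is grouped differently, the result changes.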

Q2. tbd

## Appendix

<img src='./00a_hadoop_vs_spark.png'>