
- Big Data: <font color=#66c2a4>Volume, Velocity, Variety</font>
- ETL - Extraction - Transformation - Loading
- SQL - Concept of de-normalization - else problem of consistency
- NoSQL - (1) Key-value store; (2) Columnar database; (3) Document Store (e.g. JSON, BSON, XML, PDF); (4) Graph database
- Hadoop - HDFS (partition data), Distributed computing - Fault tolerance if machine fails, MapReduce (processing data), YARN - Spark will replace Hadoop
- Spark 
    - NoSQL database has limited function of analytics
    - Hadoop MapReduce: slow and complex
    - Integrated wit
    
- Spark: Cluster computing platform
    - Provides a simple way to parallelize applications across clusters
    - Hides the complexity of distributed systems programming and fault tolerance - you do not have to deal with this
    - Computational engine scheduling, distribution
    
- Tachyon - Spark's technology to store data
- Driver Program (manages jobs and distributes work), Worker Node (bunch of machines)
- Spark is one of the most active Apache projects (also a top-level project)
- Why Spark is hot? 
    - <font color=#6baed6>Equivalent to more than a dozen specialized systems</font>
    - Interactive, batch, and real-time within a single framework
    - Better fault tolerance
    - Less code
    - Support multiple languages
    - In memory fast computation
    - High level abstraction
    
- Why Spark is fast?
    - In memory, much faster
        - Disk access
        - Network flow
    - Less code
    - Iterative algorithm benefit
    - 10 to 100 x faster
    
- Resilient Distributed Dataset (RDD)
- Spark Applications:
    - Transformations
    - Action 
    - Accumulators
    
- User code defines a DAG (Directed Acyclic Graph) of RDDs
- Spark web UI: http://localhost:4040
- Skew: some machines finish their work early and sit idle, so they can be reassigned

- Spark SQL
- Spark MLlib

- Understanding distributed computing vs single machine computing 

- Virtual Machine is Virtual Box (Oracle)
- BigData University (IBM) virtual machine 

- junhuicai8@gmail.com

[Spark Programming Guide](https://spark.apache.org/docs/latest/programming-guide.html)  
[PySpark API Documentation](https://spark.apache.org/docs/latest/api/python/index.html)

```bash
vagrant up
vagrant halt
```

Apache Spark is a *cluster computing platform*.

Spark contains a `computational engine` that is responsible for scheduling, distributing, and monitoring applications consisting of many computational tasks across many worker machines or a computing cluster.

---
![spark-components](./images/stack.png)

**Spark Core** - Contains the basic functionality of Spark (scheduling, memory management, fault recovery, interacting with storage systems, etc.) and the API that defines `resilient distributed datasets (RDDs)`

**Spark SQL** - Spark's package for working with structured data. Allows querying data via `SQL`, `HQL`

**Spark Streaming** - Spark component that enables processing of live streams of data.

**MLlib** - Machine learning library. Algorithms for classification, regression, clustering, and collaborative filtering.

**GraphX** - Library for manipulating graphs and performing graph-parallel computations.

---

In Spark, computations are expressed through operations on distributed collections (`resilient distributed datasets`) that are automatically parallelized across the cluster. RDDs are Spark's fundamental abstraction for distributed data and computation.

---
Every Spark application consists of a `driver program` that launches various parallel operations on a cluster. The driver program contains an application's main function and defines distributed datasets on the cluster, then applies operations to them. 

Driver programs access Spark through a `SparkContext` object, which represents a connection to a computing cluster. SparkContext is used to build RDDs and various operations can be performed on RDDs.

![distributed-execution](./images/distributed.png)

To run operations, driver programs typically manage a number of nodes called `executors`. Spark automatically takes a function and ships it to the executor nodes.

In Spark, all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.

---
**Resilient Distributed Dataset** - An RDD is simply an immutable distributed collection of objects. Each RDD is split into multiple `partitions`, which may be computed on different nodes of the cluster. An RDD can contain any type of Python, Java, or Scala object, including user-defined classes.

> Think of each RDD as consisting of instructions on how to compute the data that has been built up through transformations.


RDDs can be created in two ways:
- Loading an external dataset
- Parallelizing a collection of objects (e.g., a list or set) in the driver program

Once created RDDs offer two types of operations:

- **Transformations** are operations on RDDs that return a new RDD. Most transformations are *element-wise*. As new RDDs are derived from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the `lineage graph`

![lineage-graph](./images/lineage.png)

- **Actions** are operations that kick off a computation: they compute a result based on an RDD and either return it to the driver program or save it to an external storage system.

> Not sure whether a given function is a transformation or an action? Look at its return type: `Transformations return RDDs`, whereas actions return some other data type.

Transformations and actions differ in how Spark computes RDDs. Although a new RDD can be defined at any time, Spark computes it only in a `lazy` fashion, i.e. the first time it is used in an action. When a transformation is called on an RDD, the operation is not performed immediately; instead, Spark internally records metadata to indicate that the operation has been requested. Because Spark sees the whole chain of transformations before it runs anything, it can compute just the data needed for the result, which avoids wasting storage, and it can group operations together to reduce the number of passes over the data.
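
Lazy evaluation can be imitated without a cluster. The following is a minimal pure-Python sketch, not Spark code: `LazyDataset` and its methods are hypothetical names that only record transformations and apply them when an action (`collect`) is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, not executed."""

    def __init__(self, source, steps=()):
        self.source = source    # the original data
        self.steps = steps      # recorded transformations (the "lineage")

    def map(self, func):
        # Transformation: returns a new dataset with one more recorded step.
        return LazyDataset(self.source, self.steps + (("map", func),))

    def filter(self, func):
        return LazyDataset(self.source, self.steps + (("filter", func),))

    def collect(self):
        # Action: only now is the recorded chain applied, in a single pass.
        result = []
        for item in self.source:
            keep = True
            for kind, func in self.steps:
                if kind == "map":
                    item = func(item)
                elif not func(item):
                    keep = False
                    break
            if keep:
                result.append(item)
        return result


calls = []  # records every time the mapped function actually runs


def square(x):
    calls.append(x)
    return x * x


numbers = LazyDataset([1, 2, 3, 4])
evens = numbers.map(square).filter(lambda x: x % 2 == 0)
calls_before_action = len(calls)   # nothing has been computed yet
result = evens.collect()           # the action triggers the computation
```

Here `calls_before_action` is 0 because defining `evens` runs nothing, and `result` is `[4, 16]` once the action is called.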

RDDs by default are `recomputed` from scratch each time an action is run on them. To avoid this inefficiency and to reuse an RDD in multiple actions ask Spark to `persist` it. When Spark `persists` an RDD, the nodes that compute the RDD store their partitions. If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed. 

In Python, the data that `persist` stores is always serialized, so the default storage level instead keeps it in the Java Virtual Machine as pickled objects.

If caching of too much data to fit in memory is attempted, Spark will automatically evict old partitions using a `Least Recently Used (LRU)` cache policy.

> The ability to always recompute an RDD is actually why RDDs are called `resilient`. When a machine holding RDD data fails, Spark uses this ability to recompute the missing partitions.
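
The effect of persisting, and of recomputing after a failure, can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's implementation: `CountingDataset`, `lose_cache` and `total` are hypothetical names, and a single in-memory list stands in for the partitions.

```python
class CountingDataset:
    """Toy stand-in for an RDD that counts how often it is computed."""

    def __init__(self, compute):
        self.compute = compute      # recipe that rebuilds the data from scratch
        self.computations = 0
        self.persisted = False
        self.cache = None

    def persist(self):
        self.persisted = True
        return self

    def lose_cache(self):
        # Simulates a node failure: the stored data disappears.
        self.cache = None

    def _data(self):
        if self.persisted and self.cache is not None:
            return self.cache
        self.computations += 1
        data = self.compute()
        if self.persisted:
            self.cache = data
        return data

    # Two "actions"
    def count(self):
        return len(self._data())

    def total(self):
        return sum(self._data())


squares = CountingDataset(lambda: [x * x for x in range(5)])
squares.count()
squares.total()
without_persist = squares.computations   # recomputed for each action

cached = CountingDataset(lambda: [x * x for x in range(5)]).persist()
cached.count()
cached.total()
with_persist = cached.computations       # second action reuses the stored data

cached.lose_cache()
total_after_failure = cached.total()     # rebuilt from the recipe
after_failure = cached.computations
```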

---
Every Spark program and shell session will work as follows:

1. Create some input RDDs from external data
2. Transform input RDDs to define new RDDs using `transformations`
3. Ask Spark to `persist` any intermediate RDDs that will need to be reused
4. Launch `actions` to start a parallel computation, which is then optimized and executed by Spark

## Creating RDD

- `sc.textFile(<file>)` - Loading a text file
- `sc.parallelize(<collection>)` - The simplest way to create an RDD is to take an existing collection and pass it to this method
- `RDD.persist()` - Asking Spark to persist an RDD
- `RDD.unpersist()` - Manually remove RDDs from the cache

## Transformations

- `RDD.map(<function>)` - Transformation takes in a function and applies it to each element in the RDD, with the result of the function being the new value of each element in the resulting RDD
- `RDD.filter(<function>)` - Transformation takes in a function and returns an RDD that only has elements that pass the filter function.
![map-filter](./images/trans-1.png)
- `RDD.flatMap(<function>)` - Transformation takes in a function that returns an iterator of values for each element in the RDD, and flattens the results, i.e. it can return multiple output elements for each input element
![flatMap](./images/flatmap.png)
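
The difference between `map`, `filter` and `flatMap` can be seen on a plain Python list. This is a local sketch of the semantics only; the variable names and sample strings are made up for illustration, and no Spark API is called.

```python
from itertools import chain

lines = ["hello world", "hi"]

# map: exactly one output element per input element
mapped = [line.split(" ") for line in lines]

# filter: keep only the elements for which the function returns True
filtered = [line for line in lines if len(line) > 2]

# flatMap: the function returns an iterable per element and the results are flattened
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))
```

`mapped` is a list of lists (`[['hello', 'world'], ['hi']]`), whereas `flat_mapped` is a single flat list (`['hello', 'world', 'hi']`).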

#### Pseudo Set Operations (RDDs are not sets)
- `RDD.distinct()` - Transformation returns a new RDD with only distinct or unique items (expensive!)
- `RDD.union(other)` - Set operation returns an RDD consisting of the data from both sources
- `RDD.intersection(other)` - Set operation returns only elements in both RDDs
- `RDD.subtract(other)` - Set operation takes in another RDD and returns an RDD that has only values present in the first RDD and not the second
![](./images/setop.png)
- `RDD.cartesian(other)` - Transformation returns all possible pairs of `(a, b)` where `a` is in the source RDD and `b` is in the other RDD (very expensive!)
![](./images/cart.png)
- `RDD.sample(withReplacement, fraction, [seed])` - Sample an RDD with or without replacement
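
A rough local equivalent of these pseudo set operations, using plain lists, shows why RDDs are not sets: `union` keeps duplicates, while `distinct` and `intersection` remove them. The sample data is made up, and `sorted` is used only to make the output readable (Spark itself does not promise any ordering).

```python
from itertools import product

a = ["coffee", "coffee", "panda", "monkey", "tea"]
b = ["coffee", "monkey", "kitty"]

distinct = sorted(set(a))                       # duplicates removed
union = a + b                                   # duplicates are kept
intersection = sorted(set(a) & set(b))          # duplicates removed
subtract = [x for x in a if x not in set(b)]    # in a but not in b
cartesian = list(product(["x", "y"], [1, 2]))   # every (a, b) pair
```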

## Actions
- `RDD.reduce(<function>)` - Action takes a function that operates on *two elements of the type in an RDD* and returns *a new element of the same type*. Simple example: $+$ used to sum an RDD. 
```python 
RDD.reduce(lambda x, y: x + y)
```
- `RDD.fold(zeroValue, <function>)` - Like `reduce`, but also takes a zero value that is used as the starting value on each partition; the zero value should be the identity element for the operation (e.g. 0 for $+$).
- `RDD.aggregate(zeroValue, seqOp, combOp)` - Avoids the constraint that the return type must be the same as the type of the RDD being operated on. Supply an initial zero value of the type you intend to return, a function that combines an element of the RDD with an accumulator, and a second function that merges two accumulators, given that each node accumulates its own results locally.
```python
RDD.aggregate((0, 0),
              lambda acc, v: (acc[0] + v, acc[1] + 1),        # fold one value into a local (sum, count)
              lambda a, b: (a[0] + b[0], a[1] + b[1]))        # merge two (sum, count) accumulators
```
- `RDD.collect()` - Action returns entire RDD's data to driver program. It suffers from the restriction that all your data must fit on a single machine, as it all needs to be copied to the driver.
- `RDD.take(n)` - Action returns `n` elements from the RDD and attempts to minimize the number of partitions it accesses, so it may represent a biased collection. It does not necessarily return the elements in the order you might expect.
- `RDD.top(n)` - Action returns `n` top elements from an RDD
- `RDD.takeSample(withReplacement, n, seed)` - Action returns sample of RDD either with or without replacement
- `RDD.foreach(<function>)` - Perform computation on each element in the RDD without bringing it back locally
- `RDD.count()` - Returns a count of the elements in an RDD
- `RDD.countByValue()` - Returns a map of each unique value to its count, i.e. Number of times each element occurs in the RDD
- `RDD.takeOrdered(n, ordering)` - Return `n` elements based on provided ordering
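
How `aggregate` arrives at a `(sum, count)` pair can be traced in plain Python. This is a sketch that assumes a hypothetical layout of two partitions; the helper `aggregate` below is a local function written for illustration, not the Spark method.

```python
from functools import reduce

# Hypothetical layout: one dataset of numbers split into two partitions.
partitions = [[1, 2, 3], [4, 5]]


def aggregate(partitions, zero, seq_op, comb_op):
    """Mimics RDD.aggregate: fold each partition locally, then merge the results."""
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)


total, count = aggregate(
    partitions,
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),    # add one value to a local accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]),    # merge two accumulators
)
mean = total / float(count)
```

The result type `(sum, count)` differs from the element type (a number), which is exactly what `reduce` cannot express.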

Key/Value RDDs are commonly used to perform aggregations. You often need to do some initial `ETL (Extract-Transform-Load)` to get data into Key/Value format.

RDDs containing Key/Value pairs are called `Pair RDDs`

```python
# Extract a Field from an RDD Row object. NOTE: Prediction results are RDDs with Row objects
RDD.map(lambda i: i.Field_Name)

# Minimum value in an RDD
RDD.min() 

# Maximum value in an RDD
RDD.max() 

# Unique items in an RDD
RDD.distinct()

# Count number of items in an RDD
RDD.count()

# Sum RDD values
RDD.sum()

# Sum elements of RDD
RDD.reduce(lambda x, y: x + y)

# For counting purposes use lambda to get tuple: (key, 1)
RDD.map(lambda key: (key, 1)) 

# Check intermediate result (Most useful action!!!)
RDD.take(n) # n = int

# Persist an RDD with default storage level
RDD.cache()

# Test if an RDD is Cached
RDD.is_cached

# Concatenate 2 RDDs (vertically), i.e. produce an RDD containing elements from both RDDs
RDD.union(otherRDD)

# Produce an RDD containing only elements found in both RDDs
RDD.intersection(otherRDD)

# Combine 2 RDDs by taking the cartesian product: RDD = ['a'], otherRDD = ['b', 'c']. Cartesian product = [('a', 'b'), ('a', 'c')]
RDD.cartesian(otherRDD)

## Pair RDDs i.e. RDDs with tuples (key, count)
# Summing RDDs with tuples (key, 1)
pairRDD.reduceByKey(lambda x, y: x + y)

# Filtering RDD with tuples (key, count)
pairRDD.filter(lambda i: i[1] > value)
pairRDD.filter(lambda i: i[0] == string)

# Extract Keys from RDDs with tuples (key, count) - 1
pairRDD.map(lambda i: i[0]) 

# Extract Keys from RDDs with tuples (key, count) - 2
# NOTE: tuple-parameter lambdas work in Python 2 only; Python 3 needs the index form above
pairRDD.map(lambda (x, y): x) 

# Extract counts from RDDs with tuples (key, count)
pairRDD.map(lambda i: i[1]) 

# flatMap example: [(a, [1, 2]), (b, [3, 2, 4])] to [1, 2, 3, 2, 4]
RDD.flatMap(lambda i: i[1])

## Access Iterable Object elements by converting it to list
# Example: [('a', 12), ('b', 10), ('a', 6), ('c', 3), ('b', 8), ('a', 2), ('b', 5)]
# pairRDD.groupByKey().collect() returns [('a', <iterable_obj_1>), ('b', <iterable_obj_2>), ('c', <iterable_obj_3>)]
# Converting Iterable Object to a list
pairRDD.groupByKey().map(lambda i: (i[0], list(i[1])))

# Extracting top 'n' (key, value) pairs - as per keys or values
pairRDD.takeOrdered(n, lambda i: i[1]) # Ascending as per values
pairRDD.takeOrdered(n, lambda i: -1 * i[1]) # Descending as per values - lambda function multiplies the count by -1

# Extracting top 'n' (key, value) pairs - as per key function
pairRDD.takeOrdered(n, key = key_function)

# Convert a pair RDD to a list
pairRDD.collect()

# Convert a pair RDD to a dict
pairRDD.collectAsMap()

# Sort a pair RDD by Key
pairRDD.sortByKey(ascending = True)

# Sort a pair RDD
pairRDD.sortBy(key_function, ascending = True, numPartitions = None)

# Group By Key operation on Pair RDDs
groupedPairRDD = pairRDD.groupByKey() # Generates a pair RDD of type (key, iterable)
groupedPairRDD.map(lambda i: (i[0], sum(i[1]))) # Sum of values (equals the count when every value is 1)
groupedPairRDD.map(lambda i: (i[0], len(set(i[1])))) # Unique value count
groupedPairRDD.map(lambda i: (i[0], float(len(i[1]))/len(set(i[1])))) # Average number of occurrences per unique value

## Broadcast Variables
# Creating Broadcast Variables
broadcastVariable = sc.broadcast(v) # v can be: list, dict, tuple

# Accessing Broadcast Variables values
broadcastVariable.value

# Join 2 pair RDDs RDD => (K, V) and otherRDD => (K, W) to get new pair RDD (K, (V, W))
pairRDD.join(otherPairRDD)


# Return each (key, value) pair in an RDD that has no pair with matching key in other RDD
pairRDD.subtractByKey(otherPairRDD)

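## Extracting value from a tuple inside a tuple, i.e. Extract K and W from (K, (V, W))
# Python 2 only (tuple-parameter unpacking was removed in Python 3):
pairRDD.map(lambda (K, (V, W)): (K, W))

# Or, in any Python version, index into the nested tuple
pairRDD.map(lambda kv: (kv[0], kv[1][1])) # Here kv = (K, (V, W))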

## Repartition when using a "gzip" file: a gzipped file is not splittable, so it cannot be loaded in parallel by multiple tasks. Spark will always read it with 1 task and give you an RDD with 1 partition
RDD = sc.textFile(gzipFileName).repartition(num_Partitions) 

## Lab-3 Spark: "Accumulators", "Left Join"

## Whenever we examine only a subset of a large dataset, there is the potential that the result will depend on the order we perform operations, such as joins, or how the data is partitioned across the workers. What we want to guarantee is that we always see the same results for a subset, independent of how we manipulate or store the data. We can do that by sorting before we examine a subset. You might think that the most obvious choice when dealing with an RDD of tuples would be to use the sortByKey() method. However this choice is problematic, as we can still end up with different results if the key is not unique. A better technique is to sort the RDD by both the key and value, which we can do by combining the key and value into a single string and then sorting on that string.

## Splitting an RDD into Training, Validation and Test sets
RDD.randomSplit([weights], seed_number)


```
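
The note in the block above on getting reproducible subsets can be illustrated locally. This is a plain-Python sketch with made-up data: the two lists stand for the same pairs arriving in different orders (for example after a different partitioning), and both helper functions are hypothetical.

```python
pairs_run_1 = [("b", 2), ("a", 9), ("a", 1), ("c", 5)]
pairs_run_2 = [("a", 1), ("c", 5), ("a", 9), ("b", 2)]   # same data, different arrival order


def first_n_by_key(pairs, n):
    # Sorting on the key alone: ties between equal keys keep their arrival order.
    return sorted(pairs, key=lambda kv: kv[0])[:n]


def first_n_by_key_and_value(pairs, n):
    # Combine key and value into one string and sort on that string.
    return sorted(pairs, key=lambda kv: str(kv[0]) + " " + str(kv[1]))[:n]


by_key_1 = first_n_by_key(pairs_run_1, 1)
by_key_2 = first_n_by_key(pairs_run_2, 1)
by_both_1 = first_n_by_key_and_value(pairs_run_1, 1)
by_both_2 = first_n_by_key_and_value(pairs_run_2, 1)
```

Sorting on the key alone gives `('a', 9)` for one ordering and `('a', 1)` for the other, while sorting on the combined string gives `('a', 1)` for both. Note that the combined key compares as text, so numeric values sort lexicographically.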

### Aggregate By Key

```python
RDD = sc.parallelize([('a', 2), ('b', 5), ('a', 3), ('b', 1), ('c', 4), ('a', 3), ('c', 7)])

aggregateByKeyRDD = RDD.aggregateByKey((0, 0), 
                                        lambda x, y: (x[0] + y,    x[1] + 1),
                                        lambda x, y: (x[0] + y[0], x[1] + y[1]))

print(aggregateByKeyRDD.take(3))

# [('a', (8, 3)), ('c', (11, 2)), ('b', (6, 2))]
```

1. First lambda expression: <font color=steelblue>**lambda x, y: (x[0] + y,    x[1] + 1)**</font> is for `Within-Partition Reduction Step`, where:
    - **x** - is a Tuple that holds: **(runningSum, runningCount)**
    - **y** - is a Scalar that holds: the **next value**

2. Second lambda expression: <font color=steelblue>**lambda x, y: (x[0] + y[0], x[1] + y[1])**</font> is for `Cross-Partition Reduction Step`, where:
    - **x** - is a Tuple that holds: **(runningSum, runningCount)**
    - **y** - is a Tuple that holds: **(nextPartitionsSum, nextPartitionsCount)**
    
```python
RDD.aggregateByKey(zeroValue,  # initial accumulator for each key, e.g. (0, 0)
                   seqFunc,    # Within Partition Reduction Step: (accumulator, value) -> accumulator
                   combFunc)   # Cross Partition Reduction Step: (accumulator-1, accumulator-2) -> accumulator
```
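
To make the two reduction steps concrete, here is a pure-Python sketch that mimics `aggregateByKey` on the same data as the example above. The split into two partitions is an assumption made for illustration (Spark decides the real layout), and `aggregate_by_key` is a hypothetical local helper, not the Spark method.

```python
def aggregate_by_key(partitions, zero, seq_func, comb_func):
    """Mimics RDD.aggregateByKey on a list of partitions of (key, value) pairs."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            # Within-partition step: fold each value into that key's accumulator.
            local[key] = seq_func(local.get(key, zero), value)
        partials.append(local)

    merged = {}
    for local in partials:
        for key, acc in local.items():
            # Cross-partition step: merge accumulators that share a key.
            merged[key] = comb_func(merged[key], acc) if key in merged else acc
    return merged


# Assumed split of the example data into two partitions.
partitions = [
    [('a', 2), ('b', 5), ('a', 3), ('b', 1)],
    [('c', 4), ('a', 3), ('c', 7)],
]

sum_count = aggregate_by_key(
    partitions,
    (0, 0),
    lambda x, y: (x[0] + y, x[1] + 1),
    lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
```

Key `'a'` appears in both partitions, so its final `(8, 3)` comes from merging `(5, 2)` and `(3, 1)` in the cross-partition step.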


### Combine By Key

[Reference](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/)

**combineByKey** requires 3 functions:
   - *createCombiner* - First aggregation step for each key. The argument of this function corresponds to the `value` in a `key-value` pair
   - *mergeValue* - Tells `combineByKey` what to do when a combiner is given a new value
   - *mergeCombiners* - Tells `combineByKey` how to merge two combiners

```python
RDD.combineByKey(createCombiner,
                 mergeValue,
                 mergeCombiners)
                                
                 
RDD = sc.parallelize([('a', 2), ('b', 5), ('a', 3), ('b', 1), ('c', 4), ('a', 3), ('c', 7)])

# Result: (key, (sum, count))
combineByKeyRDD = RDD.combineByKey(lambda value: (value, 1),
                                   lambda x, value: (x[0] + value, x[1] + 1),
                                   lambda x, y: (x[0] + y[0], x[1] + y[1]))

print(combineByKeyRDD.take(3))

# [('a', (8, 3)), ('c', (11, 2)), ('b', (6, 2))]
```
> #### Computing Sum and Count using `combineByKey`
> **`Create Combiner Function: `** <font color=steelblue>**lambda** value: (value, 1)</font> - This function is the first aggregation step for each key. The argument of this function corresponds to the **`value`** in a **`key-value`** pair. For sum and count: create **combiner** to be a tuple in the form of **(sum, count)**. The very first step in this aggregation is then **(value, 1)**, where **value** is the first RDD value that **`combineByKey`** comes across and **1** initializes the count

---
> **`Merge Value Function: `** <font color=steelblue>**lambda** x, value: (x[0] + value, x[1] + 1)</font> - This function tells **`combineByKey`** what to do when a **combiner** is given a **new value**. The arguments to this function are a **`combiner`** and a **`new value`**. For sum and count the **combiner** is defined as a tuple in the form of **(sum, count)**. The **`new value`** is merged by adding it to the first element of the tuple while adding **1** to the second element of the tuple

---
> **`Merge Combiners Function: `** <font color=steelblue>**lambda** x, y: (x[0] + y[0], x[1] + y[1])</font> - This function tells **`combineByKey`** how to merge two **combiners**. For sum and count the **combiners** are defined as tuples in the form of **(sum, count)**. The combiners are merged by adding the two sums together and the two counts together
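
The three functions can be followed step by step in a pure-Python sketch. As before, the two-partition split is an assumption and `combine_by_key` is a hypothetical local helper that only imitates the behaviour described above; the last line derives a per-key average from the `(sum, count)` pairs.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Mimics RDD.combineByKey on a list of partitions of (key, value) pairs."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            if key in local:
                local[key] = merge_value(local[key], value)   # key already seen in this partition
            else:
                local[key] = create_combiner(value)           # first value for this key here
        for key, combiner in local.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


# Assumed split of the example data into two partitions.
partitions = [
    [('a', 2), ('b', 5), ('a', 3), ('b', 1)],
    [('c', 4), ('a', 3), ('c', 7)],
]

sum_count = combine_by_key(
    partitions,
    lambda value: (value, 1),
    lambda x, value: (x[0] + value, x[1] + 1),
    lambda x, y: (x[0] + y[0], x[1] + y[1]),
)

# Per-key average from (sum, count); in Spark this would be one more map over the pairs.
averages = {key: s / float(n) for key, (s, n) in sum_count.items()}
```

Unlike `aggregateByKey`, there is no zero value: the first value seen for a key in a partition is turned into a combiner by `create_combiner`.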

```python

import numpy as np

## Create an array
np.array([...])

## Scalar multiplication with an ndarray use: *
value * np.array([...])

## Element-wise multiplication
x * y

## Dot product
np.dot(x, y)

# or
x.dot(y)

## Generate a matrix
np.matrix([[...], [...]])

## Transpose of a matrix
x.T

## Inverting a square matrix. Note: Square matrices are not guaranteed to have an inverse.
from numpy.linalg import inv

# Note: inv(x) * x = I
inv(x)

## Combining ndarrays
# Combine arrays column-wise
np.hstack((x, y))

# Combine arrays row-wise
np.vstack((x, y))

## PySpark's DenseVector (DenseVector is used to store arrays of values for use in PySpark. DenseVector actually stores values in a NumPy array and delegates calculations to that object. A new DenseVector is created using DenseVector() and passing in a NumPy array or a Python list. Note: DenseVector stores all values as np.float64, and DenseVector objects exist locally and are not inherently distributed. DenseVector objects can be used in the distributed setting by either passing functions that contain them to resilient distributed dataset (RDD) transformations or by distributing them directly as RDDs.)

from pyspark.mllib.linalg import DenseVector

DenseVector.dot()

## Functional Programming: In functional programming, you will often pass functions to other functions as parameters, and lambda can be used to reduce the amount of code necessary and to make the code more readable. Some commonly used functions in functional programming are map, filter, and reduce. Map transforms a series of elements by applying a function individually to each element in the series. It then returns the series of transformed elements. Filter also applies a function individually to each element in a series; however, with filter, this function evaluates to True or False and only elements that evaluate to True are retained. Finally, reduce operates on pairs of elements in a series. It applies a function that takes in two values and returns a single value.

```
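
The functional-programming note above can be tried directly with Python's built-in `map` and `filter` and with `functools.reduce`. This is a small local sketch with made-up numbers; the RDD methods of the same names follow the same idea but run across a cluster.

```python
from functools import reduce

numbers = [1, 2, 3, 4, 5]

doubled = list(map(lambda x: x * 2, numbers))         # transform every element
evens = list(filter(lambda x: x % 2 == 0, numbers))   # keep elements that evaluate to True
total = reduce(lambda x, y: x + y, numbers)           # combine pairs of elements into one value
```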

- Broadcast
- Accumulators