# Spark Tutorial
_______________________

## 1. The Problem

Your data is split across several disks on several computers connected by a network. Taken together, the data probably takes up so much space that it would never fit on a single computer. Yet you still need to process it. Without proper infrastructure, you would have to track down each piece of data, process the pieces separately, and combine the partial results. On top of that, you would have to repeat this process many times.

Luckily, the Hadoop Distributed File System (HDFS) solves most of the storage problems, and Hadoop also provides the MapReduce API to do the processing. However, the MapReduce API is still very low-level, and getting things done with it takes a lot of time.

Here is where Spark comes into play. Spark provides high-level APIs for large-scale data processing. Spark can run on top of an existing HDFS cluster, or it can run in local mode on a single machine (which does not require setting up any HDFS). To be fair, Spark in local mode is not particularly useful for real workloads, but it provides an easy way to access its functionality and test it. Moreover, code that you write and run in local mode on your laptop will generally also run on a large cluster with little or no change.
______________________

## 2. Spark Context

The [SparkContext] is the entry point to Spark functionality.

In the Spark framework, there are two main actors: the driver and the executors. The driver has jobs that need to be run. The driver splits jobs into tasks. These tasks are submitted to executors. Once completed, results are sent back to the driver. 
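
The driver/executor split described above can be modelled in plain Python. The sketch below is only an analogy, not Spark code: the `run_job` function is a hypothetical name, the thread pool stands in for the executors, and the way the data is chunked is an assumption made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def run_job(data, num_tasks, task_fn, num_executors=2):
    """Toy 'driver': split a job into tasks, hand them to workers, gather results."""
    # Split the input into num_tasks chunks (one chunk per task).
    chunks = [data[i::num_tasks] for i in range(num_tasks)]
    # The thread pool plays the role of the executors.
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        partial_results = list(pool.map(task_fn, chunks))
    # The partial results come back to the driver.
    return partial_results

partials = run_job(list(range(10)), num_tasks=3, task_fn=sum)
print(partials)       # one partial sum per task
print(sum(partials))  # 45, the driver combines the partial results
```

In real Spark the executors are usually separate processes on other machines, and the tasks are derived from the RDD's partitions rather than from an explicit chunking step.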

We will run Spark in local mode so that we can avoid running a whole HDFS cluster on our machine. We will focus mainly on the programming paradigm. However, it may be useful to know that platforms such as [DataBricks] exist. They take care of much of the work necessary to set up a real cluster on which Spark can run.

In local mode, you can access the Spark Web UI at http://localhost:4040.

[SparkContext]: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html?highlight=pyspark%20sparkcontext#pyspark.SparkContext
[DataBricks]: https://databricks.com/

In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
spark

Regardless of whether you are familiar with Python, these built-in functions will help you a lot. [help]: ```help(x)``` shows the documentation of ```x```. [type]: ```type(x)``` returns the type of ```x```. [dir]: ```dir(x)``` lists the names that are accessible inside ```x```. You can find many more on the [built-in] functions documentation page.

[help]:https://docs.python.org/3/library/functions.html#help
[type]:https://docs.python.org/3/library/functions.html#type
[dir]:https://docs.python.org/3/library/functions.html#dir
[built-in]:https://docs.python.org/3/library/functions.html

In [16]:
print(sc)
print(type(sc))
# help(sc)

<SparkContext master=local[*] appName=test>
<class 'pyspark.context.SparkContext'>


________________________
## 3. Resilient Distributed Dataset (RDD)

[RDDs] are one of the main abstractions of Spark. An RDD represents an immutable collection of elements distributed across different nodes.
- **Resilient**: The system is able to recompute/recover missing or damaged partitions due to node failures.
- **Distributed**: Data resides on multiple nodes in a cluster.
- **Dataset**: Collection of data.
- **Immutable**: Once created, they cannot change.
- **Lazily evaluated**: Operations are performed only when necessary.
- **Parallel**: Operations are performed in parallel.

<div style="text-align:center"><img src="http://spark-mooc.github.io/web-assets/images/partitions.png" alt="drawing" width="600"/></div>

An RDD can be created by calling SparkContext’s [parallelize] method, ```sc.parallelize()```, on an existing collection in your driver program. The elements of the collection are copied to form a distributed dataset. ```sc.parallelize``` takes two arguments:
   1. The collection used to form the RDD.
   2. The number of partitions to cut the dataset into. If you omit it, Spark tries to set the number of partitions automatically based on your cluster.

[parallelize]: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize.html?highlight=parallelize
[RDDs]:https://spark.apache.org/docs/latest/rdd-programming-guide.html
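
To get a feeling for what "cutting a dataset into partitions" means, here is a minimal plain-Python sketch. It is a simplified model that assumes contiguous, nearly equal slices; `split_into_partitions` is a hypothetical helper, and the exact slicing Spark performs may differ.

```python
def split_into_partitions(collection, num_partitions):
    """Split a collection into num_partitions contiguous, nearly equal slices."""
    items = list(collection)
    n = len(items)
    # Slice i covers the indices from i*n // p (inclusive) to (i+1)*n // p (exclusive).
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]

partitions = split_into_partitions(range(30), 4)
for index, part in enumerate(partitions):
    print(index, part)
```

On a real RDD, `rdd.glom().collect()` returns the elements grouped by partition, which lets you inspect the actual layout.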

In [17]:
# Parallelize data using 4 partitions
# This operation is a transformation of data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point
data = range(30)
rdd  = sc.parallelize(data, 4)

print(data)
print(rdd.collect())

range(0, 30)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]


In [18]:
# Each RDD gets a unique ID
print('rdd id: {0}'.format(rdd.id()))

rdd id: 5


In [19]:
# We can name each newly created RDD using the setName() method
rdd.setName('My first rdd')

My first rdd PythonRDD[5] at collect at /tmp/ipykernel_26395/3337392560.py:8

In [20]:
# Let's view the lineage (the set of transformations) of the RDD using toDebugString()
print(rdd.toDebugString())

b'(4) My first rdd PythonRDD[5] at collect at /tmp/ipykernel_26395/3337392560.py:8 []\n |  ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:274 []'


In [21]:
# Let's see how many partitions the RDD will be split into by using the getNumPartitions()
rdd.getNumPartitions()

4

In [22]:
type(rdd)

pyspark.rdd.PipelinedRDD

_______________

## 4. Transformations vs Actions
There are two types of operations that you can perform on an RDD: transformations and actions.
- **Transformations**. Transformations are applied to RDDs and produce other RDDs. Transformations are lazily evaluated, meaning that they are not computed until an action is performed. Some common transformations are [map] and [filter].
- **Actions**. Actions do not return RDDs. Instead, an action sets in motion the sequence of transformations required to produce its result, and once the computation is done you get that result as output. Some common actions are [collect], [count], [reduce], and [take].

[map]:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map.html?highlight=map
[filter]:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.filter.html?highlight=filter
[reduce]:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduce.html?highlight=reduce
[collect]:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.collect.html?highlight=collect
[count]:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.count.html?highlight=count
[take]:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.take.html?highlight=take
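
Lazy evaluation can be observed in plain Python as well, because the built-in `map` and `filter` return lazy iterators. The sketch below is an analogy rather than Spark code: building the pipeline plays the role of the transformations, and `list(...)` plays the role of an action. The `calls` list and the `minus_one` function are hypothetical names used only to record when work actually happens.

```python
calls = []  # records every time the mapped function actually runs

def minus_one(value):
    calls.append(value)
    return value - 1

data = range(5)

# Building the pipeline: like a transformation, nothing is computed yet.
pipeline = filter(lambda v: v >= 1, map(minus_one, data))
calls_before = len(calls)
print(calls_before)   # 0, minus_one() has not been called

# Consuming the pipeline: like an action, this triggers the computation.
result = list(pipeline)
print(result)         # [1, 2, 3]
print(len(calls))     # 5
```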


### 4.1 The map() Transformation
```map(f)``` is the most common Spark transformation: it applies a function ```f``` to each item in the dataset and outputs the resulting dataset. When you run [map] on a dataset, a single stage of tasks is launched. A stage is a group of tasks that all perform the same computation, but on different input data. One task is launched for each partition, as shown in the figure below. A task is a unit of execution that runs on a single machine. When we run ```map(f)```, each task applies ```f``` to all of the entries in its partition and outputs a new partition. In the example figure, the dataset is broken into four partitions (spread over three workers), so four ```map()``` tasks are launched.

<img src="http://spark-mooc.github.io/web-assets/images/tasks.png" alt="drawing" width="600"/><img src="http://spark-mooc.github.io/web-assets/images/map.png" alt="drawing" width="600"/>

[map]: https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.RDD.map.html?highlight=map

In [23]:
# Create sub function to subtract 1
def sub(value): return (value - 1)

# Apply sub
rdd2 = rdd.map(sub)

We have applied ```sub()``` to ```rdd```, so each element of ```rdd``` will be decremented by ```1```. However, no computation has started yet. As mentioned earlier, Spark evaluates transformations lazily: the computation only starts when we ask for a result. Let's look at one of the operations that force the computation to start, the [collect] action.

You should feel a little bit of fear each time you call ```.collect()```, as it brings all the requested data into the memory of your machine. What if you request several GBs of data by mistake? Your machine may crash as its memory fills up, and you may lose several hours' worth of work.

[collect]: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.collect.html

In [24]:
print(rdd2.collect())

[-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]


### 4.2 The filter() Transformation
The [filter] transformation is used, unsurprisingly, to filter the elements of an RDD. It works similarly to the ```map()``` transformation: it applies a function ```f``` to every element of the RDD, but it keeps only the elements for which ```f``` returns ```True```. For example, suppose that ```f``` returns ```True``` if its input is odd and ```False``` otherwise, and that you have an RDD containing a list of numbers. If you call ```filter(f)``` on the RDD, you obtain another RDD that contains only the odd numbers. Let's try it out.

The figure below shows how this would work on the small four-partition dataset.
 
<img src="http://spark-mooc.github.io/web-assets/images/tasks.png" alt="drawing" width="600"/><img src="http://spark-mooc.github.io/web-assets/images/filter.png" alt="drawing" width="600"/>

[filter]:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.filter.html?highlight=filter


In [25]:
# Keep only the numbers that are less than 10

def isLessThan10(x): return x < 10

result = rdd.filter(isLessThan10).collect()
print(result)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


### 4.3 The reduce() function
Let us first look at the reduce function outside the Spark framework. The [reduce] function is a little harder to understand. ```reduce(f)``` takes, again, a function, which we will call ```f```. This time, ```f(*,*)``` takes two arguments: the first is the **accumulator**, and the second is the **current value**. For now, forget that data are stored in RDDs. Suppose that you have a list of 100 numbers [1,2,3,...,100]. ```f``` is called repeatedly while walking through the list:
1) the first time ```f``` is called, the accumulator takes the first value of the list (```1```, in our example). Meanwhile the second argument of ```f``` is the second number in the list (```2```, in our example).
2) the second time ```f``` is called, the accumulator takes the value of the output of 1). Meanwhile, the second argument of ```f``` is the third number in the list (```3```, in our example).
3) the third time ```f``` is called, the accumulator takes the value of the output of 2). Meanwhile, the second argument of ```f``` is the fourth number in the list (```4```, in our example).
4) and so on ...
99) the 99th time ```f``` is called, the accumulator takes the value of the output of 98). Meanwhile, the second argument of ```f``` is the 100th number in the list (```100```, in our example).

In practice, the [reduce] function applies a function to every element of the list while accumulating results.

[reduce]:https://docs.python.org/3/library/functools.html
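
The sequence of calls described above can be made visible by printing the two arguments at every step. This is a small sketch on a five-element list; `traced_sum` is a hypothetical helper introduced only for the trace.

```python
from functools import reduce

def traced_sum(acc, x):
    """Add x to the accumulator and print each call."""
    result = acc + x
    print(f"f(acc={acc}, x={x}) -> {result}")
    return result

# Four calls for five elements: (1,2), (3,3), (6,4), (10,5).
total = reduce(traced_sum, [1, 2, 3, 4, 5])
print(total)  # 15

# With an initial value, the accumulator starts there instead of at the first element.
total_from_100 = reduce(lambda acc, x: acc + x, [1, 2, 3, 4, 5], 100)
print(total_from_100)  # 115
```

The optional third argument of `functools.reduce` is handy when the result has a different type than the elements, for example when accumulating into a list.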

In [26]:
from functools import reduce ### not using spark

def sumAll(acc, x): return acc + x

result = reduce(sumAll, range(30))
print(result)

435


In [27]:
# Now let us accumulate only odd numbers. 
# You do not need to understand this function too deeply.
# Just keep in mind that the reduce function is a lot more flexible than it appears.

def isOdd(x):
    return x % 2 == 1

def AccumulateOdds(acc, x):
    if type(acc) != list: return [e for e in (acc,x) if isOdd(e)]
    else: return acc + [x] if isOdd(x) else acc
                
print(reduce(AccumulateOdds,list(range(30))))

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29]


### 4.3.1 Exercises

* (★) given a list `[1,2,3,4,5]`, multiply all the elements together using `reduce` from functools (result: `120`).
* (★) given a list `[1,2,3,4,5]`, sum all the elements greater than `2` using `reduce` from functools (result: `12`).
* (★) given a list `[1,2,3,4,5]`, compute the average using `reduce` from functools (result: `3`).
* (★★) given a list `[1,2,3,4,5]`, filter out the even elements using `reduce` from functools (result: `[1,3,5]`).

### 4.4 The reduce() action
The Spark [reduce] action differs a bit from the functools ```reduce()```, but the main concepts still apply. Again, the ```reduce(*)``` action takes a function that is applied to every element in the RDD. This function, call it ```f(*,*)```, takes two arguments. The first one accumulates the results and is fed back to successive ```f(*,*)``` calls. However, because each partition is reduced separately and the partial results are then combined, the second argument can be an accumulator too. For this reason, ```f``` should be commutative and associative. With a simple reducer such as our ```sumAll(*,*)```, this does not make any difference. However, for reducers such as ```AccumulateOdds(*,*)```, it makes a big difference.


[reduce]:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduce.html?highlight=reduce
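
Why the shape of the reducer matters can be seen with a plain-Python model of a partitioned reduce. This is a sketch under the assumption that each partition is reduced independently and the partial results are then reduced again; `partitioned_reduce` is a hypothetical helper, not part of Spark.

```python
from functools import reduce

def partitioned_reduce(f, partitions):
    """Reduce each non-empty partition on its own, then reduce the partial results."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

data = list(range(1, 9))                               # [1, 2, ..., 8]
two_parts = [data[:4], data[4:]]                       # 2 partitions of 4 elements
four_parts = [data[i:i + 2] for i in range(0, 8, 2)]   # 4 partitions of 2 elements

add = lambda a, b: a + b
subtract = lambda a, b: a - b

# Addition is associative and commutative: the partitioning does not matter.
print(partitioned_reduce(add, two_parts), partitioned_reduce(add, four_parts))            # 36 36
# Subtraction is neither: the result changes with the partitioning.
print(partitioned_reduce(subtract, two_parts), partitioned_reduce(subtract, four_parts))  # 8 2
```

A sequential `reduce(subtract, data)` would give `-34`, a third answer, which is why a reducer passed to Spark should not depend on the order or grouping of its inputs.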

In [28]:
# Again, sum all the elements
result = rdd.reduce(sumAll)
print(result)

435


In [29]:
# Now let us accumulate only odd numbers. 
# You do not need to understand this function too deeply.
# Just keep in mind that the reduce function is a lot more flexible than it appears.

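def AccumulateOddsSpark(acc, x):
    # Either argument may be a raw element or an already accumulated list.
    if type(acc) != list and type(x) != list: return [e for e in (acc,x) if isOdd(e)]
    if type(acc) == list and type(x) != list: return acc + [x] if isOdd(x) else acc
    if type(acc) != list and type(x) == list: return ([acc] if isOdd(acc) else []) + x
    if type(acc) == list and type(x) == list: return acc + x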

result = rdd.reduce(AccumulateOddsSpark)
print(result)

# A little side note: there is a drastic difference between using .filter(isOdd).collect() and using this reducer.
# In the first case, Spark is responsible for gathering all the filtered results.
# In this case, instead, we are gathering the results ourselves.
# Of course, this can lead to inefficiencies, and it is quite error-prone.
# Again, this is only meant to show the flexibility of the reduce function.
# If the built-in transformations and actions (such as filter and collect) give you the result, just use them.

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29]


### 4.4.1 Exercises

* (★) given a list `[1,2,3,4,5]`, multiply all the elements together using `reduce` from Spark (result: `120`).
* (★) given a list `[1,2,3,4,5]`, sum all the elements greater than `2` using `reduce` from Spark (result: `12`).
* (★) given a list `[1,2,3,4,5]`, compute the average using `reduce` from Spark (result: `3`).
* (★★) given a list `[1,2,3,4,5]`, filter out the even elements using `reduce` from Spark (result: `[1,3,5]`).


### 4.5 The count() Action

One of the most basic actions that we can run is the [count()] method which will count the number of elements in an RDD.

Each task counts the entries in its partition and sends the result to your SparkContext, which adds up all of the counts. The figure below shows what would happen if we ran `count()` on a small example dataset with just four partitions.

<img src="http://spark-mooc.github.io/web-assets/images/count.png" alt="drawing" width="600"/>


[count()]: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.count.html?highlight=count

In [30]:
print(rdd.count())

30


____________
## 5. Additional Actions

* [first]: `first()` returns the first element of the RDD. Which element comes first depends on how the RDD is partitioned.
* [take]: `take(num)` returns the first `num` elements of the RDD. Which elements come first depends on how the RDD is partitioned.
* [top]: `top(num, key=None)` returns the first `num` elements in descending order, according to the `key`.
* [takeOrdered]: `takeOrdered(num, key=None)` returns the first `num` elements in ascending order, according to the `key`.
* [takeSample]: `takeSample(withReplacement, num, seed=None)` randomly selects `num` elements from the RDD. If `withReplacement=True`, it can return the same element multiple times. Using the same `seed` twice yields the same result.
* [countByValue]: `countByValue()` returns the count of each unique value in this RDD as a dictionary of (value, count) pairs.

[first]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.first.html?highlight=first#pyspark.RDD.first 
[take]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.take.html?highlight=take#pyspark.RDD.take
[top]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.top.html?highlight=top#pyspark.RDD.top
[takeOrdered]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.takeOrdered.html?highlight=takeordered#pyspark.RDD.takeOrdered
[takeSample]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.takeSample.html?highlight=takesample#pyspark.RDD.takeSample
[countByValue]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.countByValue.html?highlight=countbyvalue#pyspark.RDD.countByValue
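
If these actions feel abstract, it may help to compare them with standard-library tools that do similar things on an ordinary list. The mapping below is only an analogy chosen for illustration (the sample data and variable names are made up), and the random values drawn here will not match what Spark's `takeSample` returns for the same seed.

```python
import heapq
import random
from collections import Counter

data = [5, 3, 8, 3, 1, 8, 8]

largest = heapq.nlargest(3, data)        # similar to top(3): descending order
smallest = heapq.nsmallest(3, data)      # similar to takeOrdered(3): ascending order
# A key function changes what "smallest" means, here: closest to 4.
closest_to_4 = heapq.nsmallest(2, data, key=lambda x: abs(x - 4))
counts = Counter(data)                   # similar to countByValue()

# Sampling with replacement; reusing the seed reproduces the same sample.
sample_a = random.Random(14).choices(data, k=4)
sample_b = random.Random(14).choices(data, k=4)

print(largest)               # [8, 8, 8]
print(smallest)              # [1, 3, 3]
print(closest_to_4)          # [5, 3]
print(counts[8], counts[3])  # 3 2
print(sample_a == sample_b)  # True
```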

In [31]:
print(f"rdd.first()                    = {rdd.first()}")
print(f"rdd.take(5)                    = {rdd.take(5)}")
print(f"rdd.top(5, lambda x:x)         = {rdd.top(5, lambda x:x)}")
print(f"rdd.takeOrdered(5, lambda x:x) = {rdd.takeOrdered(5, lambda x:x)}")
print(f"rdd.takeSample(True, 5, 14)    = {rdd.takeSample(True, 5, 14)}")
print(f"rdd.countByValue()             = {rdd.countByValue()}")

rdd.first()                    = 0
rdd.take(5)                    = [0, 1, 2, 3, 4]
rdd.top(5, lambda x:x)         = [29, 28, 27, 26, 25]
rdd.takeOrdered(5, lambda x:x) = [0, 1, 2, 3, 4]
rdd.takeSample(True, 5, 14)    = [8, 21, 28, 22, 3]
rdd.countByValue()             = defaultdict(<class 'int'>, {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1, 10: 1, 11: 1, 12: 1, 13: 1, 14: 1, 15: 1, 16: 1, 17: 1, 18: 1, 19: 1, 20: 1, 21: 1, 22: 1, 23: 1, 24: 1, 25: 1, 26: 1, 27: 1, 28: 1, 29: 1})


________________
## 6. Additional Transformations

* [flatMap]: `flatMap(f)` returns a new RDD by first applying `f` to all elements of this RDD, and then flattening the results.
* [groupByKey]: `groupByKey()` Groups the values for each key in the RDD into a single sequence. This transformation operates on pair RDDs.  
* [reduceByKey]:  `reduceByKey(func)` Merges the values for each key using an associative and commutative reduce function.
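
The difference between mapping and flat-mapping is easy to see on a plain Python list. The sketch below is an analogy using `itertools.chain`, with made-up sample data; it is not Spark code.

```python
from itertools import chain

lines = ["a b", "c", "d e f"]

# Like map: one output (here, a list of words) per input element.
mapped = [line.split() for line in lines]
# Like flatMap: the per-element outputs are flattened into a single sequence.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)       # [['a', 'b'], ['c'], ['d', 'e', 'f']]
print(flat_mapped)  # ['a', 'b', 'c', 'd', 'e', 'f']
```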



Both of these transformations ([groupByKey] and [reduceByKey]) operate on pair RDDs.  A pair RDD is an RDD where each element is a pair tuple (key, value).  For example, `sc.parallelize([('a', 1), ('a', 2), ('b', 1)])` would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.
The `reduceByKey()` transformation gathers together pairs that have the same key and applies a function to two associated values at a time. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions.
While both the `groupByKey()` and `reduceByKey()` transformations can often be used to solve the same problem and will produce the same answer, the `reduceByKey()` transformation works much better for large distributed datasets. This is because Spark knows it can combine output with a common key on each partition *before* shuffling (redistributing) the data across nodes.  Only use `groupByKey()` if the operation would not benefit from reducing the data before the shuffle occurs.
 
Look at the diagram below to understand how `reduceByKey` works. Notice how pairs on the same machine with the same key are combined (by using the lambda function passed into `reduceByKey`) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result.

On the other hand, when using the `groupByKey()` transformation, all the key-value pairs are shuffled around, causing a lot of unnecessary data to be transferred over the network.

To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when more data is shuffled onto a single executor machine than can fit in memory. However, it flushes the data to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. This will be more gracefully handled in a later release of Spark so that the job can still proceed, but it should still be avoided. When Spark needs to spill to disk, performance is severely impacted.

<img src="http://spark-mooc.github.io/web-assets/images/group_by.png" alt="drawing" width="600"/> <img src="http://spark-mooc.github.io/web-assets/images/reduce_by.png" alt="drawing" width="600"/>

[flatMap]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.flatMap.html?highlight=flatmap#pyspark.RDD.flatMap
[groupByKey]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.groupByKey.html?highlight=groupbykey#pyspark.RDD.groupByKey
[reduceByKey]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.reduceByKey.html?highlight=reducebykey#pyspark.RDD.reduceByKey
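
The saving from combining before the shuffle can be estimated with a small plain-Python model. The partition layout and the `merge_by_key` helper below are hypothetical, and counting the pairs that leave each partition is only a rough stand-in for network traffic.

```python
# Hypothetical layout: two partitions holding (key, value) pairs.
partitions = [
    [("a", 1), ("a", 2), ("b", 1), ("a", 3)],
    [("b", 4), ("a", 5), ("b", 6)],
]

def merge_by_key(pairs, func):
    """Merge the values that share a key, two at a time, using func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

add = lambda x, y: x + y

# groupByKey-style: every pair is shuffled, and values are merged afterwards.
shuffled_all = [pair for part in partitions for pair in part]
# reduceByKey-style: each partition is combined first, so fewer pairs are shuffled.
shuffled_combined = [
    pair for part in partitions for pair in merge_by_key(part, add).items()
]

print(len(shuffled_all), len(shuffled_combined))  # 7 4
print(merge_by_key(shuffled_all, add))            # {'a': 11, 'b': 11}
print(merge_by_key(shuffled_combined, add))       # {'a': 11, 'b': 11}
```

Both routes give the same totals, but the second one moves at most one pair per key per partition, and the gap grows with the number of repeated keys.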

In [32]:
pair_rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
print("flatMap    :", pair_rdd.flatMap(lambda x: [x,x]).collect())
print("groupByKey :", pair_rdd.groupByKey().map(lambda x:(x[0],list(x[1]))).collect())
print("reduceByKey:", pair_rdd.reduceByKey(lambda x,y:x+y).collect())

flatMap    : [('a', 1), ('a', 1), ('a', 2), ('a', 2), ('b', 1), ('b', 1)]
groupByKey : [('a', [1, 2]), ('b', [1])]
reduceByKey: [('a', 3), ('b', 1)]


### 6.1 Exercises

You will work on this RDD of (name, date, price) tuples:
```
('alpha', '9/2/22' , '932$'), 
('alpha', '10/2/22', '904$'), 
('alpha', '11/2/22', '806$'),
('beta' , '9/2/22' , '2831$'), 
('beta' , '10/2/22', '2732$'), 
('beta' , '11/2/22', '2685$'),
('gamma', '9/2/22' , '312$'), 
('gamma', '10/2/22', '301$'), 
('gamma', '11/2/22', '285$')
```

* (★) compute the total price. (result: `11788`) 
* (★★) compute the average per name. (result: `('alpha', 880.666), ('beta', 2749.333), ('gamma',299.333)`) 


______________________________________
## 7. RDDs Memory Management

For efficiency, Spark keeps your RDDs in memory (RAM). By keeping the contents in memory, Spark can quickly access the data. However, memory is limited, so if you try to keep too many RDDs in memory, Spark will automatically delete RDDs from memory to make space for new RDDs. If you later refer to one of the deleted RDDs, Spark will automatically recreate it for you, but that takes time.

So, if you plan to use an RDD more than once, you should tell Spark to cache that RDD. You can use the [cache] operation, `cache()`, to keep the RDD in memory. However, you must still trigger an action on the RDD, such as `collect()`, for the RDD to be created, and only then will the RDD be cached. Keep in mind that if you cache too many RDDs and Spark runs out of memory, it will delete the least recently used (LRU) RDD first. Again, the RDD will be automatically recreated when accessed.
 
You can check if an RDD is cached by using the `is_cached` attribute, and you can see your cached RDD in the "Storage" section of the Spark web UI. If you click on the RDD's name, you can see more information about where the RDD is stored.

[cache]: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.cache.html?highlight=cache#pyspark.RDD.cache
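
The least-recently-used policy mentioned above can be illustrated with a toy cache. This is a simplified model, not Spark's memory manager: `ToyLRUCache` is a hypothetical class, capacity is counted in whole datasets rather than bytes, and "recreating" a dataset is modelled by calling a function again.

```python
from collections import OrderedDict

class ToyLRUCache:
    """Keeps at most `capacity` computed datasets; evicts the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.store = OrderedDict()
        self.computations = 0

    def get(self, name, compute):
        if name in self.store:
            self.store.move_to_end(name)     # mark as most recently used
            return self.store[name]
        self.computations += 1               # not cached: build it from its recipe
        value = compute()
        self.store[name] = value
        if len(self.store) > self.capacity:
            self.store.popitem(last=False)   # evict the least recently used entry
        return value

def squares(): return [x * x for x in range(5)]
def evens(): return [x for x in range(10) if x % 2 == 0]
def odds(): return [x for x in range(10) if x % 2 == 1]

cache = ToyLRUCache(capacity=2)
cache.get("squares", squares)   # computed
cache.get("evens", evens)       # computed
cache.get("squares", squares)   # cache hit, nothing is recomputed
cache.get("odds", odds)         # computed, evicts "evens"
cache.get("evens", evens)       # computed again, evicts "squares"

print(cache.computations)   # 4
print(list(cache.store))    # ['odds', 'evens']
```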

In [33]:
new_rdd = sc.parallelize(range(10))
new_rdd.setName("MyRDD")
print("before .cache()              :", new_rdd.is_cached)
new_rdd.cache()
print("after  .cache()              :", new_rdd.is_cached)
new_rdd.collect()
print("after  .cache() and an action:", new_rdd.is_cached)

before .cache()              : False
after  .cache()              : True
after  .cache() and an action: True


Spark automatically manages the RDDs cached in memory. With the default storage level used by `cache()` (memory only), partitions that do not fit in memory are not kept and are recomputed when they are needed. For efficiency, once you are finished using an RDD, you can tell Spark to stop caching it by calling the RDD's `unpersist()` method.

In [34]:
new_rdd.unpersist()

MyRDD PythonRDD[31] at RDD at PythonRDD.scala:53