# <center>Big Data &ndash; Exercises </center>
## <center>Fall 2023 &ndash; Week 8 &ndash; ETH Zurich</center>
## <center>YARN + Spark</center>

# Exercise 1 &mdash; What is YARN?

**YARN** stands for “**Y**et **A**nother **R**esource **N**egotiator”. It is a resource scheduler designed to work on existing and new Hadoop clusters.

YARN supports pluggable schedulers. The task of the scheduler is to share the resources of a large cluster among different tenants (applications) while trying to meet application demands (memory, CPU). A user may have several applications active at a time. 
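
To make the scheduler's task concrete, here is a minimal sketch (plain Python, not YARN code) of the basic check any such scheduler has to make: does a request fit into what a node still has free? The resource names, the node size, and the two requests are made up for illustration.

```python
def fits(request, free):
    """True if every requested resource is available in sufficient quantity."""
    return all(free.get(name, 0) >= amount for name, amount in request.items())


def allocate(request, free):
    """Return the node's free resources after granting the request."""
    if not fits(request, free):
        raise ValueError("request does not fit on this node")
    return {name: amount - request.get(name, 0) for name, amount in free.items()}


# Hypothetical node and requests from two applications (tenants).
node_free = {"memory_mb": 8192, "vcores": 4}
app_a = {"memory_mb": 6144, "vcores": 2}
app_b = {"memory_mb": 4096, "vcores": 1}

after_a = allocate(app_a, node_free)
print(after_a)                # what is left once app_a has been served
print(fits(app_b, after_a))   # False: app_b must wait or go to another node
```

A real scheduler adds a policy on top of this check (queues, priorities, fairness), which is the part that YARN lets you plug in.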

### 1.1 &ndash; List at least 3 main shortcomings of MapReduce v1 that are addressed by YARN's design.

### Answer

1. **Scalability Limitations in MapReduce v1**:
   - *Bottleneck*: MapReduce v1 suffered from a bottleneck with the JobTracker, responsible for resource scheduling and monitoring. This limited scalability, allowing for only around 5K nodes and up to 40K concurrent tasks (per Yahoo!).
   - *YARN Solution*: YARN splits the JobTracker's two duties: a cluster-wide ResourceManager handles resource scheduling, while a per-application ApplicationMaster handles job monitoring. This removes the JobTracker bottleneck and accommodates larger node counts (~10K) and higher concurrent task loads.

2. **Rigidity in Job Types**:
   - *MapReduce Specificity*: MapReduce v1 was confined to supporting only MapReduce-specific jobs. This limitation hindered the ability to schedule and execute diverse workloads such as MPI, graph processing, or user-specific code.
   - *YARN's Adaptability*: YARN's architecture is designed to accommodate various workloads beyond MapReduce, enabling the sharing of the cluster resources among different types of applications and frameworks. It allows for scheduling and executing non-MapReduce workloads like MPI, graph processing, and custom user code, enhancing the cluster's versatility.

3. **Resource Utilization Efficiency**:
   - *Idle Resources*: In MapReduce v1, each node was statically divided into map slots and reduce slots. Reduce slots sat idle while only mappers were running (and vice versa), so resources went unused during these wait times.
   - *YARN's Dynamic Resource Allocation*: YARN optimizes resource utilization by allowing independent scheduling of resources for different tasks. It enables simultaneous execution of various tasks, ensuring that all available resources are utilized efficiently at any given time, reducing idle periods and maximizing overall cluster performance.

4. **Flexibility and Dynamic Configuration**:
   - *Static Mapper and Reducer Roles*: MapReduce v1 constrained the roles of mappers and reducers to configurations set at job initiation, lacking the flexibility to adapt roles during runtime.
   - *YARN's Dynamic Configuration*: YARN provides greater flexibility by allowing dynamic allocation and reallocation of resources during runtime. Applications can adapt their resource requirements and configurations dynamically, allowing for more efficient resource utilization and adaptive task handling based on changing workloads or conditions.
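
The resource-utilization point can be illustrated with a toy calculation. The sketch assumes the MapReduce v1 model of statically configured map and reduce slots per node and compares it with generic containers; the node size, slot counts, and task size are invented numbers, not defaults of any Hadoop version.

```python
# Toy model of one node during the map phase of a job.
NODE_MEMORY_GB = 8
TASK_MEMORY_GB = 1                 # assumed size of every task
MAP_SLOTS, REDUCE_SLOTS = 5, 3     # hypothetical static MRv1 configuration


def mrv1_running_tasks(pending_maps, pending_reduces):
    """Typed slots: a map task can only use a map slot, a reduce task a reduce slot."""
    return min(pending_maps, MAP_SLOTS) + min(pending_reduces, REDUCE_SLOTS)


def yarn_running_tasks(pending_maps, pending_reduces):
    """Generic containers: any pending task can use any free memory."""
    capacity = NODE_MEMORY_GB // TASK_MEMORY_GB
    return min(pending_maps + pending_reduces, capacity)


# Map phase: 20 mappers are waiting and no reducer can start yet.
mrv1 = mrv1_running_tasks(pending_maps=20, pending_reduces=0)
yarn = yarn_running_tasks(pending_maps=20, pending_reduces=0)
print(f"MRv1 slots run {mrv1} tasks, generic containers run {yarn} tasks")
```

With these numbers, three reduce slots stay empty under the slot model although mappers are queued, while the container model keeps the whole node busy.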

### Components of YARN


1. **ResourceManager (RM):**
   - **Global Resource Manager**: Controls and manages the cluster's resources.
   - **Resource Allocation**: Allocates resources to various applications.
   - **Scheduling**: Schedules resources among competing applications based on policies and priorities.
   - **Manages NodeManagers**: Monitors NodeManagers and their resource availability.

2. **NodeManager (NM):**
   - **Node-level Resource Manager**: Manages resources on individual cluster nodes.
   - **Monitors Resource Usage**: Tracks resource utilization (CPU, memory, etc.) on the node.
   - **Execution and Monitoring**: Executes and monitors containers (fundamental units of execution) on the node.
   - **Reports to ResourceManager**: Regularly communicates with the ResourceManager, providing updates on available resources and container status.

3. **ApplicationMaster (AM):**
   - **Per-Application Coordinator**: Manages a specific application running on YARN.
   - **Resource Negotiation**: Negotiates resources from the ResourceManager for the application's tasks or containers.
   - **Application Lifecycle Management**: Coordinates the execution lifecycle, handling task execution, monitoring progress, and handling failures or retries.
   - **Optimizes Execution**: Optimizes task execution based on resource availability and application requirements.

4. **Containers:**
   - **Execution Units**: Containers encapsulate allocated resources (CPU, memory, etc.) for running specific tasks or components of an application.
   - **Isolated Execution Environment**: Provides a controlled environment for executing tasks without interference from other containers on the same node.
   - **Managed by NodeManager**: Managed and executed by the NodeManager on respective nodes.

5. **ResourceManager Web UI:**
   - **Visualization and Monitoring**: Provides a web-based interface for administrators and users to monitor resource utilization, application status, and cluster health.
   - **Insight into Resource Allocation**: Offers insights into resource allocation, application progress, and overall cluster performance.
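
The regular reports from NodeManagers to the ResourceManager can be pictured as heartbeats that the ResourceManager uses to decide which nodes are alive and how much capacity they offer. The following is a minimal sketch in plain Python, not YARN's implementation: the class name, the expiry interval, and the explicit `now` timestamps are assumptions made to keep the example deterministic.

```python
class NodeTracker:
    """ResourceManager-side view built from NodeManager heartbeats."""

    def __init__(self, expiry_s):
        self.expiry_s = expiry_s
        self.reports = {}   # node name -> (timestamp, free memory in MB)

    def heartbeat(self, node, now, free_memory_mb):
        # Each heartbeat overwrites the previous report of that node.
        self.reports[node] = (now, free_memory_mb)

    def live_nodes(self, now):
        return sorted(node for node, (seen, _) in self.reports.items()
                      if now - seen <= self.expiry_s)

    def lost_nodes(self, now):
        return sorted(set(self.reports) - set(self.live_nodes(now)))

    def free_memory_mb(self, now):
        # Only nodes that reported recently count as usable capacity.
        return sum(self.reports[node][1] for node in self.live_nodes(now))


tracker = NodeTracker(expiry_s=10)   # hypothetical expiry interval
tracker.heartbeat("node1", now=0, free_memory_mb=4096)
tracker.heartbeat("node2", now=0, free_memory_mb=2048)
tracker.heartbeat("node1", now=8, free_memory_mb=3072)   # node2 stays silent

print(tracker.live_nodes(now=12))
print(tracker.lost_nodes(now=12))
print(tracker.free_memory_mb(now=12))
```

At time 12 only `node1` has reported within the expiry interval, so `node2` is treated as lost and its memory no longer counts as available.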



### Steps taken to launch a job in YARN


1. **Submitting the Job:**
   - A user submits their job/application to the YARN cluster. This submission might be through a command-line interface, an API call, or a higher-level framework built on top of YARN, such as Apache Hadoop MapReduce, Apache Spark, or others.

2. **ResourceManager Receives the Request:**
   - The ResourceManager (RM) receives the job/application submission request.
   - The RM is responsible for managing the cluster's resources and allocating resources to various applications.

3. **Launching the ApplicationMaster:**
   - The ResourceManager allocates a first container for the job/application and has the NodeManager on the chosen node launch the ApplicationMaster (AM) in it.
   - The ApplicationMaster is responsible for managing the execution of the user's job/application. It coordinates with the ResourceManager for resource needs and oversees task execution.

4. **Resource Negotiation:**
   - The ApplicationMaster negotiates with the ResourceManager for the resources the job/application needs.
   - The ResourceManager considers the requested resources (e.g., memory, CPU cores) and the current availability of resources in the cluster.

5. **Allocation of Containers:**
   - The ResourceManager grants containers to the ApplicationMaster.
   - Each container encapsulates the allocated resources (CPU, memory, etc.) needed to execute specific tasks.

6. **Task Execution within Containers:**
   - The ApplicationMaster, once initialized, starts sending requests to the NodeManagers (NMs) to launch containers on specific nodes within the cluster.
   - NodeManagers launch containers and execute tasks associated with the job/application within these containers.

7. **Container Execution and Monitoring:**
   - NodeManagers monitor and manage the execution of containers.
   - They _report container status updates (start, finish, failure) to the ResourceManager, which in turn communicates this information to the ApplicationMaster._

8. **Task Completion and Job Termination:**
   - As tasks within containers complete their execution, they report their status back to the ApplicationMaster.
   - The ApplicationMaster keeps track of task completion, aggregates results, and manages the overall progress of the job/application.

9. **Finalization and Resource Release:**
   - Once the job/application completes its tasks, the ApplicationMaster signals the ResourceManager about job completion.
   - The ResourceManager releases the allocated resources back to the cluster for other applications and awaits further job submissions.
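
The flow above can be walked through with a toy simulation. This is a sketch in plain Python, not the YARN API: the class and function names, the memory sizes, and the first-fit placement are all assumptions, and granting and launching a container are folded into a single call for brevity.

```python
from dataclasses import dataclass, field

AM_MEMORY_MB = 512   # hypothetical size of the ApplicationMaster container


@dataclass
class NodeManager:
    name: str
    free_memory_mb: int
    running: list = field(default_factory=list)

    def launch(self, container_id, memory_mb):
        self.free_memory_mb -= memory_mb
        self.running.append(container_id)

    def release(self, container_id, memory_mb):
        self.running.remove(container_id)
        self.free_memory_mb += memory_mb


class ResourceManager:
    def __init__(self, nodes):
        self.nodes = nodes
        self.next_id = 0

    def allocate(self, memory_mb):
        """Grant a container on the first node with enough free memory."""
        for node in self.nodes:
            if node.free_memory_mb >= memory_mb:
                self.next_id += 1
                container_id = f"container_{self.next_id}"
                node.launch(container_id, memory_mb)
                return container_id, node
        return None   # a real scheduler would keep the request pending


def run_application(rm, num_tasks, task_memory_mb, log):
    log.append("submitted")
    # The first container hosts the ApplicationMaster.
    am_id, am_node = rm.allocate(AM_MEMORY_MB)
    log.append(f"AM started in {am_id} on {am_node.name}")
    # The AM requests one container per task.
    granted = [rm.allocate(task_memory_mb) for _ in range(num_tasks)]
    granted = [grant for grant in granted if grant is not None]
    log.append(f"{len(granted)} task containers granted")
    # Tasks finish and their containers are released.
    for container_id, node in granted:
        node.release(container_id, task_memory_mb)
    # The AM reports completion and its own container is released.
    am_node.release(am_id, AM_MEMORY_MB)
    log.append("finished")


nodes = [NodeManager("node1", 2048), NodeManager("node2", 2048)]
rm = ResourceManager(nodes)
log = []
run_application(rm, num_tasks=3, task_memory_mb=1024, log=log)
print("\n".join(log))
```

After the run, both nodes are back at their full free memory, which corresponds to the final resource-release step.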



### 1.2 &ndash; State which of the following statements are true:

1. The ResourceManager has to provide fault tolerance for resources across the cluster 

    The statement is __false__: in YARN, fault tolerance of resources across the cluster is not primarily the ResourceManager's responsibility. It is handled mainly by other components:

    1. **NodeManagers**: These are responsible for managing resources on individual nodes in the cluster. They monitor resource usage, report to the ResourceManager, and manage container execution. NodeManagers detect container failures on their node and report them, so that the affected tasks or containers can be rerun on other healthy nodes.

    2. **ApplicationMaster**: Each application running on YARN has its own ApplicationMaster. The ApplicationMaster is responsible for negotiating resources with the ResourceManager and managing the execution of the application's tasks or containers. It monitors the progress of the application and handles failures or retries at the application level.

    3. **HDFS**: For fault tolerance of data, YARN often relies on the underlying HDFS. HDFS replicates data across multiple nodes in the cluster to ensure that data remains available even if some nodes fail.


1. Container allocation/deallocation can take place in a dynamic fashion as the application progresses. 

    __True__

1. YARN plans to allow applications to only request resources in terms of memory usage and number of CPUs.
    
    __False__, because YARN, as a resource management framework, offers flexibility in the type of resources that applications can request beyond just memory and CPU.

    YARN is designed to accommodate diverse workloads and varied resource requirements beyond memory and CPU. It supports the notion of "resource types," which enables applications to request and utilize different kinds of resources based on their specific needs. These resource types can include but are not limited to:

    1. **Memory:** Applications can request a certain amount of memory for their tasks or containers.
    2. **CPU:** They can also specify the number of CPU cores required for their execution.
    3. **GPUs:** For applications that benefit from GPU acceleration, YARN allows for GPU resource requests.
    4. **Disk Space:** Applications might need specific disk space for temporary storage or data processing.
    5. **Custom Resources:** YARN allows for defining custom resource types to cater to specific application requirements that go beyond standard memory and CPU, such as specialized hardware or software licenses.

    By enabling applications to request various types of resources, YARN supports a wide range of workloads, including machine learning, data analytics, scientific computing, and more, each with distinct resource needs beyond memory and CPU. Therefore, the statement is false because YARN's design is not limited solely to memory usage and the number of CPUs when it comes to resource requests.

1. Communications between the ResourceManager and NodeManagers are heartbeat-based. 

    __True__:
    NodeManagers periodically send heartbeats (every few seconds) to the ResourceManager to provide status updates and information about the resources available on their respective nodes. These heartbeats serve as a way for NodeManagers to communicate their health, report resource utilization, and update the ResourceManager about the status of containers they are managing.

    The ResourceManager relies on these heartbeat messages from NodeManagers to keep track of node health, available resources, and to detect any issues or failures at the node level. By maintaining a regular heartbeat exchange, the ResourceManager can dynamically manage resources, allocate or reallocate resources as needed, and make decisions based on the most up-to-date information about the cluster's state.
    This heartbeat-based communication ensures that the ResourceManager stays informed about the health and status of the nodes in the cluster, allowing for effective resource allocation, fault tolerance, and overall cluster management within the YARN framework.

1. The ResourceManager does not have a global view of all usage of cluster resources. Therefore, it tries to make better scheduling decisions based on probabilistic prediction. 

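    __False__: the ResourceManager does have a global view of cluster resource usage, which it builds from the NodeManagers' heartbeats (see last session).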

1. ResourceManager has the ability to request resources back from a running application.

    __True__: In YARN, the ResourceManager (RM) can reclaim resources from a running application under certain circumstances.

    This feature is known as "resource preemption." Preemption allows the ResourceManager to reclaim resources from applications that are utilizing more resources than their fair share or if there's a need to allocate resources to higher-priority applications.

    1. **Queue-based Preemption:** YARN supports resource allocation based on queues with varying priorities. If a higher-priority queue or application needs resources that are currently being used by a lower-priority application, the ResourceManager can reclaim resources from the lower-priority application to satisfy the higher-priority request.

    2. **Over-allocation or Fairness:** If an application is using resources significantly beyond its allocated share or if it's causing other applications to be starved of resources, the ResourceManager can step in and reclaim resources to maintain fairness and prevent resource monopolization.

    3. **Dynamic Resource Demands:** In environments where resource demands fluctuate dynamically, the ResourceManager can adjust resource allocations across applications to ensure optimal resource utilization and meet changing demands efficiently.

    Resource preemption ensures that resources are distributed fairly and efficiently among competing applications in the cluster. However, preemption is typically used judiciously to avoid disrupting long-running jobs or causing unnecessary interference with the application's execution, aiming to balance fairness and cluster efficiency.
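
The fairness-based case of preemption can be sketched as simple arithmetic. This is plain Python with invented queue names and numbers, not the policy of any actual YARN scheduler: it only computes how many containers would have to be reclaimed from queues above their fair share to serve queues that are below their share and asking for more.

```python
def containers_to_preempt(usage, demand, fair_share):
    """Return {queue: containers to reclaim} for queues above their fair share."""
    # Shortfall: what under-served queues are entitled to and asking for, but not getting.
    shortfall = sum(max(0, min(demand[q], fair_share[q]) - usage[q]) for q in usage)
    plan = {}
    # Take from the queues that exceed their share the most first.
    for queue in sorted(usage, key=lambda q: usage[q] - fair_share[q], reverse=True):
        excess = max(0, usage[queue] - fair_share[queue])
        take = min(excess, shortfall)
        if take > 0:
            plan[queue] = take
            shortfall -= take
    return plan


# Hypothetical cluster of 100 containers shared equally by two queues.
fair_share = {"analytics": 50, "reporting": 50}
usage = {"analytics": 80, "reporting": 20}

busy = containers_to_preempt(usage, {"analytics": 80, "reporting": 60}, fair_share)
calm = containers_to_preempt(usage, {"analytics": 80, "reporting": 20}, fair_share)
print(busy)   # reporting wants its share back
print(calm)   # reporting asks for nothing more, so nothing is reclaimed
```

The second call shows why preemption can be used sparingly: a queue that is over its share only loses containers when another queue actually needs them.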

### 1.3 &ndash; Whose responsibility is it? Say which component of YARN is responsible for each of the following tasks.

1. Fault Tolerance of running applications *[ResourceManager | ApplicationMaster | NodeManager ]*
    
    AM
    
1. Asking for resources needed for an application *[ResourceManager | ApplicationMaster | NodeManager ]*
    
    AM

1. Providing leases to use containers *[ResourceManager | ApplicationMaster | NodeManager]*

    RM

1. Tracking status and progress of running applications *[ResourceManager | ApplicationMaster | NodeManager]*

    AM

__Bonus__: Handling container failures. *[ResourceManager | ApplicationMaster | NodeManager ]*

RM + AM + NM

1. **Container Failure Detection:** When a container fails, the NodeManager detects the failure and reports it to the ResourceManager.

2. **ResourceManager's Role:** The ResourceManager receives information about container failures and manages the cluster's resources. It frees the failed container's resources and, when the ApplicationMaster asks for a replacement, allocates a new container, possibly on a different node. If the failed container hosted the ApplicationMaster itself, the ResourceManager restarts the ApplicationMaster. _The ResourceManager ensures that the resources requested by the ApplicationMaster are made available for the failed task to be retried._

3. **ApplicationMaster's Response:** The ApplicationMaster, which is responsible for managing the specific application's execution, receives updates from the ResourceManager about container status changes. Upon detecting a container failure, the ApplicationMaster can take action, such as:

   - **Retrying Failed Tasks:** The ApplicationMaster might initiate a retry of the failed task by requesting the ResourceManager to allocate resources for a new container to replace the failed one. This retry can occur on the same or a different node.
   
   - **Handling Failures:** The ApplicationMaster might implement error handling strategies, such as logging the failure, updating the job status, resuming from a checkpoint, or taking other appropriate actions based on the application's logic and requirements.

YARN's design allows for targeted recovery by rerunning specific failed tasks or containers, minimizing the impact of failures on the overall job execution. The ApplicationMaster is responsible for managing these recovery actions based on the status updates received from the ResourceManager.
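
The retry logic on the ApplicationMaster side can be sketched as a small loop. This is a plain-Python illustration under stated assumptions, not YARN code: `request_container` and `run_in_container` are hypothetical callbacks standing in for the request to the ResourceManager and the execution on a NodeManager, and the attempt limit is arbitrary.

```python
def run_task_with_retries(run_in_container, request_container, max_attempts=3):
    """Retry a failed task in a fresh container, as an ApplicationMaster might."""
    failures = []
    for attempt in range(1, max_attempts + 1):
        node = request_container()      # ask for a (new) container
        if run_in_container(node):      # True means the task succeeded
            return {"node": node, "attempts": attempt, "failures": failures}
        failures.append(node)           # record the failure and try again
    raise RuntimeError(f"task failed on {failures}")


# Hypothetical sequence of container offers; node1 is assumed to be faulty.
offers = iter(["node1", "node2", "node3"])
outcome = run_task_with_retries(
    run_in_container=lambda node: node != "node1",
    request_container=lambda: next(offers),
)
print(outcome)
```

Only the failed task is rerun; the rest of the job is unaffected, which is the targeted recovery described above.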

### 1.4 &ndash; What is the typical configuration for YARN? Choose for the following components how many instances of them there are in a cluster.

```
1. ResourceManager   -- a        a. One per cluster

2. ApplicationMaster -- c        b. One per node

3. NodeManager       -- b        c. Many per cluster, but usually not per node

4. Container         -- d        d. Many per node
```

## 2. Set up the Spark cluster on Docker

1. Start Docker <br> 
   `docker-compose up -d`

2. Access the Jupyter notebook <br> 
   http://localhost:8888/lab/tree/work/Exercise08_Spark.ipynb

## 3. Apache Spark Architecture

Spark is a cluster computing platform designed to be fast and general purpose. Spark extends the MapReduce model to efficiently cover a wide range of workloads that previously required separate distributed systems, including interactive queries and stream processing. Spark offers the ability to run computations in memory.

At a high level, every Spark application consists of a **driver program** that launches various parallel operations on a cluster. The driver program contains your application's main function and defines distributed datasets on the cluster, then applies operations to them.

Driver programs access Spark through a **SparkContext** object, which represents a connection to a computing cluster. There is no need to create a SparkContext; it is created for you automatically when you run the first code cell in the Jupyter notebook.

The driver communicates with a potentially large number of distributed workers called **executors**. The driver runs in its own process and each executor is a separate process. A driver and its executors are together termed a Spark application.

![Spark cluster overview](http://spark.apache.org/docs/latest/img/cluster-overview.png)

### 3.1 Understand resilient distributed datasets (RDD)

An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. 

##### What are RDD operations?
RDDs offer two types of operations: **transformations** and **actions**.

* **Transformations** create a new dataset from an existing one. Transformations are lazy, meaning that no transformation is executed until you execute an action.
* **Actions** compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

Transformations and actions are different because of the way Spark computes RDDs. Although you can define new RDDs any time, Spark computes them only in a **lazy** fashion, that is, the first time they are used in an action.
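
The difference between recording a transformation and running an action can be imitated in a few lines of plain Python. The class below is an analogy, not Spark's implementation: `LazyDataset` and `load_fruits` are hypothetical names, and the data lives in a local list rather than in partitions on a cluster.

```python
class LazyDataset:
    """Records transformations and runs them only when an action is called."""

    def __init__(self, load, steps=()):
        self._load = load
        self._steps = tuple(steps)

    def map(self, func):
        # Transformation: returns a new dataset, computes nothing.
        return LazyDataset(self._load, self._steps + (("map", func),))

    def filter(self, predicate):
        # Transformation: returns a new dataset, computes nothing.
        return LazyDataset(self._load, self._steps + (("filter", predicate),))

    def collect(self):
        # Action: only now is the source read and every recorded step applied.
        data = iter(self._load())
        for kind, func in self._steps:
            data = map(func, data) if kind == "map" else filter(func, data)
        return list(data)


loads = []

def load_fruits():
    loads.append("read")   # lets us observe when the source is really read
    return ["apple", "banana", "lemon"]


short_upper = LazyDataset(load_fruits).map(str.upper).filter(lambda s: len(s) <= 5)
reads_before_action = len(loads)   # still 0: nothing has run yet
result = short_upper.collect()
print(reads_before_action, result, len(loads))
```

Each transformation returns a new object and leaves the old one untouched, which mirrors the immutability of RDDs.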

##### Create Spark session (Spark install within jupyter docker image) and context

RDDs can be created from stable storage or by transforming other RDDs. Run the cells below to create RDDs from the sample data files that have been copied to HDFS. 

### Why was RDD designed to be immutable?

### Why was RDD designed to be "lazy"?

### Why is Spark much faster than MapReduce?

Spark outperforms Hadoop MapReduce in several aspects, leading to its faster processing speed and improved performance:

1. **In-Memory Processing:** Spark performs computations in memory, reducing the need to read and write to disk after each step, unlike Hadoop MapReduce, which writes to disk between map and reduce phases. This in-memory processing significantly reduces I/O overhead and speeds up data processing.

2. **DAG Execution Model:** Spark uses a Directed Acyclic Graph (DAG) execution model, allowing it to optimize and chain together multiple operations or transformations into a single job. This eliminates the overhead of writing intermediate results to disk after each stage, improving efficiency.

3. **Resilient Distributed Datasets (RDDs):** Spark's RDDs are a fundamental abstraction that allows for fault tolerance and distributed data processing. RDDs can be cached in memory across multiple operations, reducing data replication and improving the speed of subsequent computations.

4. **Lazy Evaluation:** Spark uses lazy evaluation, deferring the actual execution of operations until an action is triggered. This optimization allows it to optimize the execution plan and minimize unnecessary computations, resulting in faster processing.

5. **Rich APIs and Libraries:** Spark offers a wide range of high-level APIs in Python, Scala, Java, and R, along with various libraries (e.g., Spark SQL, MLlib, GraphX), enabling diverse and complex data processing tasks to be performed more efficiently compared to the limited capabilities of MapReduce.

6. **Interactive and Iterative Processing:** Spark is well-suited for interactive data analysis and iterative machine learning algorithms due to its in-memory computations and iterative processing capabilities, resulting in faster response times for these workloads compared to MapReduce.

7. **Dynamic Partitioning and Pipelining:** Spark provides dynamic partitioning, enabling users to control how data is distributed across nodes, and pipelining optimizations that minimize shuffling and data movement, leading to improved performance.

While Spark demonstrates faster performance compared to Hadoop MapReduce in many scenarios, the choice between the two depends on specific use cases, data sizes, resource availability, and processing requirements. In certain situations, such as handling large-scale batch processing or when fault tolerance is critical, MapReduce might still be a suitable choice despite Spark's performance advantages.

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
#* use spark session
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [6]:
# Create an RDD
fruits = sc.textFile('fruits.txt')
yellowThings = sc.textFile('yellowthings.txt')

##### RDD transformations


In [7]:
# map
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])
fruitsReversed.collect()

['elppa',
 'ananab',
 'nolem yranac',
 'eparg',
 'nomel',
 'egnaro',
 'elppaenip',
 'yrrebwarts']

In [8]:
# filter
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()

['apple', 'grape', 'lemon']

In [9]:
# flatMap
characters = (fruits
                  .filter(lambda fruit: len(fruit) <= 5)
                  .flatMap(lambda fruit: list(fruit)))
characters.collect()

['a', 'p', 'p', 'l', 'e', 'g', 'r', 'a', 'p', 'e', 'l', 'e', 'm', 'o', 'n']

In [10]:
# union between fruits and yellowthings datasets
fruitsAndYellowThings = fruits.union(yellowThings)
fruitsAndYellowThings.collect()

['apple',
 'banana',
 'canary melon',
 'grape',
 'lemon',
 'orange',
 'pineapple',
 'strawberry',
 'banana',
 'bee',
 'butter',
 'canary melon',
 'gold',
 'lemon',
 'pineapple',
 'sunflower']

In [11]:
# intersection between fruits and yellowthings datasets
yellowFruits = fruits.intersection(yellowThings)
yellowFruits.collect()

['banana', 'pineapple', 'canary melon', 'lemon']

In [12]:
# distinct elements in the two datasets
distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
distinctFruitsAndYellowThings.collect()

['banana',
 'orange',
 'pineapple',
 'butter',
 'gold',
 'sunflower',
 'apple',
 'canary melon',
 'grape',
 'lemon',
 'strawberry',
 'bee']

In [13]:
# groupByKey
yellowThingsByFirstLetter = yellowThings.map(lambda thing: (thing[0], thing)).groupByKey()
for letter, lst in yellowThingsByFirstLetter.collect():
    print("Words starting with: ", letter)
    for obj in lst:
        print(" > ", obj)

Words starting with:  b
 >  banana
 >  bee
 >  butter
Words starting with:  c
 >  canary melon
Words starting with:  g
 >  gold
Words starting with:  l
 >  lemon
Words starting with:  p
 >  pineapple
Words starting with:  s
 >  sunflower


In [14]:
# MapReduce: key is the number of characters of the fruit name (len(fruit))
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda a, b: a + b)
numFruitsByLength.collect()

[(5, 3), (6, 2), (12, 1), (9, 1), (10, 1)]

##### RDD actions

In [15]:
# collect
fruitsArray = fruits.collect()
yellowThingsArray = yellowThings.collect()
print(fruitsArray)
print(yellowThingsArray)

['apple', 'banana', 'canary melon', 'grape', 'lemon', 'orange', 'pineapple', 'strawberry']
['banana', 'bee', 'butter', 'canary melon', 'gold', 'lemon', 'pineapple', 'sunflower']


In [30]:
# count - how many fruits there are
fruits.count()

8

In [29]:
# take - show the first three fruits
fruits.take(3)

['apple', 'banana', 'canary melon']

In [31]:
# reduce - what letters are used
fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))

{' ',
 'a',
 'b',
 'c',
 'e',
 'g',
 'i',
 'l',
 'm',
 'n',
 'o',
 'p',
 'r',
 's',
 't',
 'w',
 'y'}

##### Lazy evaluation
Lazy evaluation means that when we call a transformation on an RDD (for instance, calling `map()`), the operation is not immediately performed. Instead, Spark internally records metadata to indicate that this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as
consisting of instructions on how to compute the data that we build up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call `sc.textFile()`, the data is not loaded until it is necessary. As with transformations, the operation (in this case, reading the data) can
occur multiple times.


Finally, as you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. For instance, the code below corresponds to the following graph:

<img src="https://cloud.inf.ethz.ch/s/BiRkaa97xKEZ7NE/download" height="400">

In [16]:
apples = fruits.filter(lambda x: "apple" in x)
lemons = yellowThings.filter(lambda x: "lemon" in x)
applesAndLemons = apples.union(lemons)
print(applesAndLemons.collect())
print(applesAndLemons.toDebugString().decode("utf-8")) # decode used for nice formatting

['apple', 'pineapple', 'lemon']
(2) UnionRDD[48] at union at NativeMethodAccessorImpl.java:0 []
 |  PythonRDD[46] at RDD at PythonRDD.scala:53 []
 |  fruits.txt MapPartitionsRDD[16] at textFile at NativeMethodAccessorImpl.java:0 []
 |  fruits.txt HadoopRDD[15] at textFile at NativeMethodAccessorImpl.java:0 []
 |  PythonRDD[47] at RDD at PythonRDD.scala:53 []
 |  yellowthings.txt MapPartitionsRDD[18] at textFile at NativeMethodAccessorImpl.java:0 []
 |  yellowthings.txt HadoopRDD[17] at textFile at NativeMethodAccessorImpl.java:0 []


### 3.2 Exercise

1. What does the code below do?
1. Draw the lineage graph for the code
1. List actions and transformations used in it
1. When are all computations executed?
1. If we call `result.collect()` again, what will Spark do to perform the action? 

In [19]:
text = sc.textFile('fruits.txt')
words = text.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
result.saveAsTextFile('result')
result.collect()

[('apple', 1),
 ('banana', 1),
 ('canary', 1),
 ('melon', 1),
 ('grape', 1),
 ('lemon', 1),
 ('orange', 1),
 ('pineapple', 1),
 ('strawberry', 1)]

### 3.3 Persistence (Caching)

Spark's RDDs are by default recomputed each time you run an action on
them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using `RDD.persist()`. After computing it the first time, Spark will store the RDD contents in memory (partitioned across the machines in your cluster), and reuse them in future actions. Persisting RDDs on disk instead of memory is also possible.

If you attempt to cache too much data to fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy. For the memory-only storage levels, it will recompute these partitions the next time they are accessed,
while for the memory-and-disk ones, it will write them out to disk. In either case, this means that you don't have to worry about your job breaking if you ask Spark to cache too much data. However, caching unnecessary data can lead to eviction of useful data
and more recomputation time. Finally, RDDs come with a method called `unpersist()` that lets you manually remove them from the cache.

Please note that both `persist()` and `cache()` are lazy operations themselves (`cache()` is a simple wrapper that calls `persist(storageLevel=StorageLevel.MEMORY_ONLY)`; see [here](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.cache) for details). The caching operation will, in fact, only take place when the first action is called. With successive action calls, the cached RDD will be used.
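
The effect of caching on recomputation can be imitated with a small counter. This is a plain-Python analogy, not Spark code: `CountingDataset` is a hypothetical class, and it ignores eviction, storage levels, and partitions entirely.

```python
class CountingDataset:
    """Recomputes its contents on every action unless cache() was requested."""

    def __init__(self, compute):
        self._compute = compute
        self._use_cache = False
        self._cached = None
        self.computations = 0

    def cache(self):
        self._use_cache = True   # lazy: only marks the dataset, computes nothing
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        self.computations += 1
        data = list(self._compute())
        if self._use_cache:
            self._cached = data  # filled by the first action
        return data

    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())


def squares():
    return (x * x for x in range(5))


uncached = CountingDataset(squares)
uncached.count()
uncached.collect()

cached = CountingDataset(squares).cache()
computations_after_cache_call = cached.computations   # still 0
cached.count()
cached.collect()

print(uncached.computations, computations_after_cache_call, cached.computations)
```

Two actions on the uncached dataset trigger two computations, while the cached one is computed once, and only when the first action runs.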

### 3.4 Exercise:
1. Write some code which can benefit from caching.
1. Where should we ask Spark to persist the RDD in Exercise 3.2 to prevent it from re-executing the code when we call `collect()` again?

#### Explicit Caching of RDDs

In [187]:
from pyspark.storagelevel import StorageLevel

In [188]:
#* Example 1
fruits.cache()
fruits.count(), fruits.take(3)

(8, ['apple', 'banana', 'canary melon'])

In [56]:
#* cache() uses MEMORY_ONLY; PySpark always stores RDD data serialized (pickled), hence 'Memory Serialized'
fruits.is_cached, str(fruits.getStorageLevel())

(True, 'Memory Serialized 1x Replicated')

In [57]:
fruits.unpersist()
#* After unpersist() no storage level is set (neither memory nor disk)
fruits.is_cached, str(fruits.getStorageLevel())

(False, 'Serialized 1x Replicated')

In [189]:
#* Example 2
words = fruits.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
result.persist(StorageLevel.MEMORY_AND_DISK_2)
result.collect()

[('apple', 1),
 ('banana', 1),
 ('canary', 1),
 ('melon', 1),
 ('grape', 1),
 ('lemon', 1),
 ('orange', 1),
 ('pineapple', 1),
 ('strawberry', 1)]

In [60]:
#* MEMORY_AND_DISK_2:
#* Disk (if it falls out of memory) +
#* Memory-cached serialized (pickled) object +
#* Replicated on 2 nodes
result.is_cached, str(result.getStorageLevel())

(True, 'Disk Memory Serialized 2x Replicated')

In [42]:
result.unpersist()
result.is_cached, str(result.getStorageLevel())

(False, 'Serialized 1x Replicated')

### 3.5 Working with Key/Value Pairs


Spark provides special operations on RDDs containing key/value pairs. These RDDs
are called *pair RDDs*. Pair RDDs are a useful building block in many programs, as
they expose operations that allow you to act on each key in parallel or regroup data
across the network. For example, pair RDDs have a `reduceByKey()` method that can
aggregate data separately for each key, and a `join()` method that can merge two
RDDs together by grouping elements with the same key. Pair RDDs are also still RDDs. 
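
The semantics of `reduceByKey()` and `join()` can be spelled out on ordinary Python lists. The helper functions below are hypothetical stand-ins written for illustration, not PySpark's implementation; they run on a single machine and sort their output only to make the result deterministic, whereas Spark gives no ordering guarantee here.

```python
from collections import defaultdict
from operator import add


def reduce_by_key(pairs, func):
    """Combine all values that share a key with a binary function."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return sorted(merged.items())


def join(left, right):
    """Inner join: pair up values from both sides that share a key."""
    right_values = defaultdict(list)
    for key, value in right:
        right_values[key].append(value)
    return sorted((key, (lv, rv))
                  for key, lv in left
                  for rv in right_values.get(key, []))


# Made-up sample data: units sold and unit prices per fruit.
sold = [("apple", 3), ("lemon", 1), ("apple", 2), ("grape", 4)]
price = [("apple", 0.5), ("lemon", 0.25)]

totals = reduce_by_key(sold, add)
joined = join(totals, price)
print(totals)
print(joined)
```

Note that `grape` disappears from the joined result because it has no price: an inner join keeps only keys present on both sides.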

In [69]:
# Example
rdd = sc.parallelize([("key1", 0) ,("key2", 3), ("key1", 8) ,("key3", 3), ("key3", 9)])
rdd2 = rdd.mapValues(lambda x: (x, 1))
rdd3 = rdd2.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
print(rdd2.collect())
print(rdd3.collect())
print(rdd3.toDebugString().decode("utf-8"))

[('key1', (0, 1)), ('key2', (3, 1)), ('key1', (8, 1)), ('key3', (3, 1)), ('key3', (9, 1))]
[('key1', (8, 2)), ('key2', (3, 1)), ('key3', (12, 2))]
(1) PythonRDD[199] at collect at <ipython-input-69-f780a7e4e5c2>:6 []
 |  MapPartitionsRDD[197] at mapPartitions at PythonRDD.scala:145 []
 |  ShuffledRDD[196] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(1) PairwiseRDD[195] at reduceByKey at <ipython-input-69-f780a7e4e5c2>:4 []
    |  PythonRDD[194] at reduceByKey at <ipython-input-69-f780a7e4e5c2>:4 []
    |  ParallelCollectionRDD[193] at readRDDFromFile at PythonRDD.scala:274 []


### 3.6 Exercise
1. What does the code above do? 
2. Where can it be used? Complete the code to perform the desired functionality. 

In [70]:
rdd = sc.parallelize([("key1", 0) ,("key2", 3),("key1", 8) ,("key3", 3),("key3", 9)])
rdd2 = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
average = rdd2.mapValues(lambda x: x[0] / x[1])
print(average.collect())

[('key1', 4.0), ('key2', 3.0), ('key3', 6.0)]


### 3.7 Spark Partitioning
Spark programs can choose to control their RDDs' partitioning
to reduce communication. Partitioning is not helpful in all applications: for
example, if a given RDD is scanned only once, there is no point in partitioning it in
advance. It is useful only when a dataset is reused multiple times in key-oriented
operations such as joins.

Spark's partitioning is available on all RDDs of key/value pairs, and causes the system
to group elements based on a function of each key. Although Spark does not give
explicit control of which worker node each key goes to (partly because the system is
designed to work even if specific nodes fail), it lets the program ensure that a set of
keys will appear together on some node.


Many of Spark's operations involve shuffling data by key across the network. All of
these will benefit from partitioning. Examples of operations that benefit from
partitioning are `cogroup()`, `groupWith()`, `join()`, `leftOuterJoin()`, `rightOuterJoin()`, `groupByKey()`, `reduceByKey()`, `combineByKey()`, and `lookup()`.

By default, PySpark uses hash partitioning. To define a custom partitioning, use the function `partitionBy()`, which requires an RDD of key/value tuples. This function is a transformation, so it returns a new RDD. The following examples show Spark's default partitioning scheme as well as a custom one.

Partitioning allows some Spark code to run more efficiently. In particular, on a pair RDD that is already partitioned by key, 'pair' operations such as `mapValues()` and `reduceByKey()` (with the same partitioner) cause no shuffling in the cluster and preserve the partitioning.

### Why do we need partitioning?

In [177]:
nums = [(k, str(v)) for k in range(3) for v in range(2)]
pairs = sc.parallelize(nums)

In [178]:
print("Number of partitions: ", pairs.getNumPartitions())
pairs.glom().collect()

Number of partitions:  1


[[(0, '0'), (0, '1'), (1, '0'), (1, '1'), (2, '0'), (2, '1')]]

#### Hash partitioning (default)

In [176]:
pairs = sc.parallelize(nums).partitionBy(3, lambda k: hash(k))

In [170]:
print("Number of partitions: ", pairs.getNumPartitions())
pairs.glom().collect()

Number of partitions:  3


[[(0, '0'), (0, '1')], [(1, '0'), (1, '1')], [(2, '0'), (2, '1')]]

#### Range partitioning

In [180]:
pairs = sc.parallelize(nums).partitionBy(3, lambda k: k > 1)

In [183]:
print("Number of partitions: ", pairs.getNumPartitions())
pairs.glom().collect()

Number of partitions:  3


[[(0, '0'), (0, '1'), (1, '0'), (1, '1')], [(2, '0'), (2, '1')], []]
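
To see why the two outputs above look the way they do, here is a plain-Python sketch of the assignment rule. It assumes that each pair goes to the partition with index `partitionFunc(key) % numPartitions`, which is how PySpark's `partitionBy()` buckets records. The helper `assign_partitions` is hypothetical and only imitates the result of `glom().collect()`.

```python
def assign_partitions(pairs, num_partitions, partition_func):
    """Place each (key, value) pair into bucket partition_func(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets


nums = [(k, str(v)) for k in range(3) for v in range(2)]

# Hash of a small non-negative int is the int itself, so key k lands in partition k
hashed = assign_partitions(nums, 3, hash)

# The boolean k > 1 counts as 0 (False) or 1 (True), so partition 2 is never used
ranged = assign_partitions(nums, 3, lambda k: k > 1)

print(hashed)
print(ranged)
```

Because the custom function only ever returns `False` or `True`, at most two partitions can receive data, no matter how many partitions are requested.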

### 3.8 Converting a user program into tasks

A Spark driver is responsible for converting a user program into units of physical execution called tasks. At a high level, all Spark programs follow the same structure: they create RDDs from some input, derive new RDDs from those using transformations, and perform actions to collect or save data. A Spark program implicitly creates a logical **directed acyclic graph (DAG)** of operations.
When the driver runs, it converts this logical graph into a physical execution plan.

Spark performs several optimizations, such as "pipelining" map transformations together to merge them, and converts the execution graph into a set of **stages**.
Each stage, in turn, consists of multiple tasks. The tasks are bundled up and prepared to be sent to the cluster. Tasks are the smallest unit of work in Spark; a typical user program can launch hundreds or thousands of individual tasks.

Each RDD maintains a pointer to one or more parents along with metadata about what
type of relationship they have. For instance, after `b = a.map(f)`, the RDD `b` keeps
a reference to its parent `a`. These pointers allow an RDD to be
traced to all of its ancestors.
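
The parent pointers can be illustrated with a small plain-Python toy. The class `ToyRDD` and its methods are hypothetical and hold no data; they only record lineage, so this is a sketch of the idea rather than of Spark's internal classes.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: a name plus pointers to its parents."""

    def __init__(self, name, parents=()):
        self.name = name
        self.parents = list(parents)

    def map(self, name):
        # A map-like transformation has exactly one parent
        return ToyRDD(name, [self])

    def join(self, other, name):
        # A join-like transformation has two parents
        return ToyRDD(name, [self, other])

    def ancestors(self):
        """Follow parent pointers until every ancestor has been visited."""
        seen = []
        stack = list(self.parents)
        while stack:
            rdd = stack.pop()
            if rdd not in seen:
                seen.append(rdd)
                stack.extend(rdd.parents)
        return [rdd.name for rdd in seen]


a = ToyRDD("a")
b = a.map("b")
c = ToyRDD("c")
d = b.join(c, "d")

print(sorted(d.ancestors()))
```

Starting from `d`, the traversal reaches `b` and `c` directly and `a` through `b`, which is the same walk that `toDebugString()` prints for a real RDD.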

The following phases occur during Spark execution:
* User code defines a DAG (directed acyclic graph) of RDDs. Operations on RDDs create new RDDs that refer back to their parents, thereby creating a graph.
* Actions force translation of the DAG to an execution plan. When you call an action on an RDD, it must be computed. This requires computing its parent RDDs as well. 
* Spark's scheduler submits a job to compute all needed RDDs. That job will have one or more stages, which are parallel waves of computation composed of tasks. Each stage will correspond to one or more RDDs in the DAG. A single stage can correspond to multiple RDDs due to pipelining.
* Tasks are scheduled and executed on a cluster.
* Stages are processed in order, with individual tasks launching to compute segments of the RDD. Once the final stage is finished in a job, the action is complete.
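
The way pipelining groups operations into stages can be sketched in plain Python. This is a deliberately simplified toy, not Spark's scheduler: it assumes a linear chain of operations and that every operation in the hypothetical `SHUFFLE_OPS` set starts a new stage, while all other operations are pipelined into the current stage.

```python
# Assumption: these operations always require a shuffle in this toy model
SHUFFLE_OPS = {"reduceByKey", "groupByKey", "join", "partitionBy"}


def split_into_stages(operations):
    """Cut a linear chain of operation names into stages at shuffle boundaries."""
    stages = [[]]
    for op in operations:
        if op in SHUFFLE_OPS and stages[-1]:
            stages.append([])  # data must be shuffled, so a new stage begins
        stages[-1].append(op)
    return stages


# The word-count style chain used earlier in this exercise sheet
chain = ["parallelize", "flatMap", "map", "reduceByKey", "mapValues"]
stages = split_into_stages(chain)

for number, stage in enumerate(stages):
    print(f"Stage {number}: {' -> '.join(stage)}")
```

In a real cluster the boundary is not always this clean (for example, a join of two RDDs that share a partitioner needs no shuffle), so the Spark UI remains the reference for the actual number of stages.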

If you visit the application's web UI (http://localhost:4040/jobs/), you will see how many stages occur in order to
fulfill an action. For more details about the content of this page, see [Spark job debugging](https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apache-spark-job-debugging) for Azure Spark.

### 3.9 Exercise

1. Why is Spark faster than Hadoop MapReduce?
1. Study the examples above via Spark UI. Observe how many stages they have. 
1. Which of the graphs below are DAGs?

<img src="https://cloud.inf.ethz.ch/s/kkes72cJjsoHkbY/download" height="400">

### 3.10 True or False
Say if the following statements are *true* or *false*, and explain why.

1. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.

    True

1. Transformations construct a new RDD from a previous one and immediately calculate the result.

    False
    
1. Spark's RDDs are by default recomputed each time you run an action on them.

    True (unless the RDD is persisted with `persist()` or `cache()`)

1. After computing an RDD, Spark will store its contents in memory and reuse them in future actions.
    
    False
    
1. When you derive new RDDs using transformations, Spark keeps track of the set of dependencies between different RDDs.

    True
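
The behaviour behind these statements (lazy transformations, recomputation on every action, reuse only after persisting) can be imitated in plain Python. The class `LazyDataset` is a hypothetical toy that counts how often it is evaluated; it is a sketch of the idea and does not reflect Spark's actual implementation.

```python
class LazyDataset:
    """Hypothetical toy: evaluated only on collect(), cached only if persisted."""

    def __init__(self, source, func=None):
        self.source = source       # a plain list (root) or a parent LazyDataset
        self.func = func           # function applied to each parent element
        self.persisted = False
        self.cached = None
        self.computations = 0      # how many times this dataset was evaluated

    def map(self, func):
        # Transformation: only records the dependency, computes nothing
        return LazyDataset(self, func)

    def persist(self):
        self.persisted = True
        return self

    def collect(self):
        # Action: triggers evaluation unless a cached result exists
        if self.cached is not None:
            return self.cached
        self.computations += 1
        if self.func is None:
            data = list(self.source)
        else:
            data = [self.func(x) for x in self.source.collect()]
        if self.persisted:
            self.cached = data
        return data


squares = LazyDataset([1, 2, 3]).map(lambda x: x * x)
count_after_map = squares.computations        # nothing computed yet

squares.collect()
squares.collect()
count_without_persist = squares.computations  # recomputed for each action

squares.persist()
squares.collect()
result = squares.collect()
count_with_persist = squares.computations     # computed once more, then reused

print(count_after_map, count_without_persist, count_with_persist, result)
```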
    

### Equivalence: function calls VS SQL

### Difference between RDD, DF, DS

## 4. TF-IDF in Spark (OPTIONAL)
In this exercise you will implement a simple query engine over the Gutenberg dataset using Spark.
The [Gutenberg dataset](https://www.gutenberg.org/) consists of 3036 free ebooks. The goal of this exercise is to develop a search engine to find the most relevant books given a text query.

### 4.1 Get the data
1. You can download the dataset (the smallest one) from: https://zenodo.org/record/3360392

2. Unzip and put all `.txt` files under a directory `gutenberg`

3. Copy the directory into the `jupyter` container: `docker cp gutenberg jupyter:/home/jovyan/work`

### 4.2 Understand TF-IDF

[TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) is a statistic to determine the relative importance of the words in a set of documents. It is computed as the product of two statistics, term frequency (`tf`) and inverse document frequency (`idf`). 

Given a word `t`, a document `d` (in this case, a book) and the collection of all documents `D`, we can define `tf(t, d)` as the number of times `t` appears in `d`. This gives us some information about the content of a document, but because some terms (e.g. "the") are so common, term frequency will tend to incorrectly emphasize documents which happen to use the word "the" more frequently, without giving enough weight to the more meaningful terms.

The inverse document frequency `idf(t, D)` is a measure of how much information the word provides, that is, whether the term is common or rare across all documents. It can be computed as:

<img src="https://cloud.inf.ethz.ch/s/gw25WWcbd9iXBdK/download" width="300">

where $|D|$ is the total number of documents and the denominator represents how many documents contain the word $t$ at least once. However, this would cause a division by zero if the user queries a word that never appears in the dataset. A better formulation would be:

<img src="https://cloud.inf.ethz.ch/s/fXffB5g59y3y2an/download" width="300">

Then, `tfidf(t, d, D)` is calculated as follows:

<img src="https://cloud.inf.ethz.ch/s/2dAsg3k2QaL3XMz/download" width="300">

A high weight in `tfidf` is reached by a high term frequency (in the given document) and a low document frequency of the term in the whole collection of documents.
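
As a worked example, the three statistics can be computed on a tiny hypothetical corpus in plain Python. The sketch assumes the smoothed form `idf(t, D) = log(|D| / (1 + df(t)))`, which is one common variant and may differ in detail from the formula pictured above. The corpus and the function names are made up for illustration.

```python
import math
from collections import Counter

# Hypothetical three-document corpus standing in for the books
docs = {
    "d1": "the cat sat on the mat",
    "d2": "the dog barked",
    "d3": "a cat and a dog",
}
tokens = {name: text.split() for name, text in docs.items()}


def tf(term, doc):
    """Number of times term appears in the document."""
    return Counter(tokens[doc])[term]


def idf(term):
    """Assumed smoothed form: log(|D| / (1 + number of documents containing term))."""
    df = sum(1 for words in tokens.values() if term in words)
    return math.log(len(tokens) / (1 + df))


def tfidf(term, doc):
    return tf(term, doc) * idf(term)


for term in ["the", "mat", "unicorn"]:
    print(term, tf(term, "d1"), round(idf(term), 3), round(tfidf(term, "d1"), 3))
```

The common word "the" appears twice in `d1` but gets weight 0, the rarer word "mat" gets a positive weight, and the unseen word "unicorn" no longer triggers a division by zero.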

Having already implemented TF-IDF in pseudocode last week, this week we are going to implement it in Spark. The following code snippet imports the whole dataset into an RDD.

In [None]:
# sc is automatically defined as SparkContext
# docs will be an RDD in the format [(docName, content)]
docs = sc.wholeTextFiles("gutenberg/*.txt", minPartitions=100)

# number of documents in the folder
docs_number = docs.count()

# display the [(docName, content)] values
#docs.collect()

#### TF-IDF solution code