## Execution Plans, Lazy Evaluation and Caching

### **Task:** calculate the sum of squares :  $$\sum_{i=1}^n x_i^2 $$  
The standard (or **busy**) way to do this is
1. Calculate the square of each element.
2. Sum the squares.

This requires **storing** all intermediate results.

<img alt="" src="Figures/LazyEvaluation/Slide1.png" style="height:455px;width:900px" />

<p><img alt="" src="Figures/LazyEvaluation/Slide2.png" style="height:455px; width:900px" /></p>


<p><img alt="" src="Figures/LazyEvaluation/Slide3.png" style="height:455px; width:900px" /></p>


### **lazy** evaluation:
 
* **postpone** computing the square until result is needed.
* No need to store intermediate results.
* Scan through the data once, rather than twice.

<p><img alt="" src="Figures/LazyEvaluation/Slide4.png" style="height:455px; width:900px" /></p>


<p><img alt="" src="Figures/LazyEvaluation/Slide5.png" style="height:455px; width:900px" /></p>


### Lazy Evaluation

Unlike a regular python program, map/reduce commands do not always perform any computation when they are executed. Instead, they construct something called an **execution plan**. Only when a result is needed does the computation start. This approach is also called **lazy execution**.

The benefit from lazy execution is in minimizing the the number of memory accesses. Consider for example the following map/reduce commands:
```python
A=RDD.map(lambda x:x*x).filter(lambda x: x%2==0)
A.reduce(lambda x,y:x+y) 
```

The commands defines the following plan. For each number `x` in the RDD:
1. Compute the square of `x`
2. Filter out `x*x` whose value is odd.
3. Sum the elements that were not filtered out.

A naive execution plan is to square all items in the RDD, store the results in a new RDD, then perform a filtering pass, generating a second RDD,  and finally perform the summation. Doing this will require iterating through the RDD three times, and creating 2 interim RDDs. As memory access is the bottleneck in this type of computation, the execution plan is slow.

A better execution plan is to perform all three operations on each element of the RDD in sequence, and then move to the next element. This plan is faster because we iterate through the elements of the RDD only once, and because we don't need to save the intermediate results. We need to maintain only one variable: the partial sum, and as that is a single variable, we can use a CPU register.

For more on RDDs and lazy evaluation see [here in the spark manual](http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations)

## Experimenting with Lazy Evaluation

#### The `%%time` magic

The `%%time` command is a *cell magic* which measures the execution time of the cell. We will mostly be interested in the wall time, which includes the time it takes to move data in the memory hierarchy.

For more on jupyter magics [See here](https://ipython.org/ipython-doc/3/interactive/magics.html)

### Preparations
In the following cells we create an RDD and define a function which wastes some time and then returns `cos(i)`. We want the function to waste some time so that the time it takes to compute the `map` operation is significant.

In [1]:
from pyspark import SparkContext
sc = SparkContext(master="local[4]")  #note that we set the number of workers to 3

We create an RDD with one million elements to amplify the effects of lazy evaluation and caching.

In [2]:
%%time
RDD=sc.parallelize(range(1000000))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 604 ms


It takes about 01.-0.5 sec.  to create the RDD. 

In [3]:
print(RDD.toDebugString().decode())

(4) PythonRDD[1] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175 []


### Define a computation
The role of the function `taketime` is to consume CPU cycles.

In [4]:
from math import cos
def taketime(i):
    [cos(j) for j in range(100)]
    return cos(i)

In [5]:
%%time
taketime(1)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 52 µs


0.5403023058681398

### Time units
* 1 second = 1000 Milli-second ($ms$)
* 1 Millisecond = 1000 Micro-second ($\mu s$)
* 1 Microsecond = 1000 Nano-second ($ns$)

### Clock Rate
One cycle of a 3GHz cpu takes $\frac{1}{3} ns$

`taketime(1000)` takes about 25 $\mu s$ = 75,000 clock cycles.

### The `map` operation. 

In [6]:
%%time
Interm=RDD.map(lambda x: taketime(x))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 33.4 µs


### How come so fast?
* We expect this map operation to take 1,000,000 * 25 $\mu s$ = 25 Seconds.

* **Why** did the previous cell take just 29 $\mu s$?

* Because **no computation was done** 

* The cell defined an **execution plan**, but did not execute it yet.

**Lazy Execution** refers to this type of behaviour. The system delays actual computation until the latest possible moment. Instead of computing the content of the RDD, it adds the RDD to the **execution plan**.

Using Lazy evaluation of a plan has two main advantages relative to immediate execution of each step:
1. A single pass over the data, rather than multiple passes.
2. Smaller memory footprint becase no intermediate results are saved.

### Execution Plans
At this point the variable `Interm` does not point to an actual data structure. Instead, it points to an execution plan expressed as a **dependence graph**. The dependence graph defines how the RDDs are computed from each other.

The dependence graph associated with an RDD can be printed out using the method `toDebugString()`.

In [7]:
print(Interm.toDebugString().decode())

(4) PythonRDD[2] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175 []


**Interm=** `(4) PythonRDD[2] at RDD at PythonRDD.scala:48 []`  
`______(4)` corresponds to the number of partitions 
  
**RDD   ** `=   |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 []`

At this point only the two left blocks of the plan have been declared.
<p><img alt="" src="Figures/ExecutionPlan/Slide1.jpg" style="height:120px; width:900px" /></p>


### Actual execution
The `reduce` command needs to output an actual output, **spark** therefor has to actually execute the `map` and the `reduce`. Some real computation needs to be done, which takes about 1 - 3 seconds (Wall time) depending on the machine used and on it's load.

In [8]:
%%time
print('out=',Interm.reduce(lambda x,y:x+y))

out= -0.2887054679684464
CPU times: user 10 ms, sys: 10 ms, total: 20 ms
Wall time: 25 s


### How come so fast? (take 2)
* We expect this map operation to take 1,000,000 * 25 $\mu s$ = 25 Seconds.
* Map+reduce takes only ~4 second. 
* Why?

* Because we have 4 workers, rather than one.
* Because the measurement of a single iteration of `taketime` is an overestimate.

### Executing a different calculation based on the same plan.
The plan defined by `Interm` might need to be executed more than once.

**Example:** compute the number of map outputs that are larger than zero.

In [9]:
%%time
print('out=',Interm.filter(lambda x:x>0).count())

out= 500000
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 22.7 s


### The price of not materializing
* The run-time (3.4 sec) is similar to that of the reduce (4.4 sec).
* Because the intermediate results in `Interm` have not been saved in memory (materialized)
* They need to be recomputed.

The middle block: `Map(Taketime)` is executed twice. Once for each final step.
<p><img alt="" src="Figures/ExecutionPlan/Slide2.jpg" style="height:200px; width:900px" /></p>

### Caching intermediate results
* We sometimes want to keep the intermediate results in memory so that we can reuse them later without recalculating. * This will reduce the running time, at the cost of requiring more memory.
* The method `cache()` indicates that the RDD generates in this plan should be stored in memory. Note that this is a **plan to cache**. The actual caching will be done only when the final result is needed.

In [10]:
%%time
Interm=RDD.map(lambda x: taketime(x)).cache()

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 47.1 ms


By adding the Cache after `Map(Taketime)`, we save the results of the map for the second computation.
<p><img alt="" src="Figures/ExecutionPlan/Slide3.jpg" style="height:200px; width:900px" /></p>

### Plan to cache
The definition of `Interm` is almost the same as before. However, the *plan* corresponding to `Interm` is more elaborate and contains information about how the intermediate results <span class="girk">will be</span> cached and replicated.

Note that `PythonRDD[4]` is now <span class="mark">[Memory Serialized 1x Replicated]</span>

In [11]:
print(Interm.toDebugString().decode())

(4) PythonRDD[5] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 [Memory Serialized 1x Replicated]


#### Comparing plans with and without cache
Plan with Cache
```
(4) PythonRDD[33] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 [Memory Serialized 1x Replicated]
```  
Plan without Cache
```
(4) PythonRDD[2] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 []
```
The difference is that the plan for both RDDs includes **[Memory Serialized 1x Replicated]** which is the plan to materialize both RDDs when they are computed.

### Creating the cache
The following command executes the first map-reduce command **and** caches the result of the `map` command in memory. 

In [12]:
%%time
print('out=',Interm.reduce(lambda x,y:x+y))

out= -0.2887054679684655
CPU times: user 5.43 ms, sys: 3.79 ms, total: 9.22 ms
Wall time: 3.59 s


### Using the cache
This time `Interm` is cached. Therefor the second use of `Interm` is much faster than when we did not use `cache`: 0.25 second instead of 1.9 second. (your milage may vary depending on the computer you are running this on).

In [13]:
%%time
print('out=',Interm.filter(lambda x:x>0).count())

out= 500000
CPU times: user 5.31 ms, sys: 2.97 ms, total: 8.28 ms
Wall time: 121 ms


## Summary
* Spark uses **Lazy Evaluation** to save time and space.
* When the same RDD is needed as input for several computations, it can be better to keep it in memory, also called `cache()`.
* Next Video, Partitioning and Gloming

## Partitioning and Gloming

* When an RDD is created, you can specify the number of partitions.
* The default is the number of workers defined when you set up `SparkContext`

In [14]:
A=sc.parallelize(range(1000000))
print(A.getNumPartitions())

4


We can repartition `A` into a different number of partitions.

In [15]:
D=A.repartition(10)
print(D.getNumPartitions())

10


We can also define the number of partitions when creating the RDD.

In [16]:
A=sc.parallelize(range(1000000),numSlices=10)
print(A.getNumPartitions())

10


### Why is the #Partitions important?
* They define the unit the executor works on.
* You should have at least as pany partitions as workers.
* Smaller partitions can allow more parallelization.

## Repartitioning for Load Balancing

Suppose we start with 10 partitions, all with exactly the same number of elements

In [17]:
A=sc.parallelize(range(1000000))\
    .map(lambda x:(x,x)).partitionBy(10)
print(A.glom().map(len).collect())

[100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000]


* Suppose we want to use `filter()` to select some of the elements in `A`.
* Some partitions might have more elements remaining than others.

In [18]:
#select 10% of the entries
B=A.filter(lambda pair: pair[0]%5==0)
# get no. of partitions
print(B.glom().map(len).collect())

[100000, 0, 0, 0, 0, 100000, 0, 0, 0, 0]


* Future operations on `B` will use only two workers.
* The other workers will do nothing,   
  because their partitions are empty.

* To fix the situation we need to repartition the RDD.  
* One way to do that is to repartition using a new key.

* The method **`.partitionBy(k)`** expects to get a **`(key,value)`** RDD where keys are integers.  
* Partitions the RDD into **`k`** partitions.
* The element **`(key,value)`** is placed into partition no.  **`key % k`**

In [19]:
C=B.map(lambda pair:(pair[1]/10,pair[1])).partitionBy(10) 
print(C.glom().map(len).collect())

[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]


Another approach is to use random partitioning using **`repartition(k)`**
* An **advantage** of random partitioning is that it does not require defining a key.
* A **disadvantage** of random partitioning is that you have no control on the partitioning.

In [20]:
C=B.repartition(10)
print(C.glom().map(len).collect())

[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]


### Glom()
* In general, spark does not allow the worker to refer to specific elements of the RDD.
* Keeps the language clean, but can be a major limitation.

* **glom()** transforms each partition into a tuple (immutabe list) of elements.
* Creates an RDD of tules. One tuple per partition.
* workers can refer to elements of the partition by index. 
* but you cannot assign values to the elements, the RDD is still immutable.

* Now we can understand the command used above to count the number of elements in each partition.
* We use `glom()` to make each partition into a tuple.
* We use `len` on each partition to get the length of the tuple - size of the partition.
* We `collect` the results to print them out.

In [21]:
print(C.glom().map(len).collect())

[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]


### A more elaborate example
There are many things that you can do using `glom()`.  

Below is an example, can you figure out what it does?

In [22]:
def getPartitionInfo(G):
    d=0
    if len(G)>1: 
        for i in range(len(G)-1):
            d+=abs(G[i+1][1]-G[i][1]) # access the glomed RDD that is now a  list
        return (G[0][0],len(G),d)
    else:
        return(None)

output=B.glom().map(lambda B: getPartitionInfo(B)).collect()
print(output)

[(0, 100000, 999990), None, None, None, None, (5, 100000, 999990), None, None, None, None]


## Summary
* We learned why partitions are important and how to control them.
* We Learned how `glom()` can be used to allow workers to access their partitions as lists.