# Plan przetwarzania

## **Zadanie:** obliczenie sumy kwadradów :  

$$\sum_{i=1}^n x_i^2 $$ 

Standardowa (**rzemieślnicza**) droga
1. Oblicz kwadrat każdego elementu.
2. Sumuj wartości kwadratów .

To podejście wymaga **przechowywania** wszystkich rezultatach pośrednich.

## Busy Evaluation

<img alt="" src="Figures/LazyEvaluation/Slide1.png" style="height:455px;width:900px" />

<p><img alt="" src="Figures/LazyEvaluation/Slide2.png" style="height:455px; width:900px" /></p>


<p><img alt="" src="Figures/LazyEvaluation/Slide3.png" style="height:455px; width:900px" /></p>



## Lazy evaluation:
 * odkłada przetwarzanie kwadratów dopóki rezultat nie jest potrzebny;
 * nie ma potrzeby przetrzymywania rezultatów pośrednich;
 * Przeskanowuje dane raz zamiast dwóch


<p><img alt="" src="Figures/LazyEvaluation/Slide4.png" style="height:455px; width:900px" /></p>


<p><img alt="" src="Figures/LazyEvaluation/Slide5.png" style="height:455px; width:900px" /></p>

* w przeciwieństwie do Pythona, komendy map/reduce nie zawsze przetwarzają obliczenia w momencie wykonania. 
* Zamiast tego budują tzw **execution plan**
* Tylko kiedy rezultat obliczeń jest potrzebny obliczenia zostają wykonane. 
* Podejście to nazywa się **lazy execution**

Zaletą lazy execution jest minimalizacja liczby dostępów do pamięci. Przykładowo : 

A=RDD.map(lambda x:x*x).filter(lambda x: x%2==0)
A.reduce(lambda x,y:x+y) 

Polecenia definiują określony plan. Dla każdej liczby x w RDD : 

1. Oblicz kwadrat x
2. Filtruj x*x którego wartość jest nieparzysta
3. Sumuj elementy które nie zostały przefiltrowane

**Execution plan** polega na obliczeniu kwadratów wszystkich elementów w RDD, zapisywaniu wyników w nowym RDD, następnie wykonywaniu przejścia filtrującego, generowaniu drugiego RDD, a następnie wykonywaniu określonego rezultatu. Wykonanie tego będzie wymagało trzykrotnej iteracji przez RDD i utworzenia 2 tymczasowych RDD. Ponieważ dostęp do pamięci jest wąskim gardłem w tego rodzaju obliczeniach, plan wykonania jest powolny.


Lepszy **execution plan** jest wykonanie kolejno wszystkich trzech operacji na każdym elemencie RDD, a następnie przejście do następnego elementu. Ten plan jest szybszy, ponieważ dokonujemy iteracji elementów RDD tylko raz i dlatego, że nie musimy zapisywać wyników pośrednich. Musimy utrzymywać tylko jedną zmienną: sumę cząstkową, a ponieważ jest to pojedyncza zmienna, możemy użyć rejestru CPU.

## Przykład

In [1]:
import sys
sys.path.append("../")

import findspark
from settings import SPARK_PATH
findspark.init(SPARK_PATH) # Ścieżka do Sparka 

In [2]:
#sc.stop()
from pyspark import SparkContext
sc = SparkContext(master = "local[4]")

In [3]:
sc

In [4]:
%%time 
RDD = sc.parallelize(range(1000000))

CPU times: user 2.59 ms, sys: 3.12 ms, total: 5.71 ms
Wall time: 788 ms


In [5]:
#print execution plan
print(RDD.toDebugString().decode())

(4) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:247 []


In [6]:
from math import cos
def taketime(i):
    [cos(j) for j in range(100)]
    return cos(i)

In [7]:
%%time
taketime(1000)

CPU times: user 47 µs, sys: 1e+03 ns, total: 48 µs
Wall time: 52.9 µs


0.5623790762907029

### Jednostki czasu
* 1 second = 1000 Milli-second ($ms$)
* 1 Millisecond = 1000 Micro-second ($\mu s$)
* 1 Microsecond = 1000 Nano-second ($ns$)

### Clock Rate
Jeden cykl 3GHz cpu zabiera $\frac{1}{3} ns$

`taketime(1000)` zabiera około 25 $\mu s$ = 75,000 cykli zegarowych

### The `map` operation.

In [8]:
%%time
Interm=RDD.map(lambda x: taketime(x))

CPU times: user 32 µs, sys: 1 µs, total: 33 µs
Wall time: 40.1 µs


In [9]:
Interm

PythonRDD[2] at RDD at PythonRDD.scala:53

## Dlaczego tak szybko ? :)

* Oczekujemy że operacja map zabierze 1,000,000 * 25 µs = 25 Sekund
* Dlaczego poprzednie polecenie zabrało 16 us ? 
* Ponieważ nie zostało dokonane żadne przetwarzanie 
* Potrzebne polecenie zdefiniowało **execution plan** lecz nie przetwarza jeszcze niczego 

In [10]:
print(Interm.toDebugString().decode())

(4) PythonRDD[2] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:247 []


________(4) odnosi się do liczby partycji 

### Aktualny execution plan

<p><img alt="" src="Figures/ExecutionPlan/Slide1.jpg" style="height:120px; width:900px" /></p>


In [11]:
%%time
print('out=',Interm.reduce(lambda x,y:x+y))

out= -0.2887054679684655
CPU times: user 16.6 ms, sys: 7.66 ms, total: 24.3 ms
Wall time: 15.4 s


### Dlaczego tak szybko ? Part 2 
* Oczekujemy że operacja mapowania zabierze 1,000,000 * 25 us = 25 sekund
* Map+reduce zabiera tylko około ~4 seconds
* Why?

## Przetwarzanie różnych obliczeń bazujących na tym samym planie przetwarzania.¶

* Plan przetwarzania zdefiniowany przez Interm może być przetwarzany więcej niż raz,

* Przykład : oblicz wszystkie wyjścia funkcji map które są większe niż zero

In [12]:
%%time
print('out=',Interm.filter(lambda x:x>0).count())

out= 500000
CPU times: user 17.4 ms, sys: 5.03 ms, total: 22.5 ms
Wall time: 12.2 s


### Cena za brak materializacji
* Czas przetwarzania (3.04 sec) jest podobny do tego przy reduce (3.77 sec).
* Dzieje się tak ponieważ rezultat Interm nie został zapisany w pamięci(zmaterializowany)
* W związku z czym funkcja map musi być przetworzona jeszcze raz 

Środkowy blok: `Map(Taketime)` jest przetwarzany dwukrotnie.
<p><img alt="" src="Figures/ExecutionPlan/Slide2.jpg" style="height:200px; width:900px" /></p>

# Caching

### Podstawowa idea

<p><img alt="" src="Figures/ExecutionPlan/basic.jpg"/></p>

<p><img alt="" src="Figures/ExecutionPlan/basic_idea.png" style="height:500px; /></p>

<p><img alt="" src="Figures/ExecutionPlan/cache_hit.png" style="height:500px; /></p>

<p><img alt="" src="Figures/ExecutionPlan/cache_miss.png"/></p>

<p><img alt="" src="Figures/ExecutionPlan/cms1.png"/></p>

<p><img alt="" src="Figures/ExecutionPlan/cms2.png"/></p>

* Przetwarzanie jest **efektywne jeżeli większość dostępów to cache hit**
* W momencie w którym następuje cache miss obliczenia zajmują o wiele więcej czasu przez latencje spowodowaną operacją odczytu i zapisu danych do pamięci cache 

### Cache'owanie rezultatów pośrednich

* W wyzej wymienionym planie przetwarzania chcemy zachować pośrednie rezultaty Interm w celu ponownego użycia go bez wykonywania tych samych obliczeń n-razy
* Metoda `cache()` wkazuje na to że wygenerowane RDD w tym planie przetwarzania powinno być przechowane w pamięci. Pamiętaj jednak że jest to **plan cache'owania**. Faktyczne cache'owanie będzie wykonywane w mommencie którym rezultat będzie potrzebny

In [13]:
%%time
Interm=RDD.map(lambda x: taketime(x)).cache()

CPU times: user 9.67 ms, sys: 2.93 ms, total: 12.6 ms
Wall time: 37.2 ms


Poprzez dodanie cache po `Map(Taketime)`, zapisujemy rezultat mapowania dla kolejnych obliczeń
<p><img alt="" src="Figures/ExecutionPlan/Slide3.jpg" style="height:200px; width:900px" /></p>

### Plan cache'owania

In [14]:
print(Interm.toDebugString().decode())

(4) PythonRDD[5] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:247 [Memory Serialized 1x Replicated]


### Porównanie planów z i bez cache'u

**Plan z Cache**
```
(4) PythonRDD[5] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 [Memory Serialized 1x Replicated]
```  
**Plan bez Cache**
```
(4) PythonRDD[2] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489 []
```
Różnica polega na tym że plan dla obu RDD zawiera **[Memory Serialized 1x Replicated]** co jest planem materializacji RDD podczas przetwarzania.

### Tworzenie cache
Poniższa komenda przetwarza pierwszy map-reduce cache'uje rezultat mapowania w pamięci

In [15]:
%%time
print('out=',Interm.reduce(lambda x,y:x+y))

out= -0.2887054679684655
CPU times: user 10.6 ms, sys: 4.2 ms, total: 14.8 ms
Wall time: 12.6 s


### Wykorzystanie cache

Tym razem Interm jest buforowany. Dlatego drugie użycie Interm jest znacznie szybsze niż wtedy, gdy nie używaliśmy cache

In [16]:
%%time
print('out=',Interm.filter(lambda x:x>0).count())

out= 500000
CPU times: user 14.4 ms, sys: 4.71 ms, total: 19.1 ms
Wall time: 426 ms


# Podsumowanie 
* Spark używa **leniwej ewaluacji** w celu zaoszczędzenia czasu i pamięci.
* Kiedy to samo RDD jest potrzebne jako wejście dla kilku operacji przetwarzania, dobrą praktyką jest **zachować  RDD w pamięci cache** w celu zwiększenia prędkości operacji odczytu i zapisu danych