<a href="https://colab.research.google.com/github/GrigoreGeorgeAlexandru/Colab-projects/blob/main/Laborator2_B_SparkContext_Workers_Lazy_Evaluations_stud.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SparkContext - numărul de noduri _worker_ și evaluarea _lazy_

## Verificarea impactului numărului de _workers_
La inițializarea unui `SparkContext` putem specifica numărul de noduri _worker_. În general, este recomandat să avem un nod _worker_/_core_ al mașinii, însă acest număr poate fi diferit. În următoarele celule de cod vom examina impactul numărului de _workers_ pe câteva operații paralelizate.

In [None]:
from time import time
from pyspark import SparkContext

In [None]:
for j in range(1,5):
    sc= SparkContext(master = "local[%d]"%(j))
    print(sc.defaultParallelism)
    t0=time()
    for i in range(10):
        sc.parallelize([1,2]*10000).reduce(lambda x,y:x+y)
    print(f"{j} executors, time = {time()-t0}")
    sc.stop()

In [None]:
#sc.defaultParallelism
#sc.stop()
#t1=time()
time()-t1

#### Analizăm următorul rezultat: <br>1 executors, time = 1.6259338855743408<br>2 executors, time = 0.8395042419433594<br>3 executors, time = 0.7692611217498779<br>4 executors, time = 0.7232298851013184
#### Observăm că durează aproape dublu pentru 1 _worker_, iar apoi valorile sunt apropiate pentru 2, 3, 4 _workers_. Aceasta are loc deoarece codul este rulat pe o  mașină virtuală Linux ce utilizează doar 2 core-uri ale mașinii gazdă. În cazul în care codul de mai sus este rulat pe o mașină cu 4 core-uri, se vor observa îmbunătățiri până la 4 core-uri, urmate de o aplatizare. <br> Utilizarea mai multor workers/core nu este benefică deoarece, în acel caz, nu se face decât o schimbare de context ce nu accelerează calculul paralel.

## Ideea evaluării de tip _lazy_ : amânarea execuției până la momentul la care aceasta este cu adevărat necesară

### Creați un RDD cu 1 million de elemente

In [None]:
sc = SparkContext(master="local[2]")

In [None]:
%%time
rdd1 = sc.parallelize(range(1000000))

Wall time: 4 ms


### Creați o funcție denumită `taketime` ce calculează valoarea cosinus a primelor 100 de valori întregi, iar la final returnează cos(x), unde x a fost primit ca parametru.

### Cât timp durează execuția lui `taketime`?

In [None]:
%%time
taketime(2)

### Aplicați operația `map` utilizând funcția anterioară

In [None]:
%%time
interim = rdd1.map(lambda x: taketime(x))

Wall time: 0 ns


#### Se observă că fiecare execuție a lui _taketime_ durează similar, iar operația _map_ pe un RDD cu 1000000 elemente are o durată asemănătoare. <br><br>Acest lucru are loc datorită evaluării _lazy_, adică nimic nu este calculat în celula precedentă, ci are loc doar o planificare a execuției. Variabila `interim` nu pointează către o structură de date, ci către un plan de execuție exprimat prin intermediul unui graf de dependențe. Acest graf de dependențe definește modul în care un RDD este calculat pornind de la un altul.

### Putem vizualiza graful de dependențe utilizând metoda `toDebugString`

In [None]:
print(interim.toDebugString().decode())

![](https://raw.githubusercontent.com/tirthajyoti/Spark-with-Python/master/Images/RDD_dependency_graph.PNG)

### Execuția efectivă prin metoda `reduce`

In [None]:
%%time
print('output =',interim.reduce(lambda x,y:x+y))

output = -0.28870546796847574
Wall time: 9.06 s


In [None]:
1000000*31e-6

31.0

#### Remarcăm timpul relativ scurt, având în vedere că se efectueaza 1000000 operații cu funcția _taketime_. Acesta este rezultatul execuției paralele cu 2 core-uri.

### În exemplul anterior nu am salvat (materializat) niciun rezultat intermediar din `interim`, deci o altă operație simplă (precum numărarea elementelor pozitive) va dura aproape la fel de mult timp.

In [None]:
%%time
print(interim.filter(lambda x:x>0).count())

500000
Wall time: 9.37 s


## Păstrarea în cache (_caching_) pentru reducerea timpului de calcul pe operații similare (demersul presupune utilizarea memoriei)

### Efectuați calculele anterioare utilizând metoda `cache` pentru a determina graful de dependențe să planifice _caching_-ul

In [None]:
%%time
interim = rdd1.map(lambda x: taketime(x)).cache()

Wall time: 12 ms


In [None]:
#Afișați graful de dependențe pentru interim


(2) PythonRDD[5] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262 [Memory Serialized 1x Replicated]


In [None]:
%%time
#Obțineți suma elementelor cu ajutorul metodei reduce și a cache-ului


output = -0.28870546796847574
Wall time: 10.2 s


### Aplicați metoda `filter` pentru numărarea elementelor pozitive. Calculul se va realiza cu ajutorul rezultatului din cache.

In [None]:
%%time
#Aplicați metoda filter


Wall time: 0 ns


#### Se observă că durata execuției a fost mai scurtă în condițiile utilizării rezultatului din _cache_, ce a putut fi folosit pentru compararea cu 0 și numarare.

In [None]:
sc.stop()