# Numărarea cuvintelor

### Numărarea aparițiilor cuvintelor într-un text este, de obicei, primul exercițiu pentru aplicarea metodei map-reduce.

## Problema
**Input:** Un fișier text ce constă din cuvinte separate prin spații.  
**Output:** O listă de cuvinte și numărul lor de apariții. Lista va fi sortată descrescător după numărul de apariții.

Vom folosi fișierul ce conține cartea "Moby Dick" ca input.

In [None]:
#Porniti SparkContext cu 4 core-uri


### Definiți o funcție pentru afișarea corespunzătoare a planului de execuție

In [None]:
def pretty_print_plan(rdd):
    for x in rdd.toDebugString().decode().split('\n'):
        print(x)

### Utilizați metoda `textFile()` pentru a citi textul din fișier

In [None]:
%%time
text_file = sc.textFile("../Data/Moby-Dick.txt")

In [None]:
type(text_file)

## Pașii pentru numărarea cuvintelor

* se separă elementele unei linii în funcție de separatori (spații).
* se mapează `word` la `(word,1)`
* se numără aparițiile fiecărui cuvânt.

In [None]:
%%time
words =     text_file.flatMap(lambda line: line.split(" "))
#obtineti cuvintele nevide
# not_empty = 
#mapati word -> (word, 1)
#key_values= 
#numarati cuvintele, cu ajutorul reduceByKey, cu o functie lambda ca argument
#counts=    

### flatMap()
In linia de cod:
```python
words =     text_file.flatMap(lambda line: line.split(" "))
```
este utilizat `flatMap` în loc de `map`.

Motivul este acela că operația `line.split(" ")` generează o **listă** de string-uri, deci dacă am folosi `map` rezultatul ar fi un RDD de liste de cuvinte, nu un RDD de cuvinte.

Diferența dintre `map` și `flatMap` este aceea că `flatMap` asteaptă câte o listă care provine din rezultatul câte unui map și **concatenează** listele spre a forma RDD-ul.

## Planul de execuție
În ultima celulă am definit planul de execuție, dar nu l-am executat.

* Pregătirea planului a durat ~100ms, un interval de timp netrivial, dar mai scurt decat timpul necesar execuției.
* Vom analiza planul de execuție.

### Detaliile planului de execuție
Pentru a determina cărui RDD îi corespunde fiecare pas din plan, vom afișa planul de execuție pentru fiecare dintre RDD-uri.  

Se va observa că planurile de execuție pentru `words`, `not_empty` și `key_values` sunt aceleași.

In [None]:
pretty_print_plan(text_file)

In [None]:
pretty_print_plan(words)

In [None]:
pretty_print_plan(not_empty)

In [None]:
pretty_print_plan(key_values)

In [None]:
pretty_print_plan(counts)

| Execution plan   | RDD |  Comments |
| :---------------------------------------------------------------- | :------------: | :--- |
|`(2)_PythonRDD[6] at RDD at PythonRDD.scala:48 []`| **counts** | Final RDD|
|`_/__MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:436 []`| **---"---** |
|`_/__ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:0 [`| **---"---** | RDD is partitioned by key |
|`_+-(2)_PairwiseRDD[3] at reduceByKey at <timed exec>:4 []`| **---"---** | Perform mapByKey |
|`____/__PythonRDD[2] at reduceByKey at <timed exec>:4 []`| **words, not_empty, key_values** | The result of  partitioning into words|
| | |  removing empties, and making into (word,1) pairs|
|`____/__../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at Nat`| **text_file** | The partitioned text |
|`____/__../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMeth`| **---"---** | The text source |

## Execuția
Numărăm aparițiile fiecărui cuvânt. Abia acum modelul cu execuție _lazy_ efectuează operațiile, motiv pentru care execuția durează mai mult.

In [None]:
%%time
## Run #1
Count=counts.count()  # Count = the number of different words
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y) # 
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

### Amortizarea
Atunci când aceleași comenzi sunt efectuate în mod repetat pe aceleași date, timpul pentru execuțiile ulterioare tinde să descrească.

Celulele de mai jos sunt identice celei anterioare, cu o excepție la `Run #3`

Observăm că `Run #2` durează mai puțin decât `Run #1` cu toate că nu a fost aplicată explicit metoda `cache()`. Motivul este acela că Spark pune în cache (materializează) `key_values`, înainte de a executa `reduceByKey()`. Efectuarea operației _reduceByKey_ necesită o amestecare (_shuffle_), iar un _shuffle_ necesită materializarea RDD-ului de intrare. Prin urmare, observăm ca punerea în cache (caching-ul) are uneori loc chiar dacă nu a fost solicitat explicit.

In [None]:
%%time
## Run #2
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

### Caching Explicit
În `Run #3` solicităm explicit punerea în _cache_ a lui `counts`. Aceasta va reduce timpul la următoarea execuție `Run #4`, însă nu în mod semnificativ.

In [None]:
%%time
## Run #3, cache
Count=counts.cache().count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

In [None]:
%%time
#Run #4
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

In [None]:
%%time
#Run #5
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

In [None]:
sc.stop()