# ![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# **Lab - RDD**

The goal is to get hands-on practice with transformations and actions.  
Don't hesitate to check the documentation or to ask for help :)

# [Link to the PySpark.RDD documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html)

### SparkContext and SparkSession (entry points)
* Display `sc` and `spark`: what is the difference? Use `help`
* Display the Spark version

In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf


In [0]:
?spark

Type:        SparkSession
String form: pyspark.sql.session.SparkSession instance
Docstring:
The entry point to programming Spark with the Dataset and DataFrame API.

A SparkSession can be used to create :class:`DataFrame`, register :class:`DataFrame` as
tables, execute SQL over tables, cache tables, and read parquet files.
To create a :class:`SparkSession`, use the following builder pattern:

.. versionchanged:: 3.4.0
    Support Spark Connect.

.. autoattribute:: builder
   :annotation:

Examples
--------
Create a Spark session.

>>> spark = (
...     SparkSession.builder
...         .master("local")
...         .appName("Word Count")
...         .config("spark.some.config.option", "some-value")
...         .getOrCreate()
... )

Create a Spark session from a Spark context.

>>> sc = spark.sparkContext
>>> spark = SparkSession(sc)


In [0]:
?sc

Type:        RemoteContext
String form: dbruntime.spark_connection.RemoteContext instance
Docstring:   <no docstring>


In [0]:
help(sc)

Help on RemoteContext in module dbruntime.spark_connection object:

class RemoteContext(pyspark.context.SparkContext)
 |  RemoteContext(master: Optional[str] = None, appName: Optional[str] = None, sparkHome: Optional[str] = None, pyFiles: Optional[List[str]] = None, environment: Optional[Dict[str, Any]] = None, batchSize: int = 0, serializer: 'Serializer' = CloudPickleSerializer(), conf: Optional[pyspark.conf.SparkConf] = None, gateway: Optional[py4j.java_gateway.JavaGateway] = None, jsc: Optional[py4j.java_gateway.JavaObject] = None, profiler_cls: Type[pyspark.profiler.BasicProfiler] = <class 'pyspark.profiler.BasicProfiler'>, udf_profiler_cls: Type[pyspark.profiler.UDFBasicProfiler] = <class 'pyspark.profiler.UDFBasicProfiler'>, memory_profiler_cls: Type[pyspark.profiler.MemoryProfiler] = <class 'pyspark.profiler.MemoryProfiler'>)
 |  
 |  Method resolution order:
 |      RemoteContext
 |      pyspark.context.SparkContext
 |      builtins.object
 |  
 |  Methods inherited from pyspar

#### Using RDDs for transformations and actions
* Create a Python collection of 10,000 integers
* Create a base Spark RDD from this collection
* Count the number of elements in the RDD


``range(), parallelize(), count()``

In [0]:
# Create a range of integers from 1 to 10,000
data = range(1, 10001)

# Create an RDD from the data collection
rdd = sc.parallelize(data)

# Count the number of elements
rdd.count()

Out[33]: 10000

### Collecting the data
* What is the first element of the RDD?
* Display the first 20 elements of the RDD
* What is the difference between take and takeOrdered?
* Create a random 50% sample with replacement from 'rdd'
* What is the difference between takeSample() and sample()?
* Collect the sample (never call collect on an entire RDD/Dataset)

* `first(), collect(), take(), takeSample(), sample(), top()`

In [0]:
rdd.first()

Out[5]: 1

In [0]:
rdd.take(20)

Out[6]: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [0]:
rdd.takeSample(withReplacement=True, num=5000, seed=42)

Out[7]: [2738,
 6607,
 9999,
 6642,
 6351,
 8413,
 5689,
 8865,
 4461,
 1918,
 2475,
 2103,
 829,
 4719,
 1729,
 7557,
 5654,
 1086,
 702,
 4708,
 424,
 1087,
 2835,
 7111,
 8911,
 5411,
 2588,
 8034,
 7858,
 9770,
 3545,
 4446,
 8589,
 9766,
 3316,
 355,
 67,
 8662,
 9888,
 8383,
 9852,
 2550,
 7222,
 8775,
 7200,
 6823,
 6693,
 5404,
 4934,
 4427,
 6753,
 5232,
 7827,
 2123,
 4606,
 263,
 4253,
 6443,
 7875,
 863,
 9410,
 1084,
 6856,
 9263,
 6438,
 929,
 5285,
 4768,
 4826,
 3360,
 5983,
 5678,
 2641,
 9003,
 9110,
 5938,
 2587,
 1317,
 207,
 6792,
 9348,
 1683,
 1450,
 2329,
 6529,
 3023,
 3475,
 5771,
 4609,
 9957,
 9541,
 9364,
 779,
 7498,
 1355,
 7378,
 7818,
 3682,
 149,
 4807,
 2885,
 1543,
 1916,
 2021,
 2020,
 2125,
 334,
 4709,
 2849,
 5794,
 7728,
 8984,
 1590,
 1088,
 3457,
 7282,
 7542,
 349,
 5578,
 433,
 4667,
 8546,
 9918,
 6451,
 3125,
 6327,
 5139,
 7639,
 4057,
 5975,
 3224,
 8805,
 3205,
 9348,
 4789,
 5463,
 3859,
 6265,
 3105,
 8818,
 8289,
 5259,
 1129,
 181,


In [0]:
?rdd.top

Signature: def top(num: int, key: Optional[Callable[[T], 'S']]=None) -> List[T]
Docstring:
Get the top N elements from an RDD.

.. versionadded:: 1.0.0

Parameters
----------
num : int
    top N
key : function, optional
    a function used to generate key for comparing

Returns
-------
list
    the top N elements

See Also
--------
:meth:`RDD.takeOrdered`
:meth:`RDD.max`
:meth:`RDD.min`

Notes
-----
This method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.

It returns the list sorted in descending order.

Examples
--------
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
[12]
>>> sc.parallelize

In [0]:
rdd_sample = rdd.sample(withReplacement=True, fraction=0.5, seed=42)


In [0]:
rdd.top(1)

Out[11]: [10000]

### Frequency of random numbers in the sample
 * For each element of the sample, count how many times each number appears
 * Which numbers appear most frequently?
 
`count(), countByValue()`

In [0]:
count_by_value = rdd_sample.countByValue()

In [0]:
sorted(count_by_value.items(), key=lambda x : x[1], reverse=True)

Out[17]: [(131, 5),
 (8989, 5),
 (205, 4),
 (1167, 4),
 (1196, 4),
 (1550, 4),
 (2043, 4),
 (2288, 4),
 (2832, 4),
 (3971, 4),
 (5317, 4),
 (5859, 4),
 (6541, 4),
 (6851, 4),
 (7116, 4),
 (7309, 4),
 (7554, 4),
 (8659, 4),
 (9873, 4),
 (9882, 4),
 (9982, 4),
 (82, 3),
 (169, 3),
 (199, 3),
 (208, 3),
 (311, 3),
 (469, 3),
 (545, 3),
 (613, 3),
 (726, 3),
 (888, 3),
 (1041, 3),
 (1143, 3),
 (1192, 3),
 (1212, 3),
 (1319, 3),
 (1324, 3),
 (1348, 3),
 (1533, 3),
 (1605, 3),
 (1689, 3),
 (1897, 3),
 (1925, 3),
 (1926, 3),
 (2028, 3),
 (2056, 3),
 (2091, 3),
 (2132, 3),
 (2153, 3),
 (2247, 3),
 (2356, 3),
 (2375, 3),
 (2391, 3),
 (2533, 3),
 (2608, 3),
 (2693, 3),
 (2792, 3),
 (2911, 3),
 (3033, 3),
 (3037, 3),
 (3381, 3),
 (3478, 3),
 (3479, 3),
 (3497, 3),
 (3763, 3),
 (3813, 3),
 (3879, 3),
 (3880, 3),
 (3915, 3),
 (3916, 3),
 (3925, 3),
 (3929, 3),
 (4040, 3),
 (4064, 3),
 (4294, 3),
 (4323, 3),
 (4333, 3),
 (4342, 3),
 (4404, 3),
 (4440, 3),
 (4480, 3),
 (4589, 3),
 (4854, 3),
 (4975, 

### Partitioning
* How many partitions does the RDD have?
* Create a new RDD from `rdd` with only 8 partitions and check that number

In [0]:
rdd.getNumPartitions()

Out[20]: 8

In [0]:
rdd_repartitioned = rdd

In [0]:
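# partitionBy() only accepts (key, value) pair RDDs; repartition() works on any RDD
rdd_repartitioned = rdd.repartition(8)
rdd_repartitioned.getNumPartitions()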

In [0]:
type(rdd)

Out[50]: pyspark.rdd.PipelinedRDD

In [0]:
import pyspark

In [0]:
?pyspark.rdd.PipelinedRDD

Type:        type
String form: <class 'pyspark.rdd.PipelinedRDD'>
File:        /databricks/spark/python/pyspark/rdd.py
Line:        5390
Docstring:
Examples
--------
Pipelined maps:

>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
[4, 8, 12, 16]
>>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
[4, 8, 12, 16]

Pipelined reduces:

>>> from operator import add
>>> rdd.map(lambda x: 2 * x).reduce(add)
20
>>> rdd.flatMap(lambda x: [x, x]).reduce(add)
20


In [0]:
# Note: the error below was raised by count() on rdd.partitionBy(8). partitionBy() expects
# an RDD of (key, value) pairs, so it cannot be applied to an RDD of plain integers.
#rdd_repartitioned.count()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-1905306230174712>:1
----> 1 rdd_repartitioned.count()

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature


### Map - Generates a new element for each element of the RDD
* Add 1000 to each number of the RDD
  * If the number is even, replace it with 'toto'

`map(), lambda expression`

In [0]:
rdd.map(lambda x: x + 1000).map(lambda x: 'toto' if x%2==0 else x).take(10)

Out[59]: [1001, 'toto', 1003, 'toto', 1005, 'toto', 1007, 'toto', 1009, 'toto']

In [0]:
rdd.map(lambda x: 'toto' if x%2==0 else x +1000).take(5)

Out[60]: [1001, 'toto', 1003, 'toto', 1005]

### Reduce
* Sum all the even numbers of the RDD
  * using a lambda expression
  * using a function
 
`reduce(), lambda expression`

In [0]:
rdd.filter(lambda x: x%2==0).reduce(lambda x, y: x+y)

Out[65]: 25005000

In [0]:
def myfunc(x,y):
  return x+y

In [0]:
rdd.filter(lambda x: x%2==0).reduce(myfunc)

Out[67]: 25005000

### Removing duplicates and filtering
* Turn `sentence` into a list of distinct words
* Filter `sentence` to keep only the words with 4 or more characters

`lambda expression, flatMap(), distinct(), filter()`

In [0]:
sentence = ["Mise en place des traitements Big Data avec Apache Spark", "Apache Spark vs Hadoop", "Le Big Data peut être défini par les 4V"]
rdd_sentence = sc.parallelize(sentence)


In [0]:
?rdd.flatMap

Signature: def flatMap(f: Callable[[T], Iterable[U]], preservesPartitioning: bool=False) -> 'RDD[U]'
Docstring:
Return a new RDD by first applying a function to all elements of this
RDD, and then flattening the results.

.. versionadded:: 0.7.0

Parameters
----------
f : function
    a function to turn a T into a sequence of U
preservesPartitioning : bool, optional, default False
    indicates whether the input function preserves the partitioner,
    which should be False unless this is a pair RDD and the input

Returns
-------
:class:`RDD`
    a new :class:`RDD` by applying a function to all elements

See Also
--------
:meth:`RDD.map`
:meth:`RDD.mapPartitions`
:meth:`RDD.mapPartitionsWithInd

In [0]:
rdd_words = rdd_sentence.flatMap(lambda x: x.split()).distinct().filter(lambda x: len(x)>=4)

In [0]:
rdd_words.collect()

Out[87]: ['Mise',
 'Hadoop',
 'être',
 'Spark',
 'peut',
 'défini',
 'Data',
 'avec',
 'Apache',
 'traitements',
 'place']

In [0]:
rdd_sentence_distinct = rdd_sentence.flatMap(lambda x: x.split()).distinct()
print(rdd_sentence_distinct.collect())
# Result (element order may vary): ['Mise', 'vs', 'Hadoop', 'être', 'des', 'Spark', 'Le', 'peut', 'défini', 'les', '4V', 'Data', 'par', 'avec', 'Apache', 'en', 'traitements', 'place', 'Big']

In [0]:
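def filter_word_len(word):
  # Keep only words with 4 or more characters
  return len(word) >= 4

# Filter rdd_sentence_distinct
rdd_sentence_distinct.filter(filter_word_len).collect()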

### Chaining transformations
 * By chaining the transformations:
   * Divide the numbers by 2
   * Filter to keep the odd numbers
   * Compute the sum

In [0]:
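# Assumption: "/" is true division, so only halves that are odd whole numbers (1.0, 3.0, ...) pass the filter
rdd.map(lambda x: x / 2).filter(lambda x: x % 2 == 1).sum()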

**Readability and code style**
* The code can be written on a single line:
```
sc.parallelize(data).map(XXXX).filter(YYYY).reduce(ZZZZ)
```
OR, wrapped in parentheses with one transformation per line:

```
(sc
 .parallelize(data)
 .map(lambda y: y - 1)
 .filter(lambda x: x < 10)
 .collect())
```

### Set operations
* Compute the distinct values of rdd_1
* Compute the union of rdd_1 and rdd_2
* Compute the intersection of rdd_1 and rdd_2
* Compute the subtraction of rdd_1 and rdd_2 (i.e. the elements that are in rdd_1 but not in rdd_2)
* What does rdd_1 + rdd_2 do?

`distinct, union, intersection, subtract`

In [0]:
# Create two RDDs of words
rdd_1 = sc.parallelize(["spark","spark","python","sql","rdd"])
rdd_2 = sc.parallelize(["spark","sql","pandas"])

In [0]:
rdd_1.union(rdd_2)

Out[90]: UnionRDD[92] at union at NativeMethodAccessorImpl.java:0

### Computing statistics
`stats(), mean(), sum()`

In [0]:
rdd.mean()

Out[92]: 5000.5

#### **Slightly more advanced RDDs: PairRDD**

**`groupByKey` and `reduceByKey`**
 
Let's investigate the additional transformations: [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) and [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey).
 
Both of these transformations operate on pair RDDs.  A pair RDD is an RDD where each element is a pair tuple (key, value).  For example, `sc.parallelize([('a', 1), ('a', 2), ('b', 1)])` would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.
The `reduceByKey()` transformation gathers together pairs that have the same key and applies a function to two associated values at a time. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions.
While both the `groupByKey()` and `reduceByKey()` transformations can often be used to solve the same problem and will produce the same answer, the `reduceByKey()` transformation works much better for large distributed datasets. This is because Spark knows it can combine output with a common key on each partition *before* shuffling (redistributing) the data across nodes.  Only use `groupByKey()` if the operation would not benefit from reducing the data before the shuffle occurs.
 
Look at the diagram below to understand how `reduceByKey` works. Notice how pairs on the same machine with the same key are combined (by using the lambda function passed into `reduceByKey`) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result.
 
![reduceByKey() figure](http://spark-mooc.github.io/web-assets/images/reduce_by.png)
 
On the other hand, when using the `groupByKey()` transformation, all the key-value pairs are shuffled around, causing a lot of unnecessary data to be transferred over the network.
 
To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when more data is shuffled onto a single executor machine than can fit in memory. However, it flushes the data to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. This will be more gracefully handled in a later release of Spark so that the job can still proceed, but it should still be avoided. When Spark needs to spill to disk, performance is severely impacted.
 
![groupByKey() figure](http://spark-mooc.github.io/web-assets/images/group_by.png)
 
As your dataset grows, the difference in the amount of data that needs to be shuffled between the `reduceByKey()` and `groupByKey()` transformations becomes increasingly large.
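The shuffle-size difference described above can be illustrated with a plain-Python simulation (this is not Spark code; the two hard-coded partitions and the `combine_by_key` helper are hypothetical). The groupByKey-style path ships every pair, while the reduceByKey-style path first combines each partition locally, so at most one pair per key and per partition crosses the network.

```python
# Two hypothetical partitions of (key, value) pairs, as if each lived on a different machine
partitions = [
    [("a", 1), ("a", 1), ("b", 1), ("a", 1)],
    [("b", 1), ("a", 1), ("b", 1), ("b", 1)],
]

def add(x, y):
    return x + y

def combine_by_key(pairs, func):
    """Merge the values of each key with `func`, as reduceByKey does within a partition."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return list(acc.items())

# groupByKey-style: every single pair is sent over the network
shuffled_group = [pair for part in partitions for pair in part]

# reduceByKey-style: each partition is combined locally before the shuffle
shuffled_reduce = [pair for part in partitions for pair in combine_by_key(part, add)]

# After the shuffle, both approaches give the same final totals
final_group = dict(combine_by_key(shuffled_group, add))
final_reduce = dict(combine_by_key(shuffled_reduce, add))

print(len(shuffled_group), len(shuffled_reduce))  # 8 4
print(final_group == final_reduce, final_reduce)  # True {'a': 4, 'b': 4}
```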
 
Here are more transformations to prefer over `groupByKey()`:
  * [combineByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.combineByKey) can be used when you are combining elements but your return type differs from your input value type.
  * [foldByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.foldByKey) merges the values for each key using an associative function and a neutral "zero value".
 
Now let's go through a simple `groupByKey()` and `reduceByKey()` example.

In [0]:
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('a', 3), ('b', 1)])

### Aggregation (key, list)

In [0]:
pairRDD.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()

In [0]:
pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()

### Reducing the result with a sum (key, value)

In [0]:
pairRDD.reduceByKey(lambda x,y : x+y).collect()

In [0]:
pairRDD.groupByKey().map(lambda x: (x[0], sum(x[1]))).collect()

#### **Caching RDDs**

For efficiency, Spark can keep RDDs in memory, where their contents can be accessed quickly. However, memory is limited: if you try to keep too many RDDs in memory, Spark automatically evicts some of them to make space for new ones. If you later refer to an evicted RDD, Spark recomputes it from its lineage (the chain of transformations that produced it), but that takes time.
 
So, if you plan to use an RDD more than once, you should tell Spark to cache it with the `cache()` operation. Caching is lazy: you must still trigger an action on the RDD, such as `collect()` or `count()`, for the RDD to be computed, and only then is it cached. Keep in mind that if you cache too many RDDs and Spark runs out of memory, it evicts the least recently used (LRU) data first. Again, the RDD is automatically recomputed when accessed.
 
You can check if an RDD is cached by using the `is_cached` attribute, and you can see your cached RDD in the "Storage" section of the Spark web UI. If you click on the RDD's name, you can see more information about where the RDD is stored.

In [0]:
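# Name the RDD (rdd_sample was created above with rdd.sample())
rdd_sample.setName('My Sampled RDD')
# Cache the RDD
rdd_sample.cache()
# Trigger an action (count() avoids pulling the whole sample to the driver)
rdd_sample.count()
# Check whether it is cached
print(rdd_sample.is_cached)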

**Unpersist and storage options**
 
Spark automatically manages the RDDs cached in memory. With the default level used by `cache()` (memory only), partitions that do not fit in memory are not stored and are recomputed when needed, while storage levels that include disk write them to disk instead. Once you are finished using an RDD, you can optionally call its `unpersist()` method to tell Spark that you no longer need it in memory.
 
You can see the set of transformations that were applied to create an RDD by using the `toDebugString()` method, which will provide storage information, and you can directly query the current storage information for an RDD using the `getStorageLevel()` operation.
 
**Advanced:** Spark provides many more options for managing how RDDs are stored in memory or even saved to disk. You can explore the API for the RDD [persist()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.persist) operation using Python's [help()](https://docs.python.org/2/library/functions.html?highlight=help#help) command. The `persist()` operation optionally takes a PySpark [StorageLevel](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.StorageLevel) object.

In [0]:
# If we are done with the RDD, we can unpersist it so that its memory can be reclaimed
rdd_sample.unpersist()
# Storage level for a non-cached RDD
print(rdd_sample.getStorageLevel())
rdd_sample.cache()
# Storage level for a cached RDD
print(rdd_sample.getStorageLevel())

In [0]:
# Great, you can now move on to Lab 2!