# Introduction aux RDD avec PySpark

Ce notebook présente les **RDD (Resilient Distributed Datasets)** dans Apache Spark, en version **PySpark**.

Objectifs :
- Comprendre ce qu’est un **RDD** et ses propriétés (résilient, distribué, immuable, lazy).
- Savoir **créer** des RDD (à partir de collections locales, de fichiers texte).
- Utiliser les **transformations** (`map`, `filter`, `flatMap`, `reduceByKey`, …).
- Utiliser les **actions** (`collect`, `count`, `reduce`, `take`, …).
- Comprendre la **persistance** (`cache`, `persist`).
- Construire un **exemple complet** : *Word Count*.


Prérequis :
- Python installé
- PySpark installé 


In [1]:
from pyspark.sql import SparkSession

# Création d'une SparkSession locale
spark = SparkSession.builder.appName("TutoRDD_PySpark").master("local[*]").getOrCreate()

# Récupération du SparkContext (point d'entrée RDD)
sc = spark.sparkContext

print(spark)
print(sc)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/19 14:42:20 WARN Utils: Your hostname, DavidSimplon, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/19 14:42:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/19 14:42:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<pyspark.sql.session.SparkSession object at 0x7699e9044e90>
<SparkContext master=local[*] appName=TutoRDD_PySpark>


## 1. Qu’est-ce qu’un RDD ?

Un **RDD (Resilient Distributed Dataset)** est la **structure de données de base** de Spark.

Un RDD est :

- **Resilient (tolérant aux pannes)**  
  - Spark peut **recalculer** les partitions perdues à partir de l’historique des transformations (*lineage*).
  - Cet historique est représenté par un **DAG (Directed Acyclic Graph)**.

- **Distributed (distribué)**  
  - Les données sont **réparties en partitions** sur plusieurs nœuds du cluster.
  - Les calculs sont exécutés **en parallèle** sur ces partitions.

- **Immutable (immuable)**  
  - Un RDD ne peut pas être modifié une fois créé.
  - Chaque transformation (ex : `map`, `filter`) **crée un nouveau RDD**.

- **Lazy (évaluation paresseuse)**  
  - Les transformations ne sont **pas exécutées immédiatement**.
  - Spark construit un **plan d’exécution (DAG)**.
  - Le calcul n’est déclenché que lorsqu’on appelle une **action** (`collect`, `count`, etc.).

En résumé :  
> Un RDD est une **abstraction de données distribuées**, immuables et tolérantes aux pannes, manipulées via des **transformations** et des **actions**.


In [None]:
# Création d'un RDD à partir d'une collection locale Python
rdd = sc.parallelize([1, 2, 3, 4, 5])

print("Type de rdd :", type(rdd))
print("Contenu du RDD :", rdd.collect())

## 2. Créer des RDD

Deux méthodes classiques pour créer des RDD :

1. **`sc.parallelize(seq)`**  
   À partir d’une **collection locale** (liste Python, etc.).  
   Utile pour des petits exemples, tests, prototypage.

2. **`sc.textFile(path)`**  
   À partir d’un **fichier texte** (local, HDFS, S3, …).  
   Retourne un **RDD de chaînes** (`RDD[str]`), une ligne par élément.


In [None]:
# Exemple avec parallelize
data = list(range(1, 6))
rdd_numbers = sc.parallelize(data)

print("RDD numbers :", rdd_numbers.collect())

In [None]:
# Lecture d'un fichier texte pour créer un RDD
rdd_lines = sc.textFile("data.txt")

print("Nombre de lignes dans data.txt :", rdd_lines.count())
print("Quelques lignes :")
for line in rdd_lines.take(5):
    print("->", line)

## 3. Transformations et actions sur les RDD

Les opérations sur les RDD se classent en deux catégories :

### 3.1 Transformations

- **Créent un nouveau RDD** à partir d’un RDD existant.
- **Évaluation paresseuse** : elles ne sont **pas exécutées tout de suite**.
- Exemples : `map`, `filter`, `flatMap`, `distinct`, `union`, `intersection`,  
  `groupBy`, `groupByKey`, `reduceByKey`, `sortBy`, …

### 3.2 Actions

- **Déclenchent réellement l’exécution** des transformations.
- Renvoient un **résultat au driver** ou écrivent des données.
- Exemples : `collect`, `count`, `take`, `reduce`, `saveAsTextFile`, …

> Les **transformations décrivent le “quoi faire”**,  
> les **actions déclenchent le “fais-le maintenant”**.


In [None]:
rdd_base = sc.parallelize([1, 2, 3, 4, 5])

# Deux transformations (lazy) : rien n'est exécuté ici
rdd_transformed = rdd_base.map(lambda x: x * 2).filter(lambda x: x > 5)

# C'est seulement ici, avec une ACTION, que Spark exécute le plan
result = rdd_transformed.collect()
print(result)  # [6, 8, 10]

## 4. Transformations courantes

### 4.1 `map` – Transformer chaque élément

**Description :** applique une fonction à chaque élément du RDD.


In [None]:
rdd = sc.parallelize(range(1, 6))
doubled = rdd.map(lambda x: x * 2)

print(doubled.collect())  # [2, 4, 6, 8, 10]

### 4.2 `filter` – Garder certains éléments

**Description :** ne conserve que les éléments qui vérifient un prédicat.

Exemple : garder les nombres pairs.


In [None]:
rdd = sc.parallelize(range(1, 6))
even = rdd.filter(lambda x: x % 2 == 0)

print(even.collect())  # nombres pair

### 4.3 `flatMap` – Transformer et aplatir

**Description :** comme `map`, mais la fonction retourne une **séquence**,  
et Spark **aplatit** toutes les séquences en un seul RDD.

Exemple : découper des phrases en mots.


In [None]:
rdd = sc.parallelize(["bonjour scala", "hello spark"])

words = rdd.flatMap(lambda line: line.split(" "))

print(words.collect())  

### 4.4 `distinct` – Supprimer les doublons


In [None]:
rdd = sc.parallelize([1, 2, 2, 3, 4, 4])
unique = rdd.distinct()

print(unique.collect())  

### 4.5 `union` – Fusionner deux RDD


In [None]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])

combined = rdd1.union(rdd2)

print(combined.collect())  # [1, 2, 3, 4, 5, 6] (ordre non garanti)

### 4.6 `intersection` – Obtenir les éléments communs


In [None]:
rdd1 = sc.parallelize(range(1, 6))
rdd2 = sc.parallelize(range(3, 8))

inter = rdd1.intersection(rdd2)

print(sorted(inter.collect())) 

### 4.7 `groupBy` – Grouper selon une clé personnalisée

Ici, on sépare les nombres pairs et impairs.


In [None]:
rdd = sc.parallelize(range(1, 6))

grouped = rdd.groupBy(lambda x: x % 2)

# groupBy retourne (clé, iterable)
for key, values in grouped.collect():
    print(key, list(values))
# Ex : 0 [2, 4]  et 1 [1, 3, 5]

### 4.8 `groupByKey` et `reduceByKey` (RDD de paires)

On travaille ici avec un RDD de **paires (clé, valeur)**, par exemple `(mot, 1)`.

- `groupByKey` : regroupe **toutes les valeurs** par clé → peut être **très coûteux** (gros shuffle).
- `reduceByKey` : agrège les valeurs par clé **au fur et à mesure**, beaucoup plus efficace.

**Bonne pratique** : préférer `reduceByKey` quand c’est possible.


In [None]:
data = [("a", 1), ("b", 1), ("a", 2)]
rdd = sc.parallelize(data)

grouped = rdd.groupByKey()

for key, values in grouped.collect():
    print(key, list(values))
# ('a', [1, 2]), ('b', [1])

In [None]:
data = [("a", 1), ("b", 1), ("a", 2)]
rdd = sc.parallelize(data)

reduced = rdd.reduceByKey(lambda a, b: a + b)

print(reduced.collect())  # [('a', 3), ('b', 1)] (ordre non garanti)

### 4.9 `sortBy` – Trier un RDD

On peut trier selon une fonction clé.


In [None]:
rdd = sc.parallelize([5, 2, 8, 1])

sorted_rdd = rdd.sortBy(lambda x: x)

print(sorted_rdd.collect())  # [1, 2, 5, 8]

## 5. Actions sur les RDD

Les **actions** déclenchent l’exécution du plan de calcul et retournent un résultat ou écrivent des données.

Actions courantes :

- `collect()` : renvoie **tous les éléments** au driver (à éviter sur de très gros RDD).
- `count()` : compte le nombre d’éléments.
- `take(n)` : renvoie les `n` premiers éléments.
- `reduce(f)` : réduit les éléments via une fonction associative (`+`, `max`, etc.).
- `saveAsTextFile(path)` : écrit le contenu dans des fichiers texte.


In [None]:
from operator import add

rdd = sc.parallelize([1, 2, 3, 4, 5])

print("collect() :", rdd.collect())
print("count()   :", rdd.count())
print("reduce()  :", rdd.reduce(add))
print("take(3)   :", rdd.take(3))

In [None]:
# Le chemin doit être un dossier qui n'existe pas encore.
# Exemple : "output_rdd" (Spark va créer le dossier et des fichiers part-* dedans).

rdd = sc.parallelize(["ligne 1", "ligne 2", "ligne 3"])

rdd.saveAsTextFile("output_rdd")

print("Données écrites dans le dossier 'output_rdd'")

Spark considère le résultat comme un ensemble de partitions, donc il écrit :
un répertoire : output_rdd/ dedans, plusieurs fichiers texte nommés en général :
- part-00000
- part-00001
- ...

et souvent un fichier de marqueur : _SUCCESS

In [None]:
# Relire les données écrites dans le dossier "output_rdd"
rdd2 = sc.textFile("output_rdd")
print(rdd2.collect())
# ['ligne 1', 'ligne 2', 'ligne 3']

## 6. Persistance des RDD : `cache()` et `persist()`

Par défaut, un RDD **n’est pas persistant**.  
À chaque fois qu’on lance une **action**, Spark **recalcule tout l’historique** du RDD depuis la source.

Si tu fais :

```scala
val rdd = sc.textFile("bigfile.txt").map(...)
val a = rdd.count()
val b = rdd.collect()
```

Spark relit et retravaille `bigfile.txt` **deux fois**.

### Pourquoi persister un RDD ?

- Tu vas **réutiliser** le même RDD dans plusieurs actions.
- Les transformations pour construire ce RDD sont **coûteuses** (joins, filtres lourds, etc.).
- Tu veux **accélérer** les traitements en évitant les recalculs.

### Méthodes :

- `rdd.cache()`  
  - stocke en mémoire (`MEMORY_ONLY`).  
  - suffisant dans beaucoup de cas.

- `rdd.persist(storageLevel)`  
  - permet de choisir le niveau de stockage (mémoire + disque, etc.).


In [None]:
rdd = sc.textFile("data.txt").map(lambda line: line.upper())

# Première action
nb_lignes = rdd.count()

# Deuxième action
contenu = rdd.collect()

print("Nombre de lignes :", nb_lignes)
print("Premières lignes :", contenu[:5])

In [None]:
rdd = sc.textFile("data.txt").map(lambda line: line.upper())

# Demande à Spark de mettre ce RDD en cache lors de la première action
rdd_cached = rdd.cache()

# 1ère action : déclenche le calcul + le cache
nb_lignes = rdd_cached.count()

# 2ème action : Spark lit depuis le cache (plus rapide)
contenu = rdd_cached.collect()

print("Nombre de lignes :", nb_lignes)
print("Premières lignes :", contenu[:5])

## 7. Exemple complet : Word Count avec RDD

Objectif : trouver les **mots les plus fréquents** dans un fichier texte.

Étapes :

1. Lire le fichier avec `sc.textFile`.
2. Découper chacune des lignes en mots (`flatMap`).
3. Associer à chaque mot la valeur 1 (`map` → `(mot, 1)`).
4. Agréger par mot (`reduceByKey`).
5. Trier par fréquence (`sortBy`).
6. Afficher les `N` mots les plus fréquents.



In [None]:
# 1. Lecture du fichier
lines = sc.textFile("data.txt")

# 2. Découpage en mots (on peut améliorer le split selon les cas)
words = lines.flatMap(lambda line: line.split())

# 3. (mot) -> (mot, 1)
pairs = words.map(lambda w: (w, 1))

# 4. Agrégation par mot
word_counts = pairs.reduceByKey(lambda a, b: a + b)

# 5. Tri par fréquence décroissante
sorted_counts = word_counts.sortBy(lambda kv: kv[1], ascending=False)

# 6. Afficher les 20 mots les plus fréquents
for word, count in sorted_counts.take(20):
    print(f"{word}: {count}")

## 8. Conclusion & pistes pour aller plus loin

Dans ce notebook, tu as vu :

- Ce qu’est un **RDD** (Resilient Distributed Dataset).
- Les propriétés clés : **résilience**, **distribution**, **immutabilité**, **lazy evaluation**.
- Comment **créer** des RDD (`parallelize`, `textFile`).
- Les **transformations** courantes : `map`, `filter`, `flatMap`, `distinct`,
  `union`, `intersection`, `groupBy`, `groupByKey`, `reduceByKey`, `sortBy`.
- Les **actions** : `collect`, `count`, `take`, `reduce`, `saveAsTextFile`.
- La **persistance** avec `cache()` / `persist()`.
- Un **workflow complet** RDD avec l'exemple *Word Count*.


Pour aller plus loin :

- Tester d’autres transformations (`join`, `leftOuterJoin`, etc.).
- Comparer RDD et **DataFrames** pour le même problème.
- Surveiller l’exécution avec l’**interface Web** de Spark (Spark UI).


In [None]:
spark.stop()