RDD Spark et Opération. 

Un RDD est la structure de base pour le traitement des données dans Apache Spark. il s'agit d'une collection : 
- Résilent : si un noeud tombe, spark peut en reconstruire le RDD à partir des transformations initiales grace aux DAG
- Distribué -> Les données sont distribuées sur différents noeuds du cluster qui permet un traitement parallèle. 
- Immutable -> Une fois crée, un RDD ne peut pas être modifié, mais on peut en générer un nouveau à partir de transformation. 
- Layly Evaluated : les transformations appliquées sur le RDD tel que map, filter ne sont pas appliqué qu'au moment d'une action comme : collect(), count() etc.
- Partitionné -> Spark divise automatiquement les données en partitions pour améliorer l'efficacité du calcul. 

- Transformations (créent un nouveau RDD) : map(), filter(), flatMap(), reduceByKey(), etc.
- Actions (déclenchent l’exécution des transformations) : collect(), count(), reduce(), saveAsTextFile(), etc.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import random
#créer un SparkContext
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("RDD_to_DF").getOrCreate()


# <span style="color:red; font-size:1em; font-weight:bold"> Différence entre SparkContext et SparkSession :</span> 



| 🏷️ Caractéristique  | ⚡ SparkContext (`sc`) | 🚀 SparkSession (`spark`) |
|-----------------|-----------------|------------------|
| **Définition** | Point d’entrée original de Spark pour accéder au cluster. | API unifiée introduite dans Spark 2.0 qui regroupe `SparkContext`, `SQLContext` et `HiveContext`. |
| **Utilisation principale** | Manipulation bas niveau des RDDs. | Manipulation des DataFrames, SQL, streaming et interactions avec Spark SQL. |
| **Création** | `sc = SparkContext.getOrCreate()` | `spark = SparkSession.builder.appName("App").getOrCreate()` |
| **Support SQL/DataFrame** | ❌ Non pris en charge directement. | ✅ Prise en charge complète (`spark.sql()`, `spark.read` etc.). |
| **Accès au SparkContext** | ✅ C'est lui-même le contexte principal. | ✅ Peut accéder à `SparkContext` via `spark.sparkContext`. |


In [2]:
#Exemple de création d'un RDD spark et de sa Transformation
data = [1,2,3,4,5,6,7,8,9,10]
rdd = sc.parallelize(data) # cétte opération permet de créer un RDD immutable. 
# Transformer les données : Nous allons les multiplier par 2
rdd_multiplie = rdd.map(lambda x: x * 2)

In [3]:
# Action : Récupération des résultats. 
result = rdd_multiplie.collect()
result

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

In [4]:
# lire le fichier .txt dans un RDD spark. 
text_rdd = sc.textFile('/home/jovyan/work/tears_in_rain.txt')

In [5]:
# Afficher les 3 premier éléments de notre RDD
text_rdd.take(3)

["I've seen things you people wouldn't believe. ",
 'Attack ships on fire off the shoulder of Orion. ',
 'I watched C-beams glitter in the dark near the Tannhäuser Gate. ']

In [6]:
print(f"le nombre d'élément : {text_rdd.count()}")
print(f"les 10 premiers éléments  : {text_rdd.take(10)}")

le nombre d'élément : 5
les 10 premiers éléments  : ["I've seen things you people wouldn't believe. ", 'Attack ships on fire off the shoulder of Orion. ', 'I watched C-beams glitter in the dark near the Tannhäuser Gate. ', 'All those moments will be lost in time, like tears in rain. ', 'Time to die.']


In [7]:
# Spliter les phrases en listes de Mots
split_text = text_rdd.map(lambda x: x.split())

In [8]:
split_text.take(1)

[["I've", 'seen', 'things', 'you', 'people', "wouldn't", 'believe.']]

<span style="color:red; font-size:1em; font-weight:bold">L'exemple suivant permet de voir les points suivants :</span> 

- Génération d'une liste de nombre de 10 Millions contenues entre 1 et 1000 
- Création d'un RDD Spark à l'aide de la fontion parrallelize()
- Transformer les valeurs de la liste en Tuples pour pouvoir créer un DataFrame spark. 
- Création du DataFrame Spark à l'aide de la fonction createDataFrame. 

Pourquoi devons nous transformer les valeurs de notre liste en Tuple pour créer un DataFrame Spark ? 

La raison est due à la structure des données dans Spark. Généralement les données sont définies sous forme de tuple ou chaque tuple représente une colonne. 

liste = [1,2,3,4] --> Spark ne saura pas l'interpréter sous forme de lignes et des colonnes. Il est nécessaire de convertir en tuple sous forme de (1,), (2,)

In [None]:
number = 10_000_000
rdd_numbers = sc.parallelize(range(number)).map(lambda x: random.randint(1 , 1000))
df = spark.createDataFrame(rdd_numbers.map(lambda x: (x,)))
display(df.show(3))

# <span style="color:red; font-size:1em; font-weight:bold">Requêter nos données grace à SparkSql</span> 

Executer des requêtes sql sur un dataframe SPARK nécessite de créer une vue d'une table en mémoire. aveec la fonctione .createOrReplaceTempView pour le charger en mémoire. A la suite de quoi nous pouvons exécuter nos requêtes sql. 

- La vue permet de requêter les données de manières plus optimale. 
- Attention : Si la vue existe déja, nous ne pouvons pas la récréer. Il faudrait alors soit la supprimer ou utiliser une clause if not exits. 

In [None]:
# df.createTempView("my_view") # première option 
df.createOrReplaceTempView("my_view") # Deuxième option 
result = spark.sql("select * from my_view where _1 = 994")
result.show()

In [11]:
display(result)

DataFrame[_1: bigint]

# <span style="color:red; font-size:1em; font-weight:bold">Manipuler nos DataFrames</span> 


In [12]:
# Première façon d'accéder à nos colonne simplement en les référençant entre [""]
df["_1"]

Column<'_1'>

### `functions` de PySpark
Accéder aux colonnes grace au module de PySpark sql qui s'appel functions. 
En revanche cette méthode ne fonction qu'à l'intérieur des commandes sql. Raison pour laquelle on l'utilise dans la méthode select. 
Fonctions du module `functions` de PySpark

Le module **`functions`** de PySpark propose une large gamme de fonctions pour effectuer des manipulations et des agrégations sur les colonnes d'un DataFrame. Voici quelques-unes des fonctions les plus couramment utilisées :

### Résumé des fonctions de `functions` de PySpark :

- **Sélection** : `col()`, `alias()`, `when()`, `lit()`, `cast()`.
- **Agrégation** : `avg()`, `sum()`, `min()`, `max()`, `count()`, `countDistinct()`, `stddev()`, `variance()`, `first()`, `last()`.
- **Fenêtre** : `row_number()`, `rank()`, `dense_rank()`, `ntile()`, `lag()`, `lead()`.
- **Mathématiques** : `abs()`, `sqrt()`, `round()`, `exp()`, `log()`, `pow()`.
- **Chaînes** : `length()`, `trim()`, `regexp_extract()`, `regexp_replace()`.
- **Dates** : `current_date()`, `current_timestamp()`, `date_format()`, `to_date()`, `year()`, `month()`, `dayofmonth()`.
- **Nulles** : `isNull()`, `isNotNull()`, `coalesce()`.
- **Autres transformations** : `collect_list()`, `collect_set()`, `concat_ws()`.


In [13]:
from pyspark.sql import functions as f 
result.select(f.col("_1"))

DataFrame[_1: bigint]

In [14]:
result.select(f.avg("_1"))

DataFrame[avg(_1): double]

### `Action` de PySpark

In [None]:
df.show()

In [16]:
df.take(2)

[Row(_1=23), Row(_1=42)]

In [17]:
df.count()

10000000

In [18]:
df.describe()

DataFrame[summary: string, _1: string]

In [19]:
df.describe().toPandas()

Unnamed: 0,summary,_1
0,count,10000000.0
1,mean,500.3427784
2,stddev,288.67221665815606
3,min,1.0
4,max,1000.0


In [20]:
display(df)

DataFrame[_1: bigint]

# <span style="color:red; font-size:1em; font-weight:bold">Utiliser Limit pour récupérer qu'un nombre limiter de données</span> 

Lors de la manipulation des très gros volumes de données il faut veiller à ne pas faire des opérations qui peuvent saturer la mémoire. 

In [21]:
df.limit(5).describe().toPandas()

Unnamed: 0,summary,_1
0,count,5.0
1,mean,442.4
2,stddev,261.8659962652654
3,min,145.0
4,max,756.0


### `Sauvergarder nos résultat`

### La méthode .write.mode("overwrite").csv()

Nous préférons cette méthode dans le cas ou notre DataFrame est très grand. ça permet de : 

- Gagner en performance : plus rapide d'écrire les partitions que tout fusionner.
- Eviter les problèmes de mémoires. 
- Quand les données sont  stockés sur  : HDFS, S3, Google Cloud Storage. 

Nous utiliserons cette méthodes si nous voulons garder les partitions de nos données. Ou si nous souhaitons les partitionner selon des critères 
Exemple : Années et Mois. 

df.write.partitionBy("année", "mois").csv("hdfs://chemin_output", header=True)


In [24]:
df.write.mode("overwrite").csv("./home/jovyan/work/monDfIssueDePySpark")

### La méthode .coalesce(1).write.mode("overwrite").csv

Cette méthode est à éviter lorsque le jeu de données est extrêmement lourd. En effet, elle n'est pas efficace et très gourmande en mémoire. 
A utiliser uniquement si nous voulons avoir un seul jeu de données xlsx ou un pandas. 


- En comparaison le même df de 10 000 000 d'enregistrement est enregistré en 3.4s en multipart contre 21.8s en une seule partie et 3.3 en Parquet en plusiuers partie et 20.9 en une seule partie. 

ça montre que le mode parquet reste quand même le mode le plus efficace à l'export. Mais tout dépend de notre besoin final et l'usage qui sera fait de la donnée une fois exportée. µ

In [23]:
df.coalesce(1).write.mode("overwrite").csv("./home/jovyan/work/monDfEnUnSeulFichier", header=True)


### Les fichier Parquet 
🚀 Avantages du format Parquet :
✅ Stockage optimisé & compression :

Parquet stocke les données en colonnes et applique une compression efficace.
Résultat : Fichiers plus petits (jusqu’à 75% d’économie comparé à CSV).
✅ Lecture rapide & sélective :

Spark ne charge que les colonnes utilisées dans une requête, ce qui accélère l'analyse.
Ex : avec CSV, Spark doit lire tout le fichier même si on ne veut qu'une colonne.
✅ Gère les types de données & les schémas :

Parquet conserve les types natifs (int, float, date, etc.), contrairement à CSV où tout est en string.
✅ Meilleur pour les traitements distribués :

Supporte le partitionnement et l'indexation des fichiers → booste la performance.
Fonctionne super bien avec Hive, AWS Athena, Google BigQuery.

In [None]:
# En plusieurs fichiers 
df.write.mode("overwrite").parquet("./home/jovyan/work/MonFichierParquet")


In [None]:
# En un seul fichier 
df.coalesce(1).write.mode("overwrite").parquet("./home/jovyan/work/MonFichierParquetEnUnSeulFichier")