![SPARK_PYSPARK](img/spark_pyspark.jpeg)

# üí´ Apache Spark : le moteur du Big Data moderne

Le **Big Data** d√©signe des volumes de donn√©es trop grands pour √™tre trait√©s sur une seule machine.
Historiquement, on utilisait **Hadoop MapReduce**, mais celui-ci lisait et √©crivait souvent sur disque ‚Üí lent.

**Apache Spark** est n√© pour r√©pondre √† ce probl√®me.
- **Traitement en m√©moire (RAM)** ‚Üí bien plus rapide
- **API haut niveau** (SQL, DataFrame, MLlib, etc.)
- **Multi-langages** : Scala, Java, Python (PySpark), R

Spark repose sur une architecture driver / executors.
- **Driver** : programme principal. G√®re le plan d‚Äôex√©cution et collecte les r√©sultats
- **Cluster Manager** : alloue les ressources (CPU, RAM) aux t√¢ches Spark (YARN, Kubernetes, Standalone‚Ä¶)
- **Executors** : ex√©cutent les t√¢ches sur les partitions de donn√©es.

Spark distribue automatiquement les donn√©es et le calcul sur plusieurs machines.

# ‚ú® PySpark : l‚Äôinterface Python de Spark

**PySpark** est l‚Äôinterface de programmation en Python pour Apache Spark.\
Elle permet d‚Äôutiliser la puissance du moteur Spark tout en restant dans un environnement familier pour les d√©veloppeurs Python.

Concr√®tement :
- Le code √©crit en Python est traduit en instructions que le moteur Spark ex√©cute sur le cluster.
- L‚Äôex√©cution r√©elle ne se fait pas dans le processus Python, mais dans la JVM de Spark (Java Virtual Machine).
- Cette communication passe par l‚ÄôAPI Py4J, une passerelle entre Python et Java.

Ainsi, PySpark combine :
- la simplicit√© du Python (syntaxe, int√©gration avec pandas, notebooks, etc.)
- la scalabilit√© de Spark, capable de traiter des t√©raoctets de donn√©es sur des centaines de noeuds.

# ‚å®Ô∏è Hands-On

**Objectifs**  
- Comprendre comment d√©marrer une `SparkSession` en local  
- Manipuler des DataFrames PySpark (lecture, inspection, transformations)  
- Comprendre `lazy evaluation`, `action`, `job / stage / task`  
- Sauvegarder/relire des donn√©es (CSV, Parquet)

## 1. D√©marrer Spark en local

Avant de pouvoir manipuler des donn√©es avec PySpark, il faut cr√©er une session Spark.\
Cette session est le point d‚Äôentr√©e principal vers toutes les fonctionnalit√©s de Spark (lecture de donn√©es, SQL, transformations, machine learning, etc.).

En pratique, on la cr√©e √† l‚Äôaide de la classe **SparkSession** qui permet de configurer et d‚Äôinitialiser le moteur Spark.\
Une fois cette cellule ex√©cut√©e, Spark d√©marre un contexte local et affiche des informations dans la console.
C‚Äôest le signe que le driver est pr√™t √† ex√©cuter des t√¢ches et √† communiquer avec d‚Äô√©ventuels executors.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IntroSparkLocal") \
    .master("local[*]") \
    .getOrCreate()

# SparkSession.builder cr√©e un objet constructeur pour d√©finir les param√®tres de la session.
# .appName("IntroSparkLocal") d√©finit un nom d‚Äôapplication visible dans l‚Äôinterface Spark (utile pour identifier ton job).
# .master("local[*]") indique le mode d‚Äôex√©cution :
#       - local signifie que le calcul se fera sur la machine locale.
#       - [*] indique que Spark utilisera tous les c≈ìurs CPU disponibles.
# .getOrCreate() cr√©e une nouvelle session Spark si elle n‚Äôexiste pas encore, ou r√©cup√®re celle d√©j√† active.

Aussi, Spark d√©marre une interface web accessible √† l‚Äôadresse suivante : http://localhost:4040

Cette interface permet de :
- visualiser les jobs Spark en cours ou termin√©s
- consulter les stages et tasks
- analyser les plans d‚Äôex√©cution
- suivre l‚Äôutilisation des ressources locales (m√©moire, CPU)

### üß≠ Exploration recommand√©e

Ouvrir l‚ÄôURL http://localhost:4040 dans le navigateur.
Parcourir les diff√©rentes sections pour comprendre comment Spark g√®re un traitement distribu√© :
- Jobs : r√©sum√© des traitements lanc√©s,
- Stages : d√©composition logique des jobs,
- Tasks : ex√©cution concr√®te sur les partitions de donn√©es,
- Storage : aper√ßu du cache et des donn√©es en m√©moire.

Prendre quelques minutes pour parcourir cette interface avant de continuer car elle aidera √† visualiser ce qui se passe ‚Äúsous le capot‚Äù √† chaque fois qu'une op√©ration est ex√©cut√©e dans PySpark.

## 2. Charger le dataset de test :  `employees.csv`

Pour travailler concr√®tement avec Spark, nous allons charger un petit dataset de test nomm√© `employees.csv`.\
Ce fichier contient des informations simples sur des employ√©s : identifiant, nom, √¢ge, d√©partement, salaire et ann√©es d‚Äôexp√©rience.

Dans PySpark, la lecture d‚Äôun fichier CSV s‚Äôeffectue via la m√©thode `spark.read.csv()`.
Cette m√©thode renvoie un **DataFrame Spark,** c‚Äôest-√†-dire une **structure tabulaire distribu√©e** (semblable √† un tableau SQL ou √† un DataFrame Pandas, mais r√©partie sur plusieurs noeuds).

In [None]:
# Charge le fichier csv
# header=True indique que la premi√®re ligne contient les noms des colonnes
# inferSchema=True demande √† Spark de d√©duire automatiquement les types de donn√©es sinon toutes les colonnes seraient lues comme des cha√Ænes de caract√®res
df = spark.read.csv("./data/employees.csv", header=True, inferSchema=True)

# Affiche la structure (sch√©ma) du DataFrame, c‚Äôest-√†-dire les noms de colonnes et leurs types 
df.printSchema()

üîç **D√©taillons ce qu‚Äôil se passe en arri√®re-plan.**

Lors de la lecture du CSV :
1. Le driver Spark re√ßoit la requ√™te de lecture du fichier csv. Il construit un plan logique pour lire le fichier CSV. Le plan logique est une repr√©sentation interne (sous forme d‚Äôun graphe `DAG`) qui d√©crit les √©tapes n√©cessaires pour obtenir le r√©sultat final : *"quel fichier lire"*, *"avec quel format (ici CSV)"*, *"quelles colonnes ou types de donn√©es"*, *"quelles transformations appliquer (filtres, jointures, agr√©gations, etc.)"*\
Mais √† ce stade, **rien n‚Äôest encore ex√©cut√©**. Spark se contente de noter l‚Äô**intention du programme**.
2. Quand une action est d√©clench√©e (.show(), .count(), etc.), le driver envoie les t√¢ches aux executors.
3. Les executors lisent r√©ellement les donn√©es (leurs partitions), ex√©cutent les transformations, et renvoient leurs r√©sultats partiels au driver. (En mode local, les executors sont simplement des threads dans le m√™me processus JVM que le driver.)
4. Le driver agr√®ge ces r√©sultats et les renvoie au programme Python.

M√™me pour un petit CSV local, le m√©canisme est le m√™me que sur un cluster : Spark applique toujours un mod√®le distribu√©. Cela rend le passage √† grande √©chelle quasiment transparent pour le code.

### 2.1. Inspection de base

Avant de transformer les donn√©es, il est souvent utile de v√©rifier la structure du DataFrame.\
L'op√©ration est imm√©diate : elle interroge seulement les m√©tadonn√©es du DataFrame (pas les donn√©es elles-m√™mes).

In [None]:
df.columns

### 2.2. Lazy Evaluation

‚ö†Ô∏è Contrairement √† df.columns, les instructions suivantes d√©clenchent des actions Spark.\
Le moteur Spark ex√©cute r√©ellement des jobs distribu√©s pour lire les donn√©es, parcourir toutes les partitions et agr√©ger les r√©sultats partiels.

üîç **D√©taillons ce qu‚Äôil se passe en arri√®re-plan.**
- `df.count()` parcourt l‚Äôensemble du DataFrame pour compter le nombre de lignes. Pour cela : \
Spark cr√©e un job compos√© de plusieurs stages et tasks, chacun ex√©cut√© par les executors sur une partition de donn√©es.\
Une fois termin√©es, les executors envoient leurs r√©sultats partiels (leurs counts locaux) au driver.\
Le driver additionne ces valeurs pour obtenir le compte global du DataFrame,
puis renvoie le r√©sultat final au programme Python.
- `df.show(5)` demande √† Spark d‚Äôafficher les 5 premi√®res lignes du DataFrame.
Cela n√©cessite de lire physiquement les donn√©es (au moins partiellement) ‚Üí Spark d√©clenche donc aussi un job distribu√©.

In [None]:
display(df.count())
display(df.show(5))

## 3. S√©lections, filtres, tris

Dans Spark, les DataFrames offrent une interface haut niveau pour manipuler les donn√©es de mani√®re d√©clarative, un peu comme en SQL.
Les op√©rations de s√©lection, filtrage et tri sont des transformations qui **ne modifient pas les donn√©es originales** : elles produisent de **nouveaux DataFrames**.

üí° Chaque transformation cr√©e un nouveau plan logique dans Spark (**aucune lecture ou ex√©cution r√©elle n‚Äôa lieu** tant qu‚Äôune action, comme .show() ou .count(), n‚Äôest pas appel√©e).\
Ainsi, m√™me un simple tri ou filtre suit la m√™me logique de planification distribu√©e que sur un cluster.

In [None]:
from pyspark.sql.functions import col

# S√©lection de colonnes
df_sel = df.select("name", "age", "department")
df_sel.show(5)

# Filtres
employees_filtered = df_sel.filter(col("age") >= 30) # Ne conserver que les employ√©s d'un certain √¢ge
employees_filtered = employees_filtered.orderBy(col("age").desc())
employees_filtered.show(5)

### 3.1. Spark SQL

Il est possible d‚Äô√©crire directement du ***SQL*** gr√¢ce √† ***Spark SQL***, un module int√©gr√© √† Apache Spark qui permet d‚Äôinterroger les donn√©es via des requ√™tes SQL standard.\
Pour cela, il suffit d‚Äôenregistrer un DataFrame comme vue temporaire √† l‚Äôaide de `createOrReplaceTempView()`. Cette vue agit comme une table virtuelle accessible uniquement durant la session en cours.\
On peut ensuite ex√©cuter des requ√™tes SQL sur cette vue avec la m√©thode `spark.sql()`, comme dans l‚Äôexemple ci-dessous, qui fait exactement la m√™me chose que le job pr√©c√©dent.

In [None]:
# Cr√©ation ou remplacement d'une vue temporaire √† partir du DataFrame
df.createOrReplaceTempView("employees")

# Requ√™te SQL ex√©cut√©e via Spark SQL
result = spark.sql("""
    SELECT department,
           AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
""")

# Affichage du r√©sultat
result.show()

### üß© Exercices

> Essayez de modifier et d‚Äôexp√©rimenter :
> - s√©lectionnez d‚Äôautres colonnes avec .select().
> - changez la condition du filtre (age < 25, department == "HR", etc.).
> - triez par une autre colonne, ou combinez plusieurs tris
> - ajoutez plusieurs conditions avec les op√©rateurs logiques `&` (et), `|` (ou)

In [None]:
# Votre code ici

> Pensez √† tester aussi Spark SQL dans cet exercice.

In [None]:
# Votre code ici

## 4. Nettoyage des donn√©es

Dans Spark, nous pouvons appliquer des fonctions de transformation directement sur les colonnes d‚Äôun DataFrame de mani√®re distribu√©e.

Dans l‚Äôexemple ci-dessous :
- nous normalisons les noms de d√©partement
- nous traitons les valeurs manquantes

üí° Ces op√©rations sont aussi "lazy" : elles ne modifient pas les donn√©es tant qu‚Äôaucune action n‚Äôest ex√©cut√©e.

In [None]:
from pyspark.sql.functions import when, trim, upper

# Normaliser le nom du d√©partement: trim + upper
df_sel = df
df_clean = df_sel.withColumn("department_norm", upper(trim(col("department"))))

# G√©rer les valeurs manquantes d'√¢ge et de salaire
df_clean = df_clean.withColumn("age", when(col("age").isNull(), 0).otherwise(col("age")))
df_clean = df_clean.na.fill({"salary": 0.0})

df_clean.select("employee_id","name","department","department_norm","age","salary").show(5)

### 4.1. Cr√©er des colonnes calcul√©es

La pr√©paration des donn√©es consiste aussi √† cr√©er de nouvelles colonnes d√©riv√©es √† partir des colonnes existantes.\
Dans Spark, cela se fait avec la m√©thode `withColumn()`, qui permet d‚Äôajouter une colonne calcul√©e sans modifier le DataFrame original.

üëâ Si le nom de colonne existe d√©j√†, la colonne est remplac√©e (√©cras√©e) par la nouvelle version.\
üëâ Si le nom n‚Äôexiste pas, une nouvelle colonne est simplement ajout√©e au DataFrame, c'est le cas dans l'exemple suivant.

In [None]:
from pyspark.sql.functions import avg

# Cat√©goriser les revenus
df_feat = df_clean.withColumn(
    "salary_band",
    when(col("salary") < 3000, "LOW").when(col("salary") < 6000, "MID").otherwise("HIGH")
)

df_feat.show(5)


## 5. Regroupement et agr√©gation

La phase d‚Äôagr√©gation permet de r√©sumer, regrouper et interpr√©ter les informations issues de grands volumes de donn√©es.\
Avec PySpark, ces op√©rations s‚Äôeffectuent facilement gr√¢ce aux fonctions de groupement (`groupBy`), de comptage (`count`), et d‚Äôagr√©gation (`agg`).

In [None]:
# Nombre d'employ√©s par cat√©gorie de revenus
df_feat\
    .groupBy("salary_band")\
    .count()\
    .orderBy(col("count").desc())\
    .show()

In [None]:
from pyspark.sql.functions import round as sround

# Analyse des salaires moyens par d√©partement 
dept_stats = df_feat\
                .groupBy("department_norm")\
                .agg(sround(avg("salary"),2).alias("avg_salary"))\
                .orderBy(col("avg_salary").desc())

dept_stats.show()


### üß© Exercices

> Comment Spark d√©coupe les transformations ci-dessus en **stages** ?  
> Ouvrez l'onglet **SQL / DAG** dans l'UI Spark puis analysez les informations pr√©sent√©es.


## 6. Sauvegarde de r√©sultats (CSV / Parquet)

Une fois les traitements r√©alis√©s nous pouvons sauvegarder les r√©sultats pour les r√©utiliser ou les partager.\
PySpark permet d‚Äô√©crire facilement un DataFrame dans diff√©rents formats, notamment CSV et Parquet (format binaire optimis√© pour le Big Data).

- Le format CSV est universel mais peu efficace sur de gros volumes, car il ne conserve pas les types de donn√©es.
- Le format Parquet, lui, conserve le sch√©ma, compresse les donn√©es et acc√©l√®re les lectures : il est donc privil√©gi√© dans les environnements distribu√©s.

### üîé Format Parquet
Pour aller plus loin concernant le format Parquet, voici quelques ressources :
- üìò [Documentation officielle Apache Parquet](https://parquet.apache.org/docs/)  
- üé• [Qu‚Äôest-ce qu'une base de donn√©es orient√©e colonne ?](https://www.youtube.com/watch?v=1MnvuNg33pA)

In [None]:
# √âcriture en CSV (dossier de sortie partitionn√©)
dept_stats.write.mode("overwrite").csv("./outputs/dept_stats_csv")

# √âcriture en Parquet (conserve le sch√©ma, plus efficace)
dept_stats.write.mode("overwrite").parquet("./outputs/dept_stats_parquet")

# Relecture du Parquet
reloaded = spark.read.parquet("./outputs/dept_stats_parquet")
reloaded.printSchema()
reloaded.show()


## 7. RDD (Resilient Distributed Dataset)

Jusqu‚Äô√† pr√©sent, vous avez manipul√© des **DataFrames** : une interface structur√©e, avec des colonnes nomm√©es, des types de donn√©es, et des op√©rations famili√®res (s√©lection, groupBy, agg, etc.).\
Les DataFrames sont tr√®s pratiques, car ils permettent d‚Äô√©crire du code lisible et optimis√© gr√¢ce au moteur **Catalyst** de Spark.

Mais sous cette interface haut niveau se cache la brique fondamentale de Spark : le **RDD (Resilient Distributed Dataset)**.\
Comprendre ce concept vous permettra de mieux saisir comment Spark distribue, ex√©cute et reconstruit vos donn√©es √† grande √©chelle.

Un RDD est une collection d‚Äôobjets distribu√©e √† travers le cluster Spark.
- **Resilient** : il peut √™tre reconstruit automatiquement en cas de panne.
- **Distributed** : les donn√©es sont d√©coup√©es en partitions r√©parties sur plusieurs n≈ìuds.
- **Dataset** : c‚Äôest simplement un ensemble d‚Äô√©l√©ments (lignes, objets, etc.).

Autrement dit, un RDD est une collection r√©partie sur plusieurs machines, que Spark sait manipuler en parall√®le.

### 7.1. Manipulation de base

In [None]:
sc = spark.sparkContext

# Exemple d'un RDD √† partir d‚Äôune liste locale
# Spark cr√©e un RDD contenant ces 5 √©l√©ments.
rdd = sc.parallelize([1, 2, 3, 4, 5])

In [None]:
# Rien n‚Äôest ex√©cut√© tant qu‚Äôon ne d√©clenche pas une action
display(rdd.collect())

In [None]:
# On peut transformer les donn√©es
rdd2 = rdd.map(lambda x: x * 2)
display(rdd2.collect())

#### üß© Exercices

> Soit la liste de produits :

```bash
ventes = [
    ("chaise", 200),
    ("table", 450),
    ("lampe", 100),
    ("chaise", 150),
    ("table", 300),
    ("lampe", 80)
]
```

> - Cr√©ez un RDD √† partir de cette liste.
> - R√©cup√©rez les produits dont le prix d√©passe 100‚Ç¨
> - Calculez le CA total par produit (possible d'utiliser .reduceByKey())
> - Triez par CA d√©croissant (possible d'utiliser .sortBy())

In [None]:
# Votre code ici

### 7.2. Manipulation de fichiers

In [None]:
# Spark permet de lire et de manipuler des fichiers sous forme de RDD.
employees_rdd = sc.textFile("./data/employees.csv")
employees_rdd.collect()

#### üß© Exercices

> Ne r√©cup√©rer que les employ√©s du d√©partement IT.\
> Indice : .map(), .filter(), .split()

```bash
Exemple :

[['22', 'Alice Bernard', '33.0', 'IT', '6231.15', '10'],
 ['32', 'Hugo Girard', '58.0', 'IT', '8616.31', '20'],
 ['40', 'Emma Morel', '42.0', 'IT', '6115.84', '10'],
 ['72', 'Tom Petit', '41.0', 'IT', '5449.79', '7'],
 ['83', 'Ines Durand', '54.0', 'IT', '7555.4', '13']]
 ```

In [None]:
# Votre code ici

## 8. Approfondir

Ce notebook propose une **introduction pratique** √† Apache Spark et PySpark.  
Il ne couvre pas toutes les subtilit√©s du framework, notamment celles li√©es √† l‚Äô**optimisation des performances**, la **gestion des clusters**, ou les **fonctionnalit√©s avanc√©es**, MLlib, ou Structured Streaming.  

Pour compl√©ter votre apprentissage, il est **fortement recommand√© de poursuivre l‚Äôexploration** √† travers d'autres ressources.\
Il existe **de nombreuses ressources de qualit√© disponibles en ligne**, il suffit de chercher un peu pour trouver des explications, tutoriels et exemples adapt√©s √† votre niveau.

L‚Äôobjectif de cet approfondissement est de comprendre Apache Spark dans son contexte Big Data, non pas comme un simple outil, mais comme une √©volution et une r√©ponse aux limites de Hadoop.
Il s‚Äôagit d‚Äôidentifier les diff√©rences d‚Äôarchitecture, de performance et de paradigme de calcul, tout en comprenant comment ces deux technologies peuvent coexister et se compl√©ter.

#### Axes d'approfondissement
> Architecture de Spark
- R√¥les du Driver, des Executors et du Cluster Manager
- Traitement en m√©moire et ex√©cution des DAG (Directed Acyclic Graph)
- Modes d‚Äôex√©cution : Local vs Cluster

> Comparaison Hadoop / Spark
- Stockage : HDFS et compatibilit√© avec Spark
- Gestion des ressources : r√¥le de YARN
- Mod√®le de calcul : batch vs in-memory

> √âcosyst√®me et usages
- Modules : Spark SQL, MLlib, Streaming, GraphX


#### Ressources officielles
- [Documentation officielle de Spark](https://spark.apache.org/docs/latest/) ‚Äî la source principale pour comprendre les modules (SQL, MLlib, Streaming, GraphX).  
- [API PySpark sur spark.apache.org](https://spark.apache.org/docs/latest/api/python/) ‚Äî r√©f√©rence compl√®te des fonctions disponibles.  
- [Apache Spark sur GitHub](https://github.com/apache/spark) ‚Äî pour explorer le code source et les √©volutions du projet.  

#### Tutoriels et cours gratuits
- [Spark Tutorial (DataCamp)](https://www.datacamp.com/tutorial/pyspark-tutorial-getting-started-with-pyspark) ‚Äî bonnes bases pour pratiquer avec PySpark.  
- [YouTube ‚Äì ‚ÄúApache Spark Full Course‚Äù by Simplilearn](https://www.youtube.com/watch?v=_C8kWso4ne4) ‚Äî formation vid√©o compl√®te et vulgaris√©e.   