# Manipulation Avancée de Données avec PySpark

Ce notebook est le "Couteau Suisse" du Data Engineer. Nous allons voir comment aller au-delà des simples filtres pour réaliser des opérations complexes : Jointures, UDFs (Fonctions Utilisateurs), et Optimisation.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, udf, avg, count, desc
from pyspark.sql.types import StringType, IntegerType

spark = SparkSession.builder \
    .appName("Manipulation Avancée Expert") \
    .master("local[*]") \
    .getOrCreate()

## 1. Création de Données Relationnelles (Pour les Jointures)

Pour comprendre les jointures, il nous faut deux tables : `Employes` et `Departements`.

In [None]:
# Table Employés
data_emp = [
    (1, "Martin", 10),
    (2, "Sophie", 10),
    (3, "Paul", 20),
    (4, "Julie", 30),
    (5, "Fantôme", 99) # Dept 99 n'existe pas
]
cols_emp = ["id", "nom", "dept_id"]
df_emp = spark.createDataFrame(data_emp, cols_emp)

# Table Départements
data_dept = [
    (10, "Marketing", "Paris"),
    (20, "Finance", "Lyon"),
    (30, "IT", "Bordeaux"),
    (40, "RH", "Lille") # Aucun employé ici
]
cols_dept = ["id_dept", "nom_dept", "ville"]
df_dept = spark.createDataFrame(data_dept, cols_dept)

print("--- Employés ---")
df_emp.show()
print("--- Départements ---")
df_dept.show()

## 2. Les Jointures (Joins)

C'est le concept le plus important en base de données relationnelle.

- **Inner Join** : Ne garde que ce qui matche des deux côtés.
- **Left Join** : Garde TOUT ce qui est à gauche (Employés), même s'il n'y a pas de département.
- **Right Join** : Garde TOUT ce qui est à droite (Départements), même s'il n'y a pas d'employé.

In [None]:
# Inner Join : Martin, Sophie, Paul, Julie (Fantôme disparaît, RH disparaît)
print("--- Inner Join ---")
df_emp.join(df_dept, df_emp.dept_id == df_dept.id_dept, "inner").show()

# Left Join : Fantôme apparait avec des NULLs pour le département
print("--- Left Join ---")
df_emp.join(df_dept, df_emp.dept_id == df_dept.id_dept, "left").show()

# Full Outer Join : Tout le monde est là
print("--- Full Outer Join ---")
df_emp.join(df_dept, df_emp.dept_id == df_dept.id_dept, "outer").show()

## 3. User Defined Functions (UDFs)

Parfois, les fonctions natives de Spark ne suffisent pas. Vous voulez appliquer votre propre fonction Python.

**Attention** : Les UDFs sont plus lentes que les fonctions natives car Spark doit sérialiser les données vers Python et revenir. À utiliser avec parcimonie.

In [None]:
# Définition d'une fonction Python classique
def categoriser_salaire_complexe(nom, id):
    if len(nom) > 5 and id % 2 == 0:
        return "Elite"
    else:
        return "Standard"

# Conversion en UDF Spark (Nécessite de préciser le type de retour)
udf_salaire = udf(categoriser_salaire_complexe, StringType())

# Utilisation
df_emp.withColumn("statut", udf_salaire(col("nom"), col("id"))).show()

## 4. Statistiques et Exploration

Avant de se lancer dans le Machine Learning, il faut comprendre ses données.

In [None]:
# Statistiques descriptives (count, mean, stddev, min, max)
df_emp.describe().show()

# Matrice de Corrélation (Approximative)
print("Corrélation id vs dept_id :", df_emp.stat.corr("id", "dept_id"))

# Tableaux croisés (Pivot Table)
# Imaginons qu'on veuille compter les employés par département et par... (ici on n'a pas assez de colonnes variées, mais voici la syntaxe)
df_emp.groupBy("dept_id").count().show()

## 5. Optimisation : Cache & Persist

Si vous utilisez le même DataFrame plusieurs fois (par exemple pour l'entraîner dans un modèle, puis pour faire des stats), Spark va le recalculer DEPUIS LE DÉBUT à chaque fois.

Pour éviter ça, on le met en cache (mémoire).

In [None]:
df_gros_calcul = df_emp.join(df_dept, df_emp.dept_id == df_dept.id_dept, "outer")

# Mise en cache -> La première action sera lente, les suivantes instantanées
df_gros_calcul.cache()

# Action 1 (Met en mémoire)
print("Count 1 :", df_gros_calcul.count())

# Action 2 (Lit depuis la mémoire)
print("Count 2 (Rapide) :", df_gros_calcul.count())

# Toujours vider le cache quand fini pour libérer la RAM
df_gros_calcul.unpersist()

## 6. SQL Avancé (CTE & Subqueries)

On peut faire des requêtes très complexes avec des CTE (Common Table Expressions).

In [None]:
df_emp.createOrReplaceTempView("emp")
df_dept.createOrReplaceTempView("dept")

spark.sql("""
    WITH DeptPops AS (
        SELECT dept_id, COUNT(*) as nb_emp
        FROM emp
        GROUP BY dept_id
    )
    SELECT d.nom_dept, p.nb_emp
    FROM dept d
    JOIN DeptPops p ON d.id_dept = p.dept_id
    WHERE p.nb_emp > 1
""").show()

In [None]:
spark.stop()