In [None]:
# Databricks notebook source

# Exercice 04 : Delta Lake Avance## Objectifs pedagogiquesA la fin de cet exercice, vous serez capable de :- Utiliser le Time Travel pour consulter les anciennes versions- Effectuer des MERGE (upserts) pour mettre a jour les donnees- Implementer le Change Data Capture (CDC)- Optimiser les tables avec OPTIMIZE et Z-ORDER- Nettoyer les anciennes versions avec VACUUM- Gerer les schemas et l'evolution des donnees## Duree estimee : 90 minutes

## PARTIE 1 : Concepts theoriques### Qu'est-ce que Delta Lake ?Delta Lake est une couche de stockage qui apporte :- **ACID transactions** : Atomicite, Coherence, Isolation, Durabilite- **Time Travel** : Acces aux versions precedentes- **Schema enforcement** : Validation du schema- **Schema evolution** : Evolution controlee du schema- **Audit trail** : Historique complet des modifications### Architecture DeltaUne table Delta = Fichiers Parquet + Transaction Log (_delta_log/)Le transaction log enregistre :- Chaque modification (INSERT, UPDATE, DELETE, MERGE)- Les metadonnees (schema, partitions)- Les statistiques (min, max, count par fichier)

## PARTIE 2 : Historique et versions

In [None]:

print("EXERCICE 4.1 : Consulter l'historique d'une table")
print("=" * 70)

# Voir l'historique complet de la table
df_history = spark.sql("DESCRIBE HISTORY gares_silver")

print("Historique de la table gares_silver :")
df_history.select("version", "timestamp", "operation", "operationMetrics").show(10, truncate=False)

# Compter les versions
nb_versions = df_history.count()
print(f"\nNombre de versions : {nb_versions}")


## PARTIE 3 : Time Travel (voyage dans le temps)

In [None]:

print("EXERCICE 4.2 : Lire une version specifique")
print("=" * 70)

# Methode 1 : Lire par numero de version
df_version_0 = spark.read.format("delta").option("versionAsOf", 0).table("gares_silver")
print(f"Version 0 : {df_version_0.count()} lignes")

# Methode 2 : Lire par timestamp
from datetime import datetime, timedelta

# Lire la version d'il y a 1 heure (si elle existe)
timestamp_1h_ago = (datetime.now() - timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")
try:
    df_timestamp = spark.read.format("delta").option("timestampAsOf", timestamp_1h_ago).table("gares_silver")
    print(f"Version d'il y a 1h : {df_timestamp.count()} lignes")
except:
    print("Pas de version disponible il y a 1 heure")

# Methode 3 : Syntaxe SQL
spark.sql("SELECT COUNT(*) FROM gares_silver VERSION AS OF 0").show()


## PARTIE 4 : Modifications des donnees

In [None]:

print("EXERCICE 4.3 : INSERT de nouvelles donnees")
print("=" * 70)

from pyspark.sql.functions import lit

# Creer des nouvelles gares fictives pour l'exemple
nouvelles_gares = [
    ("Gare Test 1", "TST1", "A", "48.8566, 2.3522", "75056", "87001001", 48.8566, 2.3522, "75", "A", "Gare Principale"),
    ("Gare Test 2", "TST2", "B", "45.7640, 4.8357", "69123", "87002002", 45.7640, 4.8357, "69", "B", "Gare Importante")
]

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema_gares = StructType([
    StructField("nom", StringType(), True),
    StructField("trigramme", StringType(), True),
    StructField("segment_drg", StringType(), True),
    StructField("position_geo", StringType(), True),
    StructField("code_commune", StringType(), True),
    StructField("code_uic", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("code_departement", StringType(), True),
    StructField("segment_clean", StringType(), True),
    StructField("categorie_gare", StringType(), True)
])

df_nouvelles = spark.createDataFrame(nouvelles_gares, schema_gares)

# Creer une table temporaire pour les tests
df_gares_original = spark.table("gares_silver")
df_gares_original.write.format("delta").mode("overwrite").saveAsTable("gares_test")

print(f"Table de test creee : {spark.table('gares_test').count()} lignes")

# Inserer les nouvelles gares
df_nouvelles.write.format("delta").mode("append").saveAsTable("gares_test")

print(f"Apres insertion : {spark.table('gares_test').count()} lignes")

# Verifier l'historique
spark.sql("DESCRIBE HISTORY gares_test").select("version", "operation", "operationMetrics").show(truncate=False)


## PARTIE 5 : UPDATE et DELETE

In [None]:

print("EXERCICE 4.4 : UPDATE avec SQL")
print("=" * 70)

# UPDATE : Modifier les gares de test
spark.sql("""
    UPDATE gares_test
    SET segment_clean = 'X', categorie_gare = 'Gare Test'
    WHERE trigramme LIKE 'TST%'
""")

print("Gares modifiees :")
spark.sql("SELECT * FROM gares_test WHERE trigramme LIKE 'TST%'").show()

# Verifier l'historique
spark.sql("DESCRIBE HISTORY gares_test").select("version", "operation", "operationMetrics").limit(3).show(truncate=False)


In [None]:

print("EXERCICE 4.5 : DELETE avec SQL")
print("=" * 70)

# DELETE : Supprimer les gares de test
spark.sql("""
    DELETE FROM gares_test
    WHERE trigramme LIKE 'TST%'
""")

print(f"Apres suppression : {spark.table('gares_test').count()} lignes")

# Verifier qu'elles ont bien ete supprimees
spark.sql("SELECT * FROM gares_test WHERE trigramme LIKE 'TST%'").show()

# Verifier l'historique
print("\nHistorique des operations :")
spark.sql("DESCRIBE HISTORY gares_test").select("version", "operation", "operationMetrics").limit(5).show(truncate=False)


## EXERCICE PRATIQUE 4.6 : Restaurer une version precedenteLes gares de test ont ete supprimees. Utilisez Time Travel pour les recuperer !

In [None]:

# A COMPLETER
# 1. Trouvez la version AVANT la suppression
print("Versions disponibles :")
spark.sql("DESCRIBE HISTORY gares_test").select("version", "operation").show(10)

# 2. Restaurez cette version (utilisez RESTORE)
spark.sql("RESTORE TABLE gares_test TO VERSION AS OF 2")

# 3. Verifiez que les gares sont revenues
print("\nVerification :")
spark.sql("SELECT * FROM gares_test WHERE trigramme LIKE 'TST%'").show()

print(f"\nNombre total de lignes : {spark.table('gares_test').count()}")


## PARTIE 6 : MERGE (Upsert)

In [None]:

print("EXERCICE 4.7 : MERGE - INSERT + UPDATE en une operation")
print("=" * 70)

# Creer des donnees pour le MERGE
# - Certaines gares existent deja (UPDATE)
# - D'autres sont nouvelles (INSERT)
donnees_merge = [
    ("Gare Test 1", "TST1", "A", "75", "Gare Modifiee"),  # Existe -> UPDATE
    ("Gare Test 3", "TST3", "C", "13", "Gare Nouvelle")   # N'existe pas -> INSERT
]

schema_merge = StructType([
    StructField("nom", StringType(), True),
    StructField("trigramme", StringType(), True),
    StructField("segment_clean", StringType(), True),
    StructField("code_departement", StringType(), True),
    StructField("categorie_gare", StringType(), True)
])

df_updates = spark.createDataFrame(donnees_merge, schema_merge)

# Enregistrer comme table temporaire
df_updates.createOrReplaceTempView("updates_gares")

# MERGE : UPDATE si existe, INSERT sinon
spark.sql("""
    MERGE INTO gares_test AS target
    USING updates_gares AS source
    ON target.trigramme = source.trigramme
    WHEN MATCHED THEN
        UPDATE SET
            target.categorie_gare = source.categorie_gare,
            target.segment_clean = source.segment_clean
    WHEN NOT MATCHED THEN
        INSERT (nom, trigramme, segment_clean, code_departement, categorie_gare)
        VALUES (source.nom, source.trigramme, source.segment_clean, source.code_departement, source.categorie_gare)
""")

print("Resultat du MERGE :")
spark.sql("SELECT * FROM gares_test WHERE trigramme LIKE 'TST%' ORDER BY trigramme").show()

# Historique
print("\nHistorique :")
spark.sql("DESCRIBE HISTORY gares_test").select("version", "operation", "operationMetrics").limit(3).show(truncate=False)


## EXERCICE PRATIQUE 4.8 : MERGE avec DELETEImplementez un MERGE qui :1. UPDATE si la gare existe et segment_clean = 'A'2. DELETE si la gare existe mais segment_clean != 'A'3. INSERT si la gare n'existe pas

In [None]:

# A COMPLETER
# Creer des donnees de test
donnees_merge_delete = [
    ("Gare Test 1", "TST1", "A", "75", "Principale Mise a Jour"),  # UPDATE
    ("Gare Test 2", "TST2", "C", "69", "A Supprimer"),             # DELETE
    ("Gare Test 4", "TST4", "B", "33", "Nouvelle Gare")            # INSERT
]

df_merge_delete = spark.createDataFrame(donnees_merge_delete, schema_merge)
df_merge_delete.createOrReplaceTempView("merge_updates")

# MERGE avec DELETE
spark.sql("""
    MERGE INTO gares_test AS target
    USING merge_updates AS source
    ON target.trigramme = source.trigramme
    WHEN MATCHED AND source.segment_clean = 'A' THEN
        UPDATE SET target.categorie_gare = source.categorie_gare
    WHEN MATCHED AND source.segment_clean != 'A' THEN
        DELETE
    WHEN NOT MATCHED THEN
        INSERT (nom, trigramme, segment_clean, code_departement, categorie_gare)
        VALUES (source.nom, source.trigramme, source.segment_clean, source.code_departement, source.categorie_gare)
""")

print("Resultat du MERGE avec DELETE :")
spark.sql("SELECT * FROM gares_test WHERE trigramme LIKE 'TST%' ORDER BY trigramme").show()


## PARTIE 7 : Change Data Capture (CDC)

In [None]:

print("EXERCICE 4.9 : Implementer CDC avec CDF (Change Data Feed)")
print("=" * 70)

# Activer CDF sur la table
spark.sql("ALTER TABLE gares_test SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

print("CDF active sur gares_test")

# Faire quelques modifications
spark.sql("UPDATE gares_test SET segment_clean = 'Z' WHERE trigramme = 'TST1'")
spark.sql("DELETE FROM gares_test WHERE trigramme = 'TST3'")

# Lire les changements entre deux versions
df_changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 6) \
    .table("gares_test")

print("\nChangements captures par CDF :")
df_changes.select("nom", "trigramme", "_change_type", "_commit_version").show()

print("""
Types de changements :
- insert : Nouvelle ligne
- update_preimage : Ancienne valeur
- update_postimage : Nouvelle valeur
- delete : Ligne supprimee
""")


## PARTIE 8 : OPTIMIZE et Z-ORDER

In [None]:

print("EXERCICE 4.10 : Optimiser les performances avec OPTIMIZE")
print("=" * 70)

# Voir les statistiques avant optimisation
spark.sql("DESCRIBE DETAIL gares_silver").select("numFiles", "sizeInBytes").show()

# OPTIMIZE : Compacter les petits fichiers
spark.sql("OPTIMIZE gares_silver")

print("\nApres OPTIMIZE :")
spark.sql("DESCRIBE DETAIL gares_silver").select("numFiles", "sizeInBytes").show()


In [None]:

print("EXERCICE 4.11 : Z-ORDERING pour accelerer les filtres")
print("=" * 70)

# Z-ORDER sur les colonnes frequemment filtrees
spark.sql("OPTIMIZE gares_silver ZORDER BY (code_departement, segment_clean)")

print("Z-ORDERING applique sur code_departement et segment_clean")
print("Les requetes filtrant sur ces colonnes seront beaucoup plus rapides !")

# Verifier l'historique
spark.sql("DESCRIBE HISTORY gares_silver").select("version", "operation").limit(5).show()


## PARTIE 9 : VACUUM (nettoyage)

In [None]:

print("EXERCICE 4.12 : Nettoyer les anciennes versions avec VACUUM")
print("=" * 70)

# Voir la taille actuelle
detail_avant = spark.sql("DESCRIBE DETAIL gares_test")
print("Avant VACUUM :")
detail_avant.select("location", "numFiles", "sizeInBytes").show(truncate=False)

# VACUUM : Supprimer les fichiers de plus de 168 heures (7 jours)
# ATTENTION : Cela supprime les anciennes versions (Time Travel limite)

# Pour tester, on peut reduire la retention (EN DEV UNIQUEMENT !)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM gares_test RETAIN 0 HOURS")

print("\nApres VACUUM :")
spark.sql("DESCRIBE DETAIL gares_test").select("location", "numFiles", "sizeInBytes").show(truncate=False)

print("""
ATTENTION :
- VACUUM supprime les anciennes versions
- Time Travel ne fonctionnera plus pour les versions supprimees
- En production, garder au moins 7 jours (RETAIN 168 HOURS)
""")


## PARTIE 10 : Schema Evolution

In [None]:

print("EXERCICE 4.13 : Evolution du schema")
print("=" * 70)

from pyspark.sql.functions import lit

# Lire la table actuelle
df_gares = spark.table("gares_test")

# Ajouter une nouvelle colonne
df_avec_nouvelle_colonne = df_gares.withColumn("pays", lit("France"))

# Essayer d'inserer SANS schema evolution (va echouer)
try:
    df_avec_nouvelle_colonne.write.format("delta").mode("append").saveAsTable("gares_test")
    print("Insertion reussie")
except Exception as e:
    print(f"Erreur attendue : {str(e)[:100]}...")

# Activer l'evolution du schema
df_avec_nouvelle_colonne.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("gares_test")

print("\nSchema evolue avec succes !")

# Verifier le nouveau schema
print("\nNouveau schema :")
spark.table("gares_test").printSchema()


## PARTIE 11 : Constraints (contraintes)

In [None]:

print("EXERCICE 4.14 : Ajouter des contraintes")
print("=" * 70)

# Ajouter une contrainte CHECK
spark.sql("""
    ALTER TABLE gares_test
    ADD CONSTRAINT valid_segment CHECK (segment_clean IN ('A', 'B', 'C', 'X', 'Z'))
""")

print("Contrainte ajoutee : segment_clean doit etre A, B, C, X ou Z")

# Essayer d'inserer une valeur invalide
try:
    spark.sql("INSERT INTO gares_test (nom, trigramme, segment_clean) VALUES ('Test Invalid', 'INV', 'INVALID')")
except Exception as e:
    print(f"\nErreur attendue : {str(e)[:150]}...")

# Verifier les contraintes
spark.sql("SHOW TBLPROPERTIES gares_test").filter("key LIKE '%constraint%'").show(truncate=False)


## EXERCICE FINAL 4.15 : Pipeline CDC completeImplementez une pipeline CDC complete qui :1. Cree une table source avec CDF active2. Simule des modifications (INSERT, UPDATE, DELETE)3. Lit les changements avec CDF4. Applique les changements a une table cible avec MERGE5. Optimise la table cible6. Documente toutes les versions

In [None]:

# A COMPLETER - SOLUTION PROPOSEE
print("PIPELINE CDC COMPLETE")
print("=" * 70)

# 1. Creer une table source avec CDF
df_source = spark.table("gares_silver").limit(100)
df_source.write.format("delta").mode("overwrite").saveAsTable("gares_source_cdc")
spark.sql("ALTER TABLE gares_source_cdc SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# 2. Creer une table cible
df_source.write.format("delta").mode("overwrite").saveAsTable("gares_cible_cdc")

print("Tables source et cible creees")

# 3. Simuler des modifications sur la source
spark.sql("UPDATE gares_source_cdc SET segment_clean = 'A' WHERE segment_clean = 'B' LIMIT 5")
spark.sql("DELETE FROM gares_source_cdc WHERE segment_clean = 'C' LIMIT 3")

# Nouvelles insertions
nouvelles = spark.table("gares_silver").limit(5).withColumn("trigramme", lit("NEW"))
nouvelles.write.format("delta").mode("append").saveAsTable("gares_source_cdc")

print("\nModifications appliquees a la source")

# 4. Lire les changements
version_actuelle = spark.sql("DESCRIBE HISTORY gares_cible_cdc").select("version").first()[0]

df_changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 1) \
    .table("gares_source_cdc")

print(f"\nChangements detectes : {df_changes.count()} lignes")
df_changes.groupBy("_change_type").count().show()

# 5. Appliquer les changements a la cible avec MERGE
# (Simplifie ici - en production, traiter chaque type de changement)
df_etat_actuel = spark.table("gares_source_cdc")
df_etat_actuel.createOrReplaceTempView("source_actuelle")

spark.sql("""
    MERGE INTO gares_cible_cdc AS target
    USING source_actuelle AS source
    ON target.trigramme = source.trigramme AND target.code_uic = source.code_uic
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

print("\nChangements appliques a la cible")

# 6. Optimiser
spark.sql("OPTIMIZE gares_cible_cdc")
print("Table cible optimisee")

# 7. Documenter
print("\nHistorique complet :")
spark.sql("DESCRIBE HISTORY gares_cible_cdc").select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)


## RESUME DES CONCEPTS APPRISDans cet exercice, vous avez appris :1. **Time Travel** : `VERSION AS OF`, `TIMESTAMP AS OF`, `RESTORE`2. **DML Operations** : `INSERT`, `UPDATE`, `DELETE`3. **MERGE** : Upserts (INSERT + UPDATE en une operation)4. **Change Data Feed** : Capturer les modifications5. **OPTIMIZE** : Compacter les fichiers6. **Z-ORDER** : Optimiser les performances des filtres7. **VACUUM** : Nettoyer les anciennes versions8. **Schema Evolution** : `mergeSchema`9. **Constraints** : Validation des donnees10. **Historique** : `DESCRIBE HISTORY`## Bonnes pratiques Delta Lake- Activer CDF sur les tables importantes- OPTIMIZE regulierement (ex: quotidien)- Z-ORDER sur les colonnes de filtre- VACUUM avec retention >= 7 jours- Utiliser MERGE pour les upserts- Documenter les changements de schema## Prochaine etapePassez a l'**Exercice 05 : Streaming en temps reel** pour apprendre :- Structured Streaming- Ingestion en continu- Traitement en micro-batches- Integration avec Delta Lake