## 1. Configuration initiale

In [0]:
import os
from pyspark.sql.functions import year, month, to_date, col

print("="*60)
print("🔧 CONFIGURATION INITIALE")
print("="*60)

# Variables d'environnement
storage_account_name = os.environ.get("AZURE_STORAGE_ACCOUNT_NAME")
storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")

if storage_account_name and storage_account_key:
    # Configuration Spark
    spark.conf.set(
        f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
        storage_account_key
    )
    print("✅ Configuration Spark effectuée")
else:
    print("⚠️  Variables d'environnement non configurées")

# Nom de la base de données
DATABASE_NAME = "eau_potable"

print(f"📦 Base de données : {DATABASE_NAME}")
print("="*60)

## 2. Vérification des fichiers Parquet dans BRONZE


In [0]:
print("\n" + "="*60)
print("📋 VISUALISATION DES TABLES DISPONIBLES")
print("="*60)

# Méthode 1 : Lister toutes les tables de la base de données
print(f"\n🔍 Tables dans la base '{DATABASE_NAME}' :")
print("-" * 60)

try:
    tables_df = spark.sql(f"SHOW TABLES IN {DATABASE_NAME}")
    tables_df.show(truncate=False)
    
    # Compter le nombre de tables
    nb_tables = tables_df.count()
    print(f"\n✅ Nombre total de tables : {nb_tables}")
    
except Exception as e:
    print(f"❌ Erreur lors de la récupération des tables : {str(e)}")

# Méthode 2 : Afficher uniquement les noms des tables
print(f"\n📝 Liste des noms de tables :")
print("-" * 60)

try:
    tables = spark.catalog.listTables(DATABASE_NAME)
    
    for table in tables:
        print(f"  ✓ {table.name}")
        
except Exception as e:
    print(f"❌ Erreur : {str(e)}")

# Méthode 3 : Vérifier l'existence des tables spécifiques
print(f"\n🔎 Vérification des tables Bronze attendues :")
print("-" * 60)

expected_tables = ["dis_plv", "dis_result"]

for table_name in expected_tables:
    try:
        exists = spark.catalog.tableExists(f"{DATABASE_NAME}.{table_name}")
        if exists:
            # Compter le nombre de lignes
            count = spark.sql(f"SELECT COUNT(*) as count FROM {DATABASE_NAME}.{table_name}").first()['count']
            print(f"  ✅ {table_name} : existe ({count:,} lignes)")
        else:
            print(f"  ❌ {table_name} : n'existe pas")
    except Exception as e:
        print(f"  ⚠️  {table_name} : erreur - {str(e)}")

print("="*60)

## visu des data

In [0]:
print("\n" + "="*60)
print("👀 APERÇU DES DONNÉES - 10 PREMIÈRES LIGNES")
print("="*60)

# Table 1 : dis_plv
print(f"\n📊 Table : dis_plv (1,932,481 lignes)")
print("-" * 80)

try:
    spark.sql(f"""
        SELECT *
        FROM {DATABASE_NAME}.dis_plv
        LIMIT 10
    """).show(truncate=30)
    
    print("✅ Aperçu dis_plv affiché")
    
except Exception as e:
    print(f"❌ Erreur : {str(e)}")

# Table 2 : dis_result
print(f"\n📊 Table : dis_result (59,568,134 lignes)")
print("-" * 80)

try:
    spark.sql(f"""
        SELECT *
        FROM {DATABASE_NAME}.dis_result
        LIMIT 10
    """).show(truncate=30)
    
    print("✅ Aperçu dis_result affiché")
    
except Exception as e:
    print(f"❌ Erreur : {str(e)}")

# Bonus : Schéma des tables
print(f"\n📋 SCHÉMA DES TABLES")
print("="*60)

print(f"\n🔵 Schéma de dis_plv :")
print("-" * 60)
try:
    spark.sql(f"DESCRIBE {DATABASE_NAME}.dis_plv").show(truncate=False)
except Exception as e:
    print(f"❌ Erreur : {str(e)}")

print(f"\n🟢 Schéma de dis_result :")
print("-" * 60)
try:
    spark.sql(f"DESCRIBE {DATABASE_NAME}.dis_result").show(truncate=False)
except Exception as e:
    print(f"❌ Erreur : {str(e)}")

print("="*60)

In [0]:
%sql
use catalog `hive_metastore`; select * from `eau_potable`.`dis_plv` limit 100;

In [0]:
%sql
use catalog `hive_metastore`; select * from `eau_potable`.`dis_result` limit 100;

## analyse des distict

In [0]:
# print("\n" + "="*60)
# print("🔍 ANALYSE DES VALEURS DISTINCTES")
# print("="*60)

# # ============================================================
# # TABLE DIS_PLV - Colonnes de Conformité
# # ============================================================

# print(f"\n📊 TABLE : dis_plv")
# print("="*60)

# # Colonne 1 : plvconformitebacterio
# print(f"\n🔵 Colonne : plvconformitebacterio")
# print("-" * 60)
# try:
#     spark.sql(f"""
#         SELECT 
#             plvconformitebacterio,
#             COUNT(*) as nb_occurrences,
#             ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
#         FROM {DATABASE_NAME}.dis_plv
#         GROUP BY plvconformitebacterio
#         ORDER BY nb_occurrences DESC
#     """).show(truncate=False)
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# # Colonne 2 : plvconformitechimique
# print(f"\n🔵 Colonne : plvconformitechimique")
# print("-" * 60)
# try:
#     spark.sql(f"""
#         SELECT 
#             plvconformitechimique,
#             COUNT(*) as nb_occurrences,
#             ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
#         FROM {DATABASE_NAME}.dis_plv
#         GROUP BY plvconformitechimique
#         ORDER BY nb_occurrences DESC
#     """).show(truncate=False)
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# # Colonne 3 : plvconformitereferencebact
# print(f"\n🔵 Colonne : plvconformitereferencebact")
# print("-" * 60)
# try:
#     spark.sql(f"""
#         SELECT 
#             plvconformitereferencebact,
#             COUNT(*) as nb_occurrences,
#             ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
#         FROM {DATABASE_NAME}.dis_plv
#         GROUP BY plvconformitereferencebact
#         ORDER BY nb_occurrences DESC
#     """).show(truncate=False)
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# # ============================================================
# # TABLE DIS_RESULT - Colonnes d'Analyse
# # ============================================================

# print(f"\n\n📊 TABLE : dis_result")
# print("="*60)

# # Colonne 4 : cdparametresiseeaux
# print(f"\n🟢 Colonne : cdparametresiseeaux (TOP 20)")
# print("-" * 60)
# try:
#     spark.sql(f"""
#         SELECT 
#             cdparametresiseeaux,
#             COUNT(*) as nb_occurrences,
#             ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
#         FROM {DATABASE_NAME}.dis_result
#         GROUP BY cdparametresiseeaux
#         ORDER BY nb_occurrences DESC
#         LIMIT 20
#     """).show(truncate=False)
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# # Colonne 5 : libmajparametre
# print(f"\n🟢 Colonne : libmajparametre (TOP 30)")
# print("-" * 60)
# try:
#     spark.sql(f"""
#         SELECT 
#             libmajparametre,
#             COUNT(*) as nb_occurrences,
#             ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
#         FROM {DATABASE_NAME}.dis_result
#         GROUP BY libmajparametre
#         ORDER BY nb_occurrences DESC
#         LIMIT 30
#     """).show(truncate=False)
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# # Colonne 6 : libminparametre
# print(f"\n🟢 Colonne : libminparametre (TOP 30)")
# print("-" * 60)
# try:
#     spark.sql(f"""
#         SELECT 
#             libminparametre,
#             COUNT(*) as nb_occurrences,
#             ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
#         FROM {DATABASE_NAME}.dis_result
#         WHERE libminparametre IS NOT NULL
#         GROUP BY libminparametre
#         ORDER BY nb_occurrences DESC
#         LIMIT 30
#     """).show(truncate=False)
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# # Colonne 7 : qualitparam
# print(f"\n🟢 Colonne : qualitparam")
# print("-" * 60)
# try:
#     spark.sql(f"""
#         SELECT 
#             qualitparam,
#             COUNT(*) as nb_occurrences,
#             ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
#         FROM {DATABASE_NAME}.dis_result
#         GROUP BY qualitparam
#         ORDER BY nb_occurrences DESC
#     """).show(truncate=False)
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# # Colonne 8 : insituana
# print(f"\n🟢 Colonne : insituana")
# print("-" * 60)
# try:
#     spark.sql(f"""
#         SELECT 
#             insituana,
#             COUNT(*) as nb_occurrences,
#             ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
#         FROM {DATABASE_NAME}.dis_result
#         GROUP BY insituana
#         ORDER BY nb_occurrences DESC
#     """).show(truncate=False)
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# # ============================================================
# # STATISTIQUES GLOBALES
# # ============================================================

# print(f"\n\n📈 STATISTIQUES GLOBALES")
# print("="*60)

# print(f"\n🔹 Nombre total de valeurs distinctes par colonne :")
# print("-" * 60)

# try:
#     print(f"\n📋 TABLE dis_plv :")
#     spark.sql(f"""
#         SELECT 
#             'plvconformitebacterio' as colonne,
#             COUNT(DISTINCT plvconformitebacterio) as nb_valeurs_distinctes
#         FROM {DATABASE_NAME}.dis_plv
        
#         UNION ALL
        
#         SELECT 
#             'plvconformitechimique' as colonne,
#             COUNT(DISTINCT plvconformitechimique) as nb_valeurs_distinctes
#         FROM {DATABASE_NAME}.dis_plv
        
#         UNION ALL
        
#         SELECT 
#             'plvconformitereferencebact' as colonne,
#             COUNT(DISTINCT plvconformitereferencebact) as nb_valeurs_distinctes
#         FROM {DATABASE_NAME}.dis_plv
#     """).show(truncate=False)
    
#     print(f"\n📋 TABLE dis_result :")
#     spark.sql(f"""
#         SELECT 
#             'cdparametresiseeaux' as colonne,
#             COUNT(DISTINCT cdparametresiseeaux) as nb_valeurs_distinctes
#         FROM {DATABASE_NAME}.dis_result
        
#         UNION ALL
        
#         SELECT 
#             'libmajparametre' as colonne,
#             COUNT(DISTINCT libmajparametre) as nb_valeurs_distinctes
#         FROM {DATABASE_NAME}.dis_result
        
#         UNION ALL
        
#         SELECT 
#             'libminparametre' as colonne,
#             COUNT(DISTINCT libminparametre) as nb_valeurs_distinctes
#         FROM {DATABASE_NAME}.dis_result
        
#         UNION ALL
        
#         SELECT 
#             'qualitparam' as colonne,
#             COUNT(DISTINCT qualitparam) as nb_valeurs_distinctes
#         FROM {DATABASE_NAME}.dis_result
        
#         UNION ALL
        
#         SELECT 
#             'insituana' as colonne,
#             COUNT(DISTINCT insituana) as nb_valeurs_distinctes
#         FROM {DATABASE_NAME}.dis_result
#     """).show(truncate=False)
    
# except Exception as e:
#     print(f"❌ Erreur : {str(e)}")

# print("="*60)
# print("✅ Analyse des valeurs distinctes terminée")
# print("="*60)

# 🚀 CRÉATION DE LA TABLE SILVER : silver_plv_clean

In [0]:
print("\n" + "="*60)
print("🔧 CRÉATION TABLE SILVER : silver_plv_clean_2")
print("="*60)

from pyspark.sql.functions import (
    col, to_date, year, month, quarter, 
    when, trim, regexp_replace, concat, lit,
    date_format, dayofweek, coalesce
)

# ============================================================
# ÉTAPE 1 : LECTURE DES DONNÉES BRONZE
# ============================================================

print(f"\n📖 Lecture de la table Bronze : dis_plv")
print("-" * 60)

try:
    df_bronze = spark.table(f"{DATABASE_NAME}.dis_plv")
    nb_lignes_bronze = df_bronze.count()
    print(f"✅ Données chargées : {nb_lignes_bronze:,} lignes")
except Exception as e:
    print(f"❌ Erreur de lecture : {str(e)}")
    raise

# ============================================================
# ÉTAPE 2 : NETTOYAGE ET TYPAGE
# ============================================================

print(f"\n🧹 Nettoyage et Typage des données...")
print("-" * 60)

df_clean = df_bronze

# 2.1 - Convertir dateprel en DATE
print("  ➤ Conversion dateprel (string → DATE)")
df_clean = df_clean.withColumn(
    "dateprel_clean",
    to_date(col("dateprel"), "yyyy-MM-dd")
)

# 2.2 - Extraire l'heure de heureprel (ex: "12h35" → 12)
print("  ➤ Extraction heure (string '12h35' → INT 12)")
df_clean = df_clean.withColumn(
    "heure_prel",
    regexp_replace(col("heureprel"), "h.*", "").cast("int")
)

# 2.3 - Convertir annee en INT
print("  ➤ Conversion annee (string → INT)")
df_clean = df_clean.withColumn(
    "annee_int",
    col("annee").cast("int")
)

# 2.4 - Nettoyer pourcentdebit ("100 %" → 100.0)
print("  ➤ Nettoyage pourcentdebit ('100 %' → 100.0)")
df_clean = df_clean.withColumn(
    "pourcentdebit_clean",
    when(
        col("pourcentdebit").isNotNull(),
        regexp_replace(col("pourcentdebit"), " %", "").cast("float")
    ).otherwise(None)
)

print("✅ Nettoyage et typage terminés")

# ============================================================
# ÉTAPE 3 : ENRICHISSEMENT TEMPOREL
# ============================================================

print(f"\n📅 Enrichissement temporel...")
print("-" * 60)

# 3.1 - Extraire année, mois, trimestre, semestre
print("  ➤ Extraction : année, mois, trimestre, semestre")
df_clean = df_clean.withColumn("annee_prel", year(col("dateprel_clean")))
df_clean = df_clean.withColumn("mois_prel", month(col("dateprel_clean")))
df_clean = df_clean.withColumn("trimestre_prel", quarter(col("dateprel_clean")))
df_clean = df_clean.withColumn(
    "semestre_prel",
    when(col("mois_prel") <= 6, 1).otherwise(2)
)

# 3.2 - Jour de la semaine (1=Dimanche, 2=Lundi, ..., 7=Samedi)
print("  ➤ Calcul du jour de la semaine")
df_clean = df_clean.withColumn("jour_semaine_num", dayofweek(col("dateprel_clean")))

# Mapper les numéros aux noms de jours
df_clean = df_clean.withColumn(
    "jour_semaine",
    when(col("jour_semaine_num") == 1, "Dimanche")
    .when(col("jour_semaine_num") == 2, "Lundi")
    .when(col("jour_semaine_num") == 3, "Mardi")
    .when(col("jour_semaine_num") == 4, "Mercredi")
    .when(col("jour_semaine_num") == 5, "Jeudi")
    .when(col("jour_semaine_num") == 6, "Vendredi")
    .when(col("jour_semaine_num") == 7, "Samedi")
    .otherwise("Inconnu")
)

print("✅ Enrichissement temporel terminé")

# ============================================================
# ÉTAPE 4 : TRAITEMENT DES VALEURS NULL
# ============================================================

print(f"\n🔄 Traitement des valeurs NULL...")
print("-" * 60)

# Compter les NULL avant traitement
null_count_before = df_clean.filter(
    col("cdreseauamont").isNull() | 
    col("nomreseauamont").isNull() | 
    col("pourcentdebit_clean").isNull()
).count()

print(f"  ℹ️  Lignes avec NULL avant : {null_count_before:,}")

# Remplacer les NULL
print("  ➤ Remplacement des NULL par 'NON RENSEIGNE'")
df_clean = df_clean.withColumn(
    "cdreseauamont",
    coalesce(col("cdreseauamont"), lit("NON RENSEIGNE"))
)
df_clean = df_clean.withColumn(
    "nomreseauamont",
    coalesce(col("nomreseauamont"), lit("NON RENSEIGNE"))
)
df_clean = df_clean.withColumn(
    "pourcentdebit_clean",
    coalesce(col("pourcentdebit_clean"), lit(0.0))
)

print("✅ Traitement des NULL terminé")

# ============================================================
# ÉTAPE 5 : SÉLECTION ET RÉORGANISATION DES COLONNES
# ============================================================

print(f"\n📋 Sélection des colonnes finales...")
print("-" * 60)

df_silver = df_clean.select(
    # Identifiants
    col("cddept"),
    col("cdreseau"),
    col("inseecommuneprinc"),
    col("nomcommuneprinc"),
    col("cdreseauamont"),
    col("nomreseauamont"),
    
    # Prélèvement
    col("referenceprel"),
    col("dateprel_clean").alias("dateprel"),
    col("heure_prel"),
    col("pourcentdebit_clean").alias("pourcentdebit"),
    
    # Enrichissement temporel
    col("annee_int").alias("annee"),
    col("annee_prel"),
    col("mois_prel"),
    col("trimestre_prel"),
    col("semestre_prel"),
    col("jour_semaine"),
    
    # Conclusions et conformité
    col("conclusionprel"),
    col("plvconformitebacterio"),
    col("plvconformitechimique"),
    col("plvconformitereferencebact"),
    col("plvconformitereferencechim"),
    
    # Organismes
    col("ugelib"),
    col("distrlib"),
    col("moalib")
)

print("✅ Colonnes sélectionnées")

# ============================================================
# ÉTAPE 6 : DÉDOUBLONNAGE
# ============================================================

print(f"\n🔍 Dédoublonnage des données...")
print("-" * 60)

nb_lignes_avant = df_silver.count()
print(f"  ℹ️  Lignes avant dédoublonnage : {nb_lignes_avant:,}")

# Identifier les doublons
df_doublons = df_silver.groupBy(df_silver.columns).count().filter(col("count") > 1)
nb_doublons = df_doublons.count()
print(f"  ⚠️  Doublons détectés : {nb_doublons:,}")

if nb_doublons > 0:
    print(f"\n  📊 Aperçu des doublons (TOP 5) :")
    df_doublons.orderBy(col("count").desc()).show(5, truncate=40)

# Supprimer les doublons
df_silver_unique = df_silver.dropDuplicates()
nb_lignes_apres = df_silver_unique.count()

print(f"  ✅ Lignes après dédoublonnage : {nb_lignes_apres:,}")
print(f"  ➤ Lignes supprimées : {nb_lignes_avant - nb_lignes_apres:,}")

# ============================================================
# ÉTAPE 7 : CRÉATION DE LA TABLE SILVER
# ============================================================

print(f"\n💾 Création de la table Silver...")
print("-" * 60)

table_name = f"{DATABASE_NAME}.silver_plv_clean_2"

try:
    # Supprimer la table si elle existe déjà
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    print(f"  ℹ️  Table existante supprimée : {table_name}")
    
    # Créer la table
    df_silver_unique.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(table_name)
    
    print(f"✅ Table créée avec succès : {table_name}")
    
except Exception as e:
    print(f"❌ Erreur lors de la création : {str(e)}")
    raise

# ============================================================
# ÉTAPE 8 : VÉRIFICATION ET STATISTIQUES
# ============================================================

print(f"\n📊 Statistiques finales")
print("="*60)

# Compter les lignes dans la table
nb_lignes_table = spark.table(table_name).count()
print(f"✅ Lignes dans la table : {nb_lignes_table:,}")

# Afficher le schéma
print(f"\n📋 Schéma de la table :")
print("-" * 60)
spark.table(table_name).printSchema()

# Afficher un aperçu
print(f"\n👀 Aperçu des 10 premières lignes :")
print("-" * 60)
spark.table(table_name).show(10, truncate=30)

# Statistiques par année
print(f"\n📈 Répartition par année :")
print("-" * 60)
spark.sql(f"""
    SELECT 
        annee,
        COUNT(*) as nb_prelevements,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
    FROM {table_name}
    GROUP BY annee
    ORDER BY annee DESC
""").show()

# Statistiques de conformité
print(f"\n🎯 Statistiques de conformité :")
print("-" * 60)
spark.sql(f"""
    SELECT 
        plvconformitebacterio,
        plvconformitechimique,
        COUNT(*) as nb_prelevements
    FROM {table_name}
    GROUP BY plvconformitebacterio, plvconformitechimique
    ORDER BY nb_prelevements DESC
    LIMIT 10
""").show()

print("="*60)
print("✅ TABLE SILVER silver_plv_clean_2 CRÉÉE AVEC SUCCÈS !")
print("="*60)

In [0]:
# print("\n" + "="*60)
# print("🗑️  SUPPRESSION DE LA TABLE silver_plv_clean")
# print("="*60)

# # ============================================================
# # ÉTAPE 1 : SUPPRESSION DE LA VUE TEMPORAIRE
# # ============================================================

# print(f"\n🔹 ÉTAPE 1 : Suppression de la vue temporaire")
# print("-" * 60)

# try:
#     # Vérifier si la vue temporaire existe
#     temp_tables = [table.name for table in spark.catalog.listTables("default") if table.isTemporary]
    
#     if "silver_plv_clean" in temp_tables:
#         spark.catalog.dropTempView("silver_plv_clean")
#         print("✅ Vue temporaire 'silver_plv_clean' supprimée")
#     else:
#         print("ℹ️  Aucune vue temporaire 'silver_plv_clean' trouvée")
        
# except Exception as e:
#     print(f"⚠️  Erreur lors de la suppression de la vue temporaire : {str(e)}")

# # ============================================================
# # ÉTAPE 2 : SUPPRESSION DE LA TABLE DELTA (si elle existe)
# # ============================================================

# print(f"\n🔹 ÉTAPE 2 : Suppression de la table Delta Lake (si elle existe)")
# print("-" * 60)

# try:
#     # Vérifier si la table existe dans le catalogue
#     table_exists = spark.catalog.tableExists(f"{DATABASE_NAME}.silver_plv_clean")
    
#     if table_exists:
#         spark.sql(f"DROP TABLE IF EXISTS {DATABASE_NAME}.silver_plv_clean")
#         print(f"✅ Table '{DATABASE_NAME}.silver_plv_clean' supprimée du catalogue")
#     else:
#         print(f"ℹ️  Aucune table '{DATABASE_NAME}.silver_plv_clean' trouvée dans le catalogue")
        
# except Exception as e:
#     print(f"⚠️  Erreur lors de la suppression de la table : {str(e)}")

# # ============================================================
# # ÉTAPE 3 : VÉRIFICATION
# # ============================================================

# print(f"\n🔹 ÉTAPE 3 : Vérification de la suppression")
# print("-" * 60)

# try:
#     # Vérifier les vues temporaires
#     temp_tables = [table.name for table in spark.catalog.listTables("default") if table.isTemporary]
    
#     if "silver_plv_clean" not in temp_tables:
#         print("✅ Vue temporaire supprimée avec succès")
#     else:
#         print("⚠️  La vue temporaire existe toujours")
    
#     # Vérifier les tables dans la base de données
#     table_exists = spark.catalog.tableExists(f"{DATABASE_NAME}.silver_plv_clean")
    
#     if not table_exists:
#         print(f"✅ Table Delta supprimée avec succès")
#     else:
#         print(f"⚠️  La table existe toujours dans {DATABASE_NAME}")
    
#     # Lister les tables actuelles dans la base
#     print(f"\n📋 Tables actuelles dans '{DATABASE_NAME}' :")
#     spark.sql(f"SHOW TABLES IN {DATABASE_NAME}").show(truncate=False)
    
# except Exception as e:
#     print(f"❌ Erreur lors de la vérification : {str(e)}")

# # ============================================================
# # RÉSUMÉ
# # ============================================================

# print("\n" + "="*60)
# print("✅ OPÉRATION DE SUPPRESSION TERMINÉE")
# print("="*60)
# print("🗑️  Vue temporaire : vérifiée")
# print("🗑️  Table Delta : vérifiée")
# print("="*60)

In [0]:
%sql
use catalog `hive_metastore`; use schema `eau_potable`; select * from `eau_potable`.`silver_plv_clean_2` limit 100;

In [0]:
%sql
use catalog `hive_metastore`; select * from `eau_potable`.`dis_result` limit 45;

# silver result

In [0]:
print("\n" + "="*60)
print("🔧 CRÉATION TABLE SILVER : silver_result_clean_2")
print("="*60)

from pyspark.sql.functions import (
    col, when, trim, regexp_replace, lit, coalesce, upper
)

# ============================================================
# ÉTAPE 1 : LECTURE DES DONNÉES BRONZE
# ============================================================

print(f"\n📖 Lecture de la table Bronze : dis_result")
print("-" * 60)

try:
    df_bronze = spark.table(f"{DATABASE_NAME}.dis_result")
    nb_lignes_bronze = df_bronze.count()
    print(f"✅ Données chargées : {nb_lignes_bronze:,} lignes")
except Exception as e:
    print(f"❌ Erreur de lecture : {str(e)}")
    raise

# ============================================================
# ÉTAPE 2 : CONVERSION DES TYPES
# ============================================================

print(f"\n🔄 Conversion des types...")
print("-" * 60)

df_clean = df_bronze

# 2.1 - Convertir annee en INT
print("  ➤ Conversion annee (string → INT)")
df_clean = df_clean.withColumn(
    "annee_int",
    col("annee").cast("int")
)

# 2.2 - Convertir valtraduite en FLOAT
print("  ➤ Conversion valtraduite (string → FLOAT)")
df_clean = df_clean.withColumn(
    "valtraduite_float",
    col("valtraduite").cast("float")
)

print("✅ Conversion des types terminée")

# ============================================================
# ÉTAPE 3 : CATÉGORISATION DES PARAMÈTRES
# ============================================================

print(f"\n🏷️  Catégorisation des paramètres...")
print("-" * 60)

# 3.1 - Créer categorie_parametre
print("  ➤ Création de categorie_parametre")

df_clean = df_clean.withColumn(
    "categorie_parametre",
    when(col("cdparametre").isin(
        "ECOLI", "CTF", "STRF", "BSIR", "GT22_68", "GT36_44"
    ), "BACTERIOLOGIE")
    .when(col("cdparametre").isin(
        "PH", "CDT25", "TEAU", "TURBNFU", "COULF", "COULQ"
    ), "PHYSICO_CHIMIE")
    .when(col("cdparametre").isin(
        "NH4", "NO2", "NO3", "CL", "SO4", "CA", "MG", "NO3_NO2"
    ), "CHIMIE_MINERALE")
    .when(col("cdparametre").isin(
        "CL2LIB", "CL2TOT", "O3", "CLO2"
    ), "DESINFECTANTS")
    .when(col("cdparametre").isin(
        "COT", "CLVYL"
    ), "CHIMIE_ORGANIQUE")
    .when(col("cdparametre").isin(
        "ASP", "ODQ", "SAVQ"
    ), "QUALITATIF")
    .when(col("libmajparametre").rlike("(?i)(PLOMB|CUIVRE|FER|ZINC|NICKEL|CHROME|CADMIUM|MERCURE|ARSENIC|ALUMINIUM)"), "METAUX")
    .when(col("libmajparametre").rlike("(?i)(PESTICIDE|HERBICIDE|INSECTICIDE|ATRAZINE|SIMAZINE)"), "PESTICIDES")
    .when(col("cdparametre").isin("TA", "TAC", "TH"), "DURETE_ALCALINITE")
    .otherwise("AUTRES")
)

# 3.2 - Créer sous_categorie
print("  ➤ Création de sous_categorie")

df_clean = df_clean.withColumn(
    "sous_categorie",
    # BACTERIOLOGIE
    when(col("cdparametre") == "ECOLI", "ECOLI")
    .when(col("cdparametre").isin("CTF"), "COLIFORMES")
    .when(col("cdparametre") == "STRF", "ENTEROCOQUES")
    .when(col("cdparametre") == "BSIR", "SPORES")
    .when(col("cdparametre").isin("GT22_68", "GT36_44"), "AEROBIES")
    
    # CHIMIE_MINERALE
    .when(col("cdparametre").isin("NO3", "NO3_NO2"), "NITRATES")
    .when(col("cdparametre") == "NO2", "NITRITES")
    .when(col("cdparametre") == "NH4", "AMMONIUM")
    .when(col("cdparametre") == "CL", "CHLORURES")
    .when(col("cdparametre") == "SO4", "SULFATES")
    .when(col("cdparametre") == "CA", "CALCIUM")
    .when(col("cdparametre") == "MG", "MAGNESIUM")
    
    # PHYSICO_CHIMIE
    .when(col("cdparametre") == "PH", "PH")
    .when(col("cdparametre") == "CDT25", "CONDUCTIVITE")
    .when(col("cdparametre") == "TEAU", "TEMPERATURE")
    .when(col("cdparametre") == "TURBNFU", "TURBIDITE")
    .when(col("cdparametre").isin("COULF", "COULQ"), "COULEUR")
    
    # DESINFECTANTS
    .when(col("cdparametre") == "CL2LIB", "CHLORE_LIBRE")
    .when(col("cdparametre") == "CL2TOT", "CHLORE_TOTAL")
    .when(col("cdparametre") == "O3", "OZONE")
    .when(col("cdparametre") == "CLO2", "DIOXYDE_CHLORE")
    
    # CHIMIE_ORGANIQUE
    .when(col("cdparametre") == "COT", "CARBONE_ORGANIQUE")
    .when(col("cdparametre") == "CLVYL", "CHLORURE_VINYLE")
    
    # QUALITATIF
    .when(col("cdparametre") == "ASP", "ASPECT")
    .when(col("cdparametre") == "ODQ", "ODEUR")
    .when(col("cdparametre") == "SAVQ", "SAVEUR")
    
    # DURETE_ALCALINITE
    .when(col("cdparametre") == "TA", "TITRE_ALCALIMETRIQUE")
    .when(col("cdparametre") == "TAC", "TITRE_ALCALIMETRIQUE_COMPLET")
    .when(col("cdparametre") == "TH", "TITRE_HYDROTIMETRIQUE")
    
    .otherwise("AUTRE")
)

print("✅ Catégorisation terminée")

# ============================================================
# ÉTAPE 4 : SÉLECTION DES COLONNES (SUPPRESSION libminparametre)
# ============================================================

print(f"\n📋 Sélection des colonnes finales...")
print("-" * 60)
print("  ➤ Suppression de la colonne : libminparametre")

df_silver = df_clean.select(
    # Identifiants
    col("cddept"),
    col("referenceprel"),
    col("cdparametresiseeaux"),
    col("cdparametre"),
    
    # Paramètre (libminparametre supprimée)
    col("libmajparametre"),
    col("libwebparametre"),
    
    # Catégorisation (nouvelles colonnes)
    col("categorie_parametre"),
    col("sous_categorie"),
    
    # Qualité et type
    col("qualitparam"),
    col("insituana"),
    
    # Résultats
    col("rqana"),
    col("valtraduite_float").alias("valtraduite"),
    
    # Unités
    col("cdunitereferencesiseeaux"),
    col("cdunitereference"),
    
    # Limites (conservées brutes)
    col("limitequal"),
    col("refqual"),
    
    # Autres
    col("casparam"),
    col("referenceanl"),
    
    # Année
    col("annee_int").alias("annee")
)

print("✅ Colonnes sélectionnées (libminparametre supprimée)")

# ============================================================
# ÉTAPE 5 : DÉDOUBLONNAGE
# ============================================================

print(f"\n🔍 Dédoublonnage des données...")
print("-" * 60)

nb_lignes_avant = df_silver.count()
print(f"  ℹ️  Lignes avant dédoublonnage : {nb_lignes_avant:,}")

# Identifier les doublons
df_doublons = df_silver.groupBy(df_silver.columns).count().filter(col("count") > 1)
nb_doublons = df_doublons.count()
print(f"  ⚠️  Doublons détectés : {nb_doublons:,}")

if nb_doublons > 0:
    print(f"\n  📊 Aperçu des doublons (TOP 5) :")
    df_doublons.select("referenceprel", "cdparametre", "valtraduite", "count") \
        .orderBy(col("count").desc()).show(5, truncate=40)

# Supprimer les doublons
df_silver_unique = df_silver.dropDuplicates()
nb_lignes_apres = df_silver_unique.count()

print(f"  ✅ Lignes après dédoublonnage : {nb_lignes_apres:,}")
print(f"  ➤ Lignes supprimées : {nb_lignes_avant - nb_lignes_apres:,}")

# ============================================================
# ÉTAPE 6 : CRÉATION DE LA TABLE SILVER
# ============================================================

print(f"\n💾 Création de la table Silver...")
print("-" * 60)

table_name = f"{DATABASE_NAME}.silver_result_clean_2"

try:
    # Supprimer la table si elle existe déjà
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    print(f"  ℹ️  Table existante supprimée : {table_name}")
    
    # Créer la table (partitionnée par annee pour performance)
    df_silver_unique.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .partitionBy("annee") \
        .saveAsTable(table_name)
    
    print(f"✅ Table créée avec succès : {table_name}")
    print(f"  ➤ Partitionnement : par annee")
    
except Exception as e:
    print(f"❌ Erreur lors de la création : {str(e)}")
    raise

# ============================================================
# ÉTAPE 7 : VÉRIFICATION ET STATISTIQUES
# ============================================================

print(f"\n📊 Statistiques finales")
print("="*60)

# Compter les lignes dans la table
nb_lignes_table = spark.table(table_name).count()
print(f"✅ Lignes dans la table : {nb_lignes_table:,}")

# Afficher le schéma
print(f"\n📋 Schéma de la table :")
print("-" * 60)
spark.table(table_name).printSchema()

# Afficher un aperçu
print(f"\n👀 Aperçu des 10 premières lignes :")
print("-" * 60)
spark.table(table_name).show(10, truncate=30)

# Statistiques par catégorie
print(f"\n📈 Répartition par catégorie de paramètre :")
print("-" * 60)
spark.sql(f"""
    SELECT 
        categorie_parametre,
        COUNT(*) as nb_analyses,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
    FROM {table_name}
    GROUP BY categorie_parametre
    ORDER BY nb_analyses DESC
""").show(truncate=False)

# Statistiques par année
print(f"\n📅 Répartition par année :")
print("-" * 60)
spark.sql(f"""
    SELECT 
        annee,
        COUNT(*) as nb_analyses,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
    FROM {table_name}
    GROUP BY annee
    ORDER BY annee DESC
""").show()

# Top 10 paramètres
print(f"\n🔝 Top 10 paramètres analysés :")
print("-" * 60)
spark.sql(f"""
    SELECT 
        libmajparametre,
        categorie_parametre,
        sous_categorie,
        COUNT(*) as nb_analyses
    FROM {table_name}
    GROUP BY libmajparametre, categorie_parametre, sous_categorie
    ORDER BY nb_analyses DESC
    LIMIT 10
""").show(truncate=40)

# Répartition qualitparam
print(f"\n🔬 Répartition par type de paramètre (Numérique/Qualitatif) :")
print("-" * 60)
spark.sql(f"""
    SELECT 
        CASE 
            WHEN qualitparam = 'N' THEN 'NUMERIQUE'
            WHEN qualitparam = 'O' THEN 'QUALITATIF'
            ELSE 'AUTRE'
        END as type_parametre,
        COUNT(*) as nb_analyses,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
    FROM {table_name}
    GROUP BY qualitparam
    ORDER BY nb_analyses DESC
""").show(truncate=False)

# Répartition insituana
print(f"\n🏢 Répartition par lieu d'analyse :")
print("-" * 60)
spark.sql(f"""
    SELECT 
        CASE 
            WHEN insituana = 'L' THEN 'LABORATOIRE'
            WHEN insituana = 'T' THEN 'TERRAIN'
            ELSE 'AUTRE'
        END as lieu_analyse,
        COUNT(*) as nb_analyses,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pourcentage
    FROM {table_name}
    GROUP BY insituana
    ORDER BY nb_analyses DESC
""").show(truncate=False)

print("="*60)
print("✅ TABLE SILVER silver_result_clean_2 CRÉÉE AVEC SUCCÈS !")
print("="*60)

In [0]:
%sql
use catalog `hive_metastore`; select * from `eau_potable`.`silver_result_clean_2` limit 1000;

In [0]:
print("\n" + "="*60)
print("📤 EXPORT DES TABLES SILVER VERS DATALAKE")
print("="*60)

from pyspark.sql.functions import col
import time

# ============================================================
# ÉTAPE 1 : CONFIGURATION DES CHEMINS
# ============================================================

print(f"\n🔧 Configuration des chemins d'export...")
print("-" * 60)

# Nom du conteneur cible
container_name = "silver"

# Construire le chemin de base
datalake_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"

# Chemins pour chaque table
path_silver_plv = f"{datalake_path}/silver_plv_clean_2"
path_silver_result = f"{datalake_path}/silver_result_clean_2"

print(f"✅ Conteneur cible : {container_name}")
print(f"✅ Chemin silver_plv_clean_2 : {path_silver_plv}")
print(f"✅ Chemin silver_result_clean_2 : {path_silver_result}")

# ============================================================
# ÉTAPE 2 : EXPORT silver_plv_clean_2
# ============================================================

print(f"\n📦 Export de silver_plv_clean_2...")
print("-" * 60)

try:
    start_time = time.time()
    
    # Lire la table depuis le metastore
    df_plv = spark.table(f"{DATABASE_NAME}.silver_plv_clean_2")
    nb_lignes_plv = df_plv.count()
    print(f"  ℹ️  Lignes à exporter : {nb_lignes_plv:,}")
    
    # Coalesce à 1 pour avoir UN SEUL fichier parquet
    df_plv_single = df_plv.coalesce(1)
    
    # Exporter en parquet
    df_plv_single.write \
        .mode("overwrite") \
        .format("parquet") \
        .save(path_silver_plv)
    
    elapsed_time = time.time() - start_time
    print(f"✅ Export silver_plv_clean_2 terminé en {elapsed_time:.2f}s")
    
except Exception as e:
    print(f"❌ Erreur lors de l'export silver_plv_clean_2 : {str(e)}")
    raise

# ============================================================
# ÉTAPE 3 : EXPORT silver_result_clean_2
# ============================================================

print(f"\n📦 Export de silver_result_clean_2...")
print("-" * 60)

try:
    start_time = time.time()
    
    # Lire la table depuis le metastore
    df_result = spark.table(f"{DATABASE_NAME}.silver_result_clean_2")
    nb_lignes_result = df_result.count()
    print(f"  ℹ️  Lignes à exporter : {nb_lignes_result:,}")
    
    # Coalesce à 1 pour avoir UN SEUL fichier parquet
    df_result_single = df_result.coalesce(1)
    
    # Exporter en parquet
    df_result_single.write \
        .mode("overwrite") \
        .format("parquet") \
        .save(path_silver_result)
    
    elapsed_time = time.time() - start_time
    print(f"✅ Export silver_result_clean_2 terminé en {elapsed_time:.2f}s")
    
except Exception as e:
    print(f"❌ Erreur lors de l'export silver_result_clean_2 : {str(e)}")
    raise

# ============================================================
# ÉTAPE 4 : VÉRIFICATION DES EXPORTS
# ============================================================

print(f"\n🔍 Vérification des exports...")
print("="*60)

# Vérification silver_plv_clean_2
print(f"\n📂 Vérification : silver_plv_clean_2")
print("-" * 60)
try:
    df_plv_verify = spark.read.parquet(path_silver_plv)
    nb_lignes_plv_verify = df_plv_verify.count()
    nb_colonnes_plv = len(df_plv_verify.columns)
    
    print(f"  ✅ Fichier lu avec succès")
    print(f"  ✅ Lignes : {nb_lignes_plv_verify:,}")
    print(f"  ✅ Colonnes : {nb_colonnes_plv}")
    
    # Vérifier que le nombre de lignes correspond
    if nb_lignes_plv == nb_lignes_plv_verify:
        print(f"  ✅ Cohérence vérifiée : {nb_lignes_plv:,} lignes")
    else:
        print(f"  ⚠️  Incohérence : {nb_lignes_plv:,} → {nb_lignes_plv_verify:,}")
    
    # Afficher aperçu
    print(f"\n  👀 Aperçu (5 premières lignes) :")
    df_plv_verify.show(5, truncate=30)
    
except Exception as e:
    print(f"  ❌ Erreur de vérification : {str(e)}")

# Vérification silver_result_clean_2
print(f"\n📂 Vérification : silver_result_clean_2")
print("-" * 60)
try:
    df_result_verify = spark.read.parquet(path_silver_result)
    nb_lignes_result_verify = df_result_verify.count()
    nb_colonnes_result = len(df_result_verify.columns)
    
    print(f"  ✅ Fichier lu avec succès")
    print(f"  ✅ Lignes : {nb_lignes_result_verify:,}")
    print(f"  ✅ Colonnes : {nb_colonnes_result}")
    
    # Vérifier que le nombre de lignes correspond
    if nb_lignes_result == nb_lignes_result_verify:
        print(f"  ✅ Cohérence vérifiée : {nb_lignes_result:,} lignes")
    else:
        print(f"  ⚠️  Incohérence : {nb_lignes_result:,} → {nb_lignes_result_verify:,}")
    
    # Afficher aperçu
    print(f"\n  👀 Aperçu (5 premières lignes) :")
    df_result_verify.show(5, truncate=30)
    
except Exception as e:
    print(f"  ❌ Erreur de vérification : {str(e)}")

# ============================================================
# ÉTAPE 5 : LISTER LES FICHIERS CRÉÉS
# ============================================================

print(f"\n📋 Liste des fichiers créés dans le datalake")
print("="*60)

# Lister fichiers silver_plv_clean_2
print(f"\n📁 Dossier : silver_plv_clean_2")
print("-" * 60)
try:
    files_plv = dbutils.fs.ls(path_silver_plv)
    for file in files_plv:
        size_mb = file.size / (1024 * 1024)
        print(f"  📄 {file.name} - {size_mb:.2f} MB")
except Exception as e:
    print(f"  ❌ Erreur : {str(e)}")

# Lister fichiers silver_result_clean_2
print(f"\n📁 Dossier : silver_result_clean_2")
print("-" * 60)
try:
    files_result = dbutils.fs.ls(path_silver_result)
    for file in files_result:
        size_mb = file.size / (1024 * 1024)
        print(f"  📄 {file.name} - {size_mb:.2f} MB")
except Exception as e:
    print(f"  ❌ Erreur : {str(e)}")

# ============================================================
# RÉSUMÉ FINAL
# ============================================================

print(f"\n" + "="*60)
print("✅ EXPORT TERMINÉ AVEC SUCCÈS")
print("="*60)

print(f"\n📊 Résumé de l'export :")
print("-" * 60)
print(f"  ✅ silver_plv_clean_2 : {nb_lignes_plv:,} lignes exportées")
print(f"  ✅ silver_result_clean_2 : {nb_lignes_result:,} lignes exportées")
print(f"\n📍 Localisation :")
print(f"  📂 Conteneur : {container_name}")
print(f"  📂 Storage Account : {storage_account_name}")
print(f"\n📦 Format : Parquet (1 fichier par table)")

print("="*60)