# Import

In [None]:
import os
import glob
import logging
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, Window

# Config

In [None]:
# Storage account, container and secret identifiers are read from the
# environment; a missing variable raises KeyError right here.
STORAGE_ACCOUNT_NAME = os.environ["STORAGE_ACCOUNT_NAME"]
FILESYSTEM_NAME_SILVER = os.environ["CONTAINER_SILVER"]
# Scope and key passed to dbutils.secrets.get() to fetch the storage key.
SECRET_SCOPE_NAME = os.environ["SECRET_SCOPE_NAME"]
SECRET_KEY_NAME = os.environ["SECRET_KEY_NAME"]
# Mount point used for the silver container and for the Parquet copies.
MOUNT_POINT_SILVER = "/mnt/donnees-qualite-eau-silver"

# Configuration du logging

In [None]:
# Configure the root logger: INFO level, "timestamp [LEVEL] message" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
# Logger shared by the helper functions defined below.
logger = logging.getLogger(__name__)

# Vérifier / Créer le montage du Data Lake si nécessaire

In [None]:
def ensure_mount(mount_point: str, container_name: str):
    """
    Mount a Blob Storage container in Databricks unless it is already mounted.

    Args:
        mount_point (str): Databricks mount path (e.g. '/mnt/datalake').
        container_name (str): Name of the Blob Storage container to mount.

    Raises:
        Exception: Any error raised while listing the mounts, reading the
            secret or mounting is logged and then re-raised unchanged.

    Notes:
        - Reads the module-level SECRET_SCOPE_NAME, SECRET_KEY_NAME and
          STORAGE_ACCOUNT_NAME to fetch the account key and build the URL.
        - Returns without doing anything when the mount point already exists.
    """
    try:
        # Guard clause: nothing to do when the mount point is already listed.
        if any(entry.mountPoint == mount_point for entry in dbutils.fs.mounts()):
            logger.info(f"Mount already exists: {mount_point}")
            return

        logger.info(f"Mount does not exist. Creating mount at {mount_point}...")
        account_key = dbutils.secrets.get(scope=SECRET_SCOPE_NAME, key=SECRET_KEY_NAME)

        # The account key is handed to the mount through the Hadoop setting
        # named after the storage account.
        key_setting = f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net"
        source_url = f"wasbs://{container_name}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/"

        dbutils.fs.mount(
            source=source_url,
            mount_point=mount_point,
            extra_configs={key_setting: account_key},
        )
        logger.info(f"Mount created successfully at {mount_point}")

    except Exception as e:
        logger.error(f"Failed to ensure mount: {e}", exc_info=True)
        raise

In [None]:
# Mount the silver container if it is not mounted yet.
ensure_mount(
    mount_point=MOUNT_POINT_SILVER,
    container_name=FILESYSTEM_NAME_SILVER,
)

# Fonctions utilitaires

In [None]:
def transform_com_silver(df):
    """
    Build the silver 'dis_com' DataFrame from the bronze one.

    Steps: rename the source columns to snake_case, parse ``debut_alim`` as a
    date, stamp ``updated_at`` with the current timestamp, keep one row per
    (cd_reseau, insee_commune, quartier) and project onto the silver columns.

    Args:
        df (DataFrame): Bronze DataFrame; it must expose an ``annee`` column,
            which is used to rank the rows.

    Returns:
        DataFrame: The transformed DataFrame restricted to the silver columns
        that are actually present.
    """
    # Source column -> silver column.
    renames = {
        "Inseecommune": "insee_commune",
        "Nomcommune": "nom_commune",
        "Quartier": "quartier",
        "Cdreseau": "cd_reseau",
        "Nomreseau": "nom_reseau",
        "Debutalim": "debut_alim",
    }
    for source_name, silver_name in renames.items():
        df = df.withColumnRenamed(source_name, silver_name)

    # Type conversion and enrichment.
    df = df.withColumn("debut_alim", F.to_date("debut_alim", "yyyy-MM-dd"))
    df = df.withColumn("updated_at", F.current_timestamp())

    # Keep the row with the highest 'annee' for each key. Rows sharing that
    # highest value are not ordered any further by this code.
    latest_first = Window.partitionBy(
        ["cd_reseau", "insee_commune", "quartier"]
    ).orderBy(F.col("annee").desc())
    ranked = df.withColumn("row_number", F.row_number().over(latest_first))
    df = ranked.filter(F.col("row_number") == 1).drop("row_number")

    # Final projection: listed columns only, skipping any that are absent.
    silver_columns = [
        "insee_commune",
        "nom_commune",
        "quartier",
        "cd_reseau",
        "nom_reseau",
        "debut_alim",
        "annee",
        "updated_at",
    ]
    present = [name for name in silver_columns if name in df.columns]
    return df.select(present)

In [None]:
def transform_plv_silver(df):
    """
    Build the silver 'dis_plv' DataFrame from the bronze one.

    Steps: rename the source columns, parse ``date_prel`` as a date, stamp
    ``updated_at`` with the current timestamp, drop duplicates on
    (cd_reseau, insee_commune_princ, date_prel) and project onto the silver
    columns.

    Args:
        df (DataFrame): Bronze DataFrame.

    Returns:
        DataFrame: The transformed DataFrame restricted to the silver columns
        that are actually present.
    """
    # Source column -> silver column.
    renames = {
        "cddept": "cd_dept",
        "cdreseau": "cd_reseau",
        "referenceprel": "reference_prel",
        "dateprel": "date_prel",
        "heureprel": "heure_prel",
        "inseecommuneprinc": "insee_commune_princ",
        "nomcommuneprinc": "nom_commune_princ",
        "cdreseauamont": "cd_reseau_amont",
        "nomreseauamont": "nom_reseau_amont",
        "pourcentdebit": "pourcent_debit",
        "plvconformitebacterio": "plv_conformite_bacterio",
        "plvconformitechimique": "plv_conformite_chimique",
        "plvconformitereferencebact": "plv_conformite_reference_bact",
        "plvconformitereferencechim": "plv_conformite_reference_chim",
        "conclusionprel": "conclusion_prel",
        "ugelib": "unite_gestion",
        "distrlib": "organisme_exploitant",
        "moalib": "maitre_ouvrage",
    }
    for source_name, silver_name in renames.items():
        df = df.withColumnRenamed(source_name, silver_name)

    # Type conversion and enrichment.
    df = df.withColumn("date_prel", F.to_date("date_prel", "yyyy-MM-dd"))
    df = df.withColumn("updated_at", F.current_timestamp())

    # One row per key; no ordering is applied, so this code does not control
    # which of several duplicates is retained.
    df = df.dropDuplicates(["cd_reseau", "insee_commune_princ", "date_prel"])

    # Final projection: listed columns only, skipping any that are absent.
    silver_columns = [
        "cd_dept",
        "cd_reseau",
        "reference_prel",
        "date_prel",
        "heure_prel",
        "insee_commune_princ",
        "nom_commune_princ",
        "cd_reseau_amont",
        "nom_reseau_amont",
        "pourcent_debit",
        "plv_conformite_bacterio",
        "plv_conformite_chimique",
        "plv_conformite_reference_bact",
        "plv_conformite_reference_chim",
        "conclusion_prel",
        "unite_gestion",
        "organisme_exploitant",
        "maitre_ouvrage",
        "updated_at",
        "annee",
    ]
    present = [name for name in silver_columns if name in df.columns]
    return df.select(present)

In [None]:
def transform_result_silver(df):
    """
    Build the silver 'dis_result' DataFrame from the bronze one.

    Steps: rename the source columns, stamp ``updated_at`` with the current
    timestamp, turn the O/N flag ``is_qualitatif`` into a boolean, drop
    duplicates on (cd_dept, reference_prel, cd_parametre) and project onto the
    silver columns.

    Args:
        df (DataFrame): Bronze DataFrame.

    Returns:
        DataFrame: The transformed DataFrame restricted to the silver columns
        that are actually present.
    """
    # Source column -> silver column.
    renames = {
        "cddept": "cd_dept",
        "referenceprel": "reference_prel",
        "cdparametresiseeaux": "cd_parametre_sise_eaux",
        "cdparametre": "cd_parametre",
        "libminparametre": "lib_parametre",
        "libwebparametre": "lib_parametre_gp",
        "qualitparam": "is_qualitatif",
        "insituana": "insitu_analyse",
        "rqana": "resultat_analyse",
        "cdunitereferencesiseeaux": "cd_unite_reference_sise_eaux",
        "cdunitereference": "cd_unite_reference",
        "limitequal": "limite_qualite",
        "refqual": "ref_qualite",
        "valtraduite": "val_traduite",
        "casparam": "cd_cas_param",
        "referenceanl": "cd_ana_labo",
    }
    for source_name, silver_name in renames.items():
        df = df.withColumnRenamed(source_name, silver_name)

    # Enrichment.
    df = df.withColumn("updated_at", F.current_timestamp())

    # "O" -> True, "N" -> False (case-insensitive), anything else -> null.
    flag = F.upper(F.col("is_qualitatif"))
    as_boolean = (
        F.when(flag == "O", F.lit(True))
        .when(flag == "N", F.lit(False))
        .otherwise(F.lit(None))
    )
    df = df.withColumn("is_qualitatif", as_boolean)

    # One row per key; no ordering is applied, so this code does not control
    # which of several duplicates is retained.
    df = df.dropDuplicates(["cd_dept", "reference_prel", "cd_parametre"])

    # Final projection: listed columns only, skipping any that are absent.
    silver_columns = [
        "cd_dept",
        "reference_prel",
        "cd_parametre_sise_eaux",
        "cd_parametre",
        "lib_parametre",
        "lib_parametre_gp",
        "is_qualitatif",
        "insitu_analyse",
        "resultat_analyse",
        "cd_unite_reference_sise_eaux",
        "cd_unite_reference",
        "limite_qualite",
        "ref_qualite",
        "val_traduite",
        "cd_cas_param",
        "cd_ana_labo",
        "updated_at",
        "annee",
    ]
    present = [name for name in silver_columns if name in df.columns]
    return df.select(present)

In [None]:
def upsert_delta_table(df, table_name: str, key_cols: list, partition_cols: list = None):
    """
    Emulate an UPSERT (update + insert) on a Delta table without DeltaTable.

    When the table already exists, its rows are unioned with the incoming
    ones and exactly one row is kept per key, an incoming row always taking
    precedence over an existing row with the same key. The table is then
    fully overwritten with the merged result. When the table does not exist
    it is created from ``df`` as is.

    Args:
        df (DataFrame): New DataFrame to merge into the table.
        table_name (str): Target table name (e.g. 'silver.dis_plv').
        key_cols (list): Columns forming the unique business key.
        partition_cols (list): Partition column(s). Defaults to ["annee"].

    Returns:
        None

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    # None stands for the historical default; this avoids sharing one mutable
    # list object between calls.
    if partition_cols is None:
        partition_cols = ["annee"]

    try:
        # Everything before the last dot is the (possibly catalog-qualified)
        # schema. An unqualified table name has no schema to create.
        schema_name, _, _ = table_name.rpartition(".")
        if schema_name:
            spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")

        if spark.catalog.tableExists(table_name):
            logger.info(f"Fusion incrémentale dans '{table_name}'...")

            # Tag each side so the deduplication is deterministic about the
            # source: 0 = incoming row (wins), 1 = row already in the table.
            # A plain dropDuplicates() would keep an arbitrary row per key.
            priority_col = "__upsert_priority"
            rank_col = "__upsert_rank"
            df_existing = spark.table(table_name).withColumn(priority_col, F.lit(1))
            df_incoming = df.withColumn(priority_col, F.lit(0))

            incoming_first = Window.partitionBy(*key_cols).orderBy(
                F.col(priority_col).asc()
            )
            df_merged = (
                df_existing.unionByName(df_incoming, allowMissingColumns=True)
                .withColumn(rank_col, F.row_number().over(incoming_first))
                .filter(F.col(rank_col) == 1)
                .drop(rank_col, priority_col)
            )
        else:
            logger.info(f"Création de la table '{table_name}'...")
            df_merged = df

        # Final write: full overwrite, since the merge was computed above.
        (
            df_merged.write
            .format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .partitionBy(*partition_cols)
            .saveAsTable(table_name)
        )

        logger.info(f"Upsert Spark-only terminé pour '{table_name}'")

    except Exception as e:
        logger.error(f"Erreur lors de l'upsert dans '{table_name}': {e}", exc_info=True)
        raise

In [None]:
def write_df_to_parquet(df, path: str, mode: str = "overwrite"):
    """
    Write a Spark DataFrame as Parquet files in the Data Lake.

    An empty DataFrame is not written: a warning is logged and the function
    returns.

    Args:
        df (DataFrame): The Spark DataFrame to write.
        path (str): Full Data Lake path (e.g. '/mnt/datalake/bronze/dis_plv/').
        mode (str): Write mode ('overwrite', 'append', etc.).

    Raises:
        Exception: Any failure is logged and then re-raised, so that a failed
            write is not reported as a success by the calling job.
    """
    try:
        if df.isEmpty():
            logger.warning(f"Aucune donnée à écrire dans {path}.")
            return

        logger.info(f"Écriture en Parquet dans {path} (mode={mode})...")
        df.write.mode(mode).parquet(path)
        logger.info(f"Données stockées avec succès dans {path}")

    except Exception as e:
        logger.error(f"Erreur écriture Parquet dans {path}: {e}", exc_info=True)
        # Propagate like the other helpers in this file instead of swallowing.
        raise

# Récupération des données du schema Bronze

In [None]:
# Load the three bronze tables that feed the silver transformations.
df_com_bronze = spark.table("bronze.dis_com")
df_plv_bronze = spark.table("bronze.dis_plv")
df_result_bronze = spark.table("bronze.dis_result")

# Transformation des données

In [None]:
# Apply to each bronze DataFrame its dedicated silver transformation.
df_com_silver = transform_com_silver(df=df_com_bronze)
df_plv_silver = transform_plv_silver(df=df_plv_bronze)
df_result_silver = transform_result_silver(df=df_result_bronze)

# Insertion dans le schema Silver

In [None]:
# Merge each silver DataFrame into its table, giving the key columns used
# for deduplication and the partition columns used for the write.
upsert_delta_table(
    df=df_com_silver,
    table_name="silver.dis_com",
    key_cols=["cd_reseau", "insee_commune", "quartier"],
    partition_cols=["annee"],
)
upsert_delta_table(
    df=df_plv_silver,
    table_name="silver.dis_plv",
    key_cols=["cd_reseau", "insee_commune_princ", "date_prel"],
    partition_cols=["annee", "cd_dept"],
)
upsert_delta_table(
    df=df_result_silver,
    table_name="silver.dis_result",
    key_cols=["cd_dept", "reference_prel", "cd_parametre"],
    partition_cols=["annee", "cd_dept"],
)

# Copie en Parquet dans le Data Lake

In [None]:
# Write a Parquet copy of each silver DataFrame under the silver mount point.
write_df_to_parquet(df=df_com_silver, path=f"{MOUNT_POINT_SILVER}/dis_com/")
write_df_to_parquet(df=df_plv_silver, path=f"{MOUNT_POINT_SILVER}/dis_plv/")
write_df_to_parquet(df=df_result_silver, path=f"{MOUNT_POINT_SILVER}/dis_result/")