In [0]:
# ======================================================================================
# BIBLIOTHÈQUES
# ======================================================================================
import logging
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, to_timestamp, trim, upper, current_timestamp, regexp_replace
from pyspark.sql.types import IntegerType, DecimalType

# ======================================================================================
# 1. DÉCLARATION DES PARAMÈTRES (WIDGETS)
# ======================================================================================
dbutils.widgets.text("storage_account", "stsalesinsightcuxm0611", "Nom du compte de stockage")
dbutils.widgets.text("container", "data", "Nom du conteneur")
dbutils.widgets.text("source_folder", "bronze/sales_orders/", "Dossier source dans la couche Bronze")
dbutils.widgets.text("destination_folder", "silver/sales_orders/", "Dossier de destination dans la couche Silver")
dbutils.widgets.text("date_format", "M/d/yyyy H:mm", "Format de date de la source")

# ======================================================================================
# 2. DÉFINITION DES FONCTIONS
# ======================================================================================

def read_data_from_bronze(source_path: str) -> DataFrame:
    """
    Lit les données au format Parquet depuis le chemin source spécifié.
    :param source_path: Chemin complet vers les données sources dans la couche Bronze.
    :return: DataFrame Spark contenant les données brutes.
    """
    logging.info(f"Début de la lecture depuis : {source_path}")
    try:
        df = spark.read.format("parquet").load(source_path)
        logging.info(f"Lecture réussie.")
        return df
    except Exception as e:
        logging.error(f"❌ ERREUR lors de la lecture des données depuis {source_path}", exc_info=True)
        raise e

def transform_bronze_to_silver(bronze_df: DataFrame, date_f: str) -> DataFrame:
    """
    Applique toutes les transformations métier pour nettoyer et standardiser les données.
    :param bronze_df: DataFrame contenant les données brutes.
    :param date_f: Format de date à utiliser pour la conversion.
    :return: DataFrame transformé, prêt pour la couche Silver.
    """
    logging.info("Début des transformations de données.")
    
    # Caster les colonnes dans les types de données corrects
    df_typed = bronze_df \
        .withColumn("QUANTITYORDERED", col("QUANTITYORDERED").cast(IntegerType())) \
        .withColumn("PRICEEACH", regexp_replace(col("PRICEEACH"), ',', '.').cast(DecimalType(10, 2))) \
        .withColumn("SALES", col("SALES").cast(DecimalType(12, 2))) \
        .withColumn("MSRP", col("MSRP").cast(DecimalType(10, 2))) \
        .withColumn("ORDERLINENUMBER", col("ORDERLINENUMBER").cast(IntegerType())) \
        .withColumn("ORDERDATE", to_timestamp(col("ORDERDATE"), date_f))

    # Nettoyer et standardiser les colonnes texte
    df_cleaned = df_typed \
        .withColumn("STATUS", trim(upper(col("STATUS")))) \
        .withColumn("PRODUCTLINE", trim(upper(col("PRODUCTLINE")))) \
        .withColumn("DEALSIZE", trim(upper(col("DEALSIZE")))) \
        .withColumn("COUNTRY", trim(upper(col("COUNTRY"))))

    # Ajouter une colonne d'audit pour la traçabilité
    df_silver = df_cleaned.withColumn("silver_processing_timestamp_utc", current_timestamp())

    logging.info("Transformations terminées avec succès.")
    return df_silver

def write_data_to_silver(silver_df: DataFrame, destination_path: str) -> None:
    """
    Écrit le DataFrame transformé dans la couche Silver au format Delta, en écrasant les données existantes.
    :param silver_df: DataFrame transformé.
    :param destination_path: Chemin complet du dossier de destination dans la couche Silver.
    """
    logging.info(f"Début de l'écriture vers : {destination_path}")
    try:
        silver_df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .save(destination_path)
        logging.info("✅ Écriture dans la couche Silver terminée avec succès.")
    except Exception as e:
        logging.error(f"❌ ERREUR lors de l'écriture dans la couche Silver.", exc_info=True)
        raise e

# ======================================================================================
# 3. POINT D'ENTRÉE PRINCIPAL (MAIN)
# ======================================================================================
if __name__ == "__main__":
    
    # Configuration du logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    logging.info("===================================================")
    logging.info("DÉMARRAGE DU PIPELINE BRONZE-TO-SILVER")
    logging.info("===================================================")

    try:
        # Récupération des paramètres et construction des chemins
        storage_account_name = dbutils.widgets.get("storage_account").strip()
        container_name = dbutils.widgets.get("container").strip()
        source_folder = dbutils.widgets.get("source_folder").strip()
        dest_folder = dbutils.widgets.get("destination_folder").strip()
        date_format_str = dbutils.widgets.get("date_format").strip()

        source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{source_folder}"
        destination_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{dest_folder}"

        scope_name = "dbricks-scope-projet"
        secret_key_name = "adls-access-key"

        # Configuration de l'authentification
        access_key = dbutils.secrets.get(scope=scope_name, key=secret_key_name)
        spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", access_key)

        # Orchestration des appels de fonctions
        bronze_dataframe = read_data_from_bronze(source_path)
        silver_dataframe = transform_bronze_to_silver(bronze_dataframe, date_format_str)
        write_data_to_silver(silver_dataframe, destination_path)

        logging.info("===================================================")
        logging.info("PIPELINE BRONZE-TO-SILVER TERMINÉ AVEC SUCCÈS")
        logging.info("===================================================")

    except Exception as e:
        logging.error("Le pipeline a échoué dans le bloc principal.", exc_info=True)
        raise e