Détection automatique du catalogue Unity Catalog et définition des schémas

In [0]:
# Pick the default project catalog: the single candidate catalog if there is
# exactly one, otherwise the first one whose name starts with "dbw_",
# otherwise hive_metastore. The widget lets the user override the choice.
catalogs = [row.catalog for row in spark.sql("SHOW CATALOGS").collect()]

# Catalogs that never hold project data: the legacy Hive metastore plus the
# built-in catalogs typically present in Unity Catalog workspaces (system
# tables, sample datasets, internal "__"-prefixed catalogs). Without this
# filter those built-ins count as candidates, so a workspace with a single
# project catalog would miss the "exactly one candidate" branch below.
NON_PROJECT_CATALOGS = {"hive_metastore", "system", "samples"}
unity_catalogs = [
    c for c in catalogs
    if c not in NON_PROJECT_CATALOGS and not c.startswith("__")
]

if len(unity_catalogs) == 1:
    default_catalog = unity_catalogs[0]
else:
    # NOTE(review): "dbw_" is presumably the naming convention of the
    # workspace catalog — confirm.
    default_catalog = next((c for c in unity_catalogs if c.startswith("dbw_")), "hive_metastore")

dbutils.widgets.text("my_catalog", default_catalog, "Catalog détecté")
catalog = dbutils.widgets.get("my_catalog")

dbutils.widgets.text("my_schema", "silver", "Schéma Silver")
silver_schema = dbutils.widgets.get("my_schema")

# Fixed schema / table names used by the following cells.
bronze_schema = "bronze"
logs_schema = "logs"
log_table = "silver_processing_log"


Récupération des tables bronze existantes (tables préfixées par `bronze_`)

In [0]:
# Only tables following the "bronze_" naming convention are processed; the
# listing order returned by SHOW TABLES is preserved.
bronze_tables = list(
    filter(
        lambda name: name.startswith("bronze_"),
        (entry.tableName for entry in spark.sql(f"SHOW TABLES IN {catalog}.{bronze_schema}").collect()),
    )
)

Fonction de journalisation des traitements Silver

In [0]:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType

def log_silver_processing_result(table_name, status, rows_inserted=None, message=None):
    """Append one processing-outcome record to the silver processing log table.

    The record is written in append mode, as Delta, to
    ``{catalog}.{logs_schema}.{log_table}``.

    Args:
        table_name: Name of the silver table the record refers to.
        status: Outcome label (callers in this notebook pass "OK" or "KO").
        rows_inserted: Optional row count; converted with ``int()``, NULL when None.
        message: Optional free text; truncated to its first 5000 characters,
            stored as NULL when None or empty.
    """
    # (column name, Spark type, nullable) in table order.
    log_fields = [
        ("table_name", StringType(), False),
        ("timestamp", TimestampType(), False),
        ("status", StringType(), False),
        ("rows_inserted", IntegerType(), True),
        ("message", StringType(), True),
    ]
    log_schema = StructType([StructField(name, dtype, nullable) for name, dtype, nullable in log_fields])

    record = dict(
        table_name=table_name,
        timestamp=datetime.now(),
        status=status,
        rows_inserted=None if rows_inserted is None else int(rows_inserted),
        message=message[:5000] if message else None,
    )

    target_table = f"{catalog}.{logs_schema}.{log_table}"
    spark.createDataFrame([record], schema=log_schema) \
        .write.format("delta").mode("append").saveAsTable(target_table)


Fonction SCD2 (historisation de type 2 des tables Bronze vers Silver)

In [0]:
from pyspark.sql.functions import sha2, concat_ws, current_timestamp, lit, col, row_number, coalesce
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def _silver_table_name(table_bronze):
    """Return the silver table name derived from a bronze table name.

    Every "bronze_" occurrence is removed and "silver_" is prefixed
    (e.g. "bronze_customer" -> "silver_customer").
    """
    return "silver_" + table_bronze.replace("bronze_", "")


def _quote_identifier(name):
    """Backtick-quote a column name so it can be embedded in a Spark SQL expression."""
    return "`" + name.replace("`", "``") + "`"


def _keep_one_row_per_key(df, primary_keys):
    """Keep one row per primary-key combination: the row with the smallest hash.

    Prevents duplicate keys in bronze from producing several current versions
    of the same key in silver, and from letting several source rows match the
    same target row in the Delta MERGE.
    """
    window_spec = Window.partitionBy(*primary_keys).orderBy("hash")
    return (
        df.withColumn("_scd2_row_num", row_number().over(window_spec))
        .filter(col("_scd2_row_num") == 1)
        .drop("_scd2_row_num")
    )


def process_scd2(table_bronze):
    """Load one bronze table into its silver table as a type-2 slowly changing dimension.

    Steps:
      1. Hash the business columns (everything except the technical columns).
      2. Detect primary keys heuristically: columns whose name ends with "id"
         (case-insensitive), excluding "rowguid". Without any key the table is
         skipped and logged as "KO".
      3. If the silver table does not exist, create it from bronze (one row per
         key) with every row current.
      4. Otherwise compare bronze with the current silver rows on the keys.
         New keys and keys whose hash changed become new versions. Their
         current silver rows are closed (valid_to set, is_current = false),
         then the new versions are appended as current.

    Keys that disappear from bronze are not closed. Every outcome is recorded
    with log_silver_processing_result; exceptions are logged as "KO" and are
    not re-raised.

    Args:
        table_bronze: Bronze table name inside the bronze schema, e.g. "bronze_customer".
    """
    from datetime import datetime, timezone

    table_silver = _silver_table_name(table_bronze)
    try:
        full_bronze = f"{catalog}.{bronze_schema}.{table_bronze}"
        full_silver = f"{catalog}.{silver_schema}.{table_silver}"

        bronze_df = spark.table(full_bronze)

        # Technical columns are excluded from the change-detection hash.
        technical_columns = ["ingestion_timestamp", "valid_from", "valid_to", "is_current", "hash"]
        columns_to_hash = [c for c in bronze_df.columns if c not in technical_columns]

        primary_keys = [c for c in columns_to_hash if c.lower().endswith("id") and c.lower() != "rowguid"]
        if not primary_keys:
            message = f"Aucune clé primaire détectée pour {table_bronze}. Table ignorée."
            log_silver_processing_result(table_silver, "KO", message=message)
            return

        # NOTE(review): concat_ws skips NULL values, so values shifting between
        # columns around a NULL can hash identically. Changing the formula would
        # re-version every existing silver row, so it is kept as is.
        bronze_hashed = bronze_df.withColumn("hash", sha2(concat_ws("||", *columns_to_hash), 256))

        # A single timestamp for the whole run: the valid_to of a closed version
        # equals the valid_from of the version replacing it (no gap in history).
        # Timezone-aware so the literal is an unambiguous instant.
        processing_ts = lit(datetime.now(timezone.utc))

        if not spark.catalog.tableExists(full_silver):
            silver_initial = (
                _keep_one_row_per_key(bronze_hashed, primary_keys)
                .withColumn("valid_from", processing_ts)
                .withColumn("valid_to", lit(None).cast("timestamp"))
                .withColumn("is_current", lit(True))
            )
            silver_initial.write.format("delta").saveAsTable(full_silver)
            log_silver_processing_result(
                table_silver, "OK",
                rows_inserted=spark.table(full_silver).count(),
                message="Table Silver initialisée"
            )
            return

        silver_current = spark.table(full_silver).filter("is_current = true")

        # Null-safe key equality: a NULL key component must match its own
        # previous version, both in this join and in the MERGE below.
        join_condition = [
            col(f"src.{_quote_identifier(pk)}").eqNullSafe(col(f"tgt.{_quote_identifier(pk)}"))
            for pk in primary_keys
        ]
        changes_df = (
            bronze_hashed.alias("src")
            .join(silver_current.alias("tgt"), on=join_condition, how="left")
            .filter("tgt.hash IS NULL OR src.hash != tgt.hash")
            .select("src.*")
        )
        changes_df = _keep_one_row_per_key(changes_df, primary_keys)

        insert_count = changes_df.count()
        if insert_count == 0:
            log_silver_processing_result(
                table_silver, "OK",
                rows_inserted=0,
                message="Aucun changement détecté"
            )
            return

        # Close the current version of every changed key.
        merge_condition = " AND ".join(
            f"tgt.{_quote_identifier(pk)} <=> src.{_quote_identifier(pk)}" for pk in primary_keys
        )
        DeltaTable.forName(spark, full_silver).alias("tgt").merge(
            source=changes_df.alias("src"),
            condition=merge_condition
        ).whenMatchedUpdate(
            condition="tgt.is_current = true",
            set={"valid_to": processing_ts, "is_current": lit(False)}
        ).execute()

        # NOTE(review): changes_df is lazily re-evaluated here, after the MERGE.
        # The closed rows are no longer current, so the same keys come out as
        # "new". The set is unchanged as long as bronze is not written to
        # concurrently.
        changes_to_insert = (
            changes_df
            .withColumn("valid_from", processing_ts)
            .withColumn("valid_to", lit(None).cast("timestamp"))
            .withColumn("is_current", lit(True))
        )
        changes_to_insert.write.format("delta").mode("append").saveAsTable(full_silver)

        log_silver_processing_result(
            table_silver, "OK",
            rows_inserted=insert_count,
            message="Traitement SCD2 appliqué"
        )

    except Exception as e:
        # Best effort per table: record the failure and let the caller continue.
        log_silver_processing_result(table_silver, "KO", message=str(e))

# Run the SCD2 load for every bronze table listed earlier. process_scd2 logs
# its own OK/KO outcome and catches its exceptions, so one failing table does
# not stop the loop (unless writing the KO log entry itself fails).
for table in bronze_tables:
    process_scd2(table_bronze=table)