## Librerias

In [0]:
from pyspark.sql import DataFrame
from typing import Tuple, Dict
from pyspark.sql.functions import col, current_timestamp, lit, when
from pyspark.sql.types import StringType
from delta.tables import DeltaTable
from datetime import datetime
import re

## Variables

In [0]:
def variables_globales() -> Dict:
    """
    Recupera y devuelve las variables globales de configuración de almacenamiento.

    Returns:
        Dict: Un diccionario con:
            - container (str): Nombre del contenedor en ADLS Gen2,
              obtenido desde el Secret Scope "scope-mbc" y la clave "secret-env-container".
            - storage_account (str): Nombre de la cuenta de almacenamiento,
              obtenido desde el Secret Scope "scope-mbc" y la clave "secret-env-storage-account".
            - path_base (str): Ruta base ABFSS al contenedor, en el formato:
              "abfss://<container>@<storage_account>.dfs.core.windows.net"
    """
    # Recupera el nombre del contenedor desde Key Vault
    container = dbutils.secrets.get("scope-mbc", "secret-env-container")
    # Recupera el nombre de la cuenta de almacenamiento desde Key Vault
    storage_account = dbutils.secrets.get("scope-mbc", "secret-env-storage-account")
    # Construye la ruta base ABFSS al contenedor montado en ADLS Gen2
    path_base = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
    
    return {
        "container": container,
        "storage_account": storage_account,
        "path_base": path_base
    }

## Funciones

In [0]:
def read_landing(
    path: str
) -> DataFrame:
    """
    Lee datos crudos desde la carpeta de Landing en formato Parquet y
    normaliza todas las columnas a tipo string para mantener uniformidad.

    Args:
        path (str): Ruta relativa dentro de la carpeta Landing, por ejemplo:
                    "bronze/sales/CountryRegionCurrency".

    Returns:
        DataFrame: Spark DataFrame con todas las columnas casteadas a string,
                   listo para procesar en la capa Bronze o Silver.
    """
    # Obtiene la ruta base de ADLS Gen2 (abfss://container@storage_account.dfs.core.windows.net)
    path_base = variables_globales()["path_base"]

    # Carga los datos crudos en Parquet desde la ubicación completa
    df = spark.read.format("parquet").load(f"{path_base}/{path}")

    # Para evitar inconsistencias de tipos en downstream, convierte todas las columnas a string
    columns_to_cast = [
        col(c).cast("string").alias(c)  # Mantiene el mismo nombre de columna
        for c in df.columns
    ]

    # Devuelve el DataFrame con el casting aplicado en todas las columnas
    return df.select(*columns_to_cast)


In [0]:
def write_bronze(
    df: DataFrame,
    table_name: str
) -> None:
    """
    Escribe datos en la tabla Bronze (Delta) usando un merge para solo insertar
    los registros nuevos, sin actualizar los existentes.

    Args:
        df (DataFrame): DataFrame de Spark con los datos crudos a insertar.
        table_name (str): Nombre completo de la tabla Delta en el metastore
                          (por ejemplo "bronze.sales_countryregioncurrency").

    Returns:
        None: La función imprime un log de la operación y llama a `insert_metrics()`
              para recopilar y almacenar métricas de ingesta.
    """

    # Carga la tabla Delta existente por nombre
    delta_table = DeltaTable.forName(spark, table_name)

    # Prepara el mapeo de columnas para el insert:
    # todas las columnas de la tabla excepto la columna de auditoría IngestionDate
    columns_to_insert = {
        col_name: f"in.{col_name}"
        for col_name in delta_table.toDF().columns
        if col_name not in ["IngestionDate"]
    }

    # Ejecuta el MERGE:
    # - Condición de match: lit(False) para que nunca haya coincidencia,
    #   de modo que todos los registros vengan por la rama whenNotMatched
    # - whenNotMatchedInsert: inserta todos los registros del DataFrame df
    delta_table.alias("m") \
        .merge(
            df.alias("in"),
            lit(False)  # fuerza siempre el insert-only
        ) \
        .whenNotMatchedInsert(values=columns_to_insert) \
        .execute()

    # Log de confirmación
    print(f"MERGE exitoso en tabla {table_name}")

    # Llama a la función de métricas tras la inserción
    return insert_metrics(get_metrics(table_name))


In [0]:
def get_metrics(
    table_name: str
) -> Tuple[Dict[str, int], str]:
    """
    Extrae métricas de la última operación Delta Lake (MERGE/WRITE) realizada sobre una tabla Delta.

    Args:
        table_name (str): Nombre completo de la tabla Delta en el metastore
                          (p. ej. "bronze.sales_countryregioncurrency").

    Returns:
        Tuple[Dict[str, int], str]:
            - Dict[str, int]: Diccionario con las métricas de operación extraídas de `operationMetrics`,
              convertido a enteros. Posibles claves:
                * numInsertedRows   — número de filas insertadas
                * numUpdatedRows    — número de filas actualizadas
                * numDeletedRows    — número de filas eliminadas
                * numOutputBytes    — bytes escritos en disco
            - str: El mismo `table_name`, para facilitar su paso a funciones encadenadas.
    """

    # Carga la tabla Delta por su nombre en el metastore
    delta_table = DeltaTable.forName(spark, table_name)

    try:
        # Recupera el historial de transacciones, ordenado de más reciente a más antiguo
        history_df = delta_table.history()

        # Recorre cada fila del historial buscando la operación MERGE o WRITE más reciente
        for row in history_df.collect():
            if row.operation in ("MERGE", "WRITE"):
                # Obtiene el diccionario de métricas de la operación
                raw_metrics = row.asDict().get("operationMetrics", {})
                # Convierte cada valor numérico de string a int, ignorando claves no numéricas
                numeric_metrics = {}
                for key, value in raw_metrics.items():
                    try:
                        numeric_metrics[key] = int(value)
                    except (ValueError, TypeError):
                        # Si no es un número, se omite
                        continue
                return numeric_metrics, table_name

        # Si no encuentra operación MERGE/WRITE, devuelve diccionario vacío
        return {}, table_name

    except Exception as e:
        # En caso de error al obtener el historial, imprime el error y devuelve vacío
        print(f"Warning: no se pudieron extraer métricas de {table_name}: {e}")
        return {}, table_name


In [0]:
def merge(
    table_name: str,
    df: DataFrame,
    identity_column: list = [],
    enable_delete: bool = False
) -> Dict:
    """
    Ejecuta un MERGE dinámico sobre una tabla Delta Lake usando su clave primaria.

    Esta función:
      1. Consulta automáticamente las columnas de clave primaria desde el metastore Delta.
      2. Construye las condiciones de merge basadas en esas columnas.
      3. Excluye columnas de auditoría o identidad de las operaciones de actualización/inserción.
      4. Inserta nuevas filas y actualiza existentes.
      5. Opcionalmente, elimina filas en destino que ya no estén en el DataFrame fuente.
      6. Devuelve las métricas (insertadas, actualizadas, eliminadas) para alimentar la tabla de control.

    Args:
        table_name (str): Nombre de la tabla destino en formato 'schema.table'.
        df (DataFrame): DataFrame fuente con los registros a mergear.
        identity_column (list, optional): Lista de columnas de identidad (surrogate keys)
                                          que no deben insertarse manualmente. Default: [].
        enable_delete (bool, optional): Si True, elimina en destino las filas ausentes
                                        en el DataFrame fuente. Default: False.

    Returns:
        Dict: Métricas extraídas tras ejecutar el merge, con las claves:
              - numInsertedRows
              - numUpdatedRows
              - numDeletedRows
              - numOutputBytes
    """

    # 1. Recuperar las columnas que componen la clave primaria de la tabla
    query = f"""
    SELECT cu.column_name
      FROM system.information_schema.key_column_usage AS cu
      JOIN system.information_schema.table_constraints AS tc
        ON cu.constraint_catalog = tc.constraint_catalog
       AND cu.constraint_schema  = tc.constraint_schema
       AND cu.constraint_name    = tc.constraint_name
     WHERE concat_ws('.', cu.table_schema, cu.table_name) = '{table_name}'
       AND tc.constraint_type = 'PRIMARY KEY'
       AND cu.table_catalog = 'lakehouse'
     ORDER BY cu.ordinal_position
    """
    df_query = spark.sql(query)
    columns_key = [row['column_name'] for row in df_query.collect()]

    # 2. Construir la condición de merge usando cada columna de la clave primaria
    merge_conditions = " AND ".join([f"m.{c} = src.{c}" for c in columns_key])

    # 3. Cargar la tabla Delta destino
    delta_table = DeltaTable.forName(spark, table_name)
    target_columns = delta_table.toDF().columns

    # 4. Definir qué columnas actualizar (excluyendo PKs y columnas de auditoría)
    exclusion_list_update = set(columns_key + ["FechaAuditoriaCreacion"])
    columns_to_update = {
        col: f"src.{col}"
        for col in target_columns
        if col not in exclusion_list_update
    }

    # 5. Definir qué columnas insertar (excluyendo columnas de identidad)
    exclusion_list_insert = set(identity_column)
    columns_to_insert = {
        col: f"src.{col}"
        for col in target_columns
        if col not in exclusion_list_insert
    }

    # 6. Construir el builder de MERGE
    merge_builder = (
        delta_table.alias("m")
            .merge(df.alias("src"), merge_conditions)
            .whenMatchedUpdate(set=columns_to_update)
            .whenNotMatchedInsert(values=columns_to_insert)
    )

    # 7. Si se habilita delete, eliminar filas en destino no presentes en la fuente
    if enable_delete:
        merge_builder = merge_builder.whenNotMatchedBySourceDelete()

    # 8. Ejecutar el MERGE
    merge_builder.execute()
    print(f"MERGE exitoso en tabla {table_name}")

    # 9. Registrar y devolver métricas para la tabla de control
    return insert_metrics(get_metrics(table_name))


In [0]:
def insert_metrics(metrics_tuple: Tuple[Dict[str, int], str]) -> None:
    """
    Inserta las métricas de la última operación de MERGE/WRITE en la tabla de auditoría Delta.

    Args:
        metrics_tuple (Tuple[Dict[str, int], str]):
            - metrics (Dict[str, int]): Diccionario con métricas extraídas de Delta:
                * numTargetRowsInserted   — filas insertadas
                * numTargetRowsUpdated    — filas actualizadas
                * numTargetRowsDeleted    — filas eliminadas
                * numTargetBytesAdded     — bytes escritos en disco
                * executionTimeMs         — tiempo de ejecución del merge en milisegundos
            - table_name (str): Nombre completo de la tabla destino en formato 'layer.table'.

    Raises:
        ValueError: Si el nombre de la tabla no contiene un punto separador.
    """

    # Desempaqueta la tupla de métricas
    metrics, table_name = metrics_tuple

    # Recupera identificadores de ejecución desde widgets de Databricks
    job_id       = int(dbutils.widgets.get('JobId'))
    job_run_id   = int(dbutils.widgets.get('JobRunId'))
    task_run_id  = int(dbutils.widgets.get('TaskRunId'))
    start_time   = dbutils.widgets.get('StartTime')  # como string

    # Valida formato de table_name y separa layer y nombre de tabla
    if '.' not in table_name:
        raise ValueError(f"El nombre de tabla '{table_name}' debe estar en formato 'layer.table'")
    layer, table = table_name.split(".")

    # Construye un DataFrame con las métricas y añade columnas de contexto
    df_metrics = (
        spark.createDataFrame([metrics])
            # Contexto de Job/Task
            .withColumn('job_id', lit(job_id))
            .withColumn('job_run_id', lit(job_run_id))
            .withColumn('task_run_id', lit(task_run_id))
            .withColumn('job_start_time', lit(start_time).cast("timestamp"))
            .withColumn('job_end_time', current_timestamp())
            .withColumn('job_duration_seconds',
                        (col("job_end_time").cast("long") - col("job_start_time").cast("long")))
            # Contexto de tabla y capa
            .withColumn('table', lit(table))
            .withColumn('layer', lit(layer))
            # Métricas de ingesta
            .withColumn('rows_in',
                        when(col("layer") != "bronze", lit(0))
                        .otherwise(col('numTargetRowsInserted')))
            .withColumn('rows_inserted', col("numTargetRowsInserted"))
            .withColumn('rows_updated', col("numTargetRowsUpdated"))
            .withColumn('rows_deleted', col("numTargetRowsDeleted"))
            .withColumn('file_bytes', col("numTargetBytesAdded"))
            .withColumn('merge_duration_seconds',
                        col("executionTimeMs") / lit(1000))
            # Estado de la operación
            .withColumn('job_status',
                        when(col("file_bytes") > 0, lit("Success"))
                        .otherwise(lit("Failed")))
            # Selecciona el orden de columnas definitivo para la tabla de auditoría
            .select(
                "job_id", "job_run_id", "task_run_id",
                "job_start_time", "job_end_time", "job_duration_seconds",
                "job_status", "table", "layer",
                "rows_in", "rows_inserted", "rows_updated", "rows_deleted",
                "file_bytes", "merge_duration_seconds"
            )
    )

    # Carga la tabla Delta de auditoría
    delta_table = DeltaTable.forName(spark, "auditoria.ingestion_log")

    # Prepara el mapeo de columnas para inserción, excluyendo snapshot_time
    columns_to_insert = {
        col_name: f"in.{col_name}"
        for col_name in delta_table.toDF().columns
        if col_name not in ["snapshot_time"]
    }

    # Ejecuta MERGE forzando solo inserts (lit(False) evita coincidencias)
    delta_table.alias("m") \
        .merge(
            df_metrics.alias("in"),
            lit(False)
        ) \
        .whenNotMatchedInsert(values=columns_to_insert) \
        .execute()

    # Log de confirmación
    print("MERGE exitoso en auditoria.ingestion_log")


In [0]:
def normalize_query(query: str, database: str = "adventureworks2022") -> str:
    # Quitar corchetes de columnas y tabla/esquema
    query = re.sub(r"\[([^\]]+)\]", r"\1", query)
    
    # Reemplazar FROM Schema.Table por FROM database.Schema.Table
    query = re.sub(
        r"\bFROM\s+(\w+)\.(\w+)",
        fr"FROM {database}.\1.\2",
        query,
        flags=re.IGNORECASE
    )
    return query.strip()