In [None]:

# === Parámetros principales del proceso ===
tabla_origen = "CONT_TADENDA"
tabla_destino = "lk_adenda"
schema_config = "config"
schema_origen= "staging"
schema_destino = "origin1"
lakehouse_origen = "raw_lh_tte_origen1"
lakehouse_destino = "std_lh_tte_dominioEnagas"
fecha_inicio = "1900-01-01"
workspace="ws_tte_poc_data"
usuario="adm_tte"


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions import lit
from functools import reduce
import re
from delta.tables import DeltaTable
import os

In [None]:
def aplicar_mapping(df_origen, mapping_df):
    # Crear un diccionario con los tipos de datos válidos en pyspark.sql.types
    # Esto permite mapear un string como "StringType" a la clase correspondiente
    tipos_validos = {
        t.__name__: t for t in [
            StringType, ShortType, IntegerType, LongType,
            DoubleType, FloatType, BooleanType, DateType, TimestampType
        ]
    }

    # Recolectar todas las columnas origen definidas en el DataFrame de mapeo
    columnas_origen = [row['columna_origen'] for row in mapping_df.collect()]

    # Verificar si esas columnas realmente existen en el DataFrame de entrada
    columnas_faltantes = [col for col in columnas_origen if col not in df_origen.columns]

    # Si alguna columna no existe en df_origen, lanzar un error
    if columnas_faltantes:
        raise ValueError(f"Las siguientes columnas no existen en el DataFrame origen: {', '.join(columnas_faltantes)}")

    # Lista para guardar las transformaciones que se aplicarán (cast + rename)
    columnas_transformadas = []

    # Iterar sobre cada fila del DataFrame de mapeo
    for row in mapping_df.collect():
        col_origen = row['columna_origen']      # Nombre original de la columna
        col_destino = row['columna_destino']    # Nombre deseado de la columna
        tipo_dato_str = row['tipo_dato']        # Tipo de dato como string (ej. "IntegerType")

        # Validar que el tipo de dato indicado sea válido
        if tipo_dato_str not in tipos_validos:
            raise ValueError(f"Tipo de dato '{tipo_dato_str}' no es válido. Verifica el valor en la tabla de mapeo.")

        # Obtener la clase correspondiente al tipo de dato
        tipo_dato = tipos_validos[tipo_dato_str]

        # Agregar la transformación a la lista: casteo del tipo y renombre de la columna
        columnas_transformadas.append(
            F.col(col_origen).cast(tipo_dato()).alias(col_destino)
        )

    # Aplicar todas las transformaciones en un solo select y devolver el DataFrame resultante
    df_transformado = df_origen.select(*columnas_transformadas)
    return df_transformado


In [None]:
def obtener_claves_pk(validation_df):
    """
    Extrae las claves primarias únicas y limpias de un DataFrame de configuración.

    Args:
        validation_df (DataFrame): DataFrame que contiene la columna 'pk_columna',
                                   donde se especifican las claves primarias como una lista separada por comas.

    Returns:
        list: Lista de claves primarias únicas y limpias.
    """
    # Filtrar filas donde pk_columna es nula o vacía (solo espacios)
    df_filtrado = validation_df.filter("pk_columna IS NOT NULL AND TRIM(pk_columna) != ''")

    # Separar claves por coma, eliminar espacios y descartar vacíos
    claves = df_filtrado.select("pk_columna").distinct().rdd \
        .flatMap(lambda x: [col.strip() for col in x[0].split(",") if col and col.strip()])

    # Devolver lista única y ordenada
    return sorted(set(claves.collect()))


In [None]:
def crear_df_error(pk_cols, col, regla, mensaje_error, severidad, fecha_ejecucion):
    """
    Crea un DataFrame de errores con un esquema explícito para una validación fallida.

    Args:
        pk_cols (list): Lista de columnas que representan la clave primaria.
        col (str): Nombre de la columna donde se detectó el error.
        regla (str): Tipo de regla de validación aplicada.
        mensaje_error (str): Mensaje descriptivo del error encontrado.
        severidad (str): Nivel de severidad asignado a la regla.
        fecha_ejecucion (str): Timestamp de la ejecución actual.

    Returns:
        DataFrame: DataFrame de Spark con un solo registro de error y esquema definido.
    """
    schema_error = StructType(
        [StructField(pk, StringType(), True) for pk in pk_cols] +
        [
            StructField("columna", StringType(), True),
            StructField("tipo_regla", StringType(), True),
            StructField("error", StringType(), True),
            StructField("severidad", StringType(), True),
            StructField("fecha_ejecucion", StringType(), True),
        ]
    )
    error_data = {pk: None for pk in pk_cols}
    error_data.update({
        "columna": col,
        "tipo_regla": regla,
        "error": mensaje_error,
        "severidad": severidad,
        "fecha_ejecucion": fecha_ejecucion
    })
    return spark.createDataFrame([error_data], schema=schema_error)

In [None]:
def aplicar_validaciones(df, validation_config_df):
    """
    Aplica un conjunto de reglas de validación sobre un DataFrame principal,
    utilizando una configuración de reglas definidas en otro DataFrame.

    Por cada regla violada, se crea un registro de error que es acumulado y,
    al finalizar, todos los errores son escritos en la carpeta 'Files' del lakehouse correspondiente.

    Reglas soportadas:
        - not_null
        - unique
        - range
        - regex_match
        - value_in_list
        - fk_exists (verifica existencia en tabla referencial Delta)
        - date_range
        - min_value
        - max_value

    Args:
        df (DataFrame): DataFrame principal a validar.
        validation_config_df (DataFrame): DataFrame con las reglas de validación.
            Se espera que tenga las siguientes columnas:
                - regla_tipo
                - columna_1
                - columna_2 (opcional para algunas reglas)
                - valor (usado en reglas como range, regex, value_in_list, etc.)
                - tabla_referencia / columna_referencia (para fk_exists)
                - descripcion (mensaje del error)
                - severidad

    Returns:
        DataFrame: DataFrame de errores encontrados durante las validaciones.

    Nota:
        Esta función depende de variables globales que deben estar definidas antes de su ejecución:
            - workspace: Nombre del Fabric workspace.
            - lakehouse_destino: Lakehouse donde se encuentra la tabla de referencia (si aplica).
            - schema_destino: Carpeta del esquema en el lakehouse.
            - tabla_destino: Nombre de la tabla origen que se está validando.
    """
    errores = []
    fecha_ejecucion = datetime.now().strftime("%Y%m%d_%H%M%S")
    pk_cols = obtener_claves_pk(validation_config_df)

    for row in validation_config_df.toLocalIterator():
        regla = row['regla_tipo']
        col = row['columna_1']
        descripcion = row['descripcion']
        severidad = row['severidad']
        operador = row['operador']
        valor = row['valor']
        col2 = row['columna_2']
        tabla_ref = row['tabla_referencia']
        col_ref = row['columna_referencia']

        df_error = None

        try:
            if regla == "not_null":
                df_error = df.filter(F.col(col).isNull())

            elif regla == "unique":
                df_error = (
                    df.groupBy(col).count().filter("count > 1")
                    .join(df, on=col, how="inner")
                )

            elif regla == "range":
                rango = [float(x.strip()) for x in valor.split(",")]
                if len(rango) != 2:
                    raise ValueError(f"Regla 'range' mal definida: se esperaban 2 valores, se recibió: '{valor}'")
                min_val, max_val = rango
                df_error = df.filter(~((F.col(col) >= min_val) & (F.col(col) <= max_val)))

            elif regla == "regex_match":
                regex_udf = F.udf(lambda x: not re.match(valor, x) if x else True, StringType())
                df_error = df.filter(regex_udf(F.col(col)))

            elif regla == "value_in_list":
                try:
                    lista = [float(x.strip()) for x in valor.split(",") if x.strip()]
                    df_error = df.filter(~F.col(col).cast("double").isin(lista))
                except:
                    lista = [x.strip() for x in valor.split(",") if x.strip()]
                    df_error = df.filter(~F.col(col).isin(lista))

            elif regla == "fk_exists":
                try:
                    ref_df_path = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse_destino}.Lakehouse/Tables/{schema_destino}/{tabla_ref}"

                    ref_df_temp = spark.read.format("delta").load(ref_df_path)

                    if col_ref not in ref_df_temp.columns:
                        raise Exception(f"La columna '{col_ref}' no existe en la tabla '{tabla_ref}'.")

                    ref_df = F.broadcast(ref_df_temp.select(col_ref).distinct())
                    df_error = df.join(ref_df, df[col] == ref_df[col_ref], "left_anti")

                except Exception as e:
                    mensaje_error = f"No se pudo acceder a la tabla '{tabla_ref}' o columna '{col_ref}': {str(e)}"
                    df_error = crear_df_error(pk_cols, col, regla, mensaje_error, severidad, fecha_ejecucion)
                    errores.append(df_error)
                    continue

            elif regla == "date_range":
                df_error = df.filter(
                    ~((F.col(col) >= F.to_timestamp(F.lit(valor))) & (F.col(col) <= F.to_timestamp(F.lit(col2))))
                )

            elif regla == "min_value":
                df_error = df.filter(F.col(col) < F.lit(valor))

            elif regla == "max_value":
                df_error = df.filter(F.col(col) > F.lit(valor))

        except Exception as e:
            mensaje_error = f"Error procesando regla '{regla}' para columna '{col}': {str(e)}"
            print(f"⚠️ {mensaje_error}")
            df_error = crear_df_error(pk_cols, col, regla, mensaje_error, severidad, fecha_ejecucion)
            errores.append(df_error)
            continue

        if df_error is not None and not df_error.rdd.isEmpty():
            for pk in pk_cols:
                if pk not in df_error.columns:
                    df_error = df_error.withColumn(pk, F.lit(None))

            df_error = df_error.withColumn("error", F.lit(descripcion)) \
                               .withColumn("severidad", F.lit(severidad)) \
                               .withColumn("columna", F.lit(col)) \
                               .withColumn("tipo_regla", F.lit(regla)) \
                               .withColumn("fecha_ejecucion", F.lit(fecha_ejecucion)) \
                               .select(*pk_cols, "columna", "tipo_regla", "error", "severidad", "fecha_ejecucion")

            errores.append(df_error)

    if errores:
        df_errores_final = reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), errores)
    else:
        df_errores_final = spark.createDataFrame([], schema=crear_df_error(pk_cols, "", "", "", "", "").schema)

    ruta_salida = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse_destino}.Lakehouse/Files/valitation_error/{schema_destino}/{tabla_destino}/val_errors_{tabla_destino}_{fecha_ejecucion}"
    df_errores_final.write.mode("overwrite").option("header", True).csv(ruta_salida)

    return df_errores_final


In [None]:
def registrar_auditoria_carga(ruta_auditoria, total_registros, registros_insertados,registros_actualizados):
    """
    Registra información de auditoría de un proceso de UPSERT en un archivo CSV en OneLake.

    Args:
        ruta_auditoria (str): Ruta del directorio en OneLake donde guardar el log de auditoría.
        nombre_tabla (str): Nombre de la tabla de destino.
        usuario (str): Usuario que ejecuta el proceso.
        total_registros (int): Número total de registros procesados.
        registros_insertados (int): Registros insertados en la tabla.
        registros_actualizados (int): Registros actualizados en la tabla.
        schema_destino (str): Esquema en el lakehouse de destino.
        lakehouse_origen (str): Nombre del lakehouse de origen.
        lakehouse_destino (str): Nombre del lakehouse de destino.
        workspace (str): Nombre del workspace de Microsoft Fabric.
    """
    spark = SparkSession.getActiveSession()
    if spark is None:
        raise ValueError("No hay sesión activa de Spark.")

    fecha_carga = datetime.now().strftime("%Y-%m-%d")
    fecha_ejecucion = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    datos_auditoria = [{
        "tabla": {tabla_destino},
        "usuario": {usuario},
        "fecha_carga": fecha_carga,
        "fecha_ejecucion": fecha_ejecucion,
        "total_registros": str(total_registros),
        "registros_insertados": str(registros_insertados),
        "registros_actualizados": str(registros_actualizados),
        "schema_destino": {schema_destino},
        "lakehouse_origen": {lakehouse_origen},
        "lakehouse_destino": {lakehouse_destino},
        "workspace": {workspace}
    }]

    schema = StructType([
        StructField("tabla", StringType(), True),
        StructField("usuario", StringType(), True),
        StructField("fecha_carga", StringType(), True),
        StructField("fecha_ejecucion", StringType(), True),
        StructField("total_registros", StringType(), True),
        StructField("registros_insertados", StringType(), True),
        StructField("registros_actualizados", StringType(), True),
        StructField("schema_destino", StringType(), True),
        StructField("lakehouse_origen", StringType(), True),
        StructField("lakehouse_destino", StringType(), True),
        StructField("workspace", StringType(), True)
    ])

    df_auditoria = spark.createDataFrame(datos_auditoria, schema=schema)

    # Ruta con timestamp único para evitar sobreescritura
    ruta_completa = f"{ruta_auditoria}/auditoria_{tabla_destino}_{fecha_carga.replace('-', '')}_{datetime.now().strftime('%H%M%S')}.csv"

    df_auditoria.write.mode("overwrite").option("header", True).csv(ruta_completa)

    print(f"📝 Auditoría registrada en: {ruta_completa}")

In [None]:
def hacer_upsert(df_nuevos, ruta_tabla, claves_pk, auditoria_path=None):
    """
    Realiza un upsert sobre una tabla Delta en una ruta ABFSS y agrega información de auditoría.
    También añade los campos 'fecha_carga' y 'usuario' al DataFrame destino.
    
    Args:
        df_nuevos (DataFrame): DataFrame con los nuevos datos.
        ruta_tabla (str): Ruta ABFSS de la tabla Delta.
        claves_pk (list): Lista de claves primarias.
        auditoria_path (str, optional): Ruta para guardar logs de auditoría (Delta o Files).
        usuario (str, optional): Usuario que ejecuta la carga. Por defecto: 'desconocido'.
    """

    if not claves_pk:
        raise ValueError("Debe proporcionar al menos una clave primaria para hacer el upsert.")

    # Campos de auditoría
    fecha_carga_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    df_nuevos = df_nuevos.withColumn("fecha_carga", lit(fecha_carga_str)) \
                         .withColumn("usuario", lit(usuario))

    # Verificar existencia de tabla
    try:
        delta_table = DeltaTable.forPath(spark, ruta_tabla)
        tabla_existe = True
        print(f"ℹ️ Tabla encontrada en ruta: {ruta_tabla}")
    except:
        tabla_existe = False
        print(f"🛈 La tabla no existe. Se creará en: {ruta_tabla}")

    if not tabla_existe:
        # Crear tabla con nuevas columnas
        df_nuevos.write.format("delta").mode("overwrite").save(ruta_tabla)
        print(f"✅ Tabla creada con campos de auditoría en: {ruta_tabla}")
        num_insertados = df_nuevos.count()
        num_actualizados = 0

    else:
        # Leer tabla actual
        df_destino = spark.read.format("delta").load(ruta_tabla)

        # Comparar para auditoría
        join_cond = [df_destino[pk] == df_nuevos[pk] for pk in claves_pk]
        df_updates = df_destino.join(df_nuevos, join_cond, "inner")
        df_inserts = df_nuevos.join(df_destino, join_cond, "left_anti")
        num_actualizados = df_updates.count()
        num_insertados = df_inserts.count()

        # Crear vista temporal
        df_nuevos.createOrReplaceTempView("source")

        cond = " AND ".join([f"target.{pk} = source.{pk}" for pk in claves_pk])
        columnas = df_nuevos.columns
        set_expr = ", ".join([f"target.{col} = source.{col}" for col in columnas])
        insert_cols = ", ".join(columnas)
        insert_vals = ", ".join([f"source.{col}" for col in columnas])

        query = f"""
            MERGE INTO delta.`{ruta_tabla}` AS target
            USING source
            ON {cond}
            WHEN MATCHED THEN UPDATE SET {set_expr}
            WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
        """
        spark.sql(query)
        print(f"✅ Merge completado en: {ruta_tabla}")

        auditoria_path = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse_destino}.Lakehouse/Files/auditoria_cargas/{schema_destino}/{tabla_destino}"
        registrar_auditoria_carga(
    ruta_auditoria=auditoria_path,
    total_registros=df_nuevos.count(),
    registros_insertados=num_insertados,
    registros_actualizados=num_actualizados
    )

Esquema de la tabla de auditoría: config.validacion_errores
Campo	Tipo	Descripción
pk	string	Identificador de la fila (uno o más)
columna	string	Columna evaluada
error	string	Mensaje de error o regla fallida
severidad	string	Nivel (alta, media, baja)
tabla_destino	string	Nombre de la tabla donde se aplicó la validación
fecha_carga	timestamp	Fecha y hora en que se detectó el error

In [None]:
# Leer datos desde lakehouse origen
df_raw = spark.read.table(f"{lakehouse_origen}.{schema_destino}.{tabla_origen}") \
    .filter(F.col("fecha_carga") >= F.lit(fecha_inicio))

# Aplicar mapping
df_standard = aplicar_mapping(df_raw, mapping_df)

# Obtener claves primarias
claves_pk = obtener_claves_pk(validation_config_df)

# Aplicar validaciones
df_errores = aplicar_validaciones(df_standard, validation_config_df)

# Excluir registros inválidos
if df_errores.count() > 0:
    pks_errores = df_errores.select(claves_pk).distinct()
    df_validos = df_standard.join(pks_errores, on=claves_pk, how="left_anti")
else:
    df_validos = df_standard

# Realizar UPSERT de datos válidos
hacer_upsert(df_validos, f"{lakehouse_destino}.{schema_destino}.{tabla_destino}", claves_pk)




5. EJEMPLO DE USO

Imagina que queremos procesar:

Origen: CONT_TADENDA desde raw_lakehouse

Destino: CONT_TADENDA_STANDARD en standard_lakehouse

Primary Key: IDN_ADENDA

Solo registros desde 2025-01-01

In [None]:
tabla_origen = "CONT_TADENDA"
tabla_destino = "CONT_TADENDA_STANDARD"
lakehouse_origen = "raw_lakehouse"
lakehouse_destino = "standard_lakehouse"
schema_config = "config"
schema_destino = "origin1"
fecha_inicio = "2025-01-01"


Y ejecutar el pipeline tal cual está.
✅ RESUMEN FINAL
Módulo	Funcionalidad
Mapping dinámico	Cast automático y mapeo entre origen y destino
Validaciones	Extensibles (not_null, fk_exists, regex, range, etc.)
Upsert	Insert/update usando claves primarias
Performance	broadcast, incrementalidad y vistas temporales
Reutilización	Parametrizable para cualquier tabla, esquema y lakehouse

¿Dónde implementar acciones correctivas tras validación?

Las acciones correctivas deben colocarse después de aplicar las validaciones, pero antes del upsert, y solo en los registros válidos, o en un bloque separado de transformación de errores.

Hay 2 formas de manejarlo según tu estrategia:

🔧 Opción 1: Corrección antes de la validación

Para campos como not_null, si deseas imputar un valor cuando falte (en vez de lanzar error), puedes hacerlo antes de validar.

Ventaja: El campo ya no fallará la validación.

In [None]:
df_standard = df_standard.withColumn(
    "codigo_adenda",
    F.when(F.col("codigo_adenda").isNull(), F.lit("SC")).otherwise(F.col("codigo_adenda"))
)


pción 2: Corrección después de la validación

Si prefieres identificar el error primero, y luego corregir solo ciertos casos permitidos, debes hacerlo tras la validación, sobre los errores.

🧩 ¿Cómo implementar esto de forma modular?

Te recomiendo crear una función llamada corregir_datos(df) que aplique estas reglas de forma centralizada.

In [None]:
def corregir_datos(df: DataFrame) -> DataFrame:
    return (
        df.withColumn("codigo_adenda", 
                      F.when(F.col("codigo_adenda").isNull(), F.lit("SC"))
                       .otherwise(F.col("codigo_adenda")))
          .withColumn("estado",
                      F.when(~F.col("estado").isin("activo", "inactivo", "baja"), F.lit("SACO"))
                       .otherwise(F.col("estado")))
          .withColumn("monto",
                      F.when(F.col("monto").isNull(), F.lit(0))
                       .otherwise(F.col("monto")))
    )


Puedes aplicar esta función directamente a df_standard antes de las validaciones, o después de haber excluido los errores que no se pueden corregir.
# 1. Aplicar mapping
df_standard = aplicar_mapping(df_raw, mapping_df)

# 2. Corregir datos según reglas
df_standard = corregir_datos(df_standard)

# 3. Validar
df_errores = aplicar_validaciones(df_standard, validation_config_df)

# 4. Excluir inválidos
...

In [None]:
from functools import reduce

def aplicar_validaciones(df, validation_config_df):
    errores = []
    for row in validation_config_df.collect():
        regla = row['regla_tipo']
        col = row['columna_1']
        descripcion = row['descripcion']
        severidad = row['severidad']
        pk_cols = row['pk_columna'].split(",")
        operador = row['operador']
        valor = row['valor']
        col2 = row['columna_2']
        tabla_ref = row['tabla_referencia']
        col_ref = row['columna_referencia']

        df_error = None

        if regla == "not_null":
            df_error = df.filter(F.col(col).isNull())

        elif regla == "unique":
            df_error = df.groupBy(col).count().filter("count > 1").join(df, on=col, how="inner")

        elif regla == "range":
            df_error = df.filter(~((F.col(col) >= F.lit(valor)) & (F.col(col) <= F.col(col2))))

        elif regla == "regex_match":
            regex_udf = F.udf(lambda x: not re.match(valor, x) if x else True, StringType())
            df_error = df.filter(regex_udf(F.col(col)))

        elif regla == "value_in_list":
            lista = valor.split(",")
            df_error = df.filter(~F.col(col).isin(lista))

        elif regla == "fk_exists":
            ref_df = F.broadcast(spark.read.format("delta").load(f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse_origen}.Lakehouse/Tables/{schema_origen}//{tabla_ref}").select(col_ref).distinct())
            df_error = df.join(ref_df, df[col] == ref_df[col_ref], "left_anti")

        if df_error is not None:
            for pk in pk_cols:
                if pk not in df_error.columns:
                    df_error = df_error.withColumn(pk, F.lit(None))

            df_error = df_error.withColumn("error", F.lit(descripcion)) \
                               .withColumn("severidad", F.lit(severidad)) \
                               .withColumn("columna", F.lit(col)) \
                               .select(*pk_cols, "columna", "error", "severidad")
            errores.append(df_error)

    # Combinar los DataFrames de errores usando reduce
    return reduce(lambda df1, df2: df1.unionByName(df2), errores) if errores else spark.createDataFrame([], schema="pk string, columna string, error string, severidad string")