In [1]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
from pyspark.sql.functions import col, lit, max as Fmax, row_number, current_timestamp
from pyspark.sql.window import Window
from pyspark.sql import functions as F

StatementMeta(, 1a57df19-f8b6-40db-80f1-3c1400ed39b5, 3, Finished, Available, Finished)

In [2]:
tablas = [
    {
        "origen": "clasificacionCanal",
        "dimension": "dimensionCanalSilver",
        "campo_clave": "Código",
        "columnas_base": ["Código", "Canal", "Descripcion"],
        "nombre_sk": "skCanal"
    },
    {
        "origen": "clasificacionIndustria",
        "dimension": "dimensionIndustriaSilver",
        "campo_clave": "Código",
        "columnas_base": ["Sector", "Código", "Descripcion"],
        "nombre_sk": "skIndustria"
    },
    {
        "origen": "clasificacionSegmentoComercial",
        "dimension": "dimensionSegmentoComercialSilver",
        "campo_clave": "Código",
        "columnas_base": ["Código", "Segmento", "DescripciónBreve"],
        "nombre_sk": "skSegmentoComercial"
    },
    {
        "origen": "clasificacionValorEstrategico",
        "dimension": "dimensionValorEstrategicoSilver",
        "campo_clave": "Código",
        "columnas_base": ["valorCliente", "Código", "Descripción"],
        "nombre_sk": "skValorEstrategico"
    }
]

StatementMeta(, 1a57df19-f8b6-40db-80f1-3c1400ed39b5, 4, Finished, Available, Finished)

In [3]:
invalid_chars = [' ', ',', ';', '{', '}', '(', ')', '\n', '\t', '=']
def limpiar_nombre_columna(nombre):
    for ch in invalid_chars:
        nombre = nombre.replace(ch, '_')
    return nombre

StatementMeta(, 1a57df19-f8b6-40db-80f1-3c1400ed39b5, 5, Finished, Available, Finished)

In [4]:
for tabla in tablas:
    print(f"🔄 Procesando: {tabla['origen']}")

    # 1. Cargar datos base
    df_base = spark.sql(f"SELECT * FROM lakehouseSilver.{tabla['origen']}")
    df_base = df_base.dropDuplicates([tabla["campo_clave"]])

    # 2. Cargar dimensión existente (si existe)
    try:
        df_actual = spark.sql(f"SELECT * FROM lakehouseSilver.{tabla['dimension']}")
        existe = True
    except:
        existe = False
        schema = df_base.schema.add(tabla["nombre_sk"], LongType()).add("FechaCreacion", TimestampType()).add("EsActual", StringType())
        df_actual = spark.createDataFrame([], schema)

    # 3. Detectar nuevos
    df_nuevos = df_base.join(df_actual.select(tabla["campo_clave"]), on=tabla["campo_clave"], how="left_anti")

    # 4. Calcular nueva SK
    if existe and tabla["nombre_sk"] in df_actual.columns and df_actual.count() > 0:
        max_sk = df_actual.agg(F.max(tabla["nombre_sk"])).collect()[0][0] or 0
    else:
        max_sk = 0

    # 5. Generar SK nuevos
    window = Window.orderBy(tabla["campo_clave"])
    df_nuevos_sk = df_nuevos.withColumn(tabla["nombre_sk"], F.row_number().over(window) + max_sk)

    # 6. Auditoría
    now = F.current_timestamp()
    df_nuevos_sk = df_nuevos_sk.withColumn("FechaCreacion", now).withColumn("EsActual", F.lit("Sí"))

    for colname in ["FechaCreacion", "EsActual"]:
        if colname not in df_actual.columns:
            df_actual = df_actual.withColumn(colname, F.lit(None).cast(TimestampType() if colname == "FechaCreacion" else StringType()))

    # 7. Unir
    df_final = df_actual.unionByName(df_nuevos_sk)

    # 8. Renombrar columnas con caracteres inválidos
    df_final = df_final.select([F.col(c).alias(limpiar_nombre_columna(c)) for c in df_final.columns])

    # 9. Validar duplicados
    if df_final.groupBy(tabla["nombre_sk"]).count().filter("count > 1").count() > 0:
        raise Exception(f"❌ Duplicados en {tabla['nombre_sk']}")

    # 10. Guardar en Silver como Delta Table
    df_final.write.mode("overwrite").format("delta").saveAsTable(f"lakehouseSilver.{tabla['dimension']}")

    print(f"✅ {tabla['dimension']} actualizada correctamente.\n")


StatementMeta(, 1a57df19-f8b6-40db-80f1-3c1400ed39b5, 6, Finished, Available, Finished)

🔄 Procesando: clasificacionCanal
✅ dimensionCanalSilver actualizada correctamente.

🔄 Procesando: clasificacionIndustria
✅ dimensionIndustriaSilver actualizada correctamente.

🔄 Procesando: clasificacionSegmentoComercial
✅ dimensionSegmentoComercialSilver actualizada correctamente.

🔄 Procesando: clasificacionValorEstrategico
✅ dimensionValorEstrategicoSilver actualizada correctamente.

