In [0]:
# Unity Catalog location used by every cell below.
catalog = "smartdavid_catalog"
# Source (raw) and target (cleaned) schemas inside the catalog.
schema_bronze = "bronze"
schema_silver = "silver"

In [0]:
# Load the three Bronze source tables in one pass.
# NOTE(review): the table is called "unit_data" while every label/variable says "united" — confirm the table name.
df_united, df_pagos_prico, df_pagos_mepeco = (
    spark.table(f"{catalog}.{schema_bronze}.{tabla}")
    for tabla in ("unit_data", "pagos_prico", "pagos_mepeco")
)

# Report the row count of each source; labels are left-padded to 15 characters
# so the numbers line up in the output.
for etiqueta, df_bronze in (
    ("united_data", df_united),
    ("pagos_prico", df_pagos_prico),
    ("pagos_mepeco", df_pagos_mepeco),
):
    print(f"{etiqueta + ':':<15}{df_bronze.count()} registros")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

# Build the cleaned (Silver) version of united_data: rename columns, trim text,
# type the amounts, parse the due date, drop rows without a RUC and flag rows
# that carry an observation.
df_united_clean = (
    df_united
    # Rename the key source columns to snake_case.
    .withColumnRenamed("NUM_RESOL",     "num_resol")
    .withColumnRenamed("RUC",           "ruc")
    .withColumnRenamed("NOMBRE",        "nombre_contribuyente")
    .withColumnRenamed("DEP",           "departamento")
    .withColumnRenamed("TIPO_FRACC",    "tipo_fraccionamiento")
    .withColumnRenamed("ESTADO",        "estado")
    .withColumnRenamed("COLABORADOR",   "colaborador")
    .withColumnRenamed("OBSERVACIONES", "observaciones")
    .withColumnRenamed("FEC_VENC",      "fecha_vencimiento")
    .withColumnRenamed("MONTO_CTA",     "monto_cuota")
    .withColumnRenamed("SALDO",         "saldo")
    .withColumnRenamed("COD_TRI",       "cod_tributario")
    # Strip surrounding whitespace from the identifier/text columns.
    .withColumn("num_resol",   F.trim(F.col("num_resol")))
    .withColumn("ruc",         F.trim(F.col("ruc")))
    .withColumn("colaborador", F.trim(F.col("colaborador")))
    .withColumn("estado",      F.trim(F.col("estado")))
    # Cast the numeric columns to double.
    .withColumn("monto_cuota", F.col("monto_cuota").cast(DoubleType()))
    .withColumn("saldo",       F.col("saldo").cast(DoubleType()))
    # Parse the due date.
    # NOTE(review): this source is parsed month-first ("M/d/yyyy") while clean_pagos
    # parses day-first ("d/MM/yyyy") — confirm both formats against the raw data.
    .withColumn("fecha_vencimiento", F.to_date(F.col("fecha_vencimiento"), "M/d/yyyy"))
    # Drop records without a RUC. After trimming, a whitespace-only RUC is an empty
    # string rather than NULL, so both cases must be excluded.
    .filter(F.col("ruc").isNotNull() & (F.col("ruc") != ""))
    # Flag whether the record carries a non-blank observation.
    .withColumn(
        "tiene_observacion",
        F.when(
            F.col("observaciones").isNotNull() & (F.trim(F.col("observaciones")) != ""),
            True,
        ).otherwise(False),
    )
)

print(f"united_data limpio: {df_united_clean.count()} registros")
df_united_clean.limit(3).display()

In [0]:
def clean_pagos(df, fuente):
    """Clean one Bronze payments DataFrame and tag it with its source portfolio.

    Steps: trim ``valor``, ``pag_numruc`` and ``ddp_nombre``; parse ``crt_fecpag``
    with the ``d/MM/yyyy`` pattern; cast ``pago_total`` and ``pago`` to double; add
    ``fuente_cartera`` as a literal column; keep only the nine output columns; and
    drop rows whose ``valor`` is NULL or blank.

    Args:
        df: Spark DataFrame with the raw payment columns used above.
        fuente: Label stored in ``fuente_cartera`` (e.g. "PRICO", "MEPECO").

    Returns:
        A new Spark DataFrame with the cleaned, selected columns.
    """
    valor_limpio = F.trim(F.col("valor"))
    return (
        df
        .withColumn("valor",      valor_limpio)
        .withColumn("pag_numruc", F.trim(F.col("pag_numruc")))
        .withColumn("ddp_nombre", F.trim(F.col("ddp_nombre")))
        .withColumn("crt_fecpag", F.to_date(F.col("crt_fecpag"), "d/MM/yyyy"))
        .withColumn("pago_total", F.col("pago_total").cast(DoubleType()))
        .withColumn("pago",       F.col("pago").cast(DoubleType()))
        .withColumn("fuente_cartera", F.lit(fuente))
        .select(
            "pag_numruc", "ddp_nombre", "valor",
            "crt_fecpag", "pago_total", "pago",
            "pag_perpag", "crt_codtri", "fuente_cartera"
        )
        # `valor` is the key later joined against num_resol. After trimming, a
        # whitespace-only value is "" rather than NULL, so exclude both: a blank
        # key carries no resolution number and could match blank keys spuriously.
        .filter(F.col("valor").isNotNull() & (F.col("valor") != ""))
    )

# Clean each payments source and tag it with its portfolio label.
df_prico_clean  = clean_pagos(df_pagos_prico,  "PRICO")
df_mepeco_clean = clean_pagos(df_pagos_mepeco, "MEPECO")

# Stack both payment datasets. unionByName aligns columns by name instead of by
# position, so the result stays correct even if the column order of one side
# ever changes.
df_pagos_all = df_prico_clean.unionByName(df_mepeco_clean)

print(f"Pagos consolidados: {df_pagos_all.count()} registros")
df_pagos_all.limit(3).display()

In [0]:
# Join: pagos.valor == united.num_resol (left join keeps every payment).
# NOTE(review): if num_resol is not unique in united_data (e.g. one row per
# installment), each payment is repeated once per matching row — confirm key
# uniqueness or deduplicate before relying on these counts.
df_joined = (
    df_pagos_all
    .join(
        df_united_clean.select(
            "num_resol", "colaborador", "departamento",
            "nombre_contribuyente", "observaciones",
            "tiene_observacion", "saldo", "estado"
        ),
        df_pagos_all["valor"] == df_united_clean["num_resol"],
        how="left"
    )
    # A payment counts as managed when the joined row has a collaborator.
    # isNotNull() already yields a non-null boolean, so no when/otherwise is needed.
    .withColumn("tiene_gestion", F.col("colaborador").isNotNull())
)

# Compute the summary in a single pass over the join instead of three separate
# count() actions; count() skips NULLs, so the when() without otherwise counts
# only the managed rows.
resumen_gestion = df_joined.agg(
    F.count(F.lit(1)).alias("total"),
    F.count(F.when(F.col("tiene_gestion"), F.lit(1))).alias("con_gestion"),
).first()

total          = resumen_gestion["total"]
con_gestion    = resumen_gestion["con_gestion"]
# tiene_gestion is never NULL, so the unmanaged rows are exactly the remainder.
sin_gestion    = total - con_gestion

print(f"Total pagos procesados : {total}")
print(f"Pagos CON gestión      : {con_gestion}")
print(f"Pagos SIN gestión      : {sin_gestion}")
df_joined.limit(5).display()

In [0]:
# Persist the three Silver outputs as Delta tables, replacing any previous
# contents and schema. Order: cleaned united_data, consolidated payments,
# payments joined with management data.
for df_salida, nombre_tabla in (
    (df_united_clean, "united_data_clean"),
    (df_pagos_all,    "pagos_consolidado"),
    (df_joined,       "pagos_con_gestion"),
):
    (
        df_salida.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(f"{catalog}.{schema_silver}.{nombre_tabla}")
    )

print("Las 3 tablas Silver guardadas exitosamente")

In [0]:
%sql
-- Row-count check for the three Silver tables written by this notebook.
-- The catalog and schema are hard-coded here; keep them in sync with the
-- values set in the Python configuration cell.
SELECT 'united_data_clean' AS tabla, COUNT(*) AS registros
FROM smartdavid_catalog.silver.united_data_clean
UNION ALL
SELECT 'pagos_consolidado' AS tabla, COUNT(*) AS registros
FROM smartdavid_catalog.silver.pagos_consolidado
UNION ALL
SELECT 'pagos_con_gestion' AS tabla, COUNT(*) AS registros
FROM smartdavid_catalog.silver.pagos_con_gestion;