In [None]:
# =====================================
# 1) Lectura desde Bronze
# =====================================
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Leemos la tabla creada en Bronze (dentro del Lakehouse)
df_bz = spark.table("bronze_personas")  # <-- ajusta si tu nombre difiere

# Vista rápida
print("Registros en Bronze:", df_bz.count())
df_bz.display(5)


In [None]:
# =====================================
# 2) Reglas de limpieza / estandarización (Silver)
#    - Normaliza textos y formatos
#    - Valida email / documento / teléfono
#    - Corrige rangos numéricos
# =====================================

# Expresiones de validación
EMAIL_REGEX = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"
DNI_REGEX   = r"^[0-9]{8}$"          # 8 dígitos
CEL_REGEX   = r"^9[0-9]{8}$"         # 9 dígitos empezando en 9

# Catálogo simple de normalización de país
normalize_pais = F.when(F.lower(F.col("Pais")).isin("peru","perú","pe"), F.lit("PERU")).otherwise(F.lit(None))

# Catálogo “ciudad desconocida” si viene vacío
normalize_ciudad = F.when(F.trim(F.col("Ciudad"))=="", None).otherwise(F.initcap(F.trim(F.col("Ciudad"))))

# Campos validados / normalizados
df_sv = (
    df_bz
    # --- Limpieza básica de espacios y casing ---
    .withColumn("Nombre" , F.initcap(F.trim(F.col("Nombre"))))
    .withColumn("Apellido", F.initcap(F.trim(F.col("Apellido"))))
    .withColumn("Email"  , F.lower(F.trim(F.col("Email"))))
    .withColumn("Pais"   , normalize_pais)
    .withColumn("Ciudad" , normalize_ciudad)

    # --- Validaciones de identificadores ---
    .withColumn("Documento", F.when(F.col("Documento").rlike(DNI_REGEX), F.col("Documento")).otherwise(F.lit(None)))
    .withColumn("Telefono" , F.when(F.col("Telefono").rlike(CEL_REGEX), F.col("Telefono")).otherwise(F.lit(None)))
    .withColumn("Email"    , F.when(F.col("Email").rlike(EMAIL_REGEX), F.col("Email")).otherwise(F.lit(None)))

    # --- Rangos y outliers ---
    .withColumn("Edad", F.when((F.col("Edad")>=16) & (F.col("Edad")<=100), F.col("Edad")).otherwise(F.lit(None)))
    .withColumn("FrecuenciaVisita", F.when(F.col("FrecuenciaVisita")>=0, F.col("FrecuenciaVisita")).otherwise(F.lit(None)))
    .withColumn("TicketPromedio",   F.when(F.col("TicketPromedio")>0, F.col("TicketPromedio")).otherwise(F.lit(None)))
    .withColumn("PuntosAcumulados", F.when(F.col("PuntosAcumulados")>=0, F.col("PuntosAcumulados")).otherwise(F.lit(None)))

    # --- Normalización de género y estado ---
    .withColumn("Genero", F.when(F.col("Genero").isin("M","F"), F.col("Genero")).otherwise(F.lit(None)))
    .withColumn("EstadoCliente", F.when(F.trim(F.col("EstadoCliente"))=="", None).otherwise(F.col("EstadoCliente")))

    # --- Fechas a tipo date si vinieron como string ---
    .withColumn("FechaRegistro", F.to_date("FechaRegistro"))
    .withColumn("UltimaCompra" , F.to_date("UltimaCompra"))
)

df_sv.display(5)


In [None]:
# =====================================
# 3) Deduplicación (Silver)
#    Regla: clave de negocio = coalesce(Documento, Email, Telefono, IdCliente)
#    Conserva el registro con fecha más reciente (UltimaCompra/FechaRegistro)
# =====================================

# Clave de negocio combinada (prioridad: documento > email > teléfono > id)
df_keyed = df_sv.withColumn(
    "BK_CLIENTE",
    F.coalesce(F.col("Documento"), F.col("Email"), F.col("Telefono"), F.col("IdCliente").cast("string"))
)

# Orden de "recencia"
order_cols = [
    F.col("UltimaCompra").desc_nulls_last(),
    F.col("FechaRegistro").desc_nulls_last(),
    F.col("IdCliente").desc()  # desempate
]

# Window para elegir el registro más reciente por BK
w = Window.partitionBy("BK_CLIENTE").orderBy(*order_cols)

df_sv_dedup = (df_keyed
               .withColumn("rn", F.row_number().over(w))
               .filter(F.col("rn")==1)
               .drop("rn"))

print("Registros tras deduplicación:", df_sv_dedup.count())
display(df_sv_dedup)


In [None]:
# =====================================
# 4) Guardar Silver
#    Convención: prefijo 'silver_' en el nombre de la tabla
# =====================================

(df_sv_dedup.write
 .format("delta")
 .mode("overwrite")
 .saveAsTable("silver_personas"))

print("✅ Silver creado: silver_personas")
