In [0]:
catalog_name = "dmc_tallerfinaldatabricks_mariovento"

schema_bronze = "bronze"
schema_silver = "silver"

In [0]:
from pyspark.sql.functions import expr, when, trim, col, to_date, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable

In [0]:
silver_ventas = (
    spark.table(f"{catalog_name}.{schema_bronze}.ventasbase_raw")  # Lee la tabla raw
    .withColumn("id_venta", expr("try_cast(id_venta as int)"))  # Convierte id_venta a entero
    .withColumn("id_cliente", expr("try_cast(id_cliente as int)"))  # Convierte id_cliente a entero
    .withColumn("id_tienda", expr("try_cast(id_tienda as int)"))  # Convierte id_tienda a entero
    .withColumn("id_producto", expr("try_cast(id_producto as int)"))  # Convierte id_producto a entero
    # Corrige la columna cantidad: si es "diez" la pone en 10, si no intenta convertir a entero
    .withColumn("cantidad", when(trim(col("cantidad")) == "diez", 10)
        .otherwise(expr("try_cast(cantidad as int)"))
    )
    .withColumn("monto", col("monto").cast("double"))  # Convierte monto a double
    # Si el monto es negativo, lo corrige a 0 en monto_corregido
    .withColumn("monto_corregido", when(col("monto") < 0, 0).otherwise(col("monto")))
    # Marca si el monto fue corregido o no
    .withColumn("estado_monto", when(col("monto") < 0, "Corregido").otherwise("No corregido"))
    # Limpia y normaliza la fecha de venta en varios formatos posibles
    .withColumn(
        "fecha_venta",
        when(col("fecha_venta") == "N/A", None)
        .when(col("fecha_venta").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}$"), to_date(col("fecha_venta"), "yyyy-MM-dd").cast("timestamp"))
        .when(col("fecha_venta").rlike("^[0-9]{2}-[0-9]{2}-[0-9]{4}$"), to_date(col("fecha_venta"), "MM-dd-yyyy").cast("timestamp"))
        .when(col("fecha_venta").rlike("^[0-9]{2}/[0-9]{2}/[0-9]{4}$"), to_date(col("fecha_venta"), "dd/MM/yyyy").cast("timestamp"))
        .when(col("fecha_venta").rlike("^[0-9]{4}/[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"), to_date(col("fecha_venta"), "yyyy/MM/dd HH:mm:ss").cast("timestamp"))  
        .otherwise(None)
    )
    # Limpia y normaliza la fecha de actualización en varios formatos posibles
    .withColumn(
        "updated_at",
        when(col("updated_at") == "N/A", None)
        .when(col("updated_at").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}$"), to_date(col("updated_at"), "yyyy-MM-dd").cast("timestamp"))
        .when(col("updated_at").rlike("^[0-9]{2}-[0-9]{2}-[0-9]{4}$"), to_date(col("updated_at"), "MM-dd-yyyy").cast("timestamp"))
        .when(col("updated_at").rlike("^[0-9]{2}/[0-9]{2}/[0-9]{4}$"), to_date(col("updated_at"), "dd/MM/yyyy").cast("timestamp"))
        .when(col("updated_at").rlike("^[0-9]{4}/[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"), to_date(col("updated_at"), "yyyy/MM/dd HH:mm:ss").cast("timestamp"))  
        .otherwise(None)
    )
    .dropDuplicates(["id_venta"])  # Elimina duplicados por id_venta
    .select(
        "id_venta", "id_cliente", "id_tienda", "id_producto", "cantidad",
        "monto", "monto_corregido", "estado_monto",
        "fecha_venta", "updated_at"
    )  # Selecciona solo las columnas relevantes
)

In [0]:
display(silver_ventas)

In [0]:
windows_dedup = Window.partitionBy("id_venta", "id_producto").orderBy(col("updated_at").desc())

silver_ventas_dedup = (
    silver_ventas
    .withColumn("dedup", row_number().over(windows_dedup))
    .filter(col("dedup") == 1)
    .drop("dedup")
)

In [0]:
silver_ventas_dedup.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.ventas")

In [0]:
%sql
select count (*) FROM dmc_tallerfinaldatabricks_mariovento.silver.ventas

In [0]:
path_base = "/Volumes/dmc_tallerfinaldatabricks_mariovento/default/input"

path_ventas_incremento = f"{path_base}/ventas_incremento.csv"

In [0]:
ventas_incremento = spark.read.option("header", True).option("inferSchema", False).csv(path_ventas_incremento)

In [0]:
display(ventas_incremento)

In [0]:
ventas_incremento_normalizado = (
    ventas_incremento
    # Convierte las columnas de IDs a tipo bigint
    .withColumn("id_venta", col("id_venta").cast("bigint"))
    .withColumn("id_cliente", col("id_cliente").cast("bigint"))
    .withColumn("id_tienda", col("id_tienda").cast("bigint"))
    .withColumn("id_producto", col("id_producto").cast("bigint"))
    # Normaliza la columna cantidad: si es "5u" la pone en 5, si no intenta convertir a entero
    .withColumn("cantidad", when(trim(col("cantidad")) == "5u", 5) 
        .otherwise(col("cantidad").cast("int"))
    )
    # Convierte monto a decimal con 2 decimales
    .withColumn("monto", col("monto").cast("decimal(18,2)"))
    # Limpia y normaliza la fecha de venta en varios formatos posibles
    .withColumn(
        "fecha_venta",
        when(col("fecha_venta") == "N/A", None)
        .when(col("fecha_venta").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}$"), to_date(col("fecha_venta"), "yyyy-MM-dd").cast("timestamp"))
        .when(col("fecha_venta").rlike("^[0-9]{2}-[0-9]{2}-[0-9]{4}$"), to_date(col("fecha_venta"), "MM-dd-yyyy").cast("timestamp"))
        .when(col("fecha_venta").rlike("^[0-9]{2}/[0-9]{2}/[0-9]{4}$"), to_date(col("fecha_venta"), "dd/MM/yyyy").cast("timestamp"))
        .when(col("fecha_venta").rlike("^[0-9]{4}/[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"), to_date(col("fecha_venta"), "yyyy/MM/dd HH:mm:ss").cast("timestamp"))  
        .otherwise(None)
    )
    # Limpia y normaliza la fecha de actualización en varios formatos posibles
    .withColumn(
        "updated_at",
        when(col("updated_at") == "N/A", None)
        .when(col("updated_at").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}$"), to_date(col("updated_at"), "yyyy-MM-dd").cast("timestamp"))
        .when(col("updated_at").rlike("^[0-9]{2}-[0-9]{2}-[0-9]{4}$"), to_date(col("updated_at"), "MM-dd-yyyy").cast("timestamp"))
        .when(col("updated_at").rlike("^[0-9]{2}/[0-9]{2}/[0-9]{4}$"), to_date(col("updated_at"), "dd/MM/yyyy").cast("timestamp"))
        .when(col("updated_at").rlike("^[0-9]{4}/[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"), to_date(col("updated_at"), "yyyy/MM/dd HH:mm:ss").cast("timestamp"))  
        .otherwise(None)
    )
    # Elimina duplicados por id_venta
    .dropDuplicates(["id_venta"])
    # Selecciona solo las columnas relevantes
    .select(
        "id_venta", "id_cliente", "id_tienda", "id_producto", "cantidad",
        "monto", "fecha_venta", "updated_at"
    )
)

In [0]:
ventas_incremento_normalizado.count()

In [0]:
windows_dedup = Window.partitionBy("id_venta", "id_producto").orderBy(col("updated_at").desc())

ventas_incremento_dedup = (
    ventas_incremento_normalizado
    .withColumn("dedup", row_number().over(windows_dedup))
    .filter(col("dedup") == 1)
    .drop("dedup")
)

In [0]:
ventas_incremento_normalizado.count()

In [0]:
##MERGE
# Obtiene la tabla Delta de ventas en el esquema silver
target = DeltaTable.forName(spark, f"{catalog_name}.{schema_silver}.ventas")

# Realiza un MERGE entre la tabla target y los datos nuevos (ventas_incremento_dedup)
merge = (
    target.alias("m")
    .merge(
        ventas_incremento_dedup.alias("in"),
        # Condición de coincidencia: mismo id_venta y id_producto
        (col("m.id_venta") == col("in.id_venta")) &
        (col("m.id_producto") == col("in.id_producto"))
    )
    # Si hay coincidencia y la fecha de actualización del incremento es mayor, actualiza los datos
    .whenMatchedUpdate(
        condition= "in.updated_at > m.updated_at",
        set = {
            "id_cliente": col("in.id_cliente"),
            "id_tienda": col("in.id_tienda"),
            "id_producto": col("in.id_producto"),
            "cantidad": col("in.cantidad"),
            "monto": col("in.monto"),
            "fecha_venta": col("in.fecha_venta"),
            "updated_at": col("in.updated_at")
        }
    )
    # Si no hay coincidencia, inserta el nuevo registro
    .whenNotMatchedInsert(
        values = {
            "id_venta": col("in.id_venta"),
            "id_cliente": col("in.id_cliente"),
            "id_tienda": col("in.id_tienda"),
            "id_producto": col("in.id_producto"),
            "cantidad": col("in.cantidad"),
            "monto": col("in.monto"),
            "fecha_venta": col("in.fecha_venta"),
            "updated_at": col("in.updated_at")
        }
    )
    .execute()  # Ejecuta el MERGE
)

In [0]:
%sql
select count (*) from dmc_tallerfinaldatabricks_mariovento.silver.ventas