In [0]:
from pyspark.sql.functions import when, coalesce, to_date, substring, initcap, trim, col, lit, lower, current_timestamp, upper, create_map, expr, to_timestamp, date_format, round, try_to_timestamp
from pyspark.sql.types import LongType, StringType, DateType, IntegerType, DecimalType, TimestampType
from datetime import datetime, date
from itertools import chain

In [0]:
# Spanish number words "cero".."diez" mapped to their digit strings; each word's
# position in the tuple is its numeric value.
numeros_literales = {
    palabra: str(valor)
    for valor, palabra in enumerate(
        (
            "cero", "uno", "dos", "tres", "cuatro", "cinco",
            "seis", "siete", "ocho", "nueve", "diez",
        )
    )
}

# Spark map literal built from the flattened (key, value, key, value, ...) sequence,
# so the lookup can be applied to a column.
mapping_numeros = create_map(
    [lit(elemento) for elemento in chain.from_iterable(numeros_literales.items())]
)

# Catalog that holds the bronze/silver schemas referenced by this notebook.
catalog_name = "webinar"

# Driver-side wall-clock time, captured once when this cell runs.
fecha_actual = datetime.now()

# Silver-layer cleaning of `<catalog>.bronze.ventas`.
#
# Rules applied:
#   * cantidad    - number words ("uno", "dos", ...) are translated to digits, then the
#                   value is kept only if it parses as a strictly positive INT.
#   * monto       - parsed as DOUBLE, rounded to 2 decimals, kept only if the ROUNDED
#                   value is strictly positive (so "0.004" becomes NULL, not 0.00).
#   * fecha_venta - parsed with the first matching pattern in `_formatos_fecha`;
#                   values before 2000-01-01 or after `fecha_actual` become NULL.
#   * anio_mes    - "yyyy-MM" derived from the cleaned fecha_venta (NULL when it is NULL).
#   * one row per id_venta, plus an `updated_at` load timestamp.
# Anything that fails parsing or validation becomes NULL rather than raising.

# Accepted timestamp patterns, tried in this order; the first successful parse wins.
_formatos_fecha = (
    "yyyy/MM/dd HH:mm:ss",
    "yyyy-MM-dd HH:mm:ss",
    "dd/MM/yyyy HH:mm:ss",
    "yyyy-MM-dd",
    "yyyy/MM/dd",
    "dd/MM/yyyy",
)
_fecha_parseada = coalesce(
    *[
        expr(f"try_to_timestamp(trim(fecha_venta), '{formato}')")
        for formato in _formatos_fecha
    ]
)

# Reusable column expressions, so each parse is written exactly once.
_cantidad_int = expr("try_cast(cantidad_mapeada as INT)")
# `round` here is pyspark.sql.functions.round (imported at the top of the notebook).
_monto_redondeado = round(expr("try_cast(monto as DOUBLE)"), 2)
# Explicit timestamp literal instead of relying on implicit string -> timestamp coercion.
_fecha_minima = lit("2000-01-01").cast(TimestampType())

df = (
    spark.table(f"{catalog_name}.bronze.ventas")
    # Temporary column: digit string for known number words, otherwise the raw value.
    .withColumn(
        "cantidad_mapeada",
        coalesce(
            mapping_numeros[lower(trim(col("cantidad")))],
            col("cantidad"),
        ),
    )
    # `when` without `otherwise` yields NULL for unmatched rows.
    .withColumn("cantidad", when(_cantidad_int > 0, _cantidad_int))
    .drop("cantidad_mapeada")
    # Validate AFTER rounding so amounts that round to 0.00 are rejected as well.
    .withColumn("monto", when(_monto_redondeado > 0, _monto_redondeado))
    .withColumn("fecha_venta", _fecha_parseada)
    # Keep only dates inside [2000-01-01, fecha_actual]; NULL stays NULL.
    .withColumn(
        "fecha_venta",
        when(
            col("fecha_venta").between(_fecha_minima, lit(fecha_actual)),
            col("fecha_venta"),
        ),
    )
    # date_format returns NULL for a NULL input, so no explicit null guard is needed.
    .withColumn("anio_mes", date_format(col("fecha_venta"), "yyyy-MM"))
    # NOTE(review): no ordering is specified, so which duplicate survives is not
    # controlled here; the key is also deduplicated before the LongType cast below.
    # Confirm whether a deterministic tie-break (e.g. latest fecha_venta) is required.
    .dropDuplicates(["id_venta"])
    .withColumn("updated_at", current_timestamp())
    .select(
        col("id_venta").cast(LongType()),
        col("id_cliente").cast(LongType()),
        col("id_tienda").cast(LongType()),
        col("id_producto").cast(LongType()),
        col("cantidad").cast(IntegerType()),
        col("monto").cast(DecimalType(18, 2)),
        col("fecha_venta").cast(TimestampType()),
        col("anio_mes").cast(StringType()),
        col("updated_at"),
    )
)

# Persist the cleaned sales as a Delta table, replacing any previous contents.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{catalog_name}.silver.ventas")
)