In [1]:
# NB_Producto360_Silver
# Alimentar tablas Silver de Producto_360 en Direccion.Producto360

from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

# ------------------------------------------------------------------
# 1. Basic configuration
# ------------------------------------------------------------------

# D365 company (dataareaid) whose rows are loaded.
empresa = "rac"

# Minimum header delivery date for the sales/purchase facts (ISO string).
cutoff_date = "2018-01-01"

# Prefix for source tables, used as f"{bronze_db}.Bronze_shortcuts.<table>".
bronze_db = "Bronze"

# Row filter for company-scoped tables: the configured company, plus rows
# whose dataareaid is NULL.
filtro_empresa = (
    (F.col("dataareaid") == F.lit(empresa))
    | F.col("dataareaid").isNull()
)

spark.sql("CREATE SCHEMA IF NOT EXISTS silver")


StatementMeta(, 02ae5559-0462-4249-9402-2f4da448c42b, 3, Finished, Available, Finished)

DataFrame[]

In [2]:

# ------------------------------------------------------------------
# 2. Watermark table (one row per entity)
# ------------------------------------------------------------------

# Control table read by get_last_watermark and written (MERGE on `entity`)
# by upsert_watermark: one row per loaded entity, holding the newest
# SinkModifiedOn processed for it.
spark.sql("""
CREATE TABLE IF NOT EXISTS dbo._watermarks_producto360 (
    entity        STRING NOT NULL,
    last_watermark TIMESTAMP
)
USING delta
""")

def get_last_watermark(entity: str):
    """Return the stored watermark for `entity`, or None if it has none yet.

    Reads dbo._watermarks_producto360 and returns MAX(last_watermark) for the
    entity. A global aggregation always yields exactly one row, and MAX over
    zero rows is NULL. One Spark job therefore covers both the "not registered
    yet" case (returns None) and the normal case, with no separate emptiness
    check.

    Args:
        entity: Entity key as written by upsert_watermark.

    Returns:
        The last_watermark TIMESTAMP value (a Python datetime via PySpark), or
        None when the entity has no row.
    """
    row = (spark.table("dbo._watermarks_producto360")
                .filter(F.col("entity") == F.lit(entity))
                .agg(F.max("last_watermark").alias("wm"))
                .first())
    return row["wm"]

def upsert_watermark(entity: str, new_wm):
    """Insert or update the watermark row for `entity`.

    Args:
        entity: Entity key in dbo._watermarks_producto360. It is interpolated
            into the SQL text, so it must be a trusted constant without quotes.
        new_wm: datetime of the newest SinkModifiedOn processed. None is a
            no-op: nothing was loaded, so the stored watermark is kept.
    """
    if new_wm is None:
        return
    # Keep microseconds. SinkModifiedOn has sub-second precision, and
    # truncating to whole seconds would make the next "> last_wm" filter
    # select the last processed batch again on every run.
    wm_str = new_wm.strftime("%Y-%m-%d %H:%M:%S.%f")
    spark.sql(f"""
    MERGE INTO dbo._watermarks_producto360 AS tgt
    USING (SELECT '{entity}' AS entity,
                  TIMESTAMP '{wm_str}' AS last_watermark) AS src
       ON tgt.entity = src.entity
    WHEN MATCHED THEN
        UPDATE SET last_watermark = src.last_watermark
    WHEN NOT MATCHED THEN
        INSERT (entity, last_watermark)
        VALUES (src.entity, src.last_watermark)
    """)

def table_exists(full_name: str) -> bool:
    """Tell whether a table is registered in the Spark catalog.

    Args:
        full_name: Qualified table name, e.g. "silver.factSalesLine_Producto360".

    Returns:
        The catalog's answer, or False if the catalog lookup raises
        AnalysisException.
    """
    try:
        found = spark.catalog.tableExists(full_name)
    except AnalysisException:
        found = False
    return found


StatementMeta(, 02ae5559-0462-4249-9402-2f4da448c42b, 4, Finished, Available, Finished)

In [3]:
# ------------------------------------------------------------------
# 3. dimFecha_Producto360 (full refresh, small table)
# ------------------------------------------------------------------

from datetime import date, timedelta

dim_fecha_table = "silver.dimFecha_Producto360"

# The calendar starts at the same cutoff that filters the sales/purchase facts
# (header deliverydate >= cutoff_date), so every fact date from the cutoff
# onward has a matching row. The previous fixed 2021-01-01 start left
# 2018-2020 fact rows without one. It extends one year past today.
start_date = date.fromisoformat(cutoff_date)
end_date   = date.today() + timedelta(days=365)

dates = [start_date + timedelta(days=i)
         for i in range((end_date - start_date).days + 1)]

df_fecha = (spark.createDataFrame([(d,) for d in dates], ["Fecha"])
            .withColumn("FechaKey", F.date_format("Fecha", "yyyyMMdd").cast("int"))
            .withColumn("Anio", F.year("Fecha"))
            .withColumn("Mes", F.month("Fecha"))
            .withColumn("Dia", F.dayofmonth("Fecha"))
            # Month name uses Spark's formatter locale (English by default);
            # NOTE(review): confirm whether Spanish names are expected.
            .withColumn("NombreMes", F.date_format("Fecha", "MMMM"))
            .withColumn("AnioMes", F.date_format("Fecha", "yyyyMM").cast("int"))
           )

df_fecha.createOrReplaceTempView("src_dim_fecha")

# CREATE OR REPLACE swaps the Delta table in one transaction. Readers never
# see the table missing, and a failed rebuild keeps the previous version
# (DROP + CREATE had neither guarantee).
spark.sql(f"""
CREATE OR REPLACE TABLE {dim_fecha_table}
USING delta AS
SELECT * FROM src_dim_fecha
""")

StatementMeta(, 520f2c59-04dc-4bec-b5eb-1691a0957a21, 5, Finished, Available, Finished)

DataFrame[]

In [None]:
# ------------------------------------------------------------------
# 4. dimAlmacen_Producto360 (from inventonhandstoragereportline)
#     - current on-hand inventory is enough for Producto_360
# ------------------------------------------------------------------

dim_alm_table = "silver.dimAlmacen_Producto360"

inv_src = f"{bronze_db}.Bronze_shortcuts.inventonhandstoragereportline"

# Distinct company / site / warehouse / WMS location combinations present in
# the on-hand report for the configured company (or NULL dataareaid).
df_alm = (spark.table(inv_src)
          .filter(filtro_empresa)
          .select("dataareaid", "inventsiteid", "inventlocationid", "wmslocationid")
          .dropDuplicates()
         )

df_alm.createOrReplaceTempView("src_dim_almacen")

# CREATE OR REPLACE swaps the Delta table in one transaction. Readers never
# see the table missing, and a failed rebuild keeps the previous version.
spark.sql(f"""
CREATE OR REPLACE TABLE {dim_alm_table}
USING delta AS
SELECT * FROM src_dim_almacen
""")

StatementMeta(, 8346c3e2-af29-492a-bf4d-e3cf8d6fb719, 8, Finished, Available, Finished)

DataFrame[]

In [3]:
# ------------------------------------------------------------------
# 5. factInventarioActual_Producto360
#     InventSum-based design (same logic as the D365 on-hand view)
# ------------------------------------------------------------------

from delta.tables import DeltaTable

entity = "factInventarioActual_Producto360"
fact_inv_table = "silver.factInventarioActual_Producto360"

# Source tables in Bronze_shortcuts (adjust the catalog if needed)
invsum_src            = f"{bronze_db}.Bronze_shortcuts.inventsum"
inventdim_src         = f"{bronze_db}.Bronze_shortcuts.inventdim"
inventtable_src       = f"{bronze_db}.Bronze_shortcuts.inventtable"
inventitemgroup_src   = f"{bronze_db}.Bronze_shortcuts.inventitemgroupitem"

# True  -> full rebuild: the target table is overwritten.
# False -> incremental: only the company/item/site/warehouse keys with InventSum
#          rows changed since the stored watermark are re-aggregated and MERGEd.
force_full = True

last_wm = None if force_full else get_last_watermark(entity)
# No watermark (forced, or first run) means a full rebuild.
is_full_load = last_wm is None

# Change-tracking column; adjust here if the export uses another name.
wm_col = "SinkModifiedOn"

# ----------------------------------------------------------
# 5.1. Load InventSum (company rows, Closed = 0)
# ----------------------------------------------------------
df_invsum_all = (
    spark.table(invsum_src)
         .filter(filtro_empresa)
         .filter(F.col("closed") == 0)
         .filter(F.col("sysdatastatecode") == 0)
         .filter(F.col("isexcludedfrominventoryvalue") == 0)
)

# Newest change among the open rows; stored as the watermark after a load.
max_wm_global = (
    df_invsum_all
    .agg(F.max(F.col(wm_col)).alias("max_wm"))
    .collect()[0]["max_wm"]
)

df_dim = spark.table(inventdim_src).alias("d")

if is_full_load:
    df_invsum_for_agg = df_invsum_all
else:
    df_changes = df_invsum_all.filter(F.col(wm_col) > F.lit(last_wm))

    if df_changes.rdd.isEmpty():
        print(f"[{entity}] Sin cambios nuevos desde watermark {last_wm}.")
        df_invsum_for_agg = df_invsum_all.limit(0)  # empty frame, same schema
    else:
        # NOTE(review): df_invsum_all already excludes closed/excluded rows, so
        # a row that *becomes* closed is not seen as a change here. Its key keeps
        # stale totals until another open row of that key changes or a full
        # rebuild runs.

        # Keys (company, item, site, warehouse) touched by the changed rows.
        changed_site_loc = (
            df_changes.alias("s")
            .join(df_dim, F.col("s.inventdimid") == F.col("d.inventdimid"), "inner")
            .select(
                F.col("s.dataareaid").alias("dataareaid"),
                F.col("s.itemid").alias("itemid"),
                F.col("d.inventsiteid").alias("inventsiteid"),
                F.col("d.inventlocationid").alias("inventlocationid"),
            )
            .distinct()
        )

        # Every open InventSum row of those keys, so their totals are
        # recomputed in full rather than from the delta rows only.
        df_invsum_for_agg = (
            df_invsum_all.alias("s")
            .join(df_dim, F.col("s.inventdimid") == F.col("d.inventdimid"), "inner")
            .join(
                changed_site_loc.alias("k"),
                on=[
                    F.col("s.dataareaid") == F.col("k.dataareaid"),
                    F.col("s.itemid") == F.col("k.itemid"),
                    F.col("d.inventsiteid") == F.col("k.inventsiteid"),
                    F.col("d.inventlocationid") == F.col("k.inventlocationid"),
                ],
                how="inner"
            )
            .select("s.*")
        )

if df_invsum_for_agg.rdd.isEmpty():
    print(f"[{entity}] No hay filas a procesar (FULL o INCR).")
else:
    # ----------------------------------------------------------
    # 5.2. Joins to InventDim, InventTable, InventItemGroupItem
    # ----------------------------------------------------------
    df_it = (
        spark.table(inventtable_src)
             .filter(filtro_empresa)
             .alias("t")
    )

    df_ig = (
        spark.table(inventitemgroup_src)
             .filter(filtro_empresa)
             .alias("ig")
    )

    df_join = (
        df_invsum_for_agg.alias("s")
        .join(df_dim, F.col("s.inventdimid") == F.col("d.inventdimid"), "inner")
        .join(
            df_it,
            (F.col("s.itemid") == F.col("t.itemid")) &
            (F.col("s.dataareaid") == F.col("t.dataareaid")),
            "inner"
        )
        .join(
            df_ig,
            (F.col("t.itemid") == F.col("ig.itemid")) &
            (F.col("t.dataareaid") == F.col("ig.itemdataareaid")),
            "left"
        )
    )

    # ----------------------------------------------------------
    # 5.3. Projection and aggregation (same logic as the D365 view)
    # ----------------------------------------------------------
    df_base = df_join.select(
        F.col("s.dataareaid").alias("dataareaid"),
        F.col("s.itemid").alias("itemid"),
        F.col("d.inventsiteid").alias("inventsiteid"),
        F.col("d.inventlocationid").alias("inventlocationid"),
        F.col("t.namealias").alias("namealias"),
        F.col("ig.itemgroupid").alias("itemgroupid"),

        # Quantities / values
        F.col("s.postedqty"),
        F.col("s.postedvalue"),
        F.col("s.physicalvalue"),
        F.col("s.deducted"),
        F.col("s.registered"),
        F.col("s.received"),
        F.col("s.picked"),
        F.col("s.reservphysical"),
        F.col("s.reservordered"),
        F.col("s.onorder"),
        F.col("s.ordered"),
        F.col("s.arrived"),
        F.col("s.quotationreceipt"),
        F.col("s.quotationissue"),
        F.col("s.physicalinvent"),
        F.col("s.availphysical"),
        F.col("s.availordered"),
        F.col(f"s.{wm_col}")
    )

    df_agg = (
        df_base
        .groupBy(
            "dataareaid",
            "itemid",
            "inventsiteid",
            "inventlocationid"
        )
        .agg(
            # Descriptive fields: any one value per key (MAX)
            F.max("namealias").alias("namealias"),
            F.max("itemgroupid").alias("itemgroupid"),
            F.sum("postedqty").alias("postedqty"),
            F.sum("postedvalue").alias("postedvalue"),
            F.sum("physicalvalue").alias("physicalvalue"),
            F.sum("deducted").alias("deducted"),
            F.sum("registered").alias("registered"),
            F.sum("received").alias("received"),
            F.sum("picked").alias("picked"),
            F.sum("reservphysical").alias("reservphysical"),
            F.sum("reservordered").alias("reservordered"),
            F.sum("onorder").alias("onorder"),
            F.sum("ordered").alias("ordered"),
            F.sum("arrived").alias("arrived"),
            F.sum("quotationreceipt").alias("quotationreceipt"),
            F.sum("quotationissue").alias("quotationissue"),
            F.sum("physicalinvent").alias("physicalinvent"),
            F.sum("availphysical").alias("availphysical"),
            F.sum("availordered").alias("availordered"),
            F.max(wm_col).alias("row_watermark")
        )
        # Basic audit columns
        .withColumn("etl_entity", F.lit(entity))
        .withColumn("etl_run_ts", F.current_timestamp())
    )

    # ----------------------------------------------------------
    # 5.4. Write to factInventarioActual_Producto360
    #      Business key: company + item + site + warehouse
    # ----------------------------------------------------------
    if is_full_load or not table_exists(fact_inv_table):
        # Full rebuild: overwrite, so keys that no longer have open InventSum
        # rows disappear. A MERGE would leave them in the table forever.
        (
            df_agg
            .write
            .format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .partitionBy("inventsiteid")
            .saveAsTable(fact_inv_table)
        )
    else:
        # Incremental: upsert only the re-aggregated keys.
        delta_tbl = DeltaTable.forName(spark, fact_inv_table)

        (
            delta_tbl.alias("t")
            .merge(
                df_agg.alias("s"),
                """
                t.dataareaid      = s.dataareaid
                AND t.itemid      = s.itemid
                AND t.inventsiteid    = s.inventsiteid
                AND t.inventlocationid = s.inventlocationid
                """
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    # ----------------------------------------------------------
    # 5.5. Update the process watermark
    # ----------------------------------------------------------
    if max_wm_global is not None:
        if last_wm is None or max_wm_global >= last_wm:
            upsert_watermark(entity, max_wm_global)
            print(f"[{entity}] Watermark actualizado a {max_wm_global}.")
        else:
            print(f"[{entity}] Omitido: max_wm_global ({max_wm_global}) < last_wm ({last_wm}).")

StatementMeta(, 02ae5559-0462-4249-9402-2f4da448c42b, 7, Finished, Available, Finished)

[factInventarioActual_Producto360] Watermark actualizado a 2025-12-16 16:28:44.553576.


In [5]:
# ------------------------------------------------------------------
# 6. factSalesLine_Producto360 (incremental + TDC mensual)
# ------------------------------------------------------------------

# ------------------------------------------------------------------
# FX rate (TDC) helper: read the monthly exchange-rate table once per
# run. Reused by the sales and purchase fact sections.
tdc_src = "dbo.tdc_historico_mensual"

# Month key as "yyyyMM" string plus the monthly average rate as double.
# NOTE(review): assumed unique per Key_Año_Mes. A duplicated month would
# duplicate fact rows in the left joins against tc_sel.
tc_full = spark.table(tdc_src).select(
    F.col("Key_Año_Mes").cast("string").alias("Key_Año_Mes"),
    F.col("tdc_mensual_promedio").cast("double").alias("tdc_mensual_promedio"),
)

# Latest month that has a rate, as a STRING (e.g. "202511"). Later months
# fall back to it.
max_key_tdc = tc_full.agg(F.max("Key_Año_Mes").alias("max_k")).first()["max_k"]

# Join-ready copy whose key column cannot collide with the facts' Key_Año_Mes.
tc_sel = tc_full.withColumnRenamed("Key_Año_Mes", "Key_Año_Mes_tdc")

# ------------------------------------------------------------------

entity = "factSalesLine_Producto360"
fact_sales_table = "silver.factSalesLine_Producto360"
salesline_src = f"{bronze_db}.Bronze_shortcuts.salesline"
salestable_src = f"{bronze_db}.Bronze_shortcuts.salestable"

# True -> ignore the stored watermark and reprocess every line since cutoff_date
force_full = False

last_wm = None if force_full else get_last_watermark(entity)

sl = spark.table(salesline_src).alias("sl")
st = spark.table(salestable_src).alias("st")

# Change-detection timestamp per row. Header attributes (status, site,
# warehouse, delivery date, ...) are copied into every line, so a line must be
# reprocessed when either the line OR its order header changes. greatest()
# skips NULLs. This helper column is dropped before writing, so the target
# schema, including SinkModifiedOn (the line timestamp), is unchanged.
row_wm_col = "_row_wm"

df_sales = (
    sl.join(
        st,
        (F.col("sl.dataareaid") == F.col("st.dataareaid")) &
        (F.col("sl.salesid")    == F.col("st.salesid")),
        "inner"
    )
    .filter((F.col("sl.dataareaid") == F.lit(empresa)) | F.col("sl.dataareaid").isNull())
    .filter(F.col("st.deliverydate") >= F.lit(cutoff_date))
    .select(
        F.col("sl.recid").alias("recid"),
        F.col("sl.dataareaid").alias("dataareaid"),
        F.col("sl.salesid").alias("salesid"),
        F.col("sl.itemid").alias("itemid"),
        "sl.costprice",
        "sl.salesprice",
        "sl.salesqty",
        "sl.salesunit",
        "sl.currencycode",
        "sl.custaccount",
        "sl.deliveryname",
        "sl.lineamount",
        "sl.linedisc",
        "sl.purchorderformnum",
        "sl.qtyordered",
        "sl.mcrmarginpercent",
        "sl.expectedretqty",
        "st.discpercent",
        "st.inventlocationid",
        "st.inventsiteid",
        "st.quotationid",
        "st.receiptdateconfirmed",
        "st.salesname",
        "st.salesstatus",
        F.to_date(F.col("st.deliverydate")).alias("fecha_solicitud"),
        F.to_date(F.col("st.modifiedon")).alias("fecha_modificacion"),
        F.col("sl.SinkModifiedOn").alias("SinkModifiedOn"),
        F.greatest(F.col("sl.SinkModifiedOn"), F.col("st.SinkModifiedOn")).alias(row_wm_col),
    )
)

# Incremental: lines whose line or header changed since the watermark
if last_wm is not None:
    df_sales = df_sales.filter(F.col(row_wm_col) > F.lit(last_wm))

if not df_sales.rdd.isEmpty():
    # Year-month key and monthly FX rate.
    # NOTE(review): the FX month comes from the header's last modification
    # date (st.modifiedon), so it shifts whenever the order header is edited.
    # The purchase fact keys on the header delivery date. Confirm the intent.
    # NOTE(review): every non-MXN line uses tdc_mensual_promedio (presumably
    # MXN per USD), i.e. non-MXN currencies are assumed to be USD.
    df_sales = (
        df_sales
        .withColumn("Key_Año_Mes", F.date_format(F.col("fecha_modificacion"), "yyyyMM"))
        # Months after the last month with a rate use that last month's rate
        .withColumn(
            "Key_Año_Mes_fx",
            F.when(F.col("Key_Año_Mes") > F.lit(max_key_tdc), F.lit(max_key_tdc))
             .otherwise(F.col("Key_Año_Mes"))
        )
        .join(
            tc_sel,
            F.col("Key_Año_Mes_fx") == F.col("Key_Año_Mes_tdc"),
            "left"
        )
        .withColumn(
            "TDC_Mensual",
            F.when(F.col("currencycode") == F.lit("MXN"), F.lit(1.0))
             .otherwise(F.coalesce(F.col("tdc_mensual_promedio"), F.lit(1.0)))
        )
        .withColumn("importe_moneda_original", F.col("lineamount"))
        .withColumn("importe_mxn", F.col("lineamount") * F.col("TDC_Mensual"))
        .withColumn("importe_usd", F.col("importe_mxn") / F.col("tdc_mensual_promedio"))
        .withColumn("precio_unitario_mxn", F.col("salesprice") * F.col("TDC_Mensual"))
        .withColumn("precio_unitario_usd", F.col("precio_unitario_mxn") / F.col("tdc_mensual_promedio"))
        .drop("tdc_mensual_promedio")
    )

    max_wm_sales = df_sales.agg(F.max(row_wm_col).alias("wm")).collect()[0]["wm"]

    # The helper column is not part of the target table
    df_sales.drop(row_wm_col).createOrReplaceTempView("src_fact_sales")

    if not table_exists(fact_sales_table):
        spark.sql(f"""
        CREATE TABLE {fact_sales_table} AS
        SELECT * FROM src_fact_sales
        """)
    else:
        spark.sql(f"""
        MERGE INTO {fact_sales_table} AS tgt
        USING src_fact_sales AS src
          ON  tgt.recid = src.recid
        WHEN MATCHED THEN
          UPDATE SET *
        WHEN NOT MATCHED THEN
          INSERT *
        """)

    upsert_watermark(entity, max_wm_sales)


StatementMeta(, 520f2c59-04dc-4bec-b5eb-1691a0957a21, 7, Finished, Available, Finished)

In [6]:
# ------------------------------------------------------------------
# X. silver.purch_datephysical_agg (MIN(datephysical) per order + item)
#      incremental, built from inventtransorigin + inventtrans
# Lightweight lookup of the physical/financial dates used by the purchase fact
# ------------------------------------------------------------------

entity = "purch_datephysical_agg"
phys_table = "silver.purch_datephysical_agg"
ito_src = f"{bronze_db}.Bronze_shortcuts.inventtransorigin"
it_src  = f"{bronze_db}.Bronze_shortcuts.inventtrans"
# NOTE: archived transactions (inventtransarchive) are not read.

# True -> ignore the stored watermark and aggregate every inventtrans row
force_full = False

last_wm = None if force_full else get_last_watermark(entity)

# NOTE(review): there is no filter on the origin's reference category, so
# every reference type is aggregated, not only purchase orders. Consumers
# join on referenceid = purchid.
ito = (spark.table(ito_src).alias("ito")
       .filter(
           (F.col("ito.dataareaid") == F.lit(empresa)) |
           F.col("ito.dataareaid").isNull()
       )
       .select(
           "recid",
           "referenceid",
           "itemid",
           "dataareaid"
       )
)

it = (spark.table(it_src).alias("it")
      .filter(
          (F.col("it.dataareaid") == F.lit(empresa)) |
          F.col("it.dataareaid").isNull()
      )
      .select(
          "inventtransorigin",
          "itemid",
          "dataareaid",
          "datephysical",
          "datefinancial",
          "SinkModifiedOn"
      )
)

# Incremental: only inventtrans rows modified since the last watermark
if last_wm is not None:
    it = it.filter(F.col("it.SinkModifiedOn") > F.lit(last_wm))

# NOTE(review): if the export encodes empty D365 dates as 1900-01-01 instead
# of NULL, MIN() returns that placeholder for lines with unposted
# transactions. Confirm against the source data.
df_phys = (
    ito.join(
        it,
        (F.col("ito.recid")      == F.col("it.inventtransorigin")) &
        (F.col("ito.itemid")     == F.col("it.itemid")) &
        (F.col("ito.dataareaid") == F.col("it.dataareaid")),
        "inner"
    )
    .groupBy("ito.dataareaid", "ito.referenceid", "ito.itemid")
    .agg(
        F.min("it.datephysical").alias("datephysical_min"),
        F.min("it.datefinancial").alias("datefinancial_min"),
        F.max("it.SinkModifiedOn").alias("max_SinkModifiedOn")
    )
)

if not df_phys.rdd.isEmpty():
    max_wm_phys = df_phys.agg(F.max("max_SinkModifiedOn").alias("wm")).collect()[0]["wm"]

    df_phys.createOrReplaceTempView("src_purch_datephysical")

    if not table_exists(phys_table):
        # First load
        spark.sql(f"""
        CREATE TABLE {phys_table} AS
        SELECT
            dataareaid,
            referenceid,
            itemid,
            datephysical_min,
            datefinancial_min
        FROM src_purch_datephysical
        """)
    else:
        # Incremental MERGE. src only aggregates the inventtrans rows changed
        # since the watermark, so the global minimum is LEAST(stored, new).
        # LEAST skips NULLs, so a date missing on an earlier run gets filled
        # in. (The previous "SET tgt.x = tgt.x" never changed existing rows.)
        spark.sql(f"""
        MERGE INTO {phys_table} AS tgt
        USING (
            SELECT
                dataareaid,
                referenceid,
                itemid,
                datephysical_min,
                datefinancial_min
            FROM src_purch_datephysical
        ) AS src
          ON  tgt.dataareaid  = src.dataareaid
          AND tgt.referenceid = src.referenceid
          AND tgt.itemid      = src.itemid
        WHEN MATCHED THEN
          UPDATE SET
            tgt.datephysical_min  = LEAST(tgt.datephysical_min,  src.datephysical_min),
            tgt.datefinancial_min = LEAST(tgt.datefinancial_min, src.datefinancial_min)
        WHEN NOT MATCHED THEN
          INSERT (dataareaid, referenceid, itemid, datephysical_min, datefinancial_min)
          VALUES (src.dataareaid, src.referenceid, src.itemid, src.datephysical_min, src.datefinancial_min)
        """)

    upsert_watermark(entity, max_wm_phys)


StatementMeta(, 520f2c59-04dc-4bec-b5eb-1691a0957a21, 8, Finished, Available, Finished)

In [7]:
# ------------------------------------------------------------------
# 7. factPurchLine_Producto360 (incremental + datephysical + FX rate)
# ------------------------------------------------------------------

entity = "factPurchLine_Producto360"
fact_purch_table = "silver.factPurchLine_Producto360"
purchline_src = f"{bronze_db}.Bronze_shortcuts.purchline"
purchtable_src = f"{bronze_db}.Bronze_shortcuts.purchtable"
phys_src = "silver.purch_datephysical_agg"

# True -> ignore the stored watermark and reprocess every line since cutoff_date
force_full = False

last_wm = None if force_full else get_last_watermark(entity)

pl = spark.table(purchline_src).alias("pl")
pt = spark.table(purchtable_src).alias("pt")
ph = spark.table(phys_src).alias("ph")

# Change-detection timestamp per row. Header attributes (status, cash
# discount, name, delivery date) are copied into every line, so a line must be
# reprocessed when either the line OR its header changes. greatest() skips
# NULLs. This helper column is dropped before writing, so the target schema,
# including SinkModifiedOn (the line timestamp), is unchanged.
# NOTE(review): changes that only touch purch_datephysical_agg (FechaFisica /
# FechaFinanciera) are not detected unless the purchase line or header also
# changes.
row_wm_col = "_row_wm"

df_purch = (
    pl.join(
        pt,
        (F.col("pl.dataareaid") == F.col("pt.dataareaid")) &
        (F.col("pl.purchid")    == F.col("pt.purchid")),
        "inner"
    )
    .filter(
        (F.col("pl.dataareaid") == F.lit(empresa)) |
        F.col("pl.dataareaid").isNull()
    )
    .filter(F.col("pt.deliverydate") >= F.lit(cutoff_date))
    # Join to the aggregated physical/financial dates
    .join(
        ph,
        (F.col("pl.dataareaid") == F.col("ph.dataareaid")) &
        (F.col("pl.purchid")    == F.col("ph.referenceid")) &
        (F.col("pl.itemid")     == F.col("ph.itemid")),
        "left"
    )
    .select(
        F.col("pl.recid").alias("recid"),
        F.col("pl.dataareaid").alias("dataareaid"),
        F.col("pl.itemid").alias("itemid"),
        F.col("pl.purchid").alias("purchid"),
        F.col("pl.purchstatus").alias("purchstatus"),
        "pl.lineamount",
        "pl.confirmedtaxamount",
        "pl.currencycode",
        "pl.linepercent",
        "pl.externalitemid",
        "pl.inventrefid",
        "pl.purchprice",
        "pl.purchqty",
        "pl.purchunit",
        "pl.vendaccount",
        F.col("pt.purchstatus").alias("purchstatus_header"),
        "pt.cashdisc",
        "pt.cashdiscpercent",
        "pt.purchname",
        F.col("pt.deliverydate").alias("FechaCompra"),
        F.col("ph.datephysical_min").alias("FechaFisica"),
        F.col("ph.datefinancial_min").alias("FechaFinanciera"),
        F.col("pl.SinkModifiedOn").alias("SinkModifiedOn"),
        F.greatest(F.col("pl.SinkModifiedOn"), F.col("pt.SinkModifiedOn")).alias(row_wm_col),
    )
)

# Incremental: lines whose line or header changed since the watermark
if last_wm is not None:
    df_purch = df_purch.filter(F.col(row_wm_col) > F.lit(last_wm))

if not df_purch.rdd.isEmpty():
    # Year-month key (STRING) and FX rate, falling back to the last month that
    # has one.
    # NOTE(review): every non-MXN line uses tdc_mensual_promedio (presumably
    # MXN per USD), i.e. non-MXN currencies are assumed to be USD.
    df_purch = (
        df_purch
        .withColumn("Key_Año_Mes", F.date_format(F.col("FechaCompra"), "yyyyMM").cast("string"))
        .withColumn(
            "Key_Año_Mes_fx",
            F.when(F.col("Key_Año_Mes") > F.lit(max_key_tdc),  # both STRING
                   F.lit(max_key_tdc)
            ).otherwise(F.col("Key_Año_Mes"))
        )
        .join(
            tc_sel,
            F.col("Key_Año_Mes_fx") == F.col("Key_Año_Mes_tdc"),
            "left"
        )
        .withColumn(
            "TDC_Mensual",
            F.when(F.col("currencycode") == F.lit("MXN"), F.lit(1.0))
             .otherwise(F.coalesce(F.col("tdc_mensual_promedio"), F.lit(1.0)))
        )
        .withColumn("importe_moneda_original", F.col("lineamount"))
        .withColumn("importe_mxn", F.col("lineamount") * F.col("TDC_Mensual"))
        .withColumn("importe_usd", F.col("importe_mxn") / F.col("tdc_mensual_promedio"))
        .withColumn("precio_unitario_mxn", F.col("purchprice") * F.col("TDC_Mensual"))
        .withColumn("precio_unitario_usd", F.col("precio_unitario_mxn") / F.col("tdc_mensual_promedio"))
        .drop("tdc_mensual_promedio")
    )

    max_wm_purch = df_purch.agg(F.max(row_wm_col).alias("wm")).collect()[0]["wm"]

    # The helper column is not part of the target table
    df_purch.drop(row_wm_col).createOrReplaceTempView("src_fact_purch")

    if not table_exists(fact_purch_table):
        spark.sql(f"""
        CREATE TABLE {fact_purch_table} AS
        SELECT * FROM src_fact_purch
        """)
    else:
        spark.sql(f"""
        MERGE INTO {fact_purch_table} AS tgt
        USING src_fact_purch AS src
          ON  tgt.recid = src.recid
        WHEN MATCHED THEN
          UPDATE SET *
        WHEN NOT MATCHED THEN
          INSERT *
        """)

    upsert_watermark(entity, max_wm_purch)


StatementMeta(, 520f2c59-04dc-4bec-b5eb-1691a0957a21, 9, Finished, Available, Finished)

In [8]:
# Stop the Spark session once the loads above are done. Cells below this one
# need a new session to run.
mssparkutils.session.stop()


StatementMeta(, 520f2c59-04dc-4bec-b5eb-1691a0957a21, 10, Finished, Available, Finished)

In [12]:
%%sql
-- Manual maintenance: removes the stored watermark for the purchase fact.
-- The next run of section 7 then finds no watermark and reprocesses (MERGEs)
-- every purchase line with header deliverydate >= cutoff_date.
-- NOTE(review): leftover one-off cell. Remove it or keep it disabled so a
-- "Run all" does not silently force a full purchase reload.
DELETE FROM dbo._watermarks_producto360
WHERE entity IN ('factPurchLine_Producto360');


StatementMeta(, 8690543d-79cf-467d-afa8-0812e9aa5269, 14, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>