In [0]:
%load_ext autoreload
%autoreload 2

In [0]:
from tools.utils import *
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
# Clean the customers staging rows, then reduce the CDC stream per customer_id.

# Names: trim surrounding whitespace, then capitalise each word.
first_name_clean = F.initcap(F.trim(F.col("first_name"))).alias("first_name")
last_name_clean = F.initcap(F.trim(F.col("last_name"))).alias("last_name")

# Contact fields: NULL or whitespace-only values become a fixed placeholder,
# everything else is kept in trimmed form.
email_trimmed = F.trim(F.col("email_address"))
email_is_blank = F.col("email_address").isNull() | (F.length(email_trimmed) == 0)
email_clean = (
    F.when(email_is_blank, F.lit("unknown email"))
     .otherwise(email_trimmed)
     .alias("email_address")
)

phone_trimmed = F.trim(F.col("phone_number"))
phone_is_blank = F.col("phone_number").isNull() | (F.length(phone_trimmed) == 0)
phone_clean = (
    F.when(phone_is_blank, F.lit("unknown phone"))
     .otherwise(phone_trimmed)
     .alias("phone_number")
)

# CDC control columns and lineage columns are carried through untouched.
customers_passthrough_cols = [
    "op",
    "event_time",
    "seq_num",
    "dt",
    "_bronze_ingested_at",
    "_bronze_source_file",
    "_silver_ingested_at",
    "_silver_source_table",
]

customers_staging = spark.table("ecom_lakehouse.silver.customers_staging")
customers_cleaned = customers_staging.select(
    "customer_id",
    first_name_clean,
    last_name_clean,
    email_clean,
    phone_clean,
    "customer_since",
    F.upper(F.col("is_active")).alias("is_active"),
    *customers_passthrough_cols,
)
# Rows without a key or a customer_since value are discarded.
customers_cleaned = customers_cleaned.dropna(subset=["customer_id", "customer_since"])

# dedup_by / normalize_cdc_latest come from tools.utils; presumably they remove
# duplicate events and keep the newest event per key - TODO confirm in the helpers.
stg_customers = dedup_by(spark, customers_cleaned, "customer_id", "seq_num", "event_time")
stg_customers_latest = normalize_cdc_latest(
    spark_session=spark,
    df=stg_customers,
    key_col="customer_id",
    seq_col="seq_num",
    ts_col="event_time",
)

In [0]:
# Register the customers frame as a session-scoped temp view named
# "stg_customers_latest" (replacing any existing view of that name) so it can
# be referenced by name from Spark SQL.
stg_customers_latest.createOrReplaceTempView("stg_customers_latest")

In [0]:
# Apply the customer CDC events to the silver customers Delta table.
target = DeltaTable.forName(spark, "ecom_lakehouse.silver.customers")
source = stg_customers_latest.alias("src")

# Only columns that exist on both sides are written; the CDC control columns
# steer the merge and are never copied into the target.
cdc_control_cols = {"op", "event_time", "seq_num"}
tgt_cols = set(target.toDF().columns)
src_cols = set(source.columns)
common = sorted((tgt_cols & src_cols) - cdc_control_cols)

# Updates and inserts use the same src -> tgt column assignments.
set_map = {name: "src.`" + name + "`" for name in common}
ins_map = dict(set_map)

customers_merge = target.alias("tgt").merge(source, "tgt.customer_id = src.customer_id")
# Matched key: 'D' deletes the row, 'U' overwrites it. Unmatched key: 'I' inserts.
customers_merge = customers_merge.whenMatchedDelete("src.op = 'D'")
customers_merge = customers_merge.whenMatchedUpdate("src.op = 'U'", set=set_map)
customers_merge = customers_merge.whenNotMatchedInsert("src.op = 'I'", values=ins_map)
customers_merge.execute()

In [0]:
# Clean the products staging rows (including category normalisation), then
# reduce the CDC stream per product_id.

# Lookup from normalised category text to the canonical English category.
# The lookup is done on the output of normalize_ascii_lower (tools.utils), so
# keys must be written the way that helper emits them. The Spanish key was
# previously stored with a corrupted encoding and could never match; it is now
# listed in ASCII form. The accented spelling is kept as a fallback in case the
# helper preserves accents - TODO confirm against normalize_ascii_lower.
category_mapping = {
    "electronics": "electronics",
    "electronica": "electronics",
    "electrónica": "electronics",
    "home": "home",
    "hogar": "home",
    "juguetes": "toys",
    "toys": "toys"
}
# create_map takes a flat key1, value1, key2, value2, ... argument list.
mapping_expr = F.create_map(
    *[lit for key, value in category_mapping.items() for lit in (F.lit(key), F.lit(value))]
)
raw_cat = F.col("category")
norm_cat = normalize_ascii_lower(raw_cat)
mapped_cat = mapping_expr.getItem(norm_cat)
# Categories without a mapping entry fall back to their normalised text.
final_cat = F.coalesce(mapped_cat, norm_cat)
final_cat_title = F.initcap(final_cat)

stg_products = (
    spark.table("ecom_lakehouse.silver.products_staging")
        .select(
            F.col("product_id"),
            F.trim(F.initcap(F.col("product_name"))).alias("product_name"),
            # NULL categories get a placeholder instead of being dropped.
            F.when(F.col("category").isNull(), F.lit("unknown category")).otherwise(final_cat_title).alias("category"),
            F.col("unit_price"),
            F.upper(F.col("currency")).alias("currency"),
            F.col("product_release_date"),
            # Anything other than Y/YES (case-insensitive), including NULL, becomes "N".
            F.when(F.upper(F.col("is_discontinued")).isin("Y", "YES"), "Y").otherwise("N").alias("is_discontinued"),
            F.col("op"),
            F.col("event_time"),
            F.col("seq_num"),
            F.col("dt"),
            F.col("_bronze_ingested_at"),
            F.col("_bronze_source_file"),
            F.col("_silver_ingested_at"),
            F.col("_silver_source_table"),
        )
        # Rows missing any mandatory product attribute are discarded.
        .na.drop(subset=["product_id", "product_name", "unit_price", "currency", "product_release_date", "is_discontinued"])
)
# dedup_by / normalize_cdc_latest come from tools.utils; presumably they remove
# duplicate events and keep the newest event per key - TODO confirm in the helpers.
stg_products = dedup_by(spark, stg_products, "product_id", "seq_num", "event_time")
stg_products_latest = normalize_cdc_latest(
    spark_session = spark,
    df = stg_products,
    key_col = "product_id",
    seq_col = "seq_num",
    ts_col = "event_time"
)

In [0]:
# Apply the product CDC events to the silver products Delta table.
target = DeltaTable.forName(spark, "ecom_lakehouse.silver.products")
source = stg_products_latest.alias("src")

# Write only columns shared by source and target, leaving out the CDC
# control columns that merely drive the merge.
tgt_cols = set(target.toDF().columns)
src_cols = set(source.columns)
shared_cols = tgt_cols.intersection(src_cols)
common = [name for name in shared_cols if name not in {"op", "event_time", "seq_num"}]

# Identical src -> tgt assignments for the update and the insert clause.
set_map = {}
ins_map = {}
for name in common:
    src_ref = f"src.`{name}`"
    set_map[name] = src_ref
    ins_map[name] = src_ref

products_merge = target.alias("tgt").merge(source, "tgt.product_id = src.product_id")
# Matched key: 'D' deletes the row, 'U' overwrites it. Unmatched key: 'I' inserts.
products_merge = (
    products_merge
    .whenMatchedDelete("src.op = 'D'")
    .whenMatchedUpdate("src.op = 'U'", set=set_map)
    .whenNotMatchedInsert("src.op = 'I'", values=ins_map)
)
products_merge.execute()

In [0]:
# Clean the order-items staging rows, then reduce the CDC stream per order_item_id.

# CDC control columns and lineage columns are carried through untouched.
order_items_passthrough_cols = [
    "op",
    "event_time",
    "seq_num",
    "dt",
    "_bronze_ingested_at",
    "_bronze_source_file",
    "_silver_ingested_at",
    "_silver_source_table",
]

order_items_staging = spark.table("ecom_lakehouse.silver.order_items_staging")
order_items_cleaned = order_items_staging.select(
    "order_item_id",
    "order_id",
    "product_id",
    "quantity",
    "unit_price",
    F.upper(F.col("currency")).alias("currency"),
    *order_items_passthrough_cols,
)
# unit_price and currency are not part of the mandatory set, so rows lacking
# them survive this filter.
order_items_required = ["order_item_id", "order_id", "product_id", "quantity"]
order_items_cleaned = order_items_cleaned.dropna(subset=order_items_required)

# dedup_by / normalize_cdc_latest come from tools.utils; presumably they remove
# duplicate events and keep the newest event per key - TODO confirm in the helpers.
order_items_dedup_cols = ["order_item_id", "order_id", "product_id", "seq_num", "event_time"]
stg_order_items = dedup_by(spark, order_items_cleaned, *order_items_dedup_cols)
stg_order_items_latest = normalize_cdc_latest(
    spark_session=spark,
    df=stg_order_items,
    key_col="order_item_id",
    seq_col="seq_num",
    ts_col="event_time",
)

In [0]:
# Back-fill missing order-item prices from the product frame and compute the
# per-line amount.

# Product price lookup; the price column is renamed so it cannot collide with
# the order item's own unit_price after the join.
products_price = (
    stg_products_latest
        .select(
            F.col("product_id"),
            F.col("unit_price").alias("unit_price_prod"),
        )
        .alias("p")
)

order_items_enriched = (
    stg_order_items_latest.alias("oi")
    # Left join keeps order items whose product is absent from the product frame.
    .join(products_price, on=["product_id"], how="left")
    # Prefer the price recorded on the order item; fall back to the product price.
    .withColumn(
        "unit_price",
        F.coalesce(F.col("oi.unit_price"), F.col("p.unit_price_prod")),
    )
    # line_amount is NULL when neither price is available.
    .withColumn(
        "line_amount",
        F.round(F.col("quantity") * F.col("unit_price"), 2),
    )
    # The helper price column is only needed for the coalesce above.
    .drop("unit_price_prod")
)

In [0]:
# Apply the order-item CDC events to the silver order_items Delta table.
target = DeltaTable.forName(spark, "ecom_lakehouse.silver.order_items")
# Merge from the enriched frame so the back-filled unit_price and the computed
# line_amount are persisted. Columns the target does not have are dropped by
# the column intersection below.
source = order_items_enriched.alias("src")

# Only columns present on both sides are written; the CDC control columns
# steer the merge and are never copied into the target.
tgt_cols = set(target.toDF().columns)
src_cols = set(source.columns)
common = [c for c in (tgt_cols & src_cols) if c not in ("op", "event_time", "seq_num")]
set_map = {c: f"src.`{c}`" for c in common}
ins_map = {c: f"src.`{c}`" for c in common}

(
    target.alias("tgt")
    .merge(
        source,
        "tgt.order_item_id = src.order_item_id",
    )
    # Matched key: 'D' deletes the row, 'U' overwrites it. Unmatched key: 'I' inserts.
    .whenMatchedDelete("src.op = 'D'")
    .whenMatchedUpdate("src.op = 'U'", set=set_map)
    .whenNotMatchedInsert("src.op = 'I'", values=ins_map)
    .execute()
)

In [0]:
# Clean the payments staging rows, then reduce the CDC stream per payment_id.

# CDC control columns and lineage columns are carried through untouched.
payments_passthrough_cols = [
    "op",
    "event_time",
    "seq_num",
    "dt",
    "_bronze_ingested_at",
    "_bronze_source_file",
    "_silver_ingested_at",
    "_silver_source_table",
]

payments_staging = spark.table("ecom_lakehouse.silver.payments_staging")
payments_cleaned = payments_staging.select(
    "payment_id",
    "order_id",
    # Code-like text columns are upper-cased.
    F.upper(F.col("payment_method")).alias("payment_method"),
    "amount",
    "payment_date",
    F.upper(F.col("status")).alias("status"),
    *payments_passthrough_cols,
)
# Every business column is mandatory; rows missing any of them are discarded.
payments_required = ["payment_id", "order_id", "payment_method", "amount", "payment_date", "status"]
payments_cleaned = payments_cleaned.dropna(subset=payments_required)

# dedup_by / normalize_cdc_latest come from tools.utils; presumably they remove
# duplicate events and keep the newest event per key - TODO confirm in the helpers.
stg_payments = dedup_by(spark, payments_cleaned, "payment_id", "order_id", "seq_num", "event_time")
stg_payments_latest = normalize_cdc_latest(
    spark_session=spark,
    df=stg_payments,
    key_col="payment_id",
    seq_col="seq_num",
    ts_col="event_time",
)

In [0]:
# Apply the payment CDC events to the silver payments Delta table.
target = DeltaTable.forName(spark, "ecom_lakehouse.silver.payments")
source = stg_payments_latest.alias("src")

# Only columns present on both sides are written; the CDC control columns
# steer the merge and are never copied into the target.
cdc_control_cols = ("op", "event_time", "seq_num")
tgt_cols = set(target.toDF().columns)
src_cols = set(source.columns)
common = list(filter(lambda name: name not in cdc_control_cols, tgt_cols & src_cols))

# Updates and inserts use the same src -> tgt column assignments.
set_map = dict((name, f"src.`{name}`") for name in common)
ins_map = dict(set_map)

payments_merge = target.alias("tgt").merge(source, "tgt.payment_id = src.payment_id")
# Matched key: 'D' deletes the row, 'U' overwrites it. Unmatched key: 'I' inserts.
payments_merge = payments_merge.whenMatchedDelete("src.op = 'D'")
payments_merge = payments_merge.whenMatchedUpdate("src.op = 'U'", set=set_map)
payments_merge = payments_merge.whenNotMatchedInsert("src.op = 'I'", values=ins_map)
payments_merge.execute()

In [0]:
# Clean the orders staging rows, then reduce the CDC stream per order_id.

# CDC control columns and lineage columns are carried through untouched.
orders_passthrough_cols = [
    "op",
    "event_time",
    "seq_num",
    "dt",
    "_bronze_ingested_at",
    "_bronze_source_file",
    "_silver_ingested_at",
    "_silver_source_table",
]

orders_staging = spark.table("ecom_lakehouse.silver.orders_staging")
orders_cleaned = orders_staging.select(
    "order_id",
    "customer_id",
    "order_date",
    # Status codes are upper-cased.
    F.upper(F.col("status")).alias("status"),
    "shipping_address",
    *orders_passthrough_cols,
)
# Every business column is mandatory; rows missing any of them are discarded.
orders_required = ["order_id", "customer_id", "order_date", "status", "shipping_address"]
orders_cleaned = orders_cleaned.dropna(subset=orders_required)

# dedup_by / normalize_cdc_latest come from tools.utils; presumably they remove
# duplicate events and keep the newest event per key - TODO confirm in the helpers.
stg_orders = dedup_by(spark, orders_cleaned, "order_id", "customer_id", "seq_num", "event_time")
stg_orders_latest = normalize_cdc_latest(
    spark_session=spark,
    df=stg_orders,
    key_col="order_id",
    seq_col="seq_num",
    ts_col="event_time",
)


In [0]:
# Attach each order's total amount, computed from its order items.

# line_amount lives on the enriched order items, not on the orders frame, so
# the totals are aggregated from order_items_enriched. Deleted items
# (op = 'D') do not contribute to the total.
totals = (
    order_items_enriched
    .filter(F.col("op") != "D")
    .groupBy("order_id")
    .agg(
        F.round(F.sum(F.col("line_amount").cast("double")), 2).alias("total_amount")
    )
)

# Orders without a computed total are dropped.
# NOTE(review): this also discards order events with op = 'D' whose items have
# no total in this batch, so such deletes never reach the merge - confirm
# whether that is intended.
orders_enriched = (
    stg_orders_latest.alias("o")
    .join(totals.alias("t"), "order_id", "left")
).na.drop(subset=["total_amount"])

In [0]:
# Apply the order CDC events (with total_amount attached) to the silver orders
# Delta table.
# The Delta API method is forName (camelCase); "forname" does not exist.
target = DeltaTable.forName(spark, "ecom_lakehouse.silver.orders")
source = orders_enriched.alias("src")

# Only columns present on both sides are written; the CDC control columns
# steer the merge and are never copied into the target.
tgt_cols = set(target.toDF().columns)
src_cols = set(source.columns)
common = [c for c in (tgt_cols & src_cols) if c not in ("op", "event_time", "seq_num")]
set_map = {c: f"src.`{c}`" for c in common}
ins_map = {c: f"src.`{c}`" for c in common}

(
    target.alias("tgt")
    .merge(
        source,
        "tgt.order_id = src.order_id",
    )
    # Matched key: 'D' deletes the row, 'U' overwrites it.
    .whenMatchedDelete("src.op = 'D'")
    .whenMatchedUpdate("src.op = 'U'", set=set_map)
    # Only insert events create rows, matching the other silver merges; an
    # unconditional insert would also write unmatched delete events as new rows.
    .whenNotMatchedInsert("src.op = 'I'", values=ins_map)
    .execute()
)