In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
from pyspark.sql.window import Window

In [0]:
# Orders: fold bronze CDC events into silver.orders (one row per order_id).
#
# Each bronze row is a change event: `op` is the operation code
# ('c' create, 'r' snapshot read, 'u' update, 'd' delete), `timestamp` is the
# event time, and `after` is the post-change row image stored as a JSON string.
# The whole bronze table is re-read on every run and reduced to the latest event
# per order_id, so the MERGE below must treat that latest event as authoritative.
after_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("status", StringType(), True),
    StructField("order_date", TimestampType(), True)
])

df_bronze_orders = spark.read.table("new_ecommerce_cdc.bronze.orders")
df_bronze_orders = df_bronze_orders.withColumn("after", from_json(col("after"), after_schema))

# In Debezium-style CDC a delete event typically has an empty `after` image, which
# would make `after.order_id` NULL and the delete unmatchable. If the bronze table
# also captured the `before` image, use it as a fallback source for the key.
# NOTE(review): assumes `before` (when present) has the same layout as `after`,
# either as a JSON string or an already-parsed struct — confirm against bronze.
if "before" in df_bronze_orders.columns:
    if dict(df_bronze_orders.dtypes)["before"] == "string":
        df_bronze_orders = df_bronze_orders.withColumn("before", from_json(col("before"), after_schema))
    orders_key_col = coalesce(col("after.order_id"), col("before.order_id"))
else:
    orders_key_col = col("after.order_id")

df_silver_orders = df_bronze_orders.select(
    col("op").alias("operation"),
    col("timestamp").alias("updated_at"),
    orders_key_col.alias("order_id"),
    col("after.customer_id").alias("customer_id"),
    col("after.product_id").alias("product_id"),
    col("after.status").alias("order_status"),
    col("after.order_date").alias("order_date")
)

# Keep only the most recent event per order_id. Rows without a key can never
# match or be inserted meaningfully, so drop them before ranking.
# NOTE(review): events with identical `updated_at` for the same key are ranked
# arbitrarily; add a tie-breaker column if the bronze table has one.
window_spec = Window.partitionBy("order_id").orderBy(col("updated_at").desc())
df_silver_orders_dedup = (
    df_silver_orders
    .filter(col("order_id").isNotNull())
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

silver_orders_table = DeltaTable.forName(spark, "new_ecommerce_cdc.silver.orders")

# Non-key columns written on both update and insert.
orders_merge_values = {
    "customer_id": col("bronze.customer_id"),
    "product_id": col("bronze.product_id"),
    "status": col("bronze.order_status"),
    "order_date": col("bronze.order_date"),
    "last_updated": col("bronze.updated_at"),
}

# Every non-delete event carries a full row image, so it is handled as an upsert:
#   matched + 'd'          -> delete the silver row
#   matched + c/r/u        -> overwrite (re-snapshots and re-created keys included)
#   not matched + c/r/u    -> insert (a row created and then updated before it ever
#                             reached silver has 'u' as its latest event)
# Any other op code is ignored.
(
    silver_orders_table.alias("silver")
    .merge(df_silver_orders_dedup.alias("bronze"), "silver.order_id = bronze.order_id")
    .whenMatchedDelete(condition="bronze.operation = 'd'")
    .whenMatchedUpdate(
        condition="bronze.operation IN ('c', 'r', 'u')",
        set=orders_merge_values,
    )
    .whenNotMatchedInsert(
        condition="bronze.operation IN ('c', 'r', 'u')",
        values={"order_id": col("bronze.order_id"), **orders_merge_values},
    )
    .execute()
)

In [0]:
# Customers: fold bronze CDC events into silver.customers (one row per customer_id).
#
# Each bronze row is a change event: `op` is the operation code
# ('c' create, 'r' snapshot read, 'u' update, 'd' delete), `timestamp` is the
# event time, and `after` is the post-change row image stored as a JSON string.
# The whole bronze table is re-read on every run and reduced to the latest event
# per customer_id, so the MERGE below must treat that latest event as authoritative.
customers_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

df_bronze_customers = spark.read.table("new_ecommerce_cdc.bronze.customers")
df_bronze_customers = df_bronze_customers.withColumn("after", from_json(col("after"), customers_schema))

# In Debezium-style CDC a delete event typically has an empty `after` image, which
# would make `after.customer_id` NULL and the delete unmatchable. If the bronze
# table also captured the `before` image, use it as a fallback source for the key.
# NOTE(review): assumes `before` (when present) has the same layout as `after`,
# either as a JSON string or an already-parsed struct — confirm against bronze.
if "before" in df_bronze_customers.columns:
    if dict(df_bronze_customers.dtypes)["before"] == "string":
        df_bronze_customers = df_bronze_customers.withColumn("before", from_json(col("before"), customers_schema))
    customers_key_col = coalesce(col("after.customer_id"), col("before.customer_id"))
else:
    customers_key_col = col("after.customer_id")

# Extract flattened fields
df_silver_customers = df_bronze_customers.select(
    col("op").alias("operation"),
    customers_key_col.alias("customer_id"),
    col("after.name").alias("name"),
    col("after.email").alias("email"),
    col("after.created_at").alias("created_at"),
    col("timestamp").alias("updated_at"),
)

# Keep only the most recent event per customer_id. Rows without a key can never
# match or be inserted meaningfully, so drop them before ranking.
# NOTE(review): events with identical `updated_at` for the same key are ranked
# arbitrarily; add a tie-breaker column if the bronze table has one.
window_spec_cust = Window.partitionBy("customer_id").orderBy(col("updated_at").desc())
df_silver_customers_dedup = (
    df_silver_customers
    .filter(col("customer_id").isNotNull())
    .withColumn("row_num", row_number().over(window_spec_cust))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

# Load Silver Table
silver_customers_table = DeltaTable.forName(spark, "new_ecommerce_cdc.silver.customers")

# Non-key columns written on both update and insert.
customers_merge_values = {
    "name": col("bronze.name"),
    "email": col("bronze.email"),
    "created_at": col("bronze.created_at"),
    "last_updated": col("bronze.updated_at"),
}

# Every non-delete event carries a full row image, so it is handled as an upsert:
#   matched + 'd'          -> delete the silver row
#   matched + c/r/u        -> overwrite (re-snapshots and re-created keys included)
#   not matched + c/r/u    -> insert (a row created and then updated before it ever
#                             reached silver has 'u' as its latest event)
# Any other op code is ignored.
(
    silver_customers_table.alias("silver")
    .merge(df_silver_customers_dedup.alias("bronze"), "silver.customer_id = bronze.customer_id")
    .whenMatchedDelete(condition="bronze.operation = 'd'")
    .whenMatchedUpdate(
        condition="bronze.operation IN ('c', 'r', 'u')",
        set=customers_merge_values,
    )
    .whenNotMatchedInsert(
        condition="bronze.operation IN ('c', 'r', 'u')",
        values={"customer_id": col("bronze.customer_id"), **customers_merge_values},
    )
    .execute()
)



In [0]:
# Products: fold bronze CDC events into silver.products (one row per product_id).
#
# Each bronze row is a change event: `op` is the operation code
# ('c' create, 'r' snapshot read, 'u' update, 'd' delete), `timestamp` is the
# event time, and `after` is the post-change row image stored as a JSON string.
# The whole bronze table is re-read on every run and reduced to the latest event
# per product_id, so the MERGE below must treat that latest event as authoritative.
# NOTE(review): `price` is parsed as FloatType (binary floating point); monetary
# values usually call for DecimalType. Left unchanged because the silver.products
# column type is defined outside this notebook — confirm before switching.
products_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("stock_quantity", IntegerType(), True),
    StructField("price", FloatType(), True),
    StructField("created_at", TimestampType(), True)
])

df_bronze_products = spark.read.table("new_ecommerce_cdc.bronze.products")
df_bronze_products = df_bronze_products.withColumn("after", from_json(col("after"), products_schema))

# In Debezium-style CDC a delete event typically has an empty `after` image, which
# would make `after.product_id` NULL and the delete unmatchable. If the bronze
# table also captured the `before` image, use it as a fallback source for the key.
# NOTE(review): assumes `before` (when present) has the same layout as `after`,
# either as a JSON string or an already-parsed struct — confirm against bronze.
if "before" in df_bronze_products.columns:
    if dict(df_bronze_products.dtypes)["before"] == "string":
        df_bronze_products = df_bronze_products.withColumn("before", from_json(col("before"), products_schema))
    products_key_col = coalesce(col("after.product_id"), col("before.product_id"))
else:
    products_key_col = col("after.product_id")

# Extract flattened fields
df_silver_products = df_bronze_products.select(
    col("op").alias("operation"),
    col("timestamp").alias("updated_at"),
    products_key_col.alias("product_id"),
    col("after.name").alias("name"),
    col("after.stock_quantity").alias("stock_quantity"),
    col("after.price").alias("price"),
    col("after.created_at").alias("created_at")
)

# Keep only the most recent event per product_id. Rows without a key can never
# match or be inserted meaningfully, so drop them before ranking.
# NOTE(review): events with identical `updated_at` for the same key are ranked
# arbitrarily; add a tie-breaker column if the bronze table has one.
window_spec_prod = Window.partitionBy("product_id").orderBy(col("updated_at").desc())
df_silver_products_dedup = (
    df_silver_products
    .filter(col("product_id").isNotNull())
    .withColumn("row_num", row_number().over(window_spec_prod))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

# Load Silver Table
silver_products_table = DeltaTable.forName(spark, "new_ecommerce_cdc.silver.products")

# Non-key columns written on both update and insert.
products_merge_values = {
    "name": col("bronze.name"),
    "price": col("bronze.price"),
    "stock_quantity": col("bronze.stock_quantity"),
    "created_at": col("bronze.created_at"),
    "last_updated": col("bronze.updated_at"),
}

# Every non-delete event carries a full row image, so it is handled as an upsert:
#   matched + 'd'          -> delete the silver row
#   matched + c/r/u        -> overwrite (re-snapshots and re-created keys included)
#   not matched + c/r/u    -> insert (a row created and then updated before it ever
#                             reached silver has 'u' as its latest event)
# Any other op code is ignored.
(
    silver_products_table.alias("silver")
    .merge(df_silver_products_dedup.alias("bronze"), "silver.product_id = bronze.product_id")
    .whenMatchedDelete(condition="bronze.operation = 'd'")
    .whenMatchedUpdate(
        condition="bronze.operation IN ('c', 'r', 'u')",
        set=products_merge_values,
    )
    .whenNotMatchedInsert(
        condition="bronze.operation IN ('c', 'r', 'u')",
        values={"product_id": col("bronze.product_id"), **products_merge_values},
    )
    .execute()
)

