In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
# Configuration: abfss root URIs of the storage containers used below.
# Cells read Parquet from the bronze root and write Delta to the silver root.
bronze_abfss_path = "abfss://bronze@delakehouse.dfs.core.windows.net"
silver_abfss_path = "abfss://silver@delakehouse.dfs.core.windows.net"

## 1. Silver Orders


In [0]:
# Load the raw orders table (Parquet) from the bronze layer.
orders_df = (
    spark.read.format("parquet")
    .load(f"{bronze_abfss_path}/orders")
)

# Data quality checks.
# count() is a Spark action that launches a job on every call, so the bronze
# row count is computed once and reused for the duplicate check below.
orders_bronze_count = orders_df.count()
orders_null_keys = orders_df.filter(F.col("order_id").isNull()).count()
orders_distinct_keys = orders_df.dropDuplicates(["order_id"]).count()

print(f"Bronze records: {orders_bronze_count}")
print(f"Null order_ids: {orders_null_keys}")
print(f"Duplicate order_ids: {orders_bronze_count - orders_distinct_keys}")


# Silver transformation:
#   - created_at is parsed to a timestamp
#   - order_id is cast to long
#   - price_usd / cogs_usd are rounded to 3 decimals and stored as double
#   - one row is kept per order_id, and rows without an order_id are dropped
silver_orders = (
    orders_df
    .withColumn("created_at", F.to_timestamp("created_at"))
    .withColumn("order_id", F.col("order_id").cast("long"))
    .withColumn("price_usd", F.round(F.col("price_usd"), 3).cast("double"))
    .withColumn("cogs_usd", F.round(F.col("cogs_usd"), 3).cast("double"))
    .dropDuplicates(["order_id"])
    .filter(F.col("order_id").isNotNull())
)

print(f"Silver records after transformation: {silver_orders.count()}")

# Replace the silver orders Delta table with the freshly transformed data.
silver_orders.write.format("delta") \
    .mode("overwrite") \
    .save(f"{silver_abfss_path}/orders")

Bronze records: 32313
Null order_ids: 0
Duplicate order_ids: 0
Silver records after transformation: 32313


## 2. Silver Order Items

In [0]:
# Load the raw order items table (Parquet) from the bronze layer.
order_items_df = (
    spark.read.format("parquet")
    .load(f"{bronze_abfss_path}/order_items")
)

# Data quality checks on the table key, order_item_id.
# The labels name the column that is actually checked (they previously said
# "order_ids"), and the bronze count is computed once because every count()
# call runs a separate Spark job.
order_items_bronze_count = order_items_df.count()
order_items_null_keys = order_items_df.filter(F.col("order_item_id").isNull()).count()
order_items_distinct_keys = order_items_df.dropDuplicates(["order_item_id"]).count()

print(f"Bronze records: {order_items_bronze_count}")
print(f"Null order_item_ids: {order_items_null_keys}")
print(f"Duplicate order_item_ids: {order_items_bronze_count - order_items_distinct_keys}")

# Silver transformation:
#   - created_at is parsed to a timestamp
#   - order_item_id and order_id are cast to long
#   - one row is kept per order_item_id, and rows without one are dropped
silver_order_items = (
    order_items_df
    .withColumn("created_at", F.to_timestamp("created_at"))
    .withColumn("order_item_id", F.col("order_item_id").cast("long"))
    .withColumn("order_id", F.col("order_id").cast("long"))
    .dropDuplicates(["order_item_id"])
    .filter(F.col("order_item_id").isNotNull())
)

print(f"Silver records after transformation: {silver_order_items.count()}")

# NOTE(review): the target folder is "order_items_df" although the bronze
# source is "order_items" and every other table in this notebook reuses its
# bronze name. Left unchanged because downstream readers may depend on this
# path; confirm and rename if the "_df" suffix is unintended.
silver_order_items.write.format("delta") \
    .mode("overwrite") \
    .save(f"{silver_abfss_path}/order_items_df")


Bronze records: 40025
Null order_ids: 0
Duplicate order_ids: 0
Silver records after transformation: 40025


## 3. Silver Order Item Refunds

In [0]:
# Load the raw order item refunds table (Parquet) from the bronze layer.
refunds_df = (
    spark.read.format("parquet")
    .load(f"{bronze_abfss_path}/order_item_refunds")
)

# Ranks the refunds of each order item from newest to oldest.
# `Window` is imported in the notebook's top import cell.
# The column references are resolved where row_number() is applied, i.e.
# against the already converted created_at / order_item_id columns.
window_refund = Window.partitionBy("order_item_id").orderBy(F.col("created_at").desc())

# NOTE(review): when two refunds of the same order item share the same
# created_at, row_number() picks one of them arbitrarily; add a tie-breaker
# column to the ordering if a deterministic choice is required.
silver_refunds = (
    refunds_df
    .withColumn("created_at", F.to_timestamp("created_at"))
    .withColumn("order_item_id", F.col("order_item_id").cast("long"))
    .withColumn("rn", F.row_number().over(window_refund))
    .filter(F.col("rn") == 1)  # One refund per order item (latest wins)
    .drop("rn")
)

# Replace the silver refunds Delta table with the deduplicated data.
silver_refunds.write.format("delta") \
    .mode("overwrite") \
    .save(f"{silver_abfss_path}/order_item_refunds")

## 4. Silver Products

In [0]:
# Read the bronze products table (Parquet).
products_df = spark.read.load(f"{bronze_abfss_path}/products", format="parquet")

# Cast the key to long, parse created_at to a timestamp, lower-case the
# product name, and keep a single row per product_id.
silver_products = (
    products_df
    .withColumn("product_id", F.col("product_id").cast("long"))
    .withColumn("created_at", F.to_timestamp("created_at"))
    .withColumn("product_name", F.lower(F.col("product_name")))
    .dropDuplicates(["product_id"])
)

# Overwrite the silver products Delta table.
silver_products.write.save(
    f"{silver_abfss_path}/products",
    format="delta",
    mode="overwrite",
)

## 5. Silver Website Sessions

In [0]:
# Read the bronze website sessions table (Parquet).
sessions_df = spark.read.format("parquet").load(
    f"{bronze_abfss_path}/website_sessions"
)

# Parse created_at to a timestamp, cast the session key to long, and keep a
# single row per website_session_id.
silver_sessions = (
    sessions_df
    .withColumn("created_at", F.to_timestamp("created_at"))
    .withColumn("website_session_id", F.col("website_session_id").cast("long"))
    .dropDuplicates(["website_session_id"])
)

# Overwrite the silver website sessions Delta table.
(
    silver_sessions.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_abfss_path}/website_sessions")
)


## 6. Silver Website Pageviews

In [0]:
# Read the bronze website pageviews table (Parquet).
pageviews_df = spark.read.load(
    f"{bronze_abfss_path}/website_pageviews",
    format="parquet",
)

# Parse created_at to a timestamp, cast the pageview key to long, lower-case
# the URL, and keep a single row per website_pageview_id.
silver_pageviews = (
    pageviews_df
    .withColumn("created_at", F.to_timestamp("created_at"))
    .withColumn("website_pageview_id", F.col("website_pageview_id").cast("long"))
    .withColumn("pageview_url", F.lower(F.col("pageview_url")))
    .dropDuplicates(["website_pageview_id"])
)

# Overwrite the silver website pageviews Delta table.
(
    silver_pageviews.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_abfss_path}/website_pageviews")
)
