In [0]:
# 02_Silver_Layer

from pyspark.sql.functions import col, to_timestamp

# Select the catalog and schema first so that the unqualified table names
# passed to saveAsTable() in this notebook resolve against them.
for context_stmt in ("USE CATALOG main", "USE SCHEMA ecommerce"):
    spark.sql(context_stmt)

# Root folder of the bronze-layer files inside the volume.
# NOTE(review): the value ends with "/", and the readers in this file append
# "/<table>/" to it, so the resulting paths contain "//". Looks harmless, but
# confirm before normalizing either side.
bronze_base_path = "/Volumes/main/ecommerce/lakehouse_vol/bronze/"

# ---------------------------------------------------------
# TABLE 1: ORDERS (silver_orders)
# ---------------------------------------------------------
print("Processing: silver_orders...")
orders_bronze_dir = f"{bronze_base_path}/orders/"
df_orders = spark.read.parquet(orders_bronze_dir)

# Cleaning steps, applied in this order:
#   1. Replace order_purchase_timestamp with its to_timestamp() conversion
#      (no explicit format is passed, so the default parsing applies).
#   2. Keep a single row per primary key (order_id).
#   3. Discard rows whose order_id is null.
purchase_ts = to_timestamp(col("order_purchase_timestamp"))
df_orders_clean = (
    df_orders
    .withColumn("order_purchase_timestamp", purchase_ts)
    .dropDuplicates(["order_id"])
    .dropna(subset=["order_id"])
)

# Persist as a managed Delta table, replacing whatever the table held before.
df_orders_clean.write.saveAsTable("silver_orders", format="delta", mode="overwrite")
print("--> Created Table: main.ecommerce.silver_orders")


# ---------------------------------------------------------
# TABLE 2: ORDER ITEMS (silver_order_items)
# ---------------------------------------------------------
print("Processing: silver_order_items...")
df_items = spark.read.parquet(f"{bronze_base_path}/order_items/")

# Composite primary key: order_id + order_item_id.
# Keep one row per key, then discard rows where either key column is null.
items_pk = ["order_id", "order_item_id"]
df_items_clean = df_items.dropDuplicates(items_pk).dropna(subset=items_pk)

# Overwrite the Delta table with the cleaned rows.
items_writer = df_items_clean.write.format("delta").mode("overwrite")
items_writer.saveAsTable("silver_order_items")
print("--> Created Table: main.ecommerce.silver_order_items")


# ---------------------------------------------------------
# TABLE 3: CUSTOMERS (silver_customers)
# ---------------------------------------------------------
print("Processing: silver_customers...")
customers_bronze_dir = f"{bronze_base_path}/customers/"
df_cust = spark.read.parquet(customers_bronze_dir)

# Primary key: customer_id.
# Step 1: collapse rows that share a customer_id.
df_cust_deduped = df_cust.dropDuplicates(["customer_id"])
# Step 2: discard rows whose customer_id is null.
df_cust_clean = df_cust_deduped.dropna(subset=["customer_id"])

# Overwrite the Delta table with the cleaned rows.
df_cust_clean.write.saveAsTable("silver_customers", format="delta", mode="overwrite")
print("--> Created Table: main.ecommerce.silver_customers")


# ---------------------------------------------------------
# TABLE 4: PRODUCTS (silver_products)
# ---------------------------------------------------------
print("Processing: silver_products...")
df_prod = spark.read.parquet(f"{bronze_base_path}/products/")

# Primary key: product_id.
# Keep one row per product_id, then discard rows where it is null.
products_pk = ["product_id"]
df_prod_clean = (
    df_prod
    .dropDuplicates(products_pk)
    .dropna(subset=products_pk)
)

# Overwrite the Delta table with the cleaned rows.
df_prod_clean.write.saveAsTable("silver_products", format="delta", mode="overwrite")
print("--> Created Table: main.ecommerce.silver_products")


# ---------------------------------------------------------
# TABLE 5: PAYMENTS (silver_payments)
# ---------------------------------------------------------
print("Processing: silver_payments...")
df_pay = spark.read.parquet(f"{bronze_base_path}/payments/")

# Composite primary key: order_id + payment_sequential.
# Both key columns must be non-null. Checking only order_id would let rows
# with a null payment_sequential into the silver table, whereas the
# order-items table validates every column of its composite key.
payments_pk = ["order_id", "payment_sequential"]
df_pay_clean = (
    df_pay
    .dropDuplicates(payments_pk)   # keep one row per (order_id, payment_sequential)
    .dropna(subset=payments_pk)    # drop rows missing any part of the key
)

# Overwrite the Delta table with the cleaned rows.
df_pay_clean.write.format("delta").mode("overwrite").saveAsTable("silver_payments")
print("--> Created Table: main.ecommerce.silver_payments")

# Final banner: only reached if every write above completed without raising.
print("------------------------------------------------")
print("SUCCESS: All Silver Managed Tables created!")