In [7]:
## 1. Staging Zone – Load CSV into Lakehouse Tables

# Import necessary modules
from pyspark.sql.functions import *
from datetime import datetime

# Raw Olist CSVs were uploaded to the default lakehouse (/lakehouse/default/Files/).
# Each file is read with its header row and no schema option, so Spark loads every
# column as a string; the result is persisted as a stg_<entity> Delta table.
base_path = "Files/"

STAGING_FILES = {
    "customers": "olist_customers_dataset.csv",
    "geolocation": "olist_geolocation_dataset.csv",
    "order_items": "olist_order_items_dataset.csv",
    "order_payments": "olist_order_payments_dataset.csv",
    "orders": "olist_orders_dataset.csv",
    "products": "olist_products_dataset.csv",
}

# Read every source first, then persist them in the same order.
staged_frames = {
    entity: spark.read.option("header", True).csv(base_path + file_name)
    for entity, file_name in STAGING_FILES.items()
}

for entity, frame in staged_frames.items():
    frame.write.mode("overwrite").format("delta").saveAsTable(f"stg_{entity}")

# Keep the per-entity DataFrame names available to later cells.
df_customers = staged_frames["customers"]
df_geolocation = staged_frames["geolocation"]
df_order_items = staged_frames["order_items"]
df_order_payments = staged_frames["order_payments"]
df_orders = staged_frames["orders"]
df_products = staged_frames["products"]


## 2. Data Warehouse Zone – Star Schema

# Create dim_customer with SCD Type 2 support
from pyspark.sql.window import Window

# Shape the staged customers like dim_customer rows. A SHA-256 over every staged
# column detects attribute changes. New versions open today and stay open until
# the 9999-12-31 sentinel.
df_cust = spark.table("stg_customers")
df_cust = (
    df_cust
    .withColumn("hash_id", sha2(concat_ws("||", *df_cust.columns), 256))
    .withColumn("start_date", current_date())
    # Typed as DATE, not a string literal, so it matches start_date and the
    # current_date() value written when a version is expired.
    .withColumn("end_date", to_date(lit("9999-12-31")))
    .withColumn("is_current", lit(True))
)

# Ask the catalog directly. The previous bare try/except also treated unrelated
# failures as "table missing" and would then overwrite the dimension.
dim_exists = spark.catalog.tableExists("dim_customer")

if not dim_exists:
    df_cust.write.mode("overwrite").format("delta").saveAsTable("dim_customer")
else:
    dim_all = spark.table("dim_customer")
    # Already-expired versions are carried over untouched. Rewriting only the
    # current rows would drop the whole history on overwrite.
    dim_history = dim_all.filter(~col("is_current"))
    dim_current = dim_all.filter(col("is_current"))

    # Compare each current version with the source. The same hash means unchanged.
    # A different hash, or a key no longer present in the source, closes the version.
    src_hashes = df_cust.select("customer_id", col("hash_id").alias("src_hash"))
    current_vs_src = dim_current.join(src_hashes, on="customer_id", how="left")

    dim_unchanged = (
        current_vs_src
        .filter(col("src_hash") == col("hash_id"))
        .drop("src_hash")
    )
    dim_expired = (
        current_vs_src
        .filter(col("src_hash").isNull() | (col("src_hash") != col("hash_id")))
        .drop("src_hash")
        .withColumn("end_date", current_date())
        .withColumn("is_current", lit(False))
    )

    # Source rows whose key is new, or whose hash differs from the current
    # version, become the new current versions. Keys deleted from the source
    # only get expired above; no placeholder row is inserted for them.
    cur_hashes = dim_current.select("customer_id", col("hash_id").alias("cur_hash"))
    dim_inserts = (
        df_cust.join(cur_hashes, on="customer_id", how="left")
        .filter(col("cur_hash").isNull() | (col("cur_hash") != col("hash_id")))
        .drop("cur_hash")
    )

    if dim_inserts.count() > 0 or dim_expired.count() > 0:
        # unionByName aligns columns by name, since the join outputs may order them differently.
        dim_final = (
            dim_history
            .unionByName(dim_unchanged)
            .unionByName(dim_expired)
            .unionByName(dim_inserts)
        )
        dim_final.write.mode("overwrite").format("delta").saveAsTable("dim_customer")

# dim_product – SCD Type 1: every run replaces the table with the current
# staging snapshot; no history is kept.
df_products = spark.table("stg_products")
(
    df_products.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_product")
)

# fact_orders – orders inner-joined to their items and payments on order_id.
# NOTE(review): items and payments are both joined on order_id alone. An order with
# k item rows and m payment rows therefore yields k*m rows, and summing price or
# payment_value over this table over-counts such orders. TODO: split into
# item-grain and payment-grain facts, or pre-aggregate payments per order. Also,
# price/payment_value are still strings here because staging reads the CSVs
# without a schema.
orders = spark.table("stg_orders").alias("o")
items = spark.table("stg_order_items").alias("i")
payments = spark.table("stg_order_payments").alias("p")

fact_orders = (
    orders
    .join(items, col("o.order_id") == col("i.order_id"), "inner")
    .join(payments, col("o.order_id") == col("p.order_id"), "inner")
    .select(
        col("o.order_id"),
        col("o.customer_id"),
        col("i.product_id"),
        col("i.seller_id"),
        col("i.price"),
        col("p.payment_type"),
        col("p.payment_value"),
        col("o.order_purchase_timestamp"),
    )
)


StatementMeta(, 8c79b26a-ee7a-463e-802f-688f60ddcb3f, 9, Finished, Available, Finished)

In [8]:
# Persist the fact DataFrame built in the previous cell as a full refresh.
(
    fact_orders.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_orders")
)

StatementMeta(, 8c79b26a-ee7a-463e-802f-688f60ddcb3f, 10, Finished, Available, Finished)

In [2]:
from pyspark.sql.functions import (
    col, to_date, year, month, dayofmonth,
    dayofweek, weekofyear, date_format, monotonically_increasing_id,
    add_months, date_sub, explode, sequence, trunc,
    max as max_, min as min_,
)

# Build dim_date as a gap-free calendar. Taking DISTINCT order dates skips every
# day with no order, and date dimensions are expected to be contiguous
# (e.g. for time-intelligence in BI tools such as Power BI's date tables).
# The range covers whole calendar years: from 1 January of the first order year
# to 31 December of the last one.
order_dates = spark.table("stg_orders") \
    .select(to_date(col("order_purchase_timestamp")).alias("date"))

# min/max ignore NULL dates. If there are no usable dates, both bounds are NULL,
# sequence() returns NULL, and explode() produces an empty dim_date.
calendar_bounds = order_dates.agg(
    trunc(min_("date"), "year").alias("start_date"),
    date_sub(add_months(trunc(max_("date"), "year"), 12), 1).alias("end_date"),
)

# sequence() over two DATEs steps one day at a time; the column stays DATE-typed
# and all derived columns keep their previous names and types.
df_dates = calendar_bounds \
    .select(explode(sequence(col("start_date"), col("end_date"))).alias("date")) \
    .withColumn("year", year("date")) \
    .withColumn("month", month("date")) \
    .withColumn("day", dayofmonth("date")) \
    .withColumn("day_of_week", dayofweek("date")) \
    .withColumn("week_of_year", weekofyear("date")) \
    .withColumn("month_name", date_format("date", "MMMM")) \
    .withColumn("day_name", date_format("date", "EEEE"))

df_dates.write.mode("overwrite").format("delta").saveAsTable("dim_date")

StatementMeta(, 3d4fd0b2-d77d-4024-b28c-29305723bede, 4, Finished, Available, Finished)

In [3]:
from pyspark.sql import Window
from pyspark.sql.functions import col, count, row_number

# dim_geolocation should hold one row per zip-code prefix, the value other tables
# join on. A plain DISTINCT over (prefix, city, state) is only unique per prefix
# if every prefix maps to exactly one city/state. If the raw points record
# variants for a prefix, the dimension gets duplicate keys and joins fan out.
# Keep the city/state pair recorded most often for each prefix. Ties are broken
# alphabetically so reruns pick the same row.
geo_counts = spark.table("stg_geolocation") \
    .groupBy("geolocation_zip_code_prefix", "geolocation_city", "geolocation_state") \
    .agg(count("*").alias("n_points"))

by_frequency = Window.partitionBy("geolocation_zip_code_prefix") \
    .orderBy(col("n_points").desc(), col("geolocation_city"), col("geolocation_state"))

df_geo = geo_counts \
    .withColumn("_rn", row_number().over(by_frequency)) \
    .filter(col("_rn") == 1) \
    .select("geolocation_zip_code_prefix", "geolocation_city", "geolocation_state")

df_geo.write.mode("overwrite").format("delta").saveAsTable("dim_geolocation")

StatementMeta(, 3d4fd0b2-d77d-4024-b28c-29305723bede, 5, Finished, Available, Finished)

In [4]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Surrogate keys come from row_number() over the sorted distinct values instead of
# monotonically_increasing_id(). That function is non-deterministic (it depends on
# partition layout) and non-consecutive, so ids could change on every overwrite.
# The un-partitioned window is fine here because there are only a handful of
# payment types. The cast keeps the column BIGINT, as before, so the Delta
# overwrite sees an unchanged schema.
df_payments = spark.table("stg_order_payments") \
    .select("payment_type") \
    .distinct() \
    .withColumn(
        "payment_type_id",
        row_number().over(Window.orderBy("payment_type")).cast("long"),
    )

df_payments.write.mode("overwrite").format("delta").saveAsTable("dim_payment_type")

StatementMeta(, 3d4fd0b2-d77d-4024-b28c-29305723bede, 6, Finished, Available, Finished)

In [5]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Surrogate keys come from row_number() over the sorted distinct statuses instead
# of monotonically_increasing_id(). That function is non-deterministic and
# non-consecutive, so ids could change on every overwrite. The un-partitioned
# window is fine for a handful of status values. The cast keeps the column
# BIGINT, as before, so the Delta overwrite sees an unchanged schema.
df_status = spark.table("stg_orders") \
    .select("order_status") \
    .distinct() \
    .withColumn(
        "order_status_id",
        row_number().over(Window.orderBy("order_status")).cast("long"),
    )

df_status.write.mode("overwrite").format("delta").saveAsTable("dim_order_status")

StatementMeta(, 3d4fd0b2-d77d-4024-b28c-29305723bede, 7, Finished, Available, Finished)