Import Libraries

In [0]:
from pyspark.sql import functions as F


Read Bronze Tables

In [0]:
# Bind every bronze source table to its own DataFrame variable.
# Targets and table names are paired by position, so keep both lists in the
# same order when adding or removing a table. Note that `sessions_bronze`
# is backed by the `web_sessions` table.
(
    customers_bronze,
    products_bronze,
    orders_bronze,
    order_items_bronze,
    sessions_bronze,
    page_views_bronze,
    conversions_bronze,
) = [
    spark.table(bronze_table)
    for bronze_table in (
        "workspace.bronze.customers",
        "workspace.bronze.products",
        "workspace.bronze.orders",
        "workspace.bronze.order_items",
        "workspace.bronze.web_sessions",
        "workspace.bronze.page_views",
        "workspace.bronze.conversions",
    )
]


**DIMENSION TABLES SECTION**

In [0]:
# Customer dimension: rename the bronze columns to snake_case, parse the
# sign-up date, and keep one row per customer_id.
customer_columns = [
    F.col("CustomerID").alias("customer_id"),
    F.col("FullName").alias("full_name"),
    F.col("Email").alias("email"),
    F.to_date("SignUpDate").alias("signup_date"),
    F.col("City").alias("city"),
    F.col("Country").alias("country"),
]

dim_customer = (
    customers_bronze
    .select(*customer_columns)
    .dropDuplicates(["customer_id"])
)

# Replace the silver table with the freshly built dimension.
(
    dim_customer.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.dim_customer")
)


In [0]:
# Product dimension, built in two explicit steps:
#   1. project the bronze columns to snake_case (unit price cast to double),
#   2. keep one row per product_id.
products_projected = products_bronze.select(
    F.col("ProductID").alias("product_id"),
    F.col("ProductName").alias("product_name"),
    F.col("Category").alias("category"),
    F.col("Brand").alias("brand"),
    F.col("UnitPrice").cast("double").alias("unit_price"),
)
dim_product = products_projected.dropDuplicates(["product_id"])

# Replace the silver table with the freshly built dimension.
dim_product_writer = dim_product.write.format("delta").mode("overwrite")
dim_product_writer.saveAsTable("workspace.silver.dim_product")


In [0]:
# Date dimension: one row per calendar day between the earliest and the
# latest order date found in bronze orders (both ends inclusive).

# Order dates parsed to DATE; rows whose OrderDate does not parse are dropped.
order_dates = (
    orders_bronze
    .select(F.to_date("OrderDate").alias("order_date"))
    .where(F.col("order_date").isNotNull())
)

# Single-row aggregate brought back to the driver.
bounds = order_dates.agg(
    F.min("order_date").alias("min_date"),
    F.max("order_date").alias("max_date")
).collect()[0]

min_date = bounds["min_date"]
max_date = bounds["max_date"]

# Both bounds are None when bronze orders holds no parseable OrderDate.
# With only column names, createDataFrame would then fail because it cannot
# infer a type from (None, None). The explicit DATE schema avoids that:
# sequence() over null bounds is null and explode() emits no rows for a null
# array, so the cell produces an empty, correctly typed dim_date instead of
# raising. The range_* columns are internal and dropped by the next select.
dim_date = (
    spark.createDataFrame(
        [(min_date, max_date)],
        "range_start date, range_end date",
    )
    .select(F.explode(F.sequence("range_start", "range_end")).alias("date"))
    .select(
        F.col("date"),
        F.year("date").alias("year"),
        F.month("date").alias("month"),
        F.dayofmonth("date").alias("day"),
        # "EEEE" renders the full weekday name.
        F.date_format("date", "EEEE").alias("day_of_week")
    )
)

# Replace the silver table with the freshly built dimension.
dim_date.write.format("delta").mode("overwrite") \
    .saveAsTable("workspace.silver.dim_date")


**FACT TABLES SECTION**

In [0]:
# Order fact: one output row per bronze order row (no de-duplication here).
# OrderDate is exposed twice: as a full timestamp and as a plain date.
order_columns = [
    F.col("OrderID").alias("order_id"),
    F.col("CustomerID").alias("customer_id"),
    F.to_timestamp("OrderDate").alias("order_datetime"),
    F.to_date("OrderDate").alias("order_date"),
    F.col("OrderStatus").alias("order_status"),
    F.col("TotalAmount").cast("double").alias("total_amount"),
]

fact_orders = orders_bronze.select(*order_columns)

# Replace the silver table with the freshly built fact.
(
    fact_orders.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.fact_orders")
)


In [0]:
# Order-item fact: typed quantity and unit price plus the extended line amount.
fact_order_items = (
    order_items_bronze
    .select(
        F.col("OrderItemID").alias("order_item_id"),
        F.col("OrderID").alias("order_id"),
        F.col("ProductID").alias("product_id"),
        F.col("Quantity").cast("int").alias("quantity"),
        F.col("PricePerUnit").cast("double").alias("price_per_unit"),
    )
    # Derived from the already-cast columns above (int * double); appended
    # last, so the column order matches a single select.
    .withColumn("line_amount", F.col("quantity") * F.col("price_per_unit"))
)

# Replace the silver table with the freshly built fact.
(
    fact_order_items.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.fact_order_items")
)


In [0]:
# Funnel fact: page views and conversions, each enriched with the attributes
# of its web session, stacked into a single event table.
# `F` (pyspark.sql.functions) is imported once in the notebook's first cell.

# 1) Clean sessions (standardize column names and parse the timestamps)
sessions_clean = sessions_bronze.select(
    F.col("SessionID").alias("session_id"),
    F.col("CustomerID").alias("customer_id"),
    F.to_timestamp("StartTime").alias("session_start"),
    F.to_timestamp("EndTime").alias("session_end"),
    F.col("Device").alias("device"),
    F.col("Browser").alias("browser")
)

# 2) Page view events ---------------------------
# LEFT join: a page view is kept even when its session is missing from
# sessions_clean. session_id is therefore read from the event side; taking
# it from the session side would null it out for exactly those unmatched
# rows. For matched rows both sides hold the same value (it is the join key).
page_view_events = (
    page_views_bronze.alias("pv")
    .join(
        sessions_clean.alias("s"),
        F.col("pv.SessionID") == F.col("s.session_id"),
        "left"
    )
    .select(
        F.col("pv.ViewID").alias("event_id"),
        F.col("pv.SessionID").alias("session_id"),
        F.col("s.customer_id"),
        # The bronze event-time column is named `Timestamp`.
        F.to_timestamp("pv.Timestamp").alias("event_time"),
        F.lit("PageView").alias("event_type"),
        F.col("pv.PageURL").alias("page_url"),
        F.col("s.device"),
        F.col("s.browser")
    )
)

# 3) Conversion events -------------------------
# Same shape and same LEFT-join handling as the page view events.
conversion_events = (
    conversions_bronze.alias("c")
    .join(
        sessions_clean.alias("s"),
        F.col("c.SessionID") == F.col("s.session_id"),
        "left"
    )
    .select(
        # event_id is the ConversionID here (ViewID for page views).
        F.col("c.ConversionID").alias("event_id"),
        F.col("c.SessionID").alias("session_id"),
        F.col("s.customer_id"),
        F.to_timestamp("c.Timestamp").alias("event_time"),
        F.col("c.ConversionType").alias("event_type"),
        # Conversions carry no page URL; a typed null keeps both schemas aligned.
        F.lit(None).cast("string").alias("page_url"),
        F.col("s.device"),
        F.col("s.browser")
    )
)

# 4) Union and save -----------------------------
# unionByName matches columns by name rather than by position.
fact_funnel_events = page_view_events.unionByName(conversion_events)

fact_funnel_events.write.format("delta").mode("overwrite") \
    .saveAsTable("workspace.silver.fact_funnel_events")
