In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read silver tables.
# Every source is a Delta table under the /mnt/silver mount; the helper keeps
# the reader configuration (format) in a single place.
def _read_delta(path):
    """Load the Delta table stored at ``path`` as a Spark DataFrame."""
    return spark.read.format('delta').load(path)


customer_df = _read_delta('/mnt/silver/Sales/Customer')
sales_order_header_df = _read_delta('/mnt/silver/Sales/SalesOrderHeader')
sales_order_detail_df = _read_delta('/mnt/silver/Sales/SalesOrderDetail')
product_df = _read_delta('/mnt/silver/Production/Product')
product_subcat_df = _read_delta('/mnt/silver/Production/ProductSubcategory')
product_cat_df = _read_delta('/mnt/silver/Production/ProductCategory')
person_df = _read_delta('/mnt/silver/Person/Person')
territory_df = _read_delta('/mnt/silver/Sales/SalesTerritory')

In [0]:
# 1. Create a Fact Table
# One row per sales-order detail line, with header attributes (customer,
# salesperson, territory, order date) carried onto each line.
# NOTE(review): the product join contributes no selected columns; as an inner
# join it only drops detail rows whose ProductID has no match in product_df.
sales_fact = (
    sales_order_detail_df
    .join(sales_order_header_df, "SalesOrderID")
    .join(product_df, "ProductID")
    .select(
        # Plain renames to snake_case.
        *[
            F.col(source).alias(target)
            for source, target in (
                ("SalesOrderDetailID", "sales_order_detail_id"),
                ("SalesOrderID", "sales_order_id"),
                ("ProductID", "product_id"),
                ("CustomerID", "customer_id"),
                ("SalesPersonID", "sales_person_id"),
                ("TerritoryID", "territory_id"),
                ("OrderDate", "order_date"),
                ("OrderQty", "order_qty"),
            )
        ],
        # Monetary inputs normalised to decimal(10,2).
        *[
            F.col(source).cast("decimal(10,2)").alias(target)
            for source, target in (
                ("UnitPrice", "unit_price"),
                ("UnitPriceDiscount", "unit_price_discount"),
            )
        ],
        # Computed from the uncast source columns, then rounded to decimal(10,2).
        (F.col("OrderQty") * F.col("UnitPrice") * (1 - F.col("UnitPriceDiscount")))
        .cast("decimal(10,2)")
        .alias("line_total"),
    )
)

In [0]:
# 2. Create Customer Dimension
# Customers joined to their Person record, keeping only PersonType == "IN"
# (intent per the original author: filter out employees and non-customers).
#
# - The filter runs BEFORE the projection: the select below drops PersonType,
#   so filtering afterwards depended on Spark re-resolving a column that was
#   no longer in the projected output.
# - A customer with no matching Person would have a null PersonType and be
#   rejected by the filter anyway, so an inner join returns exactly the rows
#   the previous left-join-then-filter returned, and says so explicitly.

customer_dim = (customer_df
    .join(person_df, customer_df["PersonID"] == person_df["BusinessEntityID"], "inner")
    .filter(F.col("PersonType") == "IN")  # Filter out employees and non customers
    .select(
        F.col("CustomerID").alias("customer_id"),
        F.col("PersonID").alias("person_id"),
        # Exact-case column name so this also works with spark.sql.caseSensitive=true.
        F.col("TerritoryID").alias("territory_id"),
        F.col("FirstName").alias("first_name"),
        F.col("LastName").alias("last_name"),
        # F.concat yields null if either name part is null.
        F.concat(F.col("FirstName"), F.lit(" "), F.col("LastName")).alias("full_name"),
        F.col("Title").alias("title"),
    )
)

In [0]:
# 3. Create Product Dimension with hierarchy
# Product -> Subcategory -> Category. Both joins are left joins, so products
# without a subcategory are kept, with null subcategory/category columns.
# Product, subcategory and category each carry a `Name` column. Each one is
# referenced through its join alias ("prod", "subcat", "cat") so all three
# follow one convention, instead of mixing alias strings with attributes of
# the outer DataFrame variables.
product_dim = (product_df.alias("prod")
    .join(product_subcat_df.alias("subcat"), "ProductSubcategoryID", "left")
    .join(product_cat_df.alias("cat"), "ProductCategoryID", "left")
    .select(
        F.col("ProductID").alias("product_id"),
        F.col("prod.Name").alias("product_name"),
        F.col("ProductNumber").alias("product_number"),
        F.col("ProductSubcategoryID").alias("product_subcat_id"),
        F.col("subcat.Name").alias("product_sub_cat_name"),
        F.col("ProductCategoryID").alias("product_cat_id"),
        F.col("cat.Name").alias("product_cat_name"),
        F.col("StandardCost").cast("decimal(10,2)").alias("standard_cost"),
        F.col("ListPrice").cast("decimal(10,2)").alias("list_price"),
        F.col("Color").alias("color"),
    ))


In [0]:
# 4. Create Territory Dimension
# Straight projection and rename of the silver SalesTerritory table.
# NOTE(review): "TerritoryName" breaks the snake_case convention used by the
# other gold columns. Renaming it changes the gold table's schema, so check
# downstream consumers and the Delta schema-overwrite behaviour first.
territory_dim = territory_df.select([
    F.col(source).alias(target)
    for source, target in (
        ("TerritoryID", "territory_id"),
        ("Name", "TerritoryName"),
        ("CountryRegionCode", "country_code"),
        ("Group", "territory_group"),
    )
])

In [0]:
# 5. Create Date Dimension
# One row per distinct order_date present in the fact table.
# year/month/day/quarter/month_name come from date_format and are therefore
# strings. day_of_Week is the integer returned by dayofweek().
date_dim = (
    sales_fact
    .select("order_date")
    .distinct()
    .select(
        "order_date",
        *[
            F.date_format("order_date", pattern).alias(label)
            for pattern, label in (
                ("yyyy", "year"),
                ("MM", "month"),
                ("dd", "day"),
                ("Q", "quarter"),
                ("MMMM", "month_name"),
            )
        ],
        F.dayofweek("order_date").alias("day_of_Week"),
    )
)

In [0]:
# 6. Create Sales KPIs
# Per-order_date KPIs computed from the fact table alone.
# The former left join to customer_dim contributed no column to any aggregate.
# It only added a shuffle, and it would have silently inflated every sum and
# ratio if customer_dim ever contained a duplicate customer_id. It is removed.
# NOTE(review): if order_date carries a time component, "daily" grouping would
# need F.to_date(...) — confirm the silver OrderDate type.
sales_kpis = (sales_fact
    .groupBy("order_date")
    .agg(
        F.sum("line_total").cast("decimal(10,2)").alias("total_sales"),
        F.countDistinct("sales_order_id").alias("number_of_orders"),
        (F.sum("line_total") / F.countDistinct("sales_order_id")).cast("decimal(10,2)").alias("average_order_value"),
        F.countDistinct("customer_id").alias("unique_customers"),
        (F.sum("line_total") / F.countDistinct("customer_id")).cast("decimal(10,2)").alias("avg_sales_per_customer"),
    ))

In [0]:
# Write to gold layer.
# Each DataFrame fully replaces the Delta table at its path (overwrite mode).
# Tables are written sequentially, in the order listed.
_gold_outputs = [
    (sales_fact, '/mnt/gold/Sales/sales_fact/'),
    (customer_dim, "/mnt/gold/Sales/customer_dim/"),
    (product_dim, "/mnt/gold/Sales/product_dim/"),
    (territory_dim, "/mnt/gold/Sales/territory_dim/"),
    (date_dim, "/mnt/gold/Sales/date_dim/"),
    (sales_kpis, "/mnt/gold/Sales/sales_kpis/"),
]
for _frame, _target_path in _gold_outputs:
    _frame.write.format("delta").mode("overwrite").save(_target_path)