In [None]:
# Welcome to the Retail notebook
# This notebook will load the retail data from the Kaggle SuperStore 2016 data set.
# By Marlon Lambert (ML)
# %%pyspark

import pandas as pd
from pyspark.sql import functions as F

# Step 1 - load the workbook with pandas.
#   Medallion layers (Bronze -> Gold) are distinguished by table-name prefix only;
#   the original schema-per-layer build was abandoned because the create schema
#   command could not be used from PySpark in this environment.
XLSX_PATH = "/lakehouse/default/Files/raw/global_superstore_2016.xlsx"

#   Only the "Orders" sheet is ingested.
orders_pdf = pd.read_excel(XLSX_PATH, sheet_name="Orders")

# Step 2 - convert pandas -> Spark
raw_orders_sdf = spark.createDataFrame(orders_pdf)

# Step 3 - clean column names: trim surrounding whitespace, then replace spaces
#          and hyphens with underscores. All columns are renamed in one toDF call.
cleaned_names = [
    name.strip().replace(" ", "_").replace("-", "_")
    for name in raw_orders_sdf.columns
]
orders_sdf = raw_orders_sdf.toDF(*cleaned_names)

# Step 4 - write to the Fabric Lakehouse (Bronze), replacing any previous load
orders_sdf.write.format("delta").mode("overwrite").saveAsTable("bronze_superstore_raw")

StatementMeta(, f4244e14-eb6d-4f15-b227-47bf782e1f7f, 8, Finished, Available, Finished)

In [None]:
# %%pyspark
import pandas as pd
from pyspark.sql import functions as F

# Build the Silver table: a typed, null-filtered, de-duplicated copy of Bronze.
b = spark.table("bronze_superstore_raw")

# Postal_Code needs special handling. If Bronze holds it as a floating-point
# column (presumably the case when pandas read a numeric column with blanks -
# TODO confirm against the Bronze schema), a plain cast to string would turn
# 10024 into "10024.0" and a missing value into the literal text "NaN".
# NOTE(review): leading zeros are already lost if Excel stored the codes as numbers.
postal_dtype = dict(b.dtypes)["Postal_Code"]
if postal_dtype in ("double", "float"):
    postal_raw = F.col("Postal_Code")
    postal_code = (
        # Missing values become a real NULL instead of the string "NaN".
        F.when(postal_raw.isNull() | F.isnan(postal_raw), F.lit(None).cast("string"))
         # Whole numbers are rendered without the trailing ".0".
         .when(postal_raw == F.floor(postal_raw), postal_raw.cast("long").cast("string"))
         # Anything else is kept verbatim rather than silently truncated.
         .otherwise(postal_raw.cast("string"))
    )
else:
    postal_code = F.col("Postal_Code").cast("string")

# Update these column data types
s = (b
     .withColumn("Order_Date", F.to_date("Order_Date"))
     .withColumn("Ship_Date",  F.to_date("Ship_Date"))
     .withColumn("Postal_Code", postal_code)
     .withColumn("Sales",     F.col("Sales").cast("double"))
     .withColumn("Quantity",  F.col("Quantity").cast("int"))
     .withColumn("Discount",  F.col("Discount").cast("double"))
     .withColumn("Profit",    F.col("Profit").cast("double"))
     # NaN is not NULL in Spark, so it has to be rejected explicitly; otherwise a
     # missing Sales value that arrived as NaN would pass the not-null filter.
     .filter(F.col("Sales").isNotNull() & ~F.isnan(F.col("Sales")))
     .filter(F.col("Order_Date").isNotNull())
     # Light dedupe; tweak if needed.
     # NOTE(review): this keeps one arbitrary row per (Order_ID, Product_ID), so a
     # product that legitimately appears on two lines of one order loses a line.
     .dropDuplicates(["Order_ID", "Product_ID"])
    )

s.write.mode("overwrite").format("delta").saveAsTable("silver_superstore_clean")
print("-- Silver tables --")
# Last expression of the cell: displays the persisted Silver row count.
spark.table("silver_superstore_clean").count()

StatementMeta(, 846758bd-9ae4-45c5-a00f-504c08ba0927, 8, Finished, Available, Finished)

-- Silver tables --


51248

In [3]:
# Now create the Star Schema
# Implementing a naming prefix of "gold_" to identify gold-standard tables
# (Had to abandon the schema build for Bronze to Gold as PySpark does not cater for the create schema command)
# %%pyspark
import pandas as pd
from pyspark.sql import functions as F

# Derive dims and facts from the SuperStore Silver table
sv = spark.table("silver_superstore_clean")

# Columns that together identify one geography row, in hashing order.
GEO_COLS = ["Country", "State", "City", "Region", "Postal_Code"]
# Stand-in for NULL inside the hashed string (see _geo_key).
GEO_NULL_TOKEN = "<null>"


def _geo_key():
    """Return the SHA-256 surrogate-key expression for the geography dimension.

    The same expression is used for both the dimension and the fact table so the
    two can never drift apart. concat_ws silently skips NULL inputs, which would
    let rows with NULLs in different positions hash to the same key; each NULL is
    therefore replaced by GEO_NULL_TOKEN so every column keeps its position.
    Rows without NULLs hash exactly as a plain concat_ws over GEO_COLS would.
    """
    parts = [
        F.coalesce(F.col(name).cast("string"), F.lit(GEO_NULL_TOKEN))
        for name in GEO_COLS
    ]
    return F.sha2(F.concat_ws("|", *parts), 256)


# Dimensions
# NOTE(review): dropDuplicates on the key keeps an arbitrary row, so if one
# Product_ID / Customer_ID carries several names the surviving name is not stable.
dim_product = (sv.select(
        F.col("Product_ID").alias("ProductKey"),
        F.col("Product_Name").alias("ProductName"),
        "Category", "Sub_Category"
    ).dropDuplicates(["ProductKey"])
)
dim_customer = (sv.select(
        F.col("Customer_ID").alias("CustomerKey"),
        F.col("Customer_Name").alias("CustomerName"),
        "Segment"
    ).dropDuplicates(["CustomerKey"])
)
# One row per distinct geography combination, keyed by the SHA-256 hash.
dim_geo = (sv.select(*GEO_COLS)
           .dropDuplicates()
           .withColumn("GeoKey", _geo_key())
           .select("GeoKey", *GEO_COLS)
)
# Date dimension holds only dates on which at least one order was placed.
dim_date = (sv.select(F.col("Order_Date").alias("Date"))
            .where(F.col("Date").isNotNull())
            .dropDuplicates()
            .withColumn("Year", F.year("Date"))
            .withColumn("Month", F.month("Date"))
            .withColumn("MonthName", F.date_format("Date", "MMM"))
)

# Fact
# GeoKey is built with the same expression as dim_geo so the join always matches.
fact_sales = (sv
    .withColumn("GeoKey", _geo_key())
    .select(
        F.col("Order_ID").alias("OrderID"),
        F.col("Order_Date").alias("Date"),
        F.col("Product_ID").alias("ProductKey"),
        F.col("Customer_ID").alias("CustomerKey"),
        "GeoKey",
        "Sales", "Quantity", "Discount", "Profit", "Region", "Ship_Mode"
    )
)

# Persist Gold
dim_product.write.mode("overwrite").format("delta").saveAsTable("gold_dim_product")
dim_customer.write.mode("overwrite").format("delta").saveAsTable("gold_dim_customer")
dim_geo.write.mode("overwrite").format("delta").saveAsTable("gold_dim_geo")
dim_date.write.mode("overwrite").format("delta").saveAsTable("gold_dim_date")
fact_sales.write.mode("overwrite").format("delta").saveAsTable("gold_fact_sales")

# Monthly KPIs per Region
total_sales = F.sum("Sales")
total_profit = F.sum("Profit")
kpi_monthly = (fact_sales
    .withColumn("Year",  F.year("Date"))
    .withColumn("Month", F.month("Date"))
    .groupBy("Year", "Month", "Region")
    .agg(
        total_sales.alias("Sales"),
        total_profit.alias("Profit"),
        F.sum("Quantity").alias("Quantity"),
        # Guard the division: with ANSI mode on, a zero Sales total would raise a
        # divide-by-zero error. The guard yields NULL for that group instead.
        F.when(total_sales != 0, total_profit / total_sales).alias("MarginPct")
    )
)
kpi_monthly.write.mode("overwrite").format("delta").saveAsTable("gold_kpi_monthly")

# Report the persisted row count of every Gold table.
print("-- Gold tables --")
for table_name in ["gold_dim_product", "gold_dim_customer", "gold_dim_geo",
                   "gold_dim_date", "gold_fact_sales", "gold_kpi_monthly"]:
    row_count = spark.table(table_name).count()
    print(f"{table_name:<25} {row_count} rows")
    

StatementMeta(, 4146798b-43c6-4724-9597-c091037c3b68, 5, Finished, Available, Finished)

-- Gold tables --
gold_dim_product          3788 rows
gold_dim_customer         17415 rows
gold_dim_geo              3855 rows
gold_dim_date             1430 rows
gold_fact_sales           51248 rows
gold_kpi_monthly          1097 rows
