#### **1. Read Bronze CSV (Raw Data)**



In [1]:
# Path to Bronze CSV
bronze_path = "Files/bronze/superstore/superstore_20240101.csv"

# Read raw CSV with schema-on-read
df_bronze = spark.read.option("header", True).option("inferSchema", True).csv(bronze_path)

# Preview first 5 rows
display(df_bronze.limit(10))

df_bronze.printSchema()

StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b451f4d2-c493-4511-ad6d-e002dcb19960)

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: double (nullable = true)



#### **2. Clean and DeDuplicate Data**

In [2]:
from pyspark.sql import functions as f
# Remove duplicate rows
df_clean = df_bronze.dropDuplicates()

# Remove commas from Sales and cast numeric columns properly
df_clean = (
    df_clean
    .withColumn("Sales", f.regexp_replace("Sales", ",", "").cast("double"))
    .withColumn("Quantity", f.col("Quantity").cast("int"))
    .withColumn("Discount", f.col("Discount").cast("double"))
    .withColumn("Profit", f.col("Profit").cast("double"))
)

# Convert Order Date and Ship Date to proper date type (safe for Spark 3+)
df_clean = (
    df_clean
    .withColumn("Order Date", f.to_date("Order Date", "M/d/yyyy"))
    .withColumn("Ship Date", f.to_date("Ship Date", "M/d/yyyy"))
)

# Fill missing values
numeric_cols = ["Sales", "Quantity", "Discount", "Profit", "Postal Code"]
string_cols = [
    "Order ID", "Customer ID", "Customer Name", "Segment",
    "Country", "City", "State", "Region", "Product ID",
    "Category", "Sub-Category", "Product Name", "Ship Mode"
]

df_clean = df_clean.fillna(0, subset=numeric_cols).fillna("Unknown", subset=string_cols)

# Preview cleaned data
display(df_clean.limit(10))
df_clean.printSchema()

StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 82466c00-d75e-4448-bd48-7a73ec62a4b7)

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = false)
 |-- Order Date: date (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Ship Mode: string (nullable = false)
 |-- Customer ID: string (nullable = false)
 |-- Customer Name: string (nullable = false)
 |-- Segment: string (nullable = false)
 |-- Country: string (nullable = false)
 |-- City: string (nullable = false)
 |-- State: string (nullable = false)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = false)
 |-- Product ID: string (nullable = false)
 |-- Category: string (nullable = false)
 |-- Sub-Category: string (nullable = false)
 |-- Product Name: string (nullable = false)
 |-- Sales: double (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = false)
 |-- Profit: double (nullable = false)



#### **3. Create Dimension Tables (Silver Layer)**

#####  **3.1 Customer Dimension Table**

In [3]:
df_customers = df_clean.select(
    f.col("Customer ID").alias("customer_id"),
    f.col("Customer Name").alias("customer_name"),
    f.col("Segment").alias("segment"),
    f.col("Country").alias("country"),
    f.col("City").alias("city"),
    f.col("State").alias("state")
).dropDuplicates()

# Save in Silver
df_customers.write.format("delta").mode("overwrite").save("Files/silver/customers")
display(df_customers.limit(10))


StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4363c190-50ee-4924-821a-6c20008c251b)

#####  **3.2 Products Dimension Table**

In [4]:
df_products = (
    df_clean.select(
        f.col("Product ID").alias("product_id"),
        f.col("Category").alias("category"),
        f.col("Sub-Category").alias("sub_category"),
        f.col("Product Name").alias("product_name")
    )
    .dropDuplicates()
)

df_products.write.format("delta").mode("overwrite").save("Files/silver/products")

display(df_products.limit(10))


StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3fd8551b-b882-4c10-89d0-b1abc9c724fe)

####  **4. Orders Fact Table**

In [5]:
df_orders = (
    df_clean.select(
        f.col("Order ID").alias("order_id"),
        f.col("Customer ID").alias("customer_id"),
        f.col("Product ID").alias("product_id"),
        f.col("Order Date").alias("order_date"),
        f.col("Sales").alias("revenue"),
        f.col("Quantity").alias("quantity"),
        f.col("Discount").alias("discount"),
        f.col("Profit").alias("profit")
    )
)

df_orders.write.format("delta").mode("overwrite").save("Files/silver/orders")

display(df_orders.limit(10))


StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9ceba00a-862e-4872-9365-d88d43a59b25)

#### **5. Data Validation & Quality Check**

In [6]:
# 1. Primary Key Uniqueness (Fact Table)
spark.sql("""
SELECT order_id, product_id, COUNT(*) AS cnt
FROM delta.`Files/silver/orders`
GROUP BY order_id, product_id
HAVING cnt > 1
""").show()  # Check for duplicate orders

StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 8, Finished, Available, Finished)

+--------------+---------------+---+
|      order_id|     product_id|cnt|
+--------------+---------------+---+
|CA-2017-152912|OFF-ST-10003208|  2|
|CA-2015-103135|OFF-BI-10000069|  2|
|US-2016-123750|TEC-AC-10004659|  2|
|CA-2016-140571|OFF-PA-10001954|  2|
|CA-2017-118017|TEC-AC-10002006|  2|
|CA-2016-137043|FUR-FU-10003664|  2|
|US-2014-150119|FUR-CH-10002965|  2|
|CA-2016-129714|OFF-PA-10001970|  2|
+--------------+---------------+---+



In [7]:
# Remove duplicate rows based on order_id + product_id
df_orders_clean = spark.read.format("delta").load("Files/silver/orders") \
    .dropDuplicates(["order_id", "product_id"])

# Overwrite the Silver orders table with cleaned data
df_orders_clean.write.format("delta").mode("overwrite").save("Files/silver/orders")

# Check again for duplicates
spark.sql("""
SELECT order_id, product_id, COUNT(*) AS cnt
FROM delta.`Files/silver/orders`
GROUP BY order_id, product_id
HAVING cnt > 1
""").show()  # Should return empty

StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 9, Submitted, Waiting, Running)

In [None]:
# Null Checks 
spark.sql("""
SELECT
    SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) AS null_order_id,
    SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customer_id,
    SUM(CASE WHEN product_id IS NULL THEN 1 ELSE 0 END) AS null_product_id,
    SUM(CASE WHEN order_date IS NULL THEN 1 ELSE 0 END) AS null_order_date
FROM delta.`Files/silver/orders`
""").show()  # Check for missing critical fields


StatementMeta(, , -1, Waiting, , Waiting)

In [9]:
# Business Rule Validation
spark.sql("""
SELECT
    MIN(revenue) AS min_revenue,
    MAX(revenue) AS max_revenue,
    MIN(quantity) AS min_quantity,
    MAX(quantity) AS max_quantity
FROM delta.`Files/silver/orders`
""").show()  # Ensure revenue/quantity make sense

StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 11, Finished, Available, Finished)

+-----------+-----------+------------+------------+
|min_revenue|max_revenue|min_quantity|max_quantity|
+-----------+-----------+------------+------------+
|       -0.0|   22638.48|           0|        1082|
+-----------+-----------+------------+------------+



In [10]:
# Referential Integrity (Fact -> Dimension)
# Customer FK check
spark.sql("""
SELECT COUNT(*) AS missing_customers
FROM delta.`Files/silver/orders` o
LEFT JOIN delta.`Files/silver/customers` c
ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
""").show()

# Product FK check
spark.sql("""
SELECT COUNT(*) AS missing_products
FROM delta.`Files/silver/orders` o
LEFT JOIN delta.`Files/silver/products` p
ON o.product_id = p.product_id
WHERE p.product_id IS NULL
""").show()


StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 12, Finished, Available, Finished)

+-----------------+
|missing_customers|
+-----------------+
|                0|
+-----------------+

+----------------+
|missing_products|
+----------------+
|               0|
+----------------+



In [11]:
# Date Range Validation
spark.sql("""
SELECT
    MIN(order_date) AS min_date,
    MAX(order_date) AS max_date
FROM delta.`Files/silver/orders`
""").show()

StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 13, Finished, Available, Finished)

+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2014-01-03|2017-12-30|
+----------+----------+



#### **6. Create Gold Layer - Dimensions**

In [12]:
from pyspark.sql import functions as F

# Load Silver Tables
# Fact table
df_orders = spark.read.format("delta").load("Files/silver/orders")

# Dimensions
df_customers = spark.read.format("delta").load("Files/silver/customers")
df_products = spark.read.format("delta").load("Files/silver/products")


StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 14, Finished, Available, Finished)

In [13]:
from pyspark.sql import functions as f

# Create Date Dimension from Orders
df_dim_dates = (
    df_orders
    .select("order_date")
    .distinct()
    .withColumnRenamed("order_date", "date")
    .withColumn("year", f.year("date"))
    .withColumn("month", f.month("date"))
    .withColumn("day", f.dayofmonth("date"))
    .withColumn("quarter", f.quarter("date"))
    .withColumn("month_name", f.date_format("date", "MMMM"))
    .withColumn("weekday", f.date_format("date", "E"))
)

# Save Gold Date Dimension
df_dim_dates.write \
    .format("delta") \
    .mode("overwrite") \
    .save("Files/gold/dim_dates")



StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 15, Finished, Available, Finished)

In [14]:
# Create Gold Fact Sales table
df_fact_sales = (
    df_orders
    .join(df_customers, "customer_id", "left")
    .join(df_products, "product_id", "left")
    .withColumn("net_revenue", f.col("revenue") * (1 - f.col("discount")))
    .withColumn(
        "profit_margin",
        f.when(f.col("revenue") != 0, f.col("profit") / f.col("revenue"))
         .otherwise(0)
    )
)

# Save Gold Fact table
df_fact_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .save("Files/gold/fact_sales")


StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 16, Finished, Available, Finished)

In [15]:
df_gold_monthly_revenue = (
    df_fact_sales
    .groupBy(
        f.year("order_date").alias("year"),
        f.month("order_date").alias("month")
    )
    .agg(
        f.sum("net_revenue").alias("monthly_revenue"),
        f.sum("profit").alias("monthly_profit")
    )
    .orderBy("year", "month")
)

df_gold_monthly_revenue.write \
    .format("delta") \
    .mode("overwrite") \
    .save("Files/gold/monthly_revenue")


StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 17, Finished, Available, Finished)

In [16]:
df_gold_top_products = (
    df_fact_sales
    .groupBy("product_name", "category")
    .agg(f.sum("net_revenue").alias("total_revenue"))
    .orderBy(f.desc("total_revenue"))
)

df_gold_top_products.write \
    .format("delta") \
    .mode("overwrite") \
    .save("Files/gold/top_products")


StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 18, Finished, Available, Finished)

#### **Load Silver Tables**

In [17]:
# Create Gold Customer Dimension (if needed, add business attributes
df_customers.write.format("delta").mode("overwrite").save("Files/gold/dim_customers")

# Create Gold Product Dimension (if needed, add business attributes)
df_products.write.format("delta").mode("overwrite").save("Files/gold/dim_products")

StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 19, Finished, Available, Finished)

In [18]:
# Create Gold Fact Table (fact_sales) by joining orders + customers + products + dates
df_fact_sales.write.format("delta").mode("overwrite").save("Files/gold/fact_sales")

StatementMeta(, 2c60a086-11af-4520-bf46-cb6a2f58607a, 20, Finished, Available, Finished)