In [2]:
from pyspark.sql.functions import to_date, col

# Build the `sales_incremental` Delta table: bronze rows whose Order_Date is
# strictly after a cutoff date.

# Cutoff for the incremental slice (compared against the parsed Order_Date).
# NOTE(review): the output recorded for this notebook shows zero rows surviving
# this filter -- confirm the cutoff matches the date range actually present in
# the bronze table before relying on `sales_incremental` downstream.
INCREMENTAL_CUTOFF_DATE = "2021-01-01"

# Step 1: Load the original bronze table
df = spark.read.table("superstore_bronze")

# Step 2: Normalise every column name in ONE projection (trim, spaces -> underscores)
# rather than stacking one withColumnRenamed call per column on the query plan.
df = df.toDF(*[name.strip().replace(" ", "_") for name in df.columns])

# Step 3: Parse Order_Date with the M/d/yyyy pattern, then keep rows after the cutoff
df = df.withColumn("Order_Date", to_date("Order_Date", "M/d/yyyy"))
df_incremental = df.filter(col("Order_Date") > INCREMENTAL_CUTOFF_DATE)

# Step 4: Save the filtered output as a Delta table, replacing any existing rows
df_incremental.write.format("delta").mode("overwrite").saveAsTable("sales_incremental")


StatementMeta(, f75ad1f3-a268-490b-aeba-02a1f44d8ad2, 4, Finished, Available, Finished)

In [3]:
df_incremental.show()

StatementMeta(, f75ad1f3-a268-490b-aeba-02a1f44d8ad2, 5, Finished, Available, Finished)

+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|Row_ID|Order_ID|Order_Date|Ship_Date|Ship_Mode|Customer_ID|Customer_Name|Segment|Country|City|State|Postal_Code|Region|Product_ID|Category|Sub-Category|Product_Name|Sales|Quantity|Discount|Profit|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+



In [2]:
from pyspark.sql.functions import col, year, month, to_date

from pyspark.sql.functions import when  # local to this cell: needed only for the margin guard

# Build the silver table from `sales_incremental`: drop rows lacking Sales or
# Profit, then add Order_Year, Order_Month and Profit_Margin.

# Load the incremental table
df = spark.read.table("sales_incremental")

# Normalise all column names in a single projection (trim, spaces -> underscores)
df = df.toDF(*[name.strip().replace(" ", "_") for name in df.columns])

# Now apply transformations
df_silver = (
    df.dropna(subset=["Sales", "Profit"])
      .withColumn("Order_Year", year(col("Order_Date")))
      .withColumn("Order_Month", month(col("Order_Date")))
      # Guard the division: a zero Sales value yields NULL explicitly instead of
      # relying on the session's divide-by-zero behaviour (NULL when ANSI mode is
      # off, an error when it is on).
      .withColumn(
          "Profit_Margin",
          when(col("Sales") != 0, col("Profit") / col("Sales")),
      )
)

# Write to Silver Layer (Delta table)
df_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("superstore_silver")


StatementMeta(, b40c0f3b-f505-4071-86de-d21201dd62d9, 4, Finished, Available, Finished)

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row

# Sample customer data: a Row factory fixes the field order once, and each
# tuple below supplies the values for one customer in that order.
CustomerRow = Row("Customer_ID", "Customer_Name", "Age_Group", "Gender", "Region", "Loyalty_Score")
customer_rows = [
    CustomerRow(*values)
    for values in (
        ("AB-10115", "Andrew Brown", "30-40", "M", "West", 4),
        ("AF-10250", "Alice Fox", "20-30", "F", "East", 3),
        ("BM-10355", "Brian Moore", "40-50", "M", "South", 5),
        ("CA-10450", "Cynthia Adams", "30-40", "F", "Central", 2),
        ("DL-10560", "David Lee", "20-30", "M", "West", 1),
        ("ER-10665", "Emma Ross", "30-40", "F", "East", 4),
    )
]

# Column types are inferred from the Row values
df_customers = spark.createDataFrame(customer_rows)

# Persist as the `customers` Delta table, replacing any existing rows
customers_writer = df_customers.write.format("delta").mode("overwrite")
customers_writer.saveAsTable("customers")

# Preview the in-memory DataFrame
df_customers.show()


StatementMeta(, b40c0f3b-f505-4071-86de-d21201dd62d9, 6, Finished, Available, Finished)

+-----------+-------------+---------+------+-------+-------------+
|Customer_ID|Customer_Name|Age_Group|Gender| Region|Loyalty_Score|
+-----------+-------------+---------+------+-------+-------------+
|   AB-10115| Andrew Brown|    30-40|     M|   West|            4|
|   AF-10250|    Alice Fox|    20-30|     F|   East|            3|
|   BM-10355|  Brian Moore|    40-50|     M|  South|            5|
|   CA-10450|Cynthia Adams|    30-40|     F|Central|            2|
|   DL-10560|    David Lee|    20-30|     M|   West|            1|
|   ER-10665|    Emma Ross|    30-40|     F|   East|            4|
+-----------+-------------+---------+------+-------+-------------+



In [6]:
# Give the customer-side name/region columns distinct names
# (presumably so they do not clash with same-named sales columns when joined).
customer_column_renames = {
    "Customer_Name": "Customer_Name_From_Customers",
    "Region": "Region_From_Customers",
}
for current_name, renamed_to in customer_column_renames.items():
    df_customers = df_customers.withColumnRenamed(current_name, renamed_to)


StatementMeta(, 9f3d3d66-5c9e-4aee-803b-0f4cfdbaa66d, 8, Finished, Available, Finished)

In [7]:
# Attach customer attributes to every silver row; the left join keeps silver
# rows that have no matching Customer_ID in df_customers.
df_gold = df_silver.join(df_customers, on="Customer_ID", how="left")

# Persist the joined result as the gold Delta table, replacing existing rows
gold_writer = (
    df_gold.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
)
gold_writer.saveAsTable("superstore_gold")


StatementMeta(, 9f3d3d66-5c9e-4aee-803b-0f4cfdbaa66d, 9, Finished, Available, Finished)

In [8]:
# Show the distinct customer IDs in the raw bronze table. The bronze column
# name contains a space, hence the backtick quoting.
bronze_customer_ids = (
    spark.read.table("superstore_bronze")
    .select("`Customer ID`")
    .distinct()
)
bronze_customer_ids.show()


StatementMeta(, b40c0f3b-f505-4071-86de-d21201dd62d9, 10, Finished, Available, Finished)

+-----------+
|Customer ID|
+-----------+
|   VW-21775|
|   RR-19315|
|   PB-19210|
|   MY-17380|
|   EM-13960|
|   MS-17530|
|   SW-20275|
|   AH-10690|
|   KH-16630|
|   BD-11500|
|   JF-15490|
|   JF-15415|
|   PH-18790|
|   IM-15070|
|   PW-19240|
|   OT-18730|
|   NW-18400|
|   KD-16615|
|   KM-16225|
|   KF-16285|
+-----------+
only showing top 20 rows



In [12]:
# Preview the persisted `customers` table (reads the saved table, not the
# in-memory df_customers).
customers_preview = spark.read.table("customers")
customers_preview.show()


StatementMeta(, b40c0f3b-f505-4071-86de-d21201dd62d9, 14, Finished, Available, Finished)

+-----------+---------+------+------+-------------+
|Customer_ID|Age_Group|Gender|Region|Loyalty_Score|
+-----------+---------+------+------+-------------+
|   VW-21775|    30-40|     F|  West|            3|
|   RR-19315|    30-40|     F|  West|            3|
|   PB-19210|    30-40|     F|  West|            3|
|   MY-17380|    30-40|     F|  West|            3|
|   EM-13960|    30-40|     F|  West|            3|
|   MS-17530|    30-40|     F|  West|            3|
|   SW-20275|    30-40|     F|  West|            3|
|   AH-10690|    30-40|     F|  West|            3|
|   KH-16630|    30-40|     F|  West|            3|
|   BD-11500|    30-40|     F|  West|            3|
|   JF-15490|    30-40|     F|  West|            3|
|   JF-15415|    30-40|     F|  West|            3|
|   PH-18790|    30-40|     F|  West|            3|
|   IM-15070|    30-40|     F|  West|            3|
|   PW-19240|    30-40|     F|  West|            3|
|   OT-18730|    30-40|     F|  West|            3|
|   NW-18400

In [18]:
# Inspect the gold DataFrame: schema, first five rows, then the total row count
df_gold.printSchema()
df_gold.show(n=5)
gold_row_count = df_gold.count()
print("Row count:", gold_row_count)



StatementMeta(, b40c0f3b-f505-4071-86de-d21201dd62d9, 20, Finished, Available, Finished)

root
 |-- Customer_ID: string (nullable = true)
 |-- Row_ID: long (nullable = true)
 |-- Order_ID: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Ship_Date: date (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal_Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Profit_Margin: double (nullable = true)
 |-- Order_Year: integer (nullable = true)
 |-- Order_Month: integer (nullable = true)
 |-- Age_Group: string (

In [20]:
# Show the distinct Customer_ID values on each side of the join
# (silver first, then customers).
for join_side in (df_silver, df_customers):
    join_side.select("Customer_ID").distinct().show()


StatementMeta(, b40c0f3b-f505-4071-86de-d21201dd62d9, 22, Finished, Available, Finished)

+-----------+
|Customer_ID|
+-----------+
+-----------+

+-----------+
|Customer_ID|
+-----------+
|   VW-21775|
|   RR-19315|
|   PB-19210|
|   MY-17380|
|   EM-13960|
|   MS-17530|
|   SW-20275|
|   AH-10690|
|   KH-16630|
|   BD-11500|
|   JF-15490|
|   JF-15415|
|   PH-18790|
|   IM-15070|
|   PW-19240|
|   OT-18730|
|   NW-18400|
|   KD-16615|
|   KM-16225|
|   KF-16285|
+-----------+
only showing top 20 rows



In [21]:
from pyspark.sql.functions import to_date, col, year, month

from pyspark.sql.functions import when  # local to this cell: needed only for the margin guard

# Rebuild the silver table directly from bronze (no date cutoff applied):
# clean column names, parse Order_Date, drop rows lacking Sales or Profit, and
# add Profit_Margin, Order_Year and Order_Month.

# Read from raw bronze table
df = spark.read.table("superstore_bronze")

# Clean column names in a single projection (trim, spaces -> underscores)
df = df.toDF(*[name.strip().replace(" ", "_") for name in df.columns])

# Convert to date format (M/d/yyyy pattern)
df = df.withColumn("Order_Date", to_date("Order_Date", "M/d/yyyy"))

# Recreate Silver table with full records (including Customer_ID)
df_silver = (
    df.dropna(subset=["Sales", "Profit"])
      # Guard the division: a zero Sales value yields NULL explicitly instead of
      # relying on the session's divide-by-zero behaviour (NULL when ANSI mode is
      # off, an error when it is on).
      .withColumn(
          "Profit_Margin",
          when(col("Sales") != 0, col("Profit") / col("Sales")),
      )
      .withColumn("Order_Year", year(col("Order_Date")))
      .withColumn("Order_Month", month(col("Order_Date")))
)

# Save the silver table
df_silver.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("superstore_silver")


StatementMeta(, b40c0f3b-f505-4071-86de-d21201dd62d9, 23, Finished, Available, Finished)

In [23]:
from pyspark.sql.functions import upper, trim

# Rebuild the gold table: reload silver and customers from their saved tables,
# normalise the join key on both sides, left-join, and overwrite `superstore_gold`.

# Reload and clean Customer_IDs (trim whitespace, upper-case) so keys compare equal
df_silver = spark.read.table("superstore_silver").withColumn("Customer_ID", upper(trim("Customer_ID")))
df_customers = spark.read.table("customers").withColumn("Customer_ID", upper(trim("Customer_ID")))

# Rename the customer-side Region so it does not duplicate the silver Region column
df_customers = df_customers.withColumnRenamed("Region", "Region_Customer")

# Join: the left join keeps every silver row, matched or not
df_gold = df_silver.join(df_customers, on="Customer_ID", how="left")

# Check row count and preview
print("Row count after join:", df_gold.count())
df_gold.show(5)

# Save gold table.
# This is a full overwrite, so the table schema should be exactly df_gold's.
# `mergeSchema` only ever ADDS columns: any column left over from an earlier
# write of this table under a different name would survive as an all-NULL
# column. `overwriteSchema` replaces the stored schema together with the data.
df_gold.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("superstore_gold")


StatementMeta(, b40c0f3b-f505-4071-86de-d21201dd62d9, 25, Finished, Available, Finished)

Row count after join: 9994
+-----------+------+--------------+----------+----------+--------------+----------------+--------+-------------+-----------+----------+-----------+------+---------------+---------------+------------+--------------------+------+--------+--------+-------+-------------------+----------+-----------+---------+------+---------------+-------------+
|Customer_ID|Row_ID|      Order_ID|Order_Date| Ship_Date|     Ship_Mode|   Customer_Name| Segment|      Country|       City|     State|Postal_Code|Region|     Product_ID|       Category|Sub-Category|        Product_Name| Sales|Quantity|Discount| Profit|      Profit_Margin|Order_Year|Order_Month|Age_Group|Gender|Region_Customer|Loyalty_Score|
+-----------+------+--------------+----------+----------+--------------+----------------+--------+-------------+-----------+----------+-----------+------+---------------+---------------+------------+--------------------+------+--------+--------+-------+-------------------+----------+-

In [None]:
# Pull up to 1000 rows of the gold table through Spark SQL and render them
# with the notebook's rich display.
gold_preview_query = "SELECT * FROM RetailLakehouse.superstore_gold LIMIT 1000"
df = spark.sql(gold_preview_query)
display(df)

StatementMeta(, 9a15dde0-1114-44a1-9a3a-c46a1aeb1e47, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d031d620-729d-4443-a24b-2106024796bf)