In [0]:
import re

# Step 1: Load the raw sales CSV (header row, types inferred by Spark).
# NOTE(review): this absolute path is user-specific and embeds a personal email
# address; consider a shared workspace location or a notebook widget instead.
SALES_CSV_PATH = "/Workspace/Users/parthasarathy_1@msn.com/Sales.csv"

raw_sales_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(SALES_CSV_PATH)
)

# Step 2: Clean column names. Every character outside [A-Za-z0-9_] (spaces,
# punctuation, ...) becomes "_" so the names are accepted by the Delta writer.
clean_columns = [re.sub(r"[^A-Za-z0-9_]", "_", name) for name in raw_sales_df.columns]

# Different headers can collapse to the same cleaned name (e.g. "Unit Price" and
# "Unit-Price"), and Spark compares column names case-insensitively by default.
# Fail here with a clear message rather than later at the Delta write.
lowered = [name.lower() for name in clean_columns]
duplicate_columns = sorted({name for name in clean_columns if lowered.count(name.lower()) > 1})
if duplicate_columns:
    raise ValueError(f"Column names collide after cleaning: {duplicate_columns}")

# Rename all columns in one positional projection instead of one
# withColumnRenamed call per column.
bronze_df = raw_sales_df.toDF(*clean_columns)

# Step 3: Verify schema
bronze_df.printSchema()

# Step 4 (next cell): persist bronze_df as the Bronze Delta table.

In [0]:
# Persist the cleaned raw data as the Bronze Delta table (full refresh).
# overwriteSchema lets the overwrite succeed even when inferSchema produces a
# schema different from the table's current one; plain overwrite rejects that.
(
    bronze_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bronze_sales_orders")
)

In [0]:
%sql
-- Preview a bounded sample of the Bronze table rather than rendering the
-- full raw dataset.
SELECT *
FROM bronze_sales_orders
LIMIT 100

In [0]:
# Silver: conformed sales orders.
#
# Built from the persisted Bronze table so Silver reflects exactly what Bronze
# stored. The in-memory bronze_df is a lazy plan over the CSV and would re-read
# the source file on this write.
#
# The column names the previous renames aimed for ("Order Date" -> "Order_Date",
# etc.) are already produced by the Step 2 cleaning. The old withColumnRenamed
# calls matched no column and silently did nothing, so instead verify the
# conformed names are actually present.
EXPECTED_SILVER_COLUMNS = ["Order_Date", "Customer_Name", "Product_Category"]

silver_df = spark.table("bronze_sales_orders")

missing_columns = [name for name in EXPECTED_SILVER_COLUMNS if name not in silver_df.columns]
if missing_columns:
    raise ValueError(f"bronze_sales_orders is missing expected columns: {missing_columns}")

(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver_sales_orders")
)

In [0]:
%sql
-- Preview a bounded sample of the Silver table rather than rendering every row.
SELECT *
FROM silver_sales_orders
LIMIT 100

In [0]:
from pyspark.sql import functions as F

# Gold: total revenue and total profit per (Region, Product_Category).
# Built from the persisted Silver table rather than the in-memory silver_df
# chain, so the aggregate matches what Silver stored.
# NOTE(review): assumes the source CSV provides Region, Total Revenue and
# Total Profit columns (cleaned to underscores in Bronze) — verify.
silver_table_df = spark.table("silver_sales_orders")

gold_df = (
    silver_table_df
    .groupBy("Region", "Product_Category")
    .agg(
        F.sum("Total_Revenue").alias("TotalRevenue"),
        F.sum("Total_Profit").alias("TotalProfit"),
    )
)

# Full refresh; overwriteSchema allows the table schema to follow upstream changes.
(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_sales_orders")
)

In [0]:
%sql
-- Gold rows come out of a GROUP BY in no guaranteed order; sort them so the
-- displayed result is stable between runs.
SELECT *
FROM gold_sales_orders
ORDER BY Region, Product_Category