In [0]:
# Bronze layer: load the raw retail sales table unchanged.
bronze_source_table = "workspace.default.retail_sales_dataset"
sales_bronze = spark.table(bronze_source_table)


In [0]:
# Normalise the column names: every space becomes an underscore, so later
# cells can refer to columns as e.g. "Transaction_ID".
underscored_names = [name.replace(" ", "_") for name in sales_bronze.columns]
sales_bronze = sales_bronze.toDF(*underscored_names)

# Display the resulting column names to confirm the rename.
sales_bronze.columns

['Transaction_ID',
 'Date',
 'Customer_ID',
 'Gender',
 'Age',
 'Product_Category',
 'Quantity',
 'Price_per_Unit',
 'Total_Amount']

In [0]:
# Lists the current column names of sales_bronze.
# NOTE(review): the output recorded for this cell still shows the original
# space-separated names ('Transaction ID', ...), so it appears to have been
# executed before the rename cell that precedes it. Re-run the notebook top
# to bottom (or move this cell above the rename) so the output matches.
sales_bronze.columns

['Transaction ID',
 'Date',
 'Customer ID',
 'Gender',
 'Age',
 'Product Category',
 'Quantity',
 'Price per Unit',
 'Total Amount']

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Silver layer: one row per Transaction_ID, with the numeric and date columns
# cast to explicit types.
# NOTE(review): no ordering is applied before dropDuplicates, so which of
# several rows sharing a Transaction_ID is kept is not controlled here.
silver_column_types = {
    "Quantity": "int",
    "Price_per_Unit": "double",
    "Total_Amount": "double",
    "Date": "date",
}

sales_silver = sales_bronze.dropDuplicates(["Transaction_ID"])  # deduplicate
for column_name, target_type in silver_column_types.items():
    # Replace each column in place with its cast version (same name, new type).
    sales_silver = sales_silver.withColumn(column_name, col(column_name).cast(target_type))

# Persist the Silver DataFrame as a Delta table in Unity Catalog; "overwrite"
# replaces whatever the table held before.
silver_table_name = "workspace.default.retail_sales_silver"
silver_writer = sales_silver.write.format("delta").mode("overwrite")
silver_writer.saveAsTable(silver_table_name)

# Quick look at the first five rows.
sales_silver.show(5)

+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
|Transaction_ID|      Date|Customer_ID|Gender|Age|Product_Category|Quantity|Price_per_Unit|Total_Amount|
+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
|           271|2023-06-23|    CUST271|Female| 62|          Beauty|       4|          30.0|       120.0|
|           372|2023-02-07|    CUST372|Female| 24|          Beauty|       3|         500.0|      1500.0|
|           722|2023-07-14|    CUST722|  Male| 20|          Beauty|       3|         300.0|       900.0|
|           212|2023-06-09|    CUST212|  Male| 21|        Clothing|       3|         500.0|      1500.0|
|           234|2023-11-20|    CUST234|Female| 62|     Electronics|       2|          25.0|        50.0|
+--------------+----------+-----------+------+---+----------------+--------+--------------+------------+
only showing top 5 rows


In [0]:
# Gold layer: total units sold and total revenue per product category.
#
# The aggregates are named directly with alias() inside agg() instead of
# renaming the auto-generated "sum(...)" columns afterwards: withColumnRenamed
# does nothing when the name it is asked to rename is absent, so any mismatch
# in the generated name would go unnoticed and leave the wrong column names
# in the Gold table.
sales_gold = (
    sales_silver
    .groupBy("Product_Category")
    .agg(
        F.sum("Quantity").alias("Total_Quantity"),
        F.sum("Total_Amount").alias("Total_Sales"),
    )
)

# Persist the Gold aggregate as a Delta table; "overwrite" replaces any
# previous contents of the table.
gold_table_name = "workspace.default.retail_sales_gold"
gold_writer = sales_gold.write.format("delta").mode("overwrite")
gold_writer.saveAsTable(gold_table_name)

In [0]:
# Print the per-category totals. No ordering is applied to sales_gold, so the
# rows are not sorted by any column in this output.
sales_gold.show()

+----------------+--------------+-----------+
|Product_Category|Total_Quantity|Total_Sales|
+----------------+--------------+-----------+
|        Clothing|           894|   155580.0|
|          Beauty|           771|   143515.0|
|     Electronics|           849|   156905.0|
+----------------+--------------+-----------+



In [0]:
%sql
-- Product categories ranked by total sales, highest first.
SELECT
  gold.Product_Category,
  gold.Total_Sales
FROM workspace.default.retail_sales_gold AS gold
ORDER BY gold.Total_Sales DESC;

Product_Category,Total_Sales
Electronics,156905.0
Clothing,155580.0
Beauty,143515.0
