## Retail Data Analytics Platform

### 2. Bronze Layer – Data Ingestion
**Tasks:**
- Read CSV, JSON, and other data into DataFrames.
- Perform schema inference.
- Write raw data into Delta tables ( bronze_customers , bronze_orders ,
- bronze_products ).

In [0]:
# Read raw files with schema inference
df_customers = spark.read.option("header", True).csv("/FileStore/tables/customers.csv")
df_orders = spark.read.option("header", True).csv("/FileStore/tables/orders_day1.csv")
df_products = spark.read.option("multiline", True).json("/FileStore/tables/products.json")

#Schema
df_customers.printSchema()
df_customers.show()

df_orders.printSchema()
df_orders.show()

df_products.printSchema()
df_products.show()


# Write to Bronze Delta Tables
df_customers.write.format("delta").mode("overwrite").saveAsTable("bronze_customers")
df_orders.write.format("delta").mode("overwrite").saveAsTable("bronze_orders")
df_products.write.format("delta").mode("overwrite").saveAsTable("bronze_products")

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: string (nullable = true)

+-----------+------------+---------+---+
|customer_id|        name|     city|age|
+-----------+------------+---------+---+
|          1|Rahul Sharma|Bangalore| 28|
|          2| Priya Singh|    Delhi| 32|
|          3|  Aman Kumar|Hyderabad| 25|
|          4| Sneha Reddy|  Chennai| 35|
|          5| Arjun Mehta|   Mumbai| 30|
|          6|  Divya Nair|    Delhi| 29|
+-----------+------------+---------+---+

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price: string (nullable = true)
 |-- status: string (nullable = true)
 |-- order_date: string (nullable = true)

+--------+-----------+----------+--------+-----+---------+----------+
|order_id|customer_id|   product|quantity|price|   status|order_date|
+-------

In [0]:
from pyspark.sql.functions import col, expr

# Read Bronze tables
orders = spark.table("bronze_orders")
customers = spark.table("bronze_customers")
products = spark.table("bronze_products")

# Clean orders: remove pending and nulls
clean_orders = orders.filter(
    (col("status") == "Completed") & (col("customer_id").isNotNull())
)

# Add total_amount column
clean_orders = clean_orders.withColumn("total_amount", col("quantity").cast("double") * col("price").cast("double"))

# Join with customer and product using 'product_name' match (adjusting key name if needed)
enriched_orders = (
    clean_orders
    .join(customers, on="customer_id", how="inner")
    .join(products, clean_orders["product"] == products["product_name"], "left")
)

# Write Silver Table
enriched_orders.write.format("delta").mode("overwrite").saveAsTable("silver_orders")


In [0]:
from pyspark.sql.functions import sum as _sum, rank, col
from pyspark.sql.window import Window

# Load the Silver table
silver_orders = spark.table("silver_orders")

#  Calculate total revenue by region
revenue_by_region = (
    silver_orders
    .groupBy("city")
    .agg(_sum("total_amount").alias("total_revenue"))
)

display(revenue_by_region)

#  Find top-selling products by quantity
product_sales = (
    silver_orders
    .groupBy("product")
    .agg(_sum("quantity").alias("total_quantity_sold"))
)

#  Use window functions to rank products by sales
window_spec = Window.orderBy(col("total_quantity_sold").desc())
ranked_products = product_sales.withColumn("rank", rank().over(window_spec))

# Store final results as gold_sales_summary (including revenue)
product_revenue = (
    silver_orders
    .groupBy("product")
    .agg(_sum("total_amount").alias("total_revenue"))
)

# Combine ranking and revenue
gold_sales_summary = (
    ranked_products
    .join(product_revenue, on="product", how="left")
    .select("product", "total_quantity_sold", "total_revenue", "rank")
)

# Save as Delta table in Gold layer
gold_sales_summary.write.format("delta").mode("overwrite").saveAsTable("gold_sales_summary")

# result
display(spark.sql("SELECT * FROM gold_sales_summary ORDER BY rank LIMIT 10"))


city,total_revenue
Bangalore,125000.0
Delhi,75000.0




product,total_quantity_sold,total_revenue,rank
Headphones,5.0,15000.0,1
Mobile,3.0,75000.0,2
Laptop,2.0,110000.0,3


In [0]:
from pyspark.sql.functions import sum as _sum

gold_region_revenue = (
    spark.table("silver_orders")
    .groupBy("city")
    .agg(_sum("total_amount").alias("total_revenue"))
)

gold_region_revenue.write.format("delta").mode("overwrite").saveAsTable("gold_region_revenue")

In [0]:
data = """order_id,customer_id,product,quantity,price,status,order_date
1005,4,Mobile,1,25000,Completed,2024-01-18
1006,2,Laptop,1,55000,Completed,2024-01-19
1007,3,Book,5,700,Completed,2024-01-20
1003,3,Book,10,700,Completed,2024-01-16
"""

# Write the data to DBFS file path
dbutils.fs.put("/FileStore/tables/orders_day2.csv", data, overwrite=True)


Wrote 227 bytes.


True

In [0]:
from pyspark.sql.functions import expr
from delta.tables import DeltaTable

# Read new orders
orders_day2 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/FileStore/tables/orders_day2.csv")
)

# Add total_amount column
orders_day2 = orders_day2.withColumn("total_amount", expr("quantity * price"))

# Filter out Pending orders
orders_day2_filtered = orders_day2.filter("status != 'Pending'")

display(orders_day2_filtered)

# Load silver_orders Delta table
silver_orders_delta = DeltaTable.forName(spark, "silver_orders")

# Define column mapping for update and insert
column_mapping = {
    "order_id": "updates.order_id",
    "customer_id": "updates.customer_id",
    "product": "updates.product",
    "quantity": "updates.quantity",
    "price": "updates.price",
    "status": "updates.status",
    "order_date": "updates.order_date",
    "total_amount": "updates.total_amount"
}

# Perform MERGE with explicit update and insert mappings
silver_orders_delta.alias("silver").merge(
    orders_day2_filtered.alias("updates"),
    "silver.order_id = updates.order_id"
).whenMatchedUpdate(
    set=column_mapping
).whenNotMatchedInsert(
    values=column_mapping
).execute()

display(spark.table("silver_orders").orderBy("order_date"))

order_id,customer_id,product,quantity,price,status,order_date,total_amount
1005,4,Mobile,1,25000,Completed,2024-01-18,25000
1006,2,Laptop,1,55000,Completed,2024-01-19,55000
1007,3,Book,5,700,Completed,2024-01-20,3500
1003,3,Book,10,700,Completed,2024-01-16,7000


customer_id,order_id,product,quantity,price,status,order_date,total_amount,name,city,age,category,product_id,product_name
1,1001,Laptop,2,55000,Completed,2024-01-15,110000.0,Rahul Sharma,Bangalore,28.0,Electronics,P001,Laptop
2,1002,Mobile,3,25000,Completed,2024-01-16,75000.0,Priya Singh,Delhi,32.0,Electronics,P002,Mobile
3,1003,Book,10,700,Completed,2024-01-16,7000.0,,,,,,
1,1004,Headphones,5,3000,Completed,2024-01-17,15000.0,Rahul Sharma,Bangalore,28.0,Accessories,P004,Headphones
4,1005,Mobile,1,25000,Completed,2024-01-18,25000.0,,,,,,
2,1006,Laptop,1,55000,Completed,2024-01-19,55000.0,,,,,,
3,1007,Book,5,700,Completed,2024-01-20,3500.0,,,,,,


In [0]:
# Check history to see versions and timestamps
spark.sql("DESCRIBE HISTORY gold_sales_summary").show(truncate=False)

version_to_query = 0  # change to the desired version number

historical_df = spark.read.format("delta") \
    .option("versionAsOf", version_to_query) \
    .table("gold_sales_summary")

historical_df.show()

# Overwrite gold_sales_summary with old version to rollback
historical_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_sales_summary")
# Vacuum with default retention (7 days)

spark.sql("VACUUM gold_sales_summary")

# Or vacuum with custom retention in hours (e.g., 168 hours = 7 days)
# spark.sql("VACUUM gold_sales_summary RETAIN 168 HOURS")


+-------+-------------------+---------------+----------------------------------+---------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+------------------------+-----------+-----------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------------------------------+
|version|timestamp          |userId         |userName                          |operation                        |operationParameters                                                                                                                                    |job |notebook          |clusterId               |readVersion|isolationLevel   |isBlindAppend|operationMetrics                                                                      

DataFrame[path: string]