## Step 1: Create Raw Data in Databricks Community Edition

### 🛒 1️⃣ 160 Records — raw_products (20 unique products × 8)

In [0]:
# The 20 distinct catalogue entries, one tuple per product:
# (product_id, product_name, category, unit_price).
unique_products = [
    ("P001", "Dove Shampoo", "Beauty", 199.50),
    ("P002", "Samsung Earbuds", "Electronics", 399.00),
    ("P003", "Dell Mouse", "Electronics", 149.00),
    ("P004", "Lakme Face Wash", "Beauty", 249.00),
    ("P005", "Boat Headphones", "Electronics", 1299.00),
    ("P006", "Pepsodent Toothpaste", "Grocery", 89.00),
    ("P007", "Sunfeast Biscuits", "Grocery", 35.00),
    ("P008", "Amul Butter", "Grocery", 55.00),
    ("P009", "Parle-G Biscuits", "Grocery", 25.00),
    ("P010", "Tata Salt", "Grocery", 45.00),
    ("P011", "iPhone Charger", "Electronics", 1599.00),
    ("P012", "Nivea Cream", "Beauty", 199.00),
    ("P013", "Britannia Cake", "Grocery", 45.00),
    ("P014", "USB Type-C Cable", "Electronics", 299.00),
    ("P015", "Himalaya Shampoo", "Beauty", 179.00),
    ("P016", "Colgate Paste", "Grocery", 98.00),
    ("P017", "Kellogg’s Cornflakes", "Grocery", 299.00),
    ("P018", "Dabur Honey", "Grocery", 199.00),
    ("P019", "Mi Power Bank", "Electronics", 999.00),
    ("P020", "Blue Heaven Kajal", "Beauty", 99.00),
]

# Repeat the catalogue to bulk up the raw table: 20 products x 8 copies = 160 rows.
# Every product_id therefore occurs 8 times in raw_products.
PRODUCT_COPIES = 8
raw_products = unique_products * PRODUCT_COPIES


### 👥 2️⃣ 150 Records — raw_customers (15 unique customers × 10)

In [0]:
# The 15 distinct customers, one tuple per customer: (customer_id, customer_name, city).
unique_customers = [
    ("C001", "Akash", "Delhi"),
    ("C002", "Priya", "Chennai"),
    ("C003", "Ravi", "Mumbai"),
    ("C004", "Suresh", "Hyderabad"),
    ("C005", "Meena", "Bangalore"),
    ("C006", "Karthik", "Pune"),
    ("C007", "Aisha", "Kolkata"),
    ("C008", "Rahul", "Jaipur"),
    ("C009", "Sneha", "Nagpur"),
    ("C010", "Vikram", "Indore"),
    ("C011", "Harini", "Coimbatore"),
    ("C012", "Nitin", "Ahmedabad"),
    ("C013", "Tejas", "Surat"),
    ("C014", "Deepika", "Patna"),
    ("C015", "Gopal", "Lucknow"),
]

# Repeat the list to bulk up the raw table: 15 customers x 10 copies = 150 rows.
# Every customer_id therefore occurs 10 times in raw_customers.
CUSTOMER_COPIES = 10
raw_customers = unique_customers * CUSTOMER_COPIES


### 🧾 3️⃣ 160 Records — raw_sales

⚡ Generated pseudo-randomly with Python's `random` module

⚡ Order dates: 1 Jan 2024 – 16 Mar 2024 (base date plus 0–75 days)

⚡ Customers are picked at random from C001–C015

⚡ Products are picked at random from P001–P020

⚡ Quantities vary 1–5

⚡ Prices are picked from product price list

In [0]:
import random
from datetime import datetime, timedelta

# Unit price per product_id; the values mirror the unit prices listed in raw_products.
prices = {
    "P001":199.50, "P002":399.00, "P003":149.00, "P004":249.00, "P005":1299.00,
    "P006":89.00,  "P007":35.00,  "P008":55.00,  "P009":25.00,  "P010":45.00,
    "P011":1599.00,"P012":199.00, "P013":45.00,  "P014":299.00, "P015":179.00,
    "P016":98.00,  "P017":299.00, "P018":199.00, "P019":999.00, "P020":99.00
}

# Derive the product IDs from the price table so the two can never drift apart
# (the previous `prices.get(prod, 100.00)` would silently invent a price on a mismatch).
product_ids = list(prices)                           # P001 .. P020
customer_ids = [f"C{i:03d}" for i in range(1, 16)]   # C001 .. C015
base_date = datetime(2024, 1, 1)


def generate_raw_sales(num_records=160, seed=42, first_order_id=1001):
    """Build a reproducible list of synthetic sales rows.

    Each row is a tuple
    ``(order_id, customer_id, product_id, order_date, quantity, price)`` where
    ``order_id`` is a string counting up from ``first_order_id``, ``order_date``
    is a ``YYYY-MM-DD`` string between ``base_date`` and ``base_date`` + 75 days,
    ``quantity`` is an int in 1..5 and ``price`` is looked up in ``prices``.

    Args:
        num_records: Number of rows to generate.
        seed: Seed for the private random generator; the same seed always
            yields the same rows, so "Restart & Run All" is reproducible.
        first_order_id: Numeric value of the first order id.

    Returns:
        A list of ``num_records`` tuples (empty when ``num_records`` is 0).
    """
    # A private Random instance keeps the global `random` state untouched.
    rng = random.Random(seed)
    sales = []
    for offset in range(num_records):
        cust = rng.choice(customer_ids)
        prod = rng.choice(product_ids)
        qty = rng.randint(1, 5)
        order_date = base_date + timedelta(days=rng.randint(0, 75))
        sales.append((
            str(first_order_id + offset),
            cust,
            prod,
            order_date.strftime("%Y-%m-%d"),
            qty,
            prices[prod],
        ))
    return sales


raw_sales = generate_raw_sales()  # ➜ 160 sales records, order ids "1001".."1160"

# Next unused order id, kept under its original name for any later cell that reads it.
order_id_counter = 1001 + len(raw_sales)


### ✔️ Create DataFrames

In [0]:
# Column names for each raw dataset, in the same order as the fields of its tuples.
# Only names are supplied, so Spark infers each column's type from the Python values.
sales_columns = ["order_id","customer_id","product_id","order_date","quantity","price"]
product_columns = ["product_id","product_name","category","unit_price"]
customer_columns = ["customer_id","customer_name","city"]

# Turn the in-memory Python lists into Spark DataFrames.
sales_df = spark.createDataFrame(raw_sales, schema=sales_columns)
products_df = spark.createDataFrame(raw_products, schema=product_columns)
customers_df = spark.createDataFrame(raw_customers, schema=customer_columns)


## Step 2: Save to Bronze (Delta Lake)

Raw → Delta format

In [0]:
# Each raw DataFrame paired with the bronze location it is written to.
bronze_targets = [
    (sales_df, "/Volumes/workspace/default/bronze/sales"),
    (products_df, "/Volumes/workspace/default/bronze/products"),
    (customers_df, "/Volumes/workspace/default/bronze/customers"),
]

# Write every dataset as a Delta table; "overwrite" replaces whatever an earlier run left there.
for raw_frame, bronze_path in bronze_targets:
    raw_frame.write.format("delta").mode("overwrite").save(bronze_path)


## 🥈 SILVER LAYER (Cleaning + Joining)
### 🎯 Goal

Convert raw data → trusted data.

### ✔ What We Will Do

- Remove nulls

- Validate data types

- Remove duplicates

- Join sales with products

- Join sales with customers

- Enrich with product category & customer city

### Step 3: Read Bronze

In [0]:
# Load the three bronze Delta tables back in, in the order sales, products, customers.
bronze_sales, bronze_products, bronze_customers = (
    spark.read.format("delta").load(bronze_path)
    for bronze_path in (
        "/Volumes/workspace/default/bronze/sales",
        "/Volumes/workspace/default/bronze/products",
        "/Volumes/workspace/default/bronze/customers",
    )
)


### Step 4: Clean + Join

In [0]:
# `F` gives explicit, namespaced access to Spark functions (F.col, F.sum, ...).
# The wildcard import is kept so existing cells that rely on it keep working.
from pyspark.sql import functions as F
from pyspark.sql.functions import *

# The bronze dimension tables contain many rows per key: raw_products repeats its
# 20 products 8 times and raw_customers repeats its 15 customers 10 times.
# Joining against them as-is would emit 8 x 10 = 80 rows per sale and inflate every
# revenue figure downstream, so collapse each dimension to one row per key first.
products_dim = bronze_products.dropDuplicates(["product_id"])
customers_dim = bronze_customers.dropDuplicates(["customer_id"])

# Clean the sales facts and enrich them with product and customer attributes.
silver_sales = (
    bronze_sales
    .dropDuplicates(["order_id"])                  # one row per order
    .filter("quantity > 0")                        # also drops rows where quantity is null
    .filter("price > 0")                           # also drops rows where price is null
    .join(products_dim, "product_id", "left")      # adds product_name, category, unit_price
    .join(customers_dim, "customer_id", "left")    # adds customer_name, city
    # NOTE(review): total_amount uses the catalogue unit_price, so it is null for a
    # sale whose product_id has no match in the products table — confirm this is intended.
    .withColumn("total_amount", F.col("quantity") * F.col("unit_price"))
)


### Step 5: Save Silver Layer as Delta

In [0]:
# Persist the cleaned, enriched sales table as Delta; "overwrite" replaces any earlier run.
silver_sales_path = "/Volumes/workspace/default/silver/sales"
(
    silver_sales.write
    .format("delta")
    .mode("overwrite")
    .save(silver_sales_path)
)


**Silver Layer Output**

You now have a clean, validated, and enriched sales table with:

- customer name

- city

- product name

- category

- total amount

## 🥇 GOLD LAYER (Business Aggregations)
**Goal:**

Create business-ready KPIs for dashboards.

### Step 6: Load Silver Table

In [0]:
# Read the silver sales table back from Delta as the input for the gold aggregations.
silver_reader = spark.read.format("delta")
sales_silver = silver_reader.load("/Volumes/workspace/default/silver/sales")


## ⭐ Business KPIs
### 1. Daily Revenue

In [0]:
# Revenue per order_date: add up total_amount over all sales sharing the same date.
# F.sum is named explicitly so this cell does not depend on a wildcard import
# having replaced Python's builtin sum().
daily_revenue = (
    sales_silver
    .groupBy("order_date")
    .agg(F.sum("total_amount").alias("revenue"))
)


### 2. Revenue by Category

In [0]:
# Revenue per product category: add up total_amount over all sales in each category.
# F.sum is named explicitly so this cell does not depend on a wildcard import
# having replaced Python's builtin sum().
rev_category = (
    sales_silver
    .groupBy("category")
    .agg(F.sum("total_amount").alias("category_revenue"))
)


### 3. Top Cities by Sales

In [0]:
# Sales per customer city: add up total_amount over all sales made to each city.
# F.sum is named explicitly so this cell does not depend on a wildcard import
# having replaced Python's builtin sum().
rev_city = (
    sales_silver
    .groupBy("city")
    .agg(F.sum("total_amount").alias("city_sales"))
)


### Step 7: Save Gold Layer

In [0]:
# Each KPI DataFrame paired with the gold location it is written to.
gold_targets = [
    (daily_revenue, "/Volumes/workspace/default/gold/daily_revenue"),
    (rev_category, "/Volumes/workspace/default/gold/rev_category"),
    (rev_city, "/Volumes/workspace/default/gold/rev_city"),
]

# Write every KPI table as Delta; "overwrite" replaces whatever an earlier run left there.
for kpi_frame, gold_path in gold_targets:
    kpi_frame.write.format("delta").mode("overwrite").save(gold_path)
