# **# Step 1:Create Raw Data in Datbricks**

## **150 Records=raw_products**

In [0]:
raw_products = [
    ("P001", "Dove Shampoo", "Beauty", 199.50),
    ("P002", "Samsung Earbuds", "Electronics", 399.00),
    ("P003", "Dell Mouse", "Electronics", 149.00),
    ("P004", "Lakme Face Wash", "Beauty", 249.00),
    ("P005", "Boat Headphones", "Electronics", 1299.00),
    ("P006", "Pepsodent Toothpaste", "Grocery", 89.00),
    ("P007", "Sunfeast Biscuits", "Grocery", 35.00),
    ("P008", "Amul Butter", "Grocery", 55.00),
    ("P009", "Parle-G Biscuits", "Grocery", 25.00),
    ("P010", "Tata Salt", "Grocery", 45.00),
    ("P011", "iPhone Charger", "Electronics", 1599.00),
    ("P012", "Nivea Cream", "Beauty", 199.00),
    ("P013", "Britannia Cake", "Grocery", 45.00),
    ("P014", "USB Type-C Cable", "Electronics", 299.00),
    ("P015", "Himalaya Shampoo", "Beauty", 179.00),
    ("P016", "Colgate Paste", "Grocery", 98.00),
    ("P017", "Kelloggâ€™s Cornflakes", "Grocery", 299.00),
    ("P018", "Dabur Honey", "Grocery", 199.00),
    ("P019", "Mi Power Bank", "Electronics", 999.00),
    ("P020", "Blue Heaven Kajal", "Beauty", 99.00),
] * 8

##  **150 Records-Raw_customer**

In [0]:

raw_customers = [
    ("C001", "Akash", "Delhi"),
    ("C002", "Priya", "Chennai"),
    ("C003", "Ravi", "Mumbai"),
    ("C004", "Suresh", "Hyderabad"),
    ("C005", "Meena", "Bangalore"),
    ("C006", "Karthik", "Pune"),
    ("C007", "Aisha", "Kolkata"),
    ("C008", "Rahul", "Jaipur"),
    ("C009", "Sneha", "Nagpur"),
    ("C010", "Vikram", "Indore"),
    ("C011", "Harini", "Coimbatore"),
    ("C012", "Nitin", "Ahmedabad"),
    ("C013", "Tejas", "Surat"),
    ("C014", "Deepika", "Patna"),
    ("C015", "Gopal", "Lucknow"),
] * 10

## **150+Record-raw_sales**

In [0]:
raw_sales=[]
import random
from datetime import datetime,timedelta
product_ids=["P00"+str(i) if i<10 else "P0"+str(i) for i in range(1,21)]
customer_ids=["C00"+str(i) if i<10 else "C0" +str(i) for i in range(1,16)]
base_date=datetime(2024,1,1)

prices={
    "P001":199.50, "P002":399.00, "P003":149.00, "P004":249.00, "P005":1299.00,
    "P006":89.00,  "P007":35.00,  "P008":55.00,  "P009":25.00,  "P010":45.00,
    "P011":1599.00,"P012":199.00, "P013":45.00,  "P014":299.00, "P015":179.00,
    "P016":98.00,  "P017":299.00, "P018":199.00, "P019":999.00, "P020":99.00
}

order_id_counter=1001

for i in range(160):
    cust=random.choice(customer_ids)
    prod=random.choice(product_ids)
    qty=random.randint(1,5)
    price=prices.get(prod,100.00)
    order_date=base_date+timedelta(days=random.randint(0,75))

    raw_sales.append((
        str(order_id_counter),
        cust,
        prod,
        order_date.strftime("%Y-%m-%d"),
        qty,
        price
    ))

    order_id_counter+=1

## ** Create Dataframe**

In [0]:
columns=["order_id","customer_id","product_id","order_date","quantity","price"]
sales_df=spark.createDataFrame(raw_sales,columns)

product_df=spark.createDataFrame(raw_products,["product_id","product_name","category","unit_price"])
customer_df=spark.createDataFrame(raw_customers,["customer_id","customer_name","city"])

# **Step 2:Save to bronze**

In [0]:
sales_df.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/secondvoulmn/bronze/sales")
product_df.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/secondvoulmn/bronze/products")
customer_df.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/secondvoulmn/bronze/customers")

# **Step 3:Read Bronze**

In [0]:
bronze_sales=spark.read.format("delta").load("/Volumes/workspace/default/secondvoulmn/bronze/sales")
bronze_products=spark.read.format("delta").load("/Volumes/workspace/default/secondvoulmn/bronze/products")
bronze_customer=spark.read.format("delta").load("/Volumes/workspace/default/secondvoulmn/bronze/customers")

# **Step 4:Clean+Join**

In [0]:
from pyspark.sql.functions import *
silver_sales=bronze_sales \
    .dropDuplicates(["order_id"]) \
    .filter("quantity>0") \
    .filter("price>0") \
    .join(bronze_products,"product_id","left")  \
    .join(bronze_customer,"customer_id","left") \
    .withColumn("total_amount",col("price")*col("unit_price"))         

#**Step 5:Save Silver Layer as Delta**

In [0]:
silver_sales.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/secondvoulmn/silver_sales")

# **Step 6:Load Silver Table**

In [0]:
sales_silver=spark.read.format("delta").load("/Volumes/workspace/default/secondvoulmn/silver_sales")

# **Step 7:Find Business KPI**

## **1.Daily Revenue**

In [0]:
daily_revenue=sales_silver.groupBy("order_date").agg(sum("total_amount").alias("revenue"))

## **2.Revenue by Category**

In [0]:
rev_category=sales_silver.groupBy("category").agg(sum("total_amount").alias("category_revenue"))

In [0]:
rev_city=sales_silver.groupBy("city").agg(sum("total_amount").alias("city_sales"))

In [0]:
daily_revenue.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/secondvoulmn/gold/daily_revenue")

rev_category.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/secondvoulmn/gold/rev_category")

rev_city.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/secondvoulmn/gold/rev_city")

In [0]:
display(daily_revenue)
display(rev_category)
display(rev_city)

order_date,revenue
2024-01-10,768320.0
2024-02-04,8351760.0
2024-01-29,204706080.0
2024-02-20,7556080.0
2024-02-22,10336100.0
2024-03-10,212000.0
2024-02-21,82384480.0
2024-01-11,484000.0
2024-03-06,143920240.0
2024-03-04,7152080.0


Databricks visualization. Run in Databricks to view.

category,category_revenue
Beauty,107865140.0
Grocery,124010080.0
Electronics,3048947360.0


Databricks visualization. Run in Databricks to view.

city,city_sales
Lucknow,6480800.0
Kolkata,101876420.0
Coimbatore,45447920.0
Bangalore,280984260.0
Chennai,256904000.0
Jaipur,154582720.0
Delhi,29763520.0
Nagpur,523538000.0
Ahmedabad,402210740.0
Patna,39168100.0


Databricks visualization. Run in Databricks to view.