In [0]:
# Root directory holding the raw CSV source files; every read cell below
# builds its file path from this constant.
BASE_PATH = "/Volumes/workspace/bronze/raw_files/ETL_PROJECT"


# Read all the files

In [0]:
# Load the raw customers CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
# NOTE(review): customer_zip_code_prefix is inferred as int (see the dtypes
# output further down). If these prefixes can start with 0, the leading zero
# is lost — confirm whether an explicit string schema is needed.
customers_df = spark.read.csv(
    f"{BASE_PATH}/olist_customers_dataset.csv",
    header=True,
    inferSchema=True,
)



In [0]:
# Load the raw orders CSV: header row gives the column names, types are inferred.
orders_df = spark.read.csv(
    f"{BASE_PATH}/olist_orders_dataset.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Load the raw order-items CSV: header row gives the column names, types are inferred.
order_items_df = spark.read.csv(
    f"{BASE_PATH}/olist_order_items_dataset.csv",
    header=True,
    inferSchema=True,
)


In [0]:
# Load the raw order-payments CSV: header row gives the column names, types are inferred.
payments_df = spark.read.csv(
    f"{BASE_PATH}/olist_order_payments_dataset.csv",
    header=True,
    inferSchema=True,
)


## SAVE AS BRONZE DELTA TABLES

In [0]:
# Customers bronze table: store the raw customers DataFrame as a Delta table,
# replacing whatever the table held before.
(
    customers_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze.customers_bronze")
)


In [0]:
# Orders bronze table: store the raw orders DataFrame as a Delta table,
# replacing whatever the table held before.
(
    orders_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze.orders_bronze")
)


In [0]:
# Order-items bronze table: store the raw order-items DataFrame as a Delta
# table, replacing whatever the table held before.
(
    order_items_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze.order_items_bronze")
)


In [0]:
# Payments bronze table: store the raw payments DataFrame as a Delta table,
# replacing whatever the table held before.
(
    payments_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze.payments_bronze")
)



🧠 WHAT YOU JUST DID (VERY IMPORTANT)
You converted:

```text
CSV files (raw)
        ↓
Spark DataFrames
        ↓
Delta tables (Bronze layer)
```
This is REAL ETL work, not theory.



In [0]:
%sql
-- List the tables registered in the bronze schema to confirm the four writes above.
SHOW TABLES IN bronze;


database,tableName,isTemporary
bronze,customers_bronze,False
bronze,order_items_bronze,False
bronze,orders_bronze,False
bronze,payments_bronze,False


🥈 SILVER LAYER — CLEAN & TRANSFORM DATA (STEP BY STEP)
        Silver layer = cleaned + standardized data

🔹 STEP 0 — Read Bronze Tables (START POINT)

In [0]:
# Re-open each bronze Delta table; these frames are the inputs of the silver
# cleaning cells below.
customers_bronze_df = spark.table("bronze.customers_bronze")
orders_bronze_df = spark.table("bronze.orders_bronze")
order_items_bronze_df = spark.table("bronze.order_items_bronze")
payments_bronze_df = spark.table("bronze.payments_bronze")


# Metadata inspection

In [0]:
# Row count of the bronze customers table.
display(customers_bronze_df.count())

99441

In [0]:
# (column name, Spark type) pairs of the bronze customers table.
customers_bronze_df.dtypes


[('customer_id', 'string'),
 ('customer_unique_id', 'string'),
 ('customer_zip_code_prefix', 'int'),
 ('customer_city', 'string'),
 ('customer_state', 'string')]

In [0]:
# Per-column summary statistics: count, mean, stddev, min, quartiles and max.
display(
  customers_bronze_df.summary()
)

summary,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
count,99441,99441,99441.0,99441,99441
mean,,,35137.47458291851,,
stddev,,,29797.93899620617,,
min,00012a2ce6f8dcda20d059ce98491703,0000366f3b9a7992bf8c76cfdf3221e2,1003.0,abadia dos dourados,AC
25%,,,11347.0,,
50%,,,24416.0,,
75%,,,58884.0,,
max,ffffe8b65bbe3087b653a978c870db99,ffffd2657e2aad2907e67c3e9daecbeb,99990.0,zortea,TO


In [0]:
# Report the table's dimensions: number of rows and number of columns.
print(f"Rows: {customers_bronze_df.count()}, Columns: {len(customers_bronze_df.columns)}")

Rows: 99441, Columns: 5


1️⃣ CUSTOMERS — SILVER CLEANING

In [0]:
# Placeholder written into missing text attributes. A single constant keeps
# the sentinel identical for every column (the city column previously
# received the misspelled value "UNKNOW").
UNKNOWN_LABEL = "UNKNOWN"

customers_clean_df = (
    customers_bronze_df
    # Step 1 — keep a single row per customer_id.
    .dropDuplicates(["customer_id"])
    # Step 2 — replace null city/state values with the placeholder. The keys
    # use the exact lower-case column names of the bronze schema.
    .fillna({
        "customer_city": UNKNOWN_LABEL,
        "customer_state": UNKNOWN_LABEL,
    })
)

# Step 3 — save to silver, replacing the table's previous contents.
(
    customers_clean_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.customers")
)



2️⃣ ORDERS — SILVER CLEANING 

In [0]:
from pyspark.sql.functions import to_date

orders_clean_df = (
    orders_bronze_df
    # Step 1 — derive the purchase date from the purchase timestamp. The
    # target column was previously misspelled ("order_purchaes_timestamp"),
    # which only added a junk column. The timestamp itself is left untouched
    # so downstream cells that read order_purchase_timestamp keep working.
    .withColumn("order_purchase_date", to_date("order_purchase_timestamp"))
    # Step 2 — keep a single row per order_id.
    .dropDuplicates(["order_id"])
    # Step 3 — drop rows that have no order_id.
    .filter("order_id IS NOT NULL")
)

# Step 4 — save to silver. overwriteSchema lets the overwrite replace the
# column set written by earlier runs (which contained the misspelled column)
# instead of failing on a schema mismatch.
(
    orders_clean_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver.orders")
)


💳 3️⃣ PAYMENTS — SILVER CLEANING

In [0]:
payments_clean_df = (
    payments_bronze_df
    # Step 1 — a missing payment amount is treated as zero.
    .fillna({"payment_value": 0})
    # Step 2 — discard rows whose payment amount is negative.
    .filter("payment_value >= 0")
)

# Step 3 — save to silver, replacing the table's previous contents.
(
    payments_clean_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.payments")
)


🧺 4️⃣ ORDER ITEMS — SILVER CLEANING

In [0]:
order_items_clean_df = (
    order_items_bronze_df
    # Step 1 — keep a single row per (order_id, order_item_id) pair.
    .dropDuplicates(["order_id", "order_item_id"])
    # Step 2 — missing price / freight amounts become zero.
    .fillna({"price": 0, "freight_value": 0})
)

# Step 3 — save to silver, replacing the table's previous contents.
(
    order_items_clean_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.order_items")
)

In [0]:
%sql
-- Final check for the silver layer: list the tables that now exist in the silver schema.
show tables in silver;

database,tableName,isTemporary
silver,customers,False
silver,order_items,False
silver,orders,False
silver,payments,False


# GOLD LAYER — the final, business-ready layer.
    Gold = aggregated + joined + business metrics

In [0]:
# STEP 0 — load the cleaned silver tables that feed the gold aggregates.
# NOTE: these names were first bound to the raw CSV DataFrames at the top of
# the notebook; from this cell on they refer to the silver tables.
customers_df = spark.table("silver.customers")
orders_df = spark.table("silver.orders")
order_items_df = spark.table("silver.order_items")
payments_df = spark.table("silver.payments")


In [0]:
# GOLD TABLE 1 — CUSTOMER ORDER SUMMARY
# Business question: how many orders did each customer place, and how much
# money did each customer spend?

# NOTE: this import replaces Python's built-in sum() with the Spark column
# function for the rest of the notebook.
from pyspark.sql.functions import countDistinct, sum

# Step 1 — attach payment rows to their orders.
orders_payments_df = orders_df.join(payments_df, on="order_id", how="inner")

# Step 2 — attach the customer attributes.
customers_orders_df = orders_payments_df.join(customers_df, on="customer_id", how="inner")

# Step 3 — aggregate per customer. The joined frame has one row per payment
# record, so a plain count("order_id") would count an order once for every
# payment row it has; countDistinct counts each order exactly once.
customer_summary_df = (
    customers_orders_df
    .groupBy("customer_id", "customer_city", "customer_state")
    .agg(
        countDistinct("order_id").alias("total_orders"),
        sum("payment_value").alias("total_spent"),
    )
)

# Step 4 — save to gold, replacing the table's previous contents.
(
    customer_summary_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.customer_order_summary")
)




# 📦 GOLD TABLE 2 — DAILY SALES SUMMARY

In [0]:
# Business question: how much sales value was recorded on each calendar day?

# NOTE: sum and round imported here are the Spark column functions; they
# replace the Python built-ins of the same name in the notebook namespace.
from pyspark.sql.functions import col, round, sum, to_date

# Pair every order with its payment rows.
orders_with_payments_df = orders_df.join(payments_df, on="order_id", how="inner")

# Column expressions used by the aggregation below.
purchase_date = to_date("order_purchase_timestamp")
daily_total = round(sum(col("payment_value")), 2)

# One row per purchase date holding the rounded payment total, ordered by date.
daily_sales_df = (
    orders_with_payments_df
    .withColumn("order_date", purchase_date)
    .groupBy("order_date")
    .agg(daily_total.alias("daily_sales"))
    .orderBy("order_date")
)

# Save to gold, replacing the table's previous contents.
(
    daily_sales_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.daily_sales")
)


# GOLD TABLE 3 — PRODUCT SALES SUMMARY

In [0]:
# Business question: which products generate the most revenue?

# NOTE: sum and round imported here are the Spark column functions; they
# replace the Python built-ins of the same name in the notebook namespace.
from pyspark.sql.functions import col, desc, round, sum

# Summed item price per product, rounded to two decimals.
product_revenue = round(sum(col("price")), 2)

# One row per product, highest revenue first.
product_sales_df = (
    order_items_df
    .groupBy("product_id")
    .agg(product_revenue.alias("total_product_sales"))
    .orderBy(desc("total_product_sales"))
)

# Save to gold, replacing the table's previous contents.
(
    product_sales_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.product_sales")
)




In [0]:
%sql
-- List the tables registered in the gold schema to confirm the three gold writes above.
SHOW TABLES IN gold;


database,tableName,isTemporary
gold,customer_order_summary,False
gold,daily_sales,False
gold,product_sales,False


%md
# 🧭 DASHBOARD – FIRST PLAN (BEFORE CODING)

### 🎯 Dashboard Purpose (Plain English)
To understand customer value, sales performance, and retention for an e-commerce business.  
This dashboard should help answer:
* Are we growing revenue?
* Are customers coming back?
* Who are our valuable customers?

---

### 🗂️ TABLES YOU WILL USE (FROM YOUR PROJECT)
You already created these in the **Gold layer**:

| Gold Table | Use in Dashboard |
| :--- | :--- |
| `gold.customer_order_summary` | Customer KPIs (AOV, CLV, frequency) |
| `gold.daily_sales` | Sales trends over time |
| `gold.product_sales` | Product performance |
| `silver.orders` | Retention & repeat customers |
| `silver.customers` | Total customers count |

> **👉 Rule:**  
> **KPIs** → Gold tables  
> **Supporting logic (retention)** → Silver tables

---

### 📊 KPIs TO SHOW (CORE METRICS)
These are perfect KPIs for your dashboard 👇

| KPI | Business Question | Table Used |
| :--- | :--- | :--- |
| **Total Revenue** | How much did we earn? | `gold.customer_order_summary` |
| **Total Orders** | How many orders placed? | `silver.orders` |
| **Total Customers** | How many customers? | `silver.customers` |
| **AOV** | How much is one order worth? | `gold.customer_order_summary` |
| **Purchase Frequency** | How often customers buy? | `gold.customer_order_summary` |
| **CLV** | How valuable a customer is? | `gold.customer_order_summary` |
| **Retention Rate** | Do customers return? | `silver.orders` |

> **👉 Note:** These KPIs will be shown as **big number cards** at the top of your Power BI or Databricks Dashboard.

# DASHBOARD IMPLEMENTATION
  Rule we follow

Spark → prepare data

Pandas → calculate KPIs

Plotly → visualize


In [0]:
# STEP 1 — OPEN THE GOLD & SILVER TABLES AS SPARK DATAFRAMES
# Spark is used to read the tables; the conversion to pandas (which Plotly
# works best with) is done in the next cell.

# Gold tables
customer_summary_spark = spark.table("gold.customer_order_summary")
daily_sales_spark = spark.table("gold.daily_sales")
product_sales_spark = spark.table("gold.product_sales")

# Silver tables (for retention logic)
orders_spark = spark.table("silver.orders")
customers_spark = spark.table("silver.customers")


In [0]:
# ================================
# FORCE PANDAS CONVERSION (ONCE)
# ================================

# Gold tables are converted whole, in the order listed.
customer_summary, daily_sales, product_sales = [
    gold_frame.toPandas()
    for gold_frame in (customer_summary_spark, daily_sales_spark, product_sales_spark)
]

# From the silver tables only the key columns are brought into pandas.
orders = orders_spark.select(["order_id", "customer_id"]).toPandas()
customers = customers_spark.select(["customer_id"]).toPandas()


## CALCULATE BUSINESS KPIs (CLEAN & SIMPLE)

In [0]:
# ================================
# KPI RECALCULATION (PANDAS ONLY)
# ================================
# Inputs (pandas frames built in the conversion cell above):
#   customer_summary — per-customer totals from gold.customer_order_summary
#   orders           — order_id / customer_id pairs from silver.orders
# The revenue and customer KPIs previously read `customer_filtered`, a name
# no cell in the notebook defines, so this cell raised NameError on a fresh
# top-to-bottom run. They now read customer_summary directly.

# Total Revenue
total_revenue = customer_summary["total_spent"].sum()

# Total Orders
total_orders = orders["order_id"].nunique()

# Total Customers
total_customers = customer_summary["customer_id"].nunique()

# AOV (average order value); guarded against division by zero
aov = total_revenue / total_orders if total_orders > 0 else 0

# Purchase Frequency (orders per customer); guarded against division by zero
purchase_frequency = total_orders / total_customers if total_customers > 0 else 0

# Retention Rate — share of customers with more than one order.
# NOTE(review): in the outputs shown below every customer_id has exactly one
# order and the rate is 0.0. That suggests customer_id may identify an
# order-level record rather than a returning person; if so, retention should
# be keyed on customer_unique_id instead — confirm against the data model.
orders_per_customer = (
    orders
    .groupby("customer_id")
    .agg(order_count=("customer_id", "count"))
    .reset_index()
)

repeat_customers = orders_per_customer[orders_per_customer["order_count"] > 1]

retention_rate = (
    (len(repeat_customers) / total_customers) * 100
    if total_customers > 0 else 0
)

# CLV (customer lifetime value) with the lifespan factor fixed at 1
lifespan = 1
clv = aov * purchase_frequency * lifespan


In [0]:
import builtins

# Earlier cells import Spark's round() under the name `round`, so the Python
# built-in is taken explicitly from the builtins module.
py_round = builtins.round

# KPI name -> value; monetary values and ratios are rounded to two decimals,
# counts are shown as-is.
kpis = {
    "Total Revenue": py_round(total_revenue, 2),
    "Total Orders": total_orders,
    "Total Customers": total_customers,
    "AOV": py_round(aov, 2),
    "Purchase Frequency": py_round(purchase_frequency, 2),
    "Retention Rate (%)": py_round(retention_rate, 2),
    "CLV": py_round(clv, 2),
}

display(kpis)


{'Total Revenue': np.float64(16008872.12),
 'Total Orders': 99441,
 'Total Customers': 99440,
 'AOV': np.float64(160.99),
 'Purchase Frequency': 1.0,
 'Retention Rate (%)': 0.0,
 'CLV': np.float64(160.99)}

In [0]:
# Daily Sales Line Chart (Plotly)
# This is the first cell that uses px, so it is imported here; the name was
# previously imported only by a later cell, which made this cell fail with
# NameError on a fresh top-to-bottom run.
import plotly.express as px

fig_sales = px.line(
    # px.line connects the points in row order, so sort chronologically
    # rather than relying on the order in which the table was read.
    daily_sales.sort_values("order_date"),
    x="order_date",
    y="daily_sales",
    title="Daily Sales Trend",
    markers=True,
)
fig_sales.show()

In [0]:
# Preview the first rows of the product-sales frame.
product_sales.head()



Unnamed: 0,product_id,total_product_sales
0,bb50f2e236e5eea0100680137654686c,63885.0
1,6cdd53843498f92890544667809f1595,54730.2
2,d6160fb7873f184099d9bc95e30376af,48899.34
3,d1c427060a0f73f6b889a5c7c61f2ac4,47214.51
4,99a4788cb24856965c36a24e339b6058,43025.56


Which products are generating the most revenue?

In [0]:
import plotly.express as px

# Rank products by revenue, highest first, and keep the ten best sellers.
product_sales_sorted = product_sales.sort_values(
    by="total_product_sales", ascending=False
)
top_products = product_sales_sorted.head(10)

# Horizontal bar chart of the top ten products.
fig_products = px.bar(
    top_products,
    x="total_product_sales",
    y="product_id",
    orientation="h",
    title="Top 10 Products by Revenue",
    # Label keys must match the DataFrame column names exactly
    # (case-sensitive); the previous "Product_id" key matched no column, so
    # the y-axis kept its raw column name.
    labels={
        "total_product_sales": "Total Sales",
        "product_id": "Product ID",
    },
)

# A horizontal bar chart draws the first category at the bottom; ordering the
# categories by ascending total puts the best seller at the top.
fig_products.update_yaxes(categoryorder="total ascending")

fig_products.show()


Which states generate the most revenue?

In [0]:
# gold.customer_order_summary
# Preview the first rows of the pandas copy of gold.customer_order_summary.
customer_summary.head()


Unnamed: 0,customer_id,customer_city,customer_state,total_orders,total_spent
0,e51234890e1956bd1cf29cbbbbb42171,brasilia,DF,1,83.13
1,9f92375d50ab60bcbfcf969aa156d455,fortaleza,CE,1,54.04
2,55b0a777baab080147369bfa492dbaf6,limeira,SP,1,111.35
3,ea9eace2850dd109fdf9443e88092b44,olinda,PE,1,70.65
4,c8a0b7a3747bdf0764e839a4d1ec1d20,santo andre,SP,1,115.94


In [0]:
# AGGREGATE REVENUE BY STATE and sortvalues 
state_revenue = (
    customer_summary
    .groupby(by=["customer_state"])
    .agg(
        total_revenue = ("total_spent","sum")
    )
    .sort_values(by=["total_revenue"],ascending=[False])
    .reset_index()
)

top_states = state_revenue.head(10)

# Create bar chart 
import plotly.express as px 

fig_state = px.bar(
    top_states,
    x="customer_state",
    y="total_revenue",
    title= "Top 10 States by Revenue",
    labels={
        "customer_state":"state",
        "total_revenue" : "total Revenue"
    }
)

fig_state.show()

What percentage of customers are repeat customers vs one-time customers?

In [0]:
# Per-customer order counts built in the KPI cell (columns: customer_id, order_count).
orders_per_customer


Unnamed: 0,customer_id,order_count
0,00012a2ce6f8dcda20d059ce98491703,1
1,000161a058600d5901f007fab4c27140,1
2,0001fd6190edaaf884bcaf3d49edf079,1
3,0002414f95344307404f0ace7a26f1d5,1
4,000379cdec625522490c315e70c7a9fb,1
...,...,...
99436,fffecc9f79fd8c764f843e9951b11341,1
99437,fffeda5b6d849fbd39689bb92087f431,1
99438,ffff42319e9b2d713724ae527742af25,1
99439,ffffa3172527f765de70084a7e53aae8,1


In [0]:
import plotly.express as px

# CLASSIFY CUSTOMERS
#   Repeat customer   -> order_count > 1
#   One-time customer -> order_count == 1
# A vectorised comparison plus map replaces the row-wise apply/lambda, and the
# legend label is spelled "Repeat customer" (it previously read
# "repeted customer").
is_repeat = orders_per_customer["order_count"] > 1
orders_per_customer["customer_type"] = is_repeat.map(
    {True: "Repeat customer", False: "One-time customer"}
)

# COUNT EACH CUSTOMER TYPE
retention_summary = (
    orders_per_customer
    .groupby("customer_type")
    .agg(customer_count=("customer_id", "count"))
    .reset_index()
)

# CREATE DONUT CHART (a pie chart with a hole in the middle)
fig_retention = px.pie(
    retention_summary,
    names="customer_type",
    values="customer_count",
    hole=0.5,
    title="Customer Retention: Repeat vs One-time",
)

fig_retention.show()


In [0]:
import builtins

# (label, value, decimals) for each KPI shown on the cards below; decimals is
# None for counts, which are kept as-is. builtins.round is used explicitly
# because earlier cells import Spark's round() under the name `round`.
kpi_spec = [
    ("Total Revenue", total_revenue, 2),
    ("Total Orders", total_orders, None),
    ("Total Customers", total_customers, None),
    ("AOV", aov, 2),
    ("Retention Rate (%)", retention_rate, 2),
    ("CLV", clv, 2),
]

kpis = {
    label: value if decimals is None else builtins.round(value, decimals)
    for label, value, decimals in kpi_spec
}

kpis


{'Total Revenue': np.float64(16008872.12),
 'Total Orders': 99441,
 'Total Customers': 99440,
 'AOV': np.float64(160.99),
 'Retention Rate (%)': 0.0,
 'CLV': np.float64(160.99)}

In [0]:
# CREATE KPI CARDS (PLOTLY INDICATORS)

import plotly.graph_objects as go

# Cards in display order. The position in this list decides the grid cell:
# three cards per row, filled left to right, top to bottom. Each label is
# both the key looked up in `kpis` and the title shown on the card.
card_labels = [
    "Total Revenue",
    "Total Orders",
    "Total Customers",
    "AOV",
    "Retention Rate (%)",
    "CLV",
]
cards_per_row = 3

fig_kpis = go.Figure()

for position, label in enumerate(card_labels):
    grid_row, grid_col = divmod(position, cards_per_row)
    fig_kpis.add_trace(go.Indicator(
        mode="number",
        value=kpis[label],
        title={"text": label},
        domain={"row": grid_row, "column": grid_col},
    ))

# 2 x 3 grid of independent cells, one per indicator.
fig_kpis.update_layout(
    title="E-Commerce Business KPI Summary",
    grid={"rows": 2, "columns": 3, "pattern": "independent"},
)

fig_kpis.show()
