#### Fuel Price ETL Pipeline - Data Showcase
#### Bronze → Silver → Gold Architecture

This notebook demonstrates the data transformation at each layer of our ETL pipeline by **querying a live PostgreSQL database**.

#### Pipeline Overview
```
RapidAPI → Bronze (Raw JSON) → Silver (Normalized) → Gold (Analytics)
```

In [2]:
import pandas as pd
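import psycopg2  # PostgreSQL driver that SQLAlchemy uses for postgresql:// URLs
from sqlalchemy import create_engine

# Database connection (create_engine is lazy; the first query opens the connection)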
DB_URL = "postgresql://postgres:postgres@localhost:5433/fuel_prices"
engine = create_engine(DB_URL)

print(" Connected to PostgreSQL database: fuel_prices")

 Connected to PostgreSQL database: fuel_prices


---
## 1️⃣ Raw API Data

This is the structure we receive from the RapidAPI fuel prices endpoint:

In [3]:
# Sample API Response Structure
sample_api_response = {
    "cityId": "mumbai",
    "cityName": "Mumbai",
    "stateId": "maharashtra",
    "stateName": "Maharashtra",
    "history": [
        {
            "applicableOn": "2025-08-13",
            "fuel": {
                "petrol": {"retailPrice": 103.49, "retailPriceChange": 0.00},
                "diesel": {"retailPrice": 89.88, "retailPriceChange": 0.00},
                "lpg": {"retailPrice": 772.50, "retailPriceChange": 0.00},
                "cng": {"retailPrice": 75.00, "retailPriceChange": 0.00}
            }
        }
    ]
}
sample_api_response

{'cityId': 'mumbai',
 'cityName': 'Mumbai',
 'stateId': 'maharashtra',
 'stateName': 'Maharashtra',
 'history': [{'applicableOn': '2025-08-13',
   'fuel': {'petrol': {'retailPrice': 103.49, 'retailPriceChange': 0.0},
    'diesel': {'retailPrice': 89.88, 'retailPriceChange': 0.0},
    'lpg': {'retailPrice': 772.5, 'retailPriceChange': 0.0},
    'cng': {'retailPrice': 75.0, 'retailPriceChange': 0.0}}}]}

---
### 2️⃣ Bronze Layer (Raw Storage)

We store the complete API response as **JSONB** in PostgreSQL.

**Purpose:** Store raw data as-is for audit trail and reprocessing
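The ingestion code is not shown in this notebook, so the following is only a minimal sketch of how one API response might be split into Bronze records. It assumes one row per city per date and that `raw_data` holds the unmodified history entry, which is inferred from the columns and the `raw_data->'fuel'` path used in the query below. The function name `to_bronze_rows` is hypothetical.

```python
import json

def to_bronze_rows(api_response):
    """Split one API response into one Bronze record per history entry (assumed layout)."""
    rows = []
    for entry in api_response.get("history", []):
        rows.append({
            "city_id": api_response["cityId"],
            "city_name": api_response["cityName"],
            "state_name": api_response["stateName"],
            "applicable_on": entry["applicableOn"],
            # Serialized unchanged; on insert this string would be bound to a JSONB column.
            "raw_data": json.dumps(entry),
        })
    return rows

sample = {
    "cityId": "mumbai", "cityName": "Mumbai",
    "stateId": "maharashtra", "stateName": "Maharashtra",
    "history": [{
        "applicableOn": "2025-08-13",
        "fuel": {"petrol": {"retailPrice": 103.49, "retailPriceChange": 0.0}},
    }],
}

bronze_rows = to_bronze_rows(sample)
print(bronze_rows[0]["city_id"], bronze_rows[0]["applicable_on"])
```

Keeping the entry untouched is what makes reprocessing possible: the Silver layer can be rebuilt at any time from `raw_data` alone.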

In [4]:
# Query Bronze Layer from PostgreSQL
bronze_query = """
SELECT 
    city_id,
    city_name,
    state_name,
    applicable_on,
    raw_data->'fuel'->'petrol'->>'retailPrice' as petrol_price
FROM bronze_fuel_prices
ORDER BY applicable_on DESC, city_id
LIMIT 10
"""

bronze_df = pd.read_sql(bronze_query, engine)
bronze_count = pd.read_sql("SELECT COUNT(*) as cnt FROM bronze_fuel_prices", engine)['cnt'][0]

print("🔶 BRONZE LAYER - Raw Data Storage")
print("===================================")
print(f"Total Records: {bronze_count}")
print()
bronze_df.head()

🔶 BRONZE LAYER - Raw Data Storage
Total Records: 150



| | city_id | city_name | state_name | applicable_on | petrol_price |
|---|---------|-----------|------------|---------------|--------------|
| 0 | bengaluru | Bengaluru | Karnataka | 2025-08-13 | 102.9 |
| 1 | chennai | Chennai | Tamil Nadu | 2025-08-13 | 100.79 |
| 2 | delhi | Delhi | Delhi | 2025-08-13 | 94.81 |
| 3 | kolkata | Kolkata | West Bengal | 2025-08-13 | 104.99 |
| 4 | mumbai | Mumbai | Maharashtra | 2025-08-13 | 103.49 |


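- 5 cities × 30 days = 150 records
- Nested values are read with PostgreSQL JSONB operators, e.g. `raw_data->'fuel'->'petrol'->>'retailPrice'` (`->` returns JSON, `->>` returns text)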
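In PostgreSQL the `->>` operator returns text, so a price pulled out of `raw_data` this way is a string until it is cast. The sketch below mirrors that behaviour in plain Python for scalar leaf values; the helper name `json_path_text` is hypothetical and is not part of the pipeline.

```python
import json

def json_path_text(raw_json, *keys):
    """Walk nested keys and return a scalar leaf as text, or None if a key is missing."""
    node = json.loads(raw_json)
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return None  # a missing key yields NULL in SQL as well
        node = node[key]
    return None if node is None else str(node)

raw = '{"fuel": {"petrol": {"retailPrice": 103.49}}}'

price_text = json_path_text(raw, "fuel", "petrol", "retailPrice")
price = float(price_text)  # explicit cast, like ::numeric in SQL
missing = json_path_text(raw, "fuel", "cng", "retailPrice")
print(repr(price_text), price, missing)
```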

---
### Silver Layer (Transformed & Normalized)

We flatten the nested JSON into a tabular format using SQL `jsonb_each()`.

**Purpose:** Query-ready normalized data for analysis
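The actual transformation runs in SQL with `jsonb_each()`, which is not reproduced here. As a rough Python analogue, the sketch below turns one Bronze record into one row per fuel type. The function name `to_silver_rows` is hypothetical; the output column names follow the Silver query in this notebook.

```python
def to_silver_rows(city_id, city_name, applicable_on, raw_data):
    """Emit one flat row per fuel type found under raw_data['fuel']."""
    rows = []
    for fuel_type, details in sorted(raw_data.get("fuel", {}).items()):
        rows.append({
            "city_id": city_id,
            "city_name": city_name,
            "applicable_on": applicable_on,
            "fuel_type": fuel_type,
            "retail_price": float(details["retailPrice"]),
            "price_change": float(details["retailPriceChange"]),
        })
    return rows

# History entry taken from the sample API response shown earlier.
entry = {
    "applicableOn": "2025-08-13",
    "fuel": {
        "petrol": {"retailPrice": 103.49, "retailPriceChange": 0.00},
        "diesel": {"retailPrice": 89.88, "retailPriceChange": 0.00},
        "lpg": {"retailPrice": 772.50, "retailPriceChange": 0.00},
        "cng": {"retailPrice": 75.00, "retailPriceChange": 0.00},
    },
}

silver_rows = to_silver_rows("mumbai", "Mumbai", entry["applicableOn"], entry)
for row in silver_rows:
    print(row["fuel_type"], row["retail_price"])
```

One Bronze record with four fuel types yields four Silver rows, which is where the 4× growth in row count comes from.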

In [5]:
# Query Silver Layer from PostgreSQL
silver_query = """
SELECT 
    city_id,
    city_name,
    applicable_on,
    fuel_type,
    retail_price,
    price_change
FROM silver_fuel_prices
ORDER BY applicable_on DESC, city_id, fuel_type
LIMIT 12
"""

silver_df = pd.read_sql(silver_query, engine)
silver_count = pd.read_sql("SELECT COUNT(*) as cnt FROM silver_fuel_prices", engine)['cnt'][0]

print("SILVER LAYER - Normalized Data")
print("=================================")
print(f"Total Records: {silver_count}")
print()
silver_df.head(8)

SILVER LAYER - Normalized Data
Total Records: 600



| | city_id | city_name | applicable_on | fuel_type | retail_price | price_change |
|---|---------|-----------|---------------|-----------|--------------|--------------|
| 0 | bengaluru | Bengaluru | 2025-08-13 | cng | 88.0 | 0.0 |
| 1 | bengaluru | Bengaluru | 2025-08-13 | diesel | 90.98 | 0.0 |
| 2 | bengaluru | Bengaluru | 2025-08-13 | lpg | 855.5 | 0.0 |
| 3 | bengaluru | Bengaluru | 2025-08-13 | petrol | 102.9 | 0.0 |
| 4 | chennai | Chennai | 2025-08-13 | cng | 90.5 | 2.0 |
| 5 | chennai | Chennai | 2025-08-13 | diesel | 92.38 | 0.0 |
| 6 | chennai | Chennai | 2025-08-13 | lpg | 868.5 | 0.0 |
| 7 | chennai | Chennai | 2025-08-13 | petrol | 100.79 | 0.0 |


### Key Observations:
- **1 row per city/date/fuel_type** = 150 Bronze rows × 4 fuel types = 600 records
- Nested JSON is **flattened into separate rows**
- Easy to query: `SELECT * FROM silver_fuel_prices WHERE fuel_type='petrol'`

In [6]:
# Analytics Query on Silver Layer
analytics_query = """
SELECT 
    fuel_type,
    ROUND(AVG(retail_price)::numeric, 2) as avg_price,
    MIN(retail_price) as min_price,
    MAX(retail_price) as max_price
FROM silver_fuel_prices
GROUP BY fuel_type
ORDER BY fuel_type
"""

print("Price Comparison by Fuel Type (from Silver Layer):")
pd.read_sql(analytics_query, engine)

Price Comparison by Fuel Type (from Silver Layer):


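| | fuel_type | avg_price | min_price | max_price |
|---|-----------|-----------|-----------|-----------|
| 0 | cng | 84.62 | 76.59 | 90.5 |
| 1 | diesel | 90.58 | 87.71 | 92.38 |
| 2 | lpg | 861.7 | 852.5 | 879.0 |
| 3 | petrol | 101.4 | 94.81 | 104.99 |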
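To experiment with this kind of aggregation without the PostgreSQL instance, the same `GROUP BY` can be tried against an in-memory SQLite table. This is only a sketch: it loads the eight Silver rows displayed above (two cities, one date), so the averages differ from the full-table results, and the PostgreSQL-specific `::numeric` cast is dropped because SQLite does not support it.

```python
import sqlite3

# (city_id, fuel_type, retail_price) taken from the Silver sample shown above.
rows = [
    ("bengaluru", "cng", 88.0), ("bengaluru", "diesel", 90.98),
    ("bengaluru", "lpg", 855.5), ("bengaluru", "petrol", 102.9),
    ("chennai", "cng", 90.5), ("chennai", "diesel", 92.38),
    ("chennai", "lpg", 868.5), ("chennai", "petrol", 100.79),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE silver_fuel_prices (city_id TEXT, fuel_type TEXT, retail_price REAL)"
)
conn.executemany("INSERT INTO silver_fuel_prices VALUES (?, ?, ?)", rows)

results = conn.execute("""
    SELECT fuel_type,
           ROUND(AVG(retail_price), 2) AS avg_price,
           MIN(retail_price) AS min_price,
           MAX(retail_price) AS max_price
    FROM silver_fuel_prices
    GROUP BY fuel_type
    ORDER BY fuel_type
""").fetchall()
conn.close()

for fuel_type, avg_price, min_price, max_price in results:
    print(fuel_type, avg_price, min_price, max_price)
```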


---
### Gold Layer (Analytics Ready)

### Gold Dataset Description:
| Column | Type | Description |
|--------|------|-------------|
| state_name | VARCHAR | State for aggregation |
| fuel_type | VARCHAR | Fuel category |
| avg_price | DECIMAL | **State average** price |
| min_price | DECIMAL | Cheapest city in state |
| max_price | DECIMAL | Most expensive city |
| city_count | INT | Number of cities in state |

This dataset is intended as input for model building.
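The SQL that builds `gold_state_analytics` is not shown in this notebook. As a hedged illustration of the columns described above, the sketch below groups Silver-style rows by state and fuel type in plain Python. The function name `aggregate_by_state` is hypothetical, the rows are assumed to carry a `state_name` field, and the Pune row is made-up illustrative data so that one state has two cities. Whether the real table also groups by date is not stated here.

```python
from collections import defaultdict

def aggregate_by_state(silver_rows):
    """Compute avg/min/max price and distinct city count per (state, fuel type)."""
    groups = defaultdict(list)
    for row in silver_rows:
        groups[(row["state_name"], row["fuel_type"])].append(row)

    gold = []
    for (state_name, fuel_type), members in sorted(groups.items()):
        prices = [m["retail_price"] for m in members]
        gold.append({
            "state_name": state_name,
            "fuel_type": fuel_type,
            "avg_price": round(sum(prices) / len(prices), 2),
            "min_price": min(prices),
            "max_price": max(prices),
            "city_count": len({m["city_id"] for m in members}),
        })
    return gold

rows = [
    {"state_name": "Delhi", "city_id": "delhi", "fuel_type": "cng", "retail_price": 76.59},
    {"state_name": "Maharashtra", "city_id": "mumbai", "fuel_type": "petrol", "retail_price": 103.49},
    # Hypothetical second city, for illustration only.
    {"state_name": "Maharashtra", "city_id": "pune", "fuel_type": "petrol", "retail_price": 104.51},
]

gold = aggregate_by_state(rows)
for record in gold:
    print(record)
```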

In [7]:
# Query Gold Layer from PostgreSQL
gold_query = """
SELECT 
    state_name,
    fuel_type,
    avg_price,
    min_price,
    max_price,
    city_count
FROM gold_state_analytics
ORDER BY state_name, fuel_type
LIMIT 12
"""

gold_df = pd.read_sql(gold_query, engine)
gold_count = pd.read_sql("SELECT COUNT(*) as cnt FROM gold_state_analytics", engine)['cnt'][0]

print(" GOLD LAYER - Analytics Aggregations")
print("======================================")
print(f"Total Records: {gold_count}")
print()
gold_df.head(8)

 GOLD LAYER - Analytics Aggregations
Total Records: 600



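| | state_name | fuel_type | avg_price | min_price | max_price | city_count |
|---|------------|-----------|-----------|-----------|-----------|------------|
| 0 | Delhi | cng | 76.59 | 76.59 | 76.59 | 1 |
| 1 | Delhi | cng | 76.59 | 76.59 | 76.59 | 1 |
| 2 | Delhi | cng | 76.59 | 76.59 | 76.59 | 1 |
| 3 | Delhi | cng | 76.59 | 76.59 | 76.59 | 1 |
| 4 | Delhi | cng | 76.59 | 76.59 | 76.59 | 1 |
| 5 | Delhi | cng | 76.59 | 76.59 | 76.59 | 1 |
| 6 | Delhi | cng | 76.59 | 76.59 | 76.59 | 1 |
| 7 | Delhi | cng | 76.59 | 76.59 | 76.59 | 1 |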


---
## Summary: Record Counts Across Layers

In [8]:
# Summary Query
summary_query = """
SELECT 'Bronze' as layer, COUNT(*) as records FROM bronze_fuel_prices
UNION ALL
SELECT 'Silver', COUNT(*) FROM silver_fuel_prices
UNION ALL
SELECT 'Gold', COUNT(*) FROM gold_state_analytics
"""

summary_df = pd.read_sql(summary_query, engine)
summary_df['description'] = [
    '5 cities × 30 days (raw JSONB)',
    '150 × 4 fuel types (normalized)',
    'State-level aggregations'
]

print(" ETL Pipeline Summary")
print("=======================")
summary_df

 ETL Pipeline Summary


| | layer | records | description |
|---|-------|---------|-------------|
| 0 | Bronze | 150 | 5 cities × 30 days (raw JSONB) |
| 1 | Silver | 600 | 150 × 4 fuel types (normalized) |
| 2 | Gold | 600 | State-level aggregations |
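The Bronze and Silver counts follow directly from the arithmetic in the descriptions, so they can be checked automatically after each run. The sketch below is one possible sanity check; the function names are hypothetical, and the Gold layer is left out because its grouping rule is not spelled out in this notebook.

```python
def expected_counts(cities, days, fuel_types):
    """Derive the expected Bronze and Silver row counts from the pipeline's dimensions."""
    bronze = cities * days
    return {"Bronze": bronze, "Silver": bronze * fuel_types}

def check_counts(actual, cities=5, days=30, fuel_types=4):
    """Return a list of (layer, expected, actual) tuples for every mismatch."""
    expected = expected_counts(cities, days, fuel_types)
    return [
        (layer, count, actual.get(layer))
        for layer, count in expected.items()
        if actual.get(layer) != count
    ]

# Counts reported in the summary table above.
actual = {"Bronze": 150, "Silver": 600, "Gold": 600}
mismatches = check_counts(actual)
print("OK" if not mismatches else mismatches)
```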


---
## 🛠️ Technologies Used

- **Apache Airflow** (Astronomer) - Orchestration
- **PostgreSQL** - Data storage with JSONB support
- **Python** - ETL logic
- **MLflow** - Experiment tracking
- **Docker** - Containerization