In [1]:
# ==============================================
# AtmosTrack ETL – Transform Step (Notebook)
# ==============================================

# 1️ Imports
import json
from pathlib import Path
import pandas as pd
from datetime import datetime

# 2️ Paths
# Input/output locations, resolved relative to the current working directory.
RAW_DIR = Path("data", "raw")
STAGED_DIR = Path("data", "staged")
TRANSFORMED_FILE = STAGED_DIR.joinpath("air_quality_transformed.csv")
# Ensure the staging folder exists before the CSV is written into it.
STAGED_DIR.mkdir(exist_ok=True, parents=True)

# 3️ AQI category function
def pm25_to_aqi_category(pm25: float) -> str:
    if pm25 <= 50:
        return "Good"
    elif pm25 <= 100:
        return "Moderate"
    elif pm25 <= 200:
        return "Unhealthy"
    elif pm25 <= 300:
        return "Very Unhealthy"
    else:
        return "Hazardous"

# 4️ Load raw JSONs and flatten
# Fields copied from the ``hourly`` block into each flat record, in
# output-column order (after "city" and "time").
HOURLY_FIELDS = (
    "pm10",
    "pm2_5",
    "carbon_monoxide",
    "nitrogen_dioxide",
    "sulphur_dioxide",
    "ozone",
    "uv_index",
)

all_records = []

# Sorted so the output row order does not depend on filesystem listing order.
for file in sorted(RAW_DIR.glob("*_raw_*.json")):
    # "<city_slug>_raw_<...>.json" -> title-cased city with spaces.
    city = file.stem.split("_raw_")[0].replace("_", " ").title()

    with open(file, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Open-Meteo API structure: data['hourly'] contains one list per variable;
    # this code assumes each list is index-aligned with hourly['time'].
    hourly = data.get("hourly", {})
    times = hourly.get("time", [])

    # Skip files without hourly timestamps.
    if not times:
        continue

    n_rows = len(times)
    # Resolve each field's list once per file rather than once per row, and
    # pad/truncate it to len(times) so an absent or short list yields NaN
    # instead of raising IndexError.
    field_values = {}
    for field in HOURLY_FIELDS:
        values = list(hourly.get(field) or [])[:n_rows]
        field_values[field] = values + [None] * (n_rows - len(values))

    for i, t in enumerate(times):
        record = {"city": city, "time": pd.to_datetime(t)}
        for field in HOURLY_FIELDS:
            # Non-numeric / missing entries become NaN.
            record[field] = pd.to_numeric(field_values[field][i], errors="coerce")
        all_records.append(record)

# 5️ Create DataFrame
# One row per (city, hour) record collected above.
df = pd.DataFrame(all_records)
print(f"Loaded {len(df)} records from raw JSONs")

# A row survives if at least one of these columns has a value.
# NOTE(review): uv_index is included here although it is not a pollutant and is
# not used by the severity score; the script cell's transform_city_file omits
# it from the same check — confirm which is intended.
pollutants = [
    "pm10",
    "pm2_5",
    "carbon_monoxide",
    "nitrogen_dioxide",
    "sulphur_dioxide",
    "ozone",
    "uv_index",
]
df = df.dropna(how="all", subset=pollutants)
print(f"After dropping empty rows: {len(df)} records remain")

# 7️ Feature Engineering
# AQI category, derived from PM2.5 alone.
df["aqi_category"] = df["pm2_5"].apply(pm25_to_aqi_category)

# Severity score: weighted sum of six pollutant columns (uv_index is unused).
# A NaN in any of them makes that row's severity NaN.
# NOTE(review): in the sample output, carbon_monoxide values in the hundreds
# dominate this sum, so every row shown is far above the "High Risk" cut-off
# of 400 — confirm the intended units/weights.
df["severity"] = (
    (df["pm2_5"] * 5)
    .add(df["pm10"] * 3)
    .add(df["nitrogen_dioxide"] * 4)
    .add(df["sulphur_dioxide"] * 4)
    .add(df["carbon_monoxide"] * 2)
    .add(df["ozone"] * 3)
)

# Risk Classification
def severity_to_risk(sev):
    """Bucket a severity score into a risk label.

    ``sev > 400`` -> "High Risk", ``sev > 200`` -> "Moderate Risk",
    otherwise "Low Risk".

    Args:
        sev: Severity score. It is NaN whenever any pollutant in the weighted
            sum was missing.

    Returns:
        The risk label, or None for a missing score. Without the explicit
        check, NaN fails both ``>`` comparisons and would be reported as
        "Low Risk".
    """
    if pd.isna(sev):
        return None
    if sev > 400:
        return "High Risk"
    if sev > 200:
        return "Moderate Risk"
    return "Low Risk"

# Risk label from the severity score, plus the hour of day of each reading.
df = df.assign(
    risk=df["severity"].apply(severity_to_risk),
    hour=df["time"].dt.hour,
)

# Persist the transformed table (the pandas index is not written).
df.to_csv(TRANSFORMED_FILE, index=False)
print(f"Transformed data saved to {TRANSFORMED_FILE}")

# Quick check: the last expression is rendered by Jupyter.
df.head()


Loaded 600 records from raw JSONs
After dropping empty rows: 545 records remain
Transformed data saved to data\staged\air_quality_transformed.csv


Unnamed: 0,city,time,pm10,pm2_5,carbon_monoxide,nitrogen_dioxide,sulphur_dioxide,ozone,uv_index,aqi_category,severity,risk,hour
0,Bengaluru,2025-12-11 00:00:00,53.3,50.9,363.0,6.2,16.7,119.0,0.0,Moderate,1589.0,High Risk,0
1,Bengaluru,2025-12-11 01:00:00,56.2,53.9,485.0,10.2,18.2,120.0,0.0,Moderate,1881.7,High Risk,1
2,Bengaluru,2025-12-11 02:00:00,59.8,57.5,649.0,15.5,19.9,120.0,0.45,Moderate,2266.5,High Risk,2
3,Bengaluru,2025-12-11 03:00:00,60.8,58.2,734.0,18.0,20.4,123.0,1.85,Moderate,2464.0,High Risk,3
4,Bengaluru,2025-12-11 04:00:00,56.7,53.4,662.0,14.6,18.9,135.0,4.15,Moderate,2300.1,High Risk,4


In [None]:
"""
transform.py

Transform step for AtmosTrack Multi-City AQI ETL Pipeline.

- Flattens hourly JSON into tabular format
- Computes AQI category, severity score, and risk classification
- Saves combined CSV to data/staged/air_quality_transformed.csv
"""

import json
from pathlib import Path
import pandas as pd

# ------------------------------
# Directories
# ------------------------------
RAW_DIR = Path("data", "raw")
STAGED_DIR = Path("data", "staged")
STAGED_FILE = STAGED_DIR.joinpath("air_quality_transformed.csv")
# Create the output folder up front so the final to_csv has somewhere to write.
STAGED_DIR.mkdir(exist_ok=True, parents=True)

# ------------------------------
# Feature Engineering
# ------------------------------
def compute_aqi(pm2_5):
    """Return the air-quality category label for a PM2.5 reading.

    Upper bounds are inclusive: 50 "Good", 100 "Moderate", 200 "Unhealthy",
    300 "Very Unhealthy"; anything higher is "Hazardous".

    NOTE(review): confirm which AQI scale these cut-offs are meant to follow.

    Args:
        pm2_5: PM2.5 value. NaN/None (a missing reading) is accepted.

    Returns:
        The category label, or None when the reading is missing. Without the
        explicit check, NaN fails every ``<=`` comparison and would be labelled
        "Hazardous", and None would raise TypeError.
    """
    if pd.isna(pm2_5):
        return None
    if pm2_5 <= 50:
        return "Good"
    elif pm2_5 <= 100:
        return "Moderate"
    elif pm2_5 <= 200:
        return "Unhealthy"
    elif pm2_5 <= 300:
        return "Very Unhealthy"
    else:
        return "Hazardous"

def compute_severity(row):
    """Weighted pollutant score for a single record.

    ``row`` is anything with a dict-style ``.get`` (a DataFrame row Series or a
    plain dict). A pollutant key that is absent counts as 0; a key that is
    present with a NaN value propagates, making the score NaN.

    Weights: pm2_5 x5, pm10 x3, nitrogen_dioxide x4, sulphur_dioxide x4,
    carbon_monoxide x2, ozone x3.
    """
    read = row.get
    pm25 = read("pm2_5", 0)
    pm10 = read("pm10", 0)
    no2 = read("nitrogen_dioxide", 0)
    so2 = read("sulphur_dioxide", 0)
    co = read("carbon_monoxide", 0)
    o3 = read("ozone", 0)
    return pm25 * 5 + pm10 * 3 + no2 * 4 + so2 * 4 + co * 2 + o3 * 3

def classify_risk(severity):
    """Bucket a severity score into a risk label.

    ``severity > 400`` -> "High Risk", ``severity > 200`` -> "Moderate Risk",
    otherwise "Low Risk".

    Args:
        severity: Score from ``compute_severity``. It is NaN when any weighted
            pollutant value is NaN.

    Returns:
        The risk label, or None for a missing score. Without the explicit
        check, NaN fails both ``>`` comparisons and would be reported as
        "Low Risk".
    """
    if pd.isna(severity):
        return None
    if severity > 400:
        return "High Risk"
    elif severity > 200:
        return "Moderate Risk"
    else:
        return "Low Risk"

# ------------------------------
# Transform single city file
# ------------------------------
def transform_city_file(file_path: Path) -> pd.DataFrame:
    """Flatten one raw hourly JSON file into a per-hour DataFrame with features.

    The city name is the part of the file stem before ``"_raw_"``, used as-is.
    Pollutant columns are coerced to numeric (unparseable values -> NaN). A
    pollutant absent from the payload is added as an all-NaN column rather than
    filled with 0, so it neither invents zero readings nor keeps a row alive in
    the "all pollutants missing" filter. Adds ``hour``, ``AQI_category``,
    ``severity`` and ``risk``.

    NOTE(review): the notebook cell earlier in this file title-cases the city
    name and calls the category column ``aqi_category``, while both write the
    same CSV path — confirm which schema downstream steps expect.

    Args:
        file_path: Path to one raw JSON file.

    Returns:
        The transformed frame, or an empty DataFrame when the file has no
        usable hourly data (no ``hourly`` block, no rows, or no ``time`` column).
    """
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    city_name = file_path.stem.split("_raw_")[0]
    hourly = data.get("hourly", {})

    if not hourly:
        return pd.DataFrame()

    df = pd.DataFrame(hourly)
    # Every later step keys on "time"; without it the file cannot be used
    # (previously this raised KeyError further down).
    if df.empty or "time" not in df.columns:
        return pd.DataFrame()

    df["city"] = city_name

    # Convert numeric columns; a column the payload lacks becomes NaN, not 0.
    for col in ["pm10", "pm2_5", "carbon_monoxide", "nitrogen_dioxide", "ozone", "sulphur_dioxide", "uv_index"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        else:
            df[col] = float("nan")

    # Drop rows where all pollutants are missing (uv_index is not considered).
    df = df.dropna(
        subset=["pm10", "pm2_5", "carbon_monoxide", "nitrogen_dioxide", "ozone", "sulphur_dioxide"],
        how="all",
    )

    # Convert time to datetime (unparseable -> NaT) and extract hour of day.
    df["time"] = pd.to_datetime(df["time"], errors="coerce")
    df["hour"] = df["time"].dt.hour

    # Feature engineering
    df["AQI_category"] = df["pm2_5"].apply(compute_aqi)
    df["severity"] = df.apply(compute_severity, axis=1)
    df["risk"] = df["severity"].apply(classify_risk)

    return df

# ------------------------------
# Transform all files
# ------------------------------
def transform_all():
    """Transform every raw city file and write the combined CSV.

    Only files named ``*_raw_*.json`` are read. That is the naming convention
    ``transform_city_file`` relies on to derive the city name, and the one the
    notebook cell uses. Files are processed in sorted order so the combined
    row order is reproducible. Files that yield no rows are skipped.

    Returns:
        The combined DataFrame (also written to ``STAGED_FILE``), or an empty
        DataFrame when no file produced data. In that case nothing is written,
        and any existing ``STAGED_FILE`` is left untouched.
    """
    files = sorted(RAW_DIR.glob("*_raw_*.json"))
    all_dfs = []

    for f in files:
        df = transform_city_file(f)
        if not df.empty:
            all_dfs.append(df)

    if not all_dfs:
        print("⚠️ No data transformed. Check raw files.")
        return pd.DataFrame()

    final_df = pd.concat(all_dfs, ignore_index=True)
    final_df.to_csv(STAGED_FILE, index=False)
    print(f"✅ Transformed data saved to {STAGED_FILE}")
    return final_df

# ------------------------------
# Main
# ------------------------------
def _main():
    """Run the transform step, printing start and completion messages."""
    print("Starting transform step...")
    transform_all()
    print("Transform step completed.")


if __name__ == "__main__":
    _main()
