<a href="https://colab.research.google.com/github/ArunK-ML/Project---Live-PM2.5-Nowcast-and-Forecast---Final-Project/blob/main/PM_2_0_BEST_APPROCH.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **__init__**

In [1]:
import os
import joblib
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error

from src.preprocessing.preprocessing import clean_data
from src.features.features import feature_pipeline
from src.anomaly.anomaly import detect_anomalies

import requests
import pandas as pd
from datetime import datetime, timedelta

from sklearn.ensemble import IsolationForest
import pandas as pd
import numpy as np



# init

"""
src package for PM2.5 Forecasting Project
-----------------------------------------
Contains all modules for:
- Data fetching (OpenAQ / Open-Meteo)
- Preprocessing & feature engineering
- Anomaly detection
- Forecasting (recursive)
- Model training
"""
__all__ = ["fetch", "preprocessing", "features", "anomaly", "forecasting", "train"]


# fetch


def fetch_openaq(city="Delhi", hours=72):
    """Fetch PM2.5 data from OpenAQ API."""
    end = datetime.utcnow()
    start = end - timedelta(hours=hours)
    url = f"https://api.openaq.org/v2/measurements"
    params = {
        "city": city,
        "parameter": "pm25",
        "date_from": start.isoformat(timespec="seconds") + "Z",
        "date_to": end.isoformat(timespec="seconds") + "Z",
        "limit": 10000,
        "sort": "desc"
    }
    r = requests.get(url, params=params)
    if r.status_code != 200:
        raise Exception(f"OpenAQ API failed: {r.status_code}")
    results = r.json().get("results", [])
    df = pd.DataFrame(results)
    if "date" in df.columns:
        df["timestamp"] = pd.to_datetime(df["date"].apply(lambda x: x["utc"]))
    df = df[["timestamp", "value"]].rename(columns={"value": "pm25"}).dropna()
    df.sort_values("timestamp", inplace=True)
    return df.reset_index(drop=True)

def fetch_weather(lat=28.6139, lon=77.2090):
    """Fetch hourly weather data from Open-Meteo API."""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": "temperature_2m,relative_humidity_2m,wind_speed_10m,pressure_msl",
        "timezone": "UTC"
    }
    r = requests.get(url, params=params)
    if r.status_code != 200:
        raise Exception("Weather API fetch failed")
    data = r.json()["hourly"]
    df = pd.DataFrame(data)
    df["timestamp"] = pd.to_datetime(df["time"])
    df.drop(columns=["time"], inplace=True)
    return df

def merge_datasets(pm_df, weather_df):
    """Merge PM2.5 with weather data on timestamp."""
    df = pd.merge_asof(pm_df.sort_values("timestamp"),
                       weather_df.sort_values("timestamp"),
                       on="timestamp")
    return df

# preprocessing


def clean_data(df):
    """Clean and preprocess merged dataset."""
    df = df.copy()
    df = df.ffill().bfill()  # fill missing values
    df = df.drop_duplicates(subset=["timestamp"])
    df = df.set_index("timestamp").resample("1H").mean().interpolate()
    return df.reset_index()

def remove_outliers(df, cols=None, z_thresh=3):
    """Remove statistical outliers using z-score."""
    if cols is None:
        cols = [c for c in df.columns if c != "timestamp"]
    for c in cols:
        z = np.abs((df[c] - df[c].mean()) / df[c].std())
        df.loc[z > z_thresh, c] = np.nan
    return df.ffill().bfill()

def save_processed(df, path="data/processed/pm25_merged.csv"):
    """Save cleaned data to CSV."""
    df.to_csv(path, index=False)


# features



def add_time_features(df):
    """Add time-based features like hour, dayofweek."""
    df["hour"] = df["timestamp"].dt.hour
    df["dayofweek"] = df["timestamp"].dt.dayofweek
    return df

def add_lag_features(df, col="pm25", lags=6):
    """Create lag features for time series modeling."""
    for i in range(1, lags + 1):
        df[f"{col}_lag_{i}"] = df[col].shift(i)
    return df

def add_rolling_features(df, col="pm25", windows=[3,6,12]):
    """Add rolling mean and std features."""
    for w in windows:
        df[f"{col}_roll_mean_{w}"] = df[col].rolling(w).mean()
        df[f"{col}_roll_std_{w}"] = df[col].rolling(w).std()
    return df

def feature_pipeline(df):
    """Complete feature engineering pipeline."""
    df = add_time_features(df)
    df = add_lag_features(df)
    df = add_rolling_features(df)
    df = df.dropna().reset_index(drop=True)
    return df


# anomaly


def detect_anomalies(df, col="pm25"):
    """Detect anomalies using IsolationForest."""
    model = IsolationForest(contamination=0.05, random_state=42)
    df["anomaly_flag"] = model.fit_predict(df[[col]])
    df["anomaly_flag"] = df["anomaly_flag"].map({1: 0, -1: 1})
    return df, model

# forecasting

def recursive_forecast(model, initial_window, steps, feature_fn=None):
    """
    Perform recursive forecasting for `steps` ahead.
    - model: trained regressor
    - initial_window: recent feature vector (DataFrame)
    - feature_fn: optional function to rebuild features each step
    """
    preds = []
    window = initial_window.copy()
    for _ in range(steps):
        X_last = window.iloc[[-1]].drop(columns=["timestamp", "pm25"])
        y_pred = model.predict(X_last)[0]
        preds.append(y_pred)
        next_row = window.iloc[[-1]].copy()
        next_row["pm25"] = y_pred
        next_row["timestamp"] += pd.Timedelta(hours=1)
        window = pd.concat([window, next_row]).reset_index(drop=True)
        if feature_fn:
            window = feature_fn(window)
    forecast_df = pd.DataFrame({
        "timestamp": pd.date_range(window["timestamp"].iloc[-steps-1], periods=steps+1, freq="H")[1:],
        "forecast": preds
    })
    return forecast_df

# train

import joblib
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error
import pandas as pd

from preprocessing.preprocessing import clean_data
from features.features import feature_pipeline
from anomaly.anomaly import detect_anomalies

def train_model(data_path="data/processed/pm25_merged.csv", model_out="models/model.joblib"):
    df = pd.read_csv(data_path, parse_dates=["timestamp"])
    df = clean_data(df)
    df, _ = detect_anomalies(df)
    df = feature_pipeline(df)

    X = df.drop(columns=["timestamp", "pm25"])
    y = df["pm25"]

    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, shuffle=False)

    model = GradientBoostingRegressor(n_estimators=200, learning_rate=0.1, random_state=42)
    model.fit(X_train, y_train)
    preds = model.predict(X_val)

    mae = mean_absolute_error(y_val, preds)
    print(f"Validation MAE: {mae:.3f}")

    joblib.dump(model, model_out)
    print(f"✅ Model saved to {model_out}")
    return model, mae


import os

# Create the directory structure for the 'src' package
os.makedirs("src/preprocessing", exist_ok=True)
os.makedirs("src/features", exist_ok=True)
os.makedirs("src/anomaly", exist_ok=True)

# Create empty __init__.py files to make the directories packages
with open("src/__init__.py", "w") as f:
    pass
with open("src/preprocessing/__init__.py", "w") as f:
    pass
with open("src/features/__init__.py", "w") as f:
    pass
with open("src/anomaly/__init__.py", "w") as f:
    pass

# Write the function definitions into the corresponding files
with open("src/preprocessing/preprocessing.py", "w") as f:
    f.write("""
import pandas as pd
import numpy as np

def clean_data(df):
    \"\"\"Clean and preprocess merged dataset.\"\"\"
    df = df.copy()
    df = df.ffill().bfill()  # fill missing values
    df = df.drop_duplicates(subset=["timestamp"])
    df = df.set_index("timestamp").resample("1H").mean().interpolate()
    return df.reset_index()

def remove_outliers(df, cols=None, z_thresh=3):
    \"\"\"Remove statistical outliers using z-score.\"\"\"
    if cols is None:
        cols = [c for c in df.columns if c != "timestamp"]
    for c in cols:
        z = np.abs((df[c] - df[c].mean()) / df[c].std())
        df.loc[z > z_thresh, c] = np.nan
    return df.ffill().bfill()

def save_processed(df, path="data/processed/pm25_merged.csv"):
    \"\"\"Save cleaned data to CSV.\"\"\"
    df.to_csv(path, index=False)
""")

with open("src/features/features.py", "w") as f:
    f.write("""
import pandas as pd

def add_time_features(df):
    \"\"\"Add time-based features like hour, dayofweek.\"\"\"
    df["hour"] = df["timestamp"].dt.hour
    df["dayofweek"] = df["timestamp"].dt.dayofweek
    return df

def add_lag_features(df, col="pm25", lags=6):
    \"\"\"Create lag features for time series modeling.\"\"\"
    for i in range(1, lags + 1):
        df[f"{col}_lag_{i}"] = df[col].shift(i)
    return df

def add_rolling_features(df, col="pm25", windows=[3,6,12]):
    \"\"\"Add rolling mean and std features.\"\"\"
    for w in windows:
        df[f"{col}_roll_mean_{w}"] = df[col].rolling(w).mean()
        df[f"{col}_roll_std_{w}"] = df[col].rolling(w).std()
    return df

def feature_pipeline(df):
    \"\"\"Complete feature engineering pipeline.\"\"\"
    df = add_time_features(df)
    df = add_lag_features(df)
    df = add_rolling_features(df)
    df = df.dropna().reset_index(drop=True)
    return df
""")

with open("src/anomaly/anomaly.py", "w") as f:
    f.write("""
import pandas as pd
from sklearn.ensemble import IsolationForest

def detect_anomalies(df, col="pm25"):
    \"\"\"Detect anomalies using IsolationForest.\"\"\"
    model = IsolationForest(contamination=0.05, random_state=42)
    df["anomaly_flag"] = model.fit_predict(df[[col]])
    df["anomaly_flag"] = df["anomaly_flag"].map({1: 0, -1: 1})
    return df, model
""")


import sys
sys.path.append("./src")


def train_model(
    data_path="data/processed/pm25_merged.csv",
    model_out="models/model.joblib"
):
    # Ensure output folder exists
    os.makedirs(os.path.dirname(model_out), exist_ok=True)
    os.makedirs("data/processed", exist_ok=True)

    # Load data
    if not os.path.exists(data_path):
        raise FileNotFoundError(
            f"❌ Data file not found at {data_path}. Please run preprocessing first."
        )

    print("📂 Loading data...")
    df = pd.read_csv(data_path, parse_dates=["timestamp"])

    # Clean and preprocess
    print("🧹 Cleaning data...")
    df = clean_data(df)

    # Detect anomalies
    print("⚠️ Running anomaly detection...")
    df, _ = detect_anomalies(df)

    # Feature engineering
    print("🧠 Creating features...")
    df = feature_pipeline(df)

    # Split data
    X = df.drop(columns=["timestamp", "pm25"])
    y = df["pm25"]
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, shuffle=False
    )

    # Train model
    print("🚀 Training model (Gradient Boosting)...")
    model = GradientBoostingRegressor(
        n_estimators=200, learning_rate=0.1, random_state=42
    )
    model.fit(X_train, y_train)

    # Evaluate
    preds = model.predict(X_val)
    mae = mean_absolute_error(y_val, preds)
    print(f"✅ Validation MAE: {mae:.3f}")

    # Save model
    joblib.dump(model, model_out)
    print(f"💾 Model saved successfully to: {model_out}")

    return model, mae


if __name__ == "__main__":
    # You can change the paths below if needed
    train_model(
        data_path="data/processed/pm25_merged.csv",
        model_out="models/model.joblib"
    )


ModuleNotFoundError: No module named 'preprocessing'

In [2]:
import os
import joblib
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from sklearn.ensemble import IsolationForest, GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error


# ==========================================================
# 1️⃣ DATA FETCHING
# ==========================================================

import requests
import pandas as pd
from datetime import datetime, timedelta, timezone

TZ = timezone.utc
LAT, LON = 13.0827, 80.2707   # Chennai (change as needed)
HOURS_BACK = 72               # how many past hours of PM2.5
RADIUS_KM = 30                # OpenAQ search radius

def fetch_openaq_pm25(lat: float, lon: float, hours: int = 168, radius_km: int = 30) -> pd.DataFrame:
    end = datetime.now(TZ)
    start = end - timedelta(hours=hours)
    base = "https://api.openaq.org/v2/measurements"
    params = {
        "coordinates": f"{lat},{lon}",
        "radius": int(radius_km * 1000),
        "parameter": "pm25",
        "date_from": start.isoformat(),
        "date_to": end.isoformat(),
        "limit": 10000,
        "sort": "desc",
        "order_by": "datetime",
        "page": 1,
    }
    frames = []
    try:
        while True:
            r = requests.get(base, params=params, timeout=30)
            if r.status_code >= 400:
                return pd.DataFrame()
            js = r.json()
            items = js.get("results", [])
            if not items:
                break
            df = pd.DataFrame(items)
            if "date" not in df:
                break
            df = df[["date", "value"]]
            df["datetime"] = pd.to_datetime(df["date"].apply(lambda d: d.get("utc")), utc=True)
            df = df.drop(columns=["date"]).sort_values("datetime")
            frames.append(df)
            meta = js.get("meta", {})
            found = meta.get("found")
            page = params["page"]
            limit = params["limit"]
            if found is None or page * limit >= int(found):
                break
            params["page"] = page + 1
    except Exception:
        return pd.DataFrame()

    if not frames:
        return pd.DataFrame()

    df = pd.concat(frames, ignore_index=True)
    df = df[(df["datetime"] >= start) & (df["datetime"] <= end)]
    # hourly median across stations
    df_hour = (
        df.set_index("datetime")
          .groupby(pd.Grouper(freq="1H"))["value"]
          .median()
          .reset_index()
          .rename(columns={"value": "pm25"})
    )
    return df_hour


def fetch_openmeteo_aq_pm25(lat: float, lon: float, hours: int = 168) -> pd.DataFrame:
    end = datetime.now(TZ)
    start = end - timedelta(hours=hours)
    url = (
        "https://air-quality-api.open-meteo.com/v1/air-quality?"
        f"latitude={lat}&longitude={lon}&hourly=pm2_5&past_days={(hours//24)+1}&timezone=UTC"
    )
    try:
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        h = r.json().get("hourly", {})
        if not h:
            return pd.DataFrame()
        df = pd.DataFrame({"datetime": h.get("time", []), "pm25": h.get("pm2_5", [])})
        df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
        return df[(df["datetime"] >= start) & (df["datetime"] <= end)].reset_index(drop=True)
    except Exception:
        return pd.DataFrame()


if __name__ == "__main__":

    pm25 = fetch_openaq_pm25(LAT, LON, hours=HOURS_BACK, radius_km=RADIUS_KM)
    if pm25.empty:
        pm25 = fetch_openmeteo_aq_pm25(LAT, LON, hours=HOURS_BACK)



def merge_datasets(pm_df, weather_df):
    """Merge PM2.5 and weather data by timestamp."""
    print("🔗 Merging datasets...")
    df = pd.merge_asof(
        pm_df.sort_values("timestamp"),
        weather_df.sort_values("timestamp"),
        on="timestamp"
    )
    return df


# ==========================================================
# 2️⃣ PREPROCESSING
# ==========================================================

def clean_data(df):
    """Clean and preprocess merged dataset."""
    df = df.copy()
    df = df.ffill().bfill()
    df = df.drop_duplicates(subset=["timestamp"])
    df = df.set_index("timestamp").resample("1H").mean().interpolate()
    return df.reset_index()


def remove_outliers(df, cols=None, z_thresh=3):
    """Remove statistical outliers."""
    if cols is None:
        cols = [c for c in df.columns if c != "timestamp"]
    for c in cols:
        z = np.abs((df[c] - df[c].mean()) / df[c].std())
        df.loc[z > z_thresh, c] = np.nan
    return df.ffill().bfill()


# ==========================================================
# 3️⃣ FEATURE ENGINEERING
# ==========================================================

def add_time_features(df):
    df["hour"] = df["timestamp"].dt.hour
    df["dayofweek"] = df["timestamp"].dt.dayofweek
    return df

def add_lag_features(df, col="pm25", lags=6):
    for i in range(1, lags + 1):
        df[f"{col}_lag_{i}"] = df[col].shift(i)
    return df

def add_rolling_features(df, col="pm25", windows=[3,6,12]):
    for w in windows:
        df[f"{col}_roll_mean_{w}"] = df[col].rolling(w).mean()
        df[f"{col}_roll_std_{w}"] = df[col].rolling(w).std()
    return df

def feature_pipeline(df):
    print("🧠 Creating time, lag, and rolling features...")
    df = add_time_features(df)
    df = add_lag_features(df)
    df = add_rolling_features(df)
    df = df.dropna().reset_index(drop=True)
    return df


# ==========================================================
# 4️⃣ ANOMALY DETECTION
# ==========================================================

def detect_anomalies(df, col="pm25"):
    print("⚠️ Detecting anomalies (IsolationForest)...")
    model = IsolationForest(contamination=0.05, random_state=42)
    df["anomaly_flag"] = model.fit_predict(df[[col]])
    df["anomaly_flag"] = df["anomaly_flag"].map({1: 0, -1: 1})
    return df


# ==========================================================
# 5️⃣ TRAINING
# ==========================================================

def train_model(df, model_out="models/model.joblib"):
    """Train Gradient Boosting model."""
    os.makedirs(os.path.dirname(model_out), exist_ok=True)

    df = feature_pipeline(df)
    X = df.drop(columns=["timestamp", "pm25"])
    y = df["pm25"]

    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, shuffle=False)

    print("🚀 Training model (Gradient Boosting)...")
    model = GradientBoostingRegressor(n_estimators=200, learning_rate=0.1, random_state=42)
    model.fit(X_train, y_train)

    preds = model.predict(X_val)
    mae = mean_absolute_error(y_val, preds)
    print(f"✅ Validation MAE: {mae:.3f}")

    joblib.dump(model, model_out)
    print(f"💾 Model saved to {model_out}")

    return model, mae


# ==========================================================
# 6️⃣ MAIN PIPELINE
# ==========================================================

def main():
    os.makedirs("data/processed", exist_ok=True)
    os.makedirs("models", exist_ok=True)

    pm_df = fetch_openaq("Delhi", hours=72)
    weather_df = fetch_weather()
    merged_df = merge_datasets(pm_df, weather_df)

    cleaned_df = clean_data(merged_df)
    cleaned_df = remove_outliers(cleaned_df)
    cleaned_df = detect_anomalies(cleaned_df)

    cleaned_df.to_csv("data/processed/pm25_merged.csv", index=False)
    print("📁 Processed data saved to data/processed/pm25_merged.csv")

    train_model(cleaned_df, model_out="models/model.joblib")


if __name__ == "__main__":
    main()


🌫️ Fetching PM2.5 data for Delhi...


  end = datetime.utcnow()


Exception: OpenAQ API failed: 410

In [3]:
import os
import joblib
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta, timezone
from sklearn.ensemble import IsolationForest, GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error

# ==========================================================
# CONFIG
# ==========================================================
TZ = timezone.utc
# Default location (Chennai). Change to desired lat/lon.
LAT, LON = 13.0827, 80.2707
HOURS_BACK = 72
RADIUS_KM = 30

# ==========================================================
# 1️⃣ DATA FETCHING
# ==========================================================

def fetch_openaq_pm25(lat: float, lon: float, hours: int = 168, radius_km: int = 30) -> pd.DataFrame:
    """Fetch PM2.5 measurements from OpenAQ and return hourly median with a 'timestamp' column (UTC)."""
    end = datetime.now(TZ)
    start = end - timedelta(hours=hours)
    base = "https://api.openaq.org/v2/measurements"
    params = {
        "coordinates": f"{lat},{lon}",
        "radius": int(radius_km * 1000),
        "parameter": "pm25",
        "date_from": start.isoformat(),
        "date_to": end.isoformat(),
        "limit": 10000,
        "sort": "desc",
        "order_by": "datetime",
        "page": 1,
    }
    frames = []
    try:
        while True:
            r = requests.get(base, params=params, timeout=30)
            if r.status_code >= 400:
                break
            js = r.json()
            items = js.get("results", [])
            if not items:
                break
            df = pd.DataFrame(items)
            if "date" not in df:
                break
            # extract UTC datetime
            df["timestamp"] = pd.to_datetime(df["date"].apply(lambda d: d.get("utc")), utc=True)
            df = df[["timestamp", "value"]].rename(columns={"value": "pm25"})
            frames.append(df)
            meta = js.get("meta", {})
            found = meta.get("found")
            page = params["page"]
            limit = params["limit"]
            if found is None or page * limit >= int(found):
                break
            params["page"] = page + 1
    except Exception:
        frames = []

    if not frames:
        return pd.DataFrame(columns=["timestamp", "pm25"])

    df = pd.concat(frames, ignore_index=True)
    df = df[(df["timestamp"] >= start) & (df["timestamp"] <= end)]

    # hourly median across stations
    df_hour = (
        df.set_index("timestamp")
          .groupby(pd.Grouper(freq="1H"))["pm25"]
          .median()
          .reset_index()
    )
    # ensure sorted
    df_hour = df_hour.sort_values("timestamp").reset_index(drop=True)
    return df_hour

def fetch_openmeteo_aq_pm25(lat: float, lon: float, hours: int = 168) -> pd.DataFrame:
    """Fallback: open-meteo air-quality PM2.5 hourly. Returns 'timestamp' and 'pm25' columns (UTC)."""
    end = datetime.now(TZ)
    start = end - timedelta(hours=hours)
    # past_days param uses days; request a few days to cover hours window
    past_days = max(1, (hours // 24) + 1)
    url = (
        "https://air-quality-api.open-meteo.com/v1/air-quality?"
        f"latitude={lat}&longitude={lon}&hourly=pm2_5&past_days={past_days}&timezone=UTC"
    )
    try:
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        h = r.json().get("hourly", {})
        if not h:
            return pd.DataFrame(columns=["timestamp", "pm25"])
        df = pd.DataFrame({"timestamp": h.get("time", []), "pm25": h.get("pm2_5", [])})
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        df = df[(df["timestamp"] >= start) & (df["timestamp"] <= end)].reset_index(drop=True)
        return df
    except Exception:
        return pd.DataFrame(columns=["timestamp", "pm25"])

def fetch_openmeteo_weather(lat: float, lon: float, hours: int = 168) -> pd.DataFrame:
    """Fetch meteorological hourly features from Open-Meteo (temperature, humidity, windspeed)."""
    end = datetime.now(TZ)
    start = end - timedelta(hours=hours)
    past_days = max(1, (hours // 24) + 1)
    url = (
        "https://api.open-meteo.com/v1/forecast?"
        f"latitude={lat}&longitude={lon}&hourly=temperature_2m,relativehumidity_2m,windspeed_10m"
        f"&past_days={past_days}&timezone=UTC"
    )
    try:
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        j = r.json().get("hourly", {})
        if not j:
            return pd.DataFrame(columns=["timestamp"])
        df = pd.DataFrame({
            "timestamp": j.get("time", []),
            "temperature_2m": j.get("temperature_2m", []),
            "relativehumidity_2m": j.get("relativehumidity_2m", []),
            "windspeed_10m": j.get("windspeed_10m", []),
        })
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        return df[(df["timestamp"] >= start) & (df["timestamp"] <= end)].reset_index(drop=True)
    except Exception:
        return pd.DataFrame(columns=["timestamp"])

# ==========================================================
# 2️⃣ MERGE + PREPROCESSING
# ==========================================================

def merge_datasets(pm_df: pd.DataFrame, weather_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merge PM2.5 and weather data by timestamp (asof-merge).
    If weather_df is empty, returns pm_df with timestamp/pm25 only.
    """
    print("🔗 Merging datasets...")
    pm = pm_df.copy()
    pm = pm.sort_values("timestamp").reset_index(drop=True)
    if weather_df is None or weather_df.empty:
        return pm
    w = weather_df.copy().sort_values("timestamp").reset_index(drop=True)
    # asof merge: nearest earlier weather observation for each pm timestamp
    merged = pd.merge_asof(pm, w, on="timestamp", direction="nearest", tolerance=pd.Timedelta("1H"))
    # if some weather cols are missing due to tolerance, that's okay
    return merged

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and preprocess merged dataset. Ensures hourly frequency, interpolates missing values."""
    print("🧹 Cleaning data...")
    if df.empty:
        return df
    df = df.copy()
    # ensure timestamp is datetime tz-aware
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    df = df.drop_duplicates(subset=["timestamp"])
    df = df.set_index("timestamp").sort_index()
    # resample to 1H and take mean for numeric columns
    df = df.resample("1H").mean()
    # interpolate small gaps, then forward/backward fill remaining
    df = df.interpolate(limit=3).ffill().bfill()
    df = df.reset_index()
    return df

def remove_outliers(df: pd.DataFrame, cols=None, z_thresh=3) -> pd.DataFrame:
    """Remove statistical outliers by z-score (replace with NaN -> interpolate)."""
    print("🚫 Removing outliers...")
    if df.empty:
        return df
    df = df.copy()
    if cols is None:
        cols = [c for c in df.columns if c not in ("timestamp",)]
    for c in cols:
        if c not in df.columns or not np.issubdtype(df[c].dtype, np.number):
            continue
        std = df[c].std()
        mean = df[c].mean()
        if std == 0 or np.isnan(std):
            continue
        z = np.abs((df[c] - mean) / std)
        df.loc[z > z_thresh, c] = np.nan
    # interpolate and fill
    df = df.interpolate(limit=6).ffill().bfill()
    return df

# ==========================================================
# 3️⃣ FEATURE ENGINEERING
# ==========================================================

def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["hour"] = df["timestamp"].dt.hour
    df["dayofweek"] = df["timestamp"].dt.dayofweek
    return df

def add_lag_features(df: pd.DataFrame, col="pm25", lags=6) -> pd.DataFrame:
    df = df.copy()
    for i in range(1, lags + 1):
        df[f"{col}_lag_{i}"] = df[col].shift(i)
    return df

def add_rolling_features(df: pd.DataFrame, col="pm25", windows=[3,6,12]) -> pd.DataFrame:
    df = df.copy()
    for w in windows:
        df[f"{col}_roll_mean_{w}"] = df[col].rolling(w, min_periods=1).mean()
        df[f"{col}_roll_std_{w}"] = df[col].rolling(w, min_periods=1).std().fillna(0)
    return df

def feature_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    print("🧠 Creating time, lag, and rolling features...")
    df = df.copy()
    # ensure pm25 exists
    if "pm25" not in df.columns:
        raise KeyError("pm25 column not found in dataframe (needed for feature engineering).")
    df = add_time_features(df)
    df = add_lag_features(df)
    df = add_rolling_features(df)
    # drop rows with NaNs introduced by lagging
    df = df.dropna().reset_index(drop=True)
    return df

# ==========================================================
# 4️⃣ ANOMALY DETECTION
# ==========================================================

def detect_anomalies(df: pd.DataFrame, col="pm25") -> pd.DataFrame:
    print("⚠️ Detecting anomalies (IsolationForest)...")
    df = df.copy()
    if col not in df.columns or df.empty:
        df["anomaly_flag"] = 0
        return df
    # IsolationForest expects 2D numeric input and no NaNs
    X = df[[col]].fillna(method="ffill").fillna(method="bfill")
    model = IsolationForest(contamination=0.05, random_state=42)
    flags = model.fit_predict(X)
    df["anomaly_flag"] = np.where(flags == -1, 1, 0)
    return df

# ==========================================================
# 5️⃣ TRAINING
# ==========================================================

def train_model(df: pd.DataFrame, model_out="models/model.joblib"):
    """Train Gradient Boosting model."""
    os.makedirs(os.path.dirname(model_out), exist_ok=True)

    df = df.copy()
    df = feature_pipeline(df)

    # drop non-feature columns
    drop_cols = ["timestamp", "anomaly_flag"]
    X = df.drop(columns=[c for c in drop_cols if c in df.columns] + ["pm25"], errors="ignore")
    y = df["pm25"]

    # simple chronological split
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, shuffle=False)

    print("🚀 Training model (Gradient Boosting)...")
    model = GradientBoostingRegressor(n_estimators=200, learning_rate=0.1, random_state=42)
    model.fit(X_train, y_train)

    preds = model.predict(X_val)
    mae = mean_absolute_error(y_val, preds)
    print(f"✅ Validation MAE: {mae:.3f}")

    joblib.dump(model, model_out)
    print(f"💾 Model saved to {model_out}")

    return model, mae

# ==========================================================
# 6️⃣ MAIN PIPELINE
# ==========================================================

def main():
    os.makedirs("data/processed", exist_ok=True)
    os.makedirs("models", exist_ok=True)

    # Fetch PM2.5 (OpenAQ preferred, fallback open-meteo)
    pm_df = fetch_openaq_pm25(LAT, LON, hours=HOURS_BACK, radius_km=RADIUS_KM)
    if pm_df.empty:
        print("No OpenAQ data found — falling back to Open-Meteo air quality.")
        pm_df = fetch_openmeteo_aq_pm25(LAT, LON, hours=HOURS_BACK)

    if pm_df.empty:
        raise RuntimeError("Failed to fetch any PM2.5 data. Check network / API availability.")

    # Fetch weather features (temperature, humidity, wind)
    weather_df = fetch_openmeteo_weather(LAT, LON, hours=HOURS_BACK)

    merged_df = merge_datasets(pm_df, weather_df)
    cleaned_df = clean_data(merged_df)
    cleaned_df = remove_outliers(cleaned_df)
    cleaned_df = detect_anomalies(cleaned_df)

    processed_path = "data/processed/pm25_merged.csv"
    cleaned_df.to_csv(processed_path, index=False)
    print(f"📁 Processed data saved to {processed_path}")

    # Train model
    train_model(cleaned_df, model_out="models/model.joblib")


if __name__ == "__main__":
    main()


No OpenAQ data found — falling back to Open-Meteo air quality.
🔗 Merging datasets...
🧹 Cleaning data...


  merged = pd.merge_asof(pm, w, on="timestamp", direction="nearest", tolerance=pd.Timedelta("1H"))
  df = df.resample("1H").mean()
  X = df[[col]].fillna(method="ffill").fillna(method="bfill")


🚫 Removing outliers...
⚠️ Detecting anomalies (IsolationForest)...
📁 Processed data saved to data/processed/pm25_merged.csv
🧠 Creating time, lag, and rolling features...
🚀 Training model (Gradient Boosting)...
✅ Validation MAE: 3.370
💾 Model saved to models/model.joblib
