In [5]:
import requests
import pandas as pd

API = "A1EAAE46-87DF-4A9D-9FCA-90A3AFA97D56"
url = "https://www.airnowapi.org/aq/data/"

params = {
    "startDate": "2025-10-01T06",     # top of hour
    "endDate":   "2025-10-01T07",
    "parameters": "PM25,PM10,OZONE,NO2,SO2,CO",
    "BBOX": "-122.60,37.30,-121.70,38.10",  # lon_min,lat_min,lon_max,lat_max
    "dataType": "A",                   # observations
    "format": "application/json",
    "API_KEY": API
}

resp = requests.get(url, params=params, timeout=60)
resp.raise_for_status()
raw = pd.DataFrame(resp.json())

print("Raw columns:", raw.columns.tolist())
print(raw.head())

# ---------- Normalize columns ----------
def _pick(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    return None

def normalize_airnow(df: pd.DataFrame) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame(columns=[
            "ts_utc","parameter","value","unit","aqi","category_code","lat","lon","parameter_std"
        ])

    col_utc   = _pick(df, ["UTC","DateObserved","DateTimeUTC","ValidDate"])
    col_param = _pick(df, ["Parameter","ParameterName"])
    col_value = _pick(df, ["Value","Concentration"])     # often missing in this endpoint
    col_unit  = _pick(df, ["Unit","UnitName"])
    col_aqi   = _pick(df, ["AQI","AQIValue"])
    col_cat   = _pick(df, ["Category","CategoryName"])   # here it's a numeric code
    col_lat   = _pick(df, ["Latitude","lat","Lat"])
    col_lon   = _pick(df, ["Longitude","lon","Long"])

    out = pd.DataFrame()
    out["ts_utc"]   = pd.to_datetime(df[col_utc], errors="coerce", utc=True) if col_utc else pd.NaT
    out["parameter"]= df[col_param] if col_param else pd.NA
    out["value"]    = df[col_value] if col_value else pd.NA
    out["unit"]     = df[col_unit]  if col_unit  else pd.NA
    out["aqi"]      = pd.to_numeric(df[col_aqi], errors="coerce") if col_aqi else pd.NA
    out["category_code"] = pd.to_numeric(df[col_cat], errors="coerce") if col_cat else pd.NA
    out["lat"]      = pd.to_numeric(df[col_lat], errors="coerce") if col_lat else pd.NA
    out["lon"]      = pd.to_numeric(df[col_lon], errors="coerce") if col_lon else pd.NA

    # Clean placeholders: -999 means missing
    out.loc[out["aqi"] == -999, "aqi"] = pd.NA
    out.loc[out["category_code"] == -999, "category_code"] = pd.NA

    # Standardize parameter labels for pivoting
    param_map = {"PM2.5":"pm25","PM25":"pm25","PM10":"pm10","OZONE":"o3","O3":"o3",
                 "NO2":"no2","SO2":"so2","CO":"co"}
    out["parameter_std"] = out["parameter"].astype(str).str.upper().map(param_map).fillna(out["parameter"])

    return out

norm = normalize_airnow(raw)
print("\nNormalized preview:")
print(norm.head())

# ---------- Pivot safely (index only on columns that actually exist & are not all-NaN) ----------
# Use only keys that are present AND not entirely NaN
candidate_keys = ["ts_utc","lat","lon"]
key_cols = [k for k in candidate_keys if k in norm.columns and norm[k].notna().any()]

if not key_cols:
    raise RuntimeError("No valid key columns to pivot on (check payload).")

# Pivot AQI (this endpoint returns AQI per pollutant)
aqi_wide = (
    norm.pivot_table(index=key_cols, columns="parameter_std", values="aqi", aggfunc="max", dropna=True)
        .reset_index()
)

# Prefix pollutant columns with 'aqi_'
aqi_cols = [c for c in aqi_wide.columns if c not in key_cols]
aqi_wide = aqi_wide.rename(columns={c: f"aqi_{c}" for c in aqi_cols})

# Composite AQI across pollutants (row-wise max)
aqi_only = aqi_wide[[c for c in aqi_wide.columns if c.startswith("aqi_")]]
aqi_wide["aqi_now"] = aqi_only.max(axis=1, skipna=True)
aqi_wide["dominant"] = aqi_only.idxmax(axis=1).str.replace("aqi_","")

# Optional: simple color bucket for your map
def aqi_color(aqi):
    if pd.isna(aqi): return "Missing"
    aqi = float(aqi)
    if aqi <= 100: return "Green"     # Good/Moderate
    if aqi <= 150: return "Yellow"    # Unhealthy for Sensitive Groups
    return "Red"                      # Unhealthy or worse

aqi_wide["color"] = aqi_wide["aqi_now"].apply(aqi_color)

print("\nPivoted (wide) preview:")
print(aqi_wide.head())

# Save if you want
# aqi_wide.to_parquet("airnow_window.parquet", index=False)
# aqi_wide.to_csv("airnow_window.csv", index=False)


Raw columns: ['Latitude', 'Longitude', 'UTC', 'Parameter', 'Unit', 'AQI', 'Category']
   Latitude  Longitude               UTC Parameter   Unit  AQI  Category
0   37.9722  -122.5189  2025-10-01T06:00     OZONE    PPB   31         1
1   37.9722  -122.5189  2025-10-01T06:00     PM2.5  UG/M3    2         1
2   37.9722  -122.5189  2025-10-01T06:00       NO2    PPB    0         1
3   37.9722  -122.5189  2025-10-01T06:00        CO    PPM -999      -999
4   37.7658  -122.3978  2025-10-01T06:00     OZONE    PPB   27         1

Normalized preview:
                     ts_utc parameter value   unit   aqi  category_code  \
0 2025-10-01 06:00:00+00:00     OZONE  <NA>    PPB  31.0            1.0   
1 2025-10-01 06:00:00+00:00     PM2.5  <NA>  UG/M3   2.0            1.0   
2 2025-10-01 06:00:00+00:00       NO2  <NA>    PPB   0.0            1.0   
3 2025-10-01 06:00:00+00:00        CO  <NA>    PPM   NaN            NaN   
4 2025-10-01 06:00:00+00:00     OZONE  <NA>    PPB  27.0            1.0   

    

In [6]:
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone
import time

API = "A1EAAE46-87DF-4A9D-9FCA-90A3AFA97D56"
# Set your area of interest (San Francisco area example)
BBOX = (-122.60, 37.30, -121.70, 38.10)  # (lon_min, lat_min, lon_max, lat_max)
PARAMS = ("PM25","PM10","OZONE","NO2","SO2","CO")  # request all common pollutants
SLEEP_SECS = 0.4  # be nice to the API
OUT_CSV = "airnow_last_day.csv"

def fetch_airnow_slice(start_dt, end_dt, bbox, parameters):
    url = "https://www.airnowapi.org/aq/data/"
    params = {
        "startDate": start_dt.strftime("%Y-%m-%dT%H"),
        "endDate":   end_dt.strftime("%Y-%m-%dT%H"),
        "parameters": ",".join(parameters),
        "BBOX": ",".join(map(str, bbox)),
        "dataType": "A",                   # observations
        "format": "application/json",
        "API_KEY": API
    }
    r = requests.get(url, params=params, timeout=60)
    r.raise_for_status()
    return pd.DataFrame(r.json())

def _pick(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    return None

def normalize_airnow(df: pd.DataFrame) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame(columns=["ts_utc","parameter","value","unit","aqi",
                                     "category_code","lat","lon","parameter_std"])
    col_utc  = _pick(df, ["UTC","DateObserved","DateTimeUTC","ValidDate"])
    col_par  = _pick(df, ["Parameter","ParameterName"])
    col_val  = _pick(df, ["Value","Concentration"])
    col_unit = _pick(df, ["Unit","UnitName"])
    col_aqi  = _pick(df, ["AQI","AQIValue"])
    col_cat  = _pick(df, ["Category","CategoryName"])
    col_lat  = _pick(df, ["Latitude","lat","Lat"])
    col_lon  = _pick(df, ["Longitude","lon","Long"])

    out = pd.DataFrame()
    out["ts_utc"]   = pd.to_datetime(df[col_utc], errors="coerce", utc=True) if col_utc else pd.NaT
    out["parameter"]= df[col_par] if col_par else pd.NA
    out["value"]    = df[col_val] if col_val else pd.NA
    out["unit"]     = df[col_unit] if col_unit else pd.NA
    out["aqi"]      = pd.to_numeric(df[col_aqi], errors="coerce") if col_aqi else pd.NA
    out["category_code"] = pd.to_numeric(df[col_cat], errors="coerce") if col_cat else pd.NA
    out["lat"]      = pd.to_numeric(df[col_lat], errors="coerce") if col_lat else pd.NA
    out["lon"]      = pd.to_numeric(df[col_lon], errors="coerce") if col_lon else pd.NA

    # -999 means missing in AirNow
    out.loc[out["aqi"] == -999, "aqi"] = pd.NA
    out.loc[out["category_code"] == -999, "category_code"] = pd.NA

    # standardize pollutant names for columns
    param_map = {"PM2.5":"pm25","PM25":"pm25","PM10":"pm10","OZONE":"o3","O3":"o3",
                 "NO2":"no2","SO2":"so2","CO":"co"}
    out["parameter_std"] = out["parameter"].astype(str).str.upper().map(param_map).fillna(out["parameter"])

    return out

def build_last_day_csv(bbox, parameters, out_csv, sleep_secs=0.4):
    # last 24h window in UTC, top-of-hour
    end_dt = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
    start_dt = end_dt - timedelta(hours=24)

    # fetch hour by hour
    frames = []
    t = start_dt
    while t < end_dt:
        df = fetch_airnow_slice(t, t + timedelta(hours=1), bbox, parameters)
        if not df.empty:
            frames.append(df)
        time.sleep(sleep_secs)
        t += timedelta(hours=1)

    if not frames:
        print("No data returned for the last 24 hours.")
        return

    raw = pd.concat(frames, ignore_index=True)
    norm = normalize_airnow(raw)

    # keep only columns needed for pivot
    key_cols = ["ts_utc","lat","lon"]
    aqi_wide = (
        norm.pivot_table(index=key_cols, columns="parameter_std", values="aqi", aggfunc="max", dropna=True)
            .reset_index()
    )

    # prefix pollutant columns with 'aqi_'
    pol_cols = [c for c in aqi_wide.columns if c not in key_cols]
    aqi_wide = aqi_wide.rename(columns={c: f"aqi_{c}" for c in pol_cols})

    # composite AQI and dominant pollutant
    aqi_only = aqi_wide[[c for c in aqi_wide.columns if c.startswith("aqi_")]]
    aqi_wide["aqi_now"] = aqi_only.max(axis=1, skipna=True)
    aqi_wide["dominant"] = aqi_only.idxmax(axis=1).str.replace("aqi_","")

    # sort & save
    aqi_wide = aqi_wide.sort_values(["ts_utc","lat","lon"]).reset_index(drop=True)
    aqi_wide.to_csv(out_csv, index=False)
    print(f"Saved {len(aqi_wide):,} rows to {out_csv}")

# --- run it ---
build_last_day_csv(BBOX, PARAMS, OUT_CSV, SLEEP_SECS)


Saved 365 rows to airnow_last_day.csv


In [5]:
import requests, pandas as pd, time, math, random
from datetime import datetime, timezone
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# =========== SETTINGS ===========
DEBUG = True
CITY = "New York"
CITY_COORDS = {
    "San Francisco": (37.7749, -122.4194),
    "Los Angeles":   (34.0522, -118.2437),
    "New York":      (40.7128, -74.0060),
    "Chicago":       (41.8781, -87.6298),
    "Houston":       (29.7604, -95.3698),
    "Toronto":       (43.6532, -79.3832),
    "Vancouver":     (49.2827, -123.1207),
}
CENTER_LAT, CENTER_LON = CITY_COORDS[CITY]

RADIUS_M = 25000
LOC_MAX = 200
PER_PAGE = 100
SLEEP = 0.15
# ⬇️ KEEP EVERYTHING: set to None (no filtering by parameter)
POLLUTANTS = None

OUT_CSV = "openaq_latest_city_v3_ALL_PARAMS.csv"

API_KEY = "e78f079d1ae5390a89f428671ff1349bdefdee203a74f0aff047cee62474901d"
BASE = "https://api.openaq.org/v3"
HDRS = {"Accept": "application/json", "X-API-Key": API_KEY}

def make_session():
    s = requests.Session()
    retries = Retry(
        total=6, connect=6, read=6, backoff_factor=0.8,
        respect_retry_after_header=True,
        status_forcelist=[408,429,500,502,503,504],
        allowed_methods=["GET"],
    )
    adapter = HTTPAdapter(max_retries=retries, pool_connections=10, pool_maxsize=10)
    s.mount("https://", adapter); s.mount("http://", adapter)
    return s

SESSION = make_session()

def get_with_backoff(url, params=None, timeout=30, max_attempts=6):
    attempt = 0
    while True:
        attempt += 1
        r = SESSION.get(url, headers=HDRS, params=params, timeout=timeout)
        if r.status_code in (429,500,502,503,504) and attempt < max_attempts:
            ra = r.headers.get("Retry-After")
            try:
                sleep_s = float(ra) if ra is not None else min(20.0, (2 ** attempt) + random.uniform(0, 1.0))
            except Exception:
                sleep_s = min(20.0, (2 ** attempt) + random.uniform(0, 1.0))
            if DEBUG:
                print(f"[backoff] {r.status_code} {url} — sleeping {sleep_s:.1f}s (attempt {attempt}/{max_attempts})")
            time.sleep(sleep_s)
            continue
        r.raise_for_status()
        return r

def bbox_from_center(lat, lon, radius_m):
    km = radius_m / 1000.0
    dlat = km / 111.0
    dlon = km / (111.0 * max(0.1, math.cos(math.radians(lat))))
    xmin, ymin = lon - dlon, lat - dlat
    xmax, ymax = lon + dlon, lat + dlat
    return f"{xmin},{ymin},{xmax},{ymax}"

def norm_param(x):
    if x is None: return None
    if isinstance(x, dict):
        return str(x.get("name","")).lower() or None
    return str(x).lower()

def list_locations_bbox(lat, lon, radius_m, per_page=PER_PAGE, limit_locations=LOC_MAX):
    url = f"{BASE}/locations"
    bbox = bbox_from_center(lat, lon, radius_m)
    rows, page = [], 1
    while len(rows) < limit_locations:
        params = {"bbox": bbox, "limit": per_page, "page": page}
        r = get_with_backoff(url, params=params, timeout=30)
        res = r.json().get("results", []) or []
        rows.extend(res)
        if DEBUG and page == 1:
            print(f"[locations] page1 got {len(res)} rows")
        if len(res) < per_page: break
        page += 1
        time.sleep(SLEEP)
    if not rows: return pd.DataFrame()
    df = pd.json_normalize(rows)
    keep = ["id","name","coordinates.latitude","coordinates.longitude","city","country","timezone"]
    for c in keep:
        if c not in df.columns: df[c] = pd.NA
    df = df.rename(columns={
        "id":"locations_id",
        "coordinates.latitude":"lat",
        "coordinates.longitude":"lon",
    })
    df = df.dropna(subset=["lat","lon"]).reset_index(drop=True)
    df = df[["locations_id","name","lat","lon","city","country","timezone"]].head(limit_locations)
    return df

def list_sensors_for_location(loc_id):
    url = f"{BASE}/locations/{int(loc_id)}/sensors"
    rows, page = [], 1
    while True:
        params = {"limit": PER_PAGE, "page": page}
        r = get_with_backoff(url, params=params, timeout=30)
        res = r.json().get("results", []) or []
        rows.extend(res)
        if len(res) < PER_PAGE: break
        page += 1
        time.sleep(SLEEP)
    if not rows: return pd.DataFrame()
    return pd.json_normalize(rows)

def fetch_latest_via_location_sensors(center_lat, center_lon, radius_m):
    locs = list_locations_bbox(center_lat, center_lon, radius_m)
    if locs.empty:
        print("[i] No locations in bbox.")
        return pd.DataFrame()

    flats = []
    for _, loc in locs.iterrows():
        sensors = list_sensors_for_location(loc["locations_id"])
        if sensors.empty:
            continue
        for _, s in sensors.iterrows():
            # parameter (keep ALL)
            if "parameter.name" in s:
                parameter = norm_param(s["parameter.name"])
                parameter_display = s.get("parameter.displayName")
            else:
                parameter = norm_param(s.get("parameter"))
                parameter_display = None

            # value/unit/timestamp
            val = s.get("latest.value")
            unit = s.get("parameter.units") or s.get("unit")

            ts = (s.get("latest.datetime.utc") or
                  s.get("latest.datetime.local") or
                  s.get("datetimeLast.utc") or
                  s.get("datetimeLast.local"))
            ts_utc = pd.to_datetime(ts, errors="coerce", utc=True) if isinstance(ts, str) else pd.NaT

            # coords
            lat = s.get("latest.coordinates.latitude")
            lon = s.get("latest.coordinates.longitude")
            if pd.isna(lat) or pd.isna(lon):
                lat = loc["lat"]; lon = loc["lon"]

            flats.append({
                "ts_utc": ts_utc,
                "parameter": parameter,
                "parameter_display": parameter_display,
                "value": pd.to_numeric(val, errors="coerce"),
                "unit": unit,
                "lat": pd.to_numeric(lat, errors="coerce"),
                "lon": pd.to_numeric(lon, errors="coerce"),
                "sensor_id": s.get("id"),
                "locationId": int(loc["locations_id"]),
                "location": loc["name"],
                "city": loc.get("city"),
                "country": loc.get("country"),
            })

    df = pd.DataFrame(flats)
    if df.empty:
        return df
    df = df.dropna(subset=["value","lat","lon"])
    if df["ts_utc"].notna().any():
        df = df.sort_values(["ts_utc","location","parameter"], ascending=[False, True, True])
    else:
        df = df.sort_values(["location","parameter"])
    return df

def main():
    print(f"CITY: {CITY} @ ({CENTER_LAT}, {CENTER_LON})")
    df = fetch_latest_via_location_sensors(CENTER_LAT, CENTER_LON, RADIUS_M)
    if df.empty:
        print("[i] No latest readings found via v3 location sensors in this radius. Try increasing RADIUS_M.")
        return
    if DEBUG:
        print(df.head(12))
    df.to_csv(OUT_CSV, index=False)
    print(f"[✓] wrote {len(df)} rows → {OUT_CSV}")

if __name__ == "__main__":
    main()


CITY: New York @ (40.7128, -74.006)
[locations] page1 got 57 rows
                       ts_utc         parameter parameter_display  \
111 2025-10-03 10:00:00+00:00               pm1               PM1   
109 2025-10-03 10:00:00+00:00              pm25             PM2.5   
108 2025-10-03 10:00:00+00:00  relativehumidity                RH   
113 2025-10-03 10:00:00+00:00       temperature   Temperature (C)   
110 2025-10-03 10:00:00+00:00             um003       PM0.3 count   
156 2025-10-03 10:00:00+00:00               pm1               PM1   
157 2025-10-03 10:00:00+00:00              pm25             PM2.5   
158 2025-10-03 10:00:00+00:00  relativehumidity                RH   
159 2025-10-03 10:00:00+00:00       temperature   Temperature (C)   
160 2025-10-03 10:00:00+00:00             um003       PM0.3 count   
84  2025-10-03 10:00:00+00:00               pm1               PM1   
89  2025-10-03 10:00:00+00:00              pm25             PM2.5   

           value           unit     

In [7]:
import requests, pandas as pd, time, math, random
from datetime import datetime, timezone
from urllib3.util.retry import Retry      # ✅ correct path
from requests.adapters import HTTPAdapter

# =========== SETTINGS ===========
DEBUG = True
CITY = "New York"
CITY_COORDS = {
    "San Francisco": (37.7749, -122.4194),
    "Los Angeles":   (34.0522, -118.2437),
    "New York":      (40.7128, -74.0060),
    "Chicago":       (41.8781, -87.6298),
    "Houston":       (29.7604, -95.3698),
    "Toronto":       (43.6532, -79.3832),
    "Vancouver":     (49.2827, -123.1207),
}
CENTER_LAT, CENTER_LON = CITY_COORDS[CITY]

RADIUS_M = 25000
LOC_MAX = 200
PER_PAGE = 100
SLEEP = 0.15
# ⬇️ KEEP EVERYTHING: set to None (no filtering by parameter)
POLLUTANTS = None

OUT_CSV = "openaq_latest_city_v3_ALL_PARAMS.csv"        # long
OUT_CSV_WIDE = "openaq_latest_city_v3_ALL_PARAMS_wide.csv"  # wide (one row per lat,lon)
COORD_DECIMALS = 5        # rounding used to merge very-close coordinates
INCLUDE_UNIT_COLS = True  # add a column per parameter with suffix _unit

API_KEY = "e78f079d1ae5390a89f428671ff1349bdefdee203a74f0aff047cee62474901d"
BASE = "https://api.openaq.org/v3"
HDRS = {"Accept": "application/json", "X-API-Key": API_KEY}

def make_session():
    s = requests.Session()
    retries = Retry(
        total=6, connect=6, read=6, backoff_factor=0.8,
        respect_retry_after_header=True,
        status_forcelist=[408,429,500,502,503,504],
        allowed_methods=["GET"],
    )
    adapter = HTTPAdapter(max_retries=retries, pool_connections=10, pool_maxsize=10)
    s.mount("https://", adapter); s.mount("http://", adapter)
    return s

SESSION = make_session()

def get_with_backoff(url, params=None, timeout=30, max_attempts=6):
    attempt = 0
    while True:
        attempt += 1
        r = SESSION.get(url, headers=HDRS, params=params, timeout=timeout)
        if r.status_code in (429,500,502,503,504) and attempt < max_attempts:
            ra = r.headers.get("Retry-After")
            try:
                sleep_s = float(ra) if ra is not None else min(20.0, (2 ** attempt) + random.uniform(0, 1.0))
            except Exception:
                sleep_s = min(20.0, (2 ** attempt) + random.uniform(0, 1.0))
            if DEBUG:
                print(f"[backoff] {r.status_code} {url} — sleeping {sleep_s:.1f}s (attempt {attempt}/{max_attempts})")
            time.sleep(sleep_s)
            continue
        r.raise_for_status()
        return r

def bbox_from_center(lat, lon, radius_m):
    km = radius_m / 1000.0
    dlat = km / 111.0
    dlon = km / (111.0 * max(0.1, math.cos(math.radians(lat))))
    xmin, ymin = lon - dlon, lat - dlat
    xmax, ymax = lon + dlon, lat + dlat
    return f"{xmin},{ymin},{xmax},{ymax}"

def norm_param(x):
    if x is None: return None
    if isinstance(x, dict):
        return str(x.get("name","")).lower() or None
    return str(x).lower()

def list_locations_bbox(lat, lon, radius_m, per_page=PER_PAGE, limit_locations=LOC_MAX):
    url = f"{BASE}/locations"
    bbox = bbox_from_center(lat, lon, radius_m)
    rows, page = [], 1
    while len(rows) < limit_locations:
        params = {"bbox": bbox, "limit": per_page, "page": page}
        r = get_with_backoff(url, params=params, timeout=30)
        res = r.json().get("results", []) or []
        rows.extend(res)
        if DEBUG and page == 1:
            print(f"[locations] page1 got {len(res)} rows")
        if len(res) < per_page: break
        page += 1
        time.sleep(SLEEP)
    if not rows: return pd.DataFrame()
    df = pd.json_normalize(rows)
    keep = ["id","name","coordinates.latitude","coordinates.longitude","city","country","timezone"]
    for c in keep:
        if c not in df.columns: df[c] = pd.NA
    df = df.rename(columns={
        "id":"locations_id",
        "coordinates.latitude":"lat",
        "coordinates.longitude":"lon",
    })
    df = df.dropna(subset=["lat","lon"]).reset_index(drop=True)
    df = df[["locations_id","name","lat","lon","city","country","timezone"]].head(limit_locations)
    return df

def list_sensors_for_location(loc_id):
    url = f"{BASE}/locations/{int(loc_id)}/sensors"
    rows, page = [], 1
    while True:
        params = {"limit": PER_PAGE, "page": page}
        r = get_with_backoff(url, params=params, timeout=30)
        res = r.json().get("results", []) or []
        rows.extend(res)
        if len(res) < PER_PAGE: break
        page += 1
        time.sleep(SLEEP)
    if not rows: return pd.DataFrame()
    return pd.json_normalize(rows)

def fetch_latest_via_location_sensors(center_lat, center_lon, radius_m):
    locs = list_locations_bbox(center_lat, center_lon, radius_m)
    if locs.empty:
        print("[i] No locations in bbox.")
        return pd.DataFrame()

    flats = []
    for _, loc in locs.iterrows():
        sensors = list_sensors_for_location(loc["locations_id"])
        if sensors.empty:
            continue
        for _, s in sensors.iterrows():
            # parameter (keep ALL)
            if "parameter.name" in s:
                parameter = norm_param(s["parameter.name"])
                parameter_display = s.get("parameter.displayName")
            else:
                parameter = norm_param(s.get("parameter"))
                parameter_display = None

            # optional filter
            if POLLUTANTS and parameter not in POLLUTANTS:
                continue

            # value/unit/timestamp
            val = s.get("latest.value")
            unit = s.get("parameter.units") or s.get("unit")

            ts = (s.get("latest.datetime.utc") or
                  s.get("latest.datetime.local") or
                  s.get("datetimeLast.utc") or
                  s.get("datetimeLast.local"))
            ts_utc = pd.to_datetime(ts, errors="coerce", utc=True) if isinstance(ts, str) else pd.NaT

            # coords
            lat = s.get("latest.coordinates.latitude")
            lon = s.get("latest.coordinates.longitude")
            if pd.isna(lat) or pd.isna(lon):
                lat = loc["lat"]; lon = loc["lon"]

            flats.append({
                "ts_utc": ts_utc,
                "parameter": parameter,
                "parameter_display": parameter_display,
                "value": pd.to_numeric(val, errors="coerce"),
                "unit": unit,
                "lat": pd.to_numeric(lat, errors="coerce"),
                "lon": pd.to_numeric(lon, errors="coerce"),
                "sensor_id": s.get("id"),
                "locationId": int(loc["locations_id"]),
                "location": loc["name"],
                "city": loc.get("city"),
                "country": loc.get("country"),
            })

    df = pd.DataFrame(flats)
    if df.empty:
        return df
    df = df.dropna(subset=["value","lat","lon"])
    if df["ts_utc"].notna().any():
        df = df.sort_values(["ts_utc","location","parameter"], ascending=[False, True, True])
    else:
        df = df.sort_values(["location","parameter"])
    return df

# ---- Pivot to one row per (lat, lon) with parameter columns ----
def to_wide_by_point(df: pd.DataFrame, decimals: int = COORD_DECIMALS, include_units: bool = INCLUDE_UNIT_COLS) -> pd.DataFrame:
    if df.empty:
        return df

    d = df.copy()
    d["lat_r"] = d["lat"].round(decimals)
    d["lon_r"] = d["lon"].round(decimals)

    # normalize time for ordering; missing timestamps go very old
    ts = pd.to_datetime(d.get("ts_utc"), errors="coerce", utc=True)
    d["_ts"] = ts.fillna(pd.Timestamp("1970-01-01", tz="UTC"))

    # keep latest per (point, parameter)
    d = d.sort_values("_ts")
    latest = d.groupby(["lat_r","lon_r","parameter"], as_index=False).tail(1)

    # representative timestamp per point
    point_ts = latest.groupby(["lat_r","lon_r"], as_index=False)["_ts"].max().rename(columns={"_ts":"ts_point"})

    # optional representative metadata (mode / most recent)
    meta_cols = []
    for mc in ["locationId","location","city","country"]:
        if mc in latest.columns:
            meta_cols.append(mc)

    # pivot values
    wide_val = latest.pivot_table(index=["lat_r","lon_r"], columns="parameter", values="value", aggfunc="first")

    if include_units:
        latest["param_unit"] = latest["parameter"] + "_unit"
        wide_unit = latest.pivot_table(index=["lat_r","lon_r"], columns="param_unit", values="unit", aggfunc="first")
        wide = pd.concat([wide_val, wide_unit], axis=1)
    else:
        wide = wide_val

    wide = wide.merge(point_ts, left_index=True, right_on=["lat_r","lon_r"], how="left")

    # attach metadata
    for mc in meta_cols:
        meta = (latest.groupby(["lat_r","lon_r"])[mc]
                    .agg(lambda s: s.mode().iloc[0] if not s.mode().empty else s.iloc[-1]))
        wide = wide.join(meta, on=["lat_r","lon_r"])

    # finalize columns/order
    wide = wide.reset_index(drop=True)
    param_cols = sorted([c for c in wide.columns if c not in {"lat_r","lon_r","ts_point","locationId","location","city","country"} and not str(c).endswith("_unit")])
    unit_cols  = sorted([c for c in wide.columns if str(c).endswith("_unit")])
    ordered = (["lat_r","lon_r","ts_point","locationId","location","city","country"] +
               param_cols + unit_cols)
    ordered = [c for c in ordered if c in wide.columns]
    wide = wide[ordered].rename(columns={"lat_r":"lat","lon_r":"lon","ts_point":"ts_utc"})
    return wide

def main():
    print(f"CITY: {CITY} @ ({CENTER_LAT}, {CENTER_LON})")
    df = fetch_latest_via_location_sensors(CENTER_LAT, CENTER_LON, RADIUS_M)
    if df.empty:
        print("[i] No latest readings found via v3 location sensors in this radius. Try increasing RADIUS_M.")
        return

    if DEBUG:
        print(df.head(12))

    # save long
    df.to_csv(OUT_CSV, index=False)
    print(f"[✓] wrote long: {len(df)} rows → {OUT_CSV}")

    # save wide (one row per lat,lon)
    wide = to_wide_by_point(df)
    print(f"[✓] wide shape: {wide.shape}")
    if DEBUG:
        print(wide.head(10))
    wide.to_csv(OUT_CSV_WIDE, index=False)
    print(f"[✓] wrote wide → {OUT_CSV_WIDE}")

if __name__ == "__main__":
    main()


CITY: New York @ (40.7128, -74.006)
[locations] page1 got 57 rows
                       ts_utc         parameter parameter_display       value  \
111 2025-10-03 10:00:00+00:00               pm1               PM1    0.888899   
109 2025-10-03 10:00:00+00:00              pm25             PM2.5    1.578869   
108 2025-10-03 10:00:00+00:00  relativehumidity                RH   41.401359   
113 2025-10-03 10:00:00+00:00       temperature   Temperature (C)   17.600000   
110 2025-10-03 10:00:00+00:00             um003       PM0.3 count  389.876091   
45  2025-10-03 10:00:00+00:00                no                NO    0.017000   
44  2025-10-03 10:00:00+00:00               no2               NO₂    0.034000   
46  2025-10-03 10:00:00+00:00               nox               NOx    0.052000   
43  2025-10-03 10:00:00+00:00                o3                O₃    0.001000   
156 2025-10-03 10:00:00+00:00               pm1               PM1    5.616333   
157 2025-10-03 10:00:00+00:00              

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  latest["param_unit"] = latest["parameter"] + "_unit"


Enter your Earthdata Login username (saved locally in netrc):
Username: mahdinasa
Password (input hidden): ········
[✓] Wrote credentials to C:\Users\Lenovo\_netrc


[i] Opening SLV...
[skip] could not open https://goldsmr4.gesdisc.eosdis.nasa.gov/opendap/MERRA2/M2T1NXSLV.5.12.4/2025/09/MERRA2_400.t1nxslv.20250926.nc4: found the following matches with the input file in xarray's IO backends: ['netcdf4', 'pydap']. But their dependencies may not be installed, see:
https://docs.xarray.dev/en/stable/user-guide/io.html 
https://docs.xarray.dev/en/stable/getting-started-guide/installing.html
[skip] could not open https://goldsmr4.gesdisc.eosdis.nasa.gov/opendap/MERRA2/M2T1NXSLV.5.12.4/2025/09/MERRA2_400.t1nxslv.20250927.nc4: found the following matches with the input file in xarray's IO backends: ['netcdf4', 'pydap']. But their dependencies may not be installed, see:
https://docs.xarray.dev/en/stable/user-guide/io.html 
https://docs.xarray.dev/en/stable/getting-started-guide/installing.html
[skip] could not open https://goldsmr4.gesdisc.eosdis.nasa.gov/opendap/MERRA2/M2T1NXSLV.5.12.4/2025/09/MERRA2_400.t1nxslv.20250928.nc4: found the following matches wit



C:\Users\Lenovo\anaconda3\python.exe


Collecting xarray
  Obtaining dependency information for xarray from https://files.pythonhosted.org/packages/0e/a7/6eeb32e705d510a672f74135f538ad27f87f3d600845bfd3834ea3a77c7e/xarray-2025.9.1-py3-none-any.whl.metadata
  Downloading xarray-2025.9.1-py3-none-any.whl.metadata (12 kB)
Collecting netCDF4
  Obtaining dependency information for netCDF4 from https://files.pythonhosted.org/packages/cf/ba/d26e8278ad8a2306580bab076b6d64cd16459a60e632e6c1a9cbb68dd3d9/netCDF4-1.7.2-cp311-cp311-win_amd64.whl.metadata
  Downloading netCDF4-1.7.2-cp311-cp311-win_amd64.whl.metadata (1.8 kB)
Collecting pydap
  Obtaining dependency information for pydap from https://files.pythonhosted.org/packages/3b/61/df917dc9e5190fc3498741710e2143c02aef458f232c69c6a790107a4beb/pydap-3.5.7-py3-none-any.whl.metadata
  Downloading pydap-3.5.7-py3-none-any.whl.metadata (9.1 kB)
Collecting cftime
  Obtaining dependency information for cftime from https://files.pythonhosted.org/packages/79/b1/6551603f8ea31de55913c84e4def3c3

ERROR: Could not install packages due to an OSError: [WinError 5] Accès refusé: 'C:\\Users\\Lenovo\\AppData\\Local\\Temp\\pip-uninstall-5oc1vcpu\\core\\_multiarray_tests.cp311-win_amd64.pyd'
Consider using the `--user` option or check the permissions.



^C

Note: you may need to restart the kernel to use updated packages.


In [3]:
import requests, pandas as pd, time, math, random
from datetime import datetime, timezone, timedelta
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from math import ceil

# =========== SETTINGS ===========
DEBUG = True
CITY = "New York"
CITY_COORDS = {
    "San Francisco": (37.7749, -122.4194),
    "Los Angeles":   (34.0522, -118.2437),
    "New York":      (40.7128, -74.0060),
    "Chicago":       (41.8781, -87.6298),
    "Houston":       (29.7604, -95.3698),
    "Toronto":       (43.6532, -79.3832),
    "Vancouver":     (49.2827, -123.1207),
}
CENTER_LAT, CENTER_LON = CITY_COORDS[CITY]

RADIUS_M = 25000
PER_PAGE  = 100          # OpenAQ v3 max=100
SLEEP     = 0.15
LOC_MAX   = 200

# ⬇️ Keep ALL pollutants; or e.g. ["pm25","pm10","no2","o3","so2","co"]
POLLUTANTS = None

# Last month window
NOW_UTC   = datetime.now(timezone.utc)
DATE_TO   = NOW_UTC
DATE_FROM = NOW_UTC - timedelta(days=30)

OUT_CSV   = f"openaq_{CITY.replace(' ','_').lower()}_bbox_last_month_hourly.csv"

API_KEY = "e78f079d1ae5390a89f428671ff1349bdefdee203a74f0aff047cee62474901d"
BASE   = "https://api.openaq.org/v3"
HDRS   = {"Accept": "application/json", "X-API-Key": API_KEY}

# --- Session: disable status-based auto-retries; we handle them ourselves
def make_session():
    s = requests.Session()
    retries = Retry(
        total=0,                 # no automatic status retries
        connect=6, read=6,       # keep transient network retries
        backoff_factor=0.0,
        respect_retry_after_header=True,
        status_forcelist=[],     # empty: we manage 408/429/5xx manually
        allowed_methods=["GET"],
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retries, pool_connections=10, pool_maxsize=10)
    s.mount("https://", adapter); s.mount("http://", adapter)
    return s

SESSION = make_session()

# --- Our backoff: handles 408/429/500/502/503/504 with Retry-After + jitter
def get_with_backoff(url, params=None, timeout=60, max_attempts=7):
    attempt = 0
    while True:
        attempt += 1
        r = SESSION.get(url, headers=HDRS, params=params, timeout=timeout)

        if r.status_code in (408, 429, 500, 502, 503, 504) and attempt < max_attempts:
            ra = r.headers.get("Retry-After")
            try:
                sleep_s = float(ra) if ra is not None else min(30.0, (2 ** attempt) + random.uniform(0, 1.0))
            except Exception:
                sleep_s = min(30.0, (2 ** attempt) + random.uniform(0, 1.0))
            if DEBUG:
                print(f"[backoff] {r.status_code} {url} — sleeping {sleep_s:.1f}s (attempt {attempt}/{max_attempts})")
            time.sleep(sleep_s)
            continue

        r.raise_for_status()
        return r

def bbox_from_center(lat, lon, radius_m):
    km = radius_m / 1000.0
    dlat = km / 111.0
    dlon = km / (111.0 * max(0.1, math.cos(math.radians(lat))))
    xmin, ymin = lon - dlon, lat - dlat
    xmax, ymax = lon + dlon, lat + dlat
    return f"{xmin},{ymin},{xmax},{ymax}"

def norm_param(x):
    if x is None: return None
    if isinstance(x, dict):
        return str(x.get("name","")).lower() or None
    return str(x).lower()

# ---------- Helpers to discover locations & sensors in bbox ----------
def list_locations_bbox(lat, lon, radius_m, per_page=PER_PAGE, limit_locations=LOC_MAX):
    url = f"{BASE}/locations"
    bbox = bbox_from_center(lat, lon, radius_m)
    rows, page = [], 1
    while len(rows) < limit_locations:
        params = {"bbox": bbox, "limit": per_page, "page": page}
        r = get_with_backoff(url, params=params, timeout=30)
        res = r.json().get("results", []) or []
        rows.extend(res)
        if DEBUG and page == 1:
            print(f"[locations] page1 got {len(res)} rows")
        if len(res) < per_page: break
        page += 1
        time.sleep(SLEEP)
    if not rows: return pd.DataFrame()
    df = pd.json_normalize(rows)
    keep = ["id","name","coordinates.latitude","coordinates.longitude","city","country","timezone"]
    for c in keep:
        if c not in df.columns: df[c] = pd.NA
    df = df.rename(columns={
        "id":"locations_id",
        "coordinates.latitude":"lat",
        "coordinates.longitude":"lon",
    })
    df = df.dropna(subset=["lat","lon"]).reset_index(drop=True)
    df = df[["locations_id","name","lat","lon","city","country","timezone"]].head(limit_locations)
    return df

def list_sensors_for_location(loc_id):
    url = f"{BASE}/locations/{int(loc_id)}/sensors"
    rows, page = [], 1
    while True:
        params = {"limit": PER_PAGE, "page": page}
        r = get_with_backoff(url, params=params, timeout=30)
        res = r.json().get("results", []) or []
        rows.extend(res)
        if len(res) < PER_PAGE: break
        page += 1
        time.sleep(SLEEP)
    if not rows: return pd.DataFrame()
    return pd.json_normalize(rows)

# ---------- Fetch last month of HOURLY data per sensor (robust) ----------
def fetch_last_month_hourly_for_bbox(center_lat, center_lon, radius_m, date_from, date_to, per_page=PER_PAGE):
    locs = list_locations_bbox(center_lat, center_lon, radius_m)
    if locs.empty:
        print("[i] No locations in bbox.")
        return pd.DataFrame()

    pollutants_set = set([p.lower() for p in POLLUTANTS]) if POLLUTANTS else None
    all_rows = []

    # pages needed for the window (hourly, per_page rows per page)
    hours_span = max(1, int((date_to - date_from).total_seconds() // 3600))
    max_pages_needed = ceil(hours_span / per_page) + 1  # +1 slack

    for _, loc in locs.iterrows():
        sensors = list_sensors_for_location(loc["locations_id"])
        if sensors.empty:
            continue

        for _, s in sensors.iterrows():
            sensor_id = s.get("id")
            if pd.isna(sensor_id):
                continue

            # Early skip by pollutant at sensor level (if available)
            if pollutants_set:
                sn = (s.get("parameter.name") or s.get("parameter") or "")
                if isinstance(sn, dict):  # sometimes parameter is dict
                    sn = sn.get("name", "")
                if (sn or "").lower() not in pollutants_set:
                    continue

            page = 1
            while True:
                url = f"{BASE}/sensors/{int(sensor_id)}/hours"
                params = {
                    "limit": per_page,
                    "page": page,
                    "date_from": date_from.strftime("%Y-%m-%dT%H:%M:%SZ"),
                    "date_to":   date_to.strftime("%Y-%m-%dT%H:%M:%SZ"),
                    "sort": "asc",  # oldest→newest
                }

                try:
                    r = get_with_backoff(url, params=params, timeout=60)
                except requests.HTTPError as e:
                    if DEBUG:
                        print(f"[warn] sensor {sensor_id} page {page} HTTPError: {e}")
                    break
                except requests.RequestException as e:
                    if DEBUG:
                        print(f"[warn] sensor {sensor_id} page {page} RequestException: {e}")
                    break

                res = r.json().get("results", []) or []
                if not res:
                    break

                for rec in res:
                    value = pd.to_numeric(rec.get("value"), errors="coerce")
                    param_obj = rec.get("parameter") or {}
                    parameter = (param_obj.get("name") or "").lower() or None
                    unit = param_obj.get("units")

                    if pollutants_set and (parameter not in pollutants_set):
                        continue

                    period = rec.get("period") or {}
                    dt_from = (period.get("datetimeFrom") or {}).get("utc")
                    ts_utc = pd.to_datetime(dt_from, errors="coerce", utc=True) if isinstance(dt_from, str) else pd.NaT

                    # coords: prefer sensor latest coords, else fallback to location coords
                    lat = s.get("latest.coordinates.latitude", loc["lat"])
                    lon = s.get("latest.coordinates.longitude", loc["lon"])

                    all_rows.append({
                        "ts_utc": ts_utc,
                        "parameter": parameter,
                        "value": value,
                        "unit": unit,
                        "lat": pd.to_numeric(lat, errors="coerce"),
                        "lon": pd.to_numeric(lon, errors="coerce"),
                        "sensor_id": sensor_id,
                        "locationId": int(loc["locations_id"]),
                        "location": loc["name"],
                        "city": loc.get("city"),
                        "country": loc.get("country"),
                    })

                # pagination guards
                if len(res) < per_page:
                    break
                page += 1
                if page > max_pages_needed:
                    if DEBUG:
                        print(f"[guard] stop sensor {sensor_id} at page {page-1}/{max_pages_needed} for {hours_span}h window")
                    break
                time.sleep(SLEEP)

    df = pd.DataFrame(all_rows)
    if df.empty:
        return df
    df = df.dropna(subset=["value"])
    if df["ts_utc"].notna().any():
        df = df.sort_values(["ts_utc","location","parameter"]).reset_index(drop=True)
    return df

def main():
    print(f"CITY: {CITY} @ ({CENTER_LAT}, {CENTER_LON})")
    print(f"Window (UTC): {DATE_FROM.strftime('%Y-%m-%d %H:%M:%S')}  →  {DATE_TO.strftime('%Y-%m-%d %H:%M:%S')}")
    df = fetch_last_month_hourly_for_bbox(CENTER_LAT, CENTER_LON, RADIUS_M, DATE_FROM, DATE_TO)
    if df.empty:
        print("[i] No hourly values found. Try increasing RADIUS_M or widening dates.")
        return

    if DEBUG:
        print(df.head(12))
        print(f"[i] rows fetched: {len(df)}")

    df.to_csv(OUT_CSV, index=False)
    print(f"[✓] wrote {len(df)} rows → {OUT_CSV}")

if __name__ == "__main__":
    main()


CITY: New York @ (40.7128, -74.006)
Window (UTC): 2025-09-04 11:57:26  →  2025-10-04 11:57:26
[locations] page1 got 57 rows
[guard] stop sensor 673 at page 9/9 for 720h window
[guard] stop sensor 671 at page 9/9 for 720h window
[guard] stop sensor 674 at page 9/9 for 720h window
[guard] stop sensor 1097 at page 9/9 for 720h window
[guard] stop sensor 1102 at page 9/9 for 720h window
[guard] stop sensor 1099 at page 9/9 for 720h window
[guard] stop sensor 1098 at page 9/9 for 720h window
[guard] stop sensor 1103 at page 9/9 for 720h window
[guard] stop sensor 1152 at page 9/9 for 720h window
[guard] stop sensor 1106 at page 9/9 for 720h window
[guard] stop sensor 1121 at page 9/9 for 720h window
[guard] stop sensor 1128 at page 9/9 for 720h window
[guard] stop sensor 1143 at page 9/9 for 720h window
[guard] stop sensor 1145 at page 9/9 for 720h window
[guard] stop sensor 1146 at page 9/9 for 720h window
[guard] stop sensor 1147 at page 9/9 for 720h window
[guard] stop sensor 1522 at pag

[guard] stop sensor 13071826 at page 9/9 for 720h window
[guard] stop sensor 13071827 at page 9/9 for 720h window
[guard] stop sensor 13071828 at page 9/9 for 720h window
[guard] stop sensor 13193873 at page 9/9 for 720h window
[guard] stop sensor 13193874 at page 9/9 for 720h window
[guard] stop sensor 13193875 at page 9/9 for 720h window
[guard] stop sensor 13193876 at page 9/9 for 720h window
[guard] stop sensor 13193877 at page 9/9 for 720h window
[guard] stop sensor 13193868 at page 9/9 for 720h window
[guard] stop sensor 13193869 at page 9/9 for 720h window
[guard] stop sensor 13193870 at page 9/9 for 720h window
[guard] stop sensor 13193871 at page 9/9 for 720h window
[guard] stop sensor 13193872 at page 9/9 for 720h window
[guard] stop sensor 13322774 at page 9/9 for 720h window
[guard] stop sensor 13322775 at page 9/9 for 720h window
[guard] stop sensor 13322776 at page 9/9 for 720h window
[guard] stop sensor 13322777 at page 9/9 for 720h window
[guard] stop sensor 13322778 at

In [4]:
import requests, pandas as pd, time, math, random
from datetime import datetime, timezone, timedelta
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from math import ceil

# =========== SETTINGS ===========
DEBUG = True
CITY = "New York"
CITY_COORDS = {
    "San Francisco": (37.7749, -122.4194),
    "Los Angeles":   (34.0522, -118.2437),
    "New York":      (40.7128, -74.0060),
    "Chicago":       (41.8781, -87.6298),
    "Houston":       (29.7604, -95.3698),
    "Toronto":       (43.6532, -79.3832),
    "Vancouver":     (49.2827, -123.1207),
}
CENTER_LAT, CENTER_LON = CITY_COORDS[CITY]

RADIUS_M = 25000
PER_PAGE  = 100          # OpenAQ v3 max=100
SLEEP     = 0.15
LOC_MAX   = 200

# ⬇️ Keep ALL pollutants; or e.g. ["pm25","pm10","no2","o3","so2","co"]
POLLUTANTS = None

# Last month window
NOW_UTC   = datetime.now(timezone.utc)
DATE_TO   = NOW_UTC
DATE_FROM = NOW_UTC - timedelta(days=30)

# Outputs
OUT_CSV_LONG = f"openaq_{CITY.replace(' ','_').lower()}_bbox_last_month_hourly_long.csv"   # hour x location x parameter
OUT_CSV_WIDE = f"openaq_{CITY.replace(' ','_').lower()}_bbox_last_month_hourly_wide.csv"   # hour x point, columns per parameter
COORD_DECIMALS = 5
INCLUDE_UNIT_COLS = True

API_KEY = "e78f079d1ae5390a89f428671ff1349bdefdee203a74f0aff047cee62474901d"
BASE   = "https://api.openaq.org/v3"
HDRS   = {"Accept": "application/json", "X-API-Key": API_KEY}

# --- Session: disable status-based auto-retries; we handle them ourselves
def make_session():
    s = requests.Session()
    retries = Retry(
        total=0,                 # no automatic status retries
        connect=6, read=6,       # keep transient network retries
        backoff_factor=0.0,
        respect_retry_after_header=True,
        status_forcelist=[],     # empty: we manage 408/429/5xx manually
        allowed_methods=["GET"],
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retries, pool_connections=10, pool_maxsize=10)
    s.mount("https://", adapter); s.mount("http://", adapter)
    return s

SESSION = make_session()

# --- Our backoff: handles 408/429/500/502/503/504 with Retry-After + jitter
def get_with_backoff(url, params=None, timeout=60, max_attempts=7):
    attempt = 0
    while True:
        attempt += 1
        r = SESSION.get(url, headers=HDRS, params=params, timeout=timeout)

        if r.status_code in (408, 429, 500, 502, 503, 504) and attempt < max_attempts:
            ra = r.headers.get("Retry-After")
            try:
                sleep_s = float(ra) if ra is not None else min(30.0, (2 ** attempt) + random.uniform(0, 1.0))
            except Exception:
                sleep_s = min(30.0, (2 ** attempt) + random.uniform(0, 1.0))
            if DEBUG:
                print(f"[backoff] {r.status_code} {url} — sleeping {sleep_s:.1f}s (attempt {attempt}/{max_attempts})")
            time.sleep(sleep_s)
            continue

        r.raise_for_status()
        return r

def bbox_from_center(lat, lon, radius_m):
    km = radius_m / 1000.0
    dlat = km / 111.0
    dlon = km / (111.0 * max(0.1, math.cos(math.radians(lat))))
    xmin, ymin = lon - dlon, lat - dlat
    xmax, ymax = lon + dlon, lat + dlat
    return f"{xmin},{ymin},{xmax},{ymax}"

def norm_param(x):
    if x is None: return None
    if isinstance(x, dict):
        return str(x.get("name","")).lower() or None
    return str(x).lower()

# ---------- Helpers to discover locations & sensors in bbox ----------
def list_locations_bbox(lat, lon, radius_m, per_page=PER_PAGE, limit_locations=LOC_MAX):
    url = f"{BASE}/locations"
    bbox = bbox_from_center(lat, lon, radius_m)
    rows, page = [], 1
    while len(rows) < limit_locations:
        params = {"bbox": bbox, "limit": per_page, "page": page}
        r = get_with_backoff(url, params=params, timeout=30)
        res = r.json().get("results", []) or []
        rows.extend(res)
        if DEBUG and page == 1:
            print(f"[locations] page1 got {len(res)} rows")
        if len(res) < per_page: break
        page += 1
        time.sleep(SLEEP)
    if not rows: return pd.DataFrame()
    df = pd.json_normalize(rows)
    keep = ["id","name","coordinates.latitude","coordinates.longitude","city","country","timezone"]
    for c in keep:
        if c not in df.columns: df[c] = pd.NA
    df = df.rename(columns={
        "id":"locations_id",
        "coordinates.latitude":"lat",
        "coordinates.longitude":"lon",
    })
    df = df.dropna(subset=["lat","lon"]).reset_index(drop=True)
    df = df[["locations_id","name","lat","lon","city","country","timezone"]].head(limit_locations)
    return df

def list_sensors_for_location(loc_id):
    url = f"{BASE}/locations/{int(loc_id)}/sensors"
    rows, page = [], 1
    while True:
        params = {"limit": PER_PAGE, "page": page}
        r = get_with_backoff(url, params=params, timeout=30)
        res = r.json().get("results", []) or []
        rows.extend(res)
        if len(res) < PER_PAGE: break
        page += 1
        time.sleep(SLEEP)
    if not rows: return pd.DataFrame()
    return pd.json_normalize(rows)

# ---------- Fetch last month of HOURLY data per sensor (robust) ----------
def fetch_last_month_hourly_for_bbox(center_lat, center_lon, radius_m, date_from, date_to, per_page=PER_PAGE):
    locs = list_locations_bbox(center_lat, center_lon, radius_m)
    if locs.empty:
        print("[i] No locations in bbox.")
        return pd.DataFrame()

    pollutants_set = set([p.lower() for p in POLLUTANTS]) if POLLUTANTS else None
    all_rows = []

    # pages needed for the window (hourly, per_page rows per page)
    hours_span = max(1, int((date_to - date_from).total_seconds() // 3600))
    max_pages_needed = ceil(hours_span / per_page) + 1  # +1 slack

    for _, loc in locs.iterrows():
        sensors = list_sensors_for_location(loc["locations_id"])
        if sensors.empty:
            continue

        for _, s in sensors.iterrows():
            sensor_id = s.get("id")
            if pd.isna(sensor_id):
                continue

            # Early skip by pollutant at sensor level (if available)
            if pollutants_set:
                sn = (s.get("parameter.name") or s.get("parameter") or "")
                if isinstance(sn, dict):
                    sn = sn.get("name", "")
                if (sn or "").lower() not in pollutants_set:
                    continue

            page = 1
            while True:
                url = f"{BASE}/sensors/{int(sensor_id)}/hours"
                params = {
                    "limit": per_page,
                    "page": page,
                    "date_from": date_from.strftime("%Y-%m-%dT%H:%M:%SZ"),
                    "date_to":   date_to.strftime("%Y-%m-%dT%H:%M:%SZ"),
                    "sort": "asc",  # oldest→newest
                }

                try:
                    r = get_with_backoff(url, params=params, timeout=60)
                except requests.HTTPError as e:
                    if DEBUG:
                        print(f"[warn] sensor {sensor_id} page {page} HTTPError: {e}")
                    break
                except requests.RequestException as e:
                    if DEBUG:
                        print(f"[warn] sensor {sensor_id} page {page} RequestException: {e}")
                    break

                res = r.json().get("results", []) or []
                if not res:
                    break

                for rec in res:
                    value = pd.to_numeric(rec.get("value"), errors="coerce")
                    param_obj = rec.get("parameter") or {}
                    parameter = (param_obj.get("name") or "").lower() or None
                    unit = param_obj.get("units")

                    if pollutants_set and (parameter not in pollutants_set):
                        continue

                    period = rec.get("period") or {}
                    dt_from = (period.get("datetimeFrom") or {}).get("utc")
                    ts_utc = pd.to_datetime(dt_from, errors="coerce", utc=True) if isinstance(dt_from, str) else pd.NaT

                    # coords: prefer sensor latest coords, else fallback to location coords
                    lat = s.get("latest.coordinates.latitude", loc["lat"])
                    lon = s.get("latest.coordinates.longitude", loc["lon"])

                    all_rows.append({
                        "ts_utc": ts_utc,
                        "parameter": parameter,
                        "value": value,
                        "unit": unit,
                        "lat": pd.to_numeric(lat, errors="coerce"),
                        "lon": pd.to_numeric(lon, errors="coerce"),
                        "sensor_id": sensor_id,
                        "locationId": int(loc["locations_id"]),
                        "location": loc["name"],
                        "city": loc.get("city"),
                        "country": loc.get("country"),
                    })

                # pagination guards
                if len(res) < per_page:
                    break
                page += 1
                if page > max_pages_needed:
                    if DEBUG:
                        print(f"[guard] stop sensor {sensor_id} at page {page-1}/{max_pages_needed} for {hours_span}h window")
                    break
                time.sleep(SLEEP)

    df = pd.DataFrame(all_rows)
    if df.empty:
        return df
    df = df.dropna(subset=["value"])
    if df["ts_utc"].notna().any():
        df = df.sort_values(["ts_utc","location","parameter"]).reset_index(drop=True)
    return df

# ---- Pivot to one row per (hour, rounded lat, rounded lon) with parameter columns
def to_wide_by_point_hourly(df: pd.DataFrame, decimals: int = COORD_DECIMALS, include_units: bool = INCLUDE_UNIT_COLS) -> pd.DataFrame:
    if df.empty:
        return df

    d = df.copy()
    d["ts_hour"] = pd.to_datetime(d["ts_utc"], utc=True).dt.floor("H")
    d["lat_r"] = d["lat"].round(decimals)
    d["lon_r"] = d["lon"].round(decimals)

    # If multiple sensors report the same parameter at the same site/hour → average their values
    agg = (d.groupby(["ts_hour","lat_r","lon_r","parameter","unit"], dropna=False)["value"]
             .mean()
             .reset_index())

    # Pivot values to columns per parameter
    wide_val = agg.pivot_table(index=["ts_hour","lat_r","lon_r"], columns="parameter", values="value", aggfunc="first")

    if include_units:
        # one unit per parameter per hour/point (just take the unit string)
        agg["param_unit"] = agg["parameter"] + "_unit"
        wide_unit = agg.pivot_table(index=["ts_hour","lat_r","lon_r"], columns="param_unit", values="unit", aggfunc="first")
        wide = pd.concat([wide_val, wide_unit], axis=1)
    else:
        wide = wide_val

    # Attach representative metadata (locationId/location/city/country) using most frequent at that point/hour
    meta_cols = [c for c in ["locationId","location","city","country"] if c in d.columns]
    for mc in meta_cols:
        meta = (d.groupby(["ts_hour","lat_r","lon_r"])[mc]
                  .agg(lambda s: s.mode().iloc[0] if not s.mode().empty else s.iloc[-1]))
        wide = wide.join(meta, on=["ts_hour","lat_r","lon_r"])

    wide = wide.reset_index()
    wide = wide.rename(columns={"ts_hour":"ts_utc","lat_r":"lat","lon_r":"lon"})
    # reorder
    param_cols = sorted([c for c in wide.columns if c not in {"ts_utc","lat","lon","locationId","location","city","country"} and not str(c).endswith("_unit")])
    unit_cols  = sorted([c for c in wide.columns if str(c).endswith("_unit")])
    ordered = (["ts_utc","lat","lon","locationId","location","city","country"] + param_cols + unit_cols)
    wide = wide[ordered]
    return wide

def main():
    print(f"CITY: {CITY} @ ({CENTER_LAT}, {CENTER_LON})")
    print(f"Window (UTC): {DATE_FROM.strftime('%Y-%m-%d %H:%M:%S')}  →  {DATE_TO.strftime('%Y-%m-%d %H:%M:%S')}")
    df = fetch_last_month_hourly_for_bbox(CENTER_LAT, CENTER_LON, RADIUS_M, DATE_FROM, DATE_TO)
    if df.empty:
        print("[i] No hourly values found. Try increasing RADIUS_M or widening dates.")
        return

    if DEBUG:
        print(df.head(12))
        print(f"[i] rows fetched (long): {len(df)}")

    # LONG: one line per (hour, location, parameter)
    df_long = (df.assign(ts_hour=lambda d: pd.to_datetime(d["ts_utc"], utc=True).dt.floor("H"))
                 .sort_values(["ts_hour","location","parameter"])
                 .reset_index(drop=True))
    df_long.to_csv(OUT_CSV_LONG, index=False)
    print(f"[✓] wrote long → {OUT_CSV_LONG}")

    # WIDE: one line per (hour, rounded lat/lon), columns per parameter (+ optional _unit)
    wide = to_wide_by_point_hourly(df)
    print(f"[✓] wide shape: {wide.shape}")
    if DEBUG:
        print(wide.head(10))
    wide.to_csv(OUT_CSV_WIDE, index=False)
    print(f"[✓] wrote wide → {OUT_CSV_WIDE}")

if __name__ == "__main__":
    main()


CITY: New York @ (40.7128, -74.006)
Window (UTC): 2025-09-04 12:38:06  →  2025-10-04 12:38:06
[locations] page1 got 57 rows
[guard] stop sensor 673 at page 9/9 for 720h window
[guard] stop sensor 671 at page 9/9 for 720h window
[guard] stop sensor 674 at page 9/9 for 720h window
[guard] stop sensor 1097 at page 9/9 for 720h window
[guard] stop sensor 1102 at page 9/9 for 720h window
[guard] stop sensor 1099 at page 9/9 for 720h window
[guard] stop sensor 1098 at page 9/9 for 720h window
[guard] stop sensor 1103 at page 9/9 for 720h window
[guard] stop sensor 1152 at page 9/9 for 720h window
[guard] stop sensor 1106 at page 9/9 for 720h window
[guard] stop sensor 1121 at page 9/9 for 720h window
[guard] stop sensor 1128 at page 9/9 for 720h window
[guard] stop sensor 1143 at page 9/9 for 720h window
[guard] stop sensor 1145 at page 9/9 for 720h window
[guard] stop sensor 1146 at page 9/9 for 720h window
[guard] stop sensor 1147 at page 9/9 for 720h window
[guard] stop sensor 1522 at pag

[guard] stop sensor 13071825 at page 9/9 for 720h window
[guard] stop sensor 13071826 at page 9/9 for 720h window
[guard] stop sensor 13071827 at page 9/9 for 720h window
[guard] stop sensor 13071828 at page 9/9 for 720h window
[guard] stop sensor 13193873 at page 9/9 for 720h window
[guard] stop sensor 13193874 at page 9/9 for 720h window
[guard] stop sensor 13193875 at page 9/9 for 720h window
[guard] stop sensor 13193876 at page 9/9 for 720h window
[guard] stop sensor 13193877 at page 9/9 for 720h window
[guard] stop sensor 13193868 at page 9/9 for 720h window
[guard] stop sensor 13193869 at page 9/9 for 720h window
[guard] stop sensor 13193870 at page 9/9 for 720h window
[guard] stop sensor 13193871 at page 9/9 for 720h window
[guard] stop sensor 13193872 at page 9/9 for 720h window
[guard] stop sensor 13322774 at page 9/9 for 720h window
[guard] stop sensor 13322775 at page 9/9 for 720h window
[guard] stop sensor 13322776 at page 9/9 for 720h window
[guard] stop sensor 13322777 at

In [5]:
import pandas as pd
import numpy as np
from pathlib import Path

# ===== USER SETTINGS =====
INPUT_CSV  = "openaq_new_york_bbox_last_month_hourly_wide.csv"
OUTPUT_CSV = Path(INPUT_CSV).with_name(Path(INPUT_CSV).stem.replace("_hourly_wide","") + "_daily_aqi.csv")

# pollutants we’ll handle for AQI
POLLUTANTS = ["pm25","pm10","o3","co","no2","so2"]

# molecular weights (for µg/m³ <-> ppm/ppb at 25°C & 1 atm, 24.45 L/mol)
MW = {"o3": 48.00, "co": 28.01, "no2": 46.01, "so2": 64.07}

# EPA AQI breakpoints (conc units noted in comments)
AQI_BP = {
    "pm25": [ # µg/m³, 24-hr
        (0.0, 12.0, 0, 50), (12.1, 35.4, 51, 100), (35.5, 55.4, 101, 150),
        (55.5, 150.4, 151, 200), (150.5, 250.4, 201, 300), (250.5, 350.4, 301, 400),
        (350.5, 500.4, 401, 500),
    ],
    "pm10": [ # µg/m³, 24-hr
        (0, 54, 0, 50), (55, 154, 51, 100), (155, 254, 101, 150),
        (255, 354, 151, 200), (355, 424, 201, 300), (425, 504, 301, 400),
        (505, 604, 401, 500),
    ],
    "o3": [   # ppm, max 8-hr
        (0.000, 0.054, 0, 50), (0.055, 0.070, 51, 100), (0.071, 0.085, 101, 150),
        (0.086, 0.105, 151, 200), (0.106, 0.200, 201, 300),
    ],
    "co": [   # ppm, max 8-hr
        (0.0, 4.4, 0, 50), (4.5, 9.4, 51, 100), (9.5, 12.4, 101, 150),
        (12.5, 15.4, 151, 200), (15.5, 30.4, 201, 300), (30.5, 40.4, 301, 400),
        (40.5, 50.4, 401, 500),
    ],
    "no2": [  # ppb, max 1-hr
        (0, 53, 0, 50), (54, 100, 51, 100), (101, 360, 101, 150),
        (361, 649, 151, 200), (650, 1249, 201, 300), (1250, 1649, 301, 400),
        (1650, 2049, 401, 500),
    ],
    "so2": [  # ppb, max 1-hr
        (0, 35, 0, 50), (36, 75, 51, 100), (76, 185, 101, 150),
        (186, 304, 151, 200), (305, 604, 201, 300), (605, 804, 301, 400),
        (805, 1004, 401, 500),
    ],
}

# --- helpers ---
def aqi_from_bp(p, conc):
    if pd.isna(conc):
        return np.nan
    for (Cl, Ch, Il, Ih) in AQI_BP[p]:
        if Cl <= conc <= Ch:
            return (Ih - Il) / (Ch - Cl) * (conc - Cl) + Il
    return np.nan

def ugm3_to_ppm(x, species):  # µg/m³ -> ppm
    if pd.isna(x): return np.nan
    return (x * 24.45) / (MW[species] * 1000.0)

def ugm3_to_ppb(x, species):  # µg/m³ -> ppb
    if pd.isna(x): return np.nan
    return (x * 24.45) / MW[species]

def ensure_ts_hour(df):
    # choose ts column and floor to hour
    if "ts_hour" in df.columns:
        df["ts_hour"] = pd.to_datetime(df["ts_hour"], utc=True, errors="coerce").dt.floor("H")
    else:
        ts_col = "ts_utc" if "ts_utc" in df.columns else None
        if ts_col is None:
            raise ValueError("No timestamp column found (ts_hour or ts_utc).")
        df["ts_hour"] = pd.to_datetime(df[ts_col], utc=True, errors="coerce").dt.floor("H")
    return df

# --- load hourly wide ---
df = pd.read_csv(INPUT_CSV)
df = ensure_ts_hour(df)

# build locationId if missing
if "locationId" not in df.columns:
    if {"lat","lon"} <= set(df.columns):
        df["locationId"] = (df["lat"].round(5).astype(str) + "," + df["lon"].round(5).astype(str))
    else:
        df["locationId"] = "unknown"

# make sure pollutant columns exist (if not, create empty)
for p in POLLUTANTS:
    if p not in df.columns:
        df[p] = np.nan

# --- impute hourly values per locationId (ffill/bfill -> group median -> global median) ---
df = df.sort_values(["locationId","ts_hour"])
for p in POLLUTANTS:
    df[p] = pd.to_numeric(df[p], errors="coerce")
    df[p] = df.groupby("locationId")[p].ffill().bfill()
    if df[p].isna().any():
        med_loc = df.groupby("locationId")[p].transform("median")
        df[p] = df[p].fillna(med_loc)
    if df[p].isna().any():
        df[p] = df[p].fillna(df[p].median())

# --- normalize units to EPA target units ---
# use companion unit columns when present (e.g., pm25_unit, o3_unit)
def normalize_series_to_target(p):
    s = df[p].copy()
    unit_col = f"{p}_unit"
    u = df[unit_col].str.lower() if unit_col in df.columns else pd.Series(index=df.index, dtype="object")

    if p in ("pm25","pm10"):
        # EPA uses µg/m³ 24h avg; typically OpenAQ already µg/m³
        # If someone reports ppm/ppb (rare), convert back to µg/m³ (not expected here) -> skip
        return s

    if p == "o3":  # target: ppm (8h max)
        out = s.copy()
        mask_ppm = u.str.contains("ppm", na=False)
        mask_ppb = u.str.contains("ppb", na=False)
        # assume remaining are µg/m³
        out[mask_ppm] = s[mask_ppm]
        out[mask_ppb] = s[mask_ppb] / 1000.0
        out[~(mask_ppm | mask_ppb)] = s[~(mask_ppm | mask_ppb)].apply(lambda x: ugm3_to_ppm(x, "o3"))
        return pd.to_numeric(out, errors="coerce")

    if p == "co":  # target: ppm (8h max)
        out = s.copy()
        mask_ppm = u.str.contains("ppm", na=False)
        mask_ppb = u.str.contains("ppb", na=False)
        mask_mgm3 = u.str.contains("mg/m", na=False)
        out[mask_ppm] = s[mask_ppm]
        out[mask_ppb] = s[mask_ppb] / 1000.0
        out[mask_mgm3] = s[mask_mgm3] * 24.45 / MW["co"]
        out[~(mask_ppm | mask_ppb | mask_mgm3)] = s[~(mask_ppm | mask_ppb | mask_mgm3)].apply(lambda x: ugm3_to_ppm(x, "co"))
        return pd.to_numeric(out, errors="coerce")

    if p in ("no2","so2"):  # target: ppb (1h max)
        out = s.copy()
        mask_ppb = u.str.contains("ppb", na=False)
        mask_ppm = u.str.contains("ppm", na=False)
        out[mask_ppb] = s[mask_ppb]
        out[mask_ppm] = s[mask_ppm] * 1000.0
        out[~(mask_ppb | mask_ppm)] = s[~(mask_ppb | mask_ppm)].apply(lambda x: ugm3_to_ppb(x, p))
        return pd.to_numeric(out, errors="coerce")

    return s

std_cols = {}
for p in POLLUTANTS:
    std_cols[p] = f"{p}_std"
    df[std_cols[p]] = normalize_series_to_target(p)

# --- compute daily concentrations per EPA method ---
df["date"] = df["ts_hour"].dt.floor("D")

daily_parts = []

# PM2.5/PM10: 24h mean
for p in ("pm25","pm10"):
    c = df.groupby(["locationId","date"])[std_cols[p]].mean().reset_index()
    c = c.rename(columns={std_cols[p]: f"{p}_conc"})
    daily_parts.append(c)

# O3/CO: daily max 8h rolling average
for p in ("o3","co"):
    out_list = []
    for loc, sub in df[["locationId","ts_hour",std_cols[p]]].dropna().groupby("locationId"):
        sub = sub.sort_values("ts_hour").set_index("ts_hour")
        roll8 = sub[std_cols[p]].rolling("8H", min_periods=6).mean().reset_index()
        roll8["date"] = roll8["ts_hour"].dt.floor("D")
        dmax = roll8.groupby("date")[std_cols[p]].max().reset_index().rename(columns={std_cols[p]: f"{p}_conc"})
        dmax.insert(0, "locationId", loc)
        out_list.append(dmax)
    if out_list:
        daily_parts.append(pd.concat(out_list, ignore_index=True))

# NO2/SO2: daily max 1h
for p in ("no2","so2"):
    c = df.groupby(["locationId","date"])[std_cols[p]].max().reset_index()
    c = c.rename(columns={std_cols[p]: f"{p}_conc"})
    daily_parts.append(c)

# merge all daily concs
daily = None
for part in daily_parts:
    daily = part if daily is None else pd.merge(daily, part, on=["locationId","date"], how="outer")

# --- compute sub-AQIs and overall AQI ---
for p in POLLUTANTS:
    conc_col = f"{p}_conc"
    if conc_col in daily.columns:
        daily[f"aqi_{p}"] = daily[conc_col].apply(lambda x: aqi_from_bp(p, x))

aqi_cols = [c for c in daily.columns if c.startswith("aqi_")]
daily["aqi"] = daily[aqi_cols].max(axis=1, skipna=True)

# --- next 24h AQI per location ---
daily = daily.sort_values(["locationId","date"])
daily["aqi_next_24h"] = daily.groupby("locationId")["aqi"].shift(-1)

# --- optional metadata: bring most recent lat/lon/location/city/country of the day ---
meta_cols = [c for c in ["location","city","country","lat","lon"] if c in df.columns]
if meta_cols:
    meta = (df.sort_values("ts_hour")
              .groupby(["locationId","date"])[meta_cols]
              .agg(lambda s: s.dropna().iloc[-1] if not s.dropna().empty else np.nan)
              .reset_index())
    daily = pd.merge(daily, meta, on=["locationId","date"], how="left")

daily = daily.sort_values(["locationId","date"]).reset_index(drop=True)
daily.to_csv(OUTPUT_CSV, index=False)
print(f"[✓] wrote {len(daily)} rows → {OUTPUT_CSV}")
print(daily.head(10))


  df = pd.read_csv(INPUT_CSV)


[✓] wrote 3814 rows → openaq_new_york_bbox_last_month_daily_aqi.csv
   locationId                      date  pm25_conc  pm10_conc   o3_conc  \
0         384 2016-03-12 00:00:00+00:00   2.400000       21.0       NaN   
1         384 2016-03-26 00:00:00+00:00   2.400000       21.0       NaN   
2         384 2016-03-31 00:00:00+00:00   8.550000       21.0       NaN   
3         384 2016-04-01 00:00:00+00:00  10.411765       21.0  0.023833   
4         384 2016-04-02 00:00:00+00:00   5.020000       21.0  0.021001   
5         384 2016-04-03 00:00:00+00:00   4.700000       21.0       NaN   
6         384 2016-05-12 00:00:00+00:00  15.400000       21.0       NaN   
7         384 2016-05-21 00:00:00+00:00   6.600000       21.0       NaN   
8         384 2016-08-04 00:00:00+00:00   6.900000       21.0       NaN   
9         384 2016-08-15 00:00:00+00:00   5.800000       21.0       NaN   

    co_conc  no2_conc  so2_conc   aqi_pm25   aqi_pm10  ...    aqi_co  \
0       NaN  0.004783       0.0  1

In [6]:
import pandas as pd
import numpy as np
import re
from pathlib import Path

# ===== USER SETTINGS =====
INPUT_CSV  = "openaq_new_york_bbox_last_month_hourly_wide.csv"
OUTPUT_CSV = Path(INPUT_CSV).with_name(Path(INPUT_CSV).stem.replace("_hourly_wide","") + "_daily_aqi.csv")

POLLUTANTS = ["pm25","pm10","o3","co","no2","so2"]
UNIT_COLS  = [f"{p}_unit" for p in POLLUTANTS]

# molecular weights (25°C, 1 atm; 24.45 L/mol)
MW = {"o3": 48.00, "co": 28.01, "no2": 46.01, "so2": 64.07}

# EPA AQI breakpoints
AQI_BP = {
    "pm25": [(0.0,12.0,0,50),(12.1,35.4,51,100),(35.5,55.4,101,150),(55.5,150.4,151,200),(150.5,250.4,201,300),(250.5,350.4,301,400),(350.5,500.4,401,500)],
    "pm10": [(0,54,0,50),(55,154,51,100),(155,254,101,150),(255,354,151,200),(355,424,201,300),(425,504,301,400),(505,604,401,500)],
    "o3":   [(0.000,0.054,0,50),(0.055,0.070,51,100),(0.071,0.085,101,150),(0.086,0.105,151,200),(0.106,0.200,201,300)],
    "co":   [(0.0,4.4,0,50),(4.5,9.4,51,100),(9.5,12.4,101,150),(12.5,15.4,151,200),(15.5,30.4,201,300),(30.5,40.4,301,400),(40.5,50.4,401,500)],
    "no2":  [(0,53,0,50),(54,100,51,100),(101,360,101,150),(361,649,151,200),(650,1249,201,300),(1250,1649,301,400),(1650,2049,401,500)],
    "so2":  [(0,35,0,50),(36,75,51,100),(76,185,101,150),(186,304,151,200),(305,604,201,300),(605,804,301,400),(805,1004,401,500)],
}

# ---------- helpers ----------
def aqi_from_bp(p, conc):
    if pd.isna(conc): return np.nan
    for (Cl, Ch, Il, Ih) in AQI_BP[p]:
        if Cl <= conc <= Ch:
            return (Ih-Il)/(Ch-Cl)*(conc-Cl)+Il
    return np.nan

def ugm3_to_ppm(x, species):  # µg/m³ -> ppm
    if pd.isna(x): return np.nan
    return (x * 24.45) / (MW[species] * 1000.0)

def ugm3_to_ppb(x, species):  # µg/m³ -> ppb
    if pd.isna(x): return np.nan
    return (x * 24.45) / MW[species]

def canon_unit(raw):
    if raw is None or (isinstance(raw,float) and np.isnan(raw)): return ""
    u = str(raw).lower()
    u = re.sub(r"[^a-z0-9/µ³]", "", u)  # strip commas/whitespace/etc.
    u = u.replace("μ", "µ")
    u = u.replace("ug/m3","µg/m3").replace("ugm3","µg/m3").replace("µgm3","µg/m3").replace("mug/m3","µg/m3")
    return u

def ensure_ts_hour(df):
    if "ts_hour" in df.columns:
        df["ts_hour"] = pd.to_datetime(df["ts_hour"], utc=True, errors="coerce").dt.floor("H")
    else:
        ts_col = "ts_utc" if "ts_utc" in df.columns else None
        if ts_col is None:
            raise ValueError("No timestamp column found (ts_hour or ts_utc).")
        df["ts_hour"] = pd.to_datetime(df[ts_col], utc=True, errors="coerce").dt.floor("H")
    return df

# ---------- load robustly ----------
first = pd.read_csv(INPUT_CSV, nrows=1)
dtype_units = {c: "string" for c in UNIT_COLS if c in first.columns}
df = pd.read_csv(INPUT_CSV, low_memory=False, dtype=dtype_units)
df = ensure_ts_hour(df)

# ensure locationId
if "locationId" not in df.columns:
    if {"lat","lon"} <= set(df.columns):
        df["locationId"] = (df["lat"].round(5).astype(str) + "," + df["lon"].round(5).astype(str))
    else:
        df["locationId"] = "unknown"

# ensure all pollutant & unit columns exist
for p in POLLUTANTS:
    if p not in df.columns: df[p] = np.nan
for c in UNIT_COLS:
    if c not in df.columns: df[c] = ""

# ---------- impute hourly values (per location), guarantee no NaNs ----------
df = df.sort_values(["locationId","ts_hour"]).reset_index(drop=True)
for p in POLLUTANTS:
    df[p] = pd.to_numeric(df[p], errors="coerce")
    # per-location ffill/bfill
    df[p] = df.groupby("locationId", group_keys=False)[p].apply(lambda s: s.ffill().bfill())
    # per-location median
    loc_med = df.groupby("locationId")[p].transform("median")
    df[p] = df[p].fillna(loc_med)
    # global median
    glob_med = df[p].median()
    df[p] = df[p].fillna(glob_med)
    # final fallback
    df[p] = df[p].fillna(0.0)

# ---------- normalize units to EPA targets (tolerant parsing + heuristics) ----------
for c in UNIT_COLS:
    df[c] = df[c].astype("string").map(canon_unit)

def norm_to_target(p):
    s = pd.to_numeric(df[p], errors="coerce")
    u = df[f"{p}_unit"]
    if p in ("pm25","pm10"):
        return s  # µg/m³ already the target for daily avg
    if p == "o3":
        out = s.copy()
        m_ppm = u.str.contains("ppm", na=False)
        m_ppb = u.str.contains("ppb", na=False)
        m_ug  = u.str.contains("µg/m3", na=False)
        out[m_ppm] = s[m_ppm]
        out[m_ppb] = s[m_ppb] / 1000.0
        out[m_ug]  = s[m_ug].apply(lambda x: ugm3_to_ppm(x,"o3"))
        unk = ~(m_ppm|m_ppb|m_ug)
        out[unk & (s < 0.2)] = s[unk & (s < 0.2)]           # looks like ppm
        out[unk & (s > 200)] = s[unk & (s > 200)].apply(lambda x: ugm3_to_ppm(x,"o3"))
        mid = unk & (s >= 0.2) & (s <= 200)                 # looks like ppb
        out[mid] = s[mid] / 1000.0
        return pd.to_numeric(out, errors="coerce")
    if p == "co":
        out = s.copy()
        m_ppm = u.str.contains("ppm", na=False)
        m_ppb = u.str.contains("ppb", na=False)
        m_mg  = u.str.contains("mg/m3", na=False)
        m_ug  = u.str.contains("µg/m3", na=False)
        out[m_ppm] = s[m_ppm]
        out[m_ppb] = s[m_ppb] / 1000.0
        out[m_mg]  = s[m_mg] * 24.45 / MW["co"]
        out[m_ug]  = s[m_ug].apply(lambda x: ugm3_to_ppm(x,"co"))
        unk = ~(m_ppm|m_ppb|m_mg|m_ug)
        out[unk & (s < 0.2)] = s[unk & (s < 0.2)]
        out[unk & (s > 200)] = s[unk & (s > 200)].apply(lambda x: ugm3_to_ppm(x,"co"))
        mid = unk & (s >= 0.2) & (s <= 200)
        out[mid] = s[mid] / 1000.0
        return pd.to_numeric(out, errors="coerce")
    if p in ("no2","so2"):
        out = s.copy()
        mw = MW[p]
        m_ppb = u.str.contains("ppb", na=False)
        m_ppm = u.str.contains("ppm", na=False)
        m_ug  = u.str.contains("µg/m3", na=False)
        out[m_ppb] = s[m_ppb]
        out[m_ppm] = s[m_ppm] * 1000.0
        out[m_ug]  = s[m_ug].apply(lambda x: ugm3_to_ppb(x,p))
        unk = ~(m_ppb|m_ppm|m_ug)
        out[unk & (s < 0.2)] = s[unk & (s < 0.2)] * 1000.0   # ppm-ish → ppb
        out[unk & (s > 200)] = s[unk & (s > 200)].apply(lambda x: ugm3_to_ppb(x,p))
        mid = unk & (s >= 0.2) & (s <= 200)
        out[mid] = s[mid]                                     # ppb-ish
        return pd.to_numeric(out, errors="coerce")
    return s

std_cols = {}
for p in POLLUTANTS:
    std_cols[p] = f"{p}_std"
    df[std_cols[p]] = norm_to_target(p)
    # make sure no NaNs after normalization
    df[std_cols[p]] = df[std_cols[p]].fillna(df.groupby("locationId")[std_cols[p]].transform("median"))
    df[std_cols[p]] = df[std_cols[p]].fillna(df[std_cols[p]].median()).fillna(0.0)

# ---------- daily concentrations ----------
df["date"] = df["ts_hour"].dt.floor("D")
daily_parts = []

# PM: 24h mean
for p in ("pm25","pm10"):
    c = df.groupby(["locationId","date"])[std_cols[p]].mean().reset_index()
    c = c.rename(columns={std_cols[p]: f"{p}_conc"})
    daily_parts.append(c)

# O3/CO: max 8h rolling average
for p in ("o3","co"):
    out_list = []
    for loc, sub in df[["locationId","ts_hour",std_cols[p]]].groupby("locationId"):
        s = sub.sort_values("ts_hour").set_index("ts_hour")[std_cols[p]]
        roll8 = s.rolling("8H", min_periods=6).mean().reset_index(name="avg8")
        roll8["date"] = roll8["ts_hour"].dt.floor("D")
        daymax = roll8.groupby("date")["avg8"].max().reset_index().rename(columns={"avg8": f"{p}_conc"})
        daymax.insert(0, "locationId", loc)
        out_list.append(daymax)
    if out_list:
        daily_parts.append(pd.concat(out_list, ignore_index=True))

# NO2/SO2: max 1h
for p in ("no2","so2"):
    c = df.groupby(["locationId","date"])[std_cols[p]].max().reset_index()
    c = c.rename(columns={std_cols[p]: f"{p}_conc"})
    daily_parts.append(c)

# merge & fill remaining holes (use per-pollutant medians, then 0)
daily = None
for part in daily_parts:
    daily = part if daily is None else pd.merge(daily, part, on=["locationId","date"], how="outer")

for p in POLLUTANTS:
    col = f"{p}_conc"
    if col in daily.columns:
        med = daily[col].median(skipna=True)
        daily[col] = daily[col].fillna(med).fillna(0.0)

# ---------- AQI ----------
for p in POLLUTANTS:
    col = f"{p}_conc"
    if col in daily.columns:
        daily[f"aqi_{p}"] = daily[col].apply(lambda x: aqi_from_bp(p, x))

aqi_cols = [c for c in daily.columns if c.startswith("aqi_")]
# If all sub-AQIs are NaN for a row (should be rare), set overall AQI to 0
daily["aqi"] = daily[aqi_cols].max(axis=1, skipna=True).fillna(0.0)
for c in aqi_cols + ["aqi"]:
    daily[c] = daily[c].round(0)

# next 24h AQI (last day per location will be NaN; keep as-is)
daily = daily.sort_values(["locationId","date"])
daily["aqi_next_24h"] = daily.groupby("locationId")["aqi"].shift(-1)

# ---------- optional metadata backfill ----------
meta_cols = [c for c in ["location","city","country","lat","lon"] if c in df.columns]
if meta_cols:
    meta = (df.sort_values("ts_hour")
              .groupby(["locationId","date"])[meta_cols]
              .agg(lambda s: s.dropna().iloc[-1] if not s.dropna().empty else np.nan)
              .reset_index())
    daily = pd.merge(daily, meta, on=["locationId","date"], how="left")

daily = daily.sort_values(["locationId","date"]).reset_index(drop=True)
daily.to_csv(OUTPUT_CSV, index=False)
print(f"[✓] wrote {len(daily)} rows → {OUTPUT_CSV}")
print(daily.head(10))


[✓] wrote 3814 rows → openaq_new_york_bbox_last_month_daily_aqi.csv
   locationId                      date  pm25_conc  pm10_conc   o3_conc  \
0         384 2016-03-12 00:00:00+00:00   2.400000      7.265  0.027000   
1         384 2016-03-26 00:00:00+00:00   2.400000      7.265  0.027000   
2         384 2016-03-31 00:00:00+00:00   8.550000      7.265  0.027000   
3         384 2016-04-01 00:00:00+00:00  10.411765      7.265  0.023833   
4         384 2016-04-02 00:00:00+00:00   5.020000      7.265  0.023429   
5         384 2016-04-03 00:00:00+00:00   4.700000      7.265  0.027000   
6         384 2016-05-12 00:00:00+00:00  15.400000      7.265  0.027000   
7         384 2016-05-21 00:00:00+00:00   6.600000      7.265  0.027000   
8         384 2016-08-04 00:00:00+00:00   6.900000      7.265  0.027000   
9         384 2016-08-15 00:00:00+00:00   5.800000      7.265  0.027000   

   co_conc  no2_conc  so2_conc  aqi_pm25  aqi_pm10  ...  aqi_co  aqi_no2  \
0   0.0002      19.0       0.0

In [8]:
# train_aqi_next_day.py
import os
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.inspection import permutation_importance
import joblib

# (optional) silence joblib CPU warning on Windows – set to your physical cores
os.environ.setdefault("LOKY_MAX_CPU_COUNT", "8")

# ========= USER SETTINGS =========
INPUT_CSV = "openaq_new_york_bbox_last_month_daily_aqi.csv"
OUT_DIR = Path(".")
MODEL_PATH = OUT_DIR / "aqi_next_day_hgb.joblib"
VAL_PREDS_CSV = OUT_DIR / "aqi_next_day_val_preds.csv"
VAL_REPORT_CSV = OUT_DIR / "aqi_next_day_val_report.csv"
VAL_PER_SITE_CSV = OUT_DIR / "aqi_next_day_val_report_by_location.csv"
VAL_IMPORTANCES_CSV = OUT_DIR / "aqi_next_day_val_feature_importances.csv"

# which features to lag / roll
LAG_AQI_DAYS = [1,2,3,4,5,6,7]
LAG_POLLUTANT_DAYS = [1,2,3]
ROLL_WINDOWS = [3,7]  # in days
ROLL_VARS = ["aqi","pm25_conc","o3_conc"]  # small & predictive

def add_time_features(df):
    df = df.copy()
    df["dow"] = df["date"].dt.weekday
    df["is_weekend"] = df["dow"].isin([5,6]).astype(int)
    df["month"] = df["date"].dt.month
    df["doy"] = df["date"].dt.dayofyear
    return df

def make_lags_rolls(df):
    df = df.sort_values(["locationId","date"]).copy()
    by_loc = []
    for loc, g in df.groupby("locationId"):
        g = g.sort_values("date").copy()

        for k in LAG_AQI_DAYS:
            g[f"aqi_lag_{k}"] = g["aqi"].shift(k)

        pols = ["pm25_conc","pm10_conc","o3_conc","co_conc","no2_conc","so2_conc"]
        for p in pols:
            if p in g.columns:
                for k in LAG_POLLUTANT_DAYS:
                    g[f"{p}_lag_{k}"] = g[p].shift(k)

        for w in ROLL_WINDOWS:
            for v in ROLL_VARS:
                if v in g.columns:
                    g[f"{v}_rollmean_{w}"] = (
                        g[v].rolling(window=w, min_periods=max(1, int(np.ceil(w*0.6))))
                            .mean()
                            .shift(1)
                    )
        by_loc.append(g)
    return pd.concat(by_loc, ignore_index=True)

def build_dataset(path):
    df = pd.read_csv(path)
    if "date" not in df.columns:
        raise ValueError("Expected a 'date' column.")
    df["date"] = pd.to_datetime(df["date"], utc=True, errors="coerce").dt.tz_convert(None)

    for c in ["locationId","date","aqi","aqi_next_24h"]:
        if c not in df.columns:
            raise ValueError(f"Missing required column: {c}")

    pol_cols = ["pm25_conc","pm10_conc","o3_conc","co_conc","no2_conc","so2_conc"]
    for c in pol_cols:
        if c not in df.columns:
            df[c] = np.nan

    for meta in ["lat","lon","city","country","location"]:
        if meta not in df.columns:
            df[meta] = np.nan

    df = df.sort_values(["locationId","date"])
    for c in pol_cols + ["aqi"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")
        df[c] = df.groupby("locationId")[c].ffill().bfill()
        df[c] = df[c].fillna(df[c].median())

    df = add_time_features(df)
    df = make_lags_rolls(df)

    feature_cols = [col for col in df.columns if any(s in col for s in [
        "lag_","rollmean_","dow","month","is_weekend","doy"
    ])] + ["lat","lon"]
    X = df[["locationId","date"] + feature_cols].copy()
    y = df["aqi_next_24h"].copy()

    keep = y.notna()
    for c in feature_cols:
        keep &= X[c].notna()
    X = X[keep].reset_index(drop=True)
    y = y[keep].reset_index(drop=True)

    return df, X, y, feature_cols

def time_based_split(X, y, frac_val=0.2):
    Xy = X.copy()
    Xy["target"] = y.values
    val_idx, train_idx = [], []
    for _, g in Xy.groupby("locationId"):
        n = len(g)
        cut = int(np.floor(n * (1 - frac_val)))
        idx_sorted = g.sort_values("date").index
        train_idx.extend(idx_sorted[:cut])
        val_idx.extend(idx_sorted[cut:])
    return sorted(set(train_idx)), sorted(set(val_idx))

def regression_report(y_true, y_pred):
    mae = mean_absolute_error(y_true, y_pred)
    rmse = mean_squared_error(y_true, y_pred, squared=False)
    r2 = r2_score(y_true, y_pred)
    me = float(np.mean(y_pred - y_true))            # bias
    med_ae = float(np.median(np.abs(y_pred - y_true)))
    p90_ae = float(np.percentile(np.abs(y_pred - y_true), 90))
    # MAPE with protection against division by zero
    denom = np.where(y_true == 0, np.nan, np.abs(y_true))
    mape = float(np.nanmean(np.abs((y_pred - y_true) / denom)) * 100.0)
    return {
        "MAE": mae, "RMSE": rmse, "R2": r2,
        "Bias_ME": me, "MedianAE": med_ae, "P90AE": p90_ae, "MAPE_%": mape
    }

def main():
    df_all, X, y, feature_cols = build_dataset(INPUT_CSV)

    preproc = ColumnTransformer(
        transformers=[("num", "passthrough", feature_cols)],
        remainder="drop"
    )

    model = HistGradientBoostingRegressor(
        max_depth=None,
        learning_rate=0.06,
        max_iter=500,
        l2_regularization=0.0,
        early_stopping=True,
        validation_fraction=0.1,
        random_state=42,
    )

    pipe = Pipeline([("prep", preproc), ("model", model)])

    # time-based split
    train_idx, val_idx = time_based_split(X, y, frac_val=0.2)
    X_train, y_train = X.loc[train_idx, :], y.loc[train_idx]
    X_val,   y_val   = X.loc[val_idx, :],   y.loc[val_idx]

    # fit
    pipe.fit(X_train, y_train)

    # -------- VALIDATION: predictions + metrics ----------
    pred_val = pipe.predict(X_val)
    overall = regression_report(y_val.values, pred_val)

    print("\n=== Validation Metrics (overall) ===")
    for k, v in overall.items():
        if np.isnan(v):
            print(f"{k}: NaN")
        else:
            print(f"{k}: {v:.3f}")

    # per-location stats
    per_site_rows = []
    Xy_val = X_val[["locationId","date"]].copy()
    Xy_val["y_true"] = y_val.values
    Xy_val["y_pred"] = pred_val
    for loc, g in Xy_val.groupby("locationId"):
        rep = regression_report(g["y_true"].values, g["y_pred"].values)
        rep["locationId"] = loc
        rep["n_samples"] = len(g)
        per_site_rows.append(rep)
    per_site = pd.DataFrame(per_site_rows).sort_values("MAE")

    # save predictions & reports
    Xy_val.to_csv(VAL_PREDS_CSV, index=False)
    pd.DataFrame([overall]).to_csv(VAL_REPORT_CSV, index=False)
    per_site.to_csv(VAL_PER_SITE_CSV, index=False)
    print(f"\n[✓] wrote validation predictions  → {VAL_PREDS_CSV}")
    print(f"[✓] wrote overall report         → {VAL_REPORT_CSV}")
    print(f"[✓] wrote per-location report    → {VAL_PER_SITE_CSV}")

    # -------- Permutation feature importance (on validation) ----------
    # Use the transformed numeric features
    feat_names = pipe.named_steps["prep"].feature_names_in_
    r = permutation_importance(
        pipe, X_val, y_val, n_repeats=10, random_state=42, n_jobs=1, scoring="neg_mean_absolute_error"
    )
    importances = (pd.DataFrame({
        "feature": feat_names,
        "importance_mean": r.importances_mean,
        "importance_std": r.importances_std
    })
    .sort_values("importance_mean", ascending=False))
    importances.to_csv(VAL_IMPORTANCES_CSV, index=False)
    print(f"[✓] wrote permutation importances → {VAL_IMPORTANCES_CSV}")
    print("\nTop 15 features by importance:")
    print(importances.head(15))

    # -------- Retrain on ALL data and save model ----------
    pipe.fit(X, y)
    joblib.dump({"pipeline": pipe, "features": feature_cols}, MODEL_PATH)
    print(f"\n[✓] saved model → {MODEL_PATH}")

    # -------- Predict next-day AQI for each location (t -> t+1) ----------
    X_latest = (
        X.sort_values("date")
         .groupby("locationId", as_index=False)
         .tail(1)
         .copy()
    )
    preds_next = pipe.predict(X_latest[feat_names])
    next_out = X_latest[["locationId","date"]].copy()
    next_out["date"] = next_out["date"] + pd.Timedelta(days=1)
    next_out["aqi_pred_next_24h"] = preds_next
    print("\nNext-day AQI predictions (per location):")
    print(next_out.head(10))

if __name__ == "__main__":
    main()



=== Validation Metrics (overall) ===
MAE: 8.853
RMSE: 13.684
R2: 0.269
Bias_ME: 1.206
MedianAE: 6.010
P90AE: 20.613
MAPE_%: 24.693

[✓] wrote validation predictions  → aqi_next_day_val_preds.csv
[✓] wrote overall report         → aqi_next_day_val_report.csv
[✓] wrote per-location report    → aqi_next_day_val_report_by_location.csv




[✓] wrote permutation importances → aqi_next_day_val_feature_importances.csv

Top 15 features by importance:
                 feature  importance_mean  importance_std
34        aqi_rollmean_7         1.409543        0.131847
5                    doy         0.904514        0.145742
12             aqi_lag_7         0.752854        0.086889
32  pm25_conc_rollmean_3         0.415545        0.039979
13       pm25_conc_lag_1         0.333446        0.057800
9              aqi_lag_4         0.333243        0.049326
35  pm25_conc_rollmean_7         0.331214        0.083048
38                   lon         0.283432        0.058616
8              aqi_lag_3         0.265166        0.020334
6              aqi_lag_1         0.242031        0.078718
14       pm25_conc_lag_2         0.223337        0.031796
11             aqi_lag_6         0.210221        0.047717
15       pm25_conc_lag_3         0.167005        0.042232
37                   lat         0.142016        0.034606
31        aqi_rollmea

In [10]:
import os
import math
import numpy as np
import pandas as pd
from pathlib import Path

from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import RandomizedSearchCV, PredefinedSplit
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.inspection import permutation_importance
from scipy.stats import loguniform, randint
import joblib

os.environ.setdefault("LOKY_MAX_CPU_COUNT", "8")

# ================== USER SETTINGS ==================
INPUT_CSV   = "openaq_new_york_bbox_last_month_daily_aqi.csv"
WEATHER_CSV = None
OUT_DIR     = Path(".")

MODEL_PATH            = OUT_DIR / "aqi_next_day_hgb_tuned.joblib"
VAL_PREDS_CSV         = OUT_DIR / "aqi_next_day_val_preds.csv"
VAL_REPORT_CSV        = OUT_DIR / "aqi_next_day_val_report.csv"
VAL_PER_SITE_CSV      = OUT_DIR / "aqi_next_day_val_report_by_location.csv"
VAL_IMPORTANCES_CSV   = OUT_DIR / "aqi_next_day_val_feature_importances.csv"

LAG_AQI_DAYS        = list(range(1, 15))
LAG_POLLUTANT_DAYS  = [1, 2, 3, 7]
ROLL_WINDOWS        = [3, 7, 14]
EWMA_SPANS          = [3, 7, 14]

POLS = ["pm25_conc","pm10_conc","o3_conc","co_conc","no2_conc","so2_conc"]
WEATHER_VARS = ["temp","rh","wind_speed","wind_dir","precip","pblh"]

def add_calendar_features(df):
    df = df.copy()
    df["dow"] = df["date"].dt.weekday
    df["is_weekend"] = df["dow"].isin([5,6]).astype(int)
    df["month"] = df["date"].dt.month
    df["doy"] = df["date"].dt.dayofyear
    tau = 2 * math.pi
    df["sin_doy"] = np.sin(tau * df["doy"] / 365.25)
    df["cos_doy"] = np.cos(tau * df["doy"] / 365.25)
    return df

def add_sequence_features(df, cols_for_lags, lags, roll_windows, ewma_spans):
    df = df.sort_values(["locationId","date"]).copy()
    out = []
    for _, g in df.groupby("locationId"):
        g = g.sort_values("date").copy()
        for c in cols_for_lags:
            if c not in g.columns: 
                continue
            for k in lags:
                g[f"{c}_lag_{k}"] = g[c].shift(k)
        for c in cols_for_lags:
            if c not in g.columns: 
                continue
            for w in roll_windows:
                gm = g[c].rolling(window=w, min_periods=max(1,int(np.ceil(w*0.6)))).mean().shift(1)
                gs = g[c].rolling(window=w, min_periods=max(1,int(np.ceil(w*0.6)))).std().shift(1)
                g[f"{c}_rollmean_{w}"] = gm
                g[f"{c}_rollstd_{w}"]  = gs
        for c in cols_for_lags:
            if c not in g.columns: 
                continue
            for s in ewma_spans:
                g[f"{c}_ewm_{s}"] = g[c].ewm(span=s, adjust=False, min_periods=max(1,int(np.ceil(s*0.6)))).mean().shift(1)
        out.append(g)
    return pd.concat(out, ignore_index=True)

def regression_report(y_true, y_pred):
    mae  = mean_absolute_error(y_true, y_pred)
    rmse = mean_squared_error(y_true, y_pred, squared=False)
    r2   = r2_score(y_true, y_pred)
    me   = float(np.mean(y_pred - y_true))
    med_ae = float(np.median(np.abs(y_pred - y_true)))
    p90_ae = float(np.percentile(np.abs(y_pred - y_true), 90))
    denom  = np.where(y_true == 0, np.nan, np.abs(y_true))
    mape = float(np.nanmean(np.abs((y_pred - y_true) / denom)) * 100.0)
    return {"MAE":mae,"RMSE":rmse,"R2":r2,"Bias_ME":me,"MedianAE":med_ae,"P90AE":p90_ae,"MAPE_%":mape}

def time_split_indices(X, y, frac_val=0.2):
    Xy = X.copy()
    Xy["target"] = y.values
    train_idx, val_idx = [], []
    for _, g in Xy.groupby("locationId"):
        n = len(g)
        if n < 5:
            train_idx.extend(g.index)
            continue
        cut = int(np.floor(n * (1 - frac_val)))
        idx_sorted = g.sort_values("date").index
        train_idx.extend(idx_sorted[:cut])
        val_idx.extend(idx_sorted[cut:])
    return sorted(set(train_idx)), sorted(set(val_idx))

def build_dataset(aqi_path, weather_path=None):
    df = pd.read_csv(aqi_path)
    if "date" not in df.columns:
        raise ValueError("Expected 'date' column in AQI CSV.")
    df["date"] = pd.to_datetime(df["date"], utc=True, errors="coerce").dt.tz_convert(None)

    for c in ["locationId","date","aqi","aqi_next_24h"]:
        if c not in df.columns:
            raise ValueError(f"Missing column: {c}")

    for c in POLS:
        if c not in df.columns:
            df[c] = np.nan
    for c in ["lat","lon"]:
        if c not in df.columns:
            df[c] = np.nan

    if weather_path:
        w = pd.read_csv(weather_path)
        if "date" not in w.columns or "locationId" not in w.columns:
            raise ValueError("Weather CSV must have 'locationId' and 'date'.")
        w["date"] = pd.to_datetime(w["date"], utc=True, errors="coerce").dt.tz_convert(None)
        keep_w = ["locationId","date"] + [c for c in WEATHER_VARS if c in w.columns]
        w = w[keep_w].copy()
        df = pd.merge(df, w, on=["locationId","date"], how="left")

    df = df.sort_values(["locationId","date"])
    for c in POLS + ["aqi"] + [c for c in WEATHER_VARS if c in df.columns]:
        df[c] = pd.to_numeric(df[c], errors="coerce")
        df[c] = df.groupby("locationId")[c].ffill().bfill()
        df[c] = df[c].fillna(df[c].median())

    df = add_calendar_features(df)

    cols_for_lags = ["aqi","pm25_conc","o3_conc"] + [c for c in WEATHER_VARS if c in df.columns]
    df = add_sequence_features(df, cols_for_lags, LAG_AQI_DAYS, ROLL_WINDOWS, EWMA_SPANS)
    df = add_sequence_features(df, POLS, LAG_POLLUTANT_DAYS, [], [])

    base_feats = ["dow","is_weekend","month","doy","sin_doy","cos_doy","lat","lon"]
    seq_feats = [c for c in df.columns if any(s in c for s in ["_lag_","_rollmean_","_rollstd_","_ewm_"])]
    feature_cols = base_feats + seq_feats

    y = pd.to_numeric(df["aqi_next_24h"], errors="coerce")
    X = df[["locationId","date"] + feature_cols].copy()
    mask = y.notna()
    for c in feature_cols:
        mask &= X[c].notna()
    X = X[mask].reset_index(drop=True)
    y = y[mask].reset_index(drop=True)
    return df, X, y, feature_cols

def main():
    df_all, X, y, feature_cols = build_dataset(INPUT_CSV, WEATHER_CSV)

    # ---------- Time-based split (FIX) ----------
    train_idx, val_idx = time_split_indices(X, y, frac_val=0.2)
    X_train, y_train = X.loc[train_idx, :], y.loc[train_idx]
    X_val,   y_val   = X.loc[val_idx, :],   y.loc[val_idx]

    # ---------- Pipeline ----------
    preproc = ColumnTransformer([("num", "passthrough", feature_cols)], remainder="drop")
    model = HistGradientBoostingRegressor(
        max_depth=None, learning_rate=0.06, max_iter=600,
        l2_regularization=1e-6, early_stopping=True,
        validation_fraction=0.1, random_state=42
    )
    pipe = Pipeline([("prep", preproc), ("model", model)])

    # ---------- Hyperparameter tuning ----------
    split_index = np.full(len(X), -1)
    split_index[val_idx] = 0
    ps = PredefinedSplit(split_index)

    param_dist = {
        "model__learning_rate": loguniform(0.01, 0.2),
        "model__max_iter": randint(400, 1400),
        "model__max_depth": randint(3, 14),
        "model__l2_regularization": loguniform(1e-7, 1e-2),
        "model__min_samples_leaf": randint(10, 200)
    }

    search = RandomizedSearchCV(
        pipe, param_distributions=param_dist, n_iter=40,
        cv=ps, scoring="neg_mean_absolute_error", n_jobs=1,
        random_state=42, verbose=1
    )
    search.fit(X, y)
    best_pipe = search.best_estimator_
    print("Best params:", search.best_params_)

    # ---------- Validate tuned model ----------
    pred_val = best_pipe.predict(X_val)
    overall = regression_report(y_val.values, pred_val)
    print("\n=== Validation Metrics (tuned) ===")
    for k, v in overall.items():
        print(f"{k}: {v:.3f}")

    # ---------- Per-site bias calibration ----------
    val_frame = X_val[["locationId","date"]].copy()
    val_frame["y_true"] = y_val.values
    val_frame["y_pred"] = pred_val
    site_bias = (val_frame.groupby("locationId")
                          .apply(lambda g: (g["y_true"] - g["y_pred"]).mean())
                          .to_dict())

    def apply_site_bias(dfX, preds):
        adj = preds.copy()
        for i, loc in enumerate(dfX["locationId"].values):
            adj[i] = preds[i] + site_bias.get(loc, 0.0)
        return adj

    pred_val_adj = apply_site_bias(X_val, pred_val)
    overall_adj = regression_report(y_val.values, pred_val_adj)
    print("\n=== Validation Metrics (tuned + per-site bias) ===")
    for k, v in overall_adj.items():
        print(f"{k}: {v:.3f}")

    # ---------- Save validation outputs ----------
    out_val = X_val[["locationId","date"]].copy()
    out_val["y_true"] = y_val.values
    out_val["y_pred"] = pred_val
    out_val["y_pred_biasadj"] = pred_val_adj
    out_val.to_csv(VAL_PREDS_CSV, index=False)
    pd.DataFrame([overall]).to_csv(VAL_REPORT_CSV, index=False)

    per_rows = []
    for loc, g in out_val.groupby("locationId"):
        rep = regression_report(g["y_true"].values, g["y_pred"].values)
        rep_bias = regression_report(g["y_true"].values, g["y_pred_biasadj"].values)
        per_rows.append({
            "locationId": loc, "n": len(g),
            **{f"raw_{k}": v for k, v in rep.items()},
            **{f"biasadj_{k}": v for k, v in rep_bias.items()},
        })
    pd.DataFrame(per_rows).to_csv(VAL_PER_SITE_CSV, index=False)

    # ---------- Permutation importance ----------
    feat_names = best_pipe.named_steps["prep"].feature_names_in_
    r = permutation_importance(
        best_pipe, X_val, y_val, n_repeats=10, random_state=42,
        n_jobs=1, scoring="neg_mean_absolute_error"
    )
    importances = (pd.DataFrame({
        "feature": feat_names,
        "importance_mean": r.importances_mean,
        "importance_std": r.importances_std
    }).sort_values("importance_mean", ascending=False))
    importances.to_csv(VAL_IMPORTANCES_CSV, index=False)
    print("\nTop 15 features:")
    print(importances.head(15))

    # ---------- Retrain on ALL data and save ----------
    best_pipe.fit(X, y)
    joblib.dump({"pipeline": best_pipe, "features": feature_cols, "site_bias": site_bias}, MODEL_PATH)
    print(f"\n[✓] saved tuned model → {MODEL_PATH}")

    # ---------- Predict next-day AQI (t -> t+1) ----------
    X_latest = (X.sort_values("date").groupby("locationId", as_index=False).tail(1).copy())
    preds_next = best_pipe.predict(X_latest[feat_names])
    # FIXED brackets below
    preds_next_adj = np.array([p + site_bias.get(loc, 0.0)
                               for p, loc in zip(preds_next, X_latest["locationId"])])
    next_out = X_latest[["locationId","date"]].copy()
    next_out["date"] = next_out["date"] + pd.Timedelta(days=1)
    next_out["aqi_pred_next_24h"] = preds_next
    next_out["aqi_pred_next_24h_biasadj"] = preds_next_adj
    print("\nNext-day AQI predictions (per location, raw & bias-adjusted):")
    print(next_out.head(10))

if __name__ == "__main__":
    main()


Fitting 1 folds for each of 40 candidates, totalling 40 fits
Best params: {'model__l2_regularization': 0.0005981457917250761, 'model__learning_rate': 0.03573884768117847, 'model__max_depth': 8, 'model__max_iter': 505, 'model__min_samples_leaf': 13}

=== Validation Metrics (tuned) ===
MAE: 5.230
RMSE: 7.679
R2: 0.747
Bias_ME: 0.204
MedianAE: 4.100
P90AE: 11.160
MAPE_%: 14.800

=== Validation Metrics (tuned + per-site bias) ===
MAE: 4.551
RMSE: 6.969
R2: 0.792
Bias_ME: -0.000
MedianAE: 3.155
P90AE: 10.591
MAPE_%: 12.731

Top 15 features:
                 feature  importance_mean  importance_std
72            aqi_ewm_14         1.206804        0.090983
7                cos_doy         1.004292        0.099374
74       pm25_conc_ewm_7         0.952531        0.077934
75      pm25_conc_ewm_14         0.909454        0.076377
56       aqi_rollmean_14         0.695752        0.045391
13             aqi_lag_4         0.390161        0.047443
10             aqi_lag_1         0.387077        0.0

Testing locationId=924
Chose date t = 2016-05-06 (first available with full features)
Predicted AQI for t+1: 37.96 | bias-adjusted: 32.85


SystemExit: No testable row with ground truth on/after 2016-05-06 for site 924.
Try a date >= 2016-03-20 and ensure there is data for the next day.

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
