# NYS Air Quality — Exploratory Notebook

This notebook provides a compact, reproducible walkthrough of the NYS daily air-quality pipeline using OpenAQ data.

**What this notebook is**
- A developer-facing companion to the production pipeline.
- A convenient way to run a single report date locally and preview the generated outputs (note, map, coverage chart).

**What this notebook is not**
- The production job runner. Scheduled execution is handled by GitHub Actions calling `scripts/run_daily.py`.

**Notes**
- The pipeline is idempotent per report date: reruns overwrite that day’s note and report artifacts and upsert the corresponding CSV row.
- Repository outputs are stored in `notes/` and `reports/`. Notebook cell outputs should be cleared before commits to keep diffs small and reviewable.

In [None]:
# Daily AQ notebook — configuration
# Notebook purpose: exploratory analysis of the daily pipeline outputs.
# Source of truth for production runs: scripts/run_daily.py + src/nys_aq/.

from __future__ import annotations

import os
import json
import time
from pathlib import Path

import requests

REPO_ROOT = Path.cwd().parent  # notebooks/ -> repo root
DATA_DIR = REPO_ROOT / "data"
NOTES_DIR = REPO_ROOT / "notes"
REPORTS_DIR = REPO_ROOT / "reports"

ENV_PATH = REPO_ROOT / ".env"
BOUNDARY_PATH = DATA_DIR / "nys_boundary.geojson"

OPENAQ_BASE_URL = "https://api.openaq.org/v3"
BBOX = os.getenv("BBOX", "-79.8,40.45,-71.85,45.1")
SAMPLE_SIZE = int(os.getenv("SAMPLE_SIZE", "100"))
STALE_HOURS = int(os.getenv("STALE_HOURS", "12"))

def load_env_file(path: Path) -> None:
    """Load KEY=VALUE pairs from a .env file into the process environment (no overwrite)."""
    if not path.exists():
        return
    for line in path.read_text(encoding="utf-8").splitlines():
        s = line.strip()
        if not s or s.startswith("#") or "=" not in s:
            continue
        k, v = s.split("=", 1)
        os.environ.setdefault(k.strip(), v.strip().strip('"').strip("'"))

load_env_file(ENV_PATH)

OPENAQ_API_KEY = os.getenv("OPENAQ_API_KEY")
if not OPENAQ_API_KEY:
    raise RuntimeError("Missing OPENAQ_API_KEY. Add it to .env or export it in your shell.")

print("repo_root:", REPO_ROOT)
print("bbox:", BBOX)
print("sample_size:", SAMPLE_SIZE)
print("stale_hours:", STALE_HOURS)

In [None]:
# Fetch OpenAQ location catalog (bbox is a performance hint; NY membership is enforced by GeoJSON)

t0 = time.time()
resp = requests.get(
    f"{OPENAQ_BASE_URL}/locations",
    params={"bbox": BBOX, "iso": "US", "limit": 1000},
    headers={"X-API-Key": OPENAQ_API_KEY},
    timeout=30,
)
latency_ms = int((time.time() - t0) * 1000)

resp.raise_for_status()
payload = resp.json()

locations_found = payload.get("meta", {}).get("found")
locations = payload.get("results", []) or []

print("status:", resp.status_code)
print("latency_ms:", latency_ms)
print("locations_found:", locations_found)
print("locations_returned:", len(locations))

In [None]:
# Load NYS boundary (GeoJSON Feature)

if not BOUNDARY_PATH.exists():
    raise FileNotFoundError(f"Missing boundary file: {BOUNDARY_PATH}")

ny_feature = json.loads(BOUNDARY_PATH.read_text(encoding="utf-8"))

geom = ny_feature.get("geometry") or {}
geom_type = geom.get("type")
coords = geom.get("coordinates")

if geom_type not in {"Polygon", "MultiPolygon"}:
    raise ValueError(f"Unsupported geometry type: {geom_type}")

print("boundary_geometry:", geom_type)

In [None]:
# Filter locations to points inside the NYS boundary

def point_in_ring(lon: float, lat: float, ring: list[list[float]]) -> bool:
    """Ray casting point-in-polygon for a single ring."""
    inside = False
    j = len(ring) - 1
    for i in range(len(ring)):
        xi, yi = ring[i]
        xj, yj = ring[j]
        intersects = ((yi > lat) != (yj > lat)) and (
            lon < (xj - xi) * (lat - yi) / ((yj - yi) or 1e-15) + xi
        )
        if intersects:
            inside = not inside
        j = i
    return inside

def point_in_polygon(lon: float, lat: float, polygon: list[list[list[float]]]) -> bool:
    """polygon: [outer_ring, hole1, hole2, ...]"""
    if not polygon:
        return False
    if not point_in_ring(lon, lat, polygon[0]):
        return False
    for hole in polygon[1:]:
        if point_in_ring(lon, lat, hole):
            return False
    return True

def point_in_multipolygon(lon: float, lat: float, multipolygon: list) -> bool:
    for polygon in multipolygon:
        if point_in_polygon(lon, lat, polygon):
            return True
    return False

def loc_lon_lat(loc: dict) -> tuple[float | None, float | None]:
    c = loc.get("coordinates") or {}
    try:
        return float(c.get("longitude")), float(c.get("latitude"))
    except (TypeError, ValueError):
        return None, None

geom = ny_feature["geometry"]
geom_type = geom["type"]
coords = geom["coordinates"]

ny_locations: list[dict] = []
missing_coords = 0

for loc in locations:
    lon, lat = loc_lon_lat(loc)
    if lon is None or lat is None:
        missing_coords += 1
        continue

    inside = point_in_polygon(lon, lat, coords) if geom_type == "Polygon" else point_in_multipolygon(lon, lat, coords)
    if inside:
        ny_locations.append(loc)

print("locations_total:", len(locations))
print("missing_coords:", missing_coords)
print("ny_locations:", len(ny_locations))
print("first_5_names:", [l.get("name") for l in ny_locations[:5]])

In [None]:
# Stable sample of NY locations (deterministic ordering by location id + bbox)

import hashlib

def stable_rank(location_id: int, bbox: str) -> int:
    raw = f"{location_id}|{bbox}".encode("utf-8")
    return int(hashlib.sha256(raw).hexdigest(), 16)

ny_locations_with_id = [l for l in ny_locations if isinstance(l.get("id"), int)]
ny_sorted = sorted(ny_locations_with_id, key=lambda l: stable_rank(l["id"], BBOX))

sampled_locations = ny_sorted[: min(SAMPLE_SIZE, len(ny_sorted))]
sample_ids = [l["id"] for l in sampled_locations]
sample_label = "All NY locations (with coordinates)" if len(sample_ids) == len(ny_sorted) else f"Stable sample (n={len(sample_ids)})"

print("ny_locations_with_id:", len(ny_sorted))
print("sample_size_actual:", len(sample_ids))
print("sample_label:", sample_label)
print("sample_first5_ids:", sample_ids[:5])
print("sample_first5_names:", [l.get("name") for l in sampled_locations[:5]])

In [None]:
# Fetch latest measurements with retry/backoff (handles HTTP 429) and normalize to rows

from datetime import datetime, timezone

session = requests.Session()
session.headers.update({"X-API-Key": OPENAQ_API_KEY})

def fetch_latest(location_id: int, *, max_attempts: int = 6, timeout_s: int = 30) -> dict:
    url = f"{OPENAQ_BASE_URL}/locations/{location_id}/latest"
    delay_s = 1.0

    for attempt in range(1, max_attempts + 1):
        resp = session.get(url, timeout=timeout_s)

        if resp.status_code == 429:
            retry_after = resp.headers.get("Retry-After")
            if retry_after:
                try:
                    delay_s = max(delay_s, float(retry_after))
                except ValueError:
                    pass
            time.sleep(delay_s)
            delay_s = min(delay_s * 2, 30.0)
            continue

        resp.raise_for_status()
        return resp.json()

    raise RuntimeError(f"Rate-limited too long for location_id={location_id}")

def parse_utc_datetime(dt_obj: object) -> datetime | None:
    if not isinstance(dt_obj, dict):
        return None
    s = dt_obj.get("utc")
    if not isinstance(s, str) or not s:
        return None
    try:
        if s.endswith("Z"):
            s = s[:-1] + "+00:00"
        return datetime.fromisoformat(s).astimezone(timezone.utc)
    except ValueError:
        return None

t0 = time.time()

latest_payloads: list[dict] = []
errors: list[tuple[int, str]] = []

for loc_id in sample_ids:
    try:
        latest_payloads.append(fetch_latest(loc_id))
    except Exception as e:
        errors.append((loc_id, f"{type(e).__name__}: {e}"))

elapsed_s = time.time() - t0

print("locations_requested:", len(sample_ids))
print("latest_ok:", len(latest_payloads))
print("latest_errors:", len(errors))
print("latest_elapsed_s:", round(elapsed_s, 2))
if errors:
    print("first_error:", errors[0])

now_utc = datetime.now(timezone.utc)
stale_cutoff = now_utc.timestamp() - (STALE_HOURS * 3600)

rows: list[dict] = []
missing_datetime = 0

for payload in latest_payloads:
    for m in (payload.get("results") or []):
        dt = parse_utc_datetime(m.get("datetime"))
        if dt is None:
            missing_datetime += 1
        dt_ts = dt.timestamp() if dt else None

        stale = (dt_ts is None) or (dt_ts < stale_cutoff)
        coords = m.get("coordinates") or {}

        rows.append(
            {
                "locationsId": m.get("locationsId"),
                "sensorsId": m.get("sensorsId"),
                "latitude": coords.get("latitude"),
                "longitude": coords.get("longitude"),
                "value": m.get("value"),
                "datetime_utc": dt.isoformat().replace("+00:00", "Z") if dt else None,
                "stale": stale,
            }
        )

total = len(rows)
stale_count = sum(1 for r in rows if r["stale"])
stale_fraction = (stale_count / total) if total else 0.0

numeric_rows = [r for r in rows if isinstance(r.get("value"), (int, float))]
top_values = sorted(numeric_rows, key=lambda r: float(r["value"]), reverse=True)[:5]

print("locations_sampled:", len(sample_ids))
print("measurements_total:", total)
print("missing_datetime:", missing_datetime)
print("stale_fraction:", round(stale_fraction, 3))
print("top_5_values:", [(r.get("locationsId"), r.get("value"), r.get("datetime_utc")) for r in top_values])

In [None]:
# Run the production pipeline for a chosen report date (yesterday ET by default)

from datetime import date

from nys_aq.daily import run_daily

# Optional: set a specific date for reproducibility
# report_date = date.fromisoformat("2026-02-01")
report_date = None

cfg = run_daily(run_date=report_date)
cfg


In [None]:
# Inspect the generated outputs (note + CSV row)

import pandas as pd

report_date = cfg.run_date.isoformat()

note_path = cfg.repo_root / "notes" / f"{report_date}.md"
csv_path = cfg.repo_root / "data" / "daily.csv"
report_dir = cfg.repo_root / "reports" / report_date

print("note:", note_path)
print("report_dir:", report_dir)

print("\n--- note (top) ---\n")
print("\n".join(note_path.read_text(encoding="utf-8").splitlines()[:25]))

df = pd.read_csv(csv_path)
row = df.loc[df["report_date"] == report_date]
row

In [None]:
# Outputs (generated by the pipeline) — quick links for review

from IPython.display import SVG, Markdown, display

report_date = cfg.run_date.isoformat()
note_path = cfg.repo_root / "notes" / f"{report_date}.md"
report_dir = cfg.repo_root / "reports" / report_date

display(Markdown(f"## Outputs for `{report_date}`"))
display(Markdown(f"- Note: `{note_path.relative_to(cfg.repo_root)}`"))
display(Markdown(f"- Map: `{(report_dir / 'map.svg').relative_to(cfg.repo_root)}`"))
display(Markdown(f"- Coverage: `{(report_dir / 'parameter_coverage.svg').relative_to(cfg.repo_root)}`"))

display(Markdown("### Note preview"))
display(Markdown("\n".join(note_path.read_text(encoding='utf-8').splitlines()[:40])))

display(Markdown("### Parameter coverage"))
display(SVG(filename=str(report_dir / "parameter_coverage.svg")))

display(Markdown("### Map"))
display(SVG(filename=str(report_dir / "map.svg")))