In [1]:
import os
import time
import json
import random
import datetime as dt
from typing import Dict, Any, Iterable, List, Optional, Tuple

import requests
import psycopg2
import psycopg2.extras

In [2]:
BASE_URL = "https://api.openaq.org/v3"

# Credentials come from the environment instead of being hard-coded, so they are
# never committed or shared with the notebook.
# NOTE(review): the DB password and API key that used to be embedded here should
# be treated as leaked and rotated.
#
# PG_DSN: libpq connection string. The default deliberately has no password;
# libpq then falls back to the PGPASSWORD environment variable or ~/.pgpass.
PG_DSN = os.environ.get(
    "PG_DSN", "dbname=airquality user=postgres host=localhost port=5432"
)
# OPENAQ_API_KEY: OpenAQ v3 API key (sent as the X-API-Key header).
OPENAQ_API_KEY = os.environ.get("OPENAQ_API_KEY", "")
if not OPENAQ_API_KEY:
    print("WARNING: OPENAQ_API_KEY is not set; OpenAQ requests will be unauthenticated.")

HEADERS = {"X-API-Key": OPENAQ_API_KEY}

In [3]:
# OpenAQ "bbox" filter, ordered min_lon,min_lat,max_lon,max_lat (WGS84 degrees).
# The box spans England plus a margin spilling over its borders.
ENGLAND_BBOX = ",".join(str(coord) for coord in (-6.5, 49.8, 2.2, 55.9))

# Pollutants to keep when filtering sensors (compared against lower-cased names).
TARGET_PARAMS = {"no2", "pm25"}

In [4]:
def pg_conn():
    """
    Open and return a new psycopg2 connection to the database in PG_DSN.

    The caller owns the connection. psycopg2's ``with conn:`` block only ends
    the transaction (commit/rollback); it does not close the connection.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection

In [5]:
import random
import requests

def fetch_json(
    path: str,
    params: Optional[Dict[str, Any]] = None,
    sleep_s: float = 0.2,
    max_retries: int = 5,
) -> Dict[str, Any]:
    """
    GET ``BASE_URL + path`` from the OpenAQ v3 API and return the decoded JSON.

    - Authenticates with the ``X-API-Key`` header from ``HEADERS``.
    - Retries transient failures for up to ``max_retries`` attempts in total.
      These are HTTP 429/500/502/503/504, network errors
      (``requests.RequestException``) and undecodable JSON bodies. Between
      attempts it waits ``2**(attempt-1)`` seconds, capped at 30. A retryable
      response with a numeric ``Retry-After`` header uses that value instead,
      also capped at 30. No sleep follows the final attempt.
    - Fails immediately, without retrying, on 401 and on any other HTTP error
      status (e.g. 400/404/422). Retrying those cannot succeed and previously
      cost roughly 30 s of backoff per call.
    - Sleeps ``sleep_s`` after each successful response as a simple throttle.
    - When env var ``DEBUG_AUTH`` is ``"1"``, prints whether a key is configured
      and its length (never the key itself).

    Args:
        path: API path appended to ``BASE_URL``, e.g. ``"/locations"``.
        params: Optional query-string parameters.
        sleep_s: Pause after a successful call, in seconds.
        max_retries: Total number of attempts.

    Returns:
        The parsed JSON body.

    Raises:
        RuntimeError: on 401, on a non-retryable HTTP error status, or once all
            attempts are exhausted.
    """
    url = f"{BASE_URL}{path}"
    last_err = None
    retryable_statuses = (429, 500, 502, 503, 504)

    # ---- AUTH DEBUG (optional) ----
    if os.getenv("DEBUG_AUTH", "0") == "1":
        print("AUTH DEBUG | Request URL:", url)
        print(
            "AUTH DEBUG | X-API-Key present:",
            "X-API-Key" in HEADERS and bool(HEADERS.get("X-API-Key"))
        )
        print(
            "AUTH DEBUG | X-API-Key length:",
            len(HEADERS.get("X-API-Key") or "")
        )

    for attempt in range(1, max_retries + 1):
        is_last_attempt = attempt == max_retries
        backoff = min(30, 2 ** (attempt - 1))

        try:
            response = requests.get(url, headers=HEADERS, params=params, timeout=60)
        except requests.RequestException as e:
            # Network-level failure (DNS, connection reset, timeout): retry.
            last_err = str(e)
            if not is_last_attempt:
                time.sleep(backoff)
            continue

        # Explicit auth failure (do not retry)
        if response.status_code == 401:
            raise RuntimeError("401 Unauthorized - X-API-Key missing or invalid")

        # Transient / retryable errors
        if response.status_code in retryable_statuses:
            last_err = (response.status_code, response.text[:300])
            if not is_last_attempt:
                retry_after = (response.headers.get("Retry-After") or "").strip()
                wait = min(30, int(retry_after)) if retry_after.isdigit() else backoff
                time.sleep(wait)
            continue

        # Any other error status will not improve on retry; fail fast. (Before,
        # raise_for_status() raised HTTPError -- a RequestException -- which the
        # retry handler caught, so 4xx responses were retried with backoff.)
        if not response.ok:
            raise RuntimeError(
                f"HTTP {response.status_code} (not retried) | "
                f"url={url} params={params} | "
                f"body={response.text[:300]}"
            )

        try:
            payload = response.json()
        except ValueError as e:
            # Truncated / non-JSON body: treat as transient.
            last_err = f"invalid JSON body: {e}"
            if not is_last_attempt:
                time.sleep(backoff)
            continue

        time.sleep(sleep_s)
        return payload

    raise RuntimeError(
        f"Failed after {max_retries} retries | "
        f"url={url} params={params} | "
        f"last_error={last_err}"
    )

In [6]:
def paginated_get(path: str, base_params: Dict[str, Any], limit: int = 100) -> Iterable[Dict[str, Any]]:
    """
    Lazily yield every item from a page/limit-paginated OpenAQ list endpoint.

    Requests page 1, 2, 3, ... (each with ``limit`` items requested, merged
    over a copy of ``base_params``) and stops at the first page whose
    ``"results"`` list is empty or missing. ``base_params`` is not mutated.
    """
    page = 1
    while True:
        query = {**base_params, "limit": limit, "page": page}
        results = fetch_json(path, params=query).get("results", [])
        if not results:
            return
        yield from results
        page += 1

In [7]:
def upsert_locations(conn, rows: List[Tuple]):
    """
    Insert-or-update ``rows`` into ``stg_openaq_locations``, then commit.

    Each tuple must follow the column order
    (location_id, name, locality, timezone, country_code, country_name,
     provider_id, provider_name, owner_id, owner_name, lon, lat, geom, raw).
    A row whose location_id already exists has all of those columns
    overwritten and ``loaded_at`` reset to now().
    """
    statement = """
    INSERT INTO stg_openaq_locations
      (location_id, name, locality, timezone, country_code, country_name,
       provider_id, provider_name, owner_id, owner_name, lon, lat, geom, raw)
    VALUES %s
    ON CONFLICT (location_id) DO UPDATE SET
      name=EXCLUDED.name,
      locality=EXCLUDED.locality,
      timezone=EXCLUDED.timezone,
      country_code=EXCLUDED.country_code,
      country_name=EXCLUDED.country_name,
      provider_id=EXCLUDED.provider_id,
      provider_name=EXCLUDED.provider_name,
      owner_id=EXCLUDED.owner_id,
      owner_name=EXCLUDED.owner_name,
      lon=EXCLUDED.lon,
      lat=EXCLUDED.lat,
      geom=EXCLUDED.geom,
      raw=EXCLUDED.raw,
      loaded_at=now();
    """
    cursor = conn.cursor()
    try:
        # execute_values expands the single "VALUES %s" into multi-row VALUES
        # lists, sending at most 1000 rows per statement.
        psycopg2.extras.execute_values(cursor, statement, rows, page_size=1000)
    finally:
        cursor.close()
    conn.commit()

In [8]:
def upsert_sensors(conn, rows: List[Tuple]):
    """
    Insert-or-update ``rows`` into ``stg_openaq_sensors``, then commit.

    Each tuple must follow the column order
    (sensor_id, location_id, parameter_name, parameter_units,
     datetime_first_utc, datetime_last_utc, raw).
    An existing sensor_id has all of those columns overwritten and
    ``loaded_at`` reset to now().
    """
    statement = """
    INSERT INTO stg_openaq_sensors
      (sensor_id, location_id, parameter_name, parameter_units,
       datetime_first_utc, datetime_last_utc, raw)
    VALUES %s
    ON CONFLICT (sensor_id) DO UPDATE SET
      location_id=EXCLUDED.location_id,
      parameter_name=EXCLUDED.parameter_name,
      parameter_units=EXCLUDED.parameter_units,
      datetime_first_utc=EXCLUDED.datetime_first_utc,
      datetime_last_utc=EXCLUDED.datetime_last_utc,
      raw=EXCLUDED.raw,
      loaded_at=now();
    """
    cursor = conn.cursor()
    try:
        # Batched multi-row VALUES, up to 1000 rows per statement.
        psycopg2.extras.execute_values(cursor, statement, rows, page_size=1000)
    finally:
        cursor.close()
    conn.commit()

In [9]:
def upsert_daily(conn, rows: List[Tuple]):
    """
    Insert-or-update ``rows`` into ``stg_openaq_sensor_daily``, then commit.

    Each tuple must follow the column order
    (sensor_id, date_utc, parameter_name, value, units,
     observed_count, expected_count, coverage_pct, sd, raw).
    Rows are keyed on (sensor_id, date_utc). An existing key has its
    non-key columns overwritten and ``loaded_at`` reset to now(). ``rows``
    must not repeat a key, or Postgres rejects the statement.
    """
    statement = """
    INSERT INTO stg_openaq_sensor_daily
      (sensor_id, date_utc, parameter_name, value, units,
       observed_count, expected_count, coverage_pct, sd, raw)
    VALUES %s
    ON CONFLICT (sensor_id, date_utc) DO UPDATE SET
      parameter_name=EXCLUDED.parameter_name,
      value=EXCLUDED.value,
      units=EXCLUDED.units,
      observed_count=EXCLUDED.observed_count,
      expected_count=EXCLUDED.expected_count,
      coverage_pct=EXCLUDED.coverage_pct,
      sd=EXCLUDED.sd,
      raw=EXCLUDED.raw,
      loaded_at=now();
    """
    cursor = conn.cursor()
    try:
        # Batched multi-row VALUES, up to 2000 rows per statement.
        psycopg2.extras.execute_values(cursor, statement, rows, page_size=2000)
    finally:
        cursor.close()
    conn.commit()

In [10]:
def parse_ts(ts_obj: Optional[Dict[str, Any]]) -> Optional[str]:
    """
    Pull the ``"utc"`` string out of an OpenAQ datetime object.

    Returns None when ``ts_obj`` is missing/empty, or when its ``"utc"`` entry
    is absent or falsy (e.g. an empty string).
    """
    if ts_obj:
        utc_value = ts_obj.get("utc")
        if utc_value:
            return utc_value
    return None

In [11]:
def load_england_locations():
    """
    Fetch every OpenAQ location inside ENGLAND_BBOX and upsert it into
    ``stg_openaq_locations``.

    Rows are buffered and flushed in batches of 2000. The buffer is keyed by
    location id, so a location that appears twice (e.g. if results shift
    between pages while paginating) is written once per batch. A repeated key
    inside one INSERT ... ON CONFLICT DO UPDATE makes Postgres reject the
    whole statement.

    The connection is closed explicitly. psycopg2's ``with conn:`` only
    commits/rolls back the transaction and leaves the connection open.
    """
    base_params = {"bbox": ENGLAND_BBOX}

    # location_id -> row tuple; a later occurrence overwrites an earlier one.
    buf: Dict[Any, Tuple] = {}
    conn = pg_conn()
    try:
        with conn:
            for loc in paginated_get("/locations", base_params=base_params, limit=100):
                buf[loc["id"]] = _location_row(loc)

                if len(buf) >= 2000:
                    upsert_locations(conn, list(buf.values()))
                    buf.clear()

            if buf:
                upsert_locations(conn, list(buf.values()))
                buf.clear()
    finally:
        conn.close()

    print("Loaded England bbox locations.")


def _location_row(loc: Dict[str, Any]) -> Tuple:
    """Flatten one /locations result into stg_openaq_locations column order."""
    coords = loc.get("coordinates") or {}
    lat = coords.get("latitude")
    lon = coords.get("longitude")

    # EWKT text; relies on Postgres casting it to the geom column type on insert.
    geom_ewkt = None
    if lat is not None and lon is not None:
        geom_ewkt = f"SRID=4326;POINT({lon} {lat})"

    country = loc.get("country") or {}
    provider = loc.get("provider") or {}
    owner = loc.get("owner") or {}

    return (
        loc["id"],
        loc.get("name"),
        loc.get("locality"),
        loc.get("timezone"),
        country.get("code"),
        country.get("name"),
        provider.get("id"),
        provider.get("name"),
        owner.get("id"),
        owner.get("name"),
        lon,
        lat,
        geom_ewkt,
        json.dumps(loc),
    )

In [12]:
def load_sensors_for_locations():
    """
    For every location in ``stg_openaq_locations``, fetch its sensors and upsert
    those measuring a TARGET_PARAMS pollutant (pm25/no2) into
    ``stg_openaq_sensors``.

    A location whose ``/sensors`` request fails (after fetch_json's own retries)
    is reported and skipped instead of aborting the whole load. The skipped
    location ids are printed at the end. Rows are flushed in batches of at
    least 2000.

    The connection is closed explicitly. psycopg2's ``with conn:`` does not
    close it.
    """
    failed_locations: List[Any] = []
    conn = pg_conn()
    try:
        with conn:
            with conn.cursor() as cur:
                cur.execute("SELECT location_id FROM stg_openaq_locations;")
                location_ids = [r[0] for r in cur.fetchall()]

            sensor_rows: List[Tuple] = []
            for loc_id in location_ids:
                try:
                    payload = fetch_json(f"/locations/{loc_id}/sensors")
                except Exception as e:
                    print(f"[WARN] location {loc_id} sensors fetch failed: {e}")
                    failed_locations.append(loc_id)
                    continue

                for s in payload.get("results", []):
                    param = s.get("parameter") or {}
                    param_name = (param.get("name") or "").lower()
                    if param_name not in TARGET_PARAMS:
                        continue

                    sensor_rows.append((
                        s["id"],
                        loc_id,
                        param_name,
                        param.get("units"),
                        parse_ts(s.get("datetimeFirst")),
                        parse_ts(s.get("datetimeLast")),
                        json.dumps(s)
                    ))

                if len(sensor_rows) >= 2000:
                    upsert_sensors(conn, sensor_rows)
                    sensor_rows.clear()

            if sensor_rows:
                upsert_sensors(conn, sensor_rows)
    finally:
        conn.close()

    if failed_locations:
        print(f"[WARN] sensors not loaded for {len(failed_locations)} location(s): {failed_locations}")
    print("Loaded sensors (pm25/no2).")

In [13]:
def dedupe_daily_rows(rows: List[Tuple]) -> List[Tuple]:
    """
    Deduplicate a batch of daily rows on (sensor_id, date_utc).

    Postgres rejects an INSERT ... ON CONFLICT DO UPDATE whose VALUES contain
    the same conflict key twice, so each key may appear only once per batch.

    Among duplicates, keep the row with the highest observed_count, where None
    counts as -1. On a tie, the row appearing later in ``rows`` (the latest
    payload) wins. The output keeps each key at the position where it first
    appeared in ``rows``.

    Tuple layout:
      (sensor_id, day, parameter_name, value, units, observed, expected, pct_cov, sd, raw_json)
    """
    best: Dict[Tuple, Tuple] = {}
    for row in rows:
        key = (row[0], row[1])  # (sensor_id, date_utc)
        prev = best.get(key)
        if prev is None:
            best[key] = row
            continue

        observed = row[5] if row[5] is not None else -1
        prev_observed = prev[5] if prev[5] is not None else -1
        # ">=" so that, as documented, a tie falls back to the latest payload.
        if observed >= prev_observed:
            best[key] = row

    return list(best.values())

In [14]:
def log_failure(conn, sensor_id, param_name, w_start, w_end, page, msg):
    """
    Record one failed API request in ``etl_openaq_failures`` and commit.

    ``msg`` is coerced to str and truncated to 2000 characters.

    If the INSERT or COMMIT fails, the transaction is rolled back before the
    error is re-raised. Otherwise the connection would be left in an
    aborted-transaction state, and every later statement on it (e.g. the next
    daily upsert) would fail as well. Note: the rollback also discards any
    other uncommitted work pending on ``conn``.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
            INSERT INTO etl_openaq_failures
              (sensor_id, parameter_name, window_start, window_end, page, error_message)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
                (sensor_id, param_name, w_start, w_end, page, str(msg)[:2000])
            )
        conn.commit()
    except Exception:
        conn.rollback()
        raise

In [15]:
def load_daily_measurements(date_from: str, date_to: str, max_failures_per_sensor: int = 2):
    """
    Pull daily aggregates from /v3/sensors/{id}/days for every sensor in
    ``stg_openaq_sensors`` and upsert them into ``stg_openaq_sensor_daily``.

    The author chose this endpoint over /measurements/daily for reliability.
    The inclusive range [date_from, date_to] (``YYYY-MM-DD`` strings) is split
    into calendar-month windows by month_windows. Each window is requested with
    ISO datetimes and paginated with ``limit=100``. A short page ends
    pagination, so a window normally costs a single request.

    Failures: a failed request abandons that window, is printed, and is logged
    to ``etl_openaq_failures`` (best effort; a logging error is printed, not
    raised). After ``max_failures_per_sensor`` failures a sensor's remaining
    windows are skipped ("quarantined") for this run.

    Rows are deduplicated on (sensor_id, date_utc) and flushed once at least
    5000 are buffered. On KeyboardInterrupt the buffered rows are flushed
    before the interrupt is re-raised, so an interrupted backfill keeps what
    it already fetched. The connection is closed explicitly.
    """
    date_from_d = dt.date.fromisoformat(date_from)
    date_to_d = dt.date.fromisoformat(date_to)

    windows = list(month_windows(date_from_d, date_to_d))
    print(f"Monthly windows: {len(windows)}")

    page_limit = 100        # presumably one row per day, so a month fits in one page
    flush_threshold = 5000  # buffered rows before an upsert
    progress_every = 50     # print a progress line every N sensors

    def to_iso_start(d: str) -> str:
        return f"{d}T00:00:00Z"

    def to_iso_end(d: str) -> str:
        return f"{d}T23:59:59Z"

    daily_rows: List[Tuple] = []
    failure_count: Dict[Any, int] = {}
    conn = pg_conn()

    def flush() -> None:
        """Dedupe and upsert the buffered rows, then empty the buffer."""
        if daily_rows:
            upsert_daily(conn, dedupe_daily_rows(daily_rows))
            daily_rows.clear()

    def load_window(sensor_id: Any, param_name: str, w_start: str, w_end: str) -> None:
        """Fetch every /days page of one sensor/window into daily_rows."""
        page = 1
        while True:
            params = {
                "datetime_from": to_iso_start(w_start),
                "datetime_to": to_iso_end(w_end),
                "limit": page_limit,
                "page": page
            }

            try:
                payload = fetch_json(f"/sensors/{sensor_id}/days", params=params)
            except Exception as e:
                msg = str(e)
                print(f"[WARN] sensor {sensor_id} days window {w_start}..{w_end} page {page} failed: {msg}")
                failure_count[sensor_id] = failure_count.get(sensor_id, 0) + 1
                try:
                    log_failure(conn, sensor_id, param_name, w_start, w_end, page, msg)
                except Exception as log_err:
                    print(f"[WARN] could not record failure in etl_openaq_failures: {log_err}")
                return  # abandon this window

            results = payload.get("results", [])
            for r in results:
                row = _daily_row(r, sensor_id, param_name)
                if row is not None:
                    daily_rows.append(row)

            if len(daily_rows) >= flush_threshold:
                flush()

            # A short (or empty) page is the last one. Stopping here avoids an
            # extra, always-empty request per window.
            if len(results) < page_limit:
                return
            page += 1

    try:
        with conn:
            with conn.cursor() as cur:
                cur.execute("SELECT sensor_id, parameter_name FROM stg_openaq_sensors;")
                sensors = cur.fetchall()
            print(f"Sensors: {len(sensors)}")

            try:
                for n_done, (sensor_id, param_name) in enumerate(sensors, start=1):
                    for w_start, w_end in windows:
                        if failure_count.get(sensor_id, 0) >= max_failures_per_sensor:
                            break  # quarantined for the rest of this run
                        load_window(sensor_id, param_name, w_start, w_end)

                    if n_done % progress_every == 0:
                        print(f"Processed {n_done}/{len(sensors)} sensors")
            except KeyboardInterrupt:
                # Keep what was already fetched before honouring the interrupt.
                try:
                    flush()
                except Exception as flush_err:
                    print(f"[WARN] could not flush buffered rows after interrupt: {flush_err}")
                raise

            flush()
    finally:
        conn.close()

    quarantined = [sid for sid, c in failure_count.items() if c >= max_failures_per_sensor]
    print(f"Loaded /days aggregates. Quarantined sensors this run: {len(quarantined)}")


def _daily_row(r: Dict[str, Any], sensor_id: Any, param_name: str) -> Optional[Tuple]:
    """
    Convert one /sensors/{id}/days result into a stg_openaq_sensor_daily tuple
    (sensor_id, day, parameter_name, value, units, observed, expected,
    pct_cov, sd, raw_json). Returns None when the result has no UTC period start.
    """
    # Use the period start (utc) as the day key.
    # NOTE(review): if OpenAQ buckets days in the location's local time, a BST
    # day starts at 23:00Z on the previous date, so this key would be one day
    # early in summer. Confirm against period.datetimeFrom.local.
    dt_from = ((r.get("period") or {}).get("datetimeFrom") or {}).get("utc")
    if not dt_from:
        return None

    parameter = r.get("parameter") or {}
    coverage = r.get("coverage") or {}
    # Prefer summary.avg / summary.sd if present, else the top-level fields.
    summary = r.get("summary") or {}

    return (
        sensor_id,
        dt_from[:10],
        parameter.get("name") or param_name,
        summary.get("avg", r.get("value")),
        parameter.get("units"),
        coverage.get("observedCount"),
        coverage.get("expectedCount"),
        coverage.get("percentCoverage"),
        summary.get("sd", r.get("sd")),
        json.dumps(r),
    )

In [16]:
def month_windows(date_from: dt.date, date_to: dt.date):
    """
    Split the inclusive date range [date_from, date_to] into calendar-month
    windows.

    Yields (start_date, end_date) pairs as ``YYYY-MM-DD`` strings, with both ends
    inclusive. The first window starts at ``date_from`` and the last ends at
    ``date_to``. Every window in between covers a whole month (1st to last
    day). An empty range (date_from > date_to) yields nothing.

    Example:
        >>> list(month_windows(dt.date(2023, 9, 15), dt.date(2023, 10, 10)))
        [('2023-09-15', '2023-09-30'), ('2023-10-01', '2023-10-10')]
    """
    if date_from > date_to:
        # Without this guard a same-month inverted range produced a bogus
        # window such as ("2024-03-15", "2024-03-10").
        return

    cur = dt.date(date_from.year, date_from.month, 1)
    end = dt.date(date_to.year, date_to.month, 1)

    while cur <= end:
        # First day of the following month (December rolls over to January).
        if cur.month == 12:
            nxt = dt.date(cur.year + 1, 1, 1)
        else:
            nxt = dt.date(cur.year, cur.month + 1, 1)

        start_day = max(cur, date_from)
        # Last day of this month, clipped to date_to.
        end_day = min(nxt - dt.timedelta(days=1), date_to)

        yield str(start_day), str(end_day)
        cur = nxt

In [17]:
# Backfill range: 1 September 2023 through today's (local-clock) date.
start = dt.date(2023, 9, 1)
today = dt.date.today()

# Order matters: sensors are discovered from the stored locations, and daily
# aggregates are then fetched for the stored sensors.
load_england_locations()
load_sensors_for_locations()
load_daily_measurements(date_from=start.isoformat(), date_to=today.isoformat())

Loaded England bbox locations.
Loaded sensors (pm25/no2).
Monthly windows: 29


KeyboardInterrupt: 