# OpenAQ Hourly Data Downloader

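This notebook fetches hourly PM2.5 and ozone (O3) measurements from the OpenAQ v3 API and writes one CSV per country and pollutant. Before running the main workflow, set the `OPENAQ_API_KEY` environment variable and update the configuration below so that it points to your country metadata CSV.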

In [None]:
# ============================================================
# ===================  User Configuration  ===================
# ============================================================

import os

# 1) OpenAQ authentication (v3 requires an API key)
API_KEY = os.getenv("OPENAQ_API_KEY")  # set this environment variable before running

# 1.1) Proxy control
# When True, the HTTP(S)/ALL/NO_PROXY environment variables (both cases) are
# removed from this process before any request is made.
DISABLE_ENV_PROXIES = True  # Set to False if corporate proxies are required

# 2) Country metadata (CSV file used to map regions to OpenAQ)
# The path may be overridden with the OPENAQ_COUNTRY_CSV environment variable,
# so the notebook runs on other machines without editing this cell.
COUNTRY_CSV_PATH = os.getenv(
    "OPENAQ_COUNTRY_CSV", "E:/Github_project/rfasst_test/data/Regions.csv"
)
COUNTRY_CSV_COLUMNS = {
    "country_name": "country_name",  # required; CSV column name for country labels
    "iso3": "iso3",                  # required; CSV column name for ISO3 codes
    "fasst_region": "fasst_region",  # optional; set to None if the CSV does not contain this column
}
# Per-country CSV files are written here; override with OPENAQ_OUTPUT_DIR.
OUTPUT_DIR = os.getenv("OPENAQ_OUTPUT_DIR") or os.path.join(os.getcwd(), "output")

# 3) Sampling strategy
MAX_LOCATIONS_PER_COUNTRY = 200  # <=0 means no limit (random sample, fixed seed)

# 4) Country filtering (optional)
INCLUDE_COUNTRIES = []  # limit run to these ISO2/ISO3 codes or names
EXCLUDE_COUNTRIES = []  # exclude these ISO2/ISO3 codes or names

# 5) Rate limiting and retry behaviour (align with OpenAQ v3 limits)
MAX_LIMIT = 1000             # page size for SDK list calls (countries/parameters/locations)
REQUESTS_PER_MIN = 55        # min spacing between calls = 60 / REQUESTS_PER_MIN seconds
REQUESTS_PER_HOUR = 1900     # hourly request budget handed to the RateLimiter
REQUEST_SLEEP = 0.10         # extra pause (s) between consecutive pages
RETRY_MAX_TRIES = 6          # failed attempts before giving up on one request
RETRY_BACKOFF_BASE = 0.5     # backoff delay = BASE * FACTOR ** (attempt - 1) seconds
RETRY_BACKOFF_FACTOR = 2.0
RETRY_AFTER_CAP = 90         # upper bound (s) on server-requested rate-limit waits
PER_COUNTRY_PAUSE_SEC = 30   # pause (s) between countries

# 6) Optional geographic filters
# NOTE(review): these are forwarded to the OpenAQ SDK's locations.list as
# `within` / `radius` / `bbox`; confirm the SDK accepts these keyword names and
# which unit it expects for `radius` before enabling them.
WITHIN_WKT = None
WITHIN_RADIUS_KM = None
BBOX = None  # [minLon, minLat, maxLon, maxLat]

# 7) Hourly data chunking parameters
HOURS_CHUNK_DAYS = 90         # length of each request window (days); must be > 0
MAX_CHUNK_SPLIT_DEPTH = 3     # how many times a failing window may be halved
PAGE_LIMITS_TRY = [1000, 500, 200]  # page sizes tried, in order, for each window
REQUEST_TIMEOUT = 30          # per-HTTP-request timeout (s)

# 8) Hourly data time window (UTC)
DATETIME_FROM = "2020-01-01T00:00:00Z"
DATETIME_TO = "2024-12-31T23:59:59Z"


In [None]:
from __future__ import annotations
import os
import re, time, math
from typing import Optional, List, Dict, Tuple
from datetime import datetime, timedelta, timezone

import pandas as pd
from tqdm import tqdm
from openaq import OpenAQ

# --- HTTP 会话直连 v3，用于 /v3/sensors/{id}/hours（更可控的超时/重试） ---
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

BASE_URL = "https://api.openaq.org/v3"

def _build_http_session(api_key: str) -> requests.Session:
    """Create a ``requests`` session configured for the OpenAQ v3 REST API.

    The session sends ``Accept: application/json`` and, when *api_key* is
    truthy, the ``X-API-Key`` header. A urllib3 ``Retry`` policy retries GET
    requests that end in 408/429/5xx, honouring the ``Retry-After`` header.
    Because ``raise_on_status=False``, the last response is returned once
    those retries are exhausted, and callers inspect the status themselves.

    Note that callers such as ``_quick_count_hours`` wrap requests in their
    own retry loops, so the total number of attempts multiplies.

    Args:
        api_key: OpenAQ API key; falsy means no key header is sent.

    Returns:
        A ready-to-use ``requests.Session`` (caller is responsible for closing it).
    """
    session = requests.Session()
    session.headers.update({"Accept": "application/json"})
    if api_key:
        session.headers["X-API-Key"] = api_key

    retry_policy = Retry(
        total=RETRY_MAX_TRIES,
        # Keep transport-level backoff in sync with the user's retry config
        # instead of a separately hard-coded constant.
        backoff_factor=RETRY_BACKOFF_BASE,
        status_forcelist=(408, 429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET"]),
        raise_on_status=False,
        respect_retry_after_header=True,
    )
    adapter = HTTPAdapter(max_retries=retry_policy, pool_connections=50, pool_maxsize=50)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session

def _maybe_disable_env_proxies():
    if not DISABLE_ENV_PROXIES:
        return
    for k in ["HTTP_PROXY","HTTPS_PROXY","ALL_PROXY","NO_PROXY","http_proxy","https_proxy","all_proxy","no_proxy"]:
        os.environ.pop(k, None)

_RATE_LIMIT_RE = re.compile(r"Limit resets in\s+(\d+)\s+seconds", re.I)

def _classify_exc(e: Exception) -> str:
    s = str(e)
    if "Rate limit exceeded" in s or "429" in s:
        return "rate"
    if "404" in s or "Not Found" in s:
        return "notfound"
    if any(x in s for x in ["500","502","503","504","Internal Server Error","Bad Gateway"]):
        return "server"
    if "timeout" in s.lower() or "timed out" in s.lower():
        return "timeout"
    return "other"

def _parse_rate_reset_from_resp(resp: requests.Response) -> Optional[int]:
    """Return the wait, in seconds, requested by a response's ``Retry-After`` header.

    Both header forms are accepted:
    - delay-seconds: an integer, or a decimal rounded up;
    - an HTTP-date, converted to a delay from now.

    The result is clamped to ``[0, RETRY_AFTER_CAP]``.

    Args:
        resp: An HTTP response object (or None).

    Returns:
        The delay in whole seconds, or None when *resp* is None, has no
        headers, lacks ``Retry-After``, or the value cannot be parsed.
    """
    headers = getattr(resp, "headers", None) if resp is not None else None
    raw = headers.get("Retry-After") if headers is not None else None
    if not raw:
        return None
    text = str(raw).strip()
    try:
        seconds = float(text)
    except ValueError:
        # Local import: only needed for the rarely used HTTP-date form.
        from email.utils import parsedate_to_datetime
        try:
            when = parsedate_to_datetime(text)
        except (TypeError, ValueError, IndexError):
            return None
        if when is None:
            return None
        if when.tzinfo is None:
            when = when.replace(tzinfo=timezone.utc)
        seconds = (when - datetime.now(timezone.utc)).total_seconds()
    if math.isnan(seconds):
        return None
    return math.ceil(min(max(seconds, 0.0), RETRY_AFTER_CAP))

def _parse_rate_reset(e: Exception) -> Optional[int]:
    """Return how many seconds to wait after a rate-limit error.

    The SDK-style message "Limit resets in N seconds" is checked first.
    Otherwise the ``Retry-After`` header of ``e.response`` is used when a
    response is attached. Message-derived values are capped at RETRY_AFTER_CAP.

    Args:
        e: The exception raised by a rate-limited call.

    Returns:
        Seconds to wait, or None when no hint is available (callers then
        apply their own default).
    """
    match = _RATE_LIMIT_RE.search(str(e))
    if match:
        # The group is \d+, so int() cannot fail here.
        return min(int(match.group(1)), RETRY_AFTER_CAP)

    resp = getattr(e, "response", None)
    # requests.Response.__bool__ returns resp.ok, which is False for a 429.
    # Test against None, not truthiness, or the header is never consulted.
    if resp is None:
        return None
    try:
        return _parse_rate_reset_from_resp(resp)
    except Exception:
        # Best effort: a malformed response object must not break the retry loop.
        return None

def _sleep_with_backoff(attempt: int):
    """Sleep for an exponentially growing delay before the next retry.

    The n-th attempt (1-based) waits
    ``RETRY_BACKOFF_BASE * RETRY_BACKOFF_FACTOR ** (n - 1)`` seconds.
    Attempt numbers of 1 or less all use the base delay.
    """
    exponent = attempt - 1 if attempt > 1 else 0
    time.sleep(RETRY_BACKOFF_BASE * RETRY_BACKOFF_FACTOR ** exponent)

def _parse_iso_z(v: Optional[str]) -> Optional[str]:
    if v is None or str(v).strip() == "" or str(v).strip().lower() == "none":
        return None
    try:
        ts = pd.to_datetime(v, utc=True)
        return ts.strftime("%Y-%m-%dT%H:%M:%SZ")
    except Exception:
        return None

class RateLimiter:
    """Client-side throttle shared by every API call in this notebook.

    Two limits are enforced:
    - a minimum spacing of ``60 / rpm`` seconds between consecutive calls;
    - when *rph* is positive, at most *rph* calls per fixed one-hour window.

    The window restarts on the first call made 3600 s or more after it
    opened, or after waiting out an exhausted budget. Timing uses
    ``time.monotonic`` so wall-clock adjustments cannot shorten or lengthen
    the waits.
    """

    def __init__(self, rpm: int = REQUESTS_PER_MIN, rph: int = REQUESTS_PER_HOUR):
        self.min_interval = 60.0 / max(1, rpm)
        self.hour_budget = rph
        self.last_call_ts = float("-inf")  # no previous call yet
        self.window_start = time.monotonic()
        self.used_this_hour = 0

    def wait(self):
        """Block until one more request may be sent, then record it."""
        now = time.monotonic()
        if now - self.window_start >= 3600:
            self.window_start = now
            self.used_this_hour = 0

        # Hourly budget: previously counted but never enforced.
        if self.hour_budget and self.hour_budget > 0 and self.used_this_hour >= self.hour_budget:
            remaining = 3600 - (now - self.window_start)
            if remaining > 0:
                print(f"[RATE] hourly budget of {self.hour_budget} requests used; sleeping {remaining:.0f}s ...")
                time.sleep(remaining)
            self.window_start = time.monotonic()
            self.used_this_hour = 0

        since_last = time.monotonic() - self.last_call_ts
        if since_last < self.min_interval:
            time.sleep(self.min_interval - since_last)
        self.last_call_ts = time.monotonic()
        self.used_this_hour += 1

RL = RateLimiter()


# ================= Country CSV loading & country-name mapping =================
# Maps a country name exactly as it appears (after stripping) in the regions
# CSV to the name to look up in OpenAQ's country list. The looked-up name is
# normalised with _norm() and compared with normalised OpenAQ names, so only
# the keys must match the CSV byte-for-byte (including their odd spacing).
COUNTRY_NAME_ALIASES = {
    "KOREA; REPUBLIC OF": "Republic of Korea",
    "CONGO;  Democratic Republic of (was Zaire)": "Democratic Republic of the Congo",
    "TANZANIA;UNITED REPUBLIC OF": "Tanzania",
    "IRAN (ISLAMIC REPUBLIC OF)": "Iran",
    "VIET NAM": "Vietnam",
    "SYRIAN ARAB REPUBLIC": "Syrian Arab Republic",
    "LIBYAN ARAB JAMAHIRIYA": "Libya",
    "CROATIA (local name: Hrvatska)": "Croatia",
    "BOLIVIA": "Bolivia (Plurinational State of)",
    "VENEZUELA": "Venezuela (Bolivarian Republic of)",
    "MOLDOVA; REPUBLIC OF": "Republic of Moldova",
    "MACAU": "Macao",
    "PALESTINIAN TERRITORY; Occupied": "State of Palestine",
    "KOREA; DEMOCRATIC PEOPLE'S REPUBLIC OF": "Democratic People's Republic of Korea",
}
# ISO3 codes in the CSV that correspond to several ISO2 codes in OpenAQ.
# A CSV row with one of these codes is expanded into one mapped row per
# matching OpenAQ country (compared against OpenAQ's `code` column).
HISTORIC_ISO3_TO_ISO2 = {"SCG": ["RS","ME"], "ANT": ["CW","SX","BQ"]}

def load_regions_csv(csv_path: str, column_map: Dict[str, str]) -> pd.DataFrame:
    """Load country metadata from a CSV file and normalise key columns.

    Args:
        csv_path: Path to the regions CSV.
        column_map: Maps the canonical keys "country_name", "iso3" and
            "fasst_region" to the CSV's column names. A missing or None entry
            means the column already uses the canonical name.

    Returns:
        The CSV contents, with these changes:
        - key columns are renamed to their canonical names;
        - ``country_name`` is stripped;
        - ``iso3`` is stripped and upper-cased, and missing codes become "";
        - ``fasst_region`` is set to None when the CSV lacks it;
        - rows without a (non-blank) country name are dropped.

    Raises:
        FileNotFoundError: *csv_path* does not exist.
        ValueError: A required column is missing from the CSV.
    """
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Country CSV not found: {csv_path}")

    df = pd.read_csv(csv_path)
    column_map = column_map or {}
    required_keys = ["country_name", "iso3"]
    optional_keys = ["fasst_region"]

    rename_map = {}
    for key in required_keys + optional_keys:
        source = column_map.get(key) or key
        if source not in df.columns:
            if key in required_keys:
                raise ValueError(f"Column '{source}' required for '{key}' was not found in {csv_path}.")
            # Only add a placeholder; never overwrite an existing canonical column.
            if key not in df.columns:
                df[key] = None
            continue
        if source != key:
            rename_map[source] = key

    if rename_map:
        df = df.rename(columns=rename_map)

    # Drop missing names BEFORE converting to str: astype(str) turns NaN into
    # the literal "nan", which dropna() would no longer recognise.
    df = df[df["country_name"].notna()].copy()
    df["country_name"] = df["country_name"].astype(str).str.strip()
    # Missing ISO3 codes become "" instead of the string "NAN".
    df["iso3"] = df["iso3"].fillna("").astype(str).str.strip().str.upper()
    df = df[df["country_name"] != ""]
    return df.reset_index(drop=True)

def _norm(s: str) -> str:
    s = (s or "").lower().strip()
    s = re.sub(r"[^\w\s\-]", " ", s)
    s = re.sub(r"\s+", " ", s)
    return s

_MAPPED_COLUMNS = [
    "countries_id", "country_name", "country_code", "iso3",
    "fasst_region", "country_name_req", "iso3_req",
]


def _fetch_openaq_countries(client: OpenAQ) -> pd.DataFrame:
    """Download OpenAQ's country list, waiting out rate limits and retrying other errors."""
    tries = 0
    while True:
        try:
            RL.wait()
            return pd.DataFrame(client.countries.list(limit=MAX_LIMIT).dict()["results"])
        except Exception as e:
            tries += 1
            if _classify_exc(e) == "rate":
                reset = _parse_rate_reset(e) or 25
                print(f"[RATE] countries.list 限流，等待 {reset}s ...")
                time.sleep(reset + 1)
                continue
            if tries >= RETRY_MAX_TRIES:
                raise
            _sleep_with_backoff(tries)


def _match_region_row(oq_df: pd.DataFrame, nm: str, iso3: str, fasst) -> List[Dict]:
    """Map one CSV row to zero or more OpenAQ countries.

    Historic ISO3 codes expand to every OpenAQ country whose ISO2 ``code`` is
    listed in HISTORIC_ISO3_TO_ISO2. Otherwise the first row of the first
    non-empty hit set is used. The hit sets are tried in this order:
    1. exact ``code3`` == ISO3;
    2. exact normalised name;
    3. normalised-name prefix/suffix match on whole words.
    """
    if iso3 in HISTORIC_ISO3_TO_ISO2:
        hits = oq_df[oq_df["code"].isin(HISTORIC_ISO3_TO_ISO2[iso3])]
        out = []
        for _, best in hits.iterrows():
            code3 = best.get("code3")
            out.append({
                "countries_id": best["id"],
                "country_name": best["name"],
                "country_code": best["code"],
                "iso3": code3 if isinstance(code3, str) and code3 else iso3,
                "fasst_region": fasst,
                "country_name_req": nm,
                "iso3_req": iso3,
            })
        return out

    aliased = COUNTRY_NAME_ALIASES.get(nm, nm)
    nm_norm = _norm(aliased) if aliased else None

    hit = None
    if iso3 and oq_df["code3"].notna().any():
        hit = oq_df[oq_df["code3"] == iso3]
    if (hit is None or hit.empty) and nm_norm:
        hit = oq_df[oq_df["name_norm"] == nm_norm]
    if (hit is None or hit.empty) and nm_norm:
        pattern = re.escape(nm_norm)
        # Anchor on whole words so e.g. "niger" cannot match "nigeria".
        hit = oq_df[
            oq_df["name_norm"].str.contains(rf"^{pattern}(?!\w)")
            | oq_df["name_norm"].str.contains(rf"(?<!\w){pattern}$")
        ]
    if hit is None or hit.empty:
        return []

    best = hit.iloc[0]
    return [{
        "countries_id": best["id"],
        "country_name": best["name"],
        "country_code": best["code"],
        "iso3": iso3 if iso3 else None,
        "fasst_region": fasst,
        "country_name_req": nm,
        "iso3_req": iso3,
    }]


def _apply_country_filters(df: pd.DataFrame) -> pd.DataFrame:
    """Apply INCLUDE_COUNTRIES / EXCLUDE_COUNTRIES (case-insensitive codes or names).

    Both the OpenAQ-side identifiers (code, name, iso3) and the values
    requested in the CSV (country_name_req, iso3_req) are compared.
    """
    def _norm_any(x) -> str:
        return str(x or "").strip().upper()

    key_cols = ["country_code", "country_name", "iso3", "country_name_req", "iso3_req"]

    def _matches(frame: pd.DataFrame, wanted_raw) -> pd.Series:
        wanted = {_norm_any(x) for x in wanted_raw}
        mask = pd.Series(False, index=frame.index)
        for col in key_cols:
            mask |= frame[col].map(_norm_any).isin(wanted)
        return mask

    if INCLUDE_COUNTRIES:
        df = df[_matches(df, INCLUDE_COUNTRIES)]
    if EXCLUDE_COUNTRIES:
        df = df[~_matches(df, EXCLUDE_COUNTRIES)]
    return df


def map_countries_to_openaq_ids(client: OpenAQ, regions_df: pd.DataFrame) -> pd.DataFrame:
    """Map each row of the regions CSV to an OpenAQ ``countries_id``.

    Unmatched CSV rows are printed and omitted.

    Args:
        client: OpenAQ SDK client.
        regions_df: Output of ``load_regions_csv`` (country_name, iso3, fasst_region).

    Returns:
        DataFrame with columns ``countries_id``, ``country_name`` and
        ``country_code`` (OpenAQ values), plus ``iso3``, ``fasst_region``,
        ``country_name_req`` and ``iso3_req`` (CSV values). It is filtered by
        INCLUDE_COUNTRIES / EXCLUDE_COUNTRIES and de-duplicated on
        ``countries_id`` so that no OpenAQ country is downloaded twice. The
        columns are always present, even when nothing matched.
    """
    oq_df = _fetch_openaq_countries(client)
    oq_df["name_norm"] = oq_df["name"].map(_norm)
    oq_df["code"] = oq_df["code"].astype(str).str.upper()
    if "code3" not in oq_df.columns:
        oq_df["code3"] = None

    rows, misses = [], []
    for _, r in regions_df.iterrows():
        nm = str(r.get("country_name", "")).strip()
        iso3 = str(r.get("iso3", "")).upper().strip()
        if iso3 in ("NAN", "NONE"):  # stringified missing values
            iso3 = ""
        matched = _match_region_row(oq_df, nm, iso3, r.get("fasst_region"))
        if matched:
            rows.extend(matched)
        else:
            misses.append({"country_name_req": nm, "iso3_req": iso3})

    if misses:
        print("未匹配到 OpenAQ 的国家（建议补充别名/校对 ISO3）：")
        for m in misses:
            print("  -", m)

    # Explicit columns: an empty result must still support the column filters.
    df = _apply_country_filters(pd.DataFrame(rows, columns=_MAPPED_COLUMNS))
    before = len(df)
    df = df.drop_duplicates(subset=["countries_id"], keep="first")
    if len(df) < before:
        print(f"[INFO] dropped {before - len(df)} duplicate OpenAQ country mapping(s).")
    return df.reset_index(drop=True)


# ================= OpenAQ query wrappers (locations/sensors go through the SDK) =================
def get_parameter_id(client: OpenAQ, aliases: List[str]) -> int:
    """Return the id of the first OpenAQ parameter whose code or name matches an alias.

    Matching is exact and case-insensitive, e.g. ``['o3']`` or
    ``['pm25', 'pm2.5']``. The parameter-list request waits out rate limits
    and retries other errors, like the other SDK calls in this notebook.

    NOTE(review): only the first match is returned. If OpenAQ lists the same
    pollutant under several parameter ids (for example different units),
    sensors of the other ids are not selected; confirm against /v3/parameters.

    Raises:
        RuntimeError: No parameter matches any alias.
    """
    aliases_lc = {a.lower() for a in aliases}
    tries = 0
    while True:
        try:
            RL.wait()
            recs = client.parameters.list(limit=MAX_LIMIT).dict()["results"]
            break
        except Exception as e:
            tries += 1
            if _classify_exc(e) == "rate":
                reset = _parse_rate_reset(e) or 25
                print(f"[RATE] parameters.list 限流，等待 {reset}s ...")
                time.sleep(reset + 1)
                continue
            if tries >= RETRY_MAX_TRIES:
                raise
            _sleep_with_backoff(tries)

    for rec in recs:
        code = str(rec.get("code", "")).lower()
        name = str(rec.get("name", "")).lower()
        if code in aliases_lc or name in aliases_lc:
            return int(rec["id"])
    raise RuntimeError(f"未在 OpenAQ 参数列表中找到 {aliases}。")

_LOCATION_COLUMNS = [
    "countries_id", "country_code", "country_name", "iso3", "fasst_region",
    "location_id", "location_name", "lat", "lon",
]
# Stop paging after this many consecutive pages fail even after retries.
_MAX_FAILED_LOCATION_PAGES = 3


def _list_locations_page(client: OpenAQ, cid: int, parameters_id: List[int], page: int, geo_kwargs: Dict):
    """Fetch one page of locations for a country; return None when retries are exhausted."""
    tries = 0
    while True:
        try:
            RL.wait()
            return client.locations.list(
                countries_id=cid,
                parameters_id=parameters_id,  # filter by all requested parameters in one call
                page=page, limit=MAX_LIMIT,
                **geo_kwargs,
            )
        except Exception as e:
            tries += 1
            if _classify_exc(e) == "rate":
                reset = _parse_rate_reset(e) or 25
                print(f"[RATE] locations {cid} 限流，等待 {reset}s ...")
                time.sleep(reset + 1)
                continue
            if tries >= RETRY_MAX_TRIES:
                print(f"[WARN] 国家 {cid} 第{page}页 locations 获取失败，跳过该页。原因: {e}")
                return None
            _sleep_with_backoff(tries)


def list_locations_for_country(client: OpenAQ, country_row: pd.Series, parameters_id: List[int]) -> pd.DataFrame:
    """List all OpenAQ locations in one country that measure any of *parameters_id*.

    Pages through ``client.locations.list`` in pages of MAX_LIMIT. A page that
    keeps failing is skipped. After _MAX_FAILED_LOCATION_PAGES consecutive
    failures, paging stops rather than walking pages indefinitely.

    Args:
        client: OpenAQ SDK client.
        country_row: One row of ``map_countries_to_openaq_ids`` output.
        parameters_id: OpenAQ parameter ids to filter on.

    Returns:
        DataFrame with columns ``_LOCATION_COLUMNS``, one row per unique
        ``location_id``. It is empty, with those columns, when nothing is found.
    """
    # NOTE(review): `within` / `radius` / `bbox` are passed straight to the SDK;
    # confirm the SDK's keyword names and radius unit before enabling them.
    geo_kwargs = {}
    if WITHIN_WKT:
        geo_kwargs["within"] = WITHIN_WKT
        if WITHIN_RADIUS_KM:
            geo_kwargs["radius"] = float(WITHIN_RADIUS_KM)
    if BBOX and isinstance(BBOX, (list, tuple)) and len(BBOX) == 4:
        geo_kwargs["bbox"] = BBOX

    cid = int(country_row["countries_id"])
    rows = []
    page = 1
    failed_pages = 0
    while True:
        resp = _list_locations_page(client, cid, parameters_id, page, geo_kwargs)
        if resp is None:
            failed_pages += 1
            if failed_pages >= _MAX_FAILED_LOCATION_PAGES:
                print(f"[WARN] 国家 {cid} 连续 {failed_pages} 页 locations 获取失败，停止翻页。")
                break
            page += 1
            continue
        failed_pages = 0

        results = resp.dict().get("results", [])
        for loc in results:
            coords = loc.get("coordinates") or {}
            rows.append({
                "countries_id": cid,
                "country_code": country_row["country_code"],
                "country_name": country_row["country_name"],
                "iso3": country_row.get("iso3"),
                "fasst_region": country_row.get("fasst_region"),
                "location_id": int(loc["id"]),
                "location_name": loc.get("name"),
                "lat": coords.get("latitude"),
                "lon": coords.get("longitude"),
            })
        if len(results) < MAX_LIMIT:
            break
        page += 1
        time.sleep(REQUEST_SLEEP)

    # Explicit columns: drop_duplicates(subset=...) raises KeyError on a
    # column-less frame, which previously crashed countries with no locations.
    return (
        pd.DataFrame(rows, columns=_LOCATION_COLUMNS)
        .drop_duplicates(subset=["location_id"])
        .reset_index(drop=True)
    )

_SENSOR_COLUMNS = ["location_id", "sensor_id", "parameter_id", "parameter_name", "unit", "lat", "lon"]


def list_sensors_for_locations(client: OpenAQ, loc_df: pd.DataFrame, target_param_id: int) -> pd.DataFrame:
    """List the sensors measuring one parameter at each location in *loc_df*.

    Calls ``client.locations.sensors(location_id)`` once per location and
    keeps the sensors whose ``parameter.id`` equals *target_param_id*.
    Locations that return 404 are skipped immediately. Locations whose request
    keeps failing are retried in a second pass, and any still failing after
    that are reported and skipped.

    Returns:
        DataFrame with columns location_id, sensor_id, parameter_id,
        parameter_name, unit, lat and lon (lat/lon copied from *loc_df*).
        It is empty, with those columns, when nothing matches.
    """
    if loc_df.empty:
        return pd.DataFrame(columns=_SENSOR_COLUMNS)

    sensors = []
    loc_map = {
        int(r.location_id): (float(r.lat) if pd.notna(r.lat) else None,
                             float(r.lon) if pd.notna(r.lon) else None)
        for _, r in loc_df.iterrows()
    }

    def _run(ids: List[int]) -> List[int]:
        """Query each location once; return the ids whose request kept failing."""
        retry_these = []
        for loc in tqdm(ids, desc="sensors per location", unit="loc"):
            tries = 0
            while True:
                try:
                    RL.wait()
                    resp = client.locations.sensors(int(loc))  # /v3/locations/{id}/sensors
                    break
                except Exception as e:
                    cls = _classify_exc(e)
                    if cls == "notfound":
                        print(f"[WARN] 站点 {loc} 404，跳过。")
                        resp = None
                        break
                    if cls == "rate":
                        reset = _parse_rate_reset(e) or 25
                        print(f"[RATE] 站点 {loc} 限流，等待 {reset}s ...")
                        time.sleep(reset + 1)
                        continue
                    tries += 1
                    if tries >= RETRY_MAX_TRIES:
                        print(f"[WARN] 站点 {loc} 传感器列表失败，进入补捞。原因: {e}")
                        retry_these.append(loc)
                        resp = None
                        break
                    _sleep_with_backoff(tries)
            if resp is None:
                continue
            for s in resp.dict().get("results", []):
                param = s.get("parameter") or {}
                pid = int(param.get("id") or -1)
                if pid == target_param_id:
                    lat, lon = loc_map.get(int(loc), (None, None))
                    sensors.append({
                        "location_id": int(loc),
                        "sensor_id": int(s["id"]),
                        "parameter_id": pid,
                        "parameter_name": param.get("name"),
                        "unit": param.get("units"),
                        "lat": lat, "lon": lon,
                    })
        return retry_these

    loc_ids = loc_df["location_id"].astype(int).tolist()
    failed = _run(loc_ids)
    if failed:
        print(f"[INFO] sensors 补捞轮，待补 {len(failed)} 个站点 ...")
        time.sleep(2)
        still_failed = _run(failed)
        if still_failed:
            # Report instead of silently dropping the stations.
            print(f"[WARN] {len(still_failed)} location(s) still failed after the retry pass and were skipped: {still_failed[:20]}")

    return pd.DataFrame(sensors, columns=_SENSOR_COLUMNS)


# --------- 小时数据抓取：/v3/sensors/{id}/hours ---------
def _iso(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def _gen_chunks(dt_from: datetime, dt_to: datetime, days: int) -> List[Tuple[datetime, datetime]]:
    out, cur, delta = [], dt_from, timedelta(days=days)
    while cur < dt_to:
        nxt = min(cur + delta, dt_to)
        out.append((cur, nxt)); cur = nxt
    return out

def _calc_total_pages(meta: dict | None, per_page: int) -> Optional[int]:
    """从响应 meta 安全计算总页数；meta.found 可能为数值/字符串/缺失。"""
    if not meta or per_page <= 0:
        return None
    found_raw = meta.get("found", None)
    if found_raw is None:
        return None
    try:
        if isinstance(found_raw, (int, float)):
            found_int = int(found_raw)
        else:
            found_int = int(re.sub(r"[^\d]", "", str(found_raw)))  # 去掉逗号等分隔符
        if found_int <= 0:
            return 0
        return math.ceil(found_int / float(per_page))
    except Exception:
        return None

def _http_get_json(session: requests.Session, url: str, params: Dict) -> Dict:
    """GET *url* through the shared rate limiter and return the decoded JSON body.

    A 429 response gets exactly one extra attempt. Before it, the call sleeps
    for the response's Retry-After value (25 s when absent) plus one second.
    Any remaining status >= 400 is raised via ``raise_for_status``.
    """
    def _throttled_get():
        RL.wait()
        return session.get(url, params=params, timeout=REQUEST_TIMEOUT)

    response = _throttled_get()
    if response.status_code == 429:
        pause = _parse_rate_reset_from_resp(response) or 25
        time.sleep(pause + 1)
        response = _throttled_get()

    if response.status_code >= 400:
        response.raise_for_status()
    return response.json()

def _quick_count_hours(session: requests.Session, sid: int, dt_from: datetime, dt_to: datetime) -> Optional[int]:
    """Cheaply estimate how many hourly records a sensor has in a window.

    Requests one record per page (``limit=1``). When ``meta.found`` parses,
    the page count equals the record count. Otherwise the length of the
    single-record result list (0 or 1) is returned, which still separates
    "no data" from "some data".

    Rate-limit errors are waited out and retried. Every failure, including
    rate-limit waits, increments the attempt counter. The first
    non-rate-limit failure at or beyond RETRY_MAX_TRIES attempts gives up
    and returns None (unknown).
    """
    endpoint = f"{BASE_URL}/sensors/{sid}/hours"
    query = {
        "limit": 1,
        "page": 1,
        "date_from": _iso(dt_from),
        "date_to": _iso(dt_to),
        "order_by": "datetime",
        "sort_order": "asc",
    }
    attempt = 0
    while True:
        try:
            payload = _http_get_json(session, endpoint, query)
            pages = _calc_total_pages(payload.get("meta", {}) or {}, 1)
            if pages is not None:
                # per_page == 1, so the page count is the record count.
                return int(pages)
            return len(payload.get("results", []) or [])
        except Exception as exc:
            attempt += 1
            if _classify_exc(exc) == "rate":
                wait_s = _parse_rate_reset(exc) or 25
                print(f"[RATE] 预检 {sid} 限流，等待 {wait_s}s ...")
                time.sleep(wait_s + 1)
                continue
            if attempt >= RETRY_MAX_TRIES:
                print(f"[WARN] 预检 {sid} 失败，跳过预估：{exc}")
                return None
            _sleep_with_backoff(attempt)

def _first_ts(candidate, keys: Tuple[str, ...]):
    """Pick a timestamp: first truthy value among *keys* of a dict, else the value itself, else None."""
    if isinstance(candidate, dict):
        for key in keys:
            value = candidate.get(key)
            if value:
                return value
        return None
    return candidate or None


def _extract_hour_record(m, sid: int, loc: int) -> Optional[Dict]:
    """Convert one /hours result into a CSV row dict, or None if it is unusable.

    Fields are taken as follows:
    - value: ``value``, falling back to ``summary.avg``;
    - timestamp: the first one found among ``datetime`` (utc/value/local),
      ``period.datetimeFrom`` or ``period.from`` (utc/local),
      ``coverage.datetimeFrom`` (utc/local), and ``date``/``timestamp``;
    - unit: ``unit``, falling back to ``parameter.units``/``parameter.unit``.

    Records without a parseable timestamp or a float-convertible value are
    dropped.
    """
    if not isinstance(m, dict):
        return None
    val = m.get("value")
    if val is None and isinstance(m.get("summary"), dict):
        val = m["summary"].get("avg")

    ts = _first_ts(m.get("datetime"), ("utc", "value", "local"))
    if not ts:
        period = m.get("period") or {}
        if isinstance(period, dict):
            ts = _first_ts(period.get("datetimeFrom") or period.get("from") or {}, ("utc", "local"))
    if not ts:
        coverage = m.get("coverage") or {}
        if isinstance(coverage, dict):
            ts = _first_ts(coverage.get("datetimeFrom") or {}, ("utc", "local"))
    if not ts:
        ts = _first_ts(m.get("date") or m.get("timestamp"), ("utc", "local"))
    if not ts or val is None:
        return None

    ts_parsed = pd.to_datetime(ts, utc=True, errors="coerce")
    if pd.isna(ts_parsed):
        return None
    try:
        valf = float(val)
    except (TypeError, ValueError):
        return None

    unit = m.get("unit")
    if not unit and isinstance(m.get("parameter"), dict):
        unit = m["parameter"].get("units") or m["parameter"].get("unit")

    return {
        "sensor_id": sid,
        "location_id": loc,
        "datetime_utc": ts_parsed.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "value": valf,
        "unit": unit,
    }


def _get_hours_page(session: requests.Session, url: str, params: Dict, sid: int,
                    chunk_from: datetime, chunk_to: datetime) -> Optional[Dict]:
    """GET one /hours page.

    Rate limits are waited out without counting as failures. Returns None
    once RETRY_MAX_TRIES other failures have occurred.
    """
    tries = 0
    while True:
        try:
            return _http_get_json(session, url, params)
        except Exception as e:
            if _classify_exc(e) == "rate":
                reset = _parse_rate_reset(e) or 25
                print(f"[RATE] sid={sid} chunk {chunk_from:%F}->{chunk_to:%F} 等待 {reset}s ...")
                time.sleep(reset + 1)
                continue
            tries += 1
            if tries >= RETRY_MAX_TRIES:
                print(f"[WARN] sid={sid} chunk {chunk_from:%F}->{chunk_to:%F} page={params['page']} limit={params['limit']} retries exhausted, page skipped: {e}")
                return None
            _sleep_with_backoff(tries)


def _fetch_one_chunk_to_csv(session: requests.Session, sid: int, loc: int,
                            chunk_from: datetime, chunk_to: datetime,
                            csv_path: str, wrote_header_flag: List[bool]) -> int:
    """Download one time window of hourly data for a sensor and append it to *csv_path*.

    Each page size in PAGE_LIMITS_TRY is tried in turn. Within one page size,
    a page that keeps failing is skipped, and three failed pages abandon that
    page size. ``wrote_header_flag[0]`` records whether the CSV header has
    already been written and is updated in place.

    Returns:
        The number of rows appended (> 0); 0 when every page was fetched
        successfully but the window holds no usable records; or -1 when pages
        failed and nothing was written, so the caller may split the window.
        Returning -1 for a genuinely empty window would make the caller
        re-fetch it at every page size and split it recursively, wasting
        dozens of rate-limited requests.
    """
    url = f"{BASE_URL}/sensors/{sid}/hours"
    total_rows = 0
    for lim in PAGE_LIMITS_TRY:
        page, bad = 1, 0
        while True:
            params = {
                "limit": lim, "page": page,
                "date_from": _iso(chunk_from), "date_to": _iso(chunk_to),
                "order_by": "datetime", "sort_order": "asc",
            }
            j = _get_hours_page(session, url, params, sid, chunk_from, chunk_to)
            if j is None:
                bad += 1
                page += 1
                if bad >= 3:
                    break
                continue

            results = j.get("results", []) or []
            batch = [rec for rec in (_extract_hour_record(m, sid, loc) for m in results) if rec is not None]
            if batch:
                dfb = pd.DataFrame(batch)
                dfb.to_csv(csv_path, mode="a", index=False, header=(not wrote_header_flag[0]))
                wrote_header_flag[0] = True
                total_rows += len(dfb)

            total_pages = _calc_total_pages(j.get("meta", {}) or {}, lim)
            if total_pages is not None and page >= total_pages:
                break
            if len(results) < lim:
                break
            page += 1
            time.sleep(REQUEST_SLEEP)

        if total_rows > 0:
            return total_rows
        if bad == 0:
            # Every page succeeded: the window simply has no usable data.
            return 0
    return -1

def _fetch_chunk_recursive(session: requests.Session, sid: int, loc: int,
                           chunk_from: datetime, chunk_to: datetime,
                           csv_path: str, wrote_header_flag: List[bool],
                           depth: int) -> int:
    """Fetch a window, halving it and recursing whenever the fetch reports failure.

    A failure (-1 from ``_fetch_one_chunk_to_csv``) splits the window at its
    midpoint, fetching the left half first. A window that still fails at
    depth MAX_CHUNK_SPLIT_DEPTH contributes 0 rows.

    Returns:
        The total number of rows appended for this window.
    """
    written = _fetch_one_chunk_to_csv(session, sid, loc, chunk_from, chunk_to, csv_path, wrote_header_flag)
    if written >= 0:
        return written
    if depth >= MAX_CHUNK_SPLIT_DEPTH:
        return 0

    midpoint = chunk_from + (chunk_to - chunk_from) / 2
    print(f"  ↪ sid={sid} 分裂块: {chunk_from:%F}->{midpoint:%F} / {midpoint:%F}->{chunk_to:%F}")
    halves = ((chunk_from, midpoint), (midpoint, chunk_to))
    return sum(
        _fetch_chunk_recursive(session, sid, loc, lo, hi, csv_path, wrote_header_flag, depth + 1)
        for lo, hi in halves
    )

_HOURS_CSV_COLUMNS = ["sensor_id", "location_id", "datetime_utc", "value", "unit"]


def _write_empty_hours_csv(csv_path: str) -> None:
    """Write a header-only hourly CSV so downstream readers always find the file."""
    pd.DataFrame(columns=_HOURS_CSV_COLUMNS).to_csv(csv_path, index=False)


def _to_utc_datetime(value: Optional[str], label: str) -> datetime:
    """Parse *value* to an aware UTC ``datetime``; raise ValueError if it is missing or invalid."""
    ts = None if value is None else pd.to_datetime(value, utc=True, errors="coerce")
    if ts is None or pd.isna(ts):
        raise ValueError(f"{label} must be a parseable timestamp, got {value!r}")
    return ts.to_pydatetime()


def _precheck_sensors(session: requests.Session, sensors_df: pd.DataFrame,
                      dt_from: datetime, dt_to: datetime) -> List[Tuple[int, int, Optional[int]]]:
    """Estimate each sensor's record count and drop sensors known to have none.

    A count of None means "unknown"; those sensors are kept.
    """
    jobs = []
    print(f"[预检] 估算每个传感器在 {dt_from:%F} ~ {dt_to:%F} 的数据条数（limit=1） ...")
    for _, r in sensors_df.iterrows():
        sid = int(r["sensor_id"])
        loc = int(r["location_id"])
        cnt = _quick_count_hours(session, sid, dt_from, dt_to)
        jobs.append((sid, loc, cnt))
        if cnt is not None:
            print(f"  - sid={sid} 预估条数≈{cnt}")
        else:
            print(f"  - sid={sid} 预估条数未知（继续抓取）")
    return [(sid, loc, cnt) for sid, loc, cnt in jobs if cnt is None or cnt > 0]


def fetch_hours_stream_to_csv(
    sensors_df: pd.DataFrame,
    csv_path: str,
    datetime_from: Optional[str] = None,
    datetime_to: Optional[str] = None,
    desc: str = "fetch /sensors/{id}/hours"
):
    """Stream hourly data for every sensor in *sensors_df* into one CSV.

    Any existing *csv_path* is replaced. Sensors whose pre-check count is 0
    are skipped. The window is downloaded in HOURS_CHUNK_DAYS chunks via
    ``_fetch_chunk_recursive``. The output always exists afterwards: a
    header-only file is written when no rows were produced.

    Args:
        sensors_df: Must contain ``sensor_id`` and ``location_id``.
        csv_path: Output file; its directory is created if needed.
        datetime_from: Start of the window, as any pandas-parseable timestamp.
        datetime_to: End of the window, as any pandas-parseable timestamp.
        desc: tqdm progress-bar label.

    Returns:
        Total number of rows written.

    Raises:
        ValueError: *datetime_from* or *datetime_to* is missing or unparsable
            (only checked when *sensors_df* is non-empty).
    """
    out_dir = os.path.dirname(csv_path)
    if out_dir:  # a bare filename has no directory; os.makedirs("") would raise
        os.makedirs(out_dir, exist_ok=True)
    if os.path.exists(csv_path):
        os.remove(csv_path)
    wrote_header_flag = [False]

    if sensors_df.empty:
        _write_empty_hours_csv(csv_path)
        return 0

    dt_to = _to_utc_datetime(datetime_to, "datetime_to")
    dt_from = _to_utc_datetime(datetime_from, "datetime_from")

    total_rows = 0
    session = _build_http_session(API_KEY)
    try:
        jobs = _precheck_sensors(session, sensors_df, dt_from, dt_to)
        # NOTE(review): adjacent chunks share their boundary instant. If the API
        # treats date_to as inclusive, a boundary hour may be written twice;
        # verify_csv de-duplicates on (sensor_id, datetime_utc) when checking.
        chunks = _gen_chunks(dt_from, dt_to, HOURS_CHUNK_DAYS) if jobs else []
        for sid, loc, cnt in tqdm(jobs, desc=desc, unit="sensor"):
            print(f"[sid={sid}] 预计条数≈{cnt if cnt is not None else '未知'}，开始分块抓取 ...")
            rows_this_sensor = 0
            for i, (c_from, c_to) in enumerate(chunks, 1):
                print(f"  · chunk {i}/{len(chunks)}: {c_from:%F} → {c_to:%F}")
                rows_this_sensor += _fetch_chunk_recursive(
                    session, sid, loc, c_from, c_to, csv_path, wrote_header_flag, depth=0
                )
            total_rows += rows_this_sensor
            print(f"[sid={sid}] 完成，新增 {rows_this_sensor} 行。")
    finally:
        session.close()

    if not wrote_header_flag[0]:
        # No sensor produced a row: still leave a readable, header-only CSV.
        _write_empty_hours_csv(csv_path)
    return total_rows


# ========================== CSV self-check ==========================
def verify_csv(csv_path: str, print_rows: int = 5) -> None:
    """Print a basic integrity report for an hourly CSV.

    The report covers:
    - that the file exists, is not zero bytes and has the expected columns;
    - parsing of ``datetime_utc``, dropping rows that lack a timestamp or value;
    - de-duplication on (sensor_id, datetime_utc);
    - the row counts, the sensor count, the time range and the first
      *print_rows* rows.

    Problems are printed, never raised.
    """
    if not os.path.exists(csv_path):
        print(f"[VERIFY] 文件不存在：{csv_path}")
        return
    try:
        df = pd.read_csv(csv_path)
    except pd.errors.EmptyDataError:
        # A zero-byte file has no header row, so read_csv cannot parse it.
        print(f"[VERIFY] 文件为空：{csv_path}")
        return
    required = ["sensor_id", "location_id", "datetime_utc", "value", "unit"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        print(f"[VERIFY] 缺少列：{missing}")
        return

    # Parse timestamps and de-duplicate on (sensor, timestamp).
    df["datetime_utc"] = pd.to_datetime(df["datetime_utc"], utc=True, errors="coerce")
    before = len(df)
    df = df.dropna(subset=["datetime_utc", "value"])
    df = df.drop_duplicates(subset=["sensor_id", "datetime_utc"])
    after = len(df)

    nunique_sensors = df["sensor_id"].nunique()
    time_min = df["datetime_utc"].min()
    time_max = df["datetime_utc"].max()
    print(f"[VERIFY] ✅ CSV OK | 行数(去重后): {after}/{before} | 传感器数: {nunique_sensors} | 时间范围: {time_min} ~ {time_max}")
    print(df.head(print_rows).to_string(index=False))


# ========================== Main workflow ==========================
_EMPTY_HOURLY_COLUMNS = ["sensor_id", "location_id", "datetime_utc", "value", "unit"]


def _write_header_only_csv(path: str) -> None:
    """Write an hourly CSV that contains only the header row."""
    pd.DataFrame(columns=_EMPTY_HOURLY_COLUMNS).to_csv(path, index=False)


def _fetch_pollutant(cc: str, label: str, sensors_df: pd.DataFrame, out_path: str,
                     dt_from_s: str, dt_to_s: str) -> None:
    """Download hourly data for one pollutant of one country, or write a header-only CSV."""
    if sensors_df.empty:
        _write_header_only_csv(out_path)
        return
    print(f"[{cc}] 抓取逐小时 {label} 并写入：{out_path} ...")
    nrows = fetch_hours_stream_to_csv(
        sensors_df, out_path, datetime_from=dt_from_s, datetime_to=dt_to_s,
        desc=f"fetch /sensors/{{id}}/hours ({label})"
    )
    print(f"[OK] {cc} {label} 写入 {nrows} 条小时数据。")
    verify_csv(out_path)


def _process_country(client: OpenAQ, crow: pd.Series, position: int, total: int,
                     o3_id: int, pm25_id: int, dt_from_s: str, dt_to_s: str) -> None:
    """Handle one country: site catalogue, sampling, sensor listing and hourly downloads."""
    cc = crow["country_code"]
    cname = crow["country_name"]
    print("=" * 60)
    print(f"[{position}/{total}] 国家 {cname} ({cc}) — 列出含 O3/PM2.5 的站点 ...")

    # 1) Locations in this country that measure O3 or PM2.5.
    loc_df = list_locations_for_country(client, crow, [o3_id, pm25_id])
    loc_out = os.path.join(OUTPUT_DIR, f"{cc}_site_catalog.csv")
    loc_df.to_csv(loc_out, index=False)
    print(f"[OK] 站点清单：{loc_out}（{len(loc_df)} 个）")

    if loc_df.empty:
        for pol in ("o3", "pm25"):
            _write_header_only_csv(os.path.join(OUTPUT_DIR, f"{cc}_hourly_{pol}.csv"))
        return

    # 2) Sample locations before splitting by parameter, so both pollutants
    #    draw from the same site subset (fixed seed for reproducibility).
    if MAX_LOCATIONS_PER_COUNTRY and MAX_LOCATIONS_PER_COUNTRY > 0 and len(loc_df) > MAX_LOCATIONS_PER_COUNTRY:
        loc_df = loc_df.sample(n=MAX_LOCATIONS_PER_COUNTRY, random_state=42).reset_index(drop=True)

    # 3) O3 and PM2.5 sensors (with lat/lon carried over).
    print(f"[{cc}] 列出 O3 传感器 ...")
    o3_sensors_df = list_sensors_for_locations(client, loc_df, o3_id)
    print(f"[{cc}] 列出 PM2.5 传感器 ...")
    pm25_sensors_df = list_sensors_for_locations(client, loc_df, pm25_id)

    # 4) Hourly downloads.
    _fetch_pollutant(cc, "O3", o3_sensors_df,
                     os.path.join(OUTPUT_DIR, f"{cc}_hourly_o3.csv"), dt_from_s, dt_to_s)
    _fetch_pollutant(cc, "PM2.5", pm25_sensors_df,
                     os.path.join(OUTPUT_DIR, f"{cc}_hourly_pm25.csv"), dt_from_s, dt_to_s)


def main():
    """Run the full download: map countries, then fetch hourly O3 and PM2.5 per country.

    Raises:
        SystemExit: The API key is missing, or DATETIME_FROM / DATETIME_TO
            is invalid or not in increasing order.
        RuntimeError: No CSV row could be mapped to an OpenAQ country.
    """
    if not API_KEY or API_KEY.startswith("PUT-"):
        raise SystemExit("请先配置 OPENAQ_API_KEY（v3 必须有 Key）。")

    _maybe_disable_env_proxies()
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Time window. _parse_iso_z returns None for blank or unparsable input;
    # fail fast here rather than deep inside the first hourly download.
    dt_from_s = _parse_iso_z(DATETIME_FROM)
    dt_to_s = _parse_iso_z(DATETIME_TO)
    if dt_from_s is None or dt_to_s is None:
        raise SystemExit(f"DATETIME_FROM / DATETIME_TO must be valid timestamps (got {DATETIME_FROM!r}, {DATETIME_TO!r}).")
    # Same fixed-width ISO format, so string order equals time order.
    if dt_from_s >= dt_to_s:
        raise SystemExit(f"DATETIME_FROM ({dt_from_s}) must be earlier than DATETIME_TO ({dt_to_s}).")

    print("读取国家 CSV ...")
    regions_df = load_regions_csv(COUNTRY_CSV_PATH, COUNTRY_CSV_COLUMNS)

    client = OpenAQ(api_key=API_KEY)
    try:
        print("映射国家 → OpenAQ countries_id ...")
        country_map = map_countries_to_openaq_ids(client, regions_df)
        if country_map.empty:
            raise RuntimeError("Country CSV rows could not be mapped to OpenAQ countries.")
        print(f"国家数：{len(country_map)}")

        print("获取参数ID ...")
        o3_id = get_parameter_id(client, ["o3", "ozone"])
        pm25_id = get_parameter_id(client, ["pm25", "pm2.5"])  # resolved dynamically rather than hard-coding the id

        total = len(country_map)
        for position, (_, crow) in enumerate(country_map.iterrows(), start=1):
            _process_country(client, crow, position, total, o3_id, pm25_id, dt_from_s, dt_to_s)
            # Pause between countries only; there is nothing to wait for after the last one.
            if position < total:
                print(f"[PAUSE] 国家间暂停 {PER_COUNTRY_PAUSE_SEC}s ...")
                time.sleep(PER_COUNTRY_PAUSE_SEC)

        print("=" * 60)
        print(f"[DONE] 全部国家处理完成。所有 CSV 已写入 {OUTPUT_DIR}")

    finally:
        client.close()

if __name__ == "__main__":
    main()
