In [13]:
import requests
BASE_URL = "https://api.openaq.org/v3"
OPENAQ_API_KEY = "c9c135bc0c771160d7c9a1f2f7dbefa5f66acb0a50cde7b0a43aa950cba44624"
HEADERS = {"X-API-Key": OPENAQ_API_KEY}

def openaq_get(path: str, params=None, timeout=60):
    url = f"{BASE_URL}{path}"
    return requests.get(url, headers=HEADERS, params=params or {}, timeout=timeout)
resp = openaq_get("/locations", {"limit": 1, "page": 1})
print("status:", resp.status_code)
print(resp.text[:300])

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 15, Finished, Available, Finished)

status: 200
{"meta":{"name":"openaq-api","website":"/","page":1,"limit":1,"found":">1"},"results":[{"id":3,"name":"NMA - Nima","locality":null,"timezone":"Africa/Accra","country":{"id":152,"code":"GH","name":"Ghana"},"owner":{"id":4,"name":"Unknown Governmental Organization"},"provider":{"id":209,"name":"Dr. Ra


In [14]:
import math
import pandas as pd
from datetime import datetime, timezone

NYC_BBOX = (-74.25909, 40.477399, -73.700272, 40.917577)  # lon_min, lat_min, lon_max, lat_max. Границы ньюйорка

def fetch_all_pages(path, base_params, page_size=1000, max_pages=1000):
    out = []
    page = 1
    while page <= max_pages:
        params = dict(base_params)
        params.update({"limit": page_size, "page": page})
        r = openaq_get(path, params)
        if r.status_code != 200: raise RuntimeError(f"Request failed {r.status_code}: {r.text[:200]}")
        js = r.json()
        results = js.get("results", [])
        out.extend(results)
        meta = js.get("meta", {})
        found = meta.get("found", 0)
        if isinstance(found, str) and found.startswith(">"): pass
        if len(results) < page_size: break
        page += 1
    return out

# 1) NYC locations
lon_min, lat_min, lon_max, lat_max = NYC_BBOX
locations = fetch_all_pages(
    "/locations",
    base_params={"bbox": f"{lon_min},{lat_min},{lon_max},{lat_max}","sort": "id"},
    page_size=1000,max_pages=200
)
print("NYC locations fetched:", len(locations))
loc_df = pd.json_normalize(locations)
cols_keep = [c for c in loc_df.columns if c in ["id","name","locality","timezone","country.code","country.name","coordinates.latitude","coordinates.longitude","isMobile","isAnalysis","isSource"]]
loc_df = loc_df[cols_keep].copy()
loc_df["ingested_at_utc"] = datetime.now(timezone.utc).isoformat()
display(loc_df.head(10))
print("Unique timezones:", loc_df["timezone"].nunique() if "timezone" in loc_df.columns else "n/a")

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 16, Finished, Available, Finished)

NYC locations fetched: 55


SynapseWidget(Synapse.DataFrame, f0709ab1-9a75-426c-a619-227d9ec333ef)

Unique timezones: 1


In [15]:
import pandas as pd
import time
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed

location_ids = loc_df["id"].dropna().astype(int).tolist()
print("Location IDs:", len(location_ids))

def get_location_sensors(location_id: int, max_retries=6):
    # endpoint: /v3/locations/{id}/sensors
    params = {"limit": 1000, "page": 1, "sort": "id"}
    path = f"/locations/{location_id}/sensors"
    attempt = 0
    while True:
        r = openaq_get(path, params)
        if r.status_code == 200:
            js = r.json()
            rows = js.get("results", [])
            for x in rows:x["location_id"] = location_id
            return rows
        if r.status_code in (429, 500, 502, 503, 504) and attempt < max_retries:
            sleep_s = min(2 ** attempt, 30)
            time.sleep(sleep_s)
            attempt += 1
            continue
        raise RuntimeError(f"Failed for location {location_id}: {r.status_code} {r.text[:200]}")
        
MAX_WORKERS = 6
all_sensors = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    futs = {ex.submit(get_location_sensors, lid): lid for lid in location_ids}
    for fut in as_completed(futs):
        all_sensors.extend(fut.result())

sens_df = pd.json_normalize(all_sensors).drop_duplicates(subset=["id"]).copy()
sens_df["ingested_at_utc"] = datetime.now(timezone.utc).isoformat()
print("Sensors fetched (unique):", sens_df["id"].nunique())
print("Columns:", len(sens_df.columns))

param_cols = [c for c in sens_df.columns if "parameter" in c.lower()]
print("Parameter-related columns:", param_cols[:10])
if "parameter.name" in sens_df.columns:
    counts = sens_df["parameter.name"].value_counts().head(20)
    display(counts)
else:display(sens_df.head(10))

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 17, Finished, Available, Finished)

Location IDs: 55
Sensors fetched (unique): 194
Columns: 41
Parameter-related columns: ['parameter.id', 'parameter.name', 'parameter.units', 'parameter.displayName']


SynapseWidget(Synapse.DataFrame, 713d341a-5823-406c-aa1a-d06f3bf7fa2c)

In [16]:
display(sens_df["parameter.name"].value_counts().head(30))
pm_mask  = sens_df["parameter.name"].str.contains("pm25|pm2\\.5", case=False, na=False)
no2_mask = sens_df["parameter.name"].str.contains("^no2$|nitrogen", case=False, na=False)
print("PM2.5 sensors:", sens_df.loc[pm_mask, "id"].nunique())
print("NO2 sensors:",  sens_df.loc[no2_mask, "id"].nunique())
time_cols = [c for c in sens_df.columns if c.lower().startswith("datetimefirst") or c.lower().startswith("datetimelast")]
print("time cols:", time_cols)

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1f47b3da-dbd0-4638-9e37-fd0ab171e43f)

PM2.5 sensors: 49
NO2 sensors: 6
time cols: ['datetimeFirst.utc', 'datetimeFirst.local', 'datetimeLast.utc', 'datetimeLast.local', 'datetimeFirst', 'datetimeLast']


In [17]:
import pandas as pd

sens_df["datetimeFirst.utc"] = pd.to_datetime(sens_df["datetimeFirst.utc"], utc=True, errors="coerce")
sens_df["datetimeLast.utc"]  = pd.to_datetime(sens_df["datetimeLast.utc"],  utc=True, errors="coerce")
pm_df  = sens_df[sens_df["parameter.name"].str.contains("pm25|pm2\\.5", case=False, na=False)].copy()
no2_df = sens_df[sens_df["parameter.name"].str.contains("^no2$|nitrogen", case=False, na=False)].copy()
print("PM sensors:", pm_df["id"].nunique(), "NO2 sensors:", no2_df["id"].nunique())

months = pd.period_range("2018-01", "2020-12", freq="M")
def count_covering(df, month: pd.Period):
    start = month.start_time.tz_localize("UTC")
    end   = (month.end_time + pd.Timedelta(seconds=1)).tz_localize("UTC")
    ok = (df["datetimeFirst.utc"] <= start) & (df["datetimeLast.utc"] >= end)
    return int(df.loc[ok, "id"].nunique())

rows = []
for m in months:
    rows.append({
        "month": str(m),
        "pm25_sensors_covering_full_month": count_covering(pm_df, m),
        "no2_sensors_covering_full_month":  count_covering(no2_df, m),
    })
cov = pd.DataFrame(rows)
cov["min_both"] = cov[["pm25_sensors_covering_full_month","no2_sensors_covering_full_month"]].min(axis=1)
cov = cov.sort_values(["min_both","no2_sensors_covering_full_month","pm25_sensors_covering_full_month"], ascending=False)
display(cov.head(12))
#проверка для 2019-03
m = pd.Period("2019-03", freq="M")
print("2019-03 PM2.5 covering:", count_covering(pm_df, m))
print("2019-03 NO2 covering:",  count_covering(no2_df, m))

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 19, Finished, Available, Finished)

PM sensors: 49 NO2 sensors: 6


SynapseWidget(Synapse.DataFrame, 3462d588-7528-4dd2-9b08-bc5c2fd5fc32)

2019-03 PM2.5 covering: 13
2019-03 NO2 covering: 5


In [18]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

month = pd.Period("2019-03", freq="M")
dt_from = month.start_time.tz_localize("UTC").strftime("%Y-%m-%dT%H:%M:%SZ")
dt_to   = (month.end_time + pd.Timedelta(hours=23, minutes=59, seconds=59)).tz_localize("UTC").strftime("%Y-%m-%dT%H:%M:%SZ")
def covering_full_month(df):
    start = month.start_time.tz_localize("UTC")
    end   = (month.end_time + pd.Timedelta(seconds=1)).tz_localize("UTC")
    ok = (df["datetimeFirst.utc"] <= start) & (df["datetimeLast.utc"] >= end)
    return df.loc[ok].copy()
pm_full  = covering_full_month(pm_df)
no2_full = covering_full_month(no2_df)
pm_sample  = pm_full["id"].dropna().astype(int).unique()[:3].tolist()
no2_sample = no2_full["id"].dropna().astype(int).unique()[:3].tolist()

print("PM2.5 sample sensors:", pm_sample)
print("NO2 sample sensors:", no2_sample)
print("datetime_from:", dt_from, "datetime_to:", dt_to)

def fetch_hours(sensor_id: int, limit=5):
    path = f"/sensors/{sensor_id}/hours"
    params = {
        "datetime_from": dt_from,
        "datetime_to": dt_to,
        "limit": limit,
        "page": 1
    }
    r = openaq_get(path, params)
    if r.status_code != 200:
        return {"sensor_id": sensor_id, "status": r.status_code, "n": None, "first_dt": None, "last_dt": None, "err": r.text[:200]}
    js = r.json()
    res = js.get("results", [])
    dts = []
    for x in res:
        for k in ["datetime", "date", "time", "period", "utc", "timestamp"]:
            if k in x:dts.append(str(x[k]))
            break
    return {"sensor_id": sensor_id, "status": 200, "n": len(res), "first_dt": dts[0] if dts else None, "last_dt": dts[-1] if dts else None, "err": None}

sample_ids = pm_sample + no2_sample
rows = []
with ThreadPoolExecutor(max_workers=3) as ex:
    futs = {ex.submit(fetch_hours, sid): sid for sid in sample_ids}
    for fut in as_completed(futs):
        rows.append(fut.result())
check_df = pd.DataFrame(rows).sort_values(["status","sensor_id"])
display(check_df)


StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 20, Finished, Available, Finished)

PM2.5 sample sensors: [1152, 1102, 1097]
NO2 sample sensors: [1535, 3951, 3638]
datetime_from: 2019-03-01T00:00:00Z datetime_to: 2019-04-01T23:59:58Z


SynapseWidget(Synapse.DataFrame, 8f9e8750-9c43-4132-8479-c08d5447c69e)

In [19]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

dt_from = "2019-03-01T00:00:00Z"
dt_to   = "2019-04-01T00:00:00Z"
pm_sample  = [1102, 1097, 1103]
no2_sample = [1535, 3638, 2644]
def fetch_days(sensor_id: int, limit=5):
    path = f"/sensors/{sensor_id}/days"
    params = {"datetime_from": dt_from,"datetime_to": dt_to,"limit": limit,"page": 1}
    r = openaq_get(path, params)
    if r.status_code != 200:return {"sensor_id": sensor_id, "status": r.status_code, "n": None, "err": r.text[:200]}
    js = r.json()
    res = js.get("results", [])
    first = res[0] if res else {}
    return {"sensor_id": sensor_id,"status": 200,"n": len(res),"first_row_keys": sorted(list(first.keys()))[:12],"err": None}

sample_ids = pm_sample + no2_sample
rows = []
with ThreadPoolExecutor(max_workers=3) as ex:
    futs = {ex.submit(fetch_days, sid): sid for sid in sample_ids}
    for fut in as_completed(futs):rows.append(fut.result())
check_days = pd.DataFrame(rows).sort_values(["status","sensor_id"])
display(check_days)

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2da2a455-ad6b-452e-a2d2-974fd964508a)

In [20]:
# финальный набор сенсоров на выбранный месяц
pm_ids  = pm_full["id"].dropna().astype(int).unique().tolist()
no2_ids = no2_full["id"].dropna().astype(int).unique().tolist()
sensor_ids = sorted(set(pm_ids + no2_ids))

print("PM full-month sensors:", len(pm_ids))
print("NO2 full-month sensors:", len(no2_ids))
print("TOTAL sensors to ingest:", len(sensor_ids))
print("Sample:", sensor_ids[:10])

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 22, Finished, Available, Finished)

PM full-month sensors: 13
NO2 full-month sensors: 5
TOTAL sensors to ingest: 18
Sample: [673, 1097, 1102, 1103, 1128, 1143, 1145, 1146, 1152, 1534]


In [21]:
import pandas as pd
import time
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed

DT_FROM = "2019-03-01T00:00:00Z"
DT_TO   = "2019-04-01T00:00:00Z" 
print("TOTAL sensors to ingest:", len(sensor_ids))
print("Period:", DT_FROM, "→", DT_TO)

meta_cols = ["location_id", "parameter.id", "parameter.name", "parameter.units", "parameter.displayName"]
sensor_meta = (sens_df.set_index("id")[meta_cols].to_dict(orient="index"))

def fetch_sensor_days(sensor_id: int, page_size=1000, max_pages=50, max_retries=6):
    path = f"/sensors/{sensor_id}/days"
    out = []
    page = 1
    while page <= max_pages:
        params = {"date_from": DT_FROM,"date_to":DT_TO,"limit": page_size,"page": page}
        attempt = 0
        while True:
            r = openaq_get(path, params)
            if r.status_code == 200:
                js = r.json()
                res = js.get("results", [])
                for row in res:out.append({"sensor_id": sensor_id, "raw": row})
                if len(res) < page_size:return out
                page += 1
                break
            if r.status_code in (429, 500, 502, 503, 504) and attempt < max_retries:
                time.sleep(min(2 ** attempt, 30))
                attempt += 1
                continue
            raise RuntimeError(f"/days failed sensor={sensor_id} status={r.status_code} body={r.text[:300]}")
    return out

MAX_WORKERS = 4
rows = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    futs = {ex.submit(fetch_sensor_days, sid): sid for sid in sensor_ids}
    for fut in as_completed(futs):
        rows.extend(fut.result())
print("Fetched raw day-rows:", len(rows))

def flatten(rec):
    sid = rec["sensor_id"]
    raw = rec["raw"] or {}
    meta = sensor_meta.get(sid, {})
    period = raw.get("period") or {}
    dt_from = (period.get("datetimeFrom") or {}).get("utc")
    dt_to   = (period.get("datetimeTo") or {}).get("utc")
    coverage = raw.get("coverage") or {}
    cov_expected = coverage.get("expectedCount")
    cov_observed = coverage.get("observedCount")
    cov_interval = coverage.get("expectedInterval")

    return {
        "sensor_id": sid,
        "location_id": meta.get("location_id"),
        "parameter_id": meta.get("parameter.id"),
        "parameter_name": meta.get("parameter.name"),
        "parameter_units": meta.get("parameter.units"),
        "parameter_displayName": meta.get("parameter.displayName"),
        "period_datetime_from_utc": dt_from,
        "period_datetime_to_utc": dt_to,
        "value": raw.get("value"),
        "coverage_expectedCount": cov_expected,
        "coverage_observedCount": cov_observed,
        "coverage_expectedInterval": cov_interval,
        "ingested_at_utc": datetime.now(timezone.utc).isoformat(),
    }

days_df = pd.DataFrame([flatten(r) for r in rows])
days_df["period_datetime_from_utc"] = pd.to_datetime(days_df["period_datetime_from_utc"], utc=True, errors="coerce")
days_df["period_datetime_to_utc"]   = pd.to_datetime(days_df["period_datetime_to_utc"], utc=True, errors="coerce")
print("days_df shape:", days_df.shape)
display(days_df.head(25))
qc = (days_df.groupby(["parameter_name","sensor_id"])
      .size()
      .reset_index(name="n_days")
      .sort_values(["parameter_name","n_days"], ascending=[True, False]))
display(qc.head(30))


StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 23, Finished, Available, Finished)

TOTAL sensors to ingest: 18
Period: 2019-03-01T00:00:00Z → 2019-04-01T00:00:00Z
Fetched raw day-rows: 256
days_df shape: (256, 13)


SynapseWidget(Synapse.DataFrame, 518c0d92-106c-4090-9310-9284b5d1c31e)

SynapseWidget(Synapse.DataFrame, e046254c-4667-49d1-8922-94bd88732404)

In [22]:
print("unique sensors in days_df:", days_df["sensor_id"].nunique())
print(days_df["parameter_name"].value_counts())

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 24, Finished, Available, Finished)

unique sensors in days_df: 8
parameter_name
no2     160
pm25     96
Name: count, dtype: int64


In [23]:
print("unique sensors in days_df:", days_df["sensor_id"].nunique())
print(days_df["parameter_name"].value_counts())

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 25, Finished, Available, Finished)

unique sensors in days_df: 8
parameter_name
no2     160
pm25     96
Name: count, dtype: int64


In [24]:
print("Expected:", len(sensor_ids))
print("Present:", days_df["sensor_id"].nunique())

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 26, Finished, Available, Finished)

Expected: 18
Present: 8


In [25]:
import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

DT_FROM = "2019-03-01T00:00:00Z"
DT_TO   = "2019-04-01T00:00:00Z"

pm_all_ids = (sens_df[sens_df["parameter.name"].str.contains("pm25|pm2\\.5", case=False, na=False)]["id"].dropna().astype(int).unique().tolist())

def probe_days_found(sensor_id: int, max_retries=6):
    path = f"/sensors/{sensor_id}/days"
    params = {"date_from": DT_FROM, "date_to": DT_TO, "limit": 1, "page": 1}
    attempt = 0
    while True:r = openaq_get(path, params)
    if r.status_code == 200:
            js = r.json()
            meta = js.get("meta", {})
            found = meta.get("found", 0)
            s = str(found)
            if s.startswith(">"):s = s[1:]
            try:found_int = int(s)
            except:found_int = 0
            return {"sensor_id": sensor_id, "status": 200, "found": found_int}
    if r.status_code in (429, 500, 502, 503, 504) and attempt < max_retries:
            time.sleep(min(2 ** attempt, 30))
            attempt += 1
            continue
    return {"sensor_id": sensor_id, "status": r.status_code, "found": 0}

MAX_WORKERS = 6
rows = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:futs = {ex.submit(probe_days_found, sid): sid for sid in pm_all_ids}
for fut in as_completed(futs): rows.append(fut.result())
probe_df = pd.DataFrame(rows)
ok_pm = probe_df[(probe_df["status"] == 200) & (probe_df["found"] > 0)].copy()

print("PM2.5 sensors total:", len(pm_all_ids))
print("PM2.5 sensors with DAYS in 2019-03:", ok_pm.shape[0])
display(ok_pm.sort_values("found", ascending=False).head(25))

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 27, Finished, Available, Finished)

PM2.5 sensors total: 49
PM2.5 sensors with DAYS in 2019-03: 3


SynapseWidget(Synapse.DataFrame, c6d0ba03-a537-4373-8106-305aabbeed63)

In [26]:
# import pandas as pd
# import time
# from concurrent.futures import ThreadPoolExecutor, as_completed

# months = pd.period_range("2018-04", "2019-03", freq="M")

# pm_ids  = (sens_df[sens_df["parameter.name"].str.contains("pm25|pm2\\.5", case=False, na=False)]
#            ["id"].dropna().astype(int).unique().tolist())
# no2_ids = (sens_df[sens_df["parameter.name"].str.contains("^no2$|nitrogen", case=False, na=False)]
#            ["id"].dropna().astype(int).unique().tolist())

# def probe_month(sensor_id: int, dt_from: str, dt_to: str, max_retries=6):
#     path = f"/sensors/{sensor_id}/days"
#     params = {"date_from": dt_from, "date_to": dt_to, "limit": 1, "page": 1}
#     attempt = 0
#     while True:r = openaq_get(path, params)
#         if r.status_code == 200:js = r.json()
#             found = js.get("meta", {}).get("found", 0)
#             s = str(found)
#             if s.startswith(">"):s = s[1:]
#             try:found_int = int(s)
#             except:found_int = 0
#             return found_int
#         if r.status_code in (429, 500, 502, 503, 504) and attempt < max_retries:
#             time.sleep(min(2 ** attempt, 20))
#             attempt += 1
#             continue
#         return 0

# def count_sensors_with_data(sensor_list, dt_from, dt_to, max_workers=8):
#     cnt = 0
#     with ThreadPoolExecutor(max_workers=max_workers) as ex:
#         futs = {ex.submit(probe_month, sid, dt_from, dt_to): sid for sid in sensor_list}
#         for fut in as_completed(futs):
#             if fut.result() > 0:
#                 cnt += 1
#     return cnt

# rows = []
# for m in months:
#     dt_from = f"{m.start_time.date()}T00:00:00Z"
#     dt_to   = f"{(m.end_time + pd.Timedelta(days=1)).date()}T00:00:00Z"  # exclusive
#     pm_ok  = count_sensors_with_data(pm_ids, dt_from, dt_to, max_workers=8)
#     no2_ok = count_sensors_with_data(no2_ids, dt_from, dt_to, max_workers=6)
#     rows.append({"month": str(m), "pm25_days_sensors": pm_ok, "no2_days_sensors": no2_ok, "min_both": min(pm_ok, no2_ok)})
#     print(str(m), "PM:", pm_ok, "NO2:", no2_ok)

# best = pd.DataFrame(rows).sort_values(["min_both","no2_days_sensors","pm25_days_sensors"], ascending=False)
# display(best.head(12))

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 28, Finished, Available, Finished)

In [27]:
import pandas as pd
import time
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed

if "openaq_get" not in globals():raise NameError("openaq_get is not defined. Run the setup cell that defines openaq_get().")

if "sens_df" not in globals():raise NameError("sens_df is not defined. Run the cell that builds sens_df (NYC sensors).")

DT_FROM = "2019-01-01T00:00:00Z"
DT_TO   = "2019-02-01T00:00:00Z"
pm_ok_ids  = [673, 1097, 1103, 1128, 1143, 1145, 1152, 1534, 1758, 1761, 25520]
no2_ok_ids = [1535, 2644, 3638, 3951, 23341]
sensor_ids_best = sorted(set(pm_ok_ids + no2_ok_ids))

print("Period:", DT_FROM, "→", DT_TO)
print("TOTAL sensors:", len(sensor_ids_best), "| PM2.5:", len(pm_ok_ids), "| NO2:", len(no2_ok_ids))
meta_cols = ["location_id", "parameter.id", "parameter.name", "parameter.units", "parameter.displayName"]
missing_cols = [c for c in meta_cols if c not in sens_df.columns]
if missing_cols:raise ValueError(f"sens_df is missing columns: {missing_cols}. Check your sensors extraction.")

sensor_meta = (sens_df.set_index("id")[meta_cols].to_dict(orient="index"))
def fetch_sensor_days(sensor_id: int, page_size=1000, max_pages=50, max_retries=6):
    path = f"/sensors/{sensor_id}/days"
    out = []
    page = 1
    while page <= max_pages:
        params = {
            "date_from": DT_FROM,
            "date_to":   DT_TO,
            "limit": page_size,
            "page": page
        }
        attempt = 0
        while True:
            r = openaq_get(path, params)
            if r.status_code == 200: 
                js = r.json()
                res = js.get("results", [])
                for row in res:out.append({"sensor_id": sensor_id, "raw": row})
                if len(res) < page_size:return out
                page += 1
                break
            if r.status_code in (429, 500, 502, 503, 504) and attempt < max_retries: 
                time.sleep(min(2 ** attempt, 20))
                attempt += 1
                continue
            raise RuntimeError(f"/days failed sensor={sensor_id} status={r.status_code} body={r.text[:200]}")
    return out
rows = []
with ThreadPoolExecutor(max_workers=4) as ex:
    futs = {ex.submit(fetch_sensor_days, sid): sid for sid in sensor_ids_best}
    for fut in as_completed(futs):rows.extend(fut.result())
print("Fetched raw day-rows:", len(rows))

def flatten(rec):
    sid = rec["sensor_id"]
    raw = rec["raw"] or {}
    meta = sensor_meta.get(sid, {})
    period = raw.get("period") or {}
    dt_from = (period.get("datetimeFrom") or {}).get("utc")
    dt_to   = (period.get("datetimeTo") or {}).get("utc")
    coverage = raw.get("coverage") or {}
    return {
        "sensor_id": sid,
        "location_id": meta.get("location_id"),
        "parameter_id": meta.get("parameter.id"),
        "parameter_name": meta.get("parameter.name"),
        "parameter_units": meta.get("parameter.units"),
        "parameter_displayName": meta.get("parameter.displayName"),
        "period_datetime_from_utc": dt_from,
        "period_datetime_to_utc": dt_to,
        "value": raw.get("value"),
        "coverage_expectedCount": coverage.get("expectedCount"),
        "coverage_observedCount": coverage.get("observedCount"),
        "coverage_expectedInterval": coverage.get("expectedInterval"),
        "ingested_at_utc": datetime.now(timezone.utc).isoformat(),
    }

days_df = pd.DataFrame([flatten(r) for r in rows])
days_df["period_datetime_from_utc"] = pd.to_datetime(days_df["period_datetime_from_utc"], utc=True, errors="coerce")
days_df["period_datetime_to_utc"]   = pd.to_datetime(days_df["period_datetime_to_utc"], utc=True, errors="coerce")

print("days_df shape:", days_df.shape)
print("unique sensors:", days_df["sensor_id"].nunique())
print(days_df["parameter_name"].value_counts())
qc = days_df.groupby(["parameter_name","sensor_id"]).size().reset_index(name="n_days").sort_values(["parameter_name","n_days"], ascending=[True, False])
display(qc)
spark_df = spark.createDataFrame(days_df)
target_table = "bronze_openaq_sensor_days_v2"
spark_df.write.format("delta").mode("overwrite").saveAsTable(target_table))
print("Saved table:", target_table)
spark.sql("SHOW TABLES").show(50, truncate=False)

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 29, Finished, Available, Finished)

Period: 2019-01-01T00:00:00Z → 2019-02-01T00:00:00Z
TOTAL sensors: 16 | PM2.5: 11 | NO2: 5
Fetched raw day-rows: 490
days_df shape: (490, 13)
unique sensors: 16
parameter_name
pm25    330
no2     160
Name: count, dtype: int64


SynapseWidget(Synapse.DataFrame, 244ac481-82bf-4d3c-ac3a-712c2ead23fe)

AnalysisException: [_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: 73add2bf-bf26-4e14-a0c9-d7bd389dbd12).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- sensor_id: long (nullable = true)
-- location_id: void (nullable = true)
-- parameter_id: long (nullable = true)
-- parameter_name: string (nullable = true)
-- parameter_units: string (nullable = true)
-- parameter_displayName: string (nullable = true)
-- period_datetime_from_utc: timestamp (nullable = true)
-- period_datetime_to_utc: timestamp (nullable = true)
-- value: double (nullable = true)
-- coverage_expectedCount: long (nullable = true)
-- coverage_observedCount: long (nullable = true)
-- coverage_expectedInterval: string (nullable = true)
-- ingested_at_utc: string (nullable = true)


Data schema:
root
-- sensor_id: long (nullable = true)
-- location_id: long (nullable = true)
-- parameter_id: long (nullable = true)
-- parameter_name: string (nullable = true)
-- parameter_units: string (nullable = true)
-- parameter_displayName: string (nullable = true)
-- period_datetime_from_utc: timestamp (nullable = true)
-- period_datetime_to_utc: timestamp (nullable = true)
-- value: double (nullable = true)
-- coverage_expectedCount: long (nullable = true)
-- coverage_observedCount: long (nullable = true)
-- coverage_expectedInterval: string (nullable = true)
-- ingested_at_utc: string (nullable = true)

         
To overwrite your schema or change partitioning, please set:
'.option("overwriteSchema", "true")'.

Note that the schema can't be overwritten when using
'replaceWhere'.
         

In [28]:
spark_df = spark.createDataFrame(days_df)

target_table = "bronze_openaq_sensor_days_v2"
spark_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true")   # <-- ключевое.saveAsTable(target_table))
print("Saved table with overwriteSchema:", target_table)
spark.sql("SHOW TABLES").show(50, truncate=False)

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 30, Finished, Available, Finished)

Saved table with overwriteSchema: bronze_openaq_sensor_days_v2
+----------------------------------------+----------------------------+-----------+
|namespace                               |tableName                   |isTemporary|
+----------------------------------------+----------------------------+-----------+
|project_analytics_aly.lake_lakehouse.dbo|bronze_openaq_sensor_days_v2|false      |
+----------------------------------------+----------------------------+-----------+



In [29]:
from pyspark.sql import functions as F

loc_df2 = loc_df.drop_duplicates(subset=["id"]).copy()
spark_loc = spark.createDataFrame(loc_df2)
spark_loc = spark_loc.withColumn("ingested_at_utc", F.current_timestamp()).withColumn("source", F.lit("OpenAQ v3"))
spark_loc.write.format("delta").mode("overwrite").option("overwriteSchema","true").saveAsTable("dbo.bronze_openaq_locations_nyc_v2")
display(spark.table("dbo.bronze_openaq_locations_nyc_v2").limit(5))
spark.sql("SELECT COUNT(*) cnt, COUNT(DISTINCT id) distinct_id FROM dbo.bronze_openaq_locations_nyc_v2").show()

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 235c00e9-7e5c-4829-b56a-c9ebb1f1e584)

+---+-----------+
|cnt|distinct_id|
+---+-----------+
| 55|         55|
+---+-----------+



In [30]:
from pyspark.sql import functions as F

sens_df2 = sens_df.drop_duplicates(subset=["id"]).copy()
spark_sens = spark.createDataFrame(sens_df2)
spark_sens = spark_sens.withColumn("ingested_at_utc", F.current_timestamp()).withColumn("source", F.lit("OpenAQ v3"))
spark_sens.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("dbo.bronze_openaq_location_sensors_nyc_v2")

StatementMeta(, a86bbdd6-8b24-4667-a762-70eb8131b581, 32, Finished, Available, Finished)