## EPA AQS acquisition: NYC regulatory  PM₂.₅ (parameter 88101), November 7–10, 2024

This notebook focuses **only on data acquisition** for EPA Air Quality System (AQS) measurements.

### What is EPA AQS?

EPA AQS is the United States **regulatory air-quality monitoring system**. AQS data come from instruments operated by government agencies and partners using standardized methods and quality assurance. In this lesson’s air-quality data landscape, AQS serves as a **“regulatory reference layer”** that is typically sparser than community sensor networks, but highly documented and widely used.

AQS PM₂.₅ data are especially useful for:

- Providing an **official reference** for PM₂.₅ conditions during an event window,
- Comparing **time patterns** against other sources (alignment and harmonization happen later),
- Connecting neighborhood-scale sensing to a **regional/regulatory baseline**.

### What this notebook does

- Downloads (or reuses) the EPA **AirData pre-generated** hourly PM₂.₅ file for 2024 (`hourly_88101_2024.zip`),
- Reads the CSV from the ZIP,
- Filters to **New York State** and the **five NYC counties** (Bronx, Kings, New York, Queens, Richmond),
- Filters to the event window **Nov 7–10, 2024** (local time),
- Writes acquisition outputs to `data/raw/epa/` for later integration.

No plotting, resampling, or integrated analysis is performed here. Those steps will happen later in
the `m201-air-quality-measures-integrated` notebook.


## Notes on the AirData file

This notebook uses EPA’s **AirData** distribution (a downloadable ZIP containing a CSV).

- The hourly PM₂.₅ file is large for a full year, so we keep the **ZIP** in `data/raw/epa/` and read the CSV directly from it.
- This notebook performs only the minimal filtering needed for the lesson (NYC + event window).


In [1]:
from pathlib import Path

# -------------------------------------------------------------------
# Paths
# -------------------------------------------------------------------
BASE_DIR = Path(".")
DATA_DIR = BASE_DIR / "data"
RAW_DIR = DATA_DIR / "raw" / "epa"
RAW_DIR.mkdir(parents=True, exist_ok=True)

print("RAW_DIR:", RAW_DIR.resolve())


RAW_DIR: C:\git\TOPSTSCHOOL-air-quality\data\raw\epa


In [3]:
"""
Download hourly PM2.5 (parameter 88101) for YEAR from EPA AirData (if needed),
then read the CSV from the ZIP into a pandas DataFrame.
"""

import zipfile
import requests
import pandas as pd

# -------------------------------------------------------------------
# Configuration
# -------------------------------------------------------------------
YEAR = 2024
PARAM_CODE = "88101"  # PM2.5 FRM/FEM (hourly file code)

AIRDATA_ZIP_URL = f"https://aqs.epa.gov/aqsweb/airdata/hourly_{PARAM_CODE}_{YEAR}.zip"
ZIP_PATH = RAW_DIR / f"hourly_{PARAM_CODE}_{YEAR}.zip"

# -------------------------------------------------------------------
# Download (if missing)
# -------------------------------------------------------------------
if ZIP_PATH.exists():
    print(f"ZIP already exists: {ZIP_PATH.name}")
else:
    print(f"Downloading: {AIRDATA_ZIP_URL}")
    resp = requests.get(AIRDATA_ZIP_URL, stream=True, timeout=120)
    resp.raise_for_status()
    with open(ZIP_PATH, "wb") as f:
        for chunk in resp.iter_content(chunk_size=1024 * 1024):
            if chunk:
                f.write(chunk)
    print(f"Saved: {ZIP_PATH.resolve()}")

# -------------------------------------------------------------------
# Read CSV from ZIP (there should be exactly one CSV inside)
# -------------------------------------------------------------------
with zipfile.ZipFile(ZIP_PATH, "r") as zf:
    csv_names = [n for n in zf.namelist() if n.lower().endswith(".csv")]
    if not csv_names:
        raise RuntimeError("No CSV file found inside the EPA ZIP.")
    csv_name = csv_names[0]
    print(f"Reading {csv_name} from ZIP...")

    with zf.open(csv_name) as f:
        # low_memory=False avoids mixed-type warnings.
        df = pd.read_csv(f, low_memory=False)

print("EPA raw shape:", df.shape)
print("Example columns:", df.columns.tolist()[:20])


Downloading: https://aqs.epa.gov/aqsweb/airdata/hourly_88101_2024.zip
Saved: C:\git\TOPSTSCHOOL-air-quality\data\raw\epa\hourly_88101_2024.zip
Reading hourly_88101_2024.csv from ZIP...
EPA raw shape: (8115840, 24)
Example columns: ['State Code', 'County Code', 'Site Num', 'Parameter Code', 'POC', 'Latitude', 'Longitude', 'Datum', 'Parameter Name', 'Date Local', 'Time Local', 'Date GMT', 'Time GMT', 'Sample Measurement', 'Units of Measure', 'MDL', 'Uncertainty', 'Qualifier', 'Method Type', 'Method Code']


In [4]:
"""
Normalize column names, filter to NYC, and subset to the event window.

We use the AirData columns `date_local` and `time_local` to build a local datetime.
"""

import re

def _snake(s: str) -> str:
    s = s.strip().lower()
    s = re.sub(r"[^a-z0-9]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s

# -------------------------------------------------------------------
# 1) Normalize column names
# -------------------------------------------------------------------
df.columns = [_snake(c) for c in df.columns]

# -------------------------------------------------------------------
# 2) Ensure key ID columns are strings with zero-padding
# -------------------------------------------------------------------
def _zp(series, width):
    return series.astype(str).str.strip().str.zfill(width)

for col, width in [("state_code", 2), ("county_code", 3), ("site_num", 4)]:
    if col in df.columns:
        df[col] = _zp(df[col], width)

# -------------------------------------------------------------------
# 3) Filter to NYC counties in NY (state FIPS 36)
# -------------------------------------------------------------------
STATE_NY = "36"
NYC_COUNTIES = {
    "005": "Bronx",
    "047": "Kings",
    "061": "New York",
    "081": "Queens",
    "085": "Richmond",
}

if "state_code" not in df.columns or "county_code" not in df.columns:
    raise RuntimeError("Expected columns state_code and county_code not found after normalization.")

df_nyc = df[(df["state_code"] == STATE_NY) & (df["county_code"].isin(NYC_COUNTIES.keys()))].copy()
print("NYC subset shape:", df_nyc.shape)
print("NYC counties present:", sorted(df_nyc["county_code"].unique().tolist()))

# -------------------------------------------------------------------
# 4) Build datetime_local
# -------------------------------------------------------------------
if "date_local" not in df_nyc.columns or "time_local" not in df_nyc.columns:
    raise RuntimeError("Expected date_local/time_local columns not found in AirData file.")

# Make time_local consistently HH:MM (it is often already like '00:00')
t = df_nyc["time_local"].astype(str).str.strip()

# If time is '0' or '1', treat as hour
mask_hour_only = t.str.fullmatch(r"\d{1,2}")
t.loc[mask_hour_only] = t.loc[mask_hour_only].str.zfill(2) + ":00"

# If time is 'H:MM', pad hour to 2 digits
mask_h_mm = t.str.fullmatch(r"\d{1,2}:\d{2}")
t.loc[mask_h_mm] = t.loc[mask_h_mm].str.replace(r"^(\d{1}):", r"0\1:", regex=True)

df_nyc["datetime_local"] = pd.to_datetime(
    df_nyc["date_local"].astype(str).str.strip() + " " + t,
    errors="coerce",
)

print("datetime_local nulls:", int(df_nyc["datetime_local"].isna().sum()))
print("NYC datetime_local min/max:", df_nyc["datetime_local"].min(), df_nyc["datetime_local"].max())

# -------------------------------------------------------------------
# 5) Filter to the event window (local time)
# -------------------------------------------------------------------
EVENT_START = pd.to_datetime("2024-11-07")
EVENT_END   = pd.to_datetime("2024-11-11")  # open interval after 2024-11-10

mask = (df_nyc["datetime_local"] >= EVENT_START) & (df_nyc["datetime_local"] < EVENT_END)
df_event = df_nyc.loc[mask].copy()

print("NYC event subset shape:", df_event.shape)
df_event[["datetime_local", "state_code", "county_code"]].head()


NYC subset shape: (51885, 24)
NYC counties present: ['005', '047', '081']
datetime_local nulls: 0
NYC datetime_local min/max: 2024-01-01 00:00:00 2024-12-31 23:00:00
NYC event subset shape: (574, 25)


Unnamed: 0,datetime_local,state_code,county_code
4885633,2024-11-07 00:00:00,36,5
4885634,2024-11-07 01:00:00,36,5
4885635,2024-11-07 02:00:00,36,5
4885636,2024-11-07 03:00:00,36,5
4885637,2024-11-07 04:00:00,36,5


In [5]:
"""
Create a simplified event-window table and write outputs to data/raw/epa/.

Outputs:
- Site metadata table (unique sites) -> SouthBronxQuantAQDevices style analogue for EPA
- Event-window measurements -> epa_pm25_nyc_20241107_10.(parquet|csv)
"""

# ------------------------------------------------------------------
# If df_event is empty, warn and skip writing files
# ------------------------------------------------------------------
if df_event.empty:
    print(
        "WARNING: df_event is empty.\n"
        "Check the printed datetime_local min/max above and adjust EVENT_START/EVENT_END\n"
        "or use a different year/file for EPA data."
    )
else:
    # ------------------------------------------------------------------
    # 1) Build a site_id (state + county + site_num)
    # ------------------------------------------------------------------
    if "site_num" not in df_event.columns:
        # Some AirData files may use site_number, but hourly files typically use site_num.
        # We'll attempt a fallback.
        for alt in ["site_number", "site"]:
            if alt in df_event.columns:
                df_event["site_num"] = df_event[alt].astype(str).str.zfill(4)
                break
        if "site_num" not in df_event.columns:
            raise RuntimeError("Could not find a site number column (site_num/site_number/site).")

    df_event["site_id"] = df_event["state_code"] + df_event["county_code"] + df_event["site_num"]

    # ------------------------------------------------------------------
    # 2) Pick core measurement/value column(s)
    # ------------------------------------------------------------------
    # Hourly AirData files typically use sample_measurement for the numeric value.
    value_col = "sample_measurement" if "sample_measurement" in df_event.columns else None
    if value_col is None:
        # fallback: look for any plausible value column
        candidates = [c for c in df_event.columns if "sample" in c and "measure" in c]
        if candidates:
            value_col = candidates[0]
        else:
            raise RuntimeError("Could not identify the measurement value column (sample_measurement).")

    # Optional columns
    unit_col = "units_of_measure" if "units_of_measure" in df_event.columns else None
    poc_col  = "poc" if "poc" in df_event.columns else None

    # Latitude/longitude columns vary slightly; try common options
    lat_col = next((c for c in ["latitude", "lat"] if c in df_event.columns), None)
    lon_col = next((c for c in ["longitude", "lon", "long"] if c in df_event.columns), None)

    # ------------------------------------------------------------------
    # 3) Build simplified event table
    # ------------------------------------------------------------------
    columns_out = {
        "datetime_local": df_event["datetime_local"],
        "site_id": df_event["site_id"],
        "state_code": df_event["state_code"],
        "county_code": df_event["county_code"],
        "site_num": df_event["site_num"],
        "parameter_code": df_event["parameter_code"] if "parameter_code" in df_event.columns else PARAM_CODE,
        "value": df_event[value_col],
    }
    if unit_col:
        columns_out["units"] = df_event[unit_col]
    if poc_col:
        columns_out["poc"] = df_event[poc_col]
    if lat_col:
        columns_out["latitude"] = df_event[lat_col]
    if lon_col:
        columns_out["longitude"] = df_event[lon_col]

    df_event_simple = pd.DataFrame(columns_out).sort_values("datetime_local").reset_index(drop=True)

    print("Simplified EPA event DataFrame shape:", df_event_simple.shape)
    display(df_event_simple.head())

    # ------------------------------------------------------------------
    # 4) Site metadata table (unique sites)
    # ------------------------------------------------------------------
    meta_cols = ["site_id", "state_code", "county_code", "site_num"]
    if lat_col: meta_cols.append("latitude")
    if lon_col: meta_cols.append("longitude")

    df_sites = df_event_simple[meta_cols].drop_duplicates().reset_index(drop=True)
    print("Unique sites in event window:", df_sites.shape[0])
    display(df_sites.head())

    # ------------------------------------------------------------------
    # 5) Write outputs to data/raw/epa/
    # ------------------------------------------------------------------
    out_base = "epa_pm25_nyc_20241107_10"
    out_csv = RAW_DIR / f"{out_base}.csv"
    df_event_simple.to_csv(out_csv, index=False)
    print(f"Wrote CSV: {out_csv.resolve()}")

    # Parquet is preferred for the integrated notebook; write if available.
    out_parquet = RAW_DIR / f"{out_base}.parquet"
    try:
        df_event_simple.to_parquet(out_parquet, index=False)
        print(f"Wrote Parquet: {out_parquet.resolve()}")
    except Exception as e:
        print("Parquet write failed (install pyarrow or fastparquet to enable).")
        print("Error:", repr(e))

    sites_csv = RAW_DIR / "NYC_EPA_AQS_sites_eventwindow_20241107_10.csv"
    df_sites.to_csv(sites_csv, index=False)
    print(f"Wrote site metadata CSV: {sites_csv.resolve()}")

    sites_parquet = RAW_DIR / "NYC_EPA_AQS_sites_eventwindow_20241107_10.parquet"
    try:
        df_sites.to_parquet(sites_parquet, index=False)
        print(f"Wrote site metadata Parquet: {sites_parquet.resolve()}")
    except Exception as e:
        print("Parquet write failed for site metadata.")
        print("Error:", repr(e))


Simplified EPA event DataFrame shape: (574, 11)


Unnamed: 0,datetime_local,site_id,state_code,county_code,site_num,parameter_code,value,units,poc,latitude,longitude
0,2024-11-07,360050110,36,5,110,88101,8.2,Micrograms/cubic meter (LC),4,40.816,-73.902
1,2024-11-07,360470118,36,47,118,88101,13.9,Micrograms/cubic meter (LC),4,40.69454,-73.92769
2,2024-11-07,360810124,36,81,124,88101,13.2,Micrograms/cubic meter (LC),4,40.73614,-73.82153
3,2024-11-07,360050112,36,5,112,88101,8.1,Micrograms/cubic meter (LC),4,40.81551,-73.88553
4,2024-11-07,360810125,36,81,125,88101,11.6,Micrograms/cubic meter (LC),4,40.739264,-73.817694


Unique sites in event window: 6


Unnamed: 0,site_id,state_code,county_code,site_num,latitude,longitude
0,360050110,36,5,110,40.816,-73.902
1,360470118,36,47,118,40.69454,-73.92769
2,360810124,36,81,124,40.73614,-73.82153
3,360050112,36,5,112,40.81551,-73.88553
4,360810125,36,81,125,40.739264,-73.817694


Wrote CSV: C:\git\TOPSTSCHOOL-air-quality\data\raw\epa\epa_pm25_nyc_20241107_10.csv
Wrote Parquet: C:\git\TOPSTSCHOOL-air-quality\data\raw\epa\epa_pm25_nyc_20241107_10.parquet
Wrote site metadata CSV: C:\git\TOPSTSCHOOL-air-quality\data\raw\epa\NYC_EPA_AQS_sites_eventwindow_20241107_10.csv
Wrote site metadata Parquet: C:\git\TOPSTSCHOOL-air-quality\data\raw\epa\NYC_EPA_AQS_sites_eventwindow_20241107_10.parquet
