Skip to content

Commit

Permalink
Merge pull request #249 from zmoon/cli-aqs
Browse files Browse the repository at this point in the history
CLI: initial AQS support
  • Loading branch information
zmoon committed May 1, 2024
2 parents 22502cf + 96dada8 commit ebf7c7b
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 0 deletions.
245 changes: 245 additions & 0 deletions melodies_monet/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
from contextlib import contextmanager
from pathlib import Path
from typing import List

try:
import typer
Expand Down Expand Up @@ -914,6 +915,250 @@ def get_utc_offset(*, lat, lon):
ds.to_netcdf(dst / out_name)


@app.command()
def get_aqs(
start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"),
end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"),
daily: bool = typer.Option(False, help=(
"Whether to retrieve the daily averaged data product. "
"By default, the hourly data is fetched."
)
),
param: List[str] = typer.Option(["O3", "PM2.5", "PM10"], "-p", "--params", help=(
"Parameter groups. "
"Use '-p' more than once to get multiple groups. "
"Other examples: 'SPEC' (speciated PM2.5), 'PM10SPEC' (speciated PM10), "
"'VOC', 'NONOxNOy', 'SO2', 'NO2', 'CO', 'PM2.5_FRM'."
)
),
# TODO: add network selection option once working in monetio
out_name: str = typer.Option(None, "-o",
help=(
"Output file name (or full/relative path). "
"By default the name is generated like 'AQS_<start-date>_<end-date>.nc'."
)
),
dst: Path = typer.Option(".", "-d", "--dst", help=(
"Destination directory (to control output location "
"if using default output file name)."
)
),
compress: bool = typer.Option(True, help=(
"If true, pack float to int and apply compression using zlib with complevel 7. "
"This can take time if the dataset is large, but can lead to "
"significant space savings."
)
),
num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."),
verbose: bool = typer.Option(False),
debug: bool = typer.Option(
False, "--debug/", help="Print more messages (including full tracebacks)."
),
):
"""Download EPA AQS data using monetio and reformat for MM usage.
These are archived data, stored in per-year per-parameter-group files, described at
https://aqs.epa.gov/aqsweb/airdata/download_files.html
Recent-past data are generally not available from this source.
"""
import warnings

import monetio as mio
import pandas as pd

from .util.write_util import write_ncf

global DEBUG

DEBUG = debug

if verbose:
from dask.diagnostics import ProgressBar

ProgressBar().register()

typer.echo(HEADER)

start_date = pd.Timestamp(start_date)
end_date = pd.Timestamp(end_date)
dates = pd.date_range(start_date, end_date, freq="H" if not daily else "D")
if verbose:
print("Dates:")
print(dates)
print("Params:")
print(param)

# Set destination and file name
fmt = r"%Y%m%d"
if out_name is None:
out_name = f"AQS_{start_date:{fmt}}_{end_date:{fmt}}.nc"
else:
p = Path(out_name)
if p.name == out_name:
# `out_name` is just the file name
out_name = p.name
else:
# `out_name` has path
if dst != Path("."):
typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}")
dst = p.parent
out_name = p.name

with _timer("Fetching data with monetio"):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="The (error|warn)_bad_lines argument has been deprecated"
)
try:
df = mio.aqs.add_data(
dates,
param=param,
daily=daily,
network=None,
download=False,
local=False,
wide_fmt=True, # column for each variable
n_procs=num_workers,
meta=False, # TODO: enable or add option once monetio fixes released
)
except KeyError as e:
if daily and str(e) == "'time'":
typer.echo("Note that the daily option currently requires monetio >0.2.5")
raise

if not daily:
with _timer("Fetching site metadata"):
# Need UTC offset in order to compute local time
# But currently the `meta=True` option doesn't work
meta0 = pd.read_csv(
"https://aqs.epa.gov/aqsweb/airdata/aqs_sites.zip",
encoding="ISO-8859-1",
usecols=[0, 1, 2, 17],
dtype=str,
)
meta = (
meta0.copy()
.assign(
siteid=meta0["State Code"] + meta0["County Code"] + meta0["Site Number"],
utcoffset=meta0["GMT Offset"].astype(int),
)
.drop(
columns=["State Code", "County Code", "Site Number", "GMT Offset"],
)
)

with _timer("Forming xarray Dataset"):
# Select requested time period (older monetio doesn't do this)
df = df[df.time.between(dates[0], dates[-1], inclusive="both")]

df = df.dropna(subset=["latitude", "longitude"])

# Variables associated with a measurement,
# currently not properly useful in the wide format.
if daily:
v_vns = [
"parameter_code",
"poc",
"parameter_name",
"sample_duration",
"pollutant_standard",
"event_type",
"observation_count",
"observation_percent",
"1st_max_value",
"1st_max_hour",
"aqi",
"method_code",
"method_name",
]
else:
v_vns = [
"parameter_code",
"poc", # parameter occurrence code
"parameter_name",
"mdl", # method detection limit
"uncertainty",
"method_type",
"method_code",
"method_name",
]
df = df.drop(columns=v_vns).drop_duplicates()
# TODO: may be better to get long fmt and drop these first and then pivot
# TODO: option to average duplicate measurements at same site instead of keeping first?
if "datum" in df:
df = df.drop(columns=["datum"])

site_vns = [
"siteid",
"state_code",
"county_code",
"site_num",
"latitude",
"longitude",
]
if daily:
site_vns.extend(["local_site_name", "address", "city_name", "msa_name"])
# NOTE: time_local not included since it varies in time as well
if not daily:
df = df.merge(meta, on="siteid", how="left")
site_vns.append("utcoffset")

ds_site = (
df[site_vns]
.groupby("siteid")
.first()
.to_xarray()
.swap_dims(siteid="x")
)

# Extract units info so we can add as attrs
unit_suff = "_unit"
unit_cols = [n for n in df.columns if n.endswith(unit_suff)]
assert (df[unit_cols].nunique() == 1).all()
units = df[unit_cols][~df[unit_cols].isnull()].iloc[0].to_dict()

cols = [n for n in df.columns if not n.endswith(unit_suff)]
ds = (
df[cols]
.drop(columns=[vn for vn in site_vns if vn != "siteid"])
.drop_duplicates(["time", "siteid"], keep="first")
.set_index(["time", "siteid"])
.to_xarray()
.swap_dims(siteid="x")
.merge(ds_site)
.set_coords(["latitude", "longitude"])
.assign(x=range(ds_site.dims["x"]))
)

# Add units
for k, u in units.items():
vn = k[:-len(unit_suff)]
ds[vn].attrs.update(units=u)

# Fill in local time array
# (in the df, not all sites have rows for all times, so we have NaTs at this point)
if not daily:
ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]")

# Expand
ds = (
ds
.expand_dims("y")
.transpose("time", "y", "x")
)

# Can't have `/` in variable name for netCDF
to_rename = [vn for vn in ds.data_vars if "/" in vn]
ds = ds.rename_vars({vn: vn.replace("/", "_") for vn in to_rename})

with _timer("Writing netCDF file"):
if compress:
write_ncf(ds, dst / out_name, verbose=verbose)
else:
ds.to_netcdf(dst / out_name)


cli = app

Expand Down
41 changes: 41 additions & 0 deletions melodies_monet/tests/test_get_data_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,44 @@ def test_get_ish_box(tmp_path):

assert ds.time.size == 24
assert np.unique(ds.state) == ["CO"]


def test_get_aqs_daily(tmp_path):
fn = "x.nc"
cmd = [
"melodies-monet", "get-aqs",
"-s", "2019-08-01", "-e", "2019-08-02",
"-p", "O3",
"--daily",
"--dst", tmp_path.as_posix(), "-o", fn,
]
subprocess.run(cmd, check=True)

ds = xr.open_dataset(tmp_path / fn)

assert ds.time.size == 2, "two days"
assert {
v
for v in ds.data_vars
if ds[v].dims == ("time", "y", "x")
} == {"OZONE"}


def test_get_aqs_hourly(tmp_path):
fn = "x.nc"
cmd = [
"melodies-monet", "get-aqs",
"-s", "1980-08-01", "-e", "1980-08-01 23:00",
"-p", "O3",
"--dst", tmp_path.as_posix(), "-o", fn,
]
subprocess.run(cmd, check=True)

ds = xr.open_dataset(tmp_path / fn)

assert ds.time.size == 24, "one day"
assert {
v
for v in ds.data_vars
if ds[v].dims == ("time", "y", "x")
} == {"OZONE", "time_local"}

0 comments on commit ebf7c7b

Please sign in to comment.