diff --git a/src/unified_graphics/diag.py b/src/unified_graphics/diag.py index d2a8e4de..dd0e01c9 100644 --- a/src/unified_graphics/diag.py +++ b/src/unified_graphics/diag.py @@ -2,10 +2,12 @@ from collections import namedtuple from dataclasses import dataclass from enum import Enum -from typing import Generator, List, Optional, Union +from pathlib import Path +from typing import Generator, List, Union from urllib.parse import urlparse import numpy as np +import pandas as pd import sqlalchemy as sa import xarray as xr import zarr # type: ignore @@ -75,44 +77,6 @@ def to_geojson(self): } -@dataclass -class SummaryStatistics: - min: float - max: float - mean: float - - @classmethod - def from_data_array(cls, array: xr.DataArray) -> "SummaryStatistics": - return cls( - min=float(array.min()), - max=float(array.max()), - mean=float(array.mean()), - ) - - -@dataclass -class DiagSummary: - initialization_time: str - obs_minus_forecast_adjusted: SummaryStatistics - obs_minus_forecast_unadjusted: SummaryStatistics - observation: SummaryStatistics - obs_count: int - - @classmethod - def from_dataset(cls, dataset: xr.Dataset) -> "DiagSummary": - return cls( - initialization_time=dataset.attrs["initialization_time"], - obs_minus_forecast_adjusted=SummaryStatistics.from_data_array( - dataset["obs_minus_forecast_adjusted"] - ), - obs_minus_forecast_unadjusted=SummaryStatistics.from_data_array( - dataset["obs_minus_forecast_unadjusted"] - ), - observation=SummaryStatistics.from_data_array(dataset["observation"]), - obs_count=len(dataset["nobs"]), - ) - - ModelMetadata = namedtuple( "ModelMetadata", ( @@ -486,65 +450,41 @@ def get_model_run_list( return group.group_keys() -def summary( - diag_zarr: str, +def history( + parquet_path: str, model: str, system: str, domain: str, background: str, frequency: str, - initialization_time: str, variable: Variable, loop: MinimLoop, filters: MultiDict, -) -> Optional[DiagSummary]: - store = get_store(diag_zarr) - path = "/".join( - [ - model, - system, - domain, - background, - frequency, - variable.value, - initialization_time, - loop.value, - ] +) -> pd.DataFrame: + # FIXME: This fails when diag_zarr is a file:// URL. Pandas ends up trying to use + # urlopen to read the file, but it's a directory. For now, we strip file://, but + # this is a hack. + parquet_file = ( + Path(parquet_path.replace("file://", "")) + / "_".join((model, background, system, domain, frequency)) + / variable.value ) - ds = xr.open_zarr(store, group=path, consolidated=False) - ds = apply_filters(ds, filters) - return DiagSummary.from_dataset(ds) if len(ds["nobs"]) > 0 else None - + df = pd.read_parquet( + parquet_file, + columns=["initialization_time", "obs_minus_forecast_unadjusted"], + filters=(("loop", "=", loop.value), ("is_used", "=", True)), + ) -def history( - diag_zarr: str, - model: str, - system: str, - domain: str, - background: str, - frequency: str, - variable: Variable, - loop: MinimLoop, - filters: MultiDict, -): - for init_time in get_model_run_list( - diag_zarr, model, system, domain, background, frequency, variable - ): - result = summary( - diag_zarr, - model, - system, - domain, - background, - frequency, - init_time, - variable, - loop, - filters, - ) + if df.empty: + return df - if not result: - continue + df = ( + df.sort_values("initialization_time") + .groupby("initialization_time") + .describe() + .droplevel(0, axis=1) # Drop a level from the columns created by the groupby + .reset_index() + ) - yield result + return df[["initialization_time", "min", "max", "mean", "count"]] diff --git a/src/unified_graphics/routes.py b/src/unified_graphics/routes.py index 52c6b072..6311d7c8 100644 --- a/src/unified_graphics/routes.py +++ b/src/unified_graphics/routes.py @@ -136,7 +136,7 @@ def serviceworker(): @bp.route("/diag////////") def history(model, system, domain, background, frequency, variable, loop): data = diag.history( - current_app.config["DIAG_ZARR"], + current_app.config["DIAG_PARQUET"], model, system, domain, @@ -147,7 +147,9 @@ def history(model, system, domain, background, frequency, variable, loop): request.args, ) - return jsonify([d for d in data]) + return data.to_json(orient="records", date_format="iso"), { + "Content-Type": "application/json" + } @bp.route( diff --git a/src/unified_graphics/static/js/component/ChartTimeSeries.js b/src/unified_graphics/static/js/component/ChartTimeSeries.js index d85da1dd..65a46f00 100644 --- a/src/unified_graphics/static/js/component/ChartTimeSeries.js +++ b/src/unified_graphics/static/js/component/ChartTimeSeries.js @@ -177,10 +177,7 @@ export default class ChartTimeSeries extends ChartElement { } get yScale() { - const domain = [ - min(this.#data, (d) => d.obs_minus_forecast_adjusted.min), - max(this.#data, (d) => d.obs_minus_forecast_adjusted.max), - ]; + const domain = [min(this.#data, (d) => d.min), max(this.#data, (d) => d.max)]; const { top, bottom } = this.margin; const height = this.height - top - bottom; @@ -196,12 +193,12 @@ export default class ChartTimeSeries extends ChartElement { const { xScale, yScale } = this; const rangeArea = area() .x((d) => xScale(d.initialization_time)) - .y0((d) => yScale(d.obs_minus_forecast_adjusted.min)) - .y1((d) => yScale(d.obs_minus_forecast_adjusted.max)) + .y0((d) => yScale(d.min)) + .y1((d) => yScale(d.max)) .curve(curveBumpX); const meanLine = line() .x((d) => xScale(d.initialization_time)) - .y((d) => yScale(d.obs_minus_forecast_adjusted.mean)) + .y((d) => yScale(d.mean)) .curve(curveBumpX); this.#svg.attr("viewBox", `0 0 ${this.width} ${this.height}`); diff --git a/src/unified_graphics/templates/layouts/diag.html b/src/unified_graphics/templates/layouts/diag.html index f1d25c76..8c80c1a2 100644 --- a/src/unified_graphics/templates/layouts/diag.html +++ b/src/unified_graphics/templates/layouts/diag.html @@ -22,11 +22,20 @@

{% if minim_loop == "ges" %}Guess{% else %}Analysis >Observation − Forecast {%- else -%} - - Observation Count - - Observation − Forecast - +
+ + Observation Count + + Observation − Forecast + + + + Observation − Forecast + + Initialization Time + +
{%- endif %} diff --git a/tests/conftest.py b/tests/conftest.py index 87db1059..9323cfc0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import os +from pathlib import Path from typing import Optional import alembic.command @@ -85,6 +86,7 @@ def session(engine): s.rollback() +# FIXME: Replace diag_dataset with this fixture @pytest.fixture(scope="class") def test_dataset(): def factory( @@ -99,7 +101,7 @@ def factory( loop: str = "ges", longitude: list[float] = [90, 91], latitude: list[float] = [22, 23], - is_used: list[int] = [1, 0], + is_used: list[bool] = [True, False], observation: list[float] = [1, 0], forecast_unadjusted: list[float] = [0, 1], forecast_adjusted: Optional[list[float]] = None, @@ -126,7 +128,7 @@ def factory( coords=dict( longitude=(["nobs"], np.array(longitude, dtype=np.float64)), latitude=(["nobs"], np.array(latitude, dtype=np.float64)), - is_used=(["nobs"], np.array(is_used, dtype=np.int8)), + is_used=(["nobs"], np.array(is_used)), **kwargs, ), attrs={ @@ -142,3 +144,26 @@ def factory( ) return factory + + +@pytest.fixture +def diag_parquet(tmp_path): + def factory( + ds: xr.Dataset, + ) -> Path: + parquet_file = ( + tmp_path + / "_".join((ds.model, ds.background, ds.system, ds.domain, ds.frequency)) + / ds.name + ) + df = ds.to_dataframe() + df["loop"] = ds.loop + df["initialization_time"] = ds.initialization_time + + df.to_parquet( + parquet_file, partition_cols=["loop"], index=True, engine="pyarrow" + ) + + return parquet_file + + return factory diff --git a/tests/test_unified_graphics.py b/tests/test_unified_graphics.py index 617d869e..2a026464 100644 --- a/tests/test_unified_graphics.py +++ b/tests/test_unified_graphics.py @@ -81,11 +81,12 @@ def uv(model, diag_zarr_path, test_dataset): @pytest.fixture -def app(diag_zarr_path, test_db): +def app(tmp_path, diag_zarr_path, test_db): _app = create_app( { "SQLALCHEMY_DATABASE_URI": test_db, "DIAG_ZARR": str(diag_zarr_path), + "DIAG_PARQUET": f"file://{tmp_path}", } ) @@ -144,14 +145,14 @@ def test_scalar_diag(t, client): } -def test_scalar_history(model, diag_zarr_path, client, test_dataset): +def test_scalar_history(model, diag_parquet, client, test_dataset): # Arrange run_list = [ { "initialization_time": "2022-05-16T04:00", "observation": [10, 20], "forecast_unadjusted": [5, 10], - "is_used": [1, 1], + "is_used": [True, True], # O - F [5, 10] }, { @@ -160,7 +161,7 @@ def test_scalar_history(model, diag_zarr_path, client, test_dataset): "forecast_unadjusted": [5, 10, 3], "longitude": [0, 0, 0], "latitude": [0, 0, 0], - "is_used": [1, 1, 1], + "is_used": [True, True, True], # O - F [-4, -8, 0] }, ] @@ -172,8 +173,7 @@ def test_scalar_history(model, diag_zarr_path, client, test_dataset): loop="ges", **run, ) - - save(diag_zarr_path, data) + diag_parquet(data) # Act response = client.get("/diag/3DRTMA/WCOSS/CONUS/HRRR/REALTIME/ps/ges/") @@ -182,60 +182,36 @@ def test_scalar_history(model, diag_zarr_path, client, test_dataset): assert response.json == [ { "initialization_time": "2022-05-16T04:00", - "obs_minus_forecast_adjusted": { - "min": 5.0, - "max": 10.0, - "mean": 7.5, - }, - "observation": { - "min": 10.0, - "max": 20.0, - "mean": 15.0, - }, - "obs_minus_forecast_unadjusted": { - "min": 5.0, - "max": 10.0, - "mean": 7.5, - }, - "obs_count": 2, + "min": 5.0, + "max": 10.0, + "mean": 7.5, + "count": 2, }, { "initialization_time": "2022-05-16T07:00", - "obs_minus_forecast_adjusted": { - "min": -8.0, - "max": 0.0, - "mean": -4.0, - }, - "observation": { - "min": 1.0, - "max": 3.0, - "mean": 2.0, - }, - "obs_minus_forecast_unadjusted": { - "min": -8.0, - "max": 0.0, - "mean": -4.0, - }, - "obs_count": 3, + "min": -8.0, + "max": 0.0, + "mean": -4.0, + "count": 3, }, ] -def test_scalar_history_unused(model, diag_zarr_path, client, test_dataset): +def test_scalar_history_unused(model, diag_parquet, client, test_dataset): # Arrange run_list = [ { "initialization_time": "2022-05-16T04:00", "observation": [10, 20], "forecast_unadjusted": [5, 10], - "is_used": [1, 0], + "is_used": [True, False], # O - F [5, 10] }, { "initialization_time": "2022-05-16T07:00", "observation": [1, 2], "forecast_unadjusted": [5, 10], - "is_used": [0, 1], + "is_used": [False, True], # O - F [-4, -8] }, ] @@ -247,7 +223,7 @@ def test_scalar_history_unused(model, diag_zarr_path, client, test_dataset): loop="ges", **run, ) - save(diag_zarr_path, data) + diag_parquet(data) # Act response = client.get("/diag/3DRTMA/WCOSS/CONUS/HRRR/REALTIME/ps/ges/") @@ -256,55 +232,31 @@ def test_scalar_history_unused(model, diag_zarr_path, client, test_dataset): assert response.json == [ { "initialization_time": "2022-05-16T04:00", - "obs_minus_forecast_adjusted": { - "min": 5.0, - "max": 5.0, - "mean": 5.0, - }, - "observation": { - "min": 10.0, - "max": 10.0, - "mean": 10.0, - }, - "obs_minus_forecast_unadjusted": { - "min": 5.0, - "max": 5.0, - "mean": 5.0, - }, - "obs_count": 1, + "min": 5.0, + "max": 5.0, + "mean": 5.0, + "count": 1, }, { "initialization_time": "2022-05-16T07:00", - "obs_minus_forecast_adjusted": { - "min": -8.0, - "max": -8.0, - "mean": -8.0, - }, - "observation": { - "min": 2.0, - "max": 2.0, - "mean": 2.0, - }, - "obs_minus_forecast_unadjusted": { - "min": -8.0, - "max": -8.0, - "mean": -8.0, - }, - "obs_count": 1, + "min": -8.0, + "max": -8.0, + "mean": -8.0, + "count": 1, }, ] -def test_scalar_history_empty(model, diag_zarr_path, client, test_dataset): +def test_scalar_history_empty(model, diag_parquet, test_dataset, client): # Arrange run_list = [ { "initialization_time": "2022-05-16T04:00", - "is_used": [0, 0], + "is_used": [False, False], }, { "initialization_time": "2022-05-16T07:00", - "is_used": [0, 0], + "is_used": [False, False], }, ] @@ -315,7 +267,7 @@ def test_scalar_history_empty(model, diag_zarr_path, client, test_dataset): loop="ges", **run, ) - save(diag_zarr_path, data) + diag_parquet(data) # Act response = client.get("/diag/3DRTMA/WCOSS/CONUS/HRRR/REALTIME/ps/ges/") @@ -325,7 +277,7 @@ def test_scalar_history_empty(model, diag_zarr_path, client, test_dataset): @pytest.mark.xfail -def test_vectory_history(): +def test_vector_history(): assert 0, "Not implemented" @@ -634,7 +586,7 @@ def test_diag_not_found(variable, client): ["t", "q", "ps", "uv"], ) def test_diag_read_error(variable, app, client): - Path(app.config["DIAG_ZARR"]).touch() + Path(app.config["DIAG_ZARR"].replace("file://", "")).touch() response = client.get( f"/diag/RTMA/WCOSS/CONUS/HRRR/REALTIME/{variable}/2022-05-05T14:00/ges/"