Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/712.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed a `KeyError: 'branded_variable'` when solving diagnostics against CMIP7 datasets.
The `branded_variable` facet is now reconstructed when a data catalog is loaded from the database,
so it is available to data requirement filters.
11 changes: 7 additions & 4 deletions packages/climate-ref/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,11 @@ class AdapterTestConfig:
instance_id_prefix: str
instance_id_part_count: int

# Round-trip columns to drop (not persisted through the DB)
non_roundtrip_columns: list[str]
# Columns excluded from the raw-vs-DB round-trip equality comparison, because they are
# either parse-only intermediates dropped before storage (e.g. time_range) or stored
# values that do not compare equal across the round-trip (e.g. tracking_id).
# Derived columns are NOT listed here.
roundtrip_exclude_columns: list[str]

# Complete parser core fields (must be non-NA after parsing)
complete_parser_core_fields: list[str]
Expand Down Expand Up @@ -216,7 +219,7 @@ class AdapterTestConfig:
parser_config_attr="cmip6_parser",
instance_id_prefix="CMIP6",
instance_id_part_count=10,
non_roundtrip_columns=["time_range"],
roundtrip_exclude_columns=["time_range"],
complete_parser_core_fields=[
"source_id",
"experiment_id",
Expand Down Expand Up @@ -253,7 +256,7 @@ class AdapterTestConfig:
parser_config_attr="cmip7_parser",
instance_id_prefix="CMIP7",
instance_id_part_count=12,
non_roundtrip_columns=["time_range", "branded_variable", "tracking_id"],
roundtrip_exclude_columns=["time_range", "tracking_id"],
complete_parser_core_fields=[
"source_id",
"experiment_id",
Expand Down
53 changes: 51 additions & 2 deletions packages/climate-ref/src/climate_ref/datasets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ class DatasetAdapter(Protocol):
This is generally the columns that describe the `slug` of a dataset,
excluding the version information.
"""
derived_metadata: tuple[str, ...] = ()
"""
Columns that are not stored in the database but are computed from stored metadata.

These are reconstructed by :meth:`_add_derived_columns` whenever a catalog is loaded,
so that a DB-loaded catalog has the same columns as a freshly parsed one
(e.g. CMIP7's ``branded_variable``).
:meth:`load_catalog` enforces that every column listed here is present after loading.
"""

def pretty_subset(self, data_catalog: pd.DataFrame) -> pd.DataFrame:
"""
Expand Down Expand Up @@ -504,12 +513,52 @@ def load_catalog(

# If there are no datasets, return an empty DataFrame
if catalog.empty:
return pd.DataFrame(columns=self.dataset_specific_metadata + self.file_specific_metadata)
empty = pd.DataFrame(columns=self.dataset_specific_metadata + self.file_specific_metadata)
return self._finalise_loaded_catalog(empty)

# Convert start_time/end_time strings from DB to cftime objects
if "start_time" in catalog.columns:
cal = catalog["calendar"] if "calendar" in catalog.columns else "standard"
catalog["start_time"] = parse_cftime_dates(catalog["start_time"], cal)
catalog["end_time"] = parse_cftime_dates(catalog["end_time"], cal)

return self.filter_latest_versions(catalog)
return self._finalise_loaded_catalog(self.filter_latest_versions(catalog))

def _finalise_loaded_catalog(self, catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Rebuild derived columns on a loaded catalog and check that none are absent.

    The catalog is first passed through :meth:`_add_derived_columns`.
    Afterwards every name in :attr:`derived_metadata` has to be a column of the result,
    which lets downstream code (e.g. the solver applying data requirement filters)
    index those columns without guarding against their absence.

    Parameters
    ----------
    catalog
        Data catalog loaded from the database

    Returns
    -------
    :
        The catalog returned by :meth:`_add_derived_columns`

    Raises
    ------
    RuntimeError
        If one or more columns declared in :attr:`derived_metadata` were not produced
    """
    enriched = self._add_derived_columns(catalog)

    # Sorted so the error message is deterministic regardless of set ordering.
    absent = sorted(set(self.derived_metadata).difference(enriched.columns))
    if not absent:
        return enriched

    raise RuntimeError(
        f"{type(self).__name__} did not produce its declared derived column(s): {absent}"
    )

def _add_derived_columns(self, catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Hook for reconstructing columns that are computed rather than stored.

    Some columns (e.g. CMIP7's ``branded_variable``) are not persisted in the
    database because they can be computed from stored metadata.
    This hook is the place where such columns are rebuilt for a loaded catalog.

    The base implementation adds nothing and hands the catalog back unchanged.
    An adapter that declares :attr:`derived_metadata` must override this method
    and populate every column named there.

    Parameters
    ----------
    catalog
        Data catalog loaded from the database

    Returns
    -------
    :
        Data catalog with any derived columns added
    """
    # No derived columns at this level: return the very same object.
    return catalog
42 changes: 38 additions & 4 deletions packages/climate-ref/src/climate_ref/datasets/cmip7.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class CMIP7DatasetAdapter(FinaliseableDatasetAdapterMixin, DatasetAdapter):

file_specific_metadata = ("start_time", "end_time", "path", "tracking_id")

# Not stored in the DB; reconstructed by _add_derived_columns on every load.
derived_metadata = ("branded_variable",)

version_metadata = "version"

# CMIP7 DRS directory structure (MIP-DRS7 spec):
Expand Down Expand Up @@ -216,13 +219,44 @@ def _enrich_parsed_catalog(self, datasets: pd.DataFrame) -> pd.DataFrame:
datasets[column] = pd.NA

# Add branded_variable for the raw catalog (before DB ingestion)
if not datasets.empty:
datasets["branded_variable"] = datasets["variable_id"] + "_" + datasets["branding_suffix"]
else:
datasets["branded_variable"] = pd.Series(dtype="object")
datasets = self._add_derived_columns(datasets)

return datasets

def _add_derived_columns(self, catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Populate ``branded_variable`` as ``{variable_id}_{branding_suffix}``.

    ``branded_variable`` is not stored in the database; it is rebuilt here from
    ``variable_id`` and ``branding_suffix``.
    Both inputs are mandatory CMIP7 DRS facets, so a catalog that lacks either
    column, or holds a null in either of them, is treated as malformed.

    Parameters
    ----------
    catalog
        Data catalog to extend; the new column is assigned on the frame
        returned by the parent implementation

    Returns
    -------
    :
        Data catalog including the ``branded_variable`` column

    Raises
    ------
    ValueError
        If ``variable_id`` or ``branding_suffix`` is missing as a column,
        or if either holds a null value in any row
    """
    catalog = super()._add_derived_columns(catalog)

    source_columns = ("variable_id", "branding_suffix")
    missing = [name for name in source_columns if name not in catalog.columns]
    if missing:
        raise ValueError(
            f"Cannot derive 'branded_variable': catalog is missing required column(s) {missing}"
        )

    if catalog.empty:
        # Nothing to validate or concatenate; still expose the column.
        branded = pd.Series(dtype="object")
    else:
        null_rows = catalog[list(source_columns)].isna().any(axis=1)
        if null_rows.any():
            raise ValueError(
                "Cannot derive 'branded_variable': "
                f"'variable_id'/'branding_suffix' is null for {int(null_rows.sum())} row(s)"
            )

        variable = catalog["variable_id"].astype(str)
        suffix = catalog["branding_suffix"].astype(str)
        branded = variable + "_" + suffix

    catalog["branded_variable"] = branded
    return catalog

def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
"""
Generate a data catalog from the specified file or directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ def test_round_trip(self, parser, config, adapter_config, adapter_local_catalogs
adapter.register_dataset(database, data_catalog_dataset)

local_data_catalog = sort_data_catalog(
catalog.drop(columns=adapter_config.non_roundtrip_columns, errors="ignore")
catalog.drop(columns=adapter_config.roundtrip_exclude_columns, errors="ignore")
).reset_index(drop=True)

db_data_catalog = sort_data_catalog(
adapter.load_catalog(database).drop(
columns=adapter_config.non_roundtrip_columns, errors="ignore"
columns=adapter_config.roundtrip_exclude_columns, errors="ignore"
)
).reset_index(drop=True)

Expand All @@ -162,6 +162,25 @@ def test_round_trip(self, parser, config, adapter_config, adapter_local_catalogs
check_like=True,
)

@pytest.mark.parametrize("parser", ["complete", "drs"])
def test_derived_columns_survive_round_trip(self, parser, config, adapter_config, adapter_local_catalogs):
    """A DB-loaded catalog exposes every column the adapter lists in ``derived_metadata``."""
    # Select the parser under test on the config before the database is opened.
    setattr(config, adapter_config.parser_config_attr, parser)
    local_catalog = adapter_local_catalogs[parser]

    with Database.from_config(config, run_migrations=True) as database:
        adapter = adapter_config.adapter_cls()

        # Register one dataset per slug inside a single transaction.
        with database.session.begin():
            for _slug, dataset_rows in local_catalog.groupby(adapter.slug_column):
                adapter.register_dataset(database, dataset_rows)

        loaded = adapter.load_catalog(database)

    for derived in adapter.derived_metadata:
        assert derived in loaded.columns, (
            f"Derived column '{derived}' missing from DB-loaded catalog: {sorted(loaded.columns)}"
        )

def test_finalise_datasets(self, config, adapter_config, adapter_data_dir):
"""DRS ingest -> register -> finalise -> verify metadata populated."""
setattr(config, adapter_config.parser_config_attr, "drs")
Expand Down
50 changes: 50 additions & 0 deletions packages/climate-ref/tests/unit/datasets/test_cmip7.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import ClassVar

import pandas as pd
import pytest

from climate_ref.datasets.base import _is_na
from climate_ref.datasets.cmip7 import (
Expand Down Expand Up @@ -410,6 +411,55 @@ def test_branded_variable_derived(self, cmip7_converted_file, config):
assert "branded_variable" in data_catalog.columns
assert row["branded_variable"] == f"{row['variable_id']}_{row['branding_suffix']}"

def test_branded_variable_survives_db_round_trip(self, cmip7_converted_file, db, config):
    """branded_variable is reconstructed (with correct value) when loading from the DB.

    Regression test for issue #687

    Every loaded row is checked, not just the first, and an empty catalog is
    reported as an explicit assertion failure instead of an ``IndexError``.
    """
    adapter = CMIP7DatasetAdapter(config=config)

    catalog = adapter.validate_data_catalog(adapter.find_local_datasets(cmip7_converted_file.parent))
    with db.session.begin():
        for _instance_id, dataset in catalog.groupby(adapter.slug_column):
            adapter.register_dataset(db, dataset)

    db_catalog = adapter.load_catalog(db)

    assert "branded_variable" in db_catalog.columns
    # Without this guard the row-wise check below would pass vacuously on an empty catalog.
    assert not db_catalog.empty, "No datasets were loaded back from the database"

    expected = db_catalog["variable_id"].astype(str) + "_" + db_catalog["branding_suffix"].astype(str)
    mismatched = db_catalog.loc[db_catalog["branded_variable"] != expected, "branded_variable"]
    assert mismatched.empty, f"Unexpected branded_variable value(s): {sorted(set(mismatched))}"

def test_branded_variable_requires_source_column(self, config):
    """A catalog without the branding_suffix column cannot yield branded_variable and must raise."""
    # Only variable_id is present; branding_suffix is deliberately left out.
    frame_without_suffix = pd.DataFrame({"variable_id": ["tas"]})
    cmip7_adapter = CMIP7DatasetAdapter(config=config)

    with pytest.raises(ValueError, match="missing required column"):
        cmip7_adapter._add_derived_columns(frame_without_suffix)

def test_branded_variable_rejects_null_source_value(self, config):
    """A null in one of the source columns makes the derivation raise rather than emit a bogus value."""
    # Both columns exist, but branding_suffix holds a null for the only row.
    frame_with_null = pd.DataFrame({"variable_id": ["tas"], "branding_suffix": [pd.NA]})
    cmip7_adapter = CMIP7DatasetAdapter(config=config)

    with pytest.raises(ValueError, match="is null"):
        cmip7_adapter._add_derived_columns(frame_with_null)

def test_branded_variable_empty_catalog_has_column(self, config):
    """With zero rows there is nothing to validate, yet the branded_variable column is still added."""
    source_columns = ("variable_id", "branding_suffix")
    # Both source columns are present but hold no rows.
    empty_catalog = pd.DataFrame({name: pd.Series(dtype="object") for name in source_columns})
    cmip7_adapter = CMIP7DatasetAdapter(config=config)

    result = cmip7_adapter._add_derived_columns(empty_catalog)

    assert "branded_variable" in result.columns
    assert len(result) == 0


class TestParseCMIP7UsingDirectories:
"""Tests for parse_cmip7_using_directories."""
Expand Down
Loading