Custom aggregation #147

Merged
merged 11 commits into from Feb 8, 2023
1 change: 1 addition & 0 deletions HISTORY.rst
@@ -19,6 +19,7 @@ New features and enhancements
* Notebooks are now tested using `pytest` with `nbval`. (:pull:`108`).
* New ``restrict_warming_level`` argument for ``extract.search_data_catalogs`` to filter dataset that are not in the warming level csv. (:issue:`105`, :pull:`138`).
* Set configuration value programmatically through ``CONFIG.set``. (:pull:`144`).
* New ``to_dataset`` method on ``DataCatalog``. The same as ``to_dask``, but exposing more aggregation options. (:pull:`147`).

Breaking changes
^^^^^^^^^^^^^^^^
6 changes: 4 additions & 2 deletions docs/columns.rst
@@ -4,6 +4,8 @@ Columns
This section presents a definition and examples for each column of an xscen DataCatalog.
The entries for the columns are based on CMIP6 metadata and the ES-DOC controlled vocabulary (https://github.com/ES-DOC/pyessv-archive).
Some columns might be left empty (with a ``NaN``), but ``id``, ``domain``, ``processing_level`` and ``xrfreq`` are mandatory.
These four columns are what xscen uses to guess which entries can be merged together: all entries with the same unique combination of
the four columns will be combined into a single Dataset when any of the first three functions listed :ref:`here <opening-data>` are used.

- ``id``: Unique Dataset ID generated by ``xscen`` based on a subset of columns. By default, it is based on ``xscen.catalog.ID_COLUMNS``.
- E.g. "ERA_ecmwf_ERA5_ERA5-Land_NAM", "CMIP6_ScenarioMIP_CMCC_CMCC-ESM2_ssp245_r1i1p1f1_global"
@@ -57,10 +59,10 @@ Some columns might be left empty (with a ``NaN``), but ``id``, ``domain``, ``pr
- E.g. "global", "NAM", "ARC-44", "ARC-22"

- ``date_start``: First date of the dataset. This is a pd.Period object with an hourly frequency.
- E.g. "2022-06-03 00:00:00"
- E.g. "2022-06-03 00:00"

- ``date_end``: Last date of the dataset. This is a pd.Period object with an hourly frequency.
- E.g. "2022-06-03 00:00:00"
- E.g. "2022-06-03 00:00"

- ``version``: Version of the dataset.
- E.g. "1.0"
45 changes: 45 additions & 0 deletions docs/goodtoknow.rst
@@ -0,0 +1,45 @@
============
Good to know
============

.. _opening-data:

Which function to use when opening data
---------------------------------------
There are many ways to open data in xscen workflows. The list below tries to make the differences clear:

:Search and extract: Using :py:func:`~xscen.extract.search_data_catalogs` + :py:func:`~xscen.extract.extract_dataset`.
This is the principal method to parse catalogs of "raw" data, i.e. data not yet modified by your workflow. It has features meant to ease the aggregation and extraction of raw files:

* variable conversion and resampling
* spatial and temporal subsetting
* matching historical and future runs for simulations

``search_data_catalogs`` returns a dictionary with a specific catalog for each unique ``id`` found in the search. One should then iterate over this dictionary and call
``extract_dataset`` on each item. This in turn returns a dictionary with a single dataset per ``xrfreq``, so you end up with one dataset per frequency and ``id``.
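
A typical search-and-extract loop could look like the following minimal sketch; the catalog path, variables and search criteria are placeholders, and argument names may differ slightly between xscen versions:

.. code-block:: python

    import xscen as xs

    # Hypothetical catalog path and search criteria, for illustration only.
    cat_dict = xs.search_data_catalogs(
        data_catalogs=["/path/to/catalog.json"],
        variables_and_freqs={"tas": "D"},
        other_search_criteria={"experiment": "ssp245"},
    )

    extracted = {}
    for ds_id, sub_cat in cat_dict.items():
        # One dictionary of datasets (keyed by xrfreq) per unique id.
        extracted[ds_id] = xs.extract_dataset(catalog=sub_cat)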

:to_dataset_dict: Using :py:meth:`~xscen.catalog.DataCatalog.to_dataset_dict`.
When all the data you need is in a single catalog (for example, your :py:meth:`~xscen.catalog.ProjectCatalog`) and you don't need any of the features listed above.
As explained in :doc:`columns`, it creates a dictionary with a single Dataset for each combination of ``id``, ``domain``, ``processing_level`` and ``xrfreq``.
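
A minimal sketch of this usage, assuming ``pcat`` is an existing ``ProjectCatalog``:

.. code-block:: python

    # Opens one dataset per (id, domain, processing_level, xrfreq) combination found.
    ds_dict = pcat.search(processing_level="regridded", variable="tas").to_dataset_dict()
    for key, ds in ds_dict.items():
        print(key, ds.attrs["cat:id"])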

:to_dataset: Using :py:meth:`~xscen.catalog.DataCatalog.to_dataset`.
Similar to ``to_dataset_dict``, but returns a single dataset; if the catalog resolves to more than one, the call will fail. However, it exposes options to add aggregations.
This is useful when constructing an ensemble dataset that would otherwise result in distinct entries in the output of ``to_dataset_dict``. It can usually replace
the combination of ``to_dataset_dict`` and :py:func:`~xclim.ensembles.create_ensemble`, as sketched below.
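
A minimal sketch, assuming ``pcat`` is an existing ``ProjectCatalog`` and that the ``source`` and ``member`` columns are what distinguishes the members of the ensemble:

.. code-block:: python

    # Build a single ensemble dataset directly from the catalog.
    ens = pcat.search(
        processing_level="biasadjusted", variable="tas", xrfreq="D"
    ).to_dataset(
        ensemble_on=["source", "member"],  # combined into a new "realization" dimension
        calendar="standard",
    )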

:open_dataset: Of course, xscen workflows can still use the conventional :py:func:`~xarray.open_dataset`. Just be aware that datasets opened this way will lack the attributes
automatically added by the previous functions, resulting in poorer metadata. The same goes for :py:func:`~xarray.open_mfdataset`. If the data is listed in a catalog,
the functions above will usually provide what you need; ``xr.open_mfdataset(cat.df.path)`` is very rarely optimal.

:create_ensemble: With :py:meth:`~xscen.catalog.DataCatalog.to_dataset` or :py:func:`~xscen.ensembles.ensemble_stats`, you should usually find what you need. :py:func:`~xclim.ensembles.create_ensemble` is not needed in xscen workflows.


Which function to use when resampling data
------------------------------------------

:extract_dataset: :py:func:`~xscen.extract.extract_dataset` has resampling capabilities to provide daily data from finer sources.

:xclim indicators: Through :py:func:`~xscen.indicators.compute_indicators`, xscen workflows can easily use `xclim indicators <https://xclim.readthedocs.io/en/stable/indicators.html>`_
to go from daily data to coarser (monthly, seasonal, annual).
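
For instance, a minimal sketch; the accepted ``indicators`` formats and the exact output keys may vary between xscen versions:

.. code-block:: python

    import xclim
    import xscen as xs

    # Compute an annual mean temperature indicator from a dataset of daily tas.
    out = xs.compute_indicators(ds=ds_daily, indicators=[xclim.atmos.tg_mean])
    ds_yearly = out["YS"]  # one dataset per output frequency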

What is missing in `xscen` and `xclim` is thus a good way to start the resampling from data coarser than daily, where the base period is non-uniform (e.g. months have different lengths).
1 change: 1 addition & 0 deletions docs/index.rst
@@ -26,6 +26,7 @@ Features

readme
installation
goodtoknow
notebooks/1_catalog
notebooks/2_getting_started
notebooks/3_diagnostics
16 changes: 8 additions & 8 deletions docs/notebooks/2_getting_started.ipynb
@@ -171,7 +171,7 @@
"source": [
"### Preparing arguments for *xarray*\n",
"\n",
"`xscen` makes use of `intake_esm`'s [to_dataset_dict()](https://intake-esm.readthedocs.io/en/stable/reference/api.html?highlight=to_dataset_dict) for the extraction process, which will automatically compute missing variables as required. Also, this function manages Catalogs, IDs, and both NetCDF and Zarr files seamlessly. When the catalog is made of a single dataset, `to_dask()` can be used instead to directly obtain an *xr.Dataset* instead of a dictionary.\n",
"`xscen` makes use of `intake_esm`'s [to_dataset_dict()](https://intake-esm.readthedocs.io/en/stable/reference/api.html?highlight=to_dataset_dict) for the extraction process, which will automatically compute missing variables as required. Also, this function manages Catalogs, IDs, and both NetCDF and Zarr files seamlessly. When the catalog is made of a single dataset, `to_dataset()` can be used instead to directly obtain an *xr.Dataset* instead of a dictionary.\n",
"\n",
"There are a few key differences compared to using *xarray* directly, one of which being that it uses `xr.open_dataset`, even when multiple files are involved, with a subsequent call to `xr.combine_by_coords`. Kwargs are therefore separated in two:\n",
"\n",
@@ -424,12 +424,12 @@
" \"mask_nans\": True,\n",
"}\n",
"\n",
"# to_dask() will open the dataset, as long as the search() gave a single result.\n",
"# to_dataset() will open the dataset, as long as the search() gave a single result.\n",
"ds_example = pcat.search(\n",
" id=\"CMIP6_ScenarioMIP_NCC_NorESM2-MM_ssp126_r1i1p1f1_example-region\",\n",
" processing_level=\"extracted\",\n",
" variable=\"sftlf\",\n",
").to_dask()\n",
").to_dataset()\n",
"\n",
"# Masking function\n",
"ds_example[\"mask\"] = xs.create_mask(ds_example, mask_args=mask_args)"
@@ -552,7 +552,7 @@
" ds[\"mask\"] = xs.create_mask(\n",
" pcat.search(\n",
" id=ds.attrs[\"cat:id\"], processing_level=\"extracted\", variable=\"sftlf\"\n",
" ).to_dask(),\n",
" ).to_dataset(),\n",
" mask_args=mask_args,\n",
" )\n",
"\n",
@@ -684,7 +684,7 @@
" processing_level=\"regridded\",\n",
" id=\"CMIP6_ScenarioMIP_NCC_NorESM2-MM_ssp245_r2i1p1f1_example-region\",\n",
" variable=v,\n",
" ).to_dask()\n",
" ).to_dataset()\n",
" ds_dict = pcat.search(processing_level=\"regridded\", variable=v).to_dataset_dict()\n",
"\n",
" for ds in ds_dict.values():\n",
@@ -728,12 +728,12 @@
" processing_level=\"regridded\",\n",
" variable=\"tas\",\n",
" id=\"CMIP6_ScenarioMIP_NCC_NorESM2-MM_ssp245_r1i1p1f1_example-region\",\n",
").to_dask()\n",
").to_dataset()\n",
"ds_adj = pcat.search(\n",
" processing_level=\"biasadjusted\",\n",
" variable=\"tas\",\n",
" id=\"CMIP6_ScenarioMIP_NCC_NorESM2-MM_ssp245_r1i1p1f1_example-region\",\n",
").to_dask()\n",
").to_dataset()\n",
"\n",
"vmin = float(ds_ref.tas.sel(time=slice(\"2000\", \"2005\")).mean(dim=\"time\").min())\n",
"vmax = float(ds_ref.tas.sel(time=slice(\"2000\", \"2005\")).mean(dim=\"time\").max())\n",
@@ -1175,7 +1175,7 @@
"source": [
"ds = pcat.search(\n",
" processing_level=\"biasadjusted\", variable=\"tas\", experiment=\"ssp585\"\n",
").to_dask()\n",
").to_dataset()\n",
"\n",
"ds_clean = xs.clean_up(\n",
" ds=ds,\n",
6 changes: 3 additions & 3 deletions docs/notebooks/3_diagnostics.ipynb
@@ -150,7 +150,7 @@
"dref = gettingStarted_cat.search(\n",
" id=\"CMIP6_ScenarioMIP_NCC_NorESM2-MM_ssp245_r2i1p1f1_example-region\",\n",
" processing_level=\"regridded\",\n",
").to_dask()\n",
").to_dataset()\n",
"\n",
"# calculate properties and measures\n",
"prop_ref, _ = xs.properties_and_measures(\n",
@@ -193,7 +193,7 @@
" experiment=\"ssp245\",\n",
" member=\"r1.*\",\n",
" processing_level=\"biasadjusted\",\n",
").to_dask()\n",
").to_dataset()\n",
"\n",
"# calculate properties and measures\n",
"prop_scen, meas_scen = xs.properties_and_measures(\n",
@@ -267,7 +267,7 @@
" experiment=\"ssp245\",\n",
" member=\"r1.*\",\n",
" processing_level=\"regridded\",\n",
").to_dask()\n",
").to_dataset()\n",
"prop_sim, meas_sim = xs.properties_and_measures(\n",
" ds=dsim,\n",
" properties=properties,\n",
2 changes: 1 addition & 1 deletion docs/notebooks/5_warminglevels.ipynb
@@ -162,7 +162,7 @@
"ds_wl = pcat.search(\n",
" id=\"CMIP6_ScenarioMIP_NCC_NorESM2-MM_ssp585_r1i1p1f1_example-region\",\n",
" processing_level=\"warminglevel-1.5vs1850-1900\",\n",
").to_dask()\n",
").to_dataset()\n",
"display(ds_wl)"
]
},
129 changes: 120 additions & 9 deletions xscen/catalog.py
@@ -29,7 +29,7 @@

from .config import CONFIG, args_as_str, parse_config, recursive_update
from .io import get_engine
from .utils import CV # noqa
from .utils import CV, ensure_correct_time # noqa

logger = logging.getLogger(__name__)
# Monkey patch for attribute names in the output of to_dataset_dict
@@ -347,6 +347,115 @@ def exists_in_cat(self, **columns):
logger.info(f"An entry exists for: {columns}")
return exists

def to_dataset(
self,
concat_on: Optional[Union[List[str], str]] = None,
ensemble_on: Optional[Union[List[str], str]] = None,
calendar: Optional[str] = "standard",
**kwargs,
) -> xr.Dataset:
"""
Open the catalog's entries into a single dataset.

Same as :py:meth:`~intake_esm.core.esm_datastore.to_dask`, but with additional control over the aggregations.
Ensemble preprocessing logic is taken from :py:func:`xclim.ensembles.create_ensemble`.
When `ensemble_on` is given, the function ensures all entries have the correct time coordinate according to `xrfreq`.
If either argument is given, the "id" is reconstructed by removing mentions of aggregated columns.
This will override any "custom" id, i.e. ids that cannot be unstacked with :py:func:`~xscen.catalog.unstack_id`.

Parameters
----------
concat_on : list of strings or str, optional
A list of catalog columns to concat the datasets over. Each will become a new dimension with the column values as coordinates.
Xarray concatenation rules apply and can be acted upon through `xarray_combine_by_coords_kwargs`.
ensemble_on : list of strings or str, optional
A list of catalog columns to combine into a new `id` over which the datasets are concatenated.
The new dimension is named "realization".
Xarray concatenation rules apply and can be acted upon through `xarray_combine_by_coords_kwargs`.
calendar : str, optional
If `ensemble_on` is given, all datasets are converted to this calendar before concatenation.
Ignored if `ensemble_on` is None (default). If None, no conversion is done.
`align_on` is always "date".
kwargs:
Any other arguments are passed to :py:meth:`~intake_esm.core.esm_datastore.to_dataset_dict`.
The `preprocess` argument cannot be used if `ensemble_on` is given.

Returns
-------
:py:class:`~xarray.Dataset`

See Also
--------
intake_esm.core.esm_datastore.to_dataset_dict
intake_esm.core.esm_datastore.to_dask
xclim.ensembles.create_ensemble
"""
cat = deepcopy(self)
preprocess = kwargs.get("preprocess")

if isinstance(concat_on, str):
concat_on = [concat_on]
if isinstance(ensemble_on, str):
ensemble_on = [ensemble_on]
rm_from_id = (concat_on or []) + (ensemble_on or []) + ["realization"]

aggs = {
agg.attribute_name for agg in cat.esmcat.aggregation_control.aggregations
}
if not set(cat.esmcat.aggregation_control.groupby_attrs).isdisjoint(rm_from_id):
raise ValueError(
"Can't add aggregations for columns in the catalog's groupby_attrs "
f"({cat.esmcat.aggregation_control.groupby_attrs})"
)
if not aggs.isdisjoint(rm_from_id):
raise ValueError(
"Can't add aggregations for columns were an aggregation already exists ({aggs})"
)

if concat_on:
cat.esmcat.aggregation_control.aggregations.extend(
[
intake_esm.cat.Aggregation(
type=intake_esm.cat.AggregationType.join_new, attribute_name=col
)
for col in concat_on
]
)

if ensemble_on:
if preprocess is not None:
warnings.warn(
"Using `ensemble_on` will override the iven `preprocess` function."
aulemahal marked this conversation as resolved.
Show resolved Hide resolved
)
cat.df["realization"] = generate_id(cat.df, ensemble_on)
cat.esmcat.aggregation_control.aggregations.append(
intake_esm.cat.Aggregation(
type=intake_esm.cat.AggregationType.join_new,
attribute_name="realization",
)
)
xrfreq = cat.df["xrfreq"].unique()[0]

def preprocess(ds):
ds = ensure_correct_time(ds, xrfreq)
if calendar is not None:
ds = ds.convert_calendar(
calendar, use_cftime=(calendar == "default"), align_on="date"
)
return ds

if len(rm_from_id) > 1:
# Guess what the ID was and rebuild a new one, omitting the columns that are part of the aggregation
unstacked = unstack_id(cat)
cat.esmcat.df["id"] = cat.df.apply(
lambda row: _build_id(
row, [col for col in unstacked[row["id"]] if col not in rm_from_id]
),
axis=1,
)
ds = cat.to_dask(preprocess=preprocess, **kwargs)
return ds


class ProjectCatalog(DataCatalog):
"""A DataCatalog with additional 'write' functionalities that can update and upload itself.
@@ -1346,6 +1455,11 @@ def _parse_date(date, fmts):
return date


def _build_id(element: pd.Series, columns: List[str]):
"""Build an ID from a catalog's row and a list of columns."""
return "_".join(map(str, filter(pd.notna, element[columns].values)))


def generate_id(
df: Union[pd.DataFrame, xr.Dataset], id_columns: Optional[list] = None
): # noqa: D401
@@ -1370,9 +1484,7 @@

id_columns = [x for x in (id_columns or ID_COLUMNS) if x in df.columns]

return df[id_columns].apply(
lambda row: "_".join(map(str, filter(pd.notna, row.values))), axis=1
)
return df.apply(_build_id, axis=1, args=(id_columns,))


def unstack_id(
@@ -1407,11 +1519,10 @@
].drop("id", axis=1)

# Make sure that all elements are the same, if there are multiple lines
if len(subset) > 1:
if not all([subset[col].is_unique for col in subset.columns]):
raise ValueError(
"Not all elements of the columns are the same for a given ID!"
)
if not (subset.nunique() == 1).all():
raise ValueError(
"Not all elements of the columns are the same for a given ID!"
)

out[ids] = {attr: subset[attr].iloc[0] for attr in subset.columns}

24 changes: 4 additions & 20 deletions xscen/extract.py
@@ -24,7 +24,9 @@
)
from .config import parse_config
from .indicators import load_xclim_module, registry_from_module
from .utils import CV, natural_sort
from .utils import CV
from .utils import ensure_correct_time as _ensure_correct_time
from .utils import natural_sort

logger = logging.getLogger(__name__)

@@ -265,25 +267,7 @@ def extract_dataset(
if "time" in ds_ts and ensure_correct_time:
# Expected freq (xrfreq is the wanted freq)
expfreq = catalog[key].df.xrfreq.iloc[0]
# Check if we got the expected freq (skip for too short timeseries)
inffreq = xr.infer_freq(ds_ts.time) if ds_ts.time.size > 2 else None
if inffreq == expfreq:
# Even when the freq is correct, we ensure the correct "anchor" for daily and finer
if expfreq in "DHTMUL":
ds_ts["time"] = ds_ts.time.dt.floor(expfreq)
else:
# We can't infer it, there might be a problem
counts = ds_ts.time.resample(time=expfreq).count()
if (counts > 1).any().item():
raise ValueError(
"Dataset is labelled as having a sampling frequency of "
f"{xrfreq}, but some periods have more than one data point."
)
if (counts.isnull()).any().item():
raise ValueError(
"The resampling count contains nans. There might be some missing data."
)
ds_ts["time"] = counts.time
ds_ts = _ensure_correct_time(ds_ts, expfreq)

for var_name, da in ds_ts.data_vars.items():
# Support for grid_mapping, crs, and other such variables