Custom aggregation #147

Merged
merged 11 commits on Feb 8, 2023
1 change: 1 addition & 0 deletions HISTORY.rst
@@ -19,6 +19,7 @@ New features and enhancements
* Notebooks are now tested using `pytest` with `nbval`. (:pull:`108`).
* New ``restrict_warming_level`` argument for ``extract.search_data_catalogs`` to filter out datasets that are not in the warming level csv. (:issue:`105`, :pull:`138`).
* Set configuration values programmatically through ``CONFIG.set``. (:pull:`144`).
* New ``to_dataset`` method on ``DataCatalog``. The same as ``to_dask``, but exposing more aggregation options. (:pull:`147`).

Breaking changes
^^^^^^^^^^^^^^^^
6 changes: 4 additions & 2 deletions docs/columns.rst
@@ -4,6 +4,8 @@ Columns
This section presents a definition and examples for each column of an xscen DataCatalog.
The entries for the columns are based on CMIP6 metadata and the ES-DOC controlled vocabulary (https://github.com/ES-DOC/pyessv-archive).
Some columns might be left empty (with a ``NaN``), but ``id``, ``domain``, ``processing_level`` and ``xrfreq`` are mandatory.
These four columns are what xscen uses to guess which entries can be merged together: all entries with the same unique combination of
the four columns will be combined into a single Dataset when any of the first three functions listed :ref:`here <opening-data>` is used.

- ``id``: Unique Dataset ID generated by ``xscen`` based on a subset of columns. By default, it is based on ``xscen.catalog.ID_COLUMNS``.
- E.g. "ERA_ecmwf_ERA5_ERA5-Land_NAM", "CMIP6_ScenarioMIP_CMCC_CMCC-ESM2_ssp245_r1i1p1f1_global"
@@ -57,10 +59,10 @@ Some columns might be left empty (with a ``NaN``), but ``id``, ``domain``, ``pr
- E.g. "global", "NAM", "ARC-44", "ARC-22"

- ``date_start``: First date of the dataset. This is a pd.Period object with an hourly frequency.
- E.g. "2022-06-03 00:00:00"
- E.g. "2022-06-03 00:00"

- ``date_end``: Last date of the dataset. This is a pd.Period object with an hourly frequency.
- E.g. "2022-06-03 00:00:00"
- E.g. "2022-06-03 00:00"

- ``version``: Version of the dataset.
- E.g. "1.0"
45 changes: 45 additions & 0 deletions docs/goodtoknow.rst
@@ -0,0 +1,45 @@
============
Good to know
============

.. _opening-data:

Which function to use when opening data
---------------------------------------
There are many ways to open data in xscen workflows. The list below tries to make the differences clear:

:Search and extract: Using :py:func:`~xscen.extract.search_data_catalogs` + :py:func:`~xscen.extract.extract_dataset`.
This is the principal method for parsing catalogs of "raw" data, i.e. data not yet modified by your workflow. It has features meant to ease the aggregation and extraction of raw files:

* variable conversion and resampling
* spatial and temporal subsetting
* matching historical and future runs for simulations

``search_data_catalogs`` returns a dictionary with a specific catalog for each unique ``id`` found in the search. One should then iterate over this dictionary and call
``extract_dataset`` on each item. Each call returns a dictionary with a single dataset per ``xrfreq``, so you end up with one dataset per frequency and ``id``.
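
A minimal sketch of that loop (the catalog path and search criteria are illustrative assumptions, not part of this PR):

.. code-block:: python

    import xscen as xs

    cat_dict = xs.extract.search_data_catalogs(
        data_catalogs=["/path/to/catalog.json"],  # hypothetical catalog file
        variables_and_freqs={"tas": "D"},
        other_search_criteria={"experiment": "ssp245"},
    )

    datasets = {}
    for ds_id, sub_cat in cat_dict.items():
        # One catalog per unique id; each extraction yields one dataset per xrfreq.
        for xrfreq, ds in xs.extract.extract_dataset(catalog=sub_cat).items():
            datasets[(ds_id, xrfreq)] = ds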

:to_dataset_dict: Using :py:meth:`~xscen.catalog.DataCatalog.to_dataset_dict`.
When all the data you need is in a single catalog (for example, your :py:class:`~xscen.catalog.ProjectCatalog`) and you don't need any of the features listed above.
As explained in :doc:`columns`, it creates a dictionary with a single Dataset for each combination of ``id``, ``domain``, ``processing_level`` and ``xrfreq``.
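
For example (a sketch; the project file path is a hypothetical placeholder):

.. code-block:: python

    from xscen.catalog import ProjectCatalog

    pcat = ProjectCatalog("/path/to/project.json")
    # One dataset per unique (id, domain, processing_level, xrfreq) combination.
    ds_dict = pcat.search(processing_level="extracted").to_dataset_dict()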

:to_dataset: Using :py:meth:`~xscen.catalog.DataCatalog.to_dataset`.
Similar to ``to_dataset_dict``, but it returns a single dataset. If the catalog contains more than one dataset, the call will fail. However, it exposes options to add aggregations.
This is useful when constructing an ensemble dataset that would otherwise result in distinct entries in the output of ``to_dataset_dict``. It can usually replace a combination
of ``to_dataset_dict`` and :py:func:`~xclim.ensembles.create_ensemble`.
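
A sketch of that ensemble use case, reusing ``pcat`` from the previous sketch (the search terms and column choices are illustrative):

.. code-block:: python

    # Entries differing only in these three columns are collapsed into a single
    # dataset with a new "realization" dimension.
    ens = pcat.search(variable="tas", xrfreq="D").to_dataset(
        ensemble_on=["source", "experiment", "member"],
        calendar="standard",
    )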

:open_dataset: Of course, xscen workflows can still use the conventional :py:func:`~xarray.open_dataset`. Just be aware that datasets opened this way will lack the attributes
automatically added by the previous functions, which results in poorer metadata. The same goes for :py:func:`~xarray.open_mfdataset`. If your data is listed in a catalog,
the functions above will usually provide what you need; in other words, ``xr.open_mfdataset(cat.df.path)`` is very rarely optimal.

:create_ensemble: Between :py:meth:`~xscen.catalog.DataCatalog.to_dataset` and :py:func:`~xscen.ensembles.ensemble_stats`, you should usually find what you need; :py:func:`~xclim.ensembles.create_ensemble` is not needed in xscen workflows.


Which function to use when resampling data
------------------------------------------

:extract_dataset: :py:func:`~xscen.extract.extract_dataset` has resampling capabilities to provide daily data from finer sources.

:xclim indicators: Through :py:func:`~xscen.indicators.compute_indicators`, xscen workflows can easily use `xclim indicators <https://xclim.readthedocs.io/en/stable/indicators.html>`_
to go from daily data to coarser (monthly, seasonal, annual).
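
A sketch of that step (the indicator YAML path is a hypothetical placeholder):

.. code-block:: python

    import xscen as xs

    # compute_indicators returns a dictionary keyed by the output frequency of
    # each indicator (e.g. "MS" for monthly, "AS-JAN" for annual).
    ind_dict = xs.indicators.compute_indicators(
        ds,  # a daily dataset, e.g. one value of the extract_dataset output
        indicators="configs/indicators.yml",
    )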

What is still missing in `xscen` and `xclim` is a good way to start the resampling from data coarser than daily, where the base period is non-uniform (e.g. months have different lengths).
1 change: 1 addition & 0 deletions docs/index.rst
@@ -26,6 +26,7 @@ Features

readme
installation
goodtoknow
notebooks/1_catalog
notebooks/2_getting_started
notebooks/3_diagnostics
73 changes: 72 additions & 1 deletion xscen/catalog.py
@@ -29,7 +29,7 @@

from .config import CONFIG, args_as_str, parse_config, recursive_update
from .io import get_engine
from .utils import CV # noqa
from .utils import CV, ensure_correct_time # noqa

logger = logging.getLogger(__name__)
# Monkey patch for attribute names in the output of to_dataset_dict
@@ -347,6 +347,77 @@ def exists_in_cat(self, **columns):
logger.info(f"An entry exists for: {columns}")
return exists

def to_dataset(
self,
concat_on: Optional[List[str]] = None,
ensemble_on: Optional[List[str]] = None,
calendar: Optional[str] = "standard",
**kwargs,
) -> xr.Dataset:
"""
Open the catalog's entries into a single dataset.

Same as :py:meth:`xscen.catalog.DataCatalog.to_dask`, but with additional control over the aggregations.
Ensemble preprocessing logic is taken from :py:func:`xclim.ensembles.create_ensemble`.
When `ensemble_on` is given, the function ensures all entries have the correct time coordinate according to `xrfreq`.

Parameters
----------
concat_on : list of strings, optional
A list of catalog columns to concatenate the datasets over. Each will become a new dimension with the column values as coordinates.
Xarray concatenation rules apply and can be acted upon through `xarray_combine_by_coords_kwargs`.
ensemble_on : list of strings, optional
A list of catalog columns whose values are combined into a new `id`, over which the datasets are then concatenated.
The new dimension is named "realization".
Xarray concatenation rules apply and can be acted upon through `xarray_combine_by_coords_kwargs`.
calendar : str, optional
If `ensemble_on` is given, all datasets are converted to this calendar before concatenation.
Ignored if `ensemble_on` is None (default). If None, no conversion is done.
`align_on` is always "date".
kwargs:
Any other arguments are passed to :py:meth:`xscen.catalog.DataCatalog.to_dataset_dict`.
The `preprocess` argument cannot be used.

Returns
-------
:py:class:`~xarray.Dataset`
"""
cat = deepcopy(self)
preprocess = None
if concat_on:
cat.esmcat.aggregation_control.aggregations.extend(
[
intake_esm.cat.Aggregation(
type=intake_esm.cat.AggregationType.join_new, attribute_name=col
)
for col in concat_on
]
)

if ensemble_on:
cat.esmcat.df["id"] = generate_id(cat.esmcat.df, ensemble_on)
cat.esmcat.aggregation_control.aggregations.append(
intake_esm.cat.Aggregation(
type=intake_esm.cat.AggregationType.join_new, attribute_name="id"
)
)
cat.esmcat.aggregation_control.groupby_attrs.remove("id")

xrfreq = cat.df["xrfreq"].unique()[0]

def preprocess(ds):
ds = ensure_correct_time(ds, xrfreq)
if calendar is not None:
ds = ds.convert_calendar(
calendar, use_cftime=(calendar == "default"), align_on="date"
)
return ds

ds = cat.to_dask(preprocess=preprocess, **kwargs)
if ensemble_on:
ds = ds.rename(id="realization")
return ds
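
# Usage sketch (editor's illustration, not part of this diff): concatenating
# entries that differ only in "processing_level" along a new dimension:
#
#     ds = cat.search(variable="tas", xrfreq="D").to_dataset(
#         concat_on=["processing_level"],
#     )
#
# whereas ``ensemble_on=["source", "member"]`` would instead merge those
# columns into a new id and stack the entries along a "realization" dimension.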


class ProjectCatalog(DataCatalog):
"""A DataCatalog with additional 'write' functionalities that can update and upload itself.
24 changes: 4 additions & 20 deletions xscen/extract.py
@@ -24,7 +24,9 @@
)
from .config import parse_config
from .indicators import load_xclim_module, registry_from_module
from .utils import CV, natural_sort
from .utils import CV
from .utils import ensure_correct_time as _ensure_correct_time
from .utils import natural_sort

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -265,25 +267,7 @@ def extract_dataset(
if "time" in ds_ts and ensure_correct_time:
# Expected freq (xrfreq is the wanted freq)
expfreq = catalog[key].df.xrfreq.iloc[0]
# Check if we got the expected freq (skip for too short timeseries)
inffreq = xr.infer_freq(ds_ts.time) if ds_ts.time.size > 2 else None
if inffreq == expfreq:
# Even when the freq is correct, we ensure the correct "anchor" for daily and finer
if expfreq in "DHTMUL":
ds_ts["time"] = ds_ts.time.dt.floor(expfreq)
else:
# We can't infer it, there might be a problem
counts = ds_ts.time.resample(time=expfreq).count()
if (counts > 1).any().item():
raise ValueError(
"Dataset is labelled as having a sampling frequency of "
f"{xrfreq}, but some periods have more than one data point."
)
if (counts.isnull()).any().item():
raise ValueError(
"The resampling count contains nans. There might be some missing data."
)
ds_ts["time"] = counts.time
ds_ts = _ensure_correct_time(ds_ts, expfreq)

for var_name, da in ds_ts.data_vars.items():
# Support for grid_mapping, crs, and other such variables
30 changes: 30 additions & 0 deletions xscen/utils.py
@@ -915,3 +915,33 @@ def show_versions(
]

return _show_versions(file=file, deps=deps)


def ensure_correct_time(ds: xr.Dataset, xrfreq: str) -> xr.Dataset:
"""
Ensure a dataset has the correct time coordinate, as expected for the given frequency.

Daily or finer datasets are "floored" even if `xr.infer_freq` succeeds.
Errors are raised if the number of data points per period is not 1.
The dataset is modified in-place, but returned nonetheless.
"""
# Check if we got the expected freq (skip for too short timeseries)
inffreq = xr.infer_freq(ds.time) if ds.time.size > 2 else None
if inffreq == xrfreq:
# Even when the freq is correct, we ensure the correct "anchor" for daily and finer
if xrfreq in "DHTMUL":
ds["time"] = ds.time.dt.floor(xrfreq)
else:
# We can't infer it, there might be a problem
counts = ds.time.resample(time=xrfreq).count()
if (counts > 1).any().item():
raise ValueError(
"Dataset is labelled as having a sampling frequency of "
f"{xrfreq}, but some periods have more than one data point."
)
if (counts.isnull()).any().item():
raise ValueError(
"The resampling count contains nans. There might be some missing data."
)
ds["time"] = counts.time
return ds
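
A usage sketch for the new helper (editor's illustration; the toy dataset below is hypothetical):

import numpy as np
import pandas as pd
import xarray as xr

from xscen.utils import ensure_correct_time

# Daily data mislabelled at 12:00 instead of midnight.
time = pd.date_range("2000-01-01 12:00", periods=10, freq="D")
ds = xr.Dataset({"tas": ("time", np.arange(10.0))}, coords={"time": time})

ds = ensure_correct_time(ds, "D")
# ds.time is now floored to midnight ("2000-01-01", "2000-01-02", ...), so it
# aligns with other daily datasets labelled at 00:00.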