
Commit 276e24f: Merge bab7de1 into b8a7330

Zeitsperre committed Mar 17, 2023 · 2 parents b8a7330 + bab7de1
Showing 33 changed files with 1,727 additions and 1,219 deletions.
7 changes: 2 additions & 5 deletions miranda/__init__.py
@@ -19,15 +19,12 @@
__version__ = "0.3.5-beta"


from . import (
from . import ( # convert,
archive,
convert,
cv,
decode,
eccc,
ecmwf,
ncar,
remote,
io,
scripting,
structure,
units,
3 changes: 2 additions & 1 deletion miranda/archive/_selection.py
@@ -4,8 +4,9 @@
from pathlib import Path
from typing import List, Union

from miranda.io import find_filepaths
from miranda.io.utils import creation_date
from miranda.scripting import LOGGING_CONFIG
from miranda.utils import creation_date, find_filepaths

__all__ = ["select_by_date_modified"]

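For reference, the helpers used by select_by_date_modified now resolve from miranda.io rather than miranda.utils. A minimal sketch of the new import paths (not part of the commit; the call signatures shown are assumptions):

from miranda.io import find_filepaths
from miranda.io.utils import creation_date

# Assumed signatures, for illustration only: gather NetCDF files and keep
# the ones created this year.
files = find_filepaths("/data/incoming", file_suffixes=[".nc"])
recent = [f for f in files if creation_date(f).year >= 2023]
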
7 changes: 4 additions & 3 deletions miranda/convert/__init__.py
@@ -1,5 +1,6 @@
from . import deh, ec, ecmwf, hq, melcc, utils
from . import deh, eccc, ecmwf, hq, melcc, utils
from ._aggregation import *
from ._data_corrections import *
from ._data_definitions import *
from ._rechunk import *
from ._reconstruction import *

# from ._reconstruction import *
105 changes: 105 additions & 0 deletions miranda/convert/_aggregation.py
@@ -0,0 +1,105 @@
import logging.config
from typing import Dict, Set

import xarray as xr
import xclim.core.options
from xclim.indices import tas

from miranda.scripting import LOGGING_CONFIG
from miranda.units import get_time_frequency

logging.config.dictConfig(LOGGING_CONFIG)

__all__ = ["aggregations_possible", "aggregate"]

# There needs to be a better way (is there something in xclim?)
_resampling_keys = dict()
_resampling_keys["hour"] = "H"
_resampling_keys["day"] = "D"
_resampling_keys["month"] = "M"
_resampling_keys["year"] = "A"


def aggregations_possible(ds: xr.Dataset, freq: str = "day") -> Dict[str, Set[str]]:
logging.info("Determining potential upscaled climate variables.")

offset, meaning = get_time_frequency(ds, minimum_continuous_period="1h")

aggregation_legend = dict()
for v in ["tas", "tdps"]:
if freq == meaning:
if not hasattr(ds, v) and (
hasattr(ds, f"{v}max") and hasattr(ds, f"{v}min")
):
aggregation_legend[f"_{v}"] = {"mean"}
for variable in ds.data_vars:
if variable in ["tas", "tdps"]:
aggregation_legend[variable] = {"max", "mean", "min"}
elif variable in [
"evspsblpot",
"hfls",
"hfss",
"hur",
"hus",
"pr",
"prsn",
"ps",
"psl",
"rsds",
"rss",
"rlds",
"rls",
"snd",
"snr",
"snw",
"swe",
]:
aggregation_legend[variable] = {"mean"}

return aggregation_legend


def aggregate(ds, freq: str = "day") -> Dict[str, xr.Dataset]:
mappings = aggregations_possible(ds)

try:
xarray_agg = _resampling_keys[freq]
except KeyError:
xarray_agg = freq

aggregated = dict()
for variable, transformations in mappings.items():
for op in transformations:
ds_out = xr.Dataset()
ds_out.attrs = ds.attrs.copy()
ds_out.attrs["frequency"] = freq

with xclim.core.options.set_options(keep_attrs=True):
if variable.startswith("_"):
if op == "mean":
var = variable.strip("_")
min_var = "".join([var, "min"])
max_var = "".join([var, "max"])

mean_variable = tas(
tasmin=ds[min_var], tasmax=ds[max_var]
).resample(time=xarray_agg)
ds_out[var] = mean_variable.mean(dim="time", keep_attrs=True)
method = f"time: mean (interval: 1 {freq})"
ds_out[var].attrs["cell_methods"] = method
aggregated[var] = ds_out
continue

else:
if op in {"max", "min"}:
transformed = f"{variable}{op}"
else:
transformed = variable

r = ds[variable].resample(time=xarray_agg)
ds_out[transformed] = getattr(r, op)(dim="time", keep_attrs=True)
method = f"time: {op}{'imum' if op != 'mean' else ''} (interval: 1 {freq})"
ds_out[transformed].attrs["cell_methods"] = method
aggregated[transformed] = ds_out

return aggregated
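
A minimal usage sketch for the aggregation helpers added above (not part of the commit; the sample file name and its variables are assumptions):

import xarray as xr

from miranda.convert import aggregate, aggregations_possible

# Assumed: an hourly dataset carrying, e.g., tas/tasmax/tasmin and pr.
ds = xr.open_dataset("example_hourly.nc")

print(aggregations_possible(ds, freq="day"))
# e.g. {"tas": {"max", "mean", "min"}, "pr": {"mean"}}

daily = aggregate(ds, freq="day")  # one output Dataset per derived variable
for name, out in daily.items():
    out.to_netcdf(f"{name}_day.nc")
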
96 changes: 27 additions & 69 deletions miranda/convert/_data_corrections.py
@@ -2,7 +2,6 @@
import json
import logging.config
import os
import shutil
from functools import partial
from pathlib import Path
from typing import Callable, Dict, Iterator, List, Optional, Sequence, Union
@@ -16,20 +16,15 @@
from xclim.core.calendar import parse_offset

from miranda import __version__ as __miranda_version__
from miranda.convert.utils import delayed_write, find_version_hash, name_output_file
from miranda.decode import date_parser
from miranda.gis import subset_domain
from miranda.scripting import LOGGING_CONFIG
from miranda.units import get_time_frequency

from .utils import date_parser, find_version_hash

logging.config.dictConfig(LOGGING_CONFIG)

VERSION = datetime.datetime.now().strftime("%Y.%m.%d")

__all__ = [
"dataset_corrections",
"dims_conversion",
"file_conversion",
"dataset_conversion",
"load_json_data_mappings",
"metadata_conversion",
"threshold_mask",
@@ -241,7 +240,7 @@ def correct_var_names(d: xr.Dataset, split: str = "_", location: int = 0) -> xr.
return d.rename({old_name: new_name})


def preprocess_corrections(ds: xr.Dataset, *, project: str) -> xr.Dataset:
def preprocessing_corrections(ds: xr.Dataset, *, project: str) -> xr.Dataset:
def _preprocess_correct(d: xr.Dataset, *, ops: List[partial]) -> xr.Dataset:
for correction in ops:
d = correction(d)
@@ -748,86 +747,71 @@ def dataset_corrections(ds: xr.Dataset, project: str) -> xr.Dataset:

ds = metadata_conversion(ds, project, metadata_definition)

ds.attrs["history"] = (
f"{datetime.datetime.now()}: "
f"Variables converted from original files using miranda.convert.{dataset_corrections.__name__}. "
f"{ds.attrs.get('history')}".strip()
)

return ds


def file_conversion(
input: Union[
def dataset_conversion(
input_files: Union[
str,
os.PathLike,
Sequence[Union[str, os.PathLike]],
Iterator[os.PathLike],
xr.Dataset,
],
project: str,
output_path: Union[str, os.PathLike],
output_format: str,
*,
domain: Optional[str] = None,
mask: Optional[Union[xr.Dataset, xr.DataArray]] = None,
mask_cutoff: float = 0.5,
chunks: Optional[dict] = None,
overwrite: bool = False,
add_version_hashes: bool = True,
preprocess: Optional[Union[Callable, str]] = "auto",
compute: bool = True,
**xr_kwargs,
) -> Dict:
) -> Union[xr.Dataset, xr.DataArray]:
"""Convert an existing Xarray-compatible dataset to another format with variable corrections applied.
Parameters
----------
input : str or os.PathLike or Sequence[str or os.PathLike] or Iterator[os.PathLike] or xr.Dataset
input_files : str or os.PathLike or Sequence[str or os.PathLike] or Iterator[os.PathLike] or xr.Dataset
Files or objects to be converted.
If sent a list or GeneratorType, will open with :py:func:`xarray.open_mfdataset` and concatenate files.
project : {"cordex", "cmip5", "cmip6", "ets-grnch", "isimip-ft", "pcic-candcs-u6", "converted"}
Project name for decoding/handling purposes.
output_path : str or os.PathLike
Output folder path.
output_format: {"netcdf", "zarr"}
Output data container type.
domain: {"global", "nam", "can", "qc", "mtl"}, optional
Domain to perform subsetting for. Default: None.
mask : Optional[Union[xr.Dataset, xr.DataArray]]
DataArray or single data_variable dataset containing mask.
mask_cutoff : float
If land_sea_mask supplied, the threshold above which to mask with land_sea_mask. Default: 0.5.
chunks : dict, optional
Chunking layout to be written to new files. If None, chunking will be left to the relevant backend engine.
overwrite : bool
Whether to remove existing files or fail if files already exist.
add_version_hashes : bool
If True, version name and sha256sum of source file(s) will be added as a field among the global attributes.
preprocess : callable or str, optional
Preprocessing functions to perform over each Dataset.
Default: "auto" - Run preprocessing fixes based on supplied fields from metadata definition.
Callable - Runs function over Dataset (single) or supplied to `preprocess` (multifile dataset).
compute : bool
If True, files will be converted with each call to file conversion.
If False, will return a dask.Delayed object that can be computed later.
Default: True.
**xr_kwargs
Arguments passed directly to xarray.
Returns
-------
dict
xr.Dataset or xr.DataArray
"""
if not isinstance(input, xr.Dataset):
if isinstance(output_path, str):
output_path = Path(output_path)

if isinstance(input, (str, os.PathLike)):
if Path(input).is_dir():
if not isinstance(input_files, xr.Dataset):
if isinstance(input_files, (str, os.PathLike)):
if Path(input_files).is_dir():
files = []
files.extend([f for f in Path(input).glob("*.nc")])
files.extend([f for f in Path(input).glob("*.zarr")])
files.extend([f for f in Path(input_files).glob("*.nc")])
files.extend([f for f in Path(input_files).glob("*.zarr")])
else:
files = [Path(input)]
elif isinstance(input, (Sequence, Iterator)):
files = [Path(f) for f in input]
files = [Path(input_files)]
elif isinstance(input_files, (Sequence, Iterator)):
files = [Path(f) for f in input_files]
else:
files = input
files = input_files

version_hashes = dict()
if add_version_hashes:
@@ -838,7 +822,7 @@ def file_conversion(
if preprocess:
if preprocess == "auto":
preprocess_kwargs.update(
preprocess=partial(preprocess_corrections, project=project)
preprocess=partial(preprocessing_corrections, project=project)
)
elif isinstance(preprocess, Callable):
preprocess_kwargs.update(preprocess=preprocess)
@@ -853,14 +837,9 @@ def file_conversion(
if version_hashes:
ds.attrs.update(dict(original_files=str(version_hashes)))
else:
ds = input
ds = input_files

ds = dataset_corrections(ds, project)
ds.attrs["history"] = (
f"{datetime.datetime.now()}: "
f"Variables converted from original files using miranda.convert.{file_conversion.__name__}. "
f"{ds.attrs.get('history')}".strip()
)

if domain:
ds = subset_domain(ds, domain)
Expand All @@ -875,25 +854,4 @@ def file_conversion(
# mask = conservative_regrid(mask, ds)
ds = threshold_mask(ds, mask=mask, mask_cutoff=mask_cutoff)

outfile = name_output_file(ds, project, output_format)
outfile_path = output_path.joinpath(outfile)

if overwrite and outfile_path.exists():
logging.warning(f"Removing existing {output_format} files for {outfile}.")
if outfile_path.is_dir():
shutil.rmtree(outfile_path)
if outfile_path.is_file():
outfile_path.unlink()

logging.info(f"Writing {outfile}.")
write_object = delayed_write(
ds,
outfile_path,
output_format,
overwrite,
target_chunks=chunks,
)
if compute:
write_object.compute()
return dict(path=outfile_path)
return dict(path=outfile_path, object=write_object)
return ds
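
A minimal usage sketch for the renamed dataset_conversion (not part of the commit): the function now returns the corrected dataset instead of writing it, so persisting the result is left to the caller. The paths and project name below are illustrative assumptions.

from miranda.convert import dataset_conversion

ds = dataset_conversion(
    "/path/to/source",  # directory of *.nc or *.zarr inputs (illustrative)
    project="cmip6",
    domain="can",
    preprocess="auto",
)
ds.to_zarr("/path/to/output/converted.zarr", mode="w")  # writing handled by the caller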
