Skip to content

Commit

Permalink
Merge pull request #106 from Ouranosinc/propmeas-combo
Browse files Browse the repository at this point in the history
Minor fixes following ESPO-R5's diagnostics
  • Loading branch information
aulemahal committed Nov 1, 2022
2 parents c377ca5 + b091f30 commit b53bb9d
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 22 deletions.
3 changes: 3 additions & 0 deletions HISTORY.rst
Expand Up @@ -9,6 +9,8 @@ Contributors to this version: Gabriel Rondeau-Genesse (:user:`RondeauG`), Juliet
New features and enhancements
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* Possibility of excluding variables read from file from the catalog produced by ``parse_directory``. (:pull:`107`).
* New "timeout_cleanup" option for ``save_to_zarr``, which removes variables that were in the process of being written when receiving a ``TimeoutException``. (:pull:`106`).
* New ``scripting.skippable`` context, allowing the use of CTRL-C to skip code sections. (:pull:`106`)

Breaking changes
^^^^^^^^^^^^^^^^
Expand All @@ -17,6 +19,7 @@ Bug fixes
^^^^^^^^^
* ``clean_up`` now converts the calendar of variables that use "interpolate" in "missing_by_var" at the same time.
Hence, when it is a conversion from a 360_day calendar, the random dates are the same for all of the these variables. (:issue:`102`, :pull:`104`)
* ``properties_and_measures`` no longer casts month coordinates to string (:pull:`106`).

Internal changes
^^^^^^^^^^^^^^^^
Expand Down
4 changes: 2 additions & 2 deletions xscen/config.py
Expand Up @@ -156,12 +156,12 @@ def _wrapper(*args, **kwargs):
sig = inspect.signature(func)
if CONFIG.get("print_it_all"):
logger.debug(f"For func {func}, found config {from_config}.")
logger.debug("Original kwargs :", kwargs)
logger.debug(f"Original kwargs : {kwargs}")
for k, v in from_config.items():
if k in sig.parameters:
kwargs.setdefault(k, v)
if CONFIG.get("print_it_all"):
logger.debug("Modified kwargs :", kwargs)
logger.debug(f"Modified kwargs : {kwargs}")

return func(*args, **kwargs)

Expand Down
2 changes: 0 additions & 2 deletions xscen/diagnostics.py
Expand Up @@ -244,8 +244,6 @@ def properties_and_measures(
# to be able to save in zarr, convert object to string
if "season" in ds1:
ds1["season"] = ds1.season.astype("str")
if "month" in ds1:
ds1["month"] = ds1.month.astype("str")

prop.attrs["cat:processing_level"] = to_level_prop
meas.attrs["cat:processing_level"] = to_level_meas
Expand Down
6 changes: 4 additions & 2 deletions xscen/extract.py
Expand Up @@ -528,7 +528,7 @@ def search_data_catalogs(
match_hist_and_fut: bool, optional
If True, historical and future simulations will be combined into the same line, and search results lacking one of them will be rejected.
periods : list
[start, end] of the period to be evaluated (or a list of lists)
[start, end] of the period to be evaluated (or a list of lists).
id_columns : list, optional
List of columns used to create a id column. If None is given, the original
"id" is left.
Expand Down Expand Up @@ -1064,14 +1064,16 @@ def _subset_file_coverage(
else:
guessed_nb_hrs_sum = period_nb_hrs

# 'coverage' adds some leeway, for example to take different calendars into account or missing 2100-12-31
if (
guessed_nb_hrs / period_nb_hrs < coverage
or len(df[files_in_range]) == 0
or guessed_nb_hrs_sum.nanos / period_nb_hrs.nanos < coverage
):
logging.warning(
f"{df['id'].iloc[0] + ': ' if 'id' in df.columns else ''}Insufficient coverage."
f"% covered, min to max : {guessed_nb_hrs / period_nb_hrs:.1%}, "
f"% covered, sum of hours : {guessed_nb_hrs_sum.nanos / period_nb_hrs.nanos:.1%}, "
f"number of files in range : {len(df[files_in_range])}."
)
return pd.DataFrame(columns=df.columns)

Expand Down
40 changes: 31 additions & 9 deletions xscen/io.py
Expand Up @@ -13,6 +13,7 @@
from xclim.core.calendar import get_calendar

from .config import parse_config
from .scripting import TimeoutException
from .utils import translate_time_chunk

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -358,6 +359,7 @@ def save_to_zarr(
encoding: dict = None,
mode: str = "f",
itervar: bool = False,
timeout_cleanup: bool = True,
) -> None:
"""
Saves a Dataset to Zarr, rechunking if requested.
Expand Down Expand Up @@ -385,6 +387,10 @@ def save_to_zarr(
itervar : bool
If True, (data) variables are written one at a time, appending to the zarr.
If False, this function computes, no matter what was passed to kwargs.
timeout_cleanup : bool
If True (default) and a :py:class:`xscen.scripting.TimeoutException` is raised during the writing,
the variable being written is removed from the dataset as it is incomplete.
This does nothing if `compute` is False.
Returns
-------
Expand Down Expand Up @@ -481,17 +487,33 @@ def coerce_attrs(attrs):
for i, (name, var) in enumerate(ds.data_vars.items()):
logger.debug(f"Writing {name} ({i + 1} of {len(ds.data_vars)}) to {path}")
dsvar = ds.drop_vars(allvars - {name})
dsvar.to_zarr(
path,
mode="a",
encoding={k: v for k, v in (encoding or {}).items() if k in dsvar},
**zarr_kwargs,
)
try:
dsvar.to_zarr(
path,
mode="a",
encoding={k: v for k, v in (encoding or {}).items() if k in dsvar},
**zarr_kwargs,
)
except TimeoutException:
if timeout_cleanup:
logger.info(f"Removing incomplete {name}.")
sh.rmtree(path / name)
raise

else:
logger.debug(f"Writing {list(ds.data_vars.keys())} for {filename}.")
return ds.to_zarr(
filename, compute=compute, mode="a", encoding=encoding, **zarr_kwargs
)
try:
return ds.to_zarr(
filename, compute=compute, mode="a", encoding=encoding, **zarr_kwargs
)
except TimeoutException:
if timeout_cleanup:
logger.info(
f"Removing incomplete {list(ds.data_vars.keys())} for {filename}."
)
for name in ds.data_vars:
sh.rmtree(path / name)
raise


@parse_config
Expand Down
57 changes: 50 additions & 7 deletions xscen/scripting.py
Expand Up @@ -28,6 +28,7 @@
"measure_time",
"timeout",
"TimeoutException",
"skippable",
]


Expand Down Expand Up @@ -281,17 +282,59 @@ def timeout(seconds: int, task: str = ""):
----------
seconds : int
Number of seconds after which the context exits with a TimeoutException.
If None or negative, no timeout is set and this context does nothing.
task : str, optional
A name to give to the task, allowing a more meaningful exception.
"""
if seconds is None or seconds <= 0:
yield
else:

def _timeout_handler(signum, frame):
raise TimeoutException(seconds, task)

old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)


@contextmanager
def skippable(seconds: int = 2, task: str = "", logger: logging.Logger = None):
"""Skippable context manager.
def _timeout_handler(signum, frame):
raise TimeoutException(seconds, task)
When CTRL-C (SIGINT, KeyboardInterrupt) is sent within the context,
this catches it, prints to the log and gives a timeout during which a subsequent
interruption will stop the script. Otherwise, the context exits normally.
old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
signal.alarm(seconds)
This is meant to be used within a loop so we can skip some iterations:
>>> for i in iterable:
>>> with skippable(2, i):
>>> ... skippable code ...
Parameters
----------
seconds: int
Number of seconds to wait for a second CTRL-C.
task : str
A name for the skippable task, to have an explicit script.
logger : logging.Logger, optional
The logger to use when printing the messages. The interruption signal is
notified with ERROR, while the skipping is notified with INFO.
If not given (default), a brutal print is used.
"""
if logger:
err = logger.error
inf = logger.info
else:
err = inf = print
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
except KeyboardInterrupt:
err("Received SIGINT. Do it again to stop the script.")
time.sleep(seconds)
inf(f"Skipping {task}.")

0 comments on commit b53bb9d

Please sign in to comment.