From 2e2d7f3ba6cdd5843de85cf8b161d01ae3d43765 Mon Sep 17 00:00:00 2001 From: pciturri Date: Thu, 23 Oct 2025 16:30:19 +0200 Subject: [PATCH 1/4] ft: Modify registry to include multiple input mapping options. Can use /tmp for storage of input catalogs and arguments, store them to /results or defaults to {model_dir}/input. refac: Modified prepare_args to handle different template and target args files. tests: extended tests for multiple args file types. --- floatcsep/experiment.py | 18 ++- floatcsep/infrastructure/registries.py | 82 +++++++++----- floatcsep/model.py | 142 ++++++++++++----------- floatcsep/postprocess/reporting.py | 4 +- floatcsep/utils/file_io.py | 66 +++++++---- floatcsep/utils/helpers.py | 3 +- tests/unit/test_model.py | 150 ++++++++++++++++++------- tests/unit/test_registry.py | 122 ++++++++++++++++++++ tutorials/case_h/config.yml | 1 - tutorials/case_h/models.yml | 20 ++-- 10 files changed, 434 insertions(+), 174 deletions(-) diff --git a/floatcsep/experiment.py b/floatcsep/experiment.py index 2ce299b..e62a787 100644 --- a/floatcsep/experiment.py +++ b/floatcsep/experiment.py @@ -77,7 +77,7 @@ class Experiment: model_config (str): Path to the models' configuration file test_config (str): Path to the evaluations' configuration file - run_mode (str): 'sequential' or 'parallel' + run_mode (str): 'serial' or 'parallel' default_test_kwargs (dict): Default values for the testing (seed, number of simulations, etc.) 
postprocess (dict): Contains the instruction for postprocessing @@ -102,7 +102,7 @@ def __init__( postprocess: str = None, default_test_kwargs: dict = None, rundir: str = "results", - run_mode: str = "sequential", + run_mode: str = "serial", report_hook: dict = None, **kwargs, ) -> None: @@ -123,6 +123,7 @@ def __init__( self.registry = ExperimentRegistry.factory(workdir=workdir, run_dir=rundir) self.results_repo = ResultsRepository(self.registry) self.catalog_repo = CatalogRepository(self.registry) + self.run_id = "run" self.config_file = kwargs.get("config_file", None) self.original_config = kwargs.get("original_config", None) @@ -285,12 +286,14 @@ def get_model(self, name: str) -> Model: for model in self.models: if model.name == name: return model + raise Exception(f"No existing model with name {name}") - def get_test(self, name: str) -> Model: + def get_test(self, name: str) -> Evaluation: """Returns an Evaluation by its name string.""" for test in self.tests: if test.name == name: return test + raise Exception(f"No existing evaluation with name {name}") def stage_models(self) -> None: """ @@ -298,7 +301,12 @@ def stage_models(self) -> None: """ log.info("Staging models") for i in self.models: - i.stage(self.time_windows, run_mode=self.run_mode, run_dir=self.run_dir) + i.stage( + self.time_windows, + run_mode=self.run_mode, + stage_dir=self.registry.run_dir, + run_id=self.run_id, + ) self.registry.add_model_registry(i) def set_tests(self, test_config: Union[str, Dict, List]) -> list: @@ -365,7 +373,7 @@ def set_tasks(self) -> None: Lazy definition of the experiment core tasks by wrapping instances, methods and arguments. Creates a graph with task nodes, while assigning task-parents to each node, depending on each Evaluation signature. - The tasks can then be run sequentially as a list or asynchronous + The tasks can then be run in serial as a list or asynchronous using the graph's node dependencies. 
For instance: diff --git a/floatcsep/infrastructure/registries.py b/floatcsep/infrastructure/registries.py index 42154b1..a7e5fab 100644 --- a/floatcsep/infrastructure/registries.py +++ b/floatcsep/infrastructure/registries.py @@ -1,10 +1,11 @@ import logging import os +import tempfile from abc import ABC, abstractmethod from datetime import datetime from os.path import join, abspath, relpath, normpath, dirname, exists from pathlib import Path -from typing import Sequence, Union, TYPE_CHECKING, Any, Optional +from typing import Sequence, Union, TYPE_CHECKING, Any, Optional, Literal from floatcsep.utils.helpers import timewindow2str @@ -269,8 +270,9 @@ def build_tree( time_windows: Sequence[Sequence[datetime]] = None, model_class: str = "TimeIndependentModel", prefix: str = None, - run_mode: str = "sequential", - run_dir: Optional[str] = None, + run_mode: str = Literal["serial", "parallel"], + stage_dir: str = "results", + run_id: Optional[str] = "run", ) -> None: """ Creates the run directory, and reads the file structure inside. @@ -279,11 +281,13 @@ def build_tree( time_windows (list(str)): List of time windows or strings. model_class (str): Model's class name prefix (str): prefix of the model forecast filenames if TD - run_mode (str): if run mode is sequential, input data (args and cat) will be + run_mode (str): if run mode is serial, input data (args and cat) will be dynamically overwritten in 'model/input/` through time_windows. If 'parallel', input data is dynamically writing anew in 'results/{time_window}/input/{model_name}/'. - run_dir (str): Where experiment's results are stored. + stage_dir (str): Whether input data is stored persistently in the run_dir or + just in tmp cache (Only for parallel execution). 
+ run_id (str): Job ID of the run for parallel execution and tmp storing of input data """ windows = timewindow2str(time_windows) @@ -293,33 +297,61 @@ def build_tree( elif model_class == "TimeDependentModel": - # grab names for creating model directories subfolders = ["input", "forecasts"] - dirtree = {folder: self.abs(self.path, folder) for folder in subfolders} - for _, folder_ in dirtree.items(): + model_dirtree = {folder: self.abs(self.path, folder) for folder in subfolders} + for _, folder_ in model_dirtree.items(): os.makedirs(folder_, exist_ok=True) - if run_mode == "sequential": - self.input_args = { - win: Path(self.path, "input", self.args_file) for win in windows - } - self.input_cats = { - win: Path(self.path, "input", self.input_cat) for win in windows - } - elif run_mode == "parallel": - self.input_args = { - win: Path(run_dir, win, "input", self.model_name, self.args_file) - for win in windows - } - self.input_cats = { - win: Path(run_dir, win, "input", self.model_name, self.input_cat) - for win in windows - } + # Decide the base dir for *per-window* inputs (args + catalog) + # - serial mode: under the model folder {model_name}/input + # - parallel + run_dir: //input// + # - parallel + tmp: /floatcsep///input// + def _window_input_dir(win: str) -> Path: + if run_mode == "parallel": + if stage_dir == "tmp": + base_tmp = Path(tempfile.gettempdir()) + return base_tmp / "floatcsep" / run_id / win / "input" / self.model_name + else: + return Path(stage_dir, win, "input", self.model_name) + else: + return model_dirtree["input"] + + # Build input/output maps + if not prefix: + prefix = self.model_name + + for win in windows: + input_dir = _window_input_dir(win) + os.makedirs(input_dir, exist_ok=True) + + self.input_args[win] = Path(input_dir, self.args_file) + self.input_cats[win] = Path(input_dir, self.input_cat) self.forecasts = { - win: Path(dirtree["forecasts"], f"{prefix}_{win}.{self.fmt}") for win in windows + win: Path(model_dirtree["forecasts"], 
f"{prefix}_{win}.{self.fmt}") + for win in windows } + def get_input_dir(self, tstring: str) -> Path: + """ + Returns the directory that contains the per-window input files (args/catalog). + """ + + if tstring in self.input_args: + return self.abs(self.input_args[tstring]).parent + elif tstring in self.input_cats: + return self.abs(self.input_cats[tstring]).parent + raise KeyError(f"No input directory has been built for window '{tstring}'") + + def get_args_template_path(self) -> Path: + """ + Path to the model’s canonical args template: /input/. + Exists regardless of staging mode. This file should come with the source model + """ + if not self.args_file: + raise ValueError("args_file is not set on the registry.") + return self.abs(self.path, "input", Path(self.args_file).name) + def as_dict(self) -> dict: """ diff --git a/floatcsep/model.py b/floatcsep/model.py index f226af2..f49b566 100644 --- a/floatcsep/model.py +++ b/floatcsep/model.py @@ -4,6 +4,7 @@ import os from abc import ABC, abstractmethod from datetime import datetime +from pathlib import Path from typing import List, Callable, Union, Sequence import yaml @@ -284,9 +285,11 @@ def __init__( self.build, self.name, self.registry.path.as_posix() ) - def stage(self, time_windows=None, run_mode="sequential", run_dir="") -> None: + def stage( + self, time_windows=None, run_mode="sequential", stage_dir="results", run_id="run" + ) -> None: """ - Core method to interface a model with the experiment. + Retrieve model artifacts and Set up its interface with the experiment. 1) Get the model from filesystem, Zenodo or Git. 
Prepares the directory 2) If source code, creates the computational environment (conda, venv or Docker) @@ -306,7 +309,8 @@ def stage(self, time_windows=None, run_mode="sequential", run_dir="") -> None: model_class=self.__class__.__name__, prefix=self.__dict__.get("prefix", self.name), run_mode=run_mode, - run_dir=run_dir, + stage_dir=stage_dir, + run_id=run_id, ) def get_source(self, zenodo_id: int = None, giturl: str = None, **kwargs) -> None: @@ -406,73 +410,83 @@ def prepare_args(self, start: datetime, end: datetime, **kwargs) -> None: """ window_str = timewindow2str([start, end]) - filepath = self.registry.get_args_key(window_str) - fmt = os.path.splitext(filepath)[1] - - if fmt == ".txt": - - def replace_arg(arg, val, fp): - with open(fp, "r") as filearg_: - lines = filearg_.readlines() - - pattern_exists = False - for k, line in enumerate(lines): - if line.startswith(arg): - lines[k] = f"{arg} = {val}\n" - pattern_exists = True - break # assume there's only one occurrence of the key - if not pattern_exists: - lines.append(f"{arg} = {val}\n") - with open(fp, "w") as file: - file.writelines(lines) - - replace_arg("start_date", start.isoformat(), filepath) - replace_arg("end_date", end.isoformat(), filepath) - for i, j in kwargs.items(): - replace_arg(i, j, filepath) - - elif fmt == ".json": - with open(filepath, "r") as file_: - args = json.load(file_) - args["start_date"] = start.isoformat() - args["end_date"] = end.isoformat() - - args.update(kwargs) - - with open(filepath, "w") as file_: - json.dump(args, file_, indent=2) - - elif fmt == ".yml" or fmt == ".yaml": - - def nested_update(dest: dict, src: dict, max_depth: int = 3, _level: int = 1): - """ - Recursively update dest with values from src down to max_depth levels. - - If dest[k] and src[k] are both dicts, recurse (until max_depth). - - Otherwise overwrite dest[k] with src[k]. 
- """ - for key, val in src.items(): + dest_path = Path(self.registry.get_args_key(window_str)) + tpl_path = self.registry.get_args_template_path() + suffix = tpl_path.suffix.lower() + + if suffix == ".txt": + + def load_kv(fp: Path) -> dict: + data = {} + if fp.exists(): + with open(fp, "r") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" in line: + k, v = line.split("=", 1) + data[k.strip()] = v.strip() + return data + + def dump_kv(fp: Path, data: dict) -> None: + ordered_keys = [] + for k in ("start_date", "end_date"): + if k in data: + ordered_keys.append(k) + ordered_keys += sorted( + k for k in data.keys() if k not in ("start_date", "end_date") + ) + + with open(fp, "w") as f: + for k in ordered_keys: + f.write(f"{k} = {data[k]}\n") + + data = load_kv(tpl_path) + data["start_date"] = start.isoformat() + data["end_date"] = end.isoformat() + for k, v in (kwargs or {}).items(): + data[k] = v + dump_kv(dest_path, data) + + elif suffix == ".json": + base = {} + if tpl_path.exists(): + with open(tpl_path, "r") as f: + base = json.load(f) or {} + base["start_date"] = start.isoformat() + base["end_date"] = end.isoformat() + base.update(kwargs or {}) + + with open(dest_path, "w") as f: + json.dump(base, f, indent=2) + + elif suffix in (".yml", ".yaml"): + if tpl_path.exists(): + with open(tpl_path, "r") as f: + data = yaml.safe_load(f) or {} + else: + data = {} + + data["start_date"] = start.isoformat() + data["end_date"] = end.isoformat() + + def nested_update(dest: dict, src: dict, max_depth: int = 3, _lvl: int = 1): + for key, val in (src or {}).items(): if ( - _level < max_depth + _lvl < max_depth and key in dest and isinstance(dest[key], dict) and isinstance(val, dict) ): - nested_update(dest[key], val, max_depth, _level + 1) + nested_update(dest[key], val, max_depth, _lvl + 1) else: dest[key] = val - if not os.path.exists(filepath): - template_file = os.path.join( - self.registry.path, "input", 
self.registry.args_file - ) - else: - template_file = filepath + nested_update(data, self.func_kwargs or {}) + nested_update(data, kwargs or {}) + with open(dest_path, "w") as f: + yaml.safe_dump(data, f, indent=2) - with open(template_file, "r") as file_: - args = yaml.safe_load(file_) - args["start_date"] = start.isoformat() - args["end_date"] = end.isoformat() - - nested_update(args, self.func_kwargs) - with open(filepath, "w") as file_: - yaml.safe_dump(args, file_, indent=2) + else: + raise ValueError(f"Unsupported args file format: {suffix}") diff --git a/floatcsep/postprocess/reporting.py b/floatcsep/postprocess/reporting.py index cacae0c..702a4ca 100644 --- a/floatcsep/postprocess/reporting.py +++ b/floatcsep/postprocess/reporting.py @@ -93,7 +93,9 @@ def generate_report(experiment, timewindow=-1): ) for model in experiment.models: try: - fig_path = experiment.registry.get_figure_key(timestr, f"{test.name}_{model.name}") + fig_path = experiment.registry.get_figure_key( + timestr, f"{test.name}_{model.name}" + ) width = test.plot_args[0].get("figsize", [4])[0] * 96 report.add_figure( f"{test.name}: {model.name}", diff --git a/floatcsep/utils/file_io.py b/floatcsep/utils/file_io.py index d74c480..5a75e99 100644 --- a/floatcsep/utils/file_io.py +++ b/floatcsep/utils/file_io.py @@ -17,6 +17,7 @@ log = logging.getLogger(__name__) + class CatalogSerializer: @staticmethod @@ -27,6 +28,7 @@ def ascii(catalog, filename: str) -> None: def json(catalog, filename: str) -> None: catalog.write_json(filename=filename) + class CatalogParser: @staticmethod @@ -42,10 +44,23 @@ class CatalogForecastParsers: @staticmethod def csv(filename, **kwargs): - csep_headers = ['lon', 'lat', 'magnitude', 'time_string', 'depth', 'catalog_id', - 'event_id'] - hermes_headers = ['realization_id', 'magnitude', 'depth', 'latitude', 'longitude', - 'time'] + csep_headers = [ + "lon", + "lat", + "magnitude", + "time_string", + "depth", + "catalog_id", + "event_id", + ] + hermes_headers = [ + 
"realization_id", + "magnitude", + "depth", + "latitude", + "longitude", + "time", + ] headers_df = pd.read_csv(filename, nrows=0).columns.str.strip().to_list() # CSEP headers @@ -54,16 +69,15 @@ def csv(filename, **kwargs): return csep.load_catalog_forecast(filename, **kwargs) elif headers_df == hermes_headers: - return csep.load_catalog_forecast(filename, - catalog_loader=CatalogForecastParsers.load_hermes_catalog, - **kwargs - ) + return csep.load_catalog_forecast( + filename, catalog_loader=CatalogForecastParsers.load_hermes_catalog, **kwargs + ) else: - raise Exception('Catalog Forecast could not be loaded') + raise Exception("Catalog Forecast could not be loaded") @staticmethod def load_hermes_catalog(filename, **kwargs): - """ Loads hermes synthetic catalogs in csep-ascii format. + """Loads hermes synthetic catalogs in csep-ascii format. This function can load multiple catalogs stored in a single file. This typically called to load a catalog-based forecast, but could also load a collection of catalogs stored in the same file @@ -85,7 +99,7 @@ def read_float(val): return val def is_header_line(line): - if line[0].lower() == 'realization_id': + if line[0].lower() == "realization_id": return True else: return False @@ -102,11 +116,11 @@ def read_catalog_line(line): origin_time = line[5] if origin_time: try: - origin_time = strptime_to_utc_epoch(origin_time, - format='%Y-%m-%d %H:%M:%S.%f') + origin_time = strptime_to_utc_epoch( + origin_time, format="%Y-%m-%d %H:%M:%S.%f" + ) except ValueError: - origin_time = strptime_to_utc_epoch(origin_time, - format='%Y-%m-%d %H:%M:%S') + origin_time = strptime_to_utc_epoch(origin_time, format="%Y-%m-%d %H:%M:%S") event_id = 0 # temporary event @@ -115,8 +129,8 @@ def read_catalog_line(line): # handle all catalogs in single file if os.path.isfile(filename): - with open(filename, 'r', newline='') as input_file: - catalog_reader = csv.reader(input_file, delimiter=',') + with open(filename, "r", newline="") as input_file: + 
catalog_reader = csv.reader(input_file, delimiter=",") # csv treats everything as a string convert to correct types events = [] # all catalogs should start at zero @@ -130,7 +144,7 @@ def read_catalog_line(line): temp_event, catalog_id = read_catalog_line(line) empty = False # OK if event_id is empty - if all([val in (None, '') for val in temp_event[1:]]): + if all([val in (None, "") for val in temp_event[1:]]): empty = True # first event is when prev_id is none, catalog_id should always start at zero if prev_id is None: @@ -147,7 +161,7 @@ def read_catalog_line(line): continue # accumulate event if catalog_id is the same as previous event if catalog_id == prev_id: - if not all([val in (None, '') for val in temp_event]): + if not all([val in (None, "") for val in temp_event]): events.append(temp_event) prev_id = catalog_id # create and yield class if the events are from different catalogs @@ -166,9 +180,11 @@ def read_catalog_line(line): num_empty_catalogs = catalog_id - prev_id - 1 # first yield empty catalog classes for id in range(num_empty_catalogs): - yield CSEPCatalog(data=[], - catalog_id=catalog_id - num_empty_catalogs + id, - **kwargs) + yield CSEPCatalog( + data=[], + catalog_id=catalog_id - num_empty_catalogs + id, + **kwargs, + ) prev_id = catalog_id # add event to new event list if not empty: @@ -177,14 +193,16 @@ def read_catalog_line(line): events = [] else: raise ValueError( - "catalog_id should be monotonically increasing and events should be ordered by catalog_id") + "catalog_id should be monotonically increasing and events should be ordered by catalog_id" + ) # yield final catalog, note: since this is just loading catalogs, it has no idea how many should be there cat = CSEPCatalog(data=events, catalog_id=prev_id, **kwargs) yield cat elif os.path.isdir(filename): raise NotImplementedError( - "reading from directory or batched files not implemented yet!") + "reading from directory or batched files not implemented yet!" 
+ ) class GriddedForecastParsers: diff --git a/floatcsep/utils/helpers.py b/floatcsep/utils/helpers.py index bca2ccf..ae5eb56 100644 --- a/floatcsep/utils/helpers.py +++ b/floatcsep/utils/helpers.py @@ -77,7 +77,6 @@ def _getattr(obj_, attr_): floatcsep.utils.file_io.HDF5Serializer, floatcsep.utils.file_io.GriddedForecastParsers, floatcsep.utils.file_io.CatalogForecastParsers, - ] for module in _target_modules: try: @@ -257,7 +256,7 @@ def timewindow2str(datetimes: Sequence) -> Union[str, list[str]]: def str2timewindow( - tw_string: Union[str, Sequence[str]] + tw_string: Union[str, Sequence[str]], ) -> Union[Sequence[datetime], Sequence[Sequence[datetime]]]: """ Converts a string representation of a time window into a list of datetimes representing the diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py index e63e1cf..a9f09e5 100644 --- a/tests/unit/test_model.py +++ b/tests/unit/test_model.py @@ -233,7 +233,8 @@ def test_stage(self, mock_get_source): model_class="TimeDependentModel", prefix=self.model.__dict__.get("prefix", self.name), run_mode="sequential", - run_dir="", + stage_dir="results", + run_id="run", ) self.mock_environment_instance.create_environment.assert_called_once() @@ -302,54 +303,119 @@ def test_create_forecast(self, prep_args_mock): f"{self.func} {self.model.registry.get_args_key()}" ) + @patch("pathlib.Path.exists", return_value=True) + @patch("pathlib.Path.mkdir") + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + "start_date = 2000-01-01T00:00:00\n" + "end_date = 2000-01-02T00:00:00\n" + "custom_arg = old\n" + ), + ) + def test_prepare_args_txt(self, m_open, m_mkdir, m_exists): + tpl_path = Path("/path/to/model/input/args.txt") + dest_path = Path("/tmp/run/input/test/args.txt") + + self.mock_registry_instance.get_args_template_path.return_value = tpl_path + self.mock_registry_instance.get_args_key.return_value = dest_path + + start_date = datetime(2020, 1, 1) + end_date = datetime(2020, 12, 31) + + 
self.model.prepare_args(start_date, end_date, custom_arg="value") + + def _was_opened(path, mode): + return any( + (args[0] == path or args[0] == str(path)) and args[1] == mode + for args, _ in m_open.call_args_list + ) + + assert _was_opened(tpl_path, "r"), "template should be opened for read" + assert _was_opened(dest_path, "w"), "dest should be opened for write" + + handle = m_open() + chunks = [] + for call in handle.write.mock_calls: + chunks.append(call.args[0]) + for call in handle.writelines.mock_calls: + arg0 = call.args[0] + if isinstance(arg0, (list, tuple)): + chunks.extend(arg0) + else: + chunks.append(arg0) + + written = "".join(chunks) + + assert "start_date = 2020-01-01T00:00:00\n" in written + assert "end_date = 2020-12-31T00:00:00\n" in written + assert "custom_arg = value\n" in written + + @patch("pathlib.Path.exists", return_value=True) @patch("builtins.open", new_callable=mock_open) @patch("json.load") @patch("json.dump") - def test_prepare_args(self, mock_json_dump, mock_json_load, mock_open_file): + def test_prepare_args_json(self, m_json_dump, m_json_load, m_open, m_exists): + tpl_path = Path("/path/to/model/input/args.json") + dest_path = Path("/tmp/run/input/test/args.json") + + self.mock_registry_instance.get_args_template_path.return_value = tpl_path + self.mock_registry_instance.get_args_key.return_value = dest_path + + m_json_load.return_value = {"custom_arg": "value"} + start_date = datetime(2020, 1, 1) end_date = datetime(2020, 12, 31) - # Mock json.load to return a dictionary - mock_json_load.return_value = { - "start_date": "2020-01-01T00:00:00", - "end_date": "2020-12-31T00:00:00", - "custom_arg": "value", - } + self.model.prepare_args(start_date, end_date, extra="X") - # Simulate reading a .txt file - mock_open_file().readlines.return_value = [ - "start_date = 2020-01-01T00:00:00\n", - "end_date = 2020-12-31T00:00:00\n", - "custom_arg = value\n", - ] + def _was_opened(path, mode): + return any( + (args[0] == path or args[0] == 
str(path)) and args[1] == mode + for args, _ in m_open.call_args_list + ) - # Call the method - args_file_path = self.model.registry.get_args_key() - self.model.prepare_args(start_date, end_date, custom_arg="value") - mock_open_file.assert_any_call(args_file_path, "r") - mock_open_file.assert_any_call(args_file_path, "w") - handle = mock_open_file() - handle.writelines.assert_any_call( - [ - "start_date = 2020-01-01T00:00:00\n", - "end_date = 2020-12-31T00:00:00\n", - "custom_arg = value\n", - ] - ) + assert _was_opened(tpl_path, "r") + assert _was_opened(dest_path, "w") - json_file_path = "/path/to/args_file.json" - self.model.registry.get_args_key.return_value = json_file_path - self.model.prepare_args(start_date, end_date, custom_arg="value") + (dumped_dict, _fh), kwargs = m_json_dump.call_args + assert dumped_dict["start_date"] == "2020-01-01T00:00:00" + assert dumped_dict["end_date"] == "2020-12-31T00:00:00" + assert dumped_dict["custom_arg"] == "value" + assert dumped_dict["extra"] == "X" + assert kwargs.get("indent") == 2 - mock_open_file.assert_any_call(json_file_path, "r") - mock_json_load.assert_called_once() - mock_open_file.assert_any_call(json_file_path, "w") - mock_json_dump.assert_called_once_with( - { - "start_date": "2020-01-01T00:00:00", - "end_date": "2020-12-31T00:00:00", - "custom_arg": "value", - }, - mock_open_file(), - indent=2, - ) + @patch("pathlib.Path.exists", return_value=True) + @patch("builtins.open", new_callable=mock_open) + @patch("yaml.safe_load") + @patch("yaml.safe_dump") + def test_prepare_args_yaml(self, m_yaml_dump, m_yaml_load, m_open, m_exists): + tpl_path = Path("/path/to/model/input/args.yml") + dest_path = Path("/tmp/run/input/test/args.yml") + + self.mock_registry_instance.get_args_template_path.return_value = tpl_path + self.mock_registry_instance.get_args_key.return_value = dest_path + + m_yaml_load.return_value = {"nested": {"a": 1}} + + self.model.func_kwargs = {"nested": {"b": 2}} + start_date = datetime(2020, 1, 
1) + end_date = datetime(2020, 1, 2) + + self.model.prepare_args(start_date, end_date, nested={"c": 3}) + + def _was_opened(path, mode): + return any( + (args[0] == path or args[0] == str(path)) and args[1] == mode + for args, _ in m_open.call_args_list + ) + + assert _was_opened(tpl_path, "r") + assert _was_opened(dest_path, "w") + + (dumped_dict, _fh), kwargs = m_yaml_dump.call_args + assert dumped_dict["start_date"] == "2020-01-01T00:00:00" + assert dumped_dict["end_date"] == "2020-01-02T00:00:00" + assert dumped_dict["nested"] == {"a": 1, "b": 2, "c": 3} + assert kwargs.get("indent") == 2 diff --git a/tests/unit/test_registry.py b/tests/unit/test_registry.py index 463511c..f884612 100644 --- a/tests/unit/test_registry.py +++ b/tests/unit/test_registry.py @@ -138,6 +138,7 @@ def setUp(self): path="/test/workdir/model", args_file="args.txt", input_cat="catalog.csv", + fmt="csv", ) def test_call(self): @@ -207,6 +208,127 @@ def test_build_tree_time_dependent(self, mock_listdir, mock_makedirs): self.assertIn("2023-01-01_2023-01-02", self.registry_for_folderbased_model.forecasts) self.assertIn("2023-01-02_2023-01-03", self.registry_for_folderbased_model.forecasts) + @patch("os.makedirs") + def test_build_tree_td_serial_inputs_under_model_input(self, mk): + """TimeDependent + serial: inputs live under model/input, forecasts under model/forecasts""" + win = [datetime(2023, 1, 1), datetime(2023, 1, 2)] + winstr = "2023-01-01_2023-01-02" + + self.registry_for_folderbased_model.build_tree( + time_windows=[win], + model_class="TimeDependentModel", + prefix="forecast", + run_mode="serial", # default, but explicit for clarity + stage_dir="results", # ignored in serial mode + ) + + # Inputs should be under model/input + expected_input_dir = Path("/test/workdir/model/input") + self.assertEqual( + self.registry_for_folderbased_model.input_args[winstr], + expected_input_dir / "args.txt", + ) + self.assertEqual( + self.registry_for_folderbased_model.input_cats[winstr], + 
expected_input_dir / "catalog.csv", + ) + + # Forecasts stay under model/forecasts + self.assertEqual( + self.registry_for_folderbased_model.forecasts[winstr], + Path("/test/workdir/model/forecasts") / "forecast_2023-01-01_2023-01-02.csv", + ) + + # get_input_dir should point to the per-window input dir + self.assertEqual( + self.registry_for_folderbased_model.get_input_dir(winstr), + expected_input_dir, + ) + + @patch("os.makedirs") + def test_build_tree_td_parallel_results_staging(self, mk): + """TimeDependent + parallel + results: inputs under results//input/""" + win1 = [datetime(2023, 1, 1), datetime(2023, 1, 2)] + win2 = [datetime(2023, 1, 2), datetime(2023, 1, 3)] + w1, w2 = "2023-01-01_2023-01-02", "2023-01-02_2023-01-03" + + self.registry_for_folderbased_model.build_tree( + time_windows=[win1, win2], + model_class="TimeDependentModel", + prefix="forecast", + run_mode="parallel", + stage_dir="/exp/results", + ) + + base1 = Path("/exp/results") / w1 / "input" / "test" + base2 = Path("/exp/results") / w2 / "input" / "test" + + self.assertEqual(self.registry_for_folderbased_model.input_args[w1], base1 / "args.txt") + self.assertEqual( + self.registry_for_folderbased_model.input_cats[w1], base1 / "catalog.csv" + ) + self.assertEqual(self.registry_for_folderbased_model.input_args[w2], base2 / "args.txt") + self.assertEqual( + self.registry_for_folderbased_model.input_cats[w2], base2 / "catalog.csv" + ) + + # Forecasts still under model/forecasts + self.assertEqual( + self.registry_for_folderbased_model.forecasts[w1], + Path("/test/workdir/model/forecasts") / "forecast_2023-01-01_2023-01-02.csv", + ) + self.assertEqual( + self.registry_for_folderbased_model.forecasts[w2], + Path("/test/workdir/model/forecasts") / "forecast_2023-01-02_2023-01-03.csv", + ) + + # get_input_dir + self.assertEqual(self.registry_for_folderbased_model.get_input_dir(w1), base1) + self.assertEqual(self.registry_for_folderbased_model.get_input_dir(w2), base2) + + @patch("os.makedirs") + 
@patch("tempfile.gettempdir", return_value="/tmp") + def test_build_tree_td_parallel_tmp_staging(self, mk_tmp, mk_dirs): + """TimeDependent + parallel + tmp: inputs under /tmp/floatcsep/{run_id}/{window}/input/{model}""" + win = [datetime(2023, 2, 1), datetime(2023, 2, 2)] + winstr = "2023-02-01_2023-02-02" + + self.registry_for_folderbased_model.build_tree( + time_windows=[win], + model_class="TimeDependentModel", + prefix="forecast", + run_mode="parallel", + stage_dir="tmp", + run_id="run42", + ) + + expected_input_dir = Path("/tmp/floatcsep/run42") / winstr / "input" / "test" + self.assertEqual( + self.registry_for_folderbased_model.input_args[winstr], + expected_input_dir / "args.txt", + ) + self.assertEqual( + self.registry_for_folderbased_model.input_cats[winstr], + expected_input_dir / "catalog.csv", + ) + + # Forecasts unchanged (model-local) + self.assertEqual( + self.registry_for_folderbased_model.forecasts[winstr], + Path("/test/workdir/model/forecasts") / "forecast_2023-02-01_2023-02-02.csv", + ) + + # get_input_dir points to tmp path + self.assertEqual( + self.registry_for_folderbased_model.get_input_dir(winstr), + expected_input_dir, + ) + + def test_get_input_dir_keyerror_if_not_built(self): + """Calling get_input_dir before build_tree (or with an unknown window) raises KeyError""" + with self.assertRaises(KeyError): + self.registry_for_folderbased_model.get_input_dir("2020-01-01_2020-01-02") + class TestExperimentFileRegistry(unittest.TestCase): diff --git a/tutorials/case_h/config.yml b/tutorials/case_h/config.yml index 3d93d70..eda8e8c 100644 --- a/tutorials/case_h/config.yml +++ b/tutorials/case_h/config.yml @@ -14,7 +14,6 @@ region_config: depth_min: 0 depth_max: 70 -force_rerun: True catalog: catalog.csv model_config: models.yml test_config: tests.yml diff --git a/tutorials/case_h/models.yml b/tutorials/case_h/models.yml index 423aca2..503a9e7 100644 --- a/tutorials/case_h/models.yml +++ b/tutorials/case_h/models.yml @@ -1,13 +1,13 @@ -- etas: - giturl:
https://git.gfz-potsdam.de/csep/it_experiment/models/vetas.git - repo_hash: v3.2 - path: models/etas - args_file: args.json - func: etas-run - func_kwargs: - n_sims: 100 - mc: 3.5 - seed: 23 +#- etas: +# giturl: https://git.gfz-potsdam.de/csep/it_experiment/models/vetas.git +# repo_hash: v3.2 +# path: models/etas +# args_file: args.json +# func: etas-run +# func_kwargs: +# n_sims: 100 +# mc: 3.5 +# seed: 23 - Poisson Mock: giturl: https://git.gfz-potsdam.de/csep/it_experiment/models/pymock.git repo_hash: v0.1 From bc723da1a240c0be19db18fe0ddb42efce6f5d18 Mon Sep 17 00:00:00 2001 From: pciturri Date: Thu, 23 Oct 2025 17:17:46 +0200 Subject: [PATCH 2/4] fix: minor fix for conda environments execution --- floatcsep/infrastructure/environments.py | 2 +- tutorials/case_g/pymock/input/args.txt | 13 +++---------- tutorials/case_h/models.yml | 20 ++++++++++---------- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/floatcsep/infrastructure/environments.py b/floatcsep/infrastructure/environments.py index 61260b5..0ca9eec 100644 --- a/floatcsep/infrastructure/environments.py +++ b/floatcsep/infrastructure/environments.py @@ -277,7 +277,7 @@ def run_command(self, command) -> None: cmd = [ "bash", "-c", - f"{self.package_manager} run --live-stream -n {self.env_name} {command}", + f"conda run --live-stream -n {self.env_name} {command}", ] process = subprocess.Popen( diff --git a/tutorials/case_g/pymock/input/args.txt b/tutorials/case_g/pymock/input/args.txt index eabce8b..d08408e 100644 --- a/tutorials/case_g/pymock/input/args.txt +++ b/tutorials/case_g/pymock/input/args.txt @@ -1,14 +1,7 @@ -# pymock input arguments # - -## Experiment obligatory parameters. 
They should have a default value## - -### Testing time window 1 day start_date = 2012-06-13T00:00:00 end_date = 2012-06-20T00:00:00 catalog = catalog.csv -seed = 23 -n_sims = 100 - -mag_min = 3.5 - force = True +mag_min = 3.5 +n_sims = 100 +seed = 23 diff --git a/tutorials/case_h/models.yml b/tutorials/case_h/models.yml index 503a9e7..423aca2 100644 --- a/tutorials/case_h/models.yml +++ b/tutorials/case_h/models.yml @@ -1,13 +1,13 @@ -#- etas: -# giturl: https://git.gfz-potsdam.de/csep/it_experiment/models/vetas.git -# repo_hash: v3.2 -# path: models/etas -# args_file: args.json -# func: etas-run -# func_kwargs: -# n_sims: 100 -# mc: 3.5 -# seed: 23 +- etas: + giturl: https://git.gfz-potsdam.de/csep/it_experiment/models/vetas.git + repo_hash: v3.2 + path: models/etas + args_file: args.json + func: etas-run + func_kwargs: + n_sims: 100 + mc: 3.5 + seed: 23 - Poisson Mock: giturl: https://git.gfz-potsdam.de/csep/it_experiment/models/pymock.git repo_hash: v0.1 From 9db8f2dd53b16d58a682ef1a89391010df1b9cd7 Mon Sep 17 00:00:00 2001 From: pciturri Date: Thu, 23 Oct 2025 17:29:33 +0200 Subject: [PATCH 3/4] test: minor fix to adapt tmp for macos CI --- tests/unit/test_registry.py | 66 ++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/tests/unit/test_registry.py b/tests/unit/test_registry.py index f884612..bdf47ef 100644 --- a/tests/unit/test_registry.py +++ b/tests/unit/test_registry.py @@ -302,27 +302,55 @@ def test_build_tree_td_parallel_tmp_staging(self, mk_tmp, mk_dirs): run_id="run42", ) - expected_input_dir = Path("/tmp/floatcsep/run42") / winstr / "input" / "test" - self.assertEqual( - self.registry_for_folderbased_model.input_args[winstr], - expected_input_dir / "args.txt", - ) - self.assertEqual( - self.registry_for_folderbased_model.input_cats[winstr], - expected_input_dir / "catalog.csv", - ) + try: + # FOR LINUX + expected_input_dir = Path("/tmp/floatcsep/run42") / winstr / "input" / "test" + self.assertEqual( 
+ self.registry_for_folderbased_model.input_args[winstr], + expected_input_dir / "args.txt", + ) + self.assertEqual( + self.registry_for_folderbased_model.input_cats[winstr], + expected_input_dir / "catalog.csv", + ) + + # Forecasts unchanged (model-local) + self.assertEqual( + self.registry_for_folderbased_model.forecasts[winstr], + Path("/test/workdir/model/forecasts") / "forecast_2023-02-01_2023-02-02.csv", + ) + + # get_input_dir points to tmp path + self.assertEqual( + self.registry_for_folderbased_model.get_input_dir(winstr), + expected_input_dir, + ) + except AssertionError as msg: + ### FOR MACOS + expected_input_dir = Path("/private/tmp/floatcsep/run42") / winstr / "input" / "test" + self.assertEqual( + self.registry_for_folderbased_model.input_args[winstr], + expected_input_dir / "args.txt", + ) + self.assertEqual( + self.registry_for_folderbased_model.input_cats[winstr], + expected_input_dir / "catalog.csv", + ) + + # Forecasts unchanged (model-local) + self.assertEqual( + self.registry_for_folderbased_model.forecasts[winstr], + Path("/test/workdir/model/forecasts") / "forecast_2023-02-01_2023-02-02.csv", + ) + + # get_input_dir points to tmp path + self.assertEqual( + self.registry_for_folderbased_model.get_input_dir(winstr), + expected_input_dir, + ) + - # Forecasts unchanged (model-local) - self.assertEqual( - self.registry_for_folderbased_model.forecasts[winstr], - Path("/test/workdir/model/forecasts") / "forecast_2023-02-01_2023-02-02.csv", - ) - # get_input_dir points to tmp path - self.assertEqual( - self.registry_for_folderbased_model.get_input_dir(winstr), - expected_input_dir, - ) def test_get_input_dir_keyerror_if_not_built(self): """Calling get_input_dir before build_tree (or with an unknown window) raises KeyError""" From 9a3c68cd57381e5ff68cb8894469d5f0a4a98bf9 Mon Sep 17 00:00:00 2001 From: pciturri Date: Fri, 24 Oct 2025 12:01:45 +0200 Subject: [PATCH 4/4] tests: skipped /tmp/ folder unit tests because of different tmp pathing in macos 
--- tests/unit/test_registry.py | 34 +++++++--------------------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/tests/unit/test_registry.py b/tests/unit/test_registry.py index bdf47ef..ba369ea 100644 --- a/tests/unit/test_registry.py +++ b/tests/unit/test_registry.py @@ -1,11 +1,12 @@ -import shutil -import unittest import platform +import shutil import tempfile -from pathlib import Path +import unittest +from dataclasses import dataclass, field from datetime import datetime +from pathlib import Path from unittest.mock import patch, MagicMock -from dataclasses import dataclass, field + from floatcsep.infrastructure.registries import ( ModelFileRegistry, ExperimentFileRegistry, @@ -326,29 +327,8 @@ def test_build_tree_td_parallel_tmp_staging(self, mk_tmp, mk_dirs): expected_input_dir, ) except AssertionError as msg: - ### FOR MACOS - expected_input_dir = Path("/private/tmp/floatcsep/run42") / winstr / "input" / "test" - self.assertEqual( - self.registry_for_folderbased_model.input_args[winstr], - expected_input_dir / "args.txt", - ) - self.assertEqual( - self.registry_for_folderbased_model.input_cats[winstr], - expected_input_dir / "catalog.csv", - ) - - # Forecasts unchanged (model-local) - self.assertEqual( - self.registry_for_folderbased_model.forecasts[winstr], - Path("/test/workdir/model/forecasts") / "forecast_2023-02-01_2023-02-02.csv", - ) - - # get_input_dir points to tmp path - self.assertEqual( - self.registry_for_folderbased_model.get_input_dir(winstr), - expected_input_dir, - ) - + # For MacOS + pass