Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions floatcsep/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Experiment:

model_config (str): Path to the models' configuration file
test_config (str): Path to the evaluations' configuration file
run_mode (str): 'sequential' or 'parallel'
run_mode (str): 'serial' or 'parallel'
default_test_kwargs (dict): Default values for the testing
(seed, number of simulations, etc.)
postprocess (dict): Contains the instruction for postprocessing
Expand All @@ -102,7 +102,7 @@ def __init__(
postprocess: str = None,
default_test_kwargs: dict = None,
rundir: str = "results",
run_mode: str = "sequential",
run_mode: str = "serial",
report_hook: dict = None,
**kwargs,
) -> None:
Expand All @@ -123,6 +123,7 @@ def __init__(
self.registry = ExperimentRegistry.factory(workdir=workdir, run_dir=rundir)
self.results_repo = ResultsRepository(self.registry)
self.catalog_repo = CatalogRepository(self.registry)
self.run_id = "run"

self.config_file = kwargs.get("config_file", None)
self.original_config = kwargs.get("original_config", None)
Expand Down Expand Up @@ -285,20 +286,27 @@ def get_model(self, name: str) -> Model:
for model in self.models:
if model.name == name:
return model
raise Exception(f"No existing model with name {name}")

def get_test(self, name: str) -> "Evaluation":
    """
    Returns an Evaluation by its name string.

    Args:
        name (str): Name of the evaluation to look up in ``self.tests``.

    Returns:
        The first evaluation in ``self.tests`` whose ``name`` equals ``name``.

    Raises:
        Exception: If no evaluation with that name exists.
    """
    for test in self.tests:
        if test.name == name:
            return test
    raise Exception(f"No existing evaluation with name {name}")

def stage_models(self) -> None:
    """
    Stages all the experiment's models. See :meth:`floatcsep.model.Model.stage`

    Each model is staged exactly once, using the experiment's time windows and
    run mode, the registry's run directory as staging directory, and the
    experiment's run id. The model is then added to the experiment registry.
    """
    log.info("Staging models")
    for model in self.models:
        model.stage(
            self.time_windows,
            run_mode=self.run_mode,
            stage_dir=self.registry.run_dir,
            run_id=self.run_id,
        )
        self.registry.add_model_registry(model)

def set_tests(self, test_config: Union[str, Dict, List]) -> list:
Expand Down Expand Up @@ -365,7 +373,7 @@ def set_tasks(self) -> None:
Lazy definition of the experiment core tasks by wrapping instances,
methods and arguments. Creates a graph with task nodes, while assigning
task-parents to each node, depending on each Evaluation signature.
The tasks can then be run sequentially as a list or asynchronous
The tasks can then be run in serial as a list or asynchronously
using the graph's node dependencies.
For instance:

Expand Down
2 changes: 1 addition & 1 deletion floatcsep/infrastructure/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def run_command(self, command) -> None:
cmd = [
"bash",
"-c",
f"{self.package_manager} run --live-stream -n {self.env_name} {command}",
f"conda run --live-stream -n {self.env_name} {command}",
]

process = subprocess.Popen(
Expand Down
82 changes: 57 additions & 25 deletions floatcsep/infrastructure/registries.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import os
import tempfile
from abc import ABC, abstractmethod
from datetime import datetime
from os.path import join, abspath, relpath, normpath, dirname, exists
from pathlib import Path
from typing import Sequence, Union, TYPE_CHECKING, Any, Optional
from typing import Sequence, Union, TYPE_CHECKING, Any, Optional, Literal

from floatcsep.utils.helpers import timewindow2str

Expand Down Expand Up @@ -269,8 +270,9 @@ def build_tree(
time_windows: Sequence[Sequence[datetime]] = None,
model_class: str = "TimeIndependentModel",
prefix: str = None,
run_mode: str = "sequential",
run_dir: Optional[str] = None,
run_mode: str = Literal["serial", "parallel"],
stage_dir: str = "results",
run_id: Optional[str] = "run",
) -> None:
"""
Creates the run directory, and reads the file structure inside.
Expand All @@ -279,11 +281,13 @@ def build_tree(
time_windows (list(str)): List of time windows or strings.
model_class (str): Model's class name
prefix (str): prefix of the model forecast filenames if TD
run_mode (str): if run mode is sequential, input data (args and cat) will be
run_mode (str): if run mode is serial, input data (args and cat) will be
dynamically overwritten in 'model/input/' through time_windows. If 'parallel',
input data is dynamically written anew in
'results/{time_window}/input/{model_name}/'.
run_dir (str): Where experiment's results are stored.
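stage_dir (str): Where per-window input data is staged in parallel execution:
a directory path (e.g. the run directory) for persistent storage, or 'tmp' to
use the system temporary directory.
run_id (str): Job ID of the run; names the sub-folder that holds the input data
when stage_dir is 'tmp' (parallel execution only).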
"""

windows = timewindow2str(time_windows)
Expand All @@ -293,33 +297,61 @@ def build_tree(

elif model_class == "TimeDependentModel":

# grab names for creating model directories
subfolders = ["input", "forecasts"]
dirtree = {folder: self.abs(self.path, folder) for folder in subfolders}
for _, folder_ in dirtree.items():
model_dirtree = {folder: self.abs(self.path, folder) for folder in subfolders}
for _, folder_ in model_dirtree.items():
os.makedirs(folder_, exist_ok=True)

if run_mode == "sequential":
self.input_args = {
win: Path(self.path, "input", self.args_file) for win in windows
}
self.input_cats = {
win: Path(self.path, "input", self.input_cat) for win in windows
}
elif run_mode == "parallel":
self.input_args = {
win: Path(run_dir, win, "input", self.model_name, self.args_file)
for win in windows
}
self.input_cats = {
win: Path(run_dir, win, "input", self.model_name, self.input_cat)
for win in windows
}
# Decide the base dir for *per-window* inputs (args + catalog)
# - serial mode: the model's own input folder, <model_path>/input
# - parallel + stage_dir path: <stage_dir>/<win>/input/<model_name>/
# - parallel + stage_dir == "tmp": <tmp_root>/floatcsep/<run_id>/<win>/input/<model_name>/
def _window_input_dir(win: str) -> Path:
    """Return the directory holding the args/catalog inputs of window ``win``."""
    if run_mode != "parallel":
        # Any non-parallel mode shares the model's single input folder.
        return model_dirtree["input"]

    if stage_dir == "tmp":
        # run_id is typed Optional; joining a Path with None raises TypeError,
        # so fall back to the signature's default identifier.
        job_id = run_id or "run"
        tmp_root = Path(tempfile.gettempdir())
        return tmp_root / "floatcsep" / job_id / win / "input" / self.model_name

    return Path(stage_dir, win, "input", self.model_name)

# Build input/output maps
if not prefix:
prefix = self.model_name

for win in windows:
input_dir = _window_input_dir(win)
os.makedirs(input_dir, exist_ok=True)

self.input_args[win] = Path(input_dir, self.args_file)
self.input_cats[win] = Path(input_dir, self.input_cat)

self.forecasts = {
win: Path(dirtree["forecasts"], f"{prefix}_{win}.{self.fmt}") for win in windows
win: Path(model_dirtree["forecasts"], f"{prefix}_{win}.{self.fmt}")
for win in windows
}

def get_input_dir(self, tstring: str) -> Path:
    """
    Return the directory containing the per-window input files (args/catalog).

    The window is looked up in ``input_args`` first and in ``input_cats``
    second; the parent folder of the first match is returned.

    Args:
        tstring (str): Time-window string used as key of the input maps.

    Raises:
        KeyError: If the window is in neither ``input_args`` nor ``input_cats``.
    """
    for window_map in (self.input_args, self.input_cats):
        if tstring in window_map:
            input_file = self.abs(window_map[tstring])
            return input_file.parent
    raise KeyError(f"No input directory has been built for window '{tstring}'")

def get_args_template_path(self) -> Path:
    """
    Return the path of the model's args template: <model.path>/input/<args_file>.

    Only the file name of ``args_file`` is used, so the result does not depend
    on where the per-window argument files are staged.

    Raises:
        ValueError: If ``args_file`` is not set (empty or None).
    """
    if self.args_file:
        template_name = Path(self.args_file).name
        return self.abs(self.path, "input", template_name)
    raise ValueError("args_file is not set on the registry.")

def as_dict(self) -> dict:
"""

Expand Down
142 changes: 78 additions & 64 deletions floatcsep/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import List, Callable, Union, Sequence

import yaml
Expand Down Expand Up @@ -284,9 +285,11 @@ def __init__(
self.build, self.name, self.registry.path.as_posix()
)

def stage(self, time_windows=None, run_mode="sequential", run_dir="") -> None:
def stage(
self, time_windows=None, run_mode="sequential", stage_dir="results", run_id="run"
) -> None:
"""
Core method to interface a model with the experiment.
Retrieve model artifacts and Set up its interface with the experiment.

1) Get the model from filesystem, Zenodo or Git. Prepares the directory
2) If source code, creates the computational environment (conda, venv or Docker)
Expand All @@ -306,7 +309,8 @@ def stage(self, time_windows=None, run_mode="sequential", run_dir="") -> None:
model_class=self.__class__.__name__,
prefix=self.__dict__.get("prefix", self.name),
run_mode=run_mode,
run_dir=run_dir,
stage_dir=stage_dir,
run_id=run_id,
)

def get_source(self, zenodo_id: int = None, giturl: str = None, **kwargs) -> None:
Expand Down Expand Up @@ -406,73 +410,83 @@ def prepare_args(self, start: datetime, end: datetime, **kwargs) -> None:
"""
window_str = timewindow2str([start, end])

filepath = self.registry.get_args_key(window_str)
fmt = os.path.splitext(filepath)[1]

if fmt == ".txt":

def replace_arg(arg, val, fp):
with open(fp, "r") as filearg_:
lines = filearg_.readlines()

pattern_exists = False
for k, line in enumerate(lines):
if line.startswith(arg):
lines[k] = f"{arg} = {val}\n"
pattern_exists = True
break # assume there's only one occurrence of the key
if not pattern_exists:
lines.append(f"{arg} = {val}\n")
with open(fp, "w") as file:
file.writelines(lines)

replace_arg("start_date", start.isoformat(), filepath)
replace_arg("end_date", end.isoformat(), filepath)
for i, j in kwargs.items():
replace_arg(i, j, filepath)

elif fmt == ".json":
with open(filepath, "r") as file_:
args = json.load(file_)
args["start_date"] = start.isoformat()
args["end_date"] = end.isoformat()

args.update(kwargs)

with open(filepath, "w") as file_:
json.dump(args, file_, indent=2)

elif fmt == ".yml" or fmt == ".yaml":

def nested_update(dest: dict, src: dict, max_depth: int = 3, _level: int = 1):
"""
Recursively update dest with values from src down to max_depth levels.
- If dest[k] and src[k] are both dicts, recurse (until max_depth).
- Otherwise overwrite dest[k] with src[k].
"""
for key, val in src.items():
dest_path = Path(self.registry.get_args_key(window_str))
tpl_path = self.registry.get_args_template_path()
suffix = tpl_path.suffix.lower()

if suffix == ".txt":

def load_kv(fp: Path) -> dict:
    """
    Parse a ``key = value`` text file into a dict of stripped strings.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Only the first ``=`` splits a line, so values may contain ``=``.
    A missing file yields an empty dict.
    """
    if not fp.exists():
        return {}

    parsed = {}
    with open(fp, "r") as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            key, _, value = entry.partition("=")
            parsed[key.strip()] = value.strip()
    return parsed

def dump_kv(fp: Path, data: dict) -> None:
    """
    Write ``data`` to ``fp`` as one ``key = value`` line per entry.

    ``start_date`` and ``end_date`` (when present) are written first, in that
    order; all remaining keys follow in sorted order. The file is overwritten.
    """
    leading = ("start_date", "end_date")
    head = [key for key in leading if key in data]
    tail = sorted(key for key in data if key not in leading)

    with open(fp, "w") as out:
        for key in head + tail:
            out.write(f"{key} = {data[key]}\n")

data = load_kv(tpl_path)
data["start_date"] = start.isoformat()
data["end_date"] = end.isoformat()
for k, v in (kwargs or {}).items():
data[k] = v
dump_kv(dest_path, data)

elif suffix == ".json":
base = {}
if tpl_path.exists():
with open(tpl_path, "r") as f:
base = json.load(f) or {}
base["start_date"] = start.isoformat()
base["end_date"] = end.isoformat()
base.update(kwargs or {})

with open(dest_path, "w") as f:
json.dump(base, f, indent=2)

elif suffix in (".yml", ".yaml"):
if tpl_path.exists():
with open(tpl_path, "r") as f:
data = yaml.safe_load(f) or {}
else:
data = {}

data["start_date"] = start.isoformat()
data["end_date"] = end.isoformat()

def nested_update(dest: dict, src: dict, max_depth: int = 3, _lvl: int = 1):
    """
    Recursively update ``dest`` in place with values from ``src``.

    - If ``dest[key]`` and ``src[key]`` are both dicts and the current level
      is below ``max_depth``, recurse into them.
    - Otherwise overwrite ``dest[key]`` with ``src[key]``.

    A falsy ``src`` (None or empty) leaves ``dest`` untouched. ``_lvl`` is the
    current recursion level and is not meant to be passed by callers.
    """
    for key, val in (src or {}).items():
        if (
            _lvl < max_depth
            and key in dest
            and isinstance(dest[key], dict)
            and isinstance(val, dict)
        ):
            nested_update(dest[key], val, max_depth, _lvl + 1)
        else:
            dest[key] = val

if not os.path.exists(filepath):
template_file = os.path.join(
self.registry.path, "input", self.registry.args_file
)
else:
template_file = filepath
nested_update(data, self.func_kwargs or {})
nested_update(data, kwargs or {})
with open(dest_path, "w") as f:
yaml.safe_dump(data, f, indent=2)

with open(template_file, "r") as file_:
args = yaml.safe_load(file_)
args["start_date"] = start.isoformat()
args["end_date"] = end.isoformat()

nested_update(args, self.func_kwargs)
with open(filepath, "w") as file_:
yaml.safe_dump(args, file_, indent=2)
else:
raise ValueError(f"Unsupported args file format: {suffix}")
4 changes: 3 additions & 1 deletion floatcsep/postprocess/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def generate_report(experiment, timewindow=-1):
)
for model in experiment.models:
try:
fig_path = experiment.registry.get_figure_key(timestr, f"{test.name}_{model.name}")
fig_path = experiment.registry.get_figure_key(
timestr, f"{test.name}_{model.name}"
)
width = test.plot_args[0].get("figsize", [4])[0] * 96
report.add_figure(
f"{test.name}: {model.name}",
Expand Down
Loading