From 22128a065315938b9bf0291798494f631ac5580e Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Sat, 27 Apr 2024 00:24:19 +0200
Subject: [PATCH] Make parsl.AUTO_LOGNAME generation pluggable (#3388)

A generation function can now be specified at configuration time, with
the previous behaviour invoked as the default.
---
 docs/userguide/plugins.rst                 |   9 ++
 parsl/config.py                            |   5 +
 parsl/dataflow/dflow.py                    |  53 +++++----
 parsl/tests/test_bash_apps/test_std_uri.py | 128 +++++++++++++++++++++
 4 files changed, 173 insertions(+), 22 deletions(-)
 create mode 100644 parsl/tests/test_bash_apps/test_std_uri.py

diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst
index e3d4b18186..7eb83bedba 100644
--- a/docs/userguide/plugins.rst
+++ b/docs/userguide/plugins.rst
@@ -49,6 +49,15 @@ be added in the workflow configuration, in the ``storage`` parameter of the
 relevant `ParslExecutor`. Each provider should subclass the `Staging` class.
 
 
+Default stdout/stderr name generation
+-------------------------------------
+Parsl can choose names for your bash apps' stdout and stderr streams
+automatically when ``parsl.AUTO_LOGNAME`` is used as the stream specification.
+The choice of path is made by a function which can be configured with the
+``std_autopath`` parameter of the Parsl `Config`. By default,
+``DataFlowKernel.default_std_autopath`` will be used.
+
+
 Memoization/checkpointing
 -------------------------
 
diff --git a/parsl/config.py b/parsl/config.py
index 45cdde1c75..5682f7c535 100644
--- a/parsl/config.py
+++ b/parsl/config.py
@@ -51,6 +51,9 @@ class Config(RepresentationMixin, UsageInformation):
         of 1.
     run_dir : str, optional
         Path to run directory. Default is 'runinfo'.
+    std_autopath : function, optional
+        Sets the function used to generate stdout/stderr specifications when parsl.AUTO_LOGNAME is used. If no function
+        is specified, generates paths that look like: ``rundir/NNN/task_logs/X/task_{id}_{name}{label}.{out/err}``
     strategy : str, optional
         Strategy to use for scaling blocks according to workflow needs. Can be 'simple', 'htex_auto_scale', 'none'
         or `None`.
@@ -90,6 +93,7 @@ def __init__(self,
                  retries: int = 0,
                  retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
                  run_dir: str = 'runinfo',
+                 std_autopath: Optional[Callable] = None,
                  strategy: Optional[str] = 'simple',
                  strategy_period: Union[float, int] = 5,
                  max_idletime: float = 120.0,
@@ -130,6 +134,7 @@ def __init__(self,
         self.usage_tracking = usage_tracking
         self.initialize_logging = initialize_logging
         self.monitoring = monitoring
+        self.std_autopath: Optional[Callable] = std_autopath
 
     @property
     def executors(self) -> Sequence[ParslExecutor]:
diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index b1dc54b684..93e7e9b4bf 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -995,32 +995,16 @@ def submit(self,
             executor = random.choice(choices)
             logger.debug("Task {} will be sent to executor {}".format(task_id, executor))
 
-        # The below uses func.__name__ before it has been wrapped by any staging code.
-
-        label = app_kwargs.get('label')
-        for kw in ['stdout', 'stderr']:
-            if kw in app_kwargs:
-                if app_kwargs[kw] == parsl.AUTO_LOGNAME:
-                    if kw not in ignore_for_cache:
-                        ignore_for_cache += [kw]
-                    app_kwargs[kw] = os.path.join(
-                        self.run_dir,
-                        'task_logs',
-                        str(int(task_id / 10000)).zfill(4),  # limit logs to 10k entries per directory
-                        'task_{}_{}{}.{}'.format(
-                            str(task_id).zfill(4),
-                            func.__name__,
-                            '' if label is None else '_{}'.format(label),
-                            kw)
-                    )
-
         resource_specification = app_kwargs.get('parsl_resource_specification', {})
 
         task_record: TaskRecord
-        task_record = {'depends': [],
+        task_record = {'args': app_args,
+                       'depends': [],
                        'dfk': self,
                        'executor': executor,
+                       'func': func,
                        'func_name': func.__name__,
+                       'kwargs': app_kwargs,
                        'memoize': cache,
                        'hashsum': None,
                        'exec_fu': None,
@@ -1042,18 +1026,30 @@ def submit(self,
 
         self.update_task_state(task_record, States.unsched)
 
+        for kw in ['stdout', 'stderr']:
+            if kw in app_kwargs:
+                if app_kwargs[kw] == parsl.AUTO_LOGNAME:
+                    if kw not in ignore_for_cache:
+                        ignore_for_cache += [kw]
+                    if self.config.std_autopath is None:
+                        app_kwargs[kw] = self.default_std_autopath(task_record, kw)
+                    else:
+                        app_kwargs[kw] = self.config.std_autopath(task_record, kw)
+
         app_fu = AppFuture(task_record)
+        task_record['app_fu'] = app_fu
 
         # Transform remote input files to data futures
         app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func)
 
         func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func)
 
+        # Replace the function invocation in the TaskRecord with whatever file-staging
+        # substitutions have been made.
         task_record.update({
                    'args': app_args,
                    'func': func,
-                   'kwargs': app_kwargs,
-                   'app_fu': app_fu})
+                   'kwargs': app_kwargs})
 
         assert task_id not in self.tasks
 
@@ -1441,6 +1437,19 @@ def log_std_stream(name: str, target) -> None:
         log_std_stream("Standard out", task_record['app_fu'].stdout)
         log_std_stream("Standard error", task_record['app_fu'].stderr)
 
+    def default_std_autopath(self, taskrecord, kw):
+        label = taskrecord['kwargs'].get('label')
+        task_id = taskrecord['id']
+        return os.path.join(
+            self.run_dir,
+            'task_logs',
+            str(int(task_id / 10000)).zfill(4),  # limit logs to 10k entries per directory
+            'task_{}_{}{}.{}'.format(
+                str(task_id).zfill(4),
+                taskrecord['func_name'],
+                '' if label is None else '_{}'.format(label),
+                kw))
+
 
 class DataFlowKernelLoader:
     """Manage which DataFlowKernel is active.
diff --git a/parsl/tests/test_bash_apps/test_std_uri.py b/parsl/tests/test_bash_apps/test_std_uri.py
new file mode 100644
index 0000000000..8acb9e4d34
--- /dev/null
+++ b/parsl/tests/test_bash_apps/test_std_uri.py
@@ -0,0 +1,128 @@
+import logging
+import parsl
+import pytest
+import zipfile
+
+from functools import partial
+from parsl.app.futures import DataFuture
+from parsl.data_provider.files import File
+from parsl.executors import ThreadPoolExecutor
+
+
+@parsl.bash_app
+def app_stdout(stdout=parsl.AUTO_LOGNAME):
+    return "echo hello"
+
+
+def const_str(cpath, task_record, err_or_out):
+    return cpath
+
+
+def const_with_cpath(autopath_specifier, content_path, caplog):
+    with parsl.load(parsl.Config(std_autopath=partial(const_str, autopath_specifier))):
+        fut = app_stdout()
+
+        # we don't have to wait for a result to check this attribute
+        assert fut.stdout is autopath_specifier
+
+        # there is no DataFuture to wait for in the str case: the model is that
+        # the stdout will be immediately available on task completion.
+        fut.result()
+
+        with open(content_path, "r") as file:
+            assert file.readlines() == ["hello\n"]
+
+    for record in caplog.records:
+        assert record.levelno < logging.ERROR
+
+    parsl.clear()
+
+
+@pytest.mark.local
+def test_std_autopath_const_str(caplog, tmpd_cwd):
+    """Tests str and tuple mode autopaths with a constant autopath, which
+    should all be passed through unmodified.
+    """
+    cpath = str(tmpd_cwd / "CONST")
+    const_with_cpath(cpath, cpath, caplog)
+
+
+@pytest.mark.local
+def test_std_autopath_const_pathlike(caplog, tmpd_cwd):
+    cpath = tmpd_cwd / "CONST"
+    const_with_cpath(cpath, cpath, caplog)
+
+
+@pytest.mark.local
+def test_std_autopath_const_tuples(caplog, tmpd_cwd):
+    file = tmpd_cwd / "CONST"
+    cpath = (file, "w")
+    const_with_cpath(cpath, file, caplog)
+
+
+class URIFailError(Exception):
+    pass
+
+
+def fail_uri(task_record, err_or_out):
+    raise URIFailError("Deliberate failure in std stream filename generation")
+
+
+@pytest.mark.local
+def test_std_autopath_fail(caplog):
+    with parsl.load(parsl.Config(std_autopath=fail_uri)):
+        with pytest.raises(URIFailError):
+            app_stdout()
+
+    parsl.clear()
+
+
+@parsl.bash_app
+def app_both(stdout=parsl.AUTO_LOGNAME, stderr=parsl.AUTO_LOGNAME):
+    return "echo hello; echo goodbye >&2"
+
+
+def zip_uri(base, task_record, err_or_out):
+    """Should generate Files in base.zip like app_both.0.0.stdout or app_both.123.1.stderr"""
+    zip_path = base / "base.zip"
+    file = f"{task_record['func_name']}.{task_record['id']}.{task_record['try_id']}.{err_or_out}"
+    return File(f"zip:{zip_path}/{file}")
+
+
+@pytest.mark.local
+def test_std_autopath_zip(caplog, tmpd_cwd):
+    with parsl.load(parsl.Config(run_dir=str(tmpd_cwd),
+                                 executors=[ThreadPoolExecutor(working_dir=str(tmpd_cwd))],
+                                 std_autopath=partial(zip_uri, tmpd_cwd))):
+        futs = []
+
+        for _ in range(10):
+            fut = app_both()
+
+            # assertions that should hold after submission
+            assert isinstance(fut.stdout, DataFuture)
+            assert fut.stdout.file_obj.url.startswith("zip")
+
+            futs.append(fut)
+
+        # Barrier for all the stageouts to complete so that we can
+        # poke at the zip file.
+        [(fut.stdout.result(), fut.stderr.result()) for fut in futs]
+
+        with zipfile.ZipFile(tmpd_cwd / "base.zip") as z:
+            for fut in futs:
+
+                assert fut.done(), "AppFuture should be done if stageout is done"
+
+                stdout_relative_path = f"app_both.{fut.tid}.0.stdout"
+                with z.open(stdout_relative_path) as f:
+                    assert f.readlines() == [b'hello\n']
+
+                stderr_relative_path = f"app_both.{fut.tid}.0.stderr"
+                with z.open(stderr_relative_path) as f:
+                    assert f.readlines()[-1] == b'goodbye\n'
+
+    for record in caplog.records:
+        assert record.levelno < logging.ERROR
+
+    parsl.clear()
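
For reference, a minimal sketch of using the new hook from a workflow. The
flat_autopath function, the my_task_logs directory layout and the hello app
below are illustrative only and are not part of this patch; the callable
signature (task_record, err_or_out) matches the call site
self.config.std_autopath(task_record, kw) added above.

    # Illustrative sketch only (not part of this patch): route AUTO_LOGNAME
    # streams into per-app directories instead of the default rundir layout.
    import os

    import parsl


    def flat_autopath(task_record, err_or_out):
        # task_record is the same TaskRecord handed to default_std_autopath;
        # err_or_out is either "stdout" or "stderr".
        label = task_record['kwargs'].get('label')
        return os.path.join(
            "my_task_logs",
            task_record['func_name'],
            'task_{}{}.{}'.format(
                task_record['id'],
                '' if label is None else '_{}'.format(label),
                err_or_out))


    @parsl.bash_app
    def hello(stdout=parsl.AUTO_LOGNAME):
        return "echo hello"


    with parsl.load(parsl.Config(std_autopath=flat_autopath)):
        fut = hello()
        fut.result()
        print(fut.stdout)  # e.g. my_task_logs/hello/task_0.stdout

    parsl.clear()

As the zip test above demonstrates, the generator may also return a File
rather than a plain path, in which case the usual output staging machinery
applies to the generated stream.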