From 0d34be90040f937021eb25becb4ef5d5aae66473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Thu, 30 Nov 2023 17:08:50 +0100 Subject: [PATCH] feat: allow detailed configuration of shared FS usage (#2528) ### Description ### QC * [x] The PR contains a test case for the changes or the changes are already covered by an existing test case. * [x] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake). --- docs/project_info/contributing.rst | 116 +---------------------------- snakemake/api.py | 29 +++++--- snakemake/cli.py | 38 ++++------ snakemake/common/tests/__init__.py | 4 +- snakemake/dag.py | 55 ++++++++++---- snakemake/jobs.py | 86 ++++++++++++--------- snakemake/scheduler.py | 2 + snakemake/settings.py | 16 +--- snakemake/spawn_jobs.py | 19 +++-- snakemake/workflow.py | 58 +++++++-------- tests/test_api.py | 2 +- 11 files changed, 175 insertions(+), 250 deletions(-) diff --git a/docs/project_info/contributing.rst b/docs/project_info/contributing.rst index 27c9ffe07..f73ba8614 100644 --- a/docs/project_info/contributing.rst +++ b/docs/project_info/contributing.rst @@ -40,119 +40,11 @@ Implement Features Look through the Github issues for features. If you want to start working on an issue then please write short message on the issue tracker to prevent duplicate work. -Contributing a new cluster or cloud execution backend -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Execution backends are added by implementing a so-called ``Executor``. -All executors are located in `snakemake/executors/ `_. -In order to implement a new executor, you have to inherit from the class ``ClusterExecutor``. -Below you find a skeleton - -.. code-block:: python - - class SkeletonExecutor(ClusterExecutor): - def __init__(self, workflow, dag, cores, - jobname="snakejob.{name}.{jobid}.sh", - printreason=False, - quiet=False, - printshellcmds=False, - latency_wait=3, - local_input=None, - restart_times=None, - exec_job=None, - assume_shared_fs=True, - max_status_checks_per_second=1): - - super().__init__(workflow, dag, None, - jobname=jobname, - printreason=printreason, - quiet=quiet, - printshellcmds=printshellcmds, - latency_wait=latency_wait, - local_input=local_input, - restart_times=restart_times, - assume_shared_fs=False, # if your executor relies on a shared file system, set this to True - max_status_checks_per_second=max_status_checks_per_second) # set this to a reasonable default - - # add additional attributes - - def shutdown(self): - # perform additional steps on shutdown if necessary - super().shutdown() - - def cancel(self): - for job in self.active_jobs: - # cancel active jobs here - pass - self.shutdown() - - def run_jobs(self, jobs, callback=None, submit_callback=None, error_callback=None): - """Run a list of jobs that is ready at a given point in time. - - By default, this method just runs each job individually. - This behavior is inherited and therefore this method can be removed from the skeleton if the - default behavior is intended. - This method can be overwritten to submit many jobs in a more efficient way than one-by-one. - - Note that in any case, for each job, the callback functions have to be called individually! 
- """ - for job in jobs: - self.run( - job, - callback=callback, - submit_callback=submit_callback, - error_callback=error_callback, - ) - - def run(self, job, - callback=None, - submit_callback=None, - error_callback=None): - """Run an individual job or a job group. - """ - - # Necessary: perform additional executor independent steps before running the job - super()._run(job) - - # obtain job execution command - exec_job = self.format_job( - self.exec_job, job, _quote_all=True, - use_threads="--force-use-threads" if not job.is_group() else "") - - # submit job here, and obtain job ids from the backend - - # register job as active, using your own namedtuple. - # The namedtuple must at least contain the attributes - # job, jobid, callback, error_callback. - self.active_jobs.append(MyJob( - job, jobid, callback, error_callback)) - - async def _wait_for_jobs(self): - from snakemake.executors import sleep - # busy wait on job completion - # This is only needed if your backend does not allow to use callbacks - # for obtaining job status. - while True: - # always use self.lock to avoid race conditions - async with async_lock(self.lock): - if not self.wait: - return - active_jobs = self.active_jobs - self.active_jobs = list() - still_running = list() - for j in active_jobs: - # use self.status_rate_limiter to avoid too many API calls. - async with self.status_rate_limiter: - - # Retrieve status of job j from your backend via j.jobid - # Handle completion and errors, calling either j.callback(j.job) - # or j.error_callback(j.job) - # In case of error, add job j to still_running. - pass - async with async_lock(self.lock): - self.active_jobs.extend(still_running) - await sleep() +Contributing a plugin +~~~~~~~~~~~~~~~~~~~~~ +Currently, Snakemake supports executor plugins and storage plugins. +The `Snakemake plugin catalog `_ shows which plugins are available and how to contribute new ones. Write Documentation =================== diff --git a/snakemake/api.py b/snakemake/api.py index ee2548104..5c34d8260 100644 --- a/snakemake/api.py +++ b/snakemake/api.py @@ -36,7 +36,7 @@ RemoteExecutionSettings, ResourceSettings, StorageSettings, - DeploymentFSMode, + SharedFSUsage, ) from snakemake_interface_executor_plugins.settings import ExecMode, ExecutorSettingsBase @@ -134,13 +134,6 @@ def workflow( if storage_provider_settings is None: storage_provider_settings = dict() - if deployment_settings.fs_mode is None: - deployment_settings.fs_mode = ( - DeploymentFSMode.SHARED - if storage_settings.assume_shared_fs - else DeploymentFSMode.NOT_SHARED - ) - self._check_is_in_context() self._setup_logger(mode=workflow_settings.exec_mode) @@ -444,18 +437,30 @@ def execute_workflow( executor_plugin.validate_settings(executor_settings) if executor_plugin.common_settings.implies_no_shared_fs: - self.workflow_api.storage_settings.assume_shared_fs = False + # no shard FS at all + self.workflow_api.storage_settings.shared_fs_usage = frozenset() if executor_plugin.common_settings.job_deploy_sources: remote_execution_settings.job_deploy_sources = True if ( self.workflow_api.workflow_settings.exec_mode == ExecMode.DEFAULT - and not self.workflow_api.storage_settings.assume_shared_fs + and SharedFSUsage.INPUT_OUTPUT + not in self.workflow_api.storage_settings.shared_fs_usage and not self.workflow_api.storage_settings.default_storage_provider ): raise ApiError( - "If no shared filesystem is assumed, a default storage provider " - "has to be set." 
+ "If no shared filesystem is assumed for input and output files, a " + "default storage provider has to be set." + ) + if ( + executor_plugin.common_settings.local_exec + and not executor_plugin.common_settings.dryrun_exec + and self.workflow_api.workflow_settings.exec_mode == ExecMode.DEFAULT + and self.workflow_api.storage_settings.shared_fs_usage + != SharedFSUsage.all() + ): + raise ApiError( + "For local execution, --shared-fs-usage has to be unrestricted." ) self.snakemake_api._setup_logger( diff --git a/snakemake/cli.py b/snakemake/cli.py index b883ffd3c..2d3d2fb8d 100644 --- a/snakemake/cli.py +++ b/snakemake/cli.py @@ -38,7 +38,7 @@ StorageSettings, WorkflowSettings, GroupSettings, - DeploymentFSMode, + SharedFSUsage, ) from snakemake_interface_executor_plugins.settings import ExecMode @@ -1357,19 +1357,20 @@ def get_argument_parser(profiles=None): "freely chosen, e.g. in order to store those files on a local scratch disk.", ) group_behavior.add_argument( - "--no-shared-fs", - action="store_true", - help="Do not assume that jobs share a common file " - "system. When this flag is activated, Snakemake will " - "assume that the filesystem on a cluster node is not " - "shared with other nodes. For example, this will lead " - "to downloading remote files on each cluster node " - "separately. Further, it won't take special measures " - "to deal with filesystem latency issues. This option " - "will in most cases only make sense in combination with " - "--default-storage-provider. " - "Only activate this if you " - "know what you are doing.", + "--shared-fs-usage", + nargs="*", + default=SharedFSUsage.all(), + choices=SharedFSUsage.choices(), + parse_func=SharedFSUsage.parse_choices_set, + help="Set assumptions on shared filesystem for non-local " + "workflow execution. Usually, the executor plugin sets this to a correct " + "default. However, sometimes it is worth tuning this setting, e.g. for " + "optimizing cluster performance. 
For example, when using " + "'--default-storage-provider fs' together with a cluster executor like " + "slurm, you might want to set '--shared-fs-usage persistence deployment', " + "such that software deployment and data provenance will be handled by NFS " + "but input and output files will be handled exclusively by the storage " + "provider.", ) group_behavior.add_argument( "--scheduler-greediness", @@ -1499,12 +1500,6 @@ def get_argument_parser(profiles=None): default=set(), help="Specify software environment deployment method.", ) - group_deployment.add_argument( - "--software-deployment-fs-mode", - choices=DeploymentFSMode.choices(), - parse_func=DeploymentFSMode.parse_choice, - help="Whether or not to assume a shared filesystem for software deployment.", - ) group_deployment.add_argument( "--container-cleanup-images", action="store_true", @@ -1827,7 +1822,7 @@ def args_to_api(args, parser): default_storage_provider=args.default_storage_provider, default_storage_prefix=args.default_storage_prefix, local_storage_prefix=args.local_storage_prefix, - assume_shared_fs=not args.no_shared_fs, + shared_fs_usage=args.shared_fs_usage, keep_storage_local=args.keep_storage_local_copies, notemp=args.notemp, all_temp=args.all_temp, @@ -1869,7 +1864,6 @@ def args_to_api(args, parser): ), deployment_settings=DeploymentSettings( deployment_method=deployment_method, - fs_mode=args.software_deployment_fs_mode, conda_prefix=args.conda_prefix, conda_cleanup_pkgs=args.conda_cleanup_pkgs, conda_base_path=args.conda_base_path, diff --git a/snakemake/common/tests/__init__.py b/snakemake/common/tests/__init__.py index 65318e3cc..a7291f233 100644 --- a/snakemake/common/tests/__init__.py +++ b/snakemake/common/tests/__init__.py @@ -114,7 +114,9 @@ def run_workflow(self, test_name, tmp_path, deployment_method=frozenset()): storage_settings=settings.StorageSettings( default_storage_provider=self.get_default_storage_provider(), default_storage_prefix=self.get_default_storage_prefix(), - assume_shared_fs=self.get_assume_shared_fs(), + shared_fs_usage=settings.SharedFSUsage.all() + if self.get_assume_shared_fs() + else frozenset(), ), deployment_settings=self.get_deployment_settings(deployment_method), storage_provider_settings=self.get_default_storage_provider_settings(), diff --git a/snakemake/dag.py b/snakemake/dag.py index abfb52e84..d535acfb2 100755 --- a/snakemake/dag.py +++ b/snakemake/dag.py @@ -63,6 +63,7 @@ JobFactory, Reason, ) +from snakemake.settings import SharedFSUsage from snakemake.logging import logger from snakemake.output_index import OutputIndex from snakemake.sourcecache import LocalSourceFile, SourceFile @@ -91,7 +92,6 @@ def __init__( omitrules=None, ignore_incomplete=False, ): - self.is_main_process = workflow.exec_mode == ExecMode.DEFAULT self.dependencies = defaultdict(partial(defaultdict, set)) self.depending = defaultdict(partial(defaultdict, set)) self._needrun = set() @@ -298,9 +298,13 @@ def update_conda_envs(self): if job.conda_env_spec and ( job.is_local - or self.workflow.deployment_settings.assume_shared_fs - or self.workflow.remote_exec - and not self.workflow.deployment_settings.assume_shared_fs + or SharedFSUsage.SOFTWARE_DEPLOYMENT + in self.workflow.storage_settings.shared_fs_usage + or ( + self.workflow.remote_exec + and SharedFSUsage.SOFTWARE_DEPLOYMENT + not in self.workflow.storage_settings.shared_fs_usage + ) ) } @@ -325,17 +329,23 @@ def update_conda_envs(self): self.conda_envs[key] = env async def retrieve_storage_inputs(self): - if self.is_main_process or 
self.workflow.remote_exec_no_shared_fs: + shared_local_copies = ( + SharedFSUsage.STORAGE_LOCAL_COPIES + in self.workflow.storage_settings.shared_fs_usage + ) + if (self.workflow.is_main_process and shared_local_copies) or ( + self.workflow.remote_exec and not shared_local_copies + ): async with asyncio.TaskGroup() as tg: - for job in self.jobs: + for job in self.needrun_jobs(): for f in job.input: if f.is_storage and self.is_external_input(f, job): tg.create_task(f.retrieve_from_storage()) async def store_storage_outputs(self): - if self.workflow.remote_exec_no_shared_fs: + if self.workflow.remote_exec: async with asyncio.TaskGroup() as tg: - for job in self.jobs: + for job in self.needrun_jobs(exclude_finished=False): for f in job.output: if ( f.is_storage @@ -345,9 +355,15 @@ async def store_storage_outputs(self): tg.create_task(f.store_in_storage()) def cleanup_storage_objects(self): - if self.is_main_process or self.workflow.remote_exec_no_shared_fs: - cleaned = set() - for job in self.jobs: + shared_local_copies = ( + SharedFSUsage.STORAGE_LOCAL_COPIES + in self.workflow.storage_settings.shared_fs_usage + ) + cleaned = set() + for job in self.jobs: + if ( + self.workflow.is_main_process and (job.is_local or shared_local_copies) + ) or (self.workflow.remote_exec and not shared_local_copies): for f in chain(job.input, job.output): if ( f.is_storage @@ -736,8 +752,9 @@ async def handle_log(self, job): async def handle_storage(self, job, store_in_storage=True): """Remove local files if they are no longer needed and upload.""" - mode = self.workflow.exec_mode - if store_in_storage and (mode == ExecMode.REMOTE or mode == ExecMode.DEFAULT): + if store_in_storage and ( + self.workflow.remote_exec or self.workflow.is_main_process + ): # handle output files files = job.output if job.benchmark: @@ -949,7 +966,7 @@ async def update_( if not res.jobs: # no producing job found - if self.is_main_process and not await res.file.exists(): + if self.workflow.is_main_process and not await res.file.exists(): # file not found, hence missing input missing_input.add(res.file) known_producers[res.file] = None @@ -1009,7 +1026,7 @@ async def update_( async def update_needrun(self, create_inventory=False): """Update the information whether a job needs to be executed.""" - if create_inventory and self.is_main_process: + if create_inventory and self.workflow.is_main_process: # Concurrently collect mtimes of all existing files. 
await self.workflow.iocache.mtime_inventory(self.jobs) @@ -1623,7 +1640,13 @@ async def update_checkpoint_dependencies(self, jobs=None): updated = True if updated: await self.postprocess() - if self.workflow.global_or_node_local_shared_fs: + shared_input_output = ( + SharedFSUsage.INPUT_OUTPUT + in self.workflow.storage_settings.shared_fs_usage + ) + if ( + self.workflow.is_main_process and shared_input_output + ) or self.workflow.remote_exec: await self.retrieve_storage_inputs() return updated diff --git a/snakemake/jobs.py b/snakemake/jobs.py index e64678b15..43fea7553 100644 --- a/snakemake/jobs.py +++ b/snakemake/jobs.py @@ -23,6 +23,7 @@ GroupJobExecutorInterface, SingleJobExecutorInterface, ) +from snakemake_interface_executor_plugins.settings import ExecMode from snakemake.io import ( _IOFile, @@ -34,6 +35,7 @@ get_flag_value, wait_for_files, ) +from snakemake.settings import SharedFSUsage from snakemake.resources import GroupResources from snakemake.target_jobs import TargetSpec from snakemake.utils import format @@ -48,22 +50,25 @@ from snakemake.common.tbdstring import TBDString +def format_file(f, is_input: bool): + if is_flagged(f, "pipe"): + return f"{f} (pipe)" + elif is_flagged(f, "service"): + return f"{f} (service)" + elif is_flagged(f, "checkpoint_target"): + return TBDString() + elif is_flagged(f, "sourcecache_entry"): + orig_path_or_uri = get_flag_value(f, "sourcecache_entry") + return f"{orig_path_or_uri} (cached)" + elif f.is_storage: + phrase = "retrieve from" if is_input else "send to" + return f"{f.storage_object.query} ({phrase} storage)" + else: + return f + + def format_files(io, is_input: bool): - for f in io: - if is_flagged(f, "pipe"): - yield f"{f} (pipe)" - elif is_flagged(f, "service"): - yield f"{f} (service)" - elif is_flagged(f, "checkpoint_target"): - yield TBDString() - elif is_flagged(f, "sourcecache_entry"): - orig_path_or_uri = get_flag_value(f, "sourcecache_entry") - yield f"{orig_path_or_uri} (cached)" - elif f.is_storage: - phrase = "retrieve from" if is_input else "send to" - yield f"{f.storage_object.query} ({phrase} storage)" - else: - yield f + return [format_file(f, is_input=is_input) for f in io] def jobfiles(jobs, type): @@ -917,15 +922,21 @@ def is_group(self): def log_info(self, indent=False, printshellcmd=True): priority = self.priority + + benchmark = ( + format_file(self.benchmark, is_input=False) + if self.benchmark is not None + else None + ) logger.job_info( jobid=self.dag.jobid(self), msg=self.message, name=self.rule.name, local=self.dag.workflow.is_local(self.rule), - input=list(format_files(self.input, is_input=True)), - output=list(format_files(self.output, is_input=False)), - log=list(self.log), - benchmark=self.benchmark, + input=format_files(self.input, is_input=True), + output=format_files(self.output, is_input=False), + log=format_files(self.log, is_input=False), + benchmark=benchmark, wildcards=self.wildcards_dict, reason=str(self.dag.reason(self)), resources=self.resources, @@ -948,9 +959,9 @@ def get_log_error_info( name=self.rule.name, msg=msg, jobid=self.dag.jobid(self), - input=list(format_files(self.input, is_input=True)), - output=list(format_files(self.output, is_input=False)), - log=list(self.log) + aux_logs, + input=format_files(self.input, is_input=True), + output=format_files(self.output, is_input=False), + log=format_files(self.log, is_input=False) + aux_logs, conda_env=self.conda_env.address if self.conda_env else None, aux=kwargs, indent=indent, @@ -1009,7 +1020,16 @@ async def postprocess( # No 
postprocessing necessary, we have just created the skeleton notebook and # execution will anyway stop afterwards. return - if self.dag.workflow.global_or_node_local_shared_fs: + + shared_input_output = ( + SharedFSUsage.INPUT_OUTPUT + in self.dag.workflow.storage_settings.shared_fs_usage + ) + if ( + self.dag.workflow.exec_mode == ExecMode.SUBPROCESS + or shared_input_output + or (self.dag.workflow.remote_exec and not shared_input_output) + ): if not error and handle_touch: self.dag.handle_touch(self) if handle_log: @@ -1591,14 +1611,8 @@ def get_names(self): yield "software environment definition has changed since last execution" def __str__(self): - def format_file(f): - if is_flagged(f, "sourcecache_entry"): - return f"{get_flag_value(f, 'sourcecache_entry')} (cached)" - else: - return f - - def format_files(files): - return ", ".join(map(format_file, files)) + def concat_files(files, is_input: bool): + return ", ".join(format_files(files, is_input=is_input)) s = list() if self.forced: @@ -1616,18 +1630,20 @@ def format_files(files): else: if self._missing_output: s.append( - f"Missing output files: {format_files(self.missing_output)}" + f"Missing output files: {concat_files(self.missing_output, is_input=False)}" ) if self._incomplete_output: s.append( - f"Incomplete output files: {format_files(self.incomplete_output)}" + f"Incomplete output files: {concat_files(self.incomplete_output, is_input=False)}" ) if self._updated_input: updated_input = self.updated_input - self.updated_input_run - s.append(f"Updated input files: {format_files(updated_input)}") + s.append( + f"Updated input files: {concat_files(updated_input, is_input=True)}" + ) if self._updated_input_run: s.append( - f"Input files updated by another job: {format_files(self.updated_input_run)}" + f"Input files updated by another job: {concat_files(self.updated_input_run, is_input=True)}" ) if self.pipe: s.append( diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py index 7c5518a16..b79915e59 100644 --- a/snakemake/scheduler.py +++ b/snakemake/scheduler.py @@ -276,6 +276,8 @@ def schedule(self): # remove from ready_jobs self.workflow.dag.register_running(run) + logger.info(f"Execute {len(run)} jobs...") + # actually run jobs local_runjobs = [job for job in run if job.is_local] runjobs = [job for job in run if not job.is_local] diff --git a/snakemake/settings.py b/snakemake/settings.py index 585c3e66d..cc8d7267f 100644 --- a/snakemake/settings.py +++ b/snakemake/settings.py @@ -16,6 +16,7 @@ StorageSettingsExecutorInterface, DeploymentMethod, ExecMode, + SharedFSUsage, ) from snakemake_interface_common.settings import SettingsEnumBase @@ -180,7 +181,7 @@ def _check(self): class StorageSettings(SettingsBase, StorageSettingsExecutorInterface): default_storage_provider: Optional[str] = None default_storage_prefix: Optional[str] = None - assume_shared_fs: bool = True + shared_fs_usage: Set[SharedFSUsage] = SharedFSUsage.all() keep_storage_local: bool = False local_storage_prefix: Path = Path(".snakemake/storage") notemp: bool = False @@ -193,11 +194,6 @@ class CondaCleanupPkgs(SettingsEnumBase): CACHE = 1 -class DeploymentFSMode(SettingsEnumBase): - SHARED = 0 - NOT_SHARED = 1 - - @dataclass class DeploymentSettings(SettingsBase, DeploymentSettingsExecutorInterface): """ @@ -219,7 +215,6 @@ class DeploymentSettings(SettingsBase, DeploymentSettingsExecutorInterface): """ deployment_method: Set[DeploymentMethod] = frozenset() - fs_mode: Optional[DeploymentFSMode] = None conda_prefix: Optional[Path] = None conda_cleanup_pkgs: 
Optional[CondaCleanupPkgs] = None conda_base_path: Optional[Path] = None @@ -232,13 +227,6 @@ def imply_deployment_method(self, method: DeploymentMethod): self.deployment_method = set(self.deployment_method) self.deployment_method.add(method) - @property - def assume_shared_fs(self): - assert ( - self.fs_mode is not None - ), "bug: called DeploymentSettings.assume_shared_fs before fs_mode has been inferred from StorageSettings" - return True if self.fs_mode == DeploymentFSMode.SHARED else False - @dataclass class SchedulingSettings(SettingsBase): diff --git a/snakemake/spawn_jobs.py b/snakemake/spawn_jobs.py index 75ff6ab85..cf635b2ce 100644 --- a/snakemake/spawn_jobs.py +++ b/snakemake/spawn_jobs.py @@ -7,6 +7,7 @@ from snakemake_interface_storage_plugins.registry import StoragePluginRegistry from snakemake import common +from snakemake.settings import SharedFSUsage if TYPE_CHECKING: from snakemake.workflow import Workflow @@ -150,7 +151,7 @@ def precommand( ) if ( - not self.workflow.storage_settings.assume_shared_fs + SharedFSUsage.SOURCES not in self.workflow.storage_settings.shared_fs_usage and self.workflow.remote_execution_settings.job_deploy_sources ): archive = self.workflow.source_archive @@ -176,6 +177,11 @@ def general_args( """ w2a = self.workflow_property_to_arg + shared_deployment = ( + SharedFSUsage.SOFTWARE_DEPLOYMENT + in self.workflow.storage_settings.shared_fs_usage + ) + args = [ "--force", "--target-files-omit-workdir-adjustment", @@ -200,15 +206,12 @@ def general_args( w2a("deployment_settings.conda_prefix"), w2a( "conda_base_path", - skip=not self.workflow.deployment_settings.assume_shared_fs, + skip=not shared_deployment, ), w2a("deployment_settings.apptainer_prefix"), w2a("deployment_settings.apptainer_args"), w2a("resource_settings.max_threads"), - w2a( - "storage_settings.assume_shared_fs", flag="--no-shared-fs", invert=True - ), - w2a("deployment_settings.fs_mode", flag="--software-deployment-fs-mode"), + w2a("storage_settings.shared_fs_usage"), w2a( "execution_settings.keep_metadata", flag="--drop-metadata", invert=True ), @@ -224,12 +227,12 @@ def general_args( format_cli_arg( "--scheduler-solver-path", os.path.dirname(sys.executable), - skip=not self.workflow.deployment_settings.assume_shared_fs, + skip=not shared_deployment, ), w2a( "overwrite_workdir", flag="--directory", - skip=self.workflow.storage_settings.assume_shared_fs, + skip=self.workflow.storage_settings.assume_common_workdir, ), self.get_set_resources_args(), self.get_resource_scopes_args(), diff --git a/snakemake/workflow.py b/snakemake/workflow.py index 24a420c42..cde308af9 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -33,6 +33,7 @@ SchedulingSettings, StorageSettings, WorkflowSettings, + SharedFSUsage, ) from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface @@ -218,13 +219,17 @@ def tear_down(self): self._workdir_handler.change_back() self._snakemake_tmp_dir.cleanup() + @property + def is_main_process(self): + return self.exec_mode == ExecMode.DEFAULT + @property def snakemake_tmp_dir(self) -> Path: return Path(self._snakemake_tmp_dir.name) @property def source_cache_path(self) -> Path: - if self.remote_exec_no_shared_fs: + if SharedFSUsage.SOURCE_CACHE not in self.storage_settings.shared_fs_usage: return self.snakemake_tmp_dir / "source-cache" else: return Path( @@ -375,14 +380,6 @@ def non_local_exec(self): def remote_exec(self): return self.exec_mode == ExecMode.REMOTE - @property - def global_or_node_local_shared_fs(self): - return 
self.storage_settings.assume_shared_fs or self.remote_exec_no_shared_fs - - @property - def remote_exec_no_shared_fs(self): - return self.remote_exec and not self.storage_settings.assume_shared_fs - @property def exec_mode(self): return self.workflow_settings.exec_mode @@ -614,8 +611,9 @@ def list_resources(self): logger.info(resource) def is_local(self, rule): - return rule.group is None and ( - rule.name in self._localrules or rule.norun or rule.is_template_engine + return self.local_exec or ( + rule.group is None + and (rule.name in self._localrules or rule.norun or rule.is_template_engine) ) def check_localrules(self): @@ -743,7 +741,7 @@ def files(items): persistence_path = ( self.snakemake_tmp_dir / "persistence" - if self.remote_exec_no_shared_fs + if SharedFSUsage.PERSISTENCE not in self.storage_settings.shared_fs_usage else None ) @@ -956,12 +954,6 @@ def conda_list_envs(self): lock_warn_only=False, ) self._build_dag() - - if ( - DeploymentMethod.APPTAINER in self.deployment_settings.deployment_method - and self.deployment_settings.assume_shared_fs - ): - self.dag.pull_container_imgs() self.dag.create_conda_envs( dryrun=True, quiet=True, @@ -985,10 +977,7 @@ def conda_create_envs(self): ) self._build_dag() - if ( - DeploymentMethod.APPTAINER in self.deployment_settings.deployment_method - and self.deployment_settings.assume_shared_fs - ): + if DeploymentMethod.APPTAINER in self.deployment_settings.deployment_method: self.dag.pull_container_imgs() self.dag.create_conda_envs() @@ -1083,9 +1072,12 @@ def execute( f for job in self.dag.needrun_jobs() for f in job.output ) - if self.deployment_settings.assume_shared_fs or ( - self.remote_exec and not self.deployment_settings.assume_shared_fs - ): + shared_deployment = ( + SharedFSUsage.SOFTWARE_DEPLOYMENT + in self.storage_settings.shared_fs_usage + ) + + if shared_deployment or (self.remote_exec and not shared_deployment): if ( DeploymentMethod.APPTAINER in self.deployment_settings.deployment_method @@ -1094,11 +1086,18 @@ def execute( if DeploymentMethod.CONDA in self.deployment_settings.deployment_method: self.dag.create_conda_envs() - if self.global_or_node_local_shared_fs: + shared_storage_local_copies = ( + SharedFSUsage.STORAGE_LOCAL_COPIES + in self.storage_settings.shared_fs_usage + ) + if not self.dryrun and ( + (self.exec_mode == ExecMode.DEFAULT and shared_storage_local_copies) + or (self.remote_exec and not shared_storage_local_copies) + ): async_run(self.dag.retrieve_storage_inputs()) if ( - not self.storage_settings.assume_shared_fs + SharedFSUsage.SOURCES not in self.storage_settings.shared_fs_usage and self.exec_mode == ExecMode.DEFAULT and self.remote_execution_settings.job_deploy_sources ): @@ -1216,8 +1215,9 @@ def log_provenance_info(): ): self.dag.cleanup_workdir() - async_run(self.dag.store_storage_outputs()) - self.dag.cleanup_storage_objects() + if not self.dryrun: + async_run(self.dag.store_storage_outputs()) + self.dag.cleanup_storage_objects() if success: if self.dryrun: diff --git a/tests/test_api.py b/tests/test_api.py index 8a9d62284..a0ffbc27a 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -22,7 +22,7 @@ def test_deploy_sources(s3_storage): storage_settings=settings.StorageSettings( default_storage_prefix=s3_prefix, default_storage_provider="s3", - assume_shared_fs=False, + shared_fs_usage=frozenset(), ), resource_settings=settings.ResourceSettings( cores=1,
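
### Example usage

A minimal sketch of how the `shared_fs_usage` setting introduced above can be driven from the Python API, mirroring the `tests/test_api.py` hunk. The storage provider name and prefix are illustrative assumptions (any installed storage plugin works); only `StorageSettings` and `SharedFSUsage` themselves come from this patch.

```python
from snakemake.settings import SharedFSUsage, StorageSettings

# Disable all shared-filesystem assumptions (what executor plugins with
# implies_no_shared_fs now get): everything is routed through the storage
# provider.
fully_remote = StorageSettings(
    default_storage_provider="s3",            # assumes the s3 storage plugin is installed
    default_storage_prefix="s3://my-bucket",  # illustrative prefix, not part of this patch
    shared_fs_usage=frozenset(),
)

# Mixed setup as described in the --shared-fs-usage help text: persistence
# metadata and software deployment stay on the shared filesystem (e.g. NFS),
# while input and output files are handled exclusively by the storage provider.
mixed = StorageSettings(
    default_storage_provider="s3",
    default_storage_prefix="s3://my-bucket",
    shared_fs_usage=frozenset(
        {SharedFSUsage.PERSISTENCE, SharedFSUsage.SOFTWARE_DEPLOYMENT}
    ),
)
```

Leaving `shared_fs_usage` at its default, `SharedFSUsage.all()`, keeps the previous fully-shared behaviour; the same categories can be selected on the command line via the new `--shared-fs-usage` flag.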