feat: allow detailed configuration of shared FS usage (snakemake#2528)
### Description

<!--Add a description of your PR here-->

### QC
<!-- Make sure that you can tick the boxes below. -->

* [x] The PR contains a test case for the changes or the changes are
already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes or
this is not necessary (e.g. if the change does not modify the
language, behavior, or functionality of Snakemake).
johanneskoester committed Nov 30, 2023
1 parent e6dfdd4 commit 0d34be9
Showing 11 changed files with 175 additions and 250 deletions.
116 changes: 4 additions & 112 deletions docs/project_info/contributing.rst
@@ -40,119 +40,11 @@ Implement Features
Look through the Github issues for features.
If you want to start working on an issue, please write a short message on the issue tracker to prevent duplicate work.

Contributing a new cluster or cloud execution backend
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Execution backends are added by implementing a so-called ``Executor``.
All executors are located in `snakemake/executors/ <https://github.com/snakemake/snakemake/tree/main/snakemake/executors>`_.
In order to implement a new executor, you have to inherit from the class ``ClusterExecutor``.
Below you find a skeleton:

.. code-block:: python

    class SkeletonExecutor(ClusterExecutor):
        def __init__(self, workflow, dag, cores,
                     jobname="snakejob.{name}.{jobid}.sh",
                     printreason=False,
                     quiet=False,
                     printshellcmds=False,
                     latency_wait=3,
                     local_input=None,
                     restart_times=None,
                     exec_job=None,
                     assume_shared_fs=True,
                     max_status_checks_per_second=1):
            super().__init__(workflow, dag, None,
                             jobname=jobname,
                             printreason=printreason,
                             quiet=quiet,
                             printshellcmds=printshellcmds,
                             latency_wait=latency_wait,
                             local_input=local_input,
                             restart_times=restart_times,
                             assume_shared_fs=False,  # if your executor relies on a shared file system, set this to True
                             max_status_checks_per_second=max_status_checks_per_second)  # set this to a reasonable default

            # add additional attributes

        def shutdown(self):
            # perform additional steps on shutdown if necessary
            super().shutdown()

        def cancel(self):
            for job in self.active_jobs:
                # cancel active jobs here
                pass
            self.shutdown()

        def run_jobs(self, jobs, callback=None, submit_callback=None, error_callback=None):
            """Run a list of jobs that is ready at a given point in time.

            By default, this method just runs each job individually.
            This behavior is inherited and therefore this method can be removed from the skeleton if the
            default behavior is intended.
            This method can be overwritten to submit many jobs in a more efficient way than one-by-one.

            Note that in any case, for each job, the callback functions have to be called individually!
            """
            for job in jobs:
                self.run(
                    job,
                    callback=callback,
                    submit_callback=submit_callback,
                    error_callback=error_callback,
                )

        def run(self, job,
                callback=None,
                submit_callback=None,
                error_callback=None):
            """Run an individual job or a job group."""
            # Necessary: perform additional executor independent steps before running the job
            super()._run(job)

            # obtain job execution command
            exec_job = self.format_job(
                self.exec_job, job, _quote_all=True,
                use_threads="--force-use-threads" if not job.is_group() else "")

            # submit job here, and obtain job ids from the backend

            # register job as active, using your own namedtuple.
            # The namedtuple must at least contain the attributes
            # job, jobid, callback, error_callback.
            self.active_jobs.append(MyJob(
                job, jobid, callback, error_callback))

        async def _wait_for_jobs(self):
            from snakemake.executors import sleep

            # busy wait on job completion
            # This is only needed if your backend does not allow to use callbacks
            # for obtaining job status.
            while True:
                # always use self.lock to avoid race conditions
                async with async_lock(self.lock):
                    if not self.wait:
                        return
                    active_jobs = self.active_jobs
                    self.active_jobs = list()
                    still_running = list()
                for j in active_jobs:
                    # use self.status_rate_limiter to avoid too many API calls.
                    async with self.status_rate_limiter:
                        # Retrieve status of job j from your backend via j.jobid
                        # Handle completion and errors, calling either j.callback(j.job)
                        # or j.error_callback(j.job)
                        # In case of error, add job j to still_running.
                        pass
                async with async_lock(self.lock):
                    self.active_jobs.extend(still_running)
                await sleep()

Contributing a plugin
~~~~~~~~~~~~~~~~~~~~~

Currently, Snakemake supports executor plugins and storage plugins.
The `Snakemake plugin catalog <https://snakemake.github.io/snakemake-plugin-catalog>`_ shows which plugins are available and how to contribute new ones.

Write Documentation
===================
29 changes: 17 additions & 12 deletions snakemake/api.py
@@ -36,7 +36,7 @@
RemoteExecutionSettings,
ResourceSettings,
StorageSettings,
DeploymentFSMode,
SharedFSUsage,
)

from snakemake_interface_executor_plugins.settings import ExecMode, ExecutorSettingsBase
@@ -134,13 +134,6 @@ def workflow(
if storage_provider_settings is None:
storage_provider_settings = dict()

if deployment_settings.fs_mode is None:
deployment_settings.fs_mode = (
DeploymentFSMode.SHARED
if storage_settings.assume_shared_fs
else DeploymentFSMode.NOT_SHARED
)

self._check_is_in_context()

self._setup_logger(mode=workflow_settings.exec_mode)
@@ -444,18 +437,30 @@ def execute_workflow(
executor_plugin.validate_settings(executor_settings)

if executor_plugin.common_settings.implies_no_shared_fs:
self.workflow_api.storage_settings.assume_shared_fs = False
# no shared FS at all
self.workflow_api.storage_settings.shared_fs_usage = frozenset()
if executor_plugin.common_settings.job_deploy_sources:
remote_execution_settings.job_deploy_sources = True

if (
self.workflow_api.workflow_settings.exec_mode == ExecMode.DEFAULT
and not self.workflow_api.storage_settings.assume_shared_fs
and SharedFSUsage.INPUT_OUTPUT
not in self.workflow_api.storage_settings.shared_fs_usage
and not self.workflow_api.storage_settings.default_storage_provider
):
raise ApiError(
"If no shared filesystem is assumed, a default storage provider "
"has to be set."
"If no shared filesystem is assumed for input and output files, a "
"default storage provider has to be set."
)
if (
executor_plugin.common_settings.local_exec
and not executor_plugin.common_settings.dryrun_exec
and self.workflow_api.workflow_settings.exec_mode == ExecMode.DEFAULT
and self.workflow_api.storage_settings.shared_fs_usage
!= SharedFSUsage.all()
):
raise ApiError(
"For local execution, --shared-fs-usage has to be unrestricted."
)

self.snakemake_api._setup_logger(
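Note on the api.py changes above: the boolean `assume_shared_fs` is replaced by the set-valued `shared_fs_usage` setting. The sketch below (not part of this commit) illustrates how the new setting might be passed through the Python API; any enum members or provider names not visible in the diff are assumptions.

```python
# Illustrative sketch only -- not part of this commit. The StorageSettings
# arguments and SharedFSUsage members mirror what the diff above uses;
# anything beyond that (e.g. the "fs" provider name) is an assumption.
from snakemake.settings import SharedFSUsage, StorageSettings

# Default behaviour: assume a shared filesystem for every purpose.
shared = StorageSettings(shared_fs_usage=SharedFSUsage.all())

# No shared filesystem at all. As enforced in execute_workflow above, this
# requires a default storage provider for input and output files.
isolated = StorageSettings(
    shared_fs_usage=frozenset(),
    default_storage_provider="fs",  # hypothetical provider choice
)

# Mixed setup: keep software deployment on the shared filesystem, but route
# input and output files exclusively through the storage provider.
mixed = StorageSettings(
    shared_fs_usage=frozenset({SharedFSUsage.SOFTWARE_DEPLOYMENT}),
    default_storage_provider="fs",
)
```

An empty frozenset means no shared filesystem is assumed for any purpose, which is why `execute_workflow` then insists on a default storage provider.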
38 changes: 16 additions & 22 deletions snakemake/cli.py
@@ -38,7 +38,7 @@
StorageSettings,
WorkflowSettings,
GroupSettings,
DeploymentFSMode,
SharedFSUsage,
)

from snakemake_interface_executor_plugins.settings import ExecMode
@@ -1357,19 +1357,20 @@ def get_argument_parser(profiles=None):
"freely chosen, e.g. in order to store those files on a local scratch disk.",
)
group_behavior.add_argument(
"--no-shared-fs",
action="store_true",
help="Do not assume that jobs share a common file "
"system. When this flag is activated, Snakemake will "
"assume that the filesystem on a cluster node is not "
"shared with other nodes. For example, this will lead "
"to downloading remote files on each cluster node "
"separately. Further, it won't take special measures "
"to deal with filesystem latency issues. This option "
"will in most cases only make sense in combination with "
"--default-storage-provider. "
"Only activate this if you "
"know what you are doing.",
"--shared-fs-usage",
nargs="*",
default=SharedFSUsage.all(),
choices=SharedFSUsage.choices(),
parse_func=SharedFSUsage.parse_choices_set,
help="Set assumptions on shared filesystem for non-local "
"workflow execution. Usually, the executor plugin sets this to a correct "
"default. However, sometimes it is worth tuning this setting, e.g. for "
"optimizing cluster performance. For example, when using "
"'--default-storage-provider fs' together with a cluster executor like "
"slurm, you might want to set '--shared-fs-usage persistence deployment', "
"such that software deployment and data provenance will be handled by NFS "
"but input and output files will be handled exclusively by the storage "
"provider.",
)
group_behavior.add_argument(
"--scheduler-greediness",
@@ -1499,12 +1500,6 @@
default=set(),
help="Specify software environment deployment method.",
)
group_deployment.add_argument(
"--software-deployment-fs-mode",
choices=DeploymentFSMode.choices(),
parse_func=DeploymentFSMode.parse_choice,
help="Whether or not to assume a shared filesystem for software deployment.",
)
group_deployment.add_argument(
"--container-cleanup-images",
action="store_true",
@@ -1827,7 +1822,7 @@ def args_to_api(args, parser):
default_storage_provider=args.default_storage_provider,
default_storage_prefix=args.default_storage_prefix,
local_storage_prefix=args.local_storage_prefix,
assume_shared_fs=not args.no_shared_fs,
shared_fs_usage=args.shared_fs_usage,
keep_storage_local=args.keep_storage_local_copies,
notemp=args.notemp,
all_temp=args.all_temp,
@@ -1869,7 +1864,6 @@ def args_to_api(args, parser):
),
deployment_settings=DeploymentSettings(
deployment_method=deployment_method,
fs_mode=args.software_deployment_fs_mode,
conda_prefix=args.conda_prefix,
conda_cleanup_pkgs=args.conda_cleanup_pkgs,
conda_base_path=args.conda_base_path,
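The new `--shared-fs-usage` argument above relies on `SharedFSUsage.choices()`, `SharedFSUsage.all()`, and `SharedFSUsage.parse_choices_set`. Below is a minimal sketch of such a settings enum; it is an assumption for illustration, not the actual Snakemake implementation, and member names beyond those visible in this diff are hypothetical.

```python
# Minimal sketch (assumption, not the real snakemake code) of the enum
# pattern implied by choices() / all() / parse_choices_set() in cli.py above.
from enum import Enum


class SharedFSUsageSketch(Enum):
    INPUT_OUTPUT = "input-output"
    PERSISTENCE = "persistence"
    SOFTWARE_DEPLOYMENT = "software-deployment"
    STORAGE_LOCAL_COPIES = "storage-local-copies"

    @classmethod
    def all(cls) -> frozenset:
        # unrestricted: every usage relies on the shared filesystem
        return frozenset(cls)

    @classmethod
    def choices(cls) -> list:
        # the strings offered on the command line
        return [item.value for item in cls]

    @classmethod
    def parse_choices_set(cls, raw) -> frozenset:
        # map the parsed CLI strings back to enum members
        return frozenset(cls(value) for value in raw)


# e.g. "--shared-fs-usage persistence software-deployment" would yield:
selected = SharedFSUsageSketch.parse_choices_set(
    ["persistence", "software-deployment"]
)
```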
4 changes: 3 additions & 1 deletion snakemake/common/tests/__init__.py
@@ -114,7 +114,9 @@ def run_workflow(self, test_name, tmp_path, deployment_method=frozenset()):
storage_settings=settings.StorageSettings(
default_storage_provider=self.get_default_storage_provider(),
default_storage_prefix=self.get_default_storage_prefix(),
assume_shared_fs=self.get_assume_shared_fs(),
shared_fs_usage=settings.SharedFSUsage.all()
if self.get_assume_shared_fs()
else frozenset(),
),
deployment_settings=self.get_deployment_settings(deployment_method),
storage_provider_settings=self.get_default_storage_provider_settings(),
55 changes: 39 additions & 16 deletions snakemake/dag.py
@@ -63,6 +63,7 @@
JobFactory,
Reason,
)
from snakemake.settings import SharedFSUsage
from snakemake.logging import logger
from snakemake.output_index import OutputIndex
from snakemake.sourcecache import LocalSourceFile, SourceFile
@@ -91,7 +92,6 @@ def __init__(
omitrules=None,
ignore_incomplete=False,
):
self.is_main_process = workflow.exec_mode == ExecMode.DEFAULT
self.dependencies = defaultdict(partial(defaultdict, set))
self.depending = defaultdict(partial(defaultdict, set))
self._needrun = set()
@@ -298,9 +298,13 @@ def update_conda_envs(self):
if job.conda_env_spec
and (
job.is_local
or self.workflow.deployment_settings.assume_shared_fs
or self.workflow.remote_exec
and not self.workflow.deployment_settings.assume_shared_fs
or SharedFSUsage.SOFTWARE_DEPLOYMENT
in self.workflow.storage_settings.shared_fs_usage
or (
self.workflow.remote_exec
and SharedFSUsage.SOFTWARE_DEPLOYMENT
not in self.workflow.storage_settings.shared_fs_usage
)
)
}

@@ -325,17 +329,23 @@ def update_conda_envs(self):
self.conda_envs[key] = env

async def retrieve_storage_inputs(self):
if self.is_main_process or self.workflow.remote_exec_no_shared_fs:
shared_local_copies = (
SharedFSUsage.STORAGE_LOCAL_COPIES
in self.workflow.storage_settings.shared_fs_usage
)
if (self.workflow.is_main_process and shared_local_copies) or (
self.workflow.remote_exec and not shared_local_copies
):
async with asyncio.TaskGroup() as tg:
for job in self.jobs:
for job in self.needrun_jobs():
for f in job.input:
if f.is_storage and self.is_external_input(f, job):
tg.create_task(f.retrieve_from_storage())

async def store_storage_outputs(self):
if self.workflow.remote_exec_no_shared_fs:
if self.workflow.remote_exec:
async with asyncio.TaskGroup() as tg:
for job in self.jobs:
for job in self.needrun_jobs(exclude_finished=False):
for f in job.output:
if (
f.is_storage
@@ -345,9 +355,15 @@ async def store_storage_outputs(self):
tg.create_task(f.store_in_storage())

def cleanup_storage_objects(self):
if self.is_main_process or self.workflow.remote_exec_no_shared_fs:
cleaned = set()
for job in self.jobs:
shared_local_copies = (
SharedFSUsage.STORAGE_LOCAL_COPIES
in self.workflow.storage_settings.shared_fs_usage
)
cleaned = set()
for job in self.jobs:
if (
self.workflow.is_main_process and (job.is_local or shared_local_copies)
) or (self.workflow.remote_exec and not shared_local_copies):
for f in chain(job.input, job.output):
if (
f.is_storage
@@ -736,8 +752,9 @@ async def handle_log(self, job):

async def handle_storage(self, job, store_in_storage=True):
"""Remove local files if they are no longer needed and upload."""
mode = self.workflow.exec_mode
if store_in_storage and (mode == ExecMode.REMOTE or mode == ExecMode.DEFAULT):
if store_in_storage and (
self.workflow.remote_exec or self.workflow.is_main_process
):
# handle output files
files = job.output
if job.benchmark:
@@ -949,7 +966,7 @@ async def update_(

if not res.jobs:
# no producing job found
if self.is_main_process and not await res.file.exists():
if self.workflow.is_main_process and not await res.file.exists():
# file not found, hence missing input
missing_input.add(res.file)
known_producers[res.file] = None
@@ -1009,7 +1026,7 @@ async def update_(
async def update_needrun(self, create_inventory=False):
"""Update the information whether a job needs to be executed."""

if create_inventory and self.is_main_process:
if create_inventory and self.workflow.is_main_process:
# Concurrently collect mtimes of all existing files.
await self.workflow.iocache.mtime_inventory(self.jobs)

@@ -1623,7 +1640,13 @@ async def update_checkpoint_dependencies(self, jobs=None):
updated = True
if updated:
await self.postprocess()
if self.workflow.global_or_node_local_shared_fs:
shared_input_output = (
SharedFSUsage.INPUT_OUTPUT
in self.workflow.storage_settings.shared_fs_usage
)
if (
self.workflow.is_main_process and shared_input_output
) or self.workflow.remote_exec:
await self.retrieve_storage_inputs()
return updated

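The dag.py hunks above repeatedly gate storage handling on who is running (main process vs. remote job) and on whether local copies of storage files are assumed to live on a shared filesystem. The helper below is not part of the commit; it merely restates the condition from `retrieve_storage_inputs` for clarity.

```python
# Standalone restatement (not in the commit) of the condition used in
# retrieve_storage_inputs above, for clarity only.
def should_retrieve_storage_inputs(
    is_main_process: bool, remote_exec: bool, shared_local_copies: bool
) -> bool:
    # The main process fetches local copies of storage inputs only when those
    # copies are assumed to be visible to all nodes via a shared filesystem;
    # otherwise every remote job fetches its own inputs.
    return (is_main_process and shared_local_copies) or (
        remote_exec and not shared_local_copies
    )


# Examples mirroring the two branches in the diff:
assert should_retrieve_storage_inputs(True, False, True) is True
assert should_retrieve_storage_inputs(False, True, False) is True
assert should_retrieve_storage_inputs(False, True, True) is False
```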
