From 444d0d9870b4e774463c51534a061c95120ecfae Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Thu, 3 Sep 2020 20:10:53 -1000 Subject: [PATCH] tidy internal data structures for File/Directory --- WDL/Value.py | 15 +++- WDL/runtime/task.py | 39 +++++---- WDL/runtime/task_container.py | 148 ++++++++++++++++++---------------- WDL/runtime/workflow.py | 8 +- tests/runner.t | 2 +- 5 files changed, 121 insertions(+), 91 deletions(-) diff --git a/WDL/Value.py b/WDL/Value.py index 92d1e6cc..7421548c 100644 --- a/WDL/Value.py +++ b/WDL/Value.py @@ -145,8 +145,11 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: class String(Base): """``value`` has Python type ``str``""" - def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None: - super().__init__(Type.String(), value, expr) + def __init__( + self, value: str, expr: "Optional[Expr.Base]" = None, subtype: Optional[Type.Base] = None + ) -> None: + subtype = subtype or Type.String() + super().__init__(subtype, value, expr) def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: "" @@ -169,6 +172,11 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: class File(String): """``value`` has Python type ``str``""" + def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None: + super().__init__(value, expr=expr, subtype=Type.File()) + if value != value.rstrip("/"): + raise Error.InputError("WDL.Value.File invalid path: " + value) + def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: "" if self.value is None: @@ -184,6 +192,9 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: class Directory(String): """``value`` has Python type ``str``""" + def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None: + super().__init__(value, expr=expr, subtype=Type.Directory()) + def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: "" # TODO: similar coercion logic for Directory? outputs when we support those diff --git a/WDL/runtime/task.py b/WDL/runtime/task.py index 021cd797..8810c7c4 100644 --- a/WDL/runtime/task.py +++ b/WDL/runtime/task.py @@ -314,12 +314,15 @@ def _eval_task_inputs( container: TaskContainer, ) -> Env.Bindings[Value.Base]: - # Map all the provided input Files to in-container paths - container.add_files(_filenames(posix_inputs)) + # Map all the provided input File & Directory paths to in-container paths + container.add_paths(_fspaths(posix_inputs)) - # copy posix_inputs with all Files mapped to their in-container paths + # copy posix_inputs with all File & Directory values mapped to their in-container paths def map_paths(fn: Union[Value.File, Value.Directory]) -> str: - return container.input_file_map[fn.value] + p = fn.value + if isinstance(fn, Value.Directory): + p += "/" + return container.input_path_map[p] container_inputs = Value.rewrite_env_paths(posix_inputs, map_paths) @@ -346,7 +349,7 @@ def map_paths(fn: Union[Value.File, Value.Directory]) -> str: assert len(decls_by_id) == len(decls_to_eval) # evaluate each declaration in that order - # note: the write_* functions call container.add_files as a side-effect + # note: the write_* functions call container.add_paths as a side-effect stdlib = InputStdLib(logger, container) for decl in decls_to_eval: assert isinstance(decl, Tree.Decl) @@ -370,13 +373,19 @@ def map_paths(fn: Union[Value.File, Value.Directory]) -> str: return container_env -def _filenames(env: Env.Bindings[Value.Base]) -> Set[str]: - "Get the filenames of all File values in the environment" +def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]: + """ + Get the unique paths of all File & Directory values in the environment. Directory paths will + have a trailing '/'. + """ ans = set() def collector(v: Value.Base) -> None: - if isinstance(v, (Value.File, Value.Directory)): + if isinstance(v, Value.File): + assert not v.value.endswith("/") ans.add(v.value) + elif isinstance(v, Value.Directory): + ans.add(v.value + ("/" if not v.value.endswith("/") else "")) for ch in v.children: collector(ch) @@ -543,7 +552,7 @@ def _eval_task_outputs( # helper to rewrite Files from in-container paths to host paths def rewriter(fn: str, output_name: str) -> str: - host_file = container.host_file(fn) + host_file = container.host_path(fn) if host_file is None: logger.warning( _( @@ -694,20 +703,20 @@ def __init__(self, logger: logging.Logger, container: TaskContainer, inputs_only def _devirtualize_filename(self, filename: str) -> str: # check allowability of reading this file, & map from in-container to host - ans = self.container.host_file(filename, inputs_only=self.inputs_only) + ans = self.container.host_path(filename, inputs_only=self.inputs_only) if ans is None: raise OutputError("function was passed non-existent file " + filename) self.logger.debug(_("read_", container=filename, host=ans)) return ans def _virtualize_filename(self, filename: str) -> str: - # register new file with container input_file_map - self.container.add_files([filename]) + # register new file with container input_path_map + self.container.add_paths([filename]) self.logger.debug( - _("write_", host=filename, container=self.container.input_file_map[filename]) + _("write_", host=filename, container=self.container.input_path_map[filename]) ) - self.logger.info(_("wrote", file=self.container.input_file_map[filename])) - return self.container.input_file_map[filename] + self.logger.info(_("wrote", file=self.container.input_path_map[filename])) + return self.container.input_path_map[filename] class InputStdLib(_StdLib): diff --git a/WDL/runtime/task_container.py b/WDL/runtime/task_container.py index d94bcf14..298d09fb 100644 --- a/WDL/runtime/task_container.py +++ b/WDL/runtime/task_container.py @@ -70,15 +70,16 @@ def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> D command's working directory will be ``{container_dir}/work/``. """ - input_file_map: Dict[str, str] + input_path_map: Dict[str, str] """ :type: Dict[str,str] - A mapping of host input file paths to in-container mounted paths, - maintained by ``add_files``. + A mapping of host input file/directory paths to in-container mounted paths, maintained by + ``add_paths``. Directory paths are distinguished by trailing slashes on both keys and values; + the slashes often should be trimmed for use elsewhere. """ - input_file_map_rev: Dict[str, str] + input_path_map_rev: Dict[str, str] runtime_values: Dict[str, Any] """ @@ -101,67 +102,72 @@ def __init__(self, cfg: config.Loader, run_id: str, host_dir: str) -> None: self.run_id = run_id self.host_dir = host_dir self.container_dir = "/mnt/miniwdl_task_container" - self.input_file_map = {} - self.input_file_map_rev = {} + self.input_path_map = {} + self.input_path_map_rev = {} self.stderr_callback = None self._running = False self.runtime_values = {} os.makedirs(os.path.join(self.host_dir, "work")) - def add_files(self, host_files: Iterable[str]) -> None: + def add_paths(self, host_paths: Iterable[str]) -> None: """ - Use before running the container to add a list of host files to mount - inside the container as inputs. The host-to-container path mapping is - maintained in ``input_file_map``. + Use before running the container to add a list of host paths to mount inside the container + as inputs. Directory paths should have a trailing slash. The host-to-container path mapping + is maintained in ``input_path_map``. - Although ``add_files`` can be used multiple times, files should be - added together where possible, as this allows heuristics for dealing - with any name collisions among them. + Although ``add_paths`` can be used multiple times, paths should be added together where + possible, as this allows heuristics for dealing with any name collisions among them. """ assert not self._running # partition the files by host directory - host_files_by_dir = {} - for host_file in host_files: - if host_file not in self.input_file_map: - if not os.path.exists(host_file): - raise Error.InputError("input path not found: " + host_file) - host_files_by_dir.setdefault(os.path.dirname(host_file), set()).add(host_file) + host_paths_by_dir = {} + for host_path in host_paths: + host_path_strip = host_path.rstrip("/") + if host_path not in self.input_path_map and host_path_strip not in self.input_path_map: + if not os.path.exists(host_path_strip): + raise Error.InputError("input path not found: " + host_path) + host_paths_by_dir.setdefault(os.path.dirname(host_path_strip), set()).add(host_path) # for each such partition of files # - if there are no basename collisions under input subdirectory 0, then mount them there. # - otherwise, mount them in a fresh subdirectory - for files in host_files_by_dir.values(): + for paths in host_paths_by_dir.values(): based = os.path.join(self.container_dir, "work/_miniwdl_inputs") subd = "0" - for host_file in files: - container_file = os.path.join(based, subd, os.path.basename(host_file)) - if container_file in self.input_file_map_rev: - subd = str(len(self.input_file_map) + 1) - for host_file in files: - container_file = os.path.join(based, subd, os.path.basename(host_file)) - assert container_file not in self.input_file_map_rev - self.input_file_map[host_file] = container_file - self.input_file_map_rev[container_file] = host_file + for host_path in paths: + container_path = os.path.join(based, subd, os.path.basename(host_path.rstrip("/"))) + if host_path.endswith("/"): + container_path += "/" + if container_path in self.input_path_map_rev: + assert subd == "0" + subd = str(len(self.input_path_map) + 1) + for host_path in paths: + container_path = os.path.join(based, subd, os.path.basename(host_path.rstrip("/"))) + if host_path.endswith("/"): + container_path += "/" + assert container_path not in self.input_path_map_rev + self.input_path_map[host_path] = container_path + self.input_path_map_rev[container_path] = host_path def copy_input_files(self, logger: logging.Logger) -> None: - # After add_files has been used as needed, copy the input files from their original + # After add_paths has been used as needed, copy the input files from their original # locations to the appropriate subdirectories of the container working directory. This may # not be necessary e.g. if the container backend supports bind-mounting the input # files from their original host paths. # called once per task run (attempt) - for host_filename, container_filename in self.input_file_map.items(): - assert container_filename.startswith(self.container_dir) - host_copy_filename = os.path.join( - self.host_dir, os.path.relpath(container_filename, self.container_dir) + for host_path, container_path in self.input_path_map.items(): + assert container_path.startswith(self.container_dir) + host_copy_path = os.path.join( + self.host_dir, os.path.relpath(container_path.rstrip("/"), self.container_dir) ) - logger.info(_("copy host input file", input=host_filename, copy=host_copy_filename)) - os.makedirs(os.path.dirname(host_copy_filename), exist_ok=True) - if os.path.isdir(host_filename): - shutil.copytree(host_filename, host_copy_filename, symlinks=False) + logger.info(_("copy host input file", input=host_path, copy=host_copy_path)) + os.makedirs(os.path.dirname(host_copy_path), exist_ok=True) + if host_path.endswith("/"): + shutil.copytree(host_path.rstrip("/"), host_copy_path, symlinks=False) else: - shutil.copy(host_filename, host_copy_filename) + shutil.copy(host_path, host_copy_path) def run(self, logger: logging.Logger, command: str) -> None: """ @@ -223,43 +229,42 @@ def reset(self, logger: logging.Logger, retries: int, delete_work: bool = False) ) os.makedirs(os.path.join(self.host_dir, "work")) - def host_file(self, container_file: str, inputs_only: bool = False) -> Optional[str]: + def host_path(self, container_path: str, inputs_only: bool = False) -> Optional[str]: """ - Map an output file's in-container path under ``container_dir`` to a host path under - ``host_dir``. Return None if the designated file does not exist. + Map the in-container path of an output File/Directory to a host path under ``host_dir``. + Directory paths should be given a trailing "/". Return None if the path does not exist. - SECURITY: except for input files, this method must only return host paths under - ``host_dir`` and prevent any reference to other host files (e.g. /etc/passwd), including - via sneaky symlinks + SECURITY: except for inputs, this method must only return host paths under ``host_dir`` + and prevent any reference to other host files (e.g. /etc/passwd), including via symlinks. """ - if os.path.isabs(container_file): + if os.path.isabs(container_path): # handle output of std{out,err}.txt - if container_file in [ + if container_path in [ os.path.join(self.container_dir, pipe_file) for pipe_file in ["stdout.txt", "stderr.txt"] ]: - return os.path.join(self.host_dir, os.path.basename(container_file)) + return os.path.join(self.host_dir, os.path.basename(container_path)) # handle output of an input file - if container_file in self.input_file_map_rev: - return self.input_file_map_rev[container_file] + if container_path in self.input_path_map_rev: + return self.input_path_map_rev[container_path] if inputs_only: raise Error.InputError( - "task inputs attempted to use a non-input or non-existent file " - + container_file + "task inputs attempted to use a non-input or non-existent path " + + container_path ) # relativize the path to the provisioned working directory - container_file = os.path.relpath( - container_file, os.path.join(self.container_dir, "work") + container_path = os.path.relpath( + container_path, os.path.join(self.container_dir, "work") ) host_workdir = os.path.join(self.host_dir, "work") - ans = os.path.join(host_workdir, container_file) + ans = os.path.join(host_workdir, container_path) if os.path.isfile(ans): if path_really_within(ans, host_workdir): return ans raise OutputError( "task outputs attempted to use a file outside its working directory: " - + container_file + + container_path ) return None @@ -534,18 +539,18 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command: logger.exception("failed to close docker-py client") def prepare_mounts(self, logger: logging.Logger) -> List[docker.types.Mount]: - def touch_mount_point(container_file: str, is_dir: bool = False) -> None: + def touch_mount_point(container_path: str) -> None: # touching each mount point ensures they'll be owned by invoking user:group - assert container_file.startswith(self.container_dir + "/") - host_file = os.path.join( - self.host_dir, os.path.relpath(container_file, self.container_dir) + assert container_path.startswith(self.container_dir + "/") + host_path = os.path.join( + self.host_dir, os.path.relpath(container_path.rstrip("/"), self.container_dir) ) - assert host_file.startswith(self.host_dir + "/") - if is_dir: - os.makedirs(host_file, exist_ok=True) + assert host_path.startswith(self.host_dir + "/") + if container_path.endswith("/"): + os.makedirs(host_path, exist_ok=True) else: - os.makedirs(os.path.dirname(host_file), exist_ok=True) - with open(host_file, "x") as _: + os.makedirs(os.path.dirname(host_path), exist_ok=True) + with open(host_path, "x") as _: pass def escape(s): @@ -553,10 +558,11 @@ def escape(s): return s.replace("{{", '{{"{{"}}') mounts = [] - # mount input files and command + # mount input files/directories and command if self._bind_input_files: perm_warn = True - for host_path, container_path in self.input_file_map.items(): + for host_path, container_path in self.input_path_map.items(): + host_path = host_path.rstrip("/") st = os.stat(host_path) if perm_warn and not ( (st.st_mode & stat.S_IROTH) @@ -571,10 +577,14 @@ def escape(s): ) ) perm_warn = False - touch_mount_point(container_path, is_dir=stat.S_ISDIR(st.st_mode)) + assert (not container_path.endswith("/")) or stat.S_ISDIR(st.st_mode) + touch_mount_point(container_path) mounts.append( docker.types.Mount( - escape(container_path), escape(host_path), type="bind", read_only=True + escape(container_path.rstrip("/")), + escape(host_path), + type="bind", + read_only=True, ) ) mounts.append( diff --git a/WDL/runtime/workflow.py b/WDL/runtime/workflow.py index adbe9b51..a37311e4 100644 --- a/WDL/runtime/workflow.py +++ b/WDL/runtime/workflow.py @@ -45,7 +45,7 @@ import importlib_metadata from .. import Env, Type, Value, Tree, StdLib from ..Error import InputError -from .task import run_local_task, _filenames, link_outputs, _add_downloadable_default_files +from .task import run_local_task, _fspaths, link_outputs, _add_downloadable_default_files from .download import able as downloadable, run_cached as download from .._util import ( write_atomic, @@ -156,7 +156,7 @@ def __init__( self.finished = set() self.running = set() self.waiting = set() - self.filename_whitelist = _filenames(inputs) + self.filename_whitelist = _fspaths(inputs) from .. import values_to_json @@ -299,7 +299,7 @@ def call_finished(self, job_id: str, outputs: Env.Bindings[Value.Base]) -> None: call_node = self.jobs[job_id].node assert isinstance(call_node, Tree.Call) self.job_outputs[job_id] = outputs.wrap_namespace(call_node.name) - self.filename_whitelist |= _filenames(outputs) + self.filename_whitelist |= _fspaths(outputs) self.finished.add(job_id) self.running.remove(job_id) @@ -373,7 +373,7 @@ def _do_job( lambda b: Env.Binding(b.name, b.value.coerce(callee_inputs[b.name].type)) ) # check input files against whitelist - disallowed_filenames = _filenames(call_inputs) - self.filename_whitelist + disallowed_filenames = _fspaths(call_inputs) - self.filename_whitelist disallowed_filenames = set( fn for fn in disallowed_filenames if not downloadable(cfg, fn) ) diff --git a/tests/runner.t b/tests/runner.t index 55728f11..be864cc6 100644 --- a/tests/runner.t +++ b/tests/runner.t @@ -277,7 +277,7 @@ EOF mkdir -p indir/subdir echo alice > indir/alice.txt echo bob > indir/subdir/bob.txt -miniwdl run dir_io.wdl d=indir +$miniwdl run dir_io.wdl d=indir is "$?" "0" "directory input" is `jq -r '.["w.dsz"]' _LAST/outputs.json` "10" "use of directory input"