diff --git a/WDL/CLI.py b/WDL/CLI.py index d1625dd3..40f755b1 100644 --- a/WDL/CLI.py +++ b/WDL/CLI.py @@ -1000,6 +1000,20 @@ def runner_input_value(s_value, ty, file_found, root): elif not (file_found and file_found(fn)): # maybe URI raise Error.InputError("File not found: " + fn) return Value.File(fn) + if isinstance(ty, Type.Directory): + dn = os.path.expanduser(s_value) + if os.path.isdir(dn): + dn = os.path.abspath(dn) + if not path_really_within(dn, root): + raise Error.InputError( + f"all input paths must be located within the configured `file_io.root' directory `{root}' " + f"unlike `{dn}'" + ) + # TODO: courtesy check for symlinks that have absolute paths or relatively point + # outside the directory + else: # TODO: relax for URIs + raise Error.InputError("Directory not found: " + dn) + return Value.Directory(dn) if isinstance(ty, Type.Boolean): if s_value == "true": return Value.Boolean(True) @@ -1241,7 +1255,7 @@ def localize( doc = load(wdlfile, path or [], check_quant=check_quant, read_source=read_source) def file_found(fn): - return runtime.download.able(cfg, fn) or os.path.isfile(fn) + return runtime.download.able(cfg, fn) or os.path.exists(fn) try: target, input_env, input_json = runner_input( diff --git a/WDL/Lint.py b/WDL/Lint.py index 182abe88..1e2d2f9b 100644 --- a/WDL/Lint.py +++ b/WDL/Lint.py @@ -188,9 +188,11 @@ def _compound_coercion(to_type, from_type, base_to_type, extra_from_type=None): to_type.right_type, from_type.right_type, base_to_type, extra_from_type ) if isinstance(to_type, base_to_type): + coercible = list(base_to_type) if extra_from_type: - return not isinstance(from_type, (base_to_type, extra_from_type, Type.Any)) - return not isinstance(from_type, (base_to_type, Type.Any)) + coercible.append(extra_from_type) + coercible.append(Type.Any) + return not isinstance(from_type, tuple(coercible)) return False @@ -215,7 +217,7 @@ def decl(self, obj: Tree.Decl) -> Any: if obj.expr and _compound_coercion( obj.type, obj.expr.type, - Type.String, + (Type.String,), (Type.File if isinstance(_parent_executable(obj), Tree.Task) else None), ): self.add(obj, "{} {} = :{}:".format(str(obj.type), obj.name, str(obj.expr.type))) @@ -259,7 +261,7 @@ def expr(self, obj: Expr.Base) -> Any: if _compound_coercion( F_i, arg_i.type, - Type.String, + (Type.String,), (Type.File if isinstance(_parent_executable(obj), Tree.Task) else None), ): msg = "{} argument of {}() = :{}:".format( @@ -287,7 +289,7 @@ def expr(self, obj: Expr.Base) -> Any: def call(self, obj: Tree.Call) -> Any: for name, inp_expr in obj.inputs.items(): decl = _find_input_decl(obj, name) - if _compound_coercion(decl.type, inp_expr.type, Type.String): + if _compound_coercion(decl.type, inp_expr.type, (Type.String,)): msg = "input {} {} = :{}:".format(str(decl.type), decl.name, str(inp_expr.type)) self.add(obj, msg, inp_expr.pos) @@ -316,7 +318,7 @@ def decl(self, obj: Tree.Decl) -> Any: super().decl(obj) if ( obj.expr - and _compound_coercion(obj.type, obj.expr.type, Type.File) + and _compound_coercion(obj.type, obj.expr.type, (Type.File, Type.Directory)) and not ( isinstance(obj.expr, Expr.String) and obj.expr.literal @@ -334,7 +336,7 @@ def expr(self, obj: Expr.Base) -> Any: for i in range(min(len(F.argument_types), len(obj.arguments))): F_i = F.argument_types[i] arg_i = obj.arguments[i] - if _compound_coercion(F_i, arg_i.type, Type.File): + if _compound_coercion(F_i, arg_i.type, (Type.File, Type.Directory)): msg = "{} argument of {}() = :{}:".format(str(F_i), F.name, str(arg_i.type)) self.add(obj, msg, arg_i.pos) elif obj.function_name == "size": @@ -354,7 +356,7 @@ def call(self, obj: Tree.Call) -> Any: super().call(obj) for name, inp_expr in obj.inputs.items(): decl = _find_input_decl(obj, name) - if _compound_coercion(decl.type, inp_expr.type, Type.File): + if _compound_coercion(decl.type, inp_expr.type, (Type.File, Type.Directory)): msg = "input {} {} = :{}:".format(str(decl.type), decl.name, str(inp_expr.type)) self.add(obj, msg, inp_expr.pos) diff --git a/WDL/Tree.py b/WDL/Tree.py index 90ac69a3..2f61effa 100644 --- a/WDL/Tree.py +++ b/WDL/Tree.py @@ -405,6 +405,11 @@ def typecheck( errors.try1( lambda: decl.typecheck(type_env, stdlib=stdlib, check_quant=check_quant) ) + if _has_directories(decl.type): + # FIXME + raise Error.ValidationError( + decl, "Directory outputs aren't supported in this version of miniwdl" + ) # check for cyclic dependencies among decls _detect_cycles( @@ -1062,6 +1067,11 @@ def typecheck(self, doc: "Document", check_quant: bool) -> None: ) ) output_type_env = output_type_env2 + if _has_directories(output.type): + # FIXME + raise Error.ValidationError( + output, "Directory outputs aren't supported in this version of miniwdl" + ) # 6. check for cyclic dependencies _detect_cycles(_workflow_dependency_matrix(self)) @@ -1804,3 +1814,14 @@ def _add_struct_instance_to_type_env( else: ans = ans.bind(namespace + "." + member_name, member_type, ctx) return ans + + +def _has_directories(t: Type.Base): + """ + used to check output declarations for Directory types while we don't support them + """ + if isinstance(t, Type.Directory) or next( + (p for p in t.parameters if _has_directories(p)), None + ): + return True + return False diff --git a/WDL/Type.py b/WDL/Type.py index 2099e755..9f89695d 100644 --- a/WDL/Type.py +++ b/WDL/Type.py @@ -174,13 +174,24 @@ def coerces(self, rhs: Base, check_quant: bool = True) -> bool: return super().coerces(rhs, check_quant) +class Directory(Base): + def __init__(self, optional: bool = False) -> None: + self._optional = optional + + def coerces(self, rhs: Base, check_quant: bool = True) -> bool: + "" + if isinstance(rhs, String): + return True + return super().coerces(rhs, check_quant) + + class String(Base): def __init__(self, optional: bool = False) -> None: self._optional = optional def coerces(self, rhs: Base, check_quant: bool = True) -> bool: "" - if isinstance(rhs, (File, Int, Float)): + if isinstance(rhs, (File, Directory, Int, Float)): return self._check_optional(rhs, check_quant) return super().coerces(rhs, check_quant) diff --git a/WDL/Value.py b/WDL/Value.py index 34b66210..45ceb8d4 100644 --- a/WDL/Value.py +++ b/WDL/Value.py @@ -145,13 +145,18 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: class String(Base): """``value`` has Python type ``str``""" - def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None: - super().__init__(Type.String(), value, expr) + def __init__( + self, value: str, expr: "Optional[Expr.Base]" = None, subtype: Optional[Type.Base] = None + ) -> None: + subtype = subtype or Type.String() + super().__init__(subtype, value, expr) def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: "" if isinstance(desired_type, Type.File) and not isinstance(self, File): return File(self.value, self.expr) + if isinstance(desired_type, Type.Directory) and not isinstance(self, Directory): + return Directory(self.value, self.expr) try: if isinstance(desired_type, Type.Int): return Int(int(self.value), self.expr) @@ -167,6 +172,11 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: class File(String): """``value`` has Python type ``str``""" + def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None: + super().__init__(value, expr=expr, subtype=Type.File()) + if value != value.rstrip("/"): + raise Error.InputError("WDL.Value.File invalid path: " + value) + def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: "" if self.value is None: @@ -179,6 +189,18 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: return super().coerce(desired_type) +class Directory(String): + """``value`` has Python type ``str``""" + + def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None: + super().__init__(value, expr=expr, subtype=Type.Directory()) + + def coerce(self, desired_type: Optional[Type.Base] = None) -> Base: + "" + # TODO: similar coercion logic for Directory? outputs when we support those + return super().coerce(desired_type) + + class Array(Base): """``value`` is a Python ``list`` of other ``WDL.Value.Base`` instances""" @@ -412,6 +434,8 @@ def from_json(type: Type.Base, value: Any) -> Base: return Float(float(value)) if isinstance(type, Type.File) and isinstance(value, str): return File(value) + if isinstance(type, Type.Directory) and isinstance(value, str): + return Directory(value) if isinstance(type, (Type.String, Type.Any)) and isinstance(value, str): return String(value) if isinstance(type, Type.Array) and isinstance(value, list): @@ -470,28 +494,51 @@ def _infer_from_json(j: Any) -> Base: raise Error.InputError(f"couldn't construct value from: {json.dumps(j)}") -def rewrite_files(v: Base, f: Callable[[str], str]) -> Base: +def rewrite_paths(v: Base, f: Callable[[Union[File, Directory]], str]) -> Base: """ - Produce a deep copy of the given Value with all File names rewritten by the given function - (including Files nested inside compound Values). + Produce a deep copy of the given Value with all File & Directory paths (including those nested + inside compound Values) rewritten by the given function. """ - mapped_files = set() + mapped_paths = set() - def map_files(v2: Base) -> Base: - if isinstance(v2, File): - assert id(v2) not in mapped_files, f"File {id(v2)} reused in deepcopy" - v2.value = f(v2.value) - mapped_files.add(id(v2)) + def map_paths(v2: Base) -> Base: + if isinstance(v2, (File, Directory)): + assert id(v2) not in mapped_paths, f"File/Directory {id(v2)} reused in deepcopy" + v2.value = f(v2) + mapped_paths.add(id(v2)) for ch in v2.children: - map_files(ch) + map_paths(ch) return v2 - return map_files(copy.deepcopy(v)) + return map_paths(copy.deepcopy(v)) + + +def rewrite_env_paths( + env: Env.Bindings[Base], f: Callable[[Union[File, Directory]], str] +) -> Env.Bindings[Base]: + """ + Produce a deep copy of the given Value Env with all File & Directory paths rewritten by the + given function. + """ + return env.map(lambda binding: Env.Binding(binding.name, rewrite_paths(binding.value, f))) + + +def rewrite_files(v: Base, f: Callable[[str], str]) -> Base: + """ + Produce a deep copy of the given Value with all File names rewritten by the given function + (including Files nested inside compound Values). + + (deprecated: use ``rewrite_paths`` to handle Directory values as well) + """ + + return rewrite_paths(v, lambda fd: f(fd.value) if isinstance(fd, File) else fd.value) def rewrite_env_files(env: Env.Bindings[Base], f: Callable[[str], str]) -> Env.Bindings[Base]: """ Produce a deep copy of the given Value Env with all File names rewritten by the given function. + + (deprecated: use ``rewrite_env_paths`` to handle Directory values as well) """ return env.map(lambda binding: Env.Binding(binding.name, rewrite_files(binding.value, f))) diff --git a/WDL/_grammar.py b/WDL/_grammar.py index d04fe024..267c4e51 100644 --- a/WDL/_grammar.py +++ b/WDL/_grammar.py @@ -260,7 +260,7 @@ keywords = {} keywords["draft-2"] = set( - "Array Float Int Map None Pair String as call command else false if import input left meta object output parameter_meta right runtime scatter task then true workflow".split( + "Array File Float Int Map None Pair String as call command else false if import input left meta object output parameter_meta right runtime scatter task then true workflow".split( " " ) ) @@ -479,7 +479,7 @@ %ignore COMMENT """ keywords["development"] = set( - "Array Float Int Map None Pair String alias as call command else false if import input left meta object output parameter_meta right runtime scatter struct task then true workflow".split( + "Array Directory File Float Int Map None Pair String alias as call command else false if import input left meta object output parameter_meta right runtime scatter struct task then true workflow".split( " " ) ) diff --git a/WDL/_parser.py b/WDL/_parser.py index 30973089..a247711b 100644 --- a/WDL/_parser.py +++ b/WDL/_parser.py @@ -165,9 +165,38 @@ def fn(self, items, meta, op=op): setattr(_ExprTransformer, op, lark.v_args(meta=True)(classmethod(fn))) # pyre-fixme -class _TypeTransformer(_SourcePositionTransformerMixin, lark.Transformer): +class _DocTransformer(_ExprTransformer): # pylint: disable=no-self-use,unused-argument + _keywords: Set[str] + _source_text: str + _comments: List[lark.Token] + _version: Optional[str] + _declared_version: Optional[str] + + def __init__( + self, + source_text: str, + keywords: Set[str], + comments: List[lark.Token], + version: str, + declared_version: Optional[str], + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self._source_text = source_text + self._keywords = keywords + self._comments = comments + self._version = version + self._declared_version = declared_version + + def _check_keyword(self, pos, name): + if name in self._keywords: + raise Error.SyntaxError( + pos, "unexpected keyword {}".format(name), self._version, self._declared_version + ) + def optional(self, items, meta): return set(["optional"]) @@ -204,6 +233,8 @@ def type(self, items, meta): "String": Type.String, "File": Type.File, } + if self._version not in ("draft-2", "1.0"): + atomic_types["Directory"] = Type.Directory if items[0].value in atomic_types: if param or param2: raise Error.InvalidType( @@ -234,39 +265,6 @@ def type(self, items, meta): ans.pos = self._sp(meta) return ans - -class _DocTransformer(_ExprTransformer, _TypeTransformer): - # pylint: disable=no-self-use,unused-argument - - _keywords: Set[str] - _source_text: str - _comments: List[lark.Token] - _version: Optional[str] - _declared_version: Optional[str] - - def __init__( - self, - source_text: str, - keywords: Set[str], - comments: List[lark.Token], - version: str, - declared_version: Optional[str], - *args, - **kwargs, - ): - super().__init__(*args, **kwargs) - self._source_text = source_text - self._keywords = keywords - self._comments = comments - self._version = version - self._declared_version = declared_version - - def _check_keyword(self, pos, name): - if name in self._keywords: - raise Error.SyntaxError( - pos, "unexpected keyword {}".format(name), self._version, self._declared_version - ) - def decl(self, items, meta): self._check_keyword(self._sp(meta), items[1].value) return Tree.Decl( @@ -562,7 +560,7 @@ def document(self, items, meta): # have lark pass the 'meta' with line/column numbers to each transformer method -for _klass in [_ExprTransformer, _TypeTransformer, _DocTransformer]: +for _klass in [_ExprTransformer, _DocTransformer]: for name, method in inspect.getmembers(_klass, inspect.isfunction): if not name.startswith("_"): setattr(_klass, name, lark.v_args(meta=True)(method)) # pyre-fixme diff --git a/WDL/runtime/task.py b/WDL/runtime/task.py index a3f09441..744aa941 100644 --- a/WDL/runtime/task.py +++ b/WDL/runtime/task.py @@ -314,14 +314,17 @@ def _eval_task_inputs( container: TaskContainer, ) -> Env.Bindings[Value.Base]: - # Map all the provided input Files to in-container paths - container.add_files(_filenames(posix_inputs)) + # Map all the provided input File & Directory paths to in-container paths + container.add_paths(_fspaths(posix_inputs)) - # copy posix_inputs with all Files mapped to their in-container paths - def map_files(fn: str) -> str: - return container.input_file_map[fn] + # copy posix_inputs with all File & Directory values mapped to their in-container paths + def map_paths(fn: Union[Value.File, Value.Directory]) -> str: + p = fn.value.rstrip("/") + if isinstance(fn, Value.Directory): + p += "/" + return container.input_path_map[p] - container_inputs = Value.rewrite_env_files(posix_inputs, map_files) + container_inputs = Value.rewrite_env_paths(posix_inputs, map_paths) # initialize value environment with the inputs container_env = Env.Bindings() @@ -346,7 +349,7 @@ def map_files(fn: str) -> str: assert len(decls_by_id) == len(decls_to_eval) # evaluate each declaration in that order - # note: the write_* functions call container.add_files as a side-effect + # note: the write_* functions call container.add_paths as a side-effect stdlib = InputStdLib(logger, container) for decl in decls_to_eval: assert isinstance(decl, Tree.Decl) @@ -370,13 +373,19 @@ def map_files(fn: str) -> str: return container_env -def _filenames(env: Env.Bindings[Value.Base]) -> Set[str]: - "Get the filenames of all File values in the environment" +def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]: + """ + Get the unique paths of all File & Directory values in the environment. Directory paths will + have a trailing '/'. + """ ans = set() def collector(v: Value.Base) -> None: if isinstance(v, Value.File): + assert not v.value.endswith("/") ans.add(v.value) + elif isinstance(v, Value.Directory): + ans.add(v.value.rstrip("/") + "/") for ch in v.children: collector(ch) @@ -543,7 +552,7 @@ def _eval_task_outputs( # helper to rewrite Files from in-container paths to host paths def rewriter(fn: str, output_name: str) -> str: - host_file = container.host_file(fn) + host_file = container.host_path(fn) if host_file is None: logger.warning( _( @@ -694,20 +703,20 @@ def __init__(self, logger: logging.Logger, container: TaskContainer, inputs_only def _devirtualize_filename(self, filename: str) -> str: # check allowability of reading this file, & map from in-container to host - ans = self.container.host_file(filename, inputs_only=self.inputs_only) + ans = self.container.host_path(filename, inputs_only=self.inputs_only) if ans is None: raise OutputError("function was passed non-existent file " + filename) self.logger.debug(_("read_", container=filename, host=ans)) return ans def _virtualize_filename(self, filename: str) -> str: - # register new file with container input_file_map - self.container.add_files([filename]) + # register new file with container input_path_map + self.container.add_paths([filename]) self.logger.debug( - _("write_", host=filename, container=self.container.input_file_map[filename]) + _("write_", host=filename, container=self.container.input_path_map[filename]) ) - self.logger.info(_("wrote", file=self.container.input_file_map[filename])) - return self.container.input_file_map[filename] + self.logger.info(_("wrote", file=self.container.input_path_map[filename])) + return self.container.input_path_map[filename] class InputStdLib(_StdLib): diff --git a/WDL/runtime/task_container.py b/WDL/runtime/task_container.py index eba80a8c..298d09fb 100644 --- a/WDL/runtime/task_container.py +++ b/WDL/runtime/task_container.py @@ -13,6 +13,7 @@ import uuid import hashlib import shlex +import stat from typing import Callable, Iterable, List, Set, Tuple, Type, Any, Dict, Optional from abc import ABC, abstractmethod import docker @@ -69,15 +70,16 @@ def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> D command's working directory will be ``{container_dir}/work/``. """ - input_file_map: Dict[str, str] + input_path_map: Dict[str, str] """ :type: Dict[str,str] - A mapping of host input file paths to in-container mounted paths, - maintained by ``add_files``. + A mapping of host input file/directory paths to in-container mounted paths, maintained by + ``add_paths``. Directory paths are distinguished by trailing slashes on both keys and values; + the slashes often should be trimmed for use elsewhere. """ - input_file_map_rev: Dict[str, str] + input_path_map_rev: Dict[str, str] runtime_values: Dict[str, Any] """ @@ -100,64 +102,72 @@ def __init__(self, cfg: config.Loader, run_id: str, host_dir: str) -> None: self.run_id = run_id self.host_dir = host_dir self.container_dir = "/mnt/miniwdl_task_container" - self.input_file_map = {} - self.input_file_map_rev = {} + self.input_path_map = {} + self.input_path_map_rev = {} self.stderr_callback = None self._running = False self.runtime_values = {} os.makedirs(os.path.join(self.host_dir, "work")) - def add_files(self, host_files: Iterable[str]) -> None: + def add_paths(self, host_paths: Iterable[str]) -> None: """ - Use before running the container to add a list of host files to mount - inside the container as inputs. The host-to-container path mapping is - maintained in ``input_file_map``. + Use before running the container to add a list of host paths to mount inside the container + as inputs. Directory paths should have a trailing slash. The host-to-container path mapping + is maintained in ``input_path_map``. - Although ``add_files`` can be used multiple times, files should be - added together where possible, as this allows heuristics for dealing - with any name collisions among them. + Although ``add_paths`` can be used multiple times, paths should be added together where + possible, as this allows heuristics for dealing with any name collisions among them. """ assert not self._running # partition the files by host directory - host_files_by_dir = {} - for host_file in host_files: - if host_file not in self.input_file_map: - if not os.path.isfile(host_file): - raise Error.InputError("input file not found: " + host_file) - host_files_by_dir.setdefault(os.path.dirname(host_file), set()).add(host_file) + host_paths_by_dir = {} + for host_path in host_paths: + host_path_strip = host_path.rstrip("/") + if host_path not in self.input_path_map and host_path_strip not in self.input_path_map: + if not os.path.exists(host_path_strip): + raise Error.InputError("input path not found: " + host_path) + host_paths_by_dir.setdefault(os.path.dirname(host_path_strip), set()).add(host_path) # for each such partition of files # - if there are no basename collisions under input subdirectory 0, then mount them there. # - otherwise, mount them in a fresh subdirectory - for files in host_files_by_dir.values(): + for paths in host_paths_by_dir.values(): based = os.path.join(self.container_dir, "work/_miniwdl_inputs") subd = "0" - for host_file in files: - container_file = os.path.join(based, subd, os.path.basename(host_file)) - if container_file in self.input_file_map_rev: - subd = str(len(self.input_file_map) + 1) - for host_file in files: - container_file = os.path.join(based, subd, os.path.basename(host_file)) - assert container_file not in self.input_file_map_rev - self.input_file_map[host_file] = container_file - self.input_file_map_rev[container_file] = host_file + for host_path in paths: + container_path = os.path.join(based, subd, os.path.basename(host_path.rstrip("/"))) + if host_path.endswith("/"): + container_path += "/" + if container_path in self.input_path_map_rev: + assert subd == "0" + subd = str(len(self.input_path_map) + 1) + for host_path in paths: + container_path = os.path.join(based, subd, os.path.basename(host_path.rstrip("/"))) + if host_path.endswith("/"): + container_path += "/" + assert container_path not in self.input_path_map_rev + self.input_path_map[host_path] = container_path + self.input_path_map_rev[container_path] = host_path def copy_input_files(self, logger: logging.Logger) -> None: - # After add_files has been used as needed, copy the input files from their original + # After add_paths has been used as needed, copy the input files from their original # locations to the appropriate subdirectories of the container working directory. This may # not be necessary e.g. if the container backend supports bind-mounting the input # files from their original host paths. # called once per task run (attempt) - for host_filename, container_filename in self.input_file_map.items(): - assert container_filename.startswith(self.container_dir) - host_copy_filename = os.path.join( - self.host_dir, os.path.relpath(container_filename, self.container_dir) + for host_path, container_path in self.input_path_map.items(): + assert container_path.startswith(self.container_dir) + host_copy_path = os.path.join( + self.host_dir, os.path.relpath(container_path.rstrip("/"), self.container_dir) ) - logger.info(_("copy host input file", input=host_filename, copy=host_copy_filename)) - os.makedirs(os.path.dirname(host_copy_filename), exist_ok=True) - shutil.copy(host_filename, host_copy_filename) + logger.info(_("copy host input file", input=host_path, copy=host_copy_path)) + os.makedirs(os.path.dirname(host_copy_path), exist_ok=True) + if host_path.endswith("/"): + shutil.copytree(host_path.rstrip("/"), host_copy_path, symlinks=False) + else: + shutil.copy(host_path, host_copy_path) def run(self, logger: logging.Logger, command: str) -> None: """ @@ -219,43 +229,42 @@ def reset(self, logger: logging.Logger, retries: int, delete_work: bool = False) ) os.makedirs(os.path.join(self.host_dir, "work")) - def host_file(self, container_file: str, inputs_only: bool = False) -> Optional[str]: + def host_path(self, container_path: str, inputs_only: bool = False) -> Optional[str]: """ - Map an output file's in-container path under ``container_dir`` to a host path under - ``host_dir``. Return None if the designated file does not exist. + Map the in-container path of an output File/Directory to a host path under ``host_dir``. + Directory paths should be given a trailing "/". Return None if the path does not exist. - SECURITY: except for input files, this method must only return host paths under - ``host_dir`` and prevent any reference to other host files (e.g. /etc/passwd), including - via sneaky symlinks + SECURITY: except for inputs, this method must only return host paths under ``host_dir`` + and prevent any reference to other host files (e.g. /etc/passwd), including via symlinks. """ - if os.path.isabs(container_file): + if os.path.isabs(container_path): # handle output of std{out,err}.txt - if container_file in [ + if container_path in [ os.path.join(self.container_dir, pipe_file) for pipe_file in ["stdout.txt", "stderr.txt"] ]: - return os.path.join(self.host_dir, os.path.basename(container_file)) + return os.path.join(self.host_dir, os.path.basename(container_path)) # handle output of an input file - if container_file in self.input_file_map_rev: - return self.input_file_map_rev[container_file] + if container_path in self.input_path_map_rev: + return self.input_path_map_rev[container_path] if inputs_only: raise Error.InputError( - "task inputs attempted to use a non-input or non-existent file " - + container_file + "task inputs attempted to use a non-input or non-existent path " + + container_path ) # relativize the path to the provisioned working directory - container_file = os.path.relpath( - container_file, os.path.join(self.container_dir, "work") + container_path = os.path.relpath( + container_path, os.path.join(self.container_dir, "work") ) host_workdir = os.path.join(self.host_dir, "work") - ans = os.path.join(host_workdir, container_file) + ans = os.path.join(host_workdir, container_path) if os.path.isfile(ans): if path_really_within(ans, host_workdir): return ans raise OutputError( "task outputs attempted to use a file outside its working directory: " - + container_file + + container_path ) return None @@ -530,29 +539,34 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command: logger.exception("failed to close docker-py client") def prepare_mounts(self, logger: logging.Logger) -> List[docker.types.Mount]: - def touch_mount_point(container_file: str) -> None: + def touch_mount_point(container_path: str) -> None: # touching each mount point ensures they'll be owned by invoking user:group - assert container_file.startswith(self.container_dir + "/") - host_file = os.path.join( - self.host_dir, os.path.relpath(container_file, self.container_dir) + assert container_path.startswith(self.container_dir + "/") + host_path = os.path.join( + self.host_dir, os.path.relpath(container_path.rstrip("/"), self.container_dir) ) - assert host_file.startswith(self.host_dir + "/") - os.makedirs(os.path.dirname(host_file), exist_ok=True) - with open(host_file, "x") as _: - pass + assert host_path.startswith(self.host_dir + "/") + if container_path.endswith("/"): + os.makedirs(host_path, exist_ok=True) + else: + os.makedirs(os.path.dirname(host_path), exist_ok=True) + with open(host_path, "x") as _: + pass def escape(s): # docker processes {{ interpolations }} return s.replace("{{", '{{"{{"}}') mounts = [] - # mount input files and command + # mount input files/directories and command if self._bind_input_files: perm_warn = True - for host_path, container_path in self.input_file_map.items(): + for host_path, container_path in self.input_path_map.items(): + host_path = host_path.rstrip("/") st = os.stat(host_path) if perm_warn and not ( - (st.st_mode & 4) or (st.st_gid == os.getegid() and (st.st_mode & 0o40)) + (st.st_mode & stat.S_IROTH) + or (st.st_gid == os.getegid() and (st.st_mode & stat.S_IRGRP)) ): # file is neither world-readable, nor group-readable for the invoking user's primary group logger.warning( @@ -563,10 +577,14 @@ def escape(s): ) ) perm_warn = False + assert (not container_path.endswith("/")) or stat.S_ISDIR(st.st_mode) touch_mount_point(container_path) mounts.append( docker.types.Mount( - escape(container_path), escape(host_path), type="bind", read_only=True + escape(container_path.rstrip("/")), + escape(host_path), + type="bind", + read_only=True, ) ) mounts.append( diff --git a/WDL/runtime/workflow.py b/WDL/runtime/workflow.py index adbe9b51..a37311e4 100644 --- a/WDL/runtime/workflow.py +++ b/WDL/runtime/workflow.py @@ -45,7 +45,7 @@ import importlib_metadata from .. import Env, Type, Value, Tree, StdLib from ..Error import InputError -from .task import run_local_task, _filenames, link_outputs, _add_downloadable_default_files +from .task import run_local_task, _fspaths, link_outputs, _add_downloadable_default_files from .download import able as downloadable, run_cached as download from .._util import ( write_atomic, @@ -156,7 +156,7 @@ def __init__( self.finished = set() self.running = set() self.waiting = set() - self.filename_whitelist = _filenames(inputs) + self.filename_whitelist = _fspaths(inputs) from .. import values_to_json @@ -299,7 +299,7 @@ def call_finished(self, job_id: str, outputs: Env.Bindings[Value.Base]) -> None: call_node = self.jobs[job_id].node assert isinstance(call_node, Tree.Call) self.job_outputs[job_id] = outputs.wrap_namespace(call_node.name) - self.filename_whitelist |= _filenames(outputs) + self.filename_whitelist |= _fspaths(outputs) self.finished.add(job_id) self.running.remove(job_id) @@ -373,7 +373,7 @@ def _do_job( lambda b: Env.Binding(b.name, b.value.coerce(callee_inputs[b.name].type)) ) # check input files against whitelist - disallowed_filenames = _filenames(call_inputs) - self.filename_whitelist + disallowed_filenames = _fspaths(call_inputs) - self.filename_whitelist disallowed_filenames = set( fn for fn in disallowed_filenames if not downloadable(cfg, fn) ) diff --git a/tests/runner.t b/tests/runner.t index 44b474df..be864cc6 100644 --- a/tests/runner.t +++ b/tests/runner.t @@ -11,7 +11,7 @@ source tests/bash-tap/bash-tap-bootstrap export PYTHONPATH="$SOURCE_DIR:$PYTHONPATH" miniwdl="python3 -m WDL" -plan tests 53 +plan tests 55 $miniwdl run_self_test is "$?" "0" "run_self_test" @@ -246,6 +246,41 @@ $miniwdl run --copy-input-files mv_input_file.wdl file=quick is "$?" "0" "copy input files" is "$(basename `jq -r '.["mv_input_file.xxx"]' _LAST/outputs.json`)" "xxx" "updated _LAST" +cat << 'EOF' > dir_io.wdl +version development +workflow w { + input { + Directory d + } + call t { + input: + d = d + } + output { + Int dsz = round(size(t.files)) + } +} +task t { + input { + Directory d + } + command <<< + mkdir outdir + find ~{d} -type f | xargs -i{} cp {} outdir/ + >>> + output { + Array[File] files = glob("outdir/*") + } +} +EOF + +mkdir -p indir/subdir +echo alice > indir/alice.txt +echo bob > indir/subdir/bob.txt +$miniwdl run dir_io.wdl d=indir +is "$?" "0" "directory input" +is `jq -r '.["w.dsz"]' _LAST/outputs.json` "10" "use of directory input" + cat << 'EOF' > uri_inputs.json {"my_workflow.files": ["https://google.com/robots.txt", "https://raw.githubusercontent.com/chanzuckerberg/miniwdl/main/tests/alyssa_ben.txt"]} EOF diff --git a/tests/test_4taskrun.py b/tests/test_4taskrun.py index 3004e151..120d3994 100644 --- a/tests/test_4taskrun.py +++ b/tests/test_4taskrun.py @@ -648,6 +648,9 @@ def chk(outfiles): outputs = self._test_task(txt, inp, cfg=cfg) chk(outputs["outfiles"]) + with self.assertRaises(WDL.Error.InputError): + self._test_task(txt, {"files": [os.path.join(self._dir, "a", "x") + "/"]}) + def test_topsort(self): txt = R""" version 1.0 diff --git a/tests/test_7runner.py b/tests/test_7runner.py index b821fd7a..b974c5d5 100644 --- a/tests/test_7runner.py +++ b/tests/test_7runner.py @@ -56,6 +56,86 @@ def _run(self, wdl:str, inputs = None, expected_exception: Exception = None, cfg self.assertIsNone(expected_exception, str(expected_exception) + " not raised") return WDL.values_to_json(outputs) +class TestDirectoryIO(RunnerTestCase): + def test_coercion(self): + assert WDL.Type.Directory().coerces(WDL.Type.String()) + d = WDL.Value.String("foo").coerce(WDL.Type.Directory()) + assert isinstance(d, WDL.Value.Directory) + assert d.value == "foo" + + def test_basic_directory(self): + wdl = R""" + version development + workflow w { + input { + Directory d + } + call t { + input: + d = d + } + output { + Int dsz = round(size(t.files)) + } + } + task t { + input { + Directory d + Boolean touch = false + } + command { + set -euxo pipefail + mkdir outdir + cp "~{d}"/* outdir/ + if [ "~{touch}" == "true" ]; then + touch "~{d}"/foo + fi + >&2 ls -Rl + } + output { + Array[File] files = glob("outdir/*.txt") + } + } + """ + os.makedirs(os.path.join(self._dir, "d")) + with open(os.path.join(self._dir, "d/alice.txt"), mode="w") as outfile: + print("Alice", file=outfile) + with open(os.path.join(self._dir, "d/bob.txt"), mode="w") as outfile: + print("Bob", file=outfile) + outp = self._run(wdl, {"d": os.path.join(self._dir, "d")}) + assert outp["dsz"] == 10 + + with self.assertRaises(WDL.runtime.error.RunFailed): + self._run(wdl, {"d": os.path.join(self._dir, "d"), "t.touch": True}) + + cfg = WDL.runtime.config.Loader(logging.getLogger(self.id()), []) + cfg.override({"file_io": {"copy_input_files": True}}) + outp = self._run(wdl, {"d": os.path.join(self._dir, "d"), "t.touch": True}, cfg=cfg) + assert outp["dsz"] == 10 + + def test_no_outputs(self): + with self.assertRaisesRegex(WDL.Error.ValidationError, "Directory outputs"): + self._run(""" + version development + task t { + command {} + output { + Directory d = "." + } + } + """, {}) + + with self.assertRaisesRegex(WDL.Error.ValidationError, "Directory outputs"): + self._run(""" + version development + workflow w { + Directory d = "." + output { + Directory d2 = d + } + } + """, {}) + class TestDownload(RunnerTestCase): count_wdl: str = R""" version 1.0