Skip to content

Commit

Permalink
tidy internal data structures for File/Directory
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Sep 5, 2020
1 parent d2e4327 commit 444d0d9
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 91 deletions.
15 changes: 13 additions & 2 deletions WDL/Value.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,11 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
class String(Base):
"""``value`` has Python type ``str``"""

def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None:
super().__init__(Type.String(), value, expr)
def __init__(
self, value: str, expr: "Optional[Expr.Base]" = None, subtype: Optional[Type.Base] = None
) -> None:
subtype = subtype or Type.String()
super().__init__(subtype, value, expr)

def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
""
Expand All @@ -169,6 +172,11 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
class File(String):
"""``value`` has Python type ``str``"""

def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None:
super().__init__(value, expr=expr, subtype=Type.File())
if value != value.rstrip("/"):
raise Error.InputError("WDL.Value.File invalid path: " + value)

def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
""
if self.value is None:
Expand All @@ -184,6 +192,9 @@ def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
class Directory(String):
"""``value`` has Python type ``str``"""

def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None:
super().__init__(value, expr=expr, subtype=Type.Directory())

def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
""
# TODO: similar coercion logic for Directory? outputs when we support those
Expand Down
39 changes: 24 additions & 15 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,15 @@ def _eval_task_inputs(
container: TaskContainer,
) -> Env.Bindings[Value.Base]:

# Map all the provided input Files to in-container paths
container.add_files(_filenames(posix_inputs))
# Map all the provided input File & Directory paths to in-container paths
container.add_paths(_fspaths(posix_inputs))

# copy posix_inputs with all Files mapped to their in-container paths
# copy posix_inputs with all File & Directory values mapped to their in-container paths
def map_paths(fn: Union[Value.File, Value.Directory]) -> str:
return container.input_file_map[fn.value]
p = fn.value
if isinstance(fn, Value.Directory):
p += "/"
return container.input_path_map[p]

container_inputs = Value.rewrite_env_paths(posix_inputs, map_paths)

Expand All @@ -346,7 +349,7 @@ def map_paths(fn: Union[Value.File, Value.Directory]) -> str:
assert len(decls_by_id) == len(decls_to_eval)

# evaluate each declaration in that order
# note: the write_* functions call container.add_files as a side-effect
# note: the write_* functions call container.add_paths as a side-effect
stdlib = InputStdLib(logger, container)
for decl in decls_to_eval:
assert isinstance(decl, Tree.Decl)
Expand All @@ -370,13 +373,19 @@ def map_paths(fn: Union[Value.File, Value.Directory]) -> str:
return container_env


def _filenames(env: Env.Bindings[Value.Base]) -> Set[str]:
"Get the filenames of all File values in the environment"
def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]:
"""
Get the unique paths of all File & Directory values in the environment. Directory paths will
have a trailing '/'.
"""
ans = set()

def collector(v: Value.Base) -> None:
if isinstance(v, (Value.File, Value.Directory)):
if isinstance(v, Value.File):
assert not v.value.endswith("/")
ans.add(v.value)
elif isinstance(v, Value.Directory):
ans.add(v.value + ("/" if not v.value.endswith("/") else ""))
for ch in v.children:
collector(ch)

Expand Down Expand Up @@ -543,7 +552,7 @@ def _eval_task_outputs(

# helper to rewrite Files from in-container paths to host paths
def rewriter(fn: str, output_name: str) -> str:
host_file = container.host_file(fn)
host_file = container.host_path(fn)
if host_file is None:
logger.warning(
_(
Expand Down Expand Up @@ -694,20 +703,20 @@ def __init__(self, logger: logging.Logger, container: TaskContainer, inputs_only

def _devirtualize_filename(self, filename: str) -> str:
# check allowability of reading this file, & map from in-container to host
ans = self.container.host_file(filename, inputs_only=self.inputs_only)
ans = self.container.host_path(filename, inputs_only=self.inputs_only)
if ans is None:
raise OutputError("function was passed non-existent file " + filename)
self.logger.debug(_("read_", container=filename, host=ans))
return ans

def _virtualize_filename(self, filename: str) -> str:
# register new file with container input_file_map
self.container.add_files([filename])
# register new file with container input_path_map
self.container.add_paths([filename])
self.logger.debug(
_("write_", host=filename, container=self.container.input_file_map[filename])
_("write_", host=filename, container=self.container.input_path_map[filename])
)
self.logger.info(_("wrote", file=self.container.input_file_map[filename]))
return self.container.input_file_map[filename]
self.logger.info(_("wrote", file=self.container.input_path_map[filename]))
return self.container.input_path_map[filename]


class InputStdLib(_StdLib):
Expand Down
148 changes: 79 additions & 69 deletions WDL/runtime/task_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> D
command's working directory will be ``{container_dir}/work/``.
"""

input_file_map: Dict[str, str]
input_path_map: Dict[str, str]
"""
:type: Dict[str,str]
A mapping of host input file paths to in-container mounted paths,
maintained by ``add_files``.
A mapping of host input file/directory paths to in-container mounted paths, maintained by
``add_paths``. Directory paths are distinguished by trailing slashes on both keys and values;
the slashes often should be trimmed for use elsewhere.
"""

input_file_map_rev: Dict[str, str]
input_path_map_rev: Dict[str, str]

runtime_values: Dict[str, Any]
"""
Expand All @@ -101,67 +102,72 @@ def __init__(self, cfg: config.Loader, run_id: str, host_dir: str) -> None:
self.run_id = run_id
self.host_dir = host_dir
self.container_dir = "/mnt/miniwdl_task_container"
self.input_file_map = {}
self.input_file_map_rev = {}
self.input_path_map = {}
self.input_path_map_rev = {}
self.stderr_callback = None
self._running = False
self.runtime_values = {}
os.makedirs(os.path.join(self.host_dir, "work"))

def add_files(self, host_files: Iterable[str]) -> None:
def add_paths(self, host_paths: Iterable[str]) -> None:
"""
Use before running the container to add a list of host files to mount
inside the container as inputs. The host-to-container path mapping is
maintained in ``input_file_map``.
Use before running the container to add a list of host paths to mount inside the container
as inputs. Directory paths should have a trailing slash. The host-to-container path mapping
is maintained in ``input_path_map``.
Although ``add_files`` can be used multiple times, files should be
added together where possible, as this allows heuristics for dealing
with any name collisions among them.
Although ``add_paths`` can be used multiple times, paths should be added together where
possible, as this allows heuristics for dealing with any name collisions among them.
"""
assert not self._running

# partition the files by host directory
host_files_by_dir = {}
for host_file in host_files:
if host_file not in self.input_file_map:
if not os.path.exists(host_file):
raise Error.InputError("input path not found: " + host_file)
host_files_by_dir.setdefault(os.path.dirname(host_file), set()).add(host_file)
host_paths_by_dir = {}
for host_path in host_paths:
host_path_strip = host_path.rstrip("/")
if host_path not in self.input_path_map and host_path_strip not in self.input_path_map:
if not os.path.exists(host_path_strip):
raise Error.InputError("input path not found: " + host_path)
host_paths_by_dir.setdefault(os.path.dirname(host_path_strip), set()).add(host_path)

# for each such partition of files
# - if there are no basename collisions under input subdirectory 0, then mount them there.
# - otherwise, mount them in a fresh subdirectory
for files in host_files_by_dir.values():
for paths in host_paths_by_dir.values():
based = os.path.join(self.container_dir, "work/_miniwdl_inputs")
subd = "0"
for host_file in files:
container_file = os.path.join(based, subd, os.path.basename(host_file))
if container_file in self.input_file_map_rev:
subd = str(len(self.input_file_map) + 1)
for host_file in files:
container_file = os.path.join(based, subd, os.path.basename(host_file))
assert container_file not in self.input_file_map_rev
self.input_file_map[host_file] = container_file
self.input_file_map_rev[container_file] = host_file
for host_path in paths:
container_path = os.path.join(based, subd, os.path.basename(host_path.rstrip("/")))
if host_path.endswith("/"):
container_path += "/"
if container_path in self.input_path_map_rev:
assert subd == "0"
subd = str(len(self.input_path_map) + 1)
for host_path in paths:
container_path = os.path.join(based, subd, os.path.basename(host_path.rstrip("/")))
if host_path.endswith("/"):
container_path += "/"
assert container_path not in self.input_path_map_rev
self.input_path_map[host_path] = container_path
self.input_path_map_rev[container_path] = host_path

def copy_input_files(self, logger: logging.Logger) -> None:
# After add_files has been used as needed, copy the input files from their original
# After add_paths has been used as needed, copy the input files from their original
# locations to the appropriate subdirectories of the container working directory. This may
# not be necessary e.g. if the container backend supports bind-mounting the input
# files from their original host paths.
# called once per task run (attempt)
for host_filename, container_filename in self.input_file_map.items():
assert container_filename.startswith(self.container_dir)
host_copy_filename = os.path.join(
self.host_dir, os.path.relpath(container_filename, self.container_dir)
for host_path, container_path in self.input_path_map.items():
assert container_path.startswith(self.container_dir)
host_copy_path = os.path.join(
self.host_dir, os.path.relpath(container_path.rstrip("/"), self.container_dir)
)

logger.info(_("copy host input file", input=host_filename, copy=host_copy_filename))
os.makedirs(os.path.dirname(host_copy_filename), exist_ok=True)
if os.path.isdir(host_filename):
shutil.copytree(host_filename, host_copy_filename, symlinks=False)
logger.info(_("copy host input file", input=host_path, copy=host_copy_path))
os.makedirs(os.path.dirname(host_copy_path), exist_ok=True)
if host_path.endswith("/"):
shutil.copytree(host_path.rstrip("/"), host_copy_path, symlinks=False)
else:
shutil.copy(host_filename, host_copy_filename)
shutil.copy(host_path, host_copy_path)

def run(self, logger: logging.Logger, command: str) -> None:
"""
Expand Down Expand Up @@ -223,43 +229,42 @@ def reset(self, logger: logging.Logger, retries: int, delete_work: bool = False)
)
os.makedirs(os.path.join(self.host_dir, "work"))

def host_file(self, container_file: str, inputs_only: bool = False) -> Optional[str]:
def host_path(self, container_path: str, inputs_only: bool = False) -> Optional[str]:
"""
Map an output file's in-container path under ``container_dir`` to a host path under
``host_dir``. Return None if the designated file does not exist.
Map the in-container path of an output File/Directory to a host path under ``host_dir``.
Directory paths should be given a trailing "/". Return None if the path does not exist.
SECURITY: except for input files, this method must only return host paths under
``host_dir`` and prevent any reference to other host files (e.g. /etc/passwd), including
via sneaky symlinks
SECURITY: except for inputs, this method must only return host paths under ``host_dir``
and prevent any reference to other host files (e.g. /etc/passwd), including via symlinks.
"""
if os.path.isabs(container_file):
if os.path.isabs(container_path):
# handle output of std{out,err}.txt
if container_file in [
if container_path in [
os.path.join(self.container_dir, pipe_file)
for pipe_file in ["stdout.txt", "stderr.txt"]
]:
return os.path.join(self.host_dir, os.path.basename(container_file))
return os.path.join(self.host_dir, os.path.basename(container_path))
# handle output of an input file
if container_file in self.input_file_map_rev:
return self.input_file_map_rev[container_file]
if container_path in self.input_path_map_rev:
return self.input_path_map_rev[container_path]
if inputs_only:
raise Error.InputError(
"task inputs attempted to use a non-input or non-existent file "
+ container_file
"task inputs attempted to use a non-input or non-existent path "
+ container_path
)
# relativize the path to the provisioned working directory
container_file = os.path.relpath(
container_file, os.path.join(self.container_dir, "work")
container_path = os.path.relpath(
container_path, os.path.join(self.container_dir, "work")
)

host_workdir = os.path.join(self.host_dir, "work")
ans = os.path.join(host_workdir, container_file)
ans = os.path.join(host_workdir, container_path)
if os.path.isfile(ans):
if path_really_within(ans, host_workdir):
return ans
raise OutputError(
"task outputs attempted to use a file outside its working directory: "
+ container_file
+ container_path
)
return None

Expand Down Expand Up @@ -534,29 +539,30 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command:
logger.exception("failed to close docker-py client")

def prepare_mounts(self, logger: logging.Logger) -> List[docker.types.Mount]:
def touch_mount_point(container_file: str, is_dir: bool = False) -> None:
def touch_mount_point(container_path: str) -> None:
# touching each mount point ensures they'll be owned by invoking user:group
assert container_file.startswith(self.container_dir + "/")
host_file = os.path.join(
self.host_dir, os.path.relpath(container_file, self.container_dir)
assert container_path.startswith(self.container_dir + "/")
host_path = os.path.join(
self.host_dir, os.path.relpath(container_path.rstrip("/"), self.container_dir)
)
assert host_file.startswith(self.host_dir + "/")
if is_dir:
os.makedirs(host_file, exist_ok=True)
assert host_path.startswith(self.host_dir + "/")
if container_path.endswith("/"):
os.makedirs(host_path, exist_ok=True)
else:
os.makedirs(os.path.dirname(host_file), exist_ok=True)
with open(host_file, "x") as _:
os.makedirs(os.path.dirname(host_path), exist_ok=True)
with open(host_path, "x") as _:
pass

def escape(s):
# docker processes {{ interpolations }}
return s.replace("{{", '{{"{{"}}')

mounts = []
# mount input files and command
# mount input files/directories and command
if self._bind_input_files:
perm_warn = True
for host_path, container_path in self.input_file_map.items():
for host_path, container_path in self.input_path_map.items():
host_path = host_path.rstrip("/")
st = os.stat(host_path)
if perm_warn and not (
(st.st_mode & stat.S_IROTH)
Expand All @@ -571,10 +577,14 @@ def escape(s):
)
)
perm_warn = False
touch_mount_point(container_path, is_dir=stat.S_ISDIR(st.st_mode))
assert (not container_path.endswith("/")) or stat.S_ISDIR(st.st_mode)
touch_mount_point(container_path)
mounts.append(
docker.types.Mount(
escape(container_path), escape(host_path), type="bind", read_only=True
escape(container_path.rstrip("/")),
escape(host_path),
type="bind",
read_only=True,
)
)
mounts.append(
Expand Down

0 comments on commit 444d0d9

Please sign in to comment.