Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/v0.9.x' into mlin-compound-map-keys
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Sep 13, 2020
2 parents 0181928 + dfd2245 commit 41e5813
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 70 deletions.
21 changes: 0 additions & 21 deletions WDL/Tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,6 @@ def typecheck(
errors.try1(
lambda: decl.typecheck(type_env, stdlib=stdlib, check_quant=check_quant)
)
if _has_directories(decl.type):
# FIXME
raise Error.ValidationError(
decl, "Directory outputs aren't supported in this version of miniwdl"
)
errors.try1(lambda: _check_serializable_map_keys(decl.type, decl.name, decl))

# check for cyclic dependencies among decls
Expand Down Expand Up @@ -1072,11 +1067,6 @@ def typecheck(self, doc: "Document", check_quant: bool) -> None:
)
)
output_type_env = output_type_env2
if _has_directories(output.type):
# FIXME
raise Error.ValidationError(
output, "Directory outputs aren't supported in this version of miniwdl"
)
errors.try1(
lambda: _check_serializable_map_keys(output.type, output.name, output)
)
Expand Down Expand Up @@ -1824,17 +1814,6 @@ def _add_struct_instance_to_type_env(
return ans


def _has_directories(t: Type.Base) -> bool:
    """
    used to check output declarations for Directory types while we don't support them

    Returns True if ``t`` is itself a ``Type.Directory``, or if any of its type parameters
    (searched recursively through ``t.parameters``) is one; False otherwise.
    """
    # any() consumes the recursive boolean results directly, so the answer doesn't hinge on the
    # truthiness of a Type object (as a next(..., None) sentinel test would).
    return isinstance(t, Type.Directory) or any(_has_directories(p) for p in t.parameters)


def _check_serializable_map_keys(t: Type.Base, name: str, node: SourceNode) -> None:
# For any Map[K,V] in an input or output declaration, K must be coercible to & from String, so
# that it can be de/serialized as JSON.
Expand Down
7 changes: 6 additions & 1 deletion WDL/Value.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,12 @@ def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None:

def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
    """
    Coerce this value to ``desired_type``, with special handling for a ``None`` path.

    ``self.value`` may have been overwritten with None (normally an invalid state) to signal
    that the path wasn't found. In that case:
    - returns ``Null`` if the desired type is an optional Directory
    - raises FileNotFoundError otherwise
    Any other value is delegated to the superclass coercion.
    """
    if self.value is not None:
        return super().coerce(desired_type)
    # the path is missing: only acceptable when the declared type is Directory?
    wants_optional_dir = isinstance(desired_type, Type.Directory) and desired_type.optional
    if wants_optional_dir:
        return Null(self.expr)
    raise FileNotFoundError()


Expand Down
7 changes: 5 additions & 2 deletions WDL/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,15 @@ def chmod_R_plus(path: str, file_bits: int = 0, dir_bits: int = 0) -> None:

def do1(path1: str, bits: int) -> None:
assert 0 <= bits < 0o10000
if path_really_within(path1, path):
if not os.path.islink(path1) and path_really_within(path1, path):
os.chmod(path1, (os.stat(path1).st_mode & 0o7777) | bits)

def raiser(exc: OSError):
raise exc

if os.path.isdir(path):
do1(path, dir_bits)
for root, subdirs, files in os.walk(path, followlinks=False):
for root, subdirs, files in os.walk(path, onerror=raiser, followlinks=False):
for dn in subdirs:
do1(os.path.join(root, dn), dir_bits)
for fn in files:
Expand Down
93 changes: 63 additions & 30 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,23 +546,31 @@ def _eval_task_outputs(
logger: logging.Logger, task: Tree.Task, env: Env.Bindings[Value.Base], container: TaskContainer
) -> Env.Bindings[Value.Base]:

# helper to rewrite Files from in-container paths to host paths
def rewriter(fn: str, output_name: str) -> str:
host_file = container.host_path(fn)
if host_file is None:
# helper to rewrite File/Directory from in-container paths to host paths
def rewriter(v: Union[Value.File, Value.Directory], output_name: str) -> str:
container_path = v.value
if isinstance(v, Value.Directory) and not container_path.endswith("/"):
container_path += "/"
host_path = container.host_path(container_path)
if host_path is None:
logger.warning(
_(
"output file not found in container (error unless declared type is optional)",
name=output_name,
file=fn,
"output path not found in container (error unless declared type is optional)",
output=output_name,
path=container_path,
)
)
elif isinstance(v, Value.Directory):
if host_path.endswith("/"):
host_path = host_path[:-1]
_check_directory(host_path, output_name)
logger.debug(_("output dir", container=container_path, host=host_path))
else:
logger.debug(_("output file", container=fn, host=host_file))
logger.debug(_("output file", container=container_path, host=host_path))
# We may overwrite File.value with None, which is an invalid state, then we'll fix it
# up (or abort) below. This trickery is because we don't, at this point, know whether
# the 'desired' output type is File or File?.
return host_file # pyre-fixme
# the -declared- output type is optional.
return host_path # pyre-fixme

stdlib = OutputStdLib(logger, container)
outputs = Env.Bindings()
Expand All @@ -588,22 +596,41 @@ def rewriter(fn: str, output_name: str) -> str:
# First bind the value as-is in the environment, so that subsequent output expressions will
# "see" the in-container path(s) if they use this binding.
env = env.bind(decl.name, v)
# Rewrite each File.value to either a host path, or None if the file doesn't exist.
v = Value.rewrite_files(v, lambda fn: rewriter(fn, decl.name))
# Rewrite each File/Directory path to a host path, or None if it doesn't exist.
v = Value.rewrite_paths(v, lambda v: rewriter(v, decl.name))
# File.coerce has a special behavior for us so that, if the value is None:
# - produces Value.Null() if the desired type is File?
# - raises FileNotFoundError otherwise.
try:
v = v.coerce(decl.type)
except FileNotFoundError:
exn = OutputError("File not found in task output " + decl.name)
exn = OutputError("File/Directory path not found in task output " + decl.name)
setattr(exn, "job_id", decl.workflow_node_id)
raise exn
outputs = outputs.bind(decl.name, v)

return outputs


def _check_directory(host_path: str, output_name: str) -> None:
    """
    traverse output directory to check that all symlinks are relative & resolve inside the dir

    :param host_path: host path of the output directory to traverse
    :param output_name: name of the output declaration; used only in the error message
    :raises OutputError: if any symlink under host_path is dangling, has an absolute target, or
                         fails the path_really_within() check against host_path
    :raises OSError: if a directory under host_path can't be listed during the traversal
    """

    def raiser(exc: OSError):
        # os.walk ignores directory listing errors unless onerror re-raises them
        raise exc

    for root, subdirs, files in os.walk(host_path, onerror=raiser, followlinks=False):
        # With followlinks=False, symlinks to directories are listed among subdirs (and are not
        # descended into), so they have to be checked here alongside the file entries; checking
        # only files would let a directory symlink escape the checks entirely.
        for name in subdirs + files:
            path = os.path.join(root, name)
            if os.path.islink(path) and (
                not os.path.exists(path)  # dangling link
                or os.path.isabs(os.readlink(path))  # link stored with an absolute target
                or not path_really_within(path, host_path)
            ):
                raise OutputError(f"Directory in output {output_name} contains unusable symlink")


def link_outputs(
outputs: Env.Bindings[Value.Base], run_dir: str, hardlinks: bool = False
) -> Env.Bindings[Value.Base]:
Expand All @@ -614,23 +641,29 @@ def link_outputs(
outputs env to use these symlinks.
"""

def map_files(v: Value.Base, dn: str) -> Value.Base:
if isinstance(v, Value.File):
if os.path.isfile(v.value):
hardlink = os.path.realpath(v.value)
assert os.path.isfile(hardlink)
newlink = os.path.join(dn, os.path.basename(v.value))
os.makedirs(dn, exist_ok=False)
if not hardlinks and path_really_within(hardlink, os.path.dirname(run_dir)):
def map_paths(v: Value.Base, dn: str) -> Value.Base:
if isinstance(v, (Value.File, Value.Directory)):
if os.path.exists(v.value):
target = os.path.realpath(v.value)
if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
# make symlink relative
hardlink = os.path.relpath(hardlink, start=os.path.realpath(dn))
(os.link if hardlinks else os.symlink)(hardlink, newlink)
v.value = newlink
target = os.path.relpath(target, start=os.path.realpath(dn))
link = os.path.join(dn, os.path.basename(v.value))
os.makedirs(dn, exist_ok=False)
if hardlinks:
# TODO: what if target is an input from a different filesystem?
if isinstance(v, Value.Directory):
shutil.copytree(target, link, symlinks=True, copy_function=os.link)
else:
os.link(target, link)
else:
os.symlink(target, link)
v.value = link
# recurse into compound values
elif isinstance(v, Value.Array) and v.value:
d = int(math.ceil(math.log10(len(v.value)))) # how many digits needed
for i in range(len(v.value)):
v.value[i] = map_files(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
v.value[i] = map_paths(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
elif isinstance(v, Value.Map):
# create a subdirectory for each key, as long as the key names seem to make reasonable
# path components; otherwise, treat the dict as a list of its values
Expand All @@ -646,18 +679,18 @@ def map_files(v: Value.Base, dn: str) -> Value.Base:
for i, b in enumerate(v.value):
v.value[i] = (
b[0],
map_files(
map_paths(
b[1], os.path.join(dn, str(b[0]) if keys_ok else str(i).rjust(d, "0"))
),
)
elif isinstance(v, Value.Pair):
v.value = (
map_files(v.value[0], os.path.join(dn, "left")),
map_files(v.value[1], os.path.join(dn, "right")),
map_paths(v.value[0], os.path.join(dn, "left")),
map_paths(v.value[1], os.path.join(dn, "right")),
)
elif isinstance(v, Value.Struct):
for key in v.value:
v.value[key] = map_files(v.value[key], os.path.join(dn, key))
v.value[key] = map_paths(v.value[key], os.path.join(dn, key))
return v

os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)
Expand All @@ -666,7 +699,7 @@ def map_files(v: Value.Base, dn: str) -> Value.Base:
return outputs.map(
lambda binding: Env.Binding(
binding.name,
map_files(copy.deepcopy(binding.value), os.path.join(run_dir, "out", binding.name)),
map_paths(copy.deepcopy(binding.value), os.path.join(run_dir, "out", binding.name)),
)
)

Expand Down
32 changes: 26 additions & 6 deletions WDL/runtime/task_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,19 +271,39 @@ def host_path(self, container_path: str, inputs_only: bool = False) -> Optional[
+ container_path
)
# relativize the path to the provisioned working directory
container_path = os.path.relpath(
container_relpath = os.path.relpath(
container_path, os.path.join(self.container_dir, "work")
)
if container_path.endswith("/") and not container_relpath.endswith("/"):
container_relpath += "/"
container_path = container_relpath

ans = os.path.join(self.host_work_dir(), container_path)
if os.path.isfile(ans):
if path_really_within(ans, self.host_work_dir()):
return ans
if container_path.endswith("/") and not ans.endswith("/"):
ans += "/"
if not (
(container_path.endswith("/") and os.path.isdir(ans))
or (not container_path.endswith("/") and os.path.isfile(ans))
):
return None
if not path_really_within(ans, self.host_work_dir()):
raise OutputError(
"task outputs attempted to use a file outside its working directory: "
"task outputs attempted to use a path outside its working directory: "
+ container_path
)
return None
if (
ans.endswith("/")
and self.input_path_map
and (
path_really_within(self.host_work_dir(), ans[:-1])
or path_really_within(
ans[:-1], os.path.join(self.host_work_dir(), "_miniwdl_inputs")
)
)
):
# prevent output of an input mount point
raise OutputError("unusable output directory: " + container_path)
return ans

def host_work_dir(self):
return os.path.join(
Expand Down
Loading

0 comments on commit 41e5813

Please sign in to comment.