Commit 1b82497
Merge cd407df into 7a9fa78
mlin committed Mar 27, 2020
2 parents 7a9fa78 + cd407df commit 1b82497
Showing 12 changed files with 233 additions and 115 deletions.
108 changes: 81 additions & 27 deletions WDL/CLI.py
@@ -28,6 +28,7 @@
     NOTICE_LEVEL,
    install_coloredlogs,
    parse_byte_size,
+    path_really_within,
 )
 from ._util import StructuredLogMessage as _

@@ -464,14 +465,14 @@ def runner(
     cfg = runtime.config.Loader(logger, filenames=cfg_arg)
     cfg_overrides = {
         "scheduler": {},
-        "task_io": {},
+        "file_io": {},
         "task_runtime": {},
         "download_cache": {},
     }
     if max_tasks is not None:
         cfg_overrides["call_concurrency"] = max_tasks
     if copy_input_files:
-        cfg_overrides["task_io"]["copy_input_files"] = copy_input_files
+        cfg_overrides["file_io"]["copy_input_files"] = copy_input_files
     if as_me:
         cfg_overrides["task_runtime"]["as_user"] = as_me
     if runtime_defaults:
@@ -494,15 +495,45 @@ def runner(
     cfg.override(cfg_overrides)
     cfg.log_all()

+    # check root
+    if not path_really_within((run_dir or os.getcwd()), cfg["file_io"]["root"]):
+        logger.error(
+            _(
+                "working directory or --dir must be within the configured `file_io.root' directory",
+                dir=(run_dir or os.getcwd()),
+                root=cfg["file_io"]["root"],
+            )
+        )
+        sys.exit(2)
+    if (
+        cfg["download_cache"].get_bool("get") or cfg["download_cache"].get_bool("put")
+    ) and not path_really_within(cfg["download_cache"]["dir"], cfg["file_io"]["root"]):
+        logger.error(
+            _(
+                "configuration error: 'download_cache.dir' must be within the `file_io.root' directory",
+                dir=cfg["download_cache"]["dir"],
+                root=cfg["file_io"]["root"],
+            )
+        )
+        sys.exit(2)
+
     # load WDL document
     doc = load(uri, path or [], check_quant=check_quant, read_source=read_source)

     # parse and validate the provided inputs
     def file_found(fn):
         return runtime.download.able(cfg, fn) or os.path.isfile(fn)

+    eff_root = cfg["file_io"]["root"] if not cfg["file_io"].get_bool("copy_input_files") else "/"
+
     target, input_env, input_json = runner_input(
-        doc, inputs, input_file, empty, task=task, file_found=file_found
+        doc,
+        inputs,
+        input_file,
+        empty,
+        task=task,
+        file_found=file_found,
+        root=eff_root,  # if copy_input_files is set, then input files need not reside under the configured root
     )

     if json_only:
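
These checks rely on path_really_within, newly imported from WDL/_util.py; its body isn't shown in this diff. A minimal sketch of the semantics the checks assume (symlinks and ".." components resolved before a prefix comparison) would be:

    import os.path

    def path_really_within(lhs: str, rhs: str) -> bool:
        # resolve symlinks and ".." so a path can't escape the root through a link
        lhs = os.path.realpath(lhs)
        rhs = os.path.realpath(rhs).rstrip("/")
        return lhs == rhs or lhs.startswith(rhs + "/")

So, for example, a run directory /data/runs passes with root = /data, but a /data/runs that is a symlink out to /scratch does not.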
@@ -544,7 +575,7 @@ def file_found(fn):
     rerun_sh = f"pushd {shellquote(os.getcwd())} && miniwdl {' '.join(shellquote(t) for t in sys.argv[1:])}; popd"

     # initialize local Docker Swarm
-    runtime.task.LocalSwarmContainer.global_init(cfg, logger)
+    runtime.task.SwarmContainer.global_init(cfg, logger)

     # run & log any errors
     rundir = None
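
The rename from LocalSwarmContainer to SwarmContainer pairs with the new [docker_swarm] auto_init option in the default config below. The swarm auto-initialization that global_init performs can be approximated with the Docker SDK for Python (a sketch under that assumption, not miniwdl's exact code):

    import docker

    client = docker.from_env()
    if client.info().get("Swarm", {}).get("LocalNodeState") == "inactive":
        # equivalent of: docker swarm init --listen-addr 127.0.0.1 --advertise-addr 127.0.0.1
        client.swarm.init(listen_addr="127.0.0.1", advertise_addr="127.0.0.1")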
@@ -637,7 +668,9 @@ def runner_input_completer(prefix, parsed_args, **kwargs):
     return available_input_names


-def runner_input(doc, inputs, input_file, empty, task=None, check_required=True, file_found=None):
+def runner_input(
+    doc, inputs, input_file, empty, task=None, check_required=True, file_found=None, root="/"
+):
     """
     - Determine the target workflow/task
     - Check types of supplied inputs
@@ -664,7 +697,7 @@ def runner_input(
     # build up an values env of the provided inputs
     available_inputs = target.available_inputs
     input_env = runner_input_json_file(
-        available_inputs, (target.name if isinstance(target, Workflow) else ""), input_file
+        available_inputs, (target.name if isinstance(target, Workflow) else ""), input_file, root
     )

     # set explicitly empty arrays
@@ -704,7 +737,7 @@ def runner_input(
             )

         # create a Value based on the expected type
-        v = runner_input_value(s_value, decl.type, file_found)
+        v = runner_input_value(s_value, decl.type, file_found, root)

         # insert value into input_env
         try:
@@ -738,7 +771,7 @@ def runner_input(
     )


-def runner_input_json_file(available_inputs, namespace, input_file):
+def runner_input_json_file(available_inputs, namespace, input_file, root):
     """
     Load user-supplied inputs JSON file, if any
     """
@@ -763,8 +796,14 @@ def runner_input_json_file(available_inputs, namespace, input_file):

     def absolutify_files(v: Value.Base) -> Value.Base:
         if isinstance(v, Value.File):
-            if "://" not in v.value and not os.path.isabs(v.value):
-                v.value = os.path.normpath(os.path.join(os.getcwd(), v.value))
+            if "://" not in v.value:
+                if not os.path.isabs(v.value):
+                    v.value = os.path.normpath(os.path.join(os.getcwd(), v.value))
+                if not path_really_within(v.value, root):
+                    die(
+                        f"all input files must be located within the configured `file_io.root' directory `{root}' unlike `{v.value}'"
+                    )
+                    sys.exit(2)
         for ch in v.children:
             absolutify_files(ch)
         return v
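
The normalize-then-check rule applied to each File value above, restated as a self-contained sketch (helper name and paths hypothetical; URIs containing "://" are left to the downloader and not checked here):

    import os

    def check_input_path(path: str, root: str) -> str:
        if "://" in path:  # treated as a URI, skipped
            return path
        if not os.path.isabs(path):  # resolve relative to the invocation directory
            path = os.path.normpath(os.path.join(os.getcwd(), path))
        if not os.path.realpath(path).startswith(os.path.realpath(root).rstrip("/") + "/"):
            raise SystemExit("input file " + path + " is outside file_io.root " + root)
        return path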
@@ -815,7 +854,7 @@ def is_constant_expr(expr):
     return False


-def runner_input_value(s_value, ty, file_found):
+def runner_input_value(s_value, ty, file_found, root):
     """
     Given an input value from the command line (right-hand side of =) and the
     WDL type of the corresponding input decl, create an appropriate Value.
@@ -826,6 +865,10 @@ def runner_input_value(s_value, ty, file_found):
         fn = os.path.expanduser(s_value)
         if os.path.isfile(fn):
             fn = os.path.abspath(fn)
+            if not path_really_within(fn, root):
+                die(
+                    f"all input files must be located within the configured `file_io.root' directory `{root}' unlike `{fn}'"
+                )
         elif not (file_found and file_found(fn)):  # maybe URI
             die("File not found: " + fn)
         return Value.File(fn)
@@ -843,7 +886,9 @@ def runner_input_value(s_value, ty, file_found):
         ty.item_type, (Type.String, Type.File, Type.Int, Type.Float)
     ):
         # just produce a length-1 array, to be combined ex post facto
-        return Value.Array(ty.item_type, [runner_input_value(s_value, ty.item_type, file_found)])
+        return Value.Array(
+            ty.item_type, [runner_input_value(s_value, ty.item_type, file_found, root)]
+        )
     return die(
         "No command-line support yet for inputs of type {}; workaround: specify in JSON file with --input".format(
             str(ty)
@@ -940,21 +985,20 @@ def run_self_test(**kwargs):
         assert len(outputs["hello_caller.messages"]) == 2
         assert outputs["hello_caller.messages"][0].rstrip() == "Hello, Alyssa P. Hacker!"
         assert outputs["hello_caller.messages"][1].rstrip() == "Hello, Ben Bitdiddle!"
-    except SystemExit as exn:
-        assert getattr(exn, "code") == 0  # because of --debug
-    except:
-        atexit.register(
-            lambda: print(
-                "* Hint: ensure Docker is installed & running"
-                + (
-                    ", and user has permission to control it per https://docs.docker.com/install/linux/linux-postinstall/#manage-docker-as-a-non-root-user"
-                    if platform.system() != "Darwin"
-                    else "; and on macOS override the environment variable TMPDIR=/tmp/"
-                ),
-                file=sys.stderr,
+    except BaseException as exn:
+        if not (isinstance(exn, SystemExit) and getattr(exn, "code") == 0):
+            atexit.register(
+                lambda: print(
+                    "* Hint: ensure Docker is installed & running"
+                    + (
+                        ", and user has permission to control it per https://docs.docker.com/install/linux/linux-postinstall/#manage-docker-as-a-non-root-user"
+                        if platform.system() != "Darwin"
+                        else "; and on macOS override the environment variable TMPDIR=/tmp/"
+                    ),
+                    file=sys.stderr,
+                )
             )
-        )
-    raise
+        raise exn

     print("miniwdl run_self_test OK", file=sys.stderr)
     if os.geteuid() == 0:
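
The rewritten handler works because SystemExit derives from BaseException but not Exception: a single except BaseException clause can treat the expected SystemExit(0) from the self-test's --debug invocation as success, print the Docker hint for everything else, and re-raise either way. The pattern in miniature:

    def demo():
        try:
            raise SystemExit(0)  # what a successful self-test run raises
        except BaseException as exn:
            if not (isinstance(exn, SystemExit) and getattr(exn, "code") == 0):
                print("* Hint: ensure Docker is installed & running")
            raise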
@@ -1328,6 +1372,16 @@ def scan(x):
         )
         sys.exit(2)

+    if not path_really_within(cfg["download_cache"]["dir"], cfg["file_io"]["root"]):
+        logger.error(
+            _(
+                "configuration error: `download_cache.dir' must be within the `file_io.root' directory",
+                dir=cfg["download_cache"]["dir"],
+                root=cfg["file_io"]["root"],
+            )
+        )
+        sys.exit(2)
+
     cache = runtime.cache.CallCache(cfg, logger)
     disabled = [u for u in uri if not cache.download_path(u)]
     if disabled:
@@ … @@
     logger.notice(_("starting downloads", uri=uri))

     # initialize local Docker Swarm
-    runtime.task.LocalSwarmContainer.global_init(cfg, logger)
+    runtime.task.SwarmContainer.global_init(cfg, logger)

     # cheesy trick: provide the list of URIs as File inputs to a dummy workflow, causing the
     # runtime to download & cache them
10 changes: 7 additions & 3 deletions WDL/_util.py
@@ -254,7 +254,7 @@ def notice(self, message, *args, **kws):  # pyre-fixme

 logging.Logger.notice = notice

-LOGGING_FORMAT = "%(asctime)s,%(msecs)03d %(name)s %(levelname)s %(message)s"
+LOGGING_FORMAT = "%(asctime)s.%(msecs)03d %(name)s %(levelname)s %(message)s"
 __all__.append("LOGGING_FORMAT")


@@ -266,9 +266,13 @@ def install_coloredlogs(logger: logging.Logger) -> None:
     if sys.stderr.isatty() and "NO_COLOR" not in os.environ:
         level_styles = dict(coloredlogs.DEFAULT_LEVEL_STYLES)
         level_styles["debug"]["color"] = 242
-        level_styles["notice"] = {"color": "magenta"}
+        level_styles["notice"] = {"color": "green", "bold": True}
+        level_styles["error"]["bold"] = True
+        level_styles["warning"]["bold"] = True
+        level_styles["info"] = {}
-        field_styles = None
+        field_styles = dict(coloredlogs.DEFAULT_FIELD_STYLES)
+        field_styles["asctime"] = {"color": "blue"}
+        field_styles["name"] = {"color": "magenta"}

         coloredlogs.install(
             level=logger.getEffectiveLevel(),
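
Taken together, the _util.py changes switch the millisecond separator to "." and recolor/bold the log level and field styles. A runnable sketch of how they take effect (demo logger name assumed; requires the coloredlogs package):

    import logging
    import coloredlogs

    LOGGING_FORMAT = "%(asctime)s.%(msecs)03d %(name)s %(levelname)s %(message)s"

    level_styles = dict(coloredlogs.DEFAULT_LEVEL_STYLES)
    level_styles["notice"] = {"color": "green", "bold": True}
    field_styles = dict(coloredlogs.DEFAULT_FIELD_STYLES)
    field_styles["asctime"] = {"color": "blue"}
    field_styles["name"] = {"color": "magenta"}

    logger = logging.getLogger("miniwdl-demo")
    coloredlogs.install(
        level=logging.INFO,
        logger=logger,
        level_styles=level_styles,
        field_styles=field_styles,
        fmt=LOGGING_FORMAT,
    )
    logger.info("colorized")  # e.g. 2020-03-27 12:34:56.789 miniwdl-demo INFO colorized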
34 changes: 24 additions & 10 deletions WDL/runtime/config_templates/default.cfg
@@ -17,17 +17,32 @@
 call_concurrency = 0


-[task_io]
+[docker_swarm]
+# Docker Swarm Mode is the default container backend. It's included with every docker installation,
+# but disabled by default. The following option allows miniwdl to automatically initialize the local
+# host as a one-node swarm if needed, by performing
+#     docker swarm init --listen-addr 127.0.0.1 --advertise-addr 127.0.0.1
+# This is typically helpful for single-node use, but should be disabled if a preexisting swarm
+# is to be used.
+auto_init = true
+
+
+[file_io]
+# During startup, require the run directory and input files to reside somewhere within this root
+# directory. This constraint is needed in cluster deployments relying on a shared mount. It can
+# also be useful on a single node to prevent inadvertent consumption of a small boot/home volume,
+# by confining workflow operations to some spacious scratch/data mount.
+root = /
 # Populate task working directory with writable copies of the input files, instead of mounting them
 # in situ & read-only. Needed if tasks want to write/move/rename input files, but costs time and
 # disk space. --copy-input-files
 copy_input_files = false
-# Each succeeded task run directory has an "output_links/" folder containing (by default) a symbolic
-# link to each output file in its original location under work/. If output_hardlinks is set, then
-# output_links/ will contain hardlinks instead of symlinks.
-# With output_hardlinks, one may simply "rm -rf work/" to clean up any other detritus the task may
-# have left behind in the working directory (while keeping the output files). Otherwise, beware the
-# potential confusion arising from files with multiple hardlinks!
+# Each succeeded run directory has an "output_links/" folder containing (by default) a symbolic
+# link to each output file in its original working location. If output_hardlinks is set, then
+# output_links/ is populated with hardlinks instead of symlinks.
+# With output_hardlinks, one may clean up any garbage left behind in a task's run directory by
+# simply "rm -rf ./work/" while the hardlinks retain the output files. Otherwise, beware the
+# potential confusion arising from files with multiple hardlinks.
 output_hardlinks = false


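A quick way to sanity-check a locally edited copy of this template (path hypothetical; at runtime, runtime.config.Loader additionally applies overrides such as the cfg_overrides seen in CLI.py above):

    from configparser import ConfigParser

    cfg = ConfigParser()
    cfg.read("default.cfg")
    assert cfg.getboolean("docker_swarm", "auto_init")
    assert cfg.get("file_io", "root") == "/"
    assert not cfg.getboolean("file_io", "copy_input_files")
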
@@ -58,9 +73,8 @@ dir = /tmp/miniwdl_download_cache
 # Remove URI query strings for cache key/lookup purposes. By default, downloads from URIs with
 # query strings are never cached (neither put nor get).
 ignore_query = false
-# Enable/disable cache based on URI pattern. To work with the cache, a URI must (i) match at least
-# one glob pattern in enable_patterns, AND (ii) not match any disable_patterns. (In addition to the
-# master put/get settings.)
+# To be eligible for caching (in addition to above options), a URI must (i) match at least one glob
+# pattern in enable_patterns, AND (ii) not match any disable_patterns.
 enable_patterns = ["*"]
 disable_patterns = ["*.php", "*.aspx"]
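
The eligibility rule described by enable_patterns/disable_patterns, expressed as a sketch with fnmatch-style globs (helper name hypothetical):

    from fnmatch import fnmatch

    def cache_eligible(uri, enable_patterns, disable_patterns):
        return any(fnmatch(uri, p) for p in enable_patterns) and not any(
            fnmatch(uri, p) for p in disable_patterns
        )

    assert cache_eligible("https://host/data.bam", ["*"], ["*.php", "*.aspx"])
    assert not cache_eligible("https://host/page.php", ["*"], ["*.php", "*.aspx"])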
