Skip to content

Commit

Permalink
Merge 4cfad60 into 9d5b8d7
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Mar 1, 2020
2 parents 9d5b8d7 + 4cfad60 commit 32a7961
Show file tree
Hide file tree
Showing 14 changed files with 640 additions and 238 deletions.
128 changes: 74 additions & 54 deletions WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ def __call__(self, parser, namespace, values, option_string=None):
# that they give inconsistent results?
import pkg_resources

for plugin_group in ["miniwdl.plugin.file_download"]:
for plugin in pkg_resources.iter_entry_points(group=plugin_group):
print(f"{plugin_group}\t{plugin}\t{plugin.dist}")
for group in runtime.config.DEFAULT_PLUGINS.keys():
group = f"miniwdl.plugin.{group}"
for plugin in pkg_resources.iter_entry_points(group=group):
print(f"{group}\t{plugin}\t{plugin.dist}")
print("Cromwell " + CROMWELL_VERSION)
sys.exit(0)

Expand Down Expand Up @@ -440,16 +441,6 @@ def runner(
error_json=False,
**kwargs,
):
# load WDL document
doc = load(uri, path or [], check_quant=check_quant, read_source=read_source)

# parse and validate the provided inputs
target, input_env, input_json = runner_input(doc, inputs, input_file, empty, task=task)

if json_only:
print(json.dumps(input_json, indent=2))
sys.exit(0)

# set up logging
level = NOTICE_LEVEL
if kwargs["verbose"]:
Expand All @@ -463,29 +454,6 @@ def runner(
logger = logging.getLogger("miniwdl-run")
install_coloredlogs(logger)

versionlog = {}
for pkg in ["miniwdl", "docker", "lark-parser", "argcomplete", "pygtail"]:
try:
versionlog[pkg] = str(importlib_metadata.version(pkg))
except importlib_metadata.PackageNotFoundError:
versionlog[pkg] = "UNKNOWN"
logger.debug(_("package versions", **versionlog))

envlog = {}
for k in os.environ:
if k.upper().startswith("MINIWDL") or k in [
"LANG",
"SHELL",
"USER",
"HOME",
"PWD",
"TMPDIR",
]:
envlog[k] = os.environ[k]
logger.debug(_("environment", **envlog))

rerun_sh = f"pushd {shellquote(os.getcwd())} && miniwdl {' '.join(shellquote(t) for t in sys.argv[1:])}; popd"

# load configuration & apply command-line overrides
cfg_arg = None
if cfg:
Expand Down Expand Up @@ -524,6 +492,55 @@ def runner(
cfg.override(cfg_overrides)
cfg.log_all()

# load WDL document
doc = load(uri, path or [], check_quant=check_quant, read_source=read_source)

# parse and validate the provided inputs
def file_found(fn):
return runtime.download.able(cfg, fn) or os.path.isfile(fn)

target, input_env, input_json = runner_input(
doc, inputs, input_file, empty, task=task, file_found=file_found
)

if json_only:
print(json.dumps(input_json, indent=2))
sys.exit(0)

# debug logging
versionlog = {}
for pkg in ["miniwdl", "docker", "lark-parser", "argcomplete", "pygtail"]:
try:
versionlog[pkg] = str(importlib_metadata.version(pkg))
except importlib_metadata.PackageNotFoundError:
versionlog[pkg] = "UNKNOWN"
logger.debug(_("package versions", **versionlog))

envlog = {}
for k in os.environ:
if k.upper().startswith("MINIWDL") or k in [
"LANG",
"SHELL",
"USER",
"HOME",
"PWD",
"TMPDIR",
]:
envlog[k] = os.environ[k]
logger.debug(_("environment", **envlog))

enabled_plugins = []
disabled_plugins = []
for group in runtime.config.DEFAULT_PLUGINS.keys():
for enabled, plugin in runtime.config.load_all_plugins(cfg, group):
(enabled_plugins if enabled else disabled_plugins).append(
f"{plugin.name} = {plugin.value}"
)
if enabled_plugins or disabled_plugins:
logger.debug(_("plugin configuration", enabled=enabled_plugins, disabled=disabled_plugins))

rerun_sh = f"pushd {shellquote(os.getcwd())} && miniwdl {' '.join(shellquote(t) for t in sys.argv[1:])}; popd"

# initialize local Docker Swarm
runtime.task.LocalSwarmContainer.global_init(cfg, logger)

Expand All @@ -545,7 +562,7 @@ def runner(
exn_pos = getattr(exn, "pos", None)
if isinstance(exn_pos, SourcePosition):
pos = exn_pos
msg = str(msg)
msg = str(exn)
exn = exn.__cause__
if isinstance(exn, runtime.CommandFailed):
exit_status = (lambda v: v if v else exit_status)(getattr(exn, "exit_status", 0))
Expand Down Expand Up @@ -586,7 +603,7 @@ def runner_input_completer(prefix, parsed_args, **kwargs):
# load document. in the completer setting, we need to substitute the home directory
# and environment variables
uri = os.path.expandvars(os.path.expanduser(parsed_args.uri))
if not (runtime.download.able(uri) or os.path.exists(uri)):
if not (uri.startswith("http:") or uri.startswith("https:") or os.path.exists(uri)):
argcomplete.warn("file not found: " + uri)
return []
try:
Expand Down Expand Up @@ -624,7 +641,7 @@ def runner_input_completer(prefix, parsed_args, **kwargs):
return available_input_names


def runner_input(doc, inputs, input_file, empty, task=None, check_required=True):
def runner_input(doc, inputs, input_file, empty, task=None, check_required=True, file_found=None):
"""
- Determine the target workflow/task
- Check types of supplied inputs
Expand Down Expand Up @@ -691,7 +708,7 @@ def runner_input(doc, inputs, input_file, empty, task=None, check_required=True)
)

# create a Value based on the expected type
v = runner_input_value(s_value, decl.type)
v = runner_input_value(s_value, decl.type, file_found)

# insert value into input_env
try:
Expand Down Expand Up @@ -802,20 +819,20 @@ def is_constant_expr(expr):
return False


def runner_input_value(s_value, ty):
def runner_input_value(s_value, ty, file_found):
"""
Given an input value from the command line (right-hand side of =) and the
WDL type of the corresponding input decl, create an appropriate Value.
"""
if isinstance(ty, Type.String):
return Value.String(s_value)
if isinstance(ty, Type.File):
downloadable = runtime.download.able(s_value)
if not (downloadable or os.path.isfile(s_value)):
die("File not found: " + s_value)
return Value.File(
os.path.abspath(os.path.expanduser(s_value)) if not downloadable else s_value
)
fn = os.path.expanduser(s_value)
if os.path.isfile(fn):
fn = os.path.abspath(fn)
elif not (file_found and file_found(fn)): # maybe URI
die("File not found: " + fn)
return Value.File(fn)
if isinstance(ty, Type.Boolean):
if s_value == "true":
return Value.Boolean(True)
Expand All @@ -830,7 +847,7 @@ def runner_input_value(s_value, ty):
ty.item_type, (Type.String, Type.File, Type.Int, Type.Float)
):
# just produce a length-1 array, to be combined ex post facto
return Value.Array(ty.item_type, [runner_input_value(s_value, ty.item_type)])
return Value.Array(ty.item_type, [runner_input_value(s_value, ty.item_type, file_found)])
return die(
"No command-line support yet for inputs of type {}; workaround: specify in JSON file with --input".format(
str(ty)
Expand Down Expand Up @@ -1243,16 +1260,23 @@ def localize(
doc = load(wdlfile, path or [], check_quant=check_quant, read_source=read_source)

# parse the provided input JSON
logging.basicConfig(level=NOTICE_LEVEL)
logger = logging.getLogger("miniwdl-localize")
cfg = runtime.config.Loader(logger)

def file_found(fn):
return runtime.download.able(cfg, fn) or os.path.isfile(fn)

target, input_env, input_json = runner_input(
doc, [], infile, [], task=task, check_required=False
doc, [], infile, [], task=task, check_required=False, file_found=file_found
)

# read input JSON
name = name or os.path.basename(infile).split(".")[0]

# scan for Files that appear to be downloadable URIs
def scan(x):
if isinstance(x, Value.File) and runtime.download.able(x.value):
if isinstance(x, Value.File) and runtime.download.able(cfg, x.value):
yield x.value
for y in x.children:
yield from scan(y)
Expand All @@ -1262,10 +1286,6 @@ def scan(x):
uris |= set(scan(b.value))

if uris:
logging.basicConfig(level=NOTICE_LEVEL)
# initialize Docker
logger = logging.getLogger("miniwdl-localize")
cfg = runtime.config.Loader(logger)
# initialize local Docker Swarm
runtime.task.LocalSwarmContainer.global_init(cfg, logger)

Expand Down
69 changes: 69 additions & 0 deletions WDL/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Generic,
Optional,
Callable,
Generator,
Any,
)
from types import FrameType
Expand Down Expand Up @@ -518,3 +519,71 @@ def next(self) -> int:
with self._lock:
self._value += 1
return self._value


@export
@contextmanager
def compose_coroutines(  # pyre-fixme
    generators: List[Callable[[Any], Generator[Any, Any, None]]], x: Any  # pyre-fixme
) -> Iterator[Generator[Any, Any, None]]:
    """
    Coroutine (generator) which composes several other coroutines to run in lockstep for one or
    more "rounds." On each round, caller sends a value, which is sent to the first coroutine; the
    value it yields is sent to the second coroutine; and so on until finally the value yielded by
    the last coroutine is yielded back to the caller. Exceptions propagate in the same way, so a
    coroutine can catch and manipulate (but not suppress) an exception raised by the caller or by
    one of the other coroutines.

    Usage: ``with compose_coroutines(generators, x0) as chain: y = chain.send(x)``. The context
    manager guarantees the inner coroutines are closed promptly on exit.
    """

    def _impl() -> Generator[Any, Any, None]:  # pyre-fixme
        # start the coroutines by invoking each generator and taking the first value it yields
        nonlocal x
        cors = []
        try:
            for gen in generators:
                cor = gen(x)
                # each coroutine's first yielded value feeds the next coroutine's construction
                x = next(cor)
                cors.append(cor)
            while True:  # GeneratorExit will break
                # yield to caller and get updated value back
                try:
                    x = yield x
                except Exception as exn:
                    # caller threw into us: offer the exception to every coroutine in turn,
                    # letting each one replace (but not suppress) it, then re-raise the final one
                    for cor in cors:
                        try:
                            cor.throw(exn)
                        except Exception as exn2:
                            exn = exn2
                    raise exn
                # pass value through coroutines
                exn = None
                for cor in cors:
                    try:
                        if not exn:
                            x = cor.send(x)
                        else:
                            # once a coroutine raised, propagate that exception (not the value)
                            # through the remaining coroutines so they can observe/transform it
                            cor.throw(exn)
                    except Exception as exn2:
                        exn = exn2
                if exn:
                    raise exn
        finally:
            # close all coroutines that were started, even if construction failed partway;
            # remember the first close-time exception and re-raise it after all are closed
            close_exn = None
            for cor in cors:
                try:
                    cor.close()
                except Exception as exn2:
                    close_exn = close_exn or exn2
            if close_exn:
                raise close_exn

    # this outer contextmanager is for closing the coroutines promptly and propagating any caller
    # exceptions back through them. see: https://stackoverflow.com/a/58854646
    chain = _impl()
    try:
        yield chain
    except Exception as exn:
        chain.throw(exn)  # pyre-ignore
        raise
    finally:
        chain.close()
35 changes: 34 additions & 1 deletion WDL/runtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import configparser
import logging
import json
from typing import Optional, List, Dict, Any, Callable, TypeVar, Set, Tuple
import importlib_metadata
from fnmatch import fnmatchcase
from typing import Optional, List, Dict, Any, Callable, TypeVar, Set, Tuple, Iterable
from xdg import XDG_CONFIG_DIRS, XDG_CONFIG_HOME
from .._util import StructuredLogMessage as _

Expand Down Expand Up @@ -288,3 +290,34 @@ def _parse_list(v: str) -> List[Any]:
ans = json.loads(v)
assert isinstance(ans, list)
return ans


# Built-in plugins, used as the fallback when no external entry points are registered for a
# group. Keys are the plugin group names (suffixes of "miniwdl.plugin."); values are lists of
# importlib_metadata.EntryPoint.
DEFAULT_PLUGINS = {
    "file_download": [
        importlib_metadata.EntryPoint(
            group="miniwdl.plugin.file_download",
            name="gs",
            value="WDL.runtime.download:gsutil_downloader",
        )
    ],
    "task": [],
}


def load_all_plugins(cfg: Loader, group: str) -> Iterable[Tuple[bool, Any]]:
    """
    Enumerate all installed plugins in the given group (falling back to ``DEFAULT_PLUGINS`` when
    no entry points are registered for it), yielding ``(enabled, entry_point)`` pairs.

    A plugin is enabled iff its object reference matches at least one glob pattern in the
    ``[plugins] enable_patterns`` configuration AND matches no ``disable_patterns`` glob.

    :param cfg: runtime configuration providing the ``[plugins]`` section
    :param group: plugin group name; must be a key of ``DEFAULT_PLUGINS``
    """
    assert group in DEFAULT_PLUGINS.keys(), group
    enable_patterns = cfg["plugins"].get_list("enable_patterns")
    disable_patterns = cfg["plugins"].get_list("disable_patterns")
    for plugin in importlib_metadata.entry_points().get(
        f"miniwdl.plugin.{group}", DEFAULT_PLUGINS[group]
    ):
        # NOTE: any() yields a true bool, honoring the Tuple[bool, Any] annotation; the previous
        # next((pat ...), False) idiom leaked the matching pattern *string* as the first element.
        enabled = any(fnmatchcase(plugin.value, pat) for pat in enable_patterns) and not any(
            fnmatchcase(plugin.value, pat) for pat in disable_patterns
        )
        yield (enabled, plugin)


def load_plugins(cfg: Loader, group: str) -> Iterable[Tuple[str, Callable[..., Any]]]:
    """
    Yield ``(name, callable)`` for each *enabled* plugin in the given group, loading each
    entry point's target object. Disabled plugins are skipped without being imported.
    """
    for enabled, plugin in load_all_plugins(cfg, group):
        if enabled:
            yield (plugin.name, plugin.load())
9 changes: 9 additions & 0 deletions WDL/runtime/config_templates/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,12 @@ disregard_query = false
allow_patterns = ["*"]
# Do not use the cache with URIs matching these glob patterns.
deny_patterns = ["*/cgi-bin/*"]


[plugins]
# Control which plugins are used. Plugins are installed using the Python entry points convention,
# https://packaging.python.org/specifications/entry-points/
# Furthermore, for a plugin to be used, its "object reference" must (i) match at least one glob
# pattern in enable_patterns, AND (ii) not match any disable_patterns.
enable_patterns = ["*"]
disable_patterns = ["miniwdl_task_omnibus_example:*"]
Loading

0 comments on commit 32a7961

Please sign in to comment.