Skip to content

Commit

Permalink
Merge 8ae6d53 into e14e362
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Nov 7, 2019
2 parents e14e362 + 8ae6d53 commit 8e792e3
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 40 deletions.
33 changes: 21 additions & 12 deletions WDL/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ def main(args=None):
class PipVersionAction(Action):
def __call__(self, parser, namespace, values, option_string=None):
try:
print(pkg_resources.get_distribution("miniwdl"))
except pkg_resources.DistributionNotFound as exc:
print("miniwdl version unknown ({}: {})".format(type(exc).__name__, exc))
print(f"miniwdl v{pkg_resources.get_distribution('miniwdl').version}")
except pkg_resources.DistributionNotFound:
print("miniwdl version unknown")
print("Cromwell version: " + CROMWELL_VERSION)
sys.exit(0)

Expand Down Expand Up @@ -379,14 +379,15 @@ def runner(
logger = logging.getLogger("miniwdl-run")
install_coloredlogs(logger)

try:
logger.debug(pkg_resources.get_distribution("miniwdl"))
except pkg_resources.DistributionNotFound as exc:
logger.debug("miniwdl version unknown ({}: {})".format(type(exc).__name__, exc))
for pkg in ["docker", "lark-parser", "argcomplete", "pygtail"]:
logger.debug(pkg_resources.get_distribution(pkg))
for pkg in ["miniwdl", "docker", "lark-parser", "argcomplete", "pygtail"]:
try:
logger.debug(pkg_resources.get_distribution(pkg))
except pkg_resources.DistributionNotFound:
logger.debug(f"{pkg} UNKNOWN")
logger.debug("dockerd: " + str(docker.from_env().version()))

rerun_sh = f"pushd {shellquote(os.getcwd())} && miniwdl {' '.join(shellquote(t) for t in sys.argv[1:])}; popd"

ensure_swarm(logger)

try:
Expand All @@ -401,10 +402,13 @@ def runner(
max_workers=max_workers,
)
except Exception as exn:
if isinstance(exn, runtime.task.TaskFailure):
rundir = None
while isinstance(exn, runtime.RunFailed):
logger.error(str(exn))
exn = exn.__cause__ or exn
if isinstance(exn, runtime.task.CommandFailure) and not (
rundir = rundir or getattr(exn, "run_dir")
exn = exn.__cause__
assert exn
if isinstance(exn, runtime.task.CommandFailed) and not (
kwargs["verbose"] or kwargs["debug"]
):
logger.notice("run with --verbose to include task standard error streams in this log")
Expand All @@ -422,13 +426,18 @@ def runner(
)
else:
logger.error(f"{exn.__class__.__name__}{(', ' + str(exn) if str(exn) else '')}")
if rundir:
with open(os.path.join(rundir, "rerun"), "w") as rerunfile:
print(rerun_sh, file=rerunfile)
if kwargs["debug"]:
raise
sys.exit(2)

# link output files
outputs_json = values_to_json(output_env, namespace=target.name)
runner_organize_outputs(target, {"outputs": outputs_json}, rundir)
with open(os.path.join(rundir, "rerun"), "w") as rerunfile:
print(rerun_sh, file=rerunfile)

return outputs_json

Expand Down
15 changes: 8 additions & 7 deletions WDL/runtime/error.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# pyre-strict
from typing import Union
from ..Error import RuntimeError as _RuntimeError
from ..Tree import Task as _Task
from ..Tree import Task as _Task, Workflow as _Workflow


class CommandFailure(_RuntimeError):
class CommandFailed(_RuntimeError):
"""
Failure of the task command
"""
Expand Down Expand Up @@ -40,19 +41,19 @@ class OutputError(_RuntimeError):
pass


class TaskFailure(_RuntimeError):
class RunFailed(_RuntimeError):
"""
"""

task: _Task
exe: Union[_Task, _Workflow]
run_id: str
run_dir: str

def __init__(self, task: _Task, run_id: str, run_dir: str) -> None:
def __init__(self, exe: Union[_Task, _Workflow], run_id: str, run_dir: str) -> None:
super().__init__(
f"task {task.name} ({task.pos.uri} Ln {task.pos.line} Col {task.pos.column}) failed"
f"{'task' if isinstance(exe, _Task) else 'workflow'} {exe.name} ({exe.pos.uri} Ln {exe.pos.line} Col {exe.pos.column}) failed"
)
self.task = task
self.exe = exe
self.run_id = run_id
self.run_dir = run_dir
8 changes: 4 additions & 4 deletions WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def run(self, logger: logging.Logger, command: str, cpu: int) -> None:
{container_dir} inside the container)
3. Standard output is written to ``{host_dir}/stdout.txt``
4. Standard error is written to ``{host_dir}/stderr.txt`` and logged at VERBOSE level
5. Raises CommandFailure for nonzero exit code, or any other error
5. Raises CommandFailed for nonzero exit code, or any other error
The container is torn down in any case, including SIGTERM/SIGHUP signal which is trapped.
"""
Expand All @@ -148,7 +148,7 @@ def run(self, logger: logging.Logger, command: str, cpu: int) -> None:
self._running = False

if exit_status != 0:
raise CommandFailure(
raise CommandFailed(
exit_status, os.path.join(self.host_dir, "stderr.txt")
) if not terminating() else Terminated()

Expand Down Expand Up @@ -433,14 +433,14 @@ def run_local_task(
return (run_dir, outputs)
except Exception as exn:
logger.debug(traceback.format_exc())
wrapper = TaskFailure(task, run_id, run_dir)
wrapper = RunFailed(task, run_id, run_dir)
msg = str(wrapper)
if hasattr(exn, "job_id"):
msg += " evaluating " + getattr(exn, "job_id")
msg += ": " + exn.__class__.__name__
if str(exn):
msg += ", " + str(exn)
if isinstance(exn, CommandFailure):
if isinstance(exn, CommandFailed):
logger.info("run directory: %s", run_dir)
logger.error(msg)
raise wrapper from exn
Expand Down
20 changes: 14 additions & 6 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import traceback
import pickle
import threading
import pkg_resources
from concurrent import futures
from typing import Optional, List, Set, Tuple, NamedTuple, Dict, Union, Iterable, Callable, Any
from .. import Env, Type, Value, Tree, StdLib
Expand All @@ -51,7 +52,7 @@
install_coloredlogs,
TerminationSignalFlag,
)
from .error import TaskFailure, Terminated
from .error import RunFailed, Terminated


class WorkflowOutputs(Tree.WorkflowNode):
Expand Down Expand Up @@ -599,6 +600,12 @@ def run_local_workflow(
fh.setFormatter(logging.Formatter(LOGGING_FORMAT))
logger.addHandler(fh)
install_coloredlogs(logger)
if not _thread_pools:
try:
version = f"v{pkg_resources.get_distribution('miniwdl').version}"
except pkg_resources.DistributionNotFound:
version = "version unknown"
logger.notice("miniwdl %s", version)
logger.notice(
"starting workflow %s (%s Ln %d Col %d) in %s",
workflow.name,
Expand Down Expand Up @@ -667,13 +674,14 @@ def run_local_workflow(

except Exception as exn:
logger.debug(traceback.format_exc())
if isinstance(exn, TaskFailure):
wrapper = RunFailed(workflow, run_id, run_dir)
if isinstance(exn, RunFailed):
logger.error("%s failed", getattr(exn, "run_id"))
else:
msg = ""
msg = str(wrapper)
if hasattr(exn, "job_id"):
msg += getattr(exn, "job_id") + " "
msg += exn.__class__.__name__
msg += " evaluating " + getattr(exn, "job_id")
msg += ": " + exn.__class__.__name__
if str(exn):
msg += ", " + str(exn)
logger.error(msg)
Expand All @@ -685,7 +693,7 @@ def run_local_workflow(
# from top-level workflow, signal abort to anything still running concurrently
# (SIGUSR1 will be picked up by TerminationSignalFlag)
os.kill(os.getpid(), signal.SIGUSR1)
raise
raise wrapper from exn
finally:
if not _thread_pools:
# thread pools are "ours", so wind them down
Expand Down
2 changes: 1 addition & 1 deletion tests/no_docker_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
if miniwdl_services:
for d in miniwdl_services:
print(d["Spec"]["Labels"]["miniwdl_run_id"])
print("docker swarm lists existing miniwdl-related services, suggesting miniwdl task runtime failed to clean up after itself", file=sys.stderr)
print("Docker swarm lists existing miniwdl-related services, suggesting miniwdl task runtime failed to clean up after itself. Debug this and/or `docker swarm leave --force` to clear.", file=sys.stderr)
sys.exit(1)
3 changes: 2 additions & 1 deletion tests/runner.t
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ source tests/bash-tap/bash-tap-bootstrap
export PYTHONPATH="$SOURCE_DIR:$PYTHONPATH"
miniwdl="python3 -m WDL"

plan tests 47
plan tests 48

$miniwdl run_self_test
is "$?" "0" "run_self_test"
Expand Down Expand Up @@ -131,6 +131,7 @@ f1=$(jq -r '.outputs["echo.t.out_f"][2]' workflowrun/outputs.json)
is "$(basename $f1)" "fox" "workflow product fox"
is "$(ls $f1)" "$f1" "workflow product fox file"
is "$(ls workflowrun/output_links/echo.t.out_f/2)" "fox" "workflow product fox link"
is "$(cat workflowrun/rerun)" "pushd $DN && miniwdl run --dir workflowrun echo.wdl t.s=foo t.f=quick t.a_s=bar t.a_f=brown --empty a_s; popd"

cat << 'EOF' > scatter_echo.wdl
version 1.0
Expand Down
8 changes: 4 additions & 4 deletions tests/test_4taskrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _test_task(self, wdl:str, inputs = None, expected_exception: Exception = Non
if isinstance(inputs, dict):
inputs = WDL.values_from_json(inputs, doc.tasks[0].available_inputs, doc.tasks[0].required_inputs)
rundir, outputs = WDL.runtime.run_local_task(doc.tasks[0], (inputs or WDL.Env.Bindings()), run_dir=self._dir, copy_input_files=copy_input_files)
except WDL.runtime.TaskFailure as exn:
except WDL.runtime.RunFailed as exn:
if expected_exception:
self.assertIsInstance(exn.__context__, expected_exception)
return exn.__context__
Expand Down Expand Up @@ -410,7 +410,7 @@ def test_command_failure(self):
exit 1
}
}
""", expected_exception=WDL.runtime.CommandFailure)
""", expected_exception=WDL.runtime.CommandFailed)

def test_write_lines(self):
outputs = self._test_task(R"""
Expand Down Expand Up @@ -754,7 +754,7 @@ def test_input_files_rw(self):
outfile.write("Ben\n")

self._test_task(txt, {"files": [os.path.join(self._dir, "alyssa.txt"), os.path.join(self._dir, "ben.txt")]},
expected_exception=WDL.runtime.task.CommandFailure)
expected_exception=WDL.runtime.task.CommandFailed)

outputs = self._test_task(txt, {"files": [os.path.join(self._dir, "alyssa.txt"), os.path.join(self._dir, "ben.txt")]},
copy_input_files=True)
Expand All @@ -772,7 +772,7 @@ def test_input_files_rw(self):
>>>
}
""", {"files": [os.path.join(self._dir, "alyssa.txt"), os.path.join(self._dir, "ben.txt")]},
expected_exception=WDL.runtime.task.CommandFailure)
expected_exception=WDL.runtime.task.CommandFailed)
self.assertTrue(os.path.exists(os.path.join(self._dir, "alyssa.txt")))

def test_optional_file_outputs(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_5stdlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def _test_task(self, wdl:str, inputs = None, expected_exception: Exception = Non
if isinstance(inputs, dict):
inputs = WDL.values_from_json(inputs, doc.tasks[0].available_inputs, doc.tasks[0].required_inputs)
rundir, outputs = WDL.runtime.run_local_task(doc.tasks[0], (inputs or WDL.Env.Bindings()), run_dir=self._dir)
except WDL.runtime.task.TaskFailure as exn:
except WDL.runtime.RunFailed as exn:
if expected_exception:
self.assertIsInstance(exn.__context__, expected_exception)
return exn.__context__
Expand Down
10 changes: 6 additions & 4 deletions tests/test_6workflowrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ def _test_workflow(self, wdl:str, inputs = None, expected_exception: Exception =
if isinstance(inputs, dict):
inputs = WDL.values_from_json(inputs, doc.workflow.available_inputs, doc.workflow.required_inputs)
rundir, outputs = WDL.runtime.run_local_workflow(doc.workflow, (inputs or WDL.Env.Bindings()), run_dir=self._dir, _test_pickle=True)
except WDL.runtime.TaskFailure as exn:
except WDL.runtime.RunFailed as exn:
while isinstance(exn, WDL.runtime.RunFailed):
exn = exn.__context__
if expected_exception:
self.assertIsInstance(exn.__context__, expected_exception)
return exn.__context__
raise exn.__context__
self.assertIsInstance(exn, expected_exception)
return exn
raise exn
except WDL.Error.MultipleValidationErrors as multi:
for exn in multi.exceptions:
logging.error("%s: %s", str(exn.pos), str(exn))
Expand Down

0 comments on commit 8e792e3

Please sign in to comment.