Commit 51fbeb6

Merge 339daf3 into 3596eff
mlin committed Nov 8, 2019
2 parents 3596eff + 339daf3 commit 51fbeb6

Showing 6 changed files with 145 additions and 92 deletions.
14 changes: 9 additions & 5 deletions WDL/CLI.py
@@ -26,6 +26,7 @@
install_coloredlogs,
ensure_swarm,
)
from ._util import StructuredLogMessage as _

quant_warning = False

@@ -402,17 +403,20 @@ def runner(
max_workers=max_workers,
)
except Exception as exn:
rundir = None
outer_rundir = None
inner_rundir = None
while isinstance(exn, runtime.RunFailed):
logger.error(str(exn))
rundir = rundir or getattr(exn, "run_dir")
outer_rundir = outer_rundir or getattr(exn, "run_dir")
inner_rundir = getattr(exn, "run_dir", inner_rundir)
exn = exn.__cause__
assert exn
if isinstance(exn, runtime.task.CommandFailed) and not (
kwargs["verbose"] or kwargs["debug"]
):
logger.notice(_("failed run", dir=inner_rundir))
logger.notice(_("standard error", file=getattr(exn, "stderr_file")))
logger.notice("run with --verbose to include task standard error streams in this log")
logger.notice("see task's standard error in %s", getattr(exn, "stderr_file"))
if isinstance(getattr(exn, "pos", None), SourcePosition):
pos = getattr(exn, "pos")
logger.error(
@@ -426,8 +430,8 @@ def runner(
)
else:
logger.error(f"{exn.__class__.__name__}{(', ' + str(exn) if str(exn) else '')}")
if rundir:
with open(os.path.join(rundir, "rerun"), "w") as rerunfile:
if outer_rundir:
with open(os.path.join(outer_rundir, "rerun"), "w") as rerunfile:
print(rerun_sh, file=rerunfile)
if kwargs["debug"]:
raise
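Context for the CLI change above: the old single `rundir` becomes `outer_rundir` (where the `rerun` file is written) and `inner_rundir` (the deepest failed call, whose stderr is the one worth reading). A minimal sketch of that `__cause__`-chain walk, with a stripped-down stand-in for `runtime.RunFailed` and hypothetical paths:

class RunFailed(Exception):
    # stand-in for WDL.runtime.RunFailed, which carries a run_dir attribute
    def __init__(self, run_dir: str) -> None:
        super().__init__(f"run failed, dir = {run_dir}")
        self.run_dir = run_dir

def rundirs(exn: BaseException):
    outer = inner = None
    while isinstance(exn, RunFailed):
        outer = outer or exn.run_dir  # set once, at the outermost failure
        inner = exn.run_dir           # overwritten each level, ends innermost
        exn = exn.__cause__
    return outer, inner

# a workflow failure whose cause is a task failure nested inside it
task_exn = RunFailed("/runs/hello/call-say_hello")
wf_exn = RunFailed("/runs/hello")
wf_exn.__cause__ = task_exn
assert rundirs(wf_exn) == ("/runs/hello", "/runs/hello/call-say_hello")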
20 changes: 17 additions & 3 deletions WDL/_util.py
@@ -28,6 +28,7 @@
import coloredlogs
from pygtail import Pygtail
import docker
import yaml

__all__: List[str] = []

@@ -201,6 +202,19 @@ def provision_run_dir(name: str, run_dir: Optional[str] = None) -> str:
sleep(1e-3)


class StructuredLogMessage:
message: str
kwargs: Dict[str, Any]

# from https://docs.python.org/3.8/howto/logging-cookbook.html#implementing-structured-logging
def __init__(self, _message: str, **kwargs) -> None: # pyre-fixme
self.message = _message
self.kwargs = kwargs

def __str__(self) -> str:
return f"{self.message} :: {yaml.dump(self.kwargs, default_flow_style=True, width=999999).strip()[1:-1]}"


VERBOSE_LEVEL = 15
__all__.append("VERBOSE_LEVEL")
logging.addLevelName(VERBOSE_LEVEL, "VERBOSE")
@@ -233,7 +247,7 @@ def install_coloredlogs(logger: logging.Logger) -> None:
level_styles = {}
field_styles = {}

if "NO_COLOR" not in os.environ:
if sys.stderr.isatty() and "NO_COLOR" not in os.environ:
level_styles = dict(coloredlogs.DEFAULT_LEVEL_STYLES)
level_styles["debug"]["color"] = 242
level_styles["notice"] = {}
@@ -309,7 +323,7 @@ def ensure_swarm(logger: logging.Logger) -> None:
break
else:
logger.notice( # pyre-fixme
"waiting for docker swarm to become active; current state = " + state
StructuredLogMessage("waiting for docker swarm to become active", state=state)
)
sleep(2)
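
For context, `ensure_swarm` polls Docker until the local swarm leaves its transitional state; the loop is roughly shaped like the sketch below, using docker-py's `client.info()`. Treating only "active" as terminal is an assumption here, not miniwdl's exact logic:

import time
import docker

def wait_for_swarm_active(client: docker.DockerClient, interval: float = 2.0) -> None:
    while True:
        state = client.info().get("Swarm", {}).get("LocalNodeState", "")
        if state == "active":
            break  # assumption: any other state is transitional and worth waiting out
        time.sleep(interval)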

@@ -356,7 +370,7 @@ def handle_signal(sig: int, frame: FrameType) -> None:
global _terminating
if not _terminating:
if sig != signal.SIGUSR1:
logger.critical("received termination signal ({})".format(sig))
logger.critical(StructuredLogMessage("ABORT", signal=sig))
else:
# SIGUSR1 comes from ourselves, as the signal to abort after something else has
# already gone wrong
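The `handle_signal` hunk above belongs to a flag-based termination pattern: the handler only flips a module-global flag, and worker loops poll it so work is never interrupted mid-operation. A reduced sketch with hypothetical names:

import signal

_terminating = False

def handle_signal(sig: int, frame) -> None:
    global _terminating
    _terminating = True  # loops check this flag and shut down at a safe point

for s in (signal.SIGTERM, signal.SIGINT):
    signal.signal(s, handle_signal)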
82 changes: 44 additions & 38 deletions WDL/runtime/task.py
@@ -26,6 +26,7 @@
PygtailLogger,
TerminationSignalFlag,
)
from .._util import StructuredLogMessage as _
from .error import *


@@ -119,7 +120,7 @@ def copy_input_files(self, logger: logging.Logger) -> None:
self.host_dir, os.path.relpath(container_filename, self.container_dir)
)

logger.info("copy host input file %s -> %s", host_filename, host_copy_filename)
logger.info(_("copy host input file", input=host_filename, copy=host_copy_filename))
os.makedirs(os.path.dirname(host_copy_filename), exist_ok=True)
shutil.copy(host_filename, host_copy_filename)
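
The destination path above is computed by re-rooting the container path under the host run directory, so the copy keeps the same relative layout on both sides of the mount. Distilled into a hypothetical helper:

import os
import shutil

def copy_in(host_filename: str, container_filename: str,
            host_dir: str, container_dir: str) -> str:
    # re-root: <container_dir>/a/b.txt -> <host_dir>/a/b.txt
    dst = os.path.join(host_dir, os.path.relpath(container_filename, container_dir))
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    shutil.copy(host_filename, dst)
    return dst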

@@ -247,7 +248,7 @@ def _run(
mounts.append(
f"{os.path.join(self.host_dir, 'work')}:{os.path.join(self.container_dir, 'work')}:rw"
)
logger.debug("docker mounts: " + str(mounts))
logger.debug(_("docker mounts", mounts=mounts))

if ":" not in self.image_tag:
# seems we need to do this explicitly under some configurations -- issue #232
@@ -259,7 +260,7 @@
try:
# run container as a transient docker swarm service, letting docker handle the resource
# scheduling (waiting until requested # of CPUs are available)
logger.info("scheduling task with image: {}".format(self.image_tag))
logger.info(_("docker image", tag=self.image_tag))
svc = client.services.create(
self.image_tag,
command=[
@@ -279,7 +280,7 @@
labels={"miniwdl_run_id": self.run_id},
container_labels={"miniwdl_run_id": self.run_id},
)
logger.debug("docker service name = {}, id = {}".format(svc.name, svc.short_id))
logger.debug(_("docker service", name=svc.name, id=svc.short_id))

exit_code = None
# stream stderr into log
@@ -294,7 +295,7 @@
raise Terminated() from None
exit_code = self.poll_service(logger, svc)
i += 1
logger.info("container exit code = " + str(exit_code))
logger.info(_("docker exit", code=exit_code))

# retrieve and check container exit status
assert isinstance(exit_code, int)
@@ -323,14 +324,14 @@ def poll_service(
if tasks:
assert len(tasks) == 1
status = tasks[0]["Status"]
logger.debug("docker task status = " + str(status))
logger.debug(_("docker task", status=status))
state = status["State"]

# log each new state
if self._observed_states is None:
self._observed_states = set()
if state not in self._observed_states:
logger.info("docker task state = " + state)
logger.info(_("docker task", state=state))
self._observed_states.add(state)

# https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/
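
The `_observed_states` set above makes the poll loop log each swarm task state exactly once, no matter how many times the service is polled before it finishes. The idiom in isolation (sketch):

import logging

observed: set = set()

def note_state(logger: logging.Logger, state: str) -> None:
    if state not in observed:  # first sighting of this state only
        logger.info("docker task state = %s", state)
        observed.add(state)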
@@ -353,7 +354,7 @@ def run_local_task(
run_id: Optional[str] = None,
run_dir: Optional[str] = None,
copy_input_files: bool = False,
logger_prefix: str = "wdl:",
logger_prefix: Optional[List[str]] = None,
max_workers: Optional[int] = None, # unused
) -> Tuple[str, Env.Bindings[Value.Base]]:
"""
@@ -371,20 +372,22 @@

run_id = run_id or task.name
run_dir = provision_run_dir(task.name, run_dir)
logger = logging.getLogger(logger_prefix + "task:" + run_id)
logger = logging.getLogger(".".join((logger_prefix or ["wdl"]) + ["t:" + run_id]))
fh = logging.FileHandler(os.path.join(run_dir, "task.log"))
fh.setFormatter(logging.Formatter(LOGGING_FORMAT))
logger.addHandler(fh)
_util.install_coloredlogs(logger)
logger.notice( # pyre-fixme
"starting task %s (%s Ln %d Col %d) in %s",
task.name,
task.pos.uri,
task.pos.line,
task.pos.column,
run_dir,
_(
"task start",
name=task.name,
source=task.pos.uri,
line=task.pos.line,
column=task.pos.column,
dir=run_dir,
)
)
logger.info("thread %d", threading.get_ident())
logger.info(_("thread", ident=threading.get_ident()))
write_values_json(posix_inputs, os.path.join(run_dir, "inputs.json"))

try:
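
Note the logger-naming change earlier in this hunk: dotted names such as `wdl.t:hello` participate in `logging`'s hierarchy, so a per-run logger inherits handlers and levels from the `wdl` root, which the old colon-joined prefix (`wdl:task:...`) did not. A quick check of that behavior:

import logging

root = logging.getLogger("wdl")
child = logging.getLogger(".".join(["wdl"] + ["t:hello"]))  # -> "wdl.t:hello"
assert child.parent is root  # dots nest loggers; "wdl:task:hello" would not nest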
@@ -408,15 +411,14 @@
assert isinstance(cpu_value, int)
cpu = max(1, min(multiprocessing.cpu_count(), cpu_value))
if cpu != cpu_value:
logger.warning(f"runtime.cpu: {cpu} (adjusted from {cpu_value})")
else:
logger.info(f"runtime.cpu: {cpu}")
logger.warning(_("runtime.cpu", original=cpu_value, adjusted=cpu))
logger.info(_("runtime", cpu=cpu))

# interpolate command
command = _util.strip_leading_whitespace(
task.command.eval(container_env, stdlib=InputStdLib(logger, container)).value
)[1]
logger.debug("command:\n%s", command.rstrip())
logger.debug(_("command", command=command.strip()))

# if needed, copy input files into working directory
if copy_input_files:
@@ -434,15 +436,12 @@
except Exception as exn:
logger.debug(traceback.format_exc())
wrapper = RunFailed(task, run_id, run_dir)
msg = str(wrapper)
if hasattr(exn, "job_id"):
msg += " evaluating " + getattr(exn, "job_id")
msg += ": " + exn.__class__.__name__
info = {"error": exn.__class__.__name__}
if str(exn):
msg += ", " + str(exn)
if isinstance(exn, CommandFailed):
logger.info("run directory: %s", run_dir)
logger.error(msg)
info["message"] = str(exn)
if hasattr(exn, "job_id"):
info["node"] = getattr(exn, "job_id")
logger.error(_(str(wrapper), **info))
raise wrapper from exn


@@ -476,7 +475,7 @@ def map_files(v: Value.Base) -> Value.Base:
assert isinstance(v, Value.Base)
container_env = container_env.bind(b.name, v)
vj = json.dumps(v.json)
logger.info("input {} -> {}".format(b.name, vj if len(vj) < 4096 else "(large)"))
logger.info(_("input", name=b.name, value=(v.json if len(vj) < 4096 else "(((large)))")))

# collect remaining declarations requiring evaluation.
decls_to_eval = []
@@ -509,7 +508,7 @@ def map_files(v: Value.Base) -> Value.Base:
else:
assert decl.type.optional
vj = json.dumps(v.json)
logger.info("eval {} -> {}".format(decl.name, vj if len(vj) < 4096 else "(large)"))
logger.info(_("eval", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))")))
container_env = container_env.bind(decl.name, v)

return container_env
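
The input/eval logging above (and the output logging later in this file) shares a size guard: the JSON value appears in the log only when its serialization is under 4096 bytes, with a `(((large)))` placeholder otherwise. As a standalone helper (sketch):

import json

def loggable_json(value_json, limit: int = 4096):
    # small values verbatim; a placeholder for anything bigger
    return value_json if len(json.dumps(value_json)) < limit else "(((large)))"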
@@ -540,12 +539,14 @@ def rewrite_files(v: Value.Base, output_name: str) -> None:
host_file = container.host_file(v.value)
if host_file is None:
logger.warning(
"file not found for output %s: %s (error unless declared type is optional File?)",
output_name,
v.value,
_(
"output file not found in container (error unless declared type is optional)",
name=output_name,
file=v.value,
)
)
else:
logger.debug("container output file %s -> host %s", v.value, host_file)
logger.debug(_("output file", container=v.value, host=host_file))
# We may overwrite File.value with None, which is an invalid state, then we'll fix it
# up (or abort) below. This trickery is because we don't, at this point, know whether
# the 'desired' output type is File or File?.
Expand All @@ -566,7 +567,10 @@ def rewrite_files(v: Value.Base, output_name: str) -> None:
exn2 = Error.EvalError(decl, str(exn))
setattr(exn2, "job_id", decl.workflow_node_id)
raise exn2 from exn
logger.info("output {} -> {}".format(decl.name, json.dumps(v.json)))
vj = json.dumps(v.json)
logger.info(
_("output", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))"))
)

# Now, a delicate sequence for postprocessing File outputs (including Files nested within
# compound values)
@@ -607,14 +611,16 @@ def _devirtualize_filename(self, filename: str) -> str:
ans = self.container.host_file(filename, inputs_only=self.inputs_only)
if ans is None:
raise OutputError("function was passed non-existent file " + filename)
self.logger.debug("read_ %s from host %s", filename, ans)
self.logger.debug(_("read_", container=filename, host=ans))
return ans

def _virtualize_filename(self, filename: str) -> str:
# register new file with container input_file_map
self.container.add_files([filename])
self.logger.debug("write_ host %s", filename)
self.logger.info("wrote %s", self.container.input_file_map[filename])
self.logger.debug(
_("write_", host=filename, container=self.container.input_file_map[filename])
)
self.logger.info(_("wrote", file=self.container.input_file_map[filename]))
return self.container.input_file_map[filename]

