Skip to content

Commit

Permalink
adjust task stderr logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Apr 25, 2020
1 parent f3a8abb commit e870cb6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
21 changes: 13 additions & 8 deletions WDL/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,31 +419,36 @@ def configure_logger(
@export
@contextmanager
def PygtailLogger(
logger: logging.Logger, filename: str, prefix: str = "2| "
logger: logging.Logger, filename: str, callback: Optional[Callable[[str], None]] = None
) -> Iterator[Callable[[], None]]:
"""
Helper for streaming task stderr into logger using pygtail. Context manager yielding a function
which reads the latest lines from the file and writes them into logger at verbose level. This
function also runs automatically on context exit.
Truncates lines at 4KB in case writer goes haywire.
Stops if it sees a line greater than 4KB, in case writer goes haywire.
"""
pygtail = None
if logger.isEnabledFor(VERBOSE_LEVEL):
pygtail = Pygtail(filename, full_lines=True)
logger2 = logger.getChild("stderr")

def default_callback(line: str) -> None:
assert len(line) <= 4096, "line > 4KB"
logger2.verbose(line.rstrip()) # pyre-fixme

callback = callback or default_callback

def poll() -> None:
nonlocal pygtail
if pygtail:
try:
for line in pygtail:
logger.verbose((prefix + line.rstrip())[:4096]) # pyre-ignore
except:
callback(line)
except Exception as exn:
# cf. https://github.com/bgreenlee/pygtail/issues/48
logger.verbose( # pyre-ignore
"incomplete log stream due to the following exception; see %s",
filename,
exc_info=sys.exc_info(),
logger.error(
StructuredLogMessage("incomplete log stream", filename=filename, error=str(exn))
)
pygtail = None

Expand Down
14 changes: 13 additions & 1 deletion WDL/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> D

input_file_map_rev: Dict[str, str]

stderr_callback: Optional[Callable[[str], None]]
"""
A function called line-by-line for the task's standard error stream, if verbose logging is
enabled. If provided it overrides the default standard error logging behavior (writing the line
to the 'stderr' child of the task logger)
"""

_running: bool

def __init__(self, cfg: config.Loader, run_id: str, host_dir: str) -> None:
Expand All @@ -109,6 +116,7 @@ def __init__(self, cfg: config.Loader, run_id: str, host_dir: str) -> None:
self.container_dir = "/mnt/miniwdl_task_container"
self.input_file_map = {}
self.input_file_map_rev = {}
self.stderr_callback = None
self._running = False
os.makedirs(os.path.join(self.host_dir, "work"))

Expand Down Expand Up @@ -480,7 +488,11 @@ def _run(
# stream stderr into log
with contextlib.ExitStack() as cleanup:
poll_stderr = cleanup.enter_context(
PygtailLogger(logger, os.path.join(self.host_dir, "stderr.txt"))
PygtailLogger(
logger,
os.path.join(self.host_dir, "stderr.txt"),
callback=self.stderr_callback,
)
)

# poll for container exit
Expand Down
10 changes: 5 additions & 5 deletions tests/test_4taskrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ def test_logging_std_err(self, capture):
}
""")

std_error_msgs = [record for record in capture.records if str(record.msg).startswith("2|")]
std_error_msgs = [record for record in capture.records if str(record.name).endswith("stderr")]

self.assertEqual(std_error_msgs.pop(0).msg, "2| Start logging")
self.assertEqual(std_error_msgs.pop().msg, "2| End logging")
self.assertEqual(std_error_msgs.pop(0).msg, "Start logging")
self.assertEqual(std_error_msgs.pop().msg, "End logging")
for record in std_error_msgs:
line_written = int(record.msg.split('=')[1])
self.assertGreater(record.created, line_written)
Expand Down Expand Up @@ -181,10 +181,10 @@ def test_logging_std_err_captures_full_line(self, capture):
}
""")
std_error_msgs = [record for record in capture.records if str(record.msg).startswith("2|")]
std_error_msgs = [record for record in capture.records if str(record.name).endswith("stderr")]

self.assertEqual(len(std_error_msgs), 6)
self.assertEqual(std_error_msgs[0].msg, "2| Part onePart two")
self.assertEqual(std_error_msgs[0].msg, "Part onePart two")

def test_hello_blank(self):
self._test_task(R"""
Expand Down

0 comments on commit e870cb6

Please sign in to comment.