Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subprocpool: handle long STDOUT/ERR from commands #2876

Merged
merged 2 commits into from Nov 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 64 additions & 7 deletions lib/cylc/subprocpool.py
Expand Up @@ -20,6 +20,7 @@
from collections import deque
import json
import os
import select
from signal import SIGKILL
from subprocess import Popen, PIPE
import sys
Expand Down Expand Up @@ -103,6 +104,7 @@ class SuiteProcPool(object):

ERR_SUITE_STOPPING = 'suite stopping, command not run'
JOBS_SUBMIT = 'jobs-submit'
POLLREAD = select.POLLIN | select.POLLPRI
RET_CODE_SUITE_STOPPING = 999

def __init__(self, size=None):
Expand All @@ -116,6 +118,10 @@ def __init__(self, size=None):
self.stopping_lock = RLock()
self.queuings = deque()
self.runnings = []
try:
self.pipepoller = select.poll()
except AttributeError: # select.poll not implemented for this OS
self.pipepoller = None

def close(self):
"""Close pool."""
Expand All @@ -136,30 +142,42 @@ def _is_stopping(self):
def _proc_exit(self, proc, err_xtra, ctx, callback, callback_args):
    """Get ret_code, out, err of exited command, and call its callback.

    Args:
        proc: the child process object; it is waited on and its remaining
            STDOUT/STDERR are collected with `communicate()`.
        err_xtra: extra text to append to the command's STDERR (may be an
            empty string).
        ctx: command context; its `ret_code`, `out` and `err` attributes
            are updated in place.
        callback: passed through to `self._run_command_exit`.
        callback_args: passed through to `self._run_command_exit`.

    Output is *appended* to `ctx.out` / `ctx.err` rather than assigned,
    so anything already stored there while the command was still running
    is preserved. An attribute that is still None is left as None when
    there is nothing to add.
    """
    ctx.ret_code = proc.wait()
    # Call communicate() exactly once: it reads the pipes to EOF and closes
    # them, so a second call has nothing valid left to read from.
    out, err = proc.communicate()
    if out:
        if ctx.out is None:
            ctx.out = ''
        ctx.out += out
    # `err` is None if STDERR was not captured; treat that as "no output".
    err = (err or '') + err_xtra
    if err:
        if ctx.err is None:
            ctx.err = ''
        ctx.err += err
    self._run_command_exit(ctx, callback, callback_args)

def process(self):
"""Process done child processes and submit more."""
# Handle child processes that are done
runnings = []
for proc, ctx, callback, callback_args in self.runnings:
# Command completed/exited
if proc.poll() is not None:
self._proc_exit(proc, "", ctx, callback, callback_args)
elif time() < ctx.timeout:
runnings.append([proc, ctx, callback, callback_args])
else:
# Timed out, kill it.
continue
# Command timed out, kill it
if time() > ctx.timeout:
try:
os.killpg(proc.pid, SIGKILL)
os.killpg(proc.pid, SIGKILL) # kill process group
except OSError:
# must have just exited, since poll.
err_xtra = ""
else:
err_xtra = "\nkilled on timeout (%s)" % (
self.proc_pool_timeout)
self._proc_exit(proc, err_xtra, ctx, callback, callback_args)
continue
# Command still running, see if STDOUT/STDERR are readable or not
runnings.append([proc, ctx, callback, callback_args])
# Unblock proc's STDOUT/STDERR if possible
self._poll_proc_pipes(proc, ctx)

# Update list of running items
self.runnings[:] = runnings
Expand Down Expand Up @@ -234,6 +252,45 @@ def terminate(self):
# Wait for child processes
self.process()

def _poll_proc_pipes(self, proc, ctx):
    """Poll STDOUT/ERR of proc and read some data if possible.

    This helps to unblock the command by unblocking its pipes: a child
    that writes more than the pipe can hold would otherwise stall until
    somebody reads from the other end.

    Args:
        proc: the running child process; its `stdout` and `stderr`
            handles are polled.
        ctx: command context; data read is appended to `ctx.out` and
            `ctx.err` (an attribute that is None is replaced by the
            first chunk read).

    Does nothing if `self.pipepoller` is None (select.poll unavailable).
    All file descriptors registered here are unregistered before return.
    """
    if self.pipepoller is None:
        return  # select.poll not supported on this OS
    # Registered file descriptor -> name of the ctx attribute it feeds.
    attr_of = {}
    try:
        for handle, attr in [(proc.stdout, 'out'), (proc.stderr, 'err')]:
            if handle is not None and not handle.closed:
                fileno = handle.fileno()
                self.pipepoller.register(fileno, self.POLLREAD)
                attr_of[fileno] = attr
        while attr_of:
            fileno_list = [
                fileno
                for fileno, event in self.pipepoller.poll(0)
                if event & self.POLLREAD and fileno in attr_of]
            if not fileno_list:
                # Nothing readable
                break
            for fileno in fileno_list:
                # To avoid blocking:
                # 1. Use the low level `os.read` instead of `file.read` to
                #    avoid any buffering that may cause the handle to block.
                # 2. Read only ONCE per poll result. A second `os.read` on
                #    a pipe whose writer is still alive blocks until more
                #    data arrives, so poll again before reading again.
                try:
                    data = os.read(fileno, 65536)  # 64K
                except OSError:
                    data = ''
                if not data:
                    # EOF (or unreadable): stop polling this descriptor,
                    # otherwise platforms that report EOF as "readable"
                    # would keep this loop spinning forever.
                    self.pipepoller.unregister(fileno)
                    del attr_of[fileno]
                    continue
                attr = attr_of[fileno]
                previous = getattr(ctx, attr)
                if previous is None:
                    setattr(ctx, attr, data)
                else:
                    setattr(ctx, attr, previous + data)
    finally:
        # Only unregister what was actually registered and is still
        # registered; closed handles were never registered.
        for fileno in attr_of:
            self.pipepoller.unregister(fileno)

@classmethod
def _run_command_init(cls, ctx, callback=None, callback_args=None):
"""Prepare and launch shell command in ctx."""
Expand Down
96 changes: 96 additions & 0 deletions tests/events/47-long-output.t
@@ -0,0 +1,96 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2018 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test that a long output from an event handler is not going to hang or die.
#
# The same scenario is run twice: once with the handler writing a long
# message to STDOUT, once with the same message redirected to STDERR.

# Load the shared test helpers used below (run_ok, suite_run_ok, ...).
. "$(dirname "$0")/test_header"
# Skip the whole file if this Python cannot import "select.poll".
if ! python -c 'from select import poll' 2>'/dev/null'; then
skip_all '"select.poll" not supported on this OS'
fi

# 5 checks for the STDOUT case + 5 checks for the STDERR case.
set_test_number 10

# Global configuration for the test: process pool timeout of 10 seconds.
# NOTE(review): presumably a hanging handler would be killed at this timeout
# and so fail the ret_code check below - confirm.
create_test_globalrc "
process pool timeout = PT10S" ""

# Case 1: Long STDOUT output
# The "succeeded" event handler prints the COPYING file three times.
# NOTE(review): presumably 3 copies are enough to exceed the capacity of the
# handler's output pipe - confirm.

init_suite "${TEST_NAME_BASE}" <<'__SUITERC__'
[cylc]
[scheduling]
[[dependencies]]
graph = t1
[runtime]
[[t1]]
script = true
[[[events]]]
succeeded handler = cat "${CYLC_DIR}/COPYING" "${CYLC_DIR}/COPYING" "${CYLC_DIR}/COPYING" && echo
__SUITERC__

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"

# The suite must run to completion in the foreground.
suite_run_ok "${TEST_NAME_BASE}-run" \
cylc run --debug --no-detach "${SUITE_NAME}"

# Reduce the suite log to the licence title lines: one per copy of COPYING
# that the handler printed.
cylc cat-log "${SUITE_NAME}" >'log'
sed -n 's/^.*\(GNU GENERAL PUBLIC LICENSE\)/\1/p' 'log' >'log-1'
contains_ok 'log-1' <<'__LOG__'
GNU GENERAL PUBLIC LICENSE
GNU GENERAL PUBLIC LICENSE
GNU GENERAL PUBLIC LICENSE
__LOG__
# The handler's STDOUT must be logged under its "out" label ...
run_ok "log-event-handler-00-out" \
grep -qF "[(('event-handler-00', 'succeeded'), 1) out]" 'log'
# ... and the handler must be logged with return code 0.
run_ok "log-event-handler-ret-code" \
grep -qF "[(('event-handler-00', 'succeeded'), 1) ret_code] 0" 'log'

purge_suite "${SUITE_NAME}"

# Case 2 (REPEAT): Long STDERR output
# Same suite as above, but the handler's "cat" output is redirected to STDERR.

init_suite "${TEST_NAME_BASE}" <<'__SUITERC__'
[cylc]
[scheduling]
[[dependencies]]
graph = t1
[runtime]
[[t1]]
script = true
[[[events]]]
succeeded handler = cat "${CYLC_DIR}/COPYING" "${CYLC_DIR}/COPYING" "${CYLC_DIR}/COPYING" >&2 && echo
__SUITERC__

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"

# The suite must run to completion in the foreground.
suite_run_ok "${TEST_NAME_BASE}-run" \
cylc run --debug --no-detach "${SUITE_NAME}"

# Same log reduction as in case 1.
cylc cat-log "${SUITE_NAME}" >'log'
sed -n 's/^.*\(GNU GENERAL PUBLIC LICENSE\)/\1/p' 'log' >'log-1'
contains_ok 'log-1' <<'__LOG__'
GNU GENERAL PUBLIC LICENSE
GNU GENERAL PUBLIC LICENSE
GNU GENERAL PUBLIC LICENSE
__LOG__
# This time the output must be logged under the handler's "err" label ...
run_ok "log-event-handler-00-err" \
grep -qF "[(('event-handler-00', 'succeeded'), 1) err]" 'log'
# ... and the handler must again be logged with return code 0.
run_ok "log-event-handler-00-ret-code" \
grep -qF "[(('event-handler-00', 'succeeded'), 1) ret_code] 0" 'log'

purge_suite "${SUITE_NAME}"

exit