Skip to content

Commit

Permalink
Fix Popen.communicate() to raise exceptions from reading the streams.
Browse files Browse the repository at this point in the history
And a general clean up of how the streams are read, ensuring we just have one greenlet. This avoids the 'reentrant call' errors that could prevent closing the streams on time.

Fixes #1510
  • Loading branch information
jamadden committed Jan 14, 2020
1 parent 785b7b5 commit acc67c3
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 214 deletions.
16 changes: 15 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,21 @@ Library and Dependency Updates
with debugging. The event libraries allocate small amounts of memory
at startup. The allocation functions have to take the GIL, but
because of the limited amount of actual allocation that gets done
this is not expected to be a concern.
this is not expected to be a bottleneck.

Other
-----

- Make `gevent.subprocess.Popen.communicate` raise exceptions raised
by reading from the process, like the standard library. In
particular, under Python 3, if the process output is being decoded
as text, this can now raise ``UnicodeDecodeError``. Reported in
:issue:`1510` by Ofer Koren.

- Make `gevent.subprocess.Popen.communicate` be more careful about
closing files. Previously if a timeout error happened, a second call
to ``communicate`` might not close the pipe.


1.5a3 (2020-01-01)
==================
Expand Down
8 changes: 7 additions & 1 deletion src/gevent/_fileobjectcommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,13 @@ def __getattr__(self, name):
return getattr(self._io, name)

def __repr__(self):
return '<%s _fobj=%r%s>' % (self.__class__.__name__, self.io, self._extra_repr())
return '<%s at 0x%x %s_fobj=%r%s>' % (
self.__class__.__name__,
id(self),
'closed' if self.closed else '',
self.io,
self._extra_repr()
)

def _extra_repr(self):
return ''
Expand Down
209 changes: 120 additions & 89 deletions src/gevent/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import traceback

from gevent.event import AsyncResult
from gevent.exceptions import ConcurrentObjectUseError
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import linkproxy
from gevent.hub import sleep
Expand Down Expand Up @@ -264,6 +263,13 @@ def __repr__(self):
fork = monkey.get_original('os', 'fork')
from gevent.os import fork_and_watch

try:
BrokenPipeError
except NameError: # Python 2
class BrokenPipeError(Exception):
"Never raised, never caught."


def call(*popenargs, **kwargs):
"""
call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None) -> returncode
Expand Down Expand Up @@ -437,6 +443,95 @@ def FileObject(*args, **kwargs):
globals()['FileObject'] = _FileObject
return _FileObject(*args)


class _CommunicatingGreenlets(object):
# At most, exactly one of these objects may be created
# for a given Popen object. This ensures that only one background
# greenlet at a time will be reading from the file object. This matters because
# if a timeout exception is raised, the user may call back into communicate() to
# get the output (usually after killing the process; see run()). We must not
# lose output in that case (Python 3 specifically documents that raising a timeout
# doesn't lose output). Also, attempting to read from a pipe while it's already
# being read from results in `RuntimeError: reentrant call in io.BufferedReader`;
# the same thing happens if you attempt to close() it while that's in progress.
__slots__ = (
'stdin',
'stdout',
'stderr',
'_all_greenlets',
)

def __init__(self, popen, input_data):
self.stdin = self.stdout = self.stderr = None
if popen.stdin: # Even if no data, we need to close
self.stdin = spawn(self._write_and_close, popen.stdin, input_data)

# If the timeout parameter is used, and the caller calls back after
# getting a TimeoutExpired exception, we can wind up with multiple
# greenlets trying to run and read from and close stdout/stderr.
# That's bad because it can lead to 'RuntimeError: reentrant call in io.BufferedReader'.
# We can't just kill the previous greenlets when a timeout happens,
# though, because we risk losing the output collected by that greenlet
# (and Python 3, where timeout is an official parameter, explicitly says
# that no output should be lost in the event of a timeout.) Instead, we're
# watching for the exception and ignoring it. It's not elegant,
# but it works
if popen.stdout:
self.stdout = spawn(self._read_and_close, popen.stdout)

if popen.stderr:
self.stderr = spawn(self._read_and_close, popen.stderr)

all_greenlets = []
for g in self.stdin, self.stdout, self.stderr:
if g is not None:
all_greenlets.append(g)
self._all_greenlets = tuple(all_greenlets)

def __iter__(self):
return iter(self._all_greenlets)

def __bool__(self):
return bool(self._all_greenlets)

__nonzero__ = __bool__

def __len__(self):
return len(self._all_greenlets)

@staticmethod
def _write_and_close(fobj, data):
try:
if data:
fobj.write(data)
if hasattr(fobj, 'flush'):
# 3.6 started expecting flush to be called.
fobj.flush()
except (OSError, IOError, BrokenPipeError) as ex:
# Test cases from the stdlib can raise BrokenPipeError
# without setting an errno value. This matters because
# Python 2 doesn't have a BrokenPipeError.
if isinstance(ex, BrokenPipeError) and ex.errno is None:
ex.errno = errno.EPIPE
if ex.errno != errno.EPIPE and ex.errno != errno.EINVAL:
raise
finally:
try:
fobj.close()
except EnvironmentError:
pass

@staticmethod
def _read_and_close(fobj):
try:
return fobj.read()
finally:
try:
fobj.close()
except EnvironmentError:
pass


class Popen(object):
"""
The underlying process creation and management in this module is
Expand Down Expand Up @@ -706,13 +801,17 @@ def _get_devnull(self):
self._devnull = os.open(os.devnull, os.O_RDWR)
return self._devnull

_stdout_buffer = None
_stderr_buffer = None
_communicating_greenlets = None

def communicate(self, input=None, timeout=None):
"""Interact with process: Send data to stdin. Read data from
stdout and stderr, until end-of-file is reached. Wait for
process to terminate. The optional input argument should be a
"""
Interact with process and return its output and error.
- Send *input* data to stdin.
- Read data from stdout and stderr, until end-of-file is reached.
- Wait for process to terminate.
The optional *input* argument should be a
string to be sent to the child process, or None, if no data
should be sent to the child.
Expand All @@ -731,57 +830,9 @@ def communicate(self, input=None, timeout=None):
Honor a *timeout* even if there's no way to communicate with the child
(stdin, stdout, and stderr are not pipes).
"""
greenlets = []
if self.stdin:
greenlets.append(spawn(write_and_close, self.stdin, input))

# If the timeout parameter is used, and the caller calls back after
# getting a TimeoutExpired exception, we can wind up with multiple
# greenlets trying to run and read from and close stdout/stderr.
# That's bad because it can lead to 'RuntimeError: reentrant call in io.BufferedReader'.
# We can't just kill the previous greenlets when a timeout happens,
# though, because we risk losing the output collected by that greenlet
# (and Python 3, where timeout is an official parameter, explicitly says
# that no output should be lost in the event of a timeout.) Instead, we're
# watching for the exception and ignoring it. It's not elegant,
# but it works
def _make_pipe_reader(pipe_name):
pipe = getattr(self, pipe_name)
buf_name = '_' + pipe_name + '_buffer'

def _read():
try:
data = pipe.read()
except (
# io.Buffered* can raise RuntimeError: 'reentrant call'
RuntimeError,
# unbuffered Posix IO that we're already waiting on
# can raise this. Closing the pipe will free those greenlets up.
ConcurrentObjectUseError
):
return
if not data:
return
the_buffer = getattr(self, buf_name)
if the_buffer:
the_buffer.append(data)
else:
setattr(self, buf_name, [data])
return _read

if self.stdout:
_read_out = _make_pipe_reader('stdout')
stdout = spawn(_read_out)
greenlets.append(stdout)
else:
stdout = None

if self.stderr:
_read_err = _make_pipe_reader('stderr')
stderr = spawn(_read_err)
greenlets.append(stderr)
else:
stderr = None
if self._communicating_greenlets is None:
self._communicating_greenlets = _CommunicatingGreenlets(self, input)
greenlets = self._communicating_greenlets

# If we were given stdin=stdout=stderr=None, we have no way to
# communicate with the child, and thus no greenlets to wait
Expand All @@ -793,9 +844,18 @@ def _read():
self.wait(timeout=timeout, _raise_exc=True)

done = joinall(greenlets, timeout=timeout)
if timeout is not None and len(done) != len(greenlets):
# Allow finished greenlets, if any, to raise. This takes priority over
# the timeout exception.
for greenlet in done:
greenlet.get()
if timeout is not None and len(done) != len(self._communicating_greenlets):
raise TimeoutExpired(self.args, timeout)

# Close only after we're sure that everything is done
# (there was no timeout, or there was, but everything finished).
# There should be no greenlets still running, even from a prior
# attempt. If there are, then this can raise RuntimeError: 'reentrant call'.
# So we ensure that previous greenlets are dead.
for pipe in (self.stdout, self.stderr):
if pipe:
try:
Expand All @@ -805,21 +865,8 @@ def _read():

self.wait()

def _get_output_value(pipe_name):
buf_name = '_' + pipe_name + '_buffer'
buf_value = getattr(self, buf_name)
setattr(self, buf_name, None)
if buf_value:
buf_value = self._communicate_empty_value.join(buf_value)
else:
buf_value = self._communicate_empty_value
return buf_value

stdout_value = _get_output_value('stdout')
stderr_value = _get_output_value('stderr')

return (None if stdout is None else stdout_value,
None if stderr is None else stderr_value)
return (None if greenlets.stdout is None else greenlets.stdout.get(),
None if greenlets.stderr is None else greenlets.stderr.get())

def poll(self):
"""Check if child process has terminated. Set and return :attr:`returncode` attribute."""
Expand Down Expand Up @@ -1648,22 +1695,6 @@ def kill(self):
self.send_signal(signal.SIGKILL)


def write_and_close(fobj, data):
try:
if data:
fobj.write(data)
if hasattr(fobj, 'flush'):
# 3.6 started expecting flush to be called.
fobj.flush()
except (OSError, IOError) as ex:
if ex.errno != errno.EPIPE and ex.errno != errno.EINVAL:
raise
finally:
try:
fobj.close()
except EnvironmentError:
pass

def _with_stdout_stderr(exc, stderr):
# Prior to Python 3.5, most exceptions didn't have stdout
# and stderr attributes and can't take the stderr attribute in their
Expand Down
1 change: 1 addition & 0 deletions src/gevent/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
from .skipping import skipWithCExtensions
from .skipping import skipOnLibuvOnTravisOnCPython27
from .skipping import skipOnPy37
from .skipping import skipOnPy3
from .skipping import skipWithoutResource
from .skipping import skipWithoutExternalNetwork
from .skipping import skipOnPy2
Expand Down
Loading

0 comments on commit acc67c3

Please sign in to comment.