Merge 3db1b2f into 785b7b5
jamadden committed Jan 14, 2020
2 parents 785b7b5 + 3db1b2f commit 5f49020
Showing 15 changed files with 462 additions and 325 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -96,6 +96,7 @@ install:
- echo $BUILD_RUNTIMES/snakepit/$TRAVIS_PYTHON_VERSION.d
- ls -l $BUILD_RUNTIMES/snakepit/$TRAVIS_PYTHON_VERSION.d
- python -c 'import gevent; print(gevent.__version__)'
- python -c 'from gevent._compat import get_clock_info; print(get_clock_info("perf_counter"))'
script:
# The generic script for the matrix expansion is to
# test the defaults.
16 changes: 15 additions & 1 deletion CHANGES.rst
@@ -39,7 +39,21 @@ Library and Dependency Updates
with debugging. The event libraries allocate small amounts of memory
at startup. The allocation functions have to take the GIL, but
because of the limited amount of actual allocation that gets done
this is not expected to be a concern.
this is not expected to be a bottleneck.

Other
-----

- Make `gevent.subprocess.Popen.communicate` raise exceptions raised
by reading from the process, like the standard library. In
particular, under Python 3, if the process output is being decoded
as text, this can now raise ``UnicodeDecodeError``. Reported in
:issue:`1510` by Ofer Koren.

- Make `gevent.subprocess.Popen.communicate` be more careful about
closing files. Previously if a timeout error happened, a second call
to ``communicate`` might not close the pipe.
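
A hedged sketch of the first entry above (illustrative only, not part of the commit; the invalid byte sequence and the assumption of a UTF-8 locale are made up for the example):

    import sys
    from gevent.subprocess import Popen, PIPE

    # The child emits bytes that are not valid UTF-8.
    child = "import sys; sys.stdout.buffer.write(b'\\xff\\xfe')"
    p = Popen([sys.executable, '-c', child],
              stdout=PIPE, universal_newlines=True)
    try:
        out, _ = p.communicate()
    except UnicodeDecodeError as ex:
        # Before this change the reader greenlet's error was not propagated;
        # now communicate() re-raises it, matching the standard library.
        print('decoding failed:', ex)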


1.5a3 (2020-01-01)
==================
8 changes: 7 additions & 1 deletion src/gevent/_compat.py
@@ -188,15 +188,21 @@ def fsdecode(filename):
try:
# Python 3.3+ (PEP 418)
from time import perf_counter
from time import get_clock_info
from time import monotonic
perf_counter = perf_counter
monotonic = monotonic
get_clock_info = get_clock_info
except ImportError:
import time

if sys.platform == "win32":
perf_counter = time.clock # pylint:disable=no-member
else:
perf_counter = time.time

monotonic = perf_counter
def get_clock_info(_):
return 'Unknown'

## Monitoring
def get_this_psutil_process():
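A minimal usage sketch of the shim added to gevent/_compat.py above (mirroring the new .travis.yml check; ``do_work`` is a placeholder, not a real function):

    from gevent._compat import perf_counter, get_clock_info

    start = perf_counter()
    do_work()               # stand-in for whatever is being timed
    elapsed = perf_counter() - start

    # On Python 3.3+ this forwards to time.get_clock_info() and describes the
    # clock (resolution, whether it is monotonic, ...); the Python 2 fallback
    # above simply returns the string 'Unknown'.
    print(get_clock_info("perf_counter"), elapsed)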
8 changes: 7 additions & 1 deletion src/gevent/_fileobjectcommon.py
@@ -371,7 +371,13 @@ def __getattr__(self, name):
return getattr(self._io, name)

def __repr__(self):
return '<%s _fobj=%r%s>' % (self.__class__.__name__, self.io, self._extra_repr())
return '<%s at 0x%x %s_fobj=%r%s>' % (
self.__class__.__name__,
id(self),
'closed' if self.closed else '',
self.io,
self._extra_repr()
)

def _extra_repr(self):
return ''
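For reference, an assumed rendering of the new ``__repr__`` (the address and wrapped object are invented): the object id and, when the object is closed, a ``closed`` marker now appear before the ``_fobj=`` portion:

    <FileObjectThread at 0x7f3c2a1b2d90 _fobj=<_io.BufferedReader name='/tmp/example'>>
    <FileObjectThread at 0x7f3c2a1b2d90 closed_fobj=<_io.BufferedReader name='/tmp/example'>>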
209 changes: 120 additions & 89 deletions src/gevent/subprocess.py
@@ -39,7 +39,6 @@
import traceback

from gevent.event import AsyncResult
from gevent.exceptions import ConcurrentObjectUseError
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import linkproxy
from gevent.hub import sleep
@@ -264,6 +263,13 @@ def __repr__(self):
fork = monkey.get_original('os', 'fork')
from gevent.os import fork_and_watch

try:
BrokenPipeError
except NameError: # Python 2
class BrokenPipeError(Exception):
"Never raised, never caught."


def call(*popenargs, **kwargs):
"""
call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None) -> returncode
@@ -437,6 +443,95 @@ def FileObject(*args, **kwargs):
globals()['FileObject'] = _FileObject
return _FileObject(*args)


class _CommunicatingGreenlets(object):
# At most one of these objects may be created
# for a given Popen object. This ensures that only one background
# greenlet at a time will be reading from the file object. This matters because
# if a timeout exception is raised, the user may call back into communicate() to
# get the output (usually after killing the process; see run()). We must not
# lose output in that case (Python 3 specifically documents that raising a timeout
# doesn't lose output). Also, attempting to read from a pipe while it's already
# being read from results in `RuntimeError: reentrant call in io.BufferedReader`;
# the same thing happens if you attempt to close() it while that's in progress.
__slots__ = (
'stdin',
'stdout',
'stderr',
'_all_greenlets',
)

def __init__(self, popen, input_data):
self.stdin = self.stdout = self.stderr = None
if popen.stdin: # Even if no data, we need to close
self.stdin = spawn(self._write_and_close, popen.stdin, input_data)

# If the timeout parameter is used, and the caller calls back after
# getting a TimeoutExpired exception, we can wind up with multiple
# greenlets trying to run and read from and close stdout/stderr.
# That's bad because it can lead to 'RuntimeError: reentrant call in io.BufferedReader'.
# We can't just kill the previous greenlets when a timeout happens,
# though, because we risk losing the output collected by that greenlet
# (and Python 3, where timeout is an official parameter, explicitly says
# that no output should be lost in the event of a timeout.) Instead, we're
# watching for the exception and ignoring it. It's not elegant,
but it works.
if popen.stdout:
self.stdout = spawn(self._read_and_close, popen.stdout)

if popen.stderr:
self.stderr = spawn(self._read_and_close, popen.stderr)

all_greenlets = []
for g in self.stdin, self.stdout, self.stderr:
if g is not None:
all_greenlets.append(g)
self._all_greenlets = tuple(all_greenlets)

def __iter__(self):
return iter(self._all_greenlets)

def __bool__(self):
return bool(self._all_greenlets)

__nonzero__ = __bool__

def __len__(self):
return len(self._all_greenlets)

@staticmethod
def _write_and_close(fobj, data):
try:
if data:
fobj.write(data)
if hasattr(fobj, 'flush'):
# 3.6 started expecting flush to be called.
fobj.flush()
except (OSError, IOError, BrokenPipeError) as ex:
# Test cases from the stdlib can raise BrokenPipeError
# without setting an errno value. This matters because
# Python 2 doesn't have a BrokenPipeError.
if isinstance(ex, BrokenPipeError) and ex.errno is None:
ex.errno = errno.EPIPE
if ex.errno != errno.EPIPE and ex.errno != errno.EINVAL:
raise
finally:
try:
fobj.close()
except EnvironmentError:
pass

@staticmethod
def _read_and_close(fobj):
try:
return fobj.read()
finally:
try:
fobj.close()
except EnvironmentError:
pass
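
# The caller-side pattern the comments above describe, shown as a hedged
# sketch (modelled on the stdlib's ``subprocess.run`` and not taken from this
# diff): after a timeout, a second ``communicate()`` call reuses the same
# greenlets, so already-collected output is kept and no pipe is read twice.

    import sys
    from gevent.subprocess import Popen, PIPE, TimeoutExpired

    proc = Popen([sys.executable, '-c', 'import time; time.sleep(30)'],
                 stdout=PIPE, stderr=PIPE)
    try:
        out, err = proc.communicate(timeout=5)
    except TimeoutExpired:
        proc.kill()
        # The second call reuses the one _CommunicatingGreenlets object
        # created above, so nothing already read is lost and no pipe is
        # read from concurrently.
        out, err = proc.communicate()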


class Popen(object):
"""
The underlying process creation and management in this module is
@@ -706,13 +801,17 @@ def _get_devnull(self):
self._devnull = os.open(os.devnull, os.O_RDWR)
return self._devnull

_stdout_buffer = None
_stderr_buffer = None
_communicating_greenlets = None

def communicate(self, input=None, timeout=None):
"""Interact with process: Send data to stdin. Read data from
stdout and stderr, until end-of-file is reached. Wait for
process to terminate. The optional input argument should be a
"""
Interact with process and return its output and error.
- Send *input* data to stdin.
- Read data from stdout and stderr, until end-of-file is reached.
- Wait for process to terminate.
The optional *input* argument should be a
string to be sent to the child process, or None, if no data
should be sent to the child.
@@ -731,57 +830,9 @@ def communicate(self, input=None, timeout=None):
Honor a *timeout* even if there's no way to communicate with the child
(stdin, stdout, and stderr are not pipes).
"""
greenlets = []
if self.stdin:
greenlets.append(spawn(write_and_close, self.stdin, input))

# If the timeout parameter is used, and the caller calls back after
# getting a TimeoutExpired exception, we can wind up with multiple
# greenlets trying to run and read from and close stdout/stderr.
# That's bad because it can lead to 'RuntimeError: reentrant call in io.BufferedReader'.
# We can't just kill the previous greenlets when a timeout happens,
# though, because we risk losing the output collected by that greenlet
# (and Python 3, where timeout is an official parameter, explicitly says
# that no output should be lost in the event of a timeout.) Instead, we're
# watching for the exception and ignoring it. It's not elegant,
# but it works
def _make_pipe_reader(pipe_name):
pipe = getattr(self, pipe_name)
buf_name = '_' + pipe_name + '_buffer'

def _read():
try:
data = pipe.read()
except (
# io.Buffered* can raise RuntimeError: 'reentrant call'
RuntimeError,
# unbuffered Posix IO that we're already waiting on
# can raise this. Closing the pipe will free those greenlets up.
ConcurrentObjectUseError
):
return
if not data:
return
the_buffer = getattr(self, buf_name)
if the_buffer:
the_buffer.append(data)
else:
setattr(self, buf_name, [data])
return _read

if self.stdout:
_read_out = _make_pipe_reader('stdout')
stdout = spawn(_read_out)
greenlets.append(stdout)
else:
stdout = None

if self.stderr:
_read_err = _make_pipe_reader('stderr')
stderr = spawn(_read_err)
greenlets.append(stderr)
else:
stderr = None
if self._communicating_greenlets is None:
self._communicating_greenlets = _CommunicatingGreenlets(self, input)
greenlets = self._communicating_greenlets

# If we were given stdin=stdout=stderr=None, we have no way to
# communicate with the child, and thus no greenlets to wait
@@ -793,9 +844,18 @@ def _read():
self.wait(timeout=timeout, _raise_exc=True)

done = joinall(greenlets, timeout=timeout)
if timeout is not None and len(done) != len(greenlets):
# Allow finished greenlets, if any, to raise. This takes priority over
# the timeout exception.
for greenlet in done:
greenlet.get()
if timeout is not None and len(done) != len(self._communicating_greenlets):
raise TimeoutExpired(self.args, timeout)

# Close only after we're sure that everything is done
# (there was no timeout, or there was, but everything finished).
# There should be no greenlets still running, even from a prior
# attempt. If there are, then this can raise RuntimeError: 'reentrant call'.
# So we ensure that previous greenlets are dead.
for pipe in (self.stdout, self.stderr):
if pipe:
try:
@@ -805,21 +865,8 @@ def _read():

self.wait()

def _get_output_value(pipe_name):
buf_name = '_' + pipe_name + '_buffer'
buf_value = getattr(self, buf_name)
setattr(self, buf_name, None)
if buf_value:
buf_value = self._communicate_empty_value.join(buf_value)
else:
buf_value = self._communicate_empty_value
return buf_value

stdout_value = _get_output_value('stdout')
stderr_value = _get_output_value('stderr')

return (None if stdout is None else stdout_value,
None if stderr is None else stderr_value)
return (None if greenlets.stdout is None else greenlets.stdout.get(),
None if greenlets.stderr is None else greenlets.stderr.get())

def poll(self):
"""Check if child process has terminated. Set and return :attr:`returncode` attribute."""
@@ -1648,22 +1695,6 @@ def kill(self):
self.send_signal(signal.SIGKILL)


def write_and_close(fobj, data):
try:
if data:
fobj.write(data)
if hasattr(fobj, 'flush'):
# 3.6 started expecting flush to be called.
fobj.flush()
except (OSError, IOError) as ex:
if ex.errno != errno.EPIPE and ex.errno != errno.EINVAL:
raise
finally:
try:
fobj.close()
except EnvironmentError:
pass

def _with_stdout_stderr(exc, stderr):
# Prior to Python 3.5, most exceptions didn't have stdout
# and stderr attributes and can't take the stderr attribute in their
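A companion sketch (assumed, not from the commit's tests) of the error-handling split above: the writer greenlet tolerates a broken pipe, so writing to a child that never reads its stdin does not raise from ``communicate()``, while read-side errors now do.

    import sys
    from gevent.subprocess import Popen, PIPE

    # Child exits immediately without reading its stdin.
    p = Popen([sys.executable, '-c', 'pass'], stdin=PIPE, stdout=PIPE)

    # _write_and_close swallows EPIPE/EINVAL from the writer greenlet, so this
    # returns normally even if the pipe breaks mid-write; other writer errors,
    # and errors raised while reading stdout, are re-raised here.
    out, _ = p.communicate(input=b'x' * (1024 * 1024))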
1 change: 1 addition & 0 deletions src/gevent/testing/__init__.py
@@ -85,6 +85,7 @@
from .skipping import skipWithCExtensions
from .skipping import skipOnLibuvOnTravisOnCPython27
from .skipping import skipOnPy37
from .skipping import skipOnPy3
from .skipping import skipWithoutResource
from .skipping import skipWithoutExternalNetwork
from .skipping import skipOnPy2
