Skip to content

Commit

Permalink
Merge branch 'release-1.12.10'
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Moffat committed Mar 2, 2017
2 parents 268c191 + 154cbe0 commit 8a8e225
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 58 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## 1.12.10 - 3/02/17

* bugfix for file descriptors over 1024 [#356](https://github.com/amoffat/sh/issues/356)
* bugfix when `_err_to_out` is True and `_out` is pipe or tty [#365](https://github.com/amoffat/sh/issues/365)

## 1.12.9 - 1/04/17

* added `_bg_exc` for silencing exceptions in background threads [#350](https://github.com/amoffat/sh/pull/350)
Expand Down
271 changes: 213 additions & 58 deletions sh.py
Expand Up @@ -24,7 +24,7 @@
#===============================================================================


__version__ = "1.12.9"
__version__ = "1.12.10"
__project_url__ = "https://github.com/amoffat/sh"


Expand Down Expand Up @@ -67,6 +67,7 @@
# serious side-effects that could change anything. as long as we do that, it
# should be ok
RUNNING_TESTS = bool(int(os.environ.get("SH_TESTS_RUNNING", "0")))
FORCE_USE_SELECT = bool(int(os.environ.get("SH_TESTS_USE_SELECT", "0")))

if IS_PY3:
from io import StringIO
Expand Down Expand Up @@ -130,6 +131,140 @@ def get_num_args(fn):
_unicode_methods = set(dir(unicode()))


HAS_POLL = hasattr(select, "poll")
POLLER_EVENT_READ = 1
POLLER_EVENT_WRITE = 2
POLLER_EVENT_HUP = 4
POLLER_EVENT_ERROR = 8


# here we use an use a poller interface that transparently selects the most
# capable poller (out of either select.select or select.poll). this was added
# by zhangyafeikimi when he discovered that if the fds created internally by sh
# numbered > 1024, select.select failed (a limitation of select.select). this
# can happen if your script opens a lot of files
if HAS_POLL and not FORCE_USE_SELECT:
class Poller(object):
def __init__(self):
self._poll = select.poll()
# file descriptor <-> file object bidirectional maps
self.fd_lookup = {}
self.fo_lookup = {}

def __nonzero__(self):
return len(self.fd_lookup) != 0

def __len__(self):
return len(self.fd_lookup)

def _set_fileobject(self, f):
if hasattr(f, "fileno"):
fd = f.fileno()
self.fd_lookup[fd] = f
self.fo_lookup[f] = fd
else:
self.fd_lookup[f] = f
self.fo_lookup[f] = f

def _remove_fileobject(self, f):
if hasattr(f, "fileno"):
fd = f.fileno()
del self.fd_lookup[fd]
del self.fo_lookup[f]
else:
del self.fd_lookup[f]
del self.fo_lookup[f]

def _get_file_descriptor(self, f):
return self.fo_lookup.get(f)

def _get_file_object(self, fd):
return self.fd_lookup.get(fd)

def _register(self, f, events):
# f can be a file descriptor or file object
self._set_fileobject(f)
fd = self._get_file_descriptor(f)
self._poll.register(fd, events)

def register_read(self, f):
self._register(f, select.POLLIN | select.POLLPRI)

def register_write(self, f):
self._register(f, select.POLLOUT)

def register_error(self, f):
self._register(f, select.POLLERR | select.POLLHUP | select.POLLNVAL)

def unregister(self, f):
fd = self._get_file_descriptor(f)
self._poll.unregister(fd)
self._remove_fileobject(f)

def poll(self, timeout):
if timeout is not None:
# convert from seconds to milliseconds
timeout *= 1000
changes = self._poll.poll(timeout)
results = []
for fd, events in changes:
f = self._get_file_object(fd)
if events & (select.POLLIN | select.POLLPRI):
results.append((f, POLLER_EVENT_READ))
elif events & (select.POLLOUT):
results.append((f, POLLER_EVENT_WRITE))
elif events & (select.POLLHUP):
results.append((f, POLLER_EVENT_HUP))
elif events & (select.POLLERR | select.POLLNVAL):
results.append((f, POLLER_EVENT_ERROR))
return results
else:
class Poller(object):
def __init__(self):
self.rlist = []
self.wlist = []
self.xlist = []

def __nonzero__(self):
return len(self.rlist) + len(self.wlist) + len(self.xlist) != 0

def __len__(self):
return len(self.rlist) + len(self.wlist) + len(self.xlist)

def _register(self, f, l):
if f not in l:
l.append(f)

def _unregister(self, f, l):
if f in l:
l.remove(f)

def register_read(self, f):
self._register(f, self.rlist)

def register_write(self, f):
self._register(f, self.wlist)

def register_error(self, f):
self._register(f, self.xlist)

def unregister(self, f):
self._unregister(f, self.rlist)
self._unregister(f, self.wlist)
self._unregister(f, self.xlist)

def poll(self, timeout):
_in, _out, _err = select.select(self.rlist, self.wlist, self.xlist, timeout)
results = []
for f in _in:
results.append((f, POLLER_EVENT_READ))
for f in _out:
results.append((f, POLLER_EVENT_WRITE))
for f in _err:
results.append((f, POLLER_EVENT_ERROR))
return results


def encode_to_py3bytes_or_py2str(s):
""" takes anything and attempts to return a py2 string or py3 bytes. this
is typically used when creating command + arguments to be executed via
Expand Down Expand Up @@ -1643,9 +1778,17 @@ def __init__(self, command, parent_log, cmd, stdin, stdout, stderr,
# by the time the process exits, and the data will be lost.
# i've only seen this on OSX.
if stderr is OProc.STDOUT:
self._stderr_read_fd = os.dup(self._stdout_read_fd)
# if stderr is going to stdout, but stdout is a tty or a pipe,
# we should not specify a read_fd, because stdout is dup'd
# directly to the stdout fd (no pipe), and so stderr won't have
# a slave end of a pipe either to dup
if stdout_is_tty_or_pipe:
self._stderr_read_fd = None
else:
self._stderr_read_fd = os.dup(self._stdout_read_fd)
self._stderr_write_fd = os.dup(self._stdout_write_fd)


elif stderr_is_tty_or_pipe:
self._stderr_write_fd = os.dup(get_fileno(stderr))
self._stderr_read_fd = None
Expand Down Expand Up @@ -2234,20 +2377,21 @@ def input_thread(log, stdin, is_alive, quit, close_before_term):
done = False
closed = False
alive = True
writers = [stdin]

while writers and alive:
_, to_write, _ = select.select([], writers, [], 1)

if to_write:
log.debug("%r ready for more input", stdin)
done = stdin.write()

if done:
writers = []
if close_before_term:
stdin.close()
closed = True
poller = Poller()
poller.register_write(stdin)

while poller and alive:
changed = poller.poll(1)
for fd, events in changed:
if events & (POLLER_EVENT_WRITE | POLLER_EVENT_HUP):
log.debug("%r ready for more input", stdin)
done = stdin.write()

if done:
poller.unregister(stdin)
if close_before_term:
stdin.close()
closed = True

alive, _ = is_alive()

Expand Down Expand Up @@ -2300,36 +2444,30 @@ def output_thread(log, stdout, stderr, timeout_event, is_alive, quit,
process's stdout stream (a streamreader), and waits for it to claim that
its done """

readers = []
errors = []

poller = Poller()
if stdout is not None:
readers.append(stdout)
errors.append(stdout)
poller.register_read(stdout)
if stderr is not None:
readers.append(stderr)
errors.append(stderr)
poller.register_read(stderr)

# this is our select loop for polling stdout or stderr that is ready to
# this is our poll loop for polling stdout or stderr that is ready to
# be read and processed. if one of those streamreaders indicate that it
# is done altogether being read from, we remove it from our list of
# things to poll. when no more things are left to poll, we leave this
# loop and clean up
while readers:
outputs, inputs, err = no_interrupt(select.select, readers, [], errors, 1)

# stdout and stderr
for stream in outputs:
log.debug("%r ready to be read from", stream)
done = stream.read()
if done:
readers.remove(stream)

# for some reason, we have to just ignore streams that have had an
# error. i'm not exactly sure why, but don't remove this until we
# figure that out, and create a test for it
for stream in err:
pass
while poller:
changed = no_interrupt(poller.poll, 0.1)
for f, events in changed:
if events & (POLLER_EVENT_READ | POLLER_EVENT_HUP):
log.debug("%r ready to be read from", f)
done = f.read()
if done:
poller.unregister(f)
elif events & POLLER_EVENT_ERROR:
# for some reason, we have to just ignore streams that have had an
# error. i'm not exactly sure why, but don't remove this until we
# figure that out, and create a test for it
pass

if timeout_event and timeout_event.is_set():
break
Expand Down Expand Up @@ -2461,7 +2599,7 @@ def get_file_chunk_reader(stdin):

def fn():
# python 3.* includes a fileno on stringios, but accessing it throws an
# exception. that exception is how we'll know we can't do a select on
# exception. that exception is how we'll know we can't do a poll on
# stdin
is_real_file = True
if IS_PY3:
Expand All @@ -2470,11 +2608,17 @@ def fn():
except UnsupportedOperation:
is_real_file = False

# this select is for files that may not yet be ready to read. we test
# for fileno because StringIO/BytesIO cannot be used in a select
# this poll is for files that may not yet be ready to read. we test
# for fileno because StringIO/BytesIO cannot be used in a poll
if is_real_file and hasattr(stdin, "fileno"):
outputs, _, _ = select.select([stdin], [], [], 0.1)
if not outputs:
poller = Poller()
poller.register_read(stdin)
changed = poller.poll(0.1)
ready = False
for fd, events in changed:
if events & (POLLER_EVENT_READ | POLLER_EVENT_HUP):
ready = True
if not ready:
raise NotYetReadyToRead

chunk = stdin.read(bufsize)
Expand Down Expand Up @@ -2526,7 +2670,7 @@ def __init__(self, log, stream, stdin, bufsize_type, encoding, tty_in):


def fileno(self):
""" defining this allows us to do select.select on an instance of this
""" defining this allows us to do poll on an instance of this
class """
return self.stream

Expand Down Expand Up @@ -2719,7 +2863,7 @@ def __init__(self, log, stream, handler, buffer, bufsize_type, encoding,


def fileno(self):
""" defining this allows us to do select.select on an instance of this
""" defining this allows us to do poll on an instance of this
class """
return self.stream

Expand Down Expand Up @@ -3292,17 +3436,22 @@ def load_module(self, mod_fullname):
return module


def run_tests(env, locale, args, version, **extra_env): # pragma: no cover
def run_tests(env, locale, args, version, force_select, **extra_env): # pragma: no cover
py_version = "python"
py_version += str(version)

py_bin = which(py_version)
return_code = None

poller = "poll"
if force_select:
poller = "select"

if py_bin:
print("Testing %s, locale %r" % (py_version.capitalize(),
locale))
print("Testing %s, locale %r, poller: %s" % (py_version.capitalize(),
locale, poller))

env["SH_TESTS_USE_SELECT"] = str(int(force_select))
env["LANG"] = locale

for k,v in extra_env.items():
Expand Down Expand Up @@ -3361,6 +3510,10 @@ def parse_args():
sys_ver = "%d.%d" % (v[0], v[1])
all_versions = (sys_ver,)

all_force_select = [True]
if HAS_POLL:
all_force_select.append(False)

all_locales = ("en_US.UTF-8", "C")
i = 0
for locale in all_locales:
Expand All @@ -3371,18 +3524,20 @@ def parse_args():
if constrain_versions and version not in constrain_versions:
continue

env_copy = env.copy()
exit_code = run_tests(env_copy, locale, args, version,
SH_TEST_RUN_IDX=i)
for force_select in all_force_select:
env_copy = env.copy()

exit_code = run_tests(env_copy, locale, args, version,
force_select, SH_TEST_RUN_IDX=i)

if exit_code is None:
print("Couldn't find %s, skipping" % version)
if exit_code is None:
print("Couldn't find %s, skipping" % version)

elif exit_code != 0:
print("Failed for %s, %s" % (version, locale))
exit(1)
elif exit_code != 0:
print("Failed for %s, %s" % (version, locale))
exit(1)

i += 1
i += 1

ran_versions = ",".join(all_versions)
print("Tested Python versions: %s" % ran_versions)
Expand Down

0 comments on commit 8a8e225

Please sign in to comment.