Skip to content
This repository has been archived by the owner on May 23, 2023. It is now read-only.

Commit

Permalink
use thraeding for tee, eliminate tee: input/output error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
bukzor committed Dec 23, 2014
1 parent dabd16f commit 23ecfce
Showing 1 changed file with 31 additions and 68 deletions.
99 changes: 31 additions & 68 deletions tests/testing/capture_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,24 +70,6 @@ def __init__(self): # pylint:disable=super-init-not-called
pty_normalize_newlines(self.read)


def tee(read_fd, write_fd, *other_fds):
"""send output from read_fd to write_fd,
but also copy it to each of other_fds
"""
ischild = not os.fork()
if ischild:
os.dup2(read_fd, STDIN)
os.dup2(write_fd, STDOUT)
os.execvp(
'tee',
('tee', ) + tuple(
'/dev/fd/%i' % fd
for fd in other_fds
)
) # never returns
os.close(read_fd)


def read_block(fd, block=4 * 1024):
"""Read up to 4k bytes from fd.
Returns empty-string upon end of file.
Expand Down Expand Up @@ -119,41 +101,31 @@ def read_all(fd):
return b''.join(result)


def _communicate_with_select(read_set):
"""stolen from stdlib subprocess.Popen._communicate_with_select
changes:
arbitrary-length list of fds as input
deleted stdin/input support
class Tee(object):
"""send output from read_fd to each of write_fds
call .join() to get a complete copy of output
"""
import select
import errno

orig_read_set = read_set
read_set = list(read_set)
result = {}
for fd in read_set:
result[fd] = []

while read_set:
try:
readable, _, _ = select.select(read_set, [], [])
except select.error as error:
if error.args[0] == errno.EINTR:
continue
raise
def __init__(self, read_fd, *write_fds):
self.read = read_fd
self.write = write_fds
self._result = []

for fd in readable:
data = read_block(fd, 1024)
if data == b'':
os.close(fd)
read_set.remove(fd)
result[fd].append(data)
from threading import Thread
self.thread = Thread(target=self.tee)
self.thread.start()

return tuple(
b''.join(result[fd])
for fd in orig_read_set
)
def tee(self):
line = read_block(self.read)
while line != b'':
self._result.append(line)
for w in self.write:
os.write(w, line)
line = read_block(self.read)
os.close(self.read)

def join(self):
self.thread.join()
return b''.join(self._result)


def capture_subprocess(cmd, encoding='UTF-8', **popen_kwargs):
Expand All @@ -162,39 +134,30 @@ def capture_subprocess(cmd, encoding='UTF-8', **popen_kwargs):
No temporary files are used.
"""
stdout_orig = Pty() # libc uses full buffering for stdout if it doesn't see a tty
stderr_orig = Pipe()
stdout = Pty() # libc uses full buffering for stdout if it doesn't see a tty
stderr = Pipe()

# deadlocks occur if we have any write-end of a pipe open more than once
# best practice: close any used write pipes just after spawn
outputter = Popen(
cmd,
stdout=stdout_orig.write,
stderr=stderr_orig.write,
stdout=stdout.write,
stderr=stderr.write,
**popen_kwargs
)
stdout_orig.readonly() # deadlock otherwise
stderr_orig.readonly() # deadlock otherwise
stdout.readonly() # deadlock otherwise
stderr.readonly() # deadlock otherwise

# start one tee each on the original stdout and stderr
# writing each to three places:
# 1. the original destination
# 2. a pipe just for that one stream
stdout_teed = Pipe()
stderr_teed = Pipe()

tee(stdout_orig.read, STDOUT, stdout_teed.write)
tee(stderr_orig.read, STDERR, stderr_teed.write)
stdout_teed.readonly() # deadlock otherwise
stderr_teed.readonly() # deadlock otherwise

# communicate closes fds when it's done with them
result = _communicate_with_select((stdout_teed.read, stderr_teed.read))
stdout_tee = Tee(stdout.read, STDOUT)
stderr_tee = Tee(stderr.read, STDERR)

# clean up left-over processes and pipes:
exit_code = outputter.wait()
stdout_teed.closed()
stderr_teed.closed()
result = (stdout_tee.join(), stderr_tee.join())

if encoding is not None:
result = tuple(
Expand Down

0 comments on commit 23ecfce

Please sign in to comment.