Skip to content

Commit

Permalink
io, dif: gitpython-developers#519: FIX DIFF freeze when reading from GIL
Browse files Browse the repository at this point in the history
+ CAUSE: In Windows, Diffs freeze while reading Popen streams,
probably buffers smaller; good-thin(TM) in this case because reading a
Popen-proc from the launching-thread freezes GIL.  The alternative to
use `proc.communicate()` also relies on big buffers.
+ SOLUTION: Use `cmd.handle_process_output()` to consume Diff-proc
streams.
+ Retroffited `handle_process_output()` code to support also
byte-streams, both Threading(Windows) and Select/Poll (Posix) paths
updated.

- TODO: Unfortunately, `Diff._index_from_patch_format()` still slurps
input; need to re-phrase header-regexes linewise to resolve it.
  • Loading branch information
ankostis committed Sep 28, 2016
1 parent 4674163 commit a5db3d3
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 88 deletions.
141 changes: 74 additions & 67 deletions git/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
is_win,
)
import io
from _io import UnsupportedOperation

execute_kwargs = set(('istream', 'with_keep_cwd', 'with_extended_output',
'with_exceptions', 'as_process', 'stdout_as_string',
Expand All @@ -56,7 +57,7 @@
__all__ = ('Git',)

if is_win:
WindowsError = OSError
WindowsError = OSError # @ReservedAssignment

if PY3:
_bchr = bchr
Expand All @@ -72,7 +73,8 @@ def _bchr(c):
# Documentation
## @{

def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
def handle_process_output(process, stdout_handler, stderr_handler, finalizer,
decode_stdout=True, decode_stderr=True):
"""Registers for notifications to lean that process output is ready to read, and dispatches lines to
the respective line handlers. We are able to handle carriage returns in case progress is sent by that
mean. For performance reasons, we only apply this to stderr.
Expand All @@ -82,8 +84,6 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
:param stdout_handler: f(stdout_line_string), or None
:param stderr_hanlder: f(stderr_line_string), or None
:param finalizer: f(proc) - wait for proc to finish"""
fdmap = {process.stdout.fileno(): (stdout_handler, [b'']),
process.stderr.fileno(): (stderr_handler, [b''])}

def _parse_lines_from_buffer(buf):
line = b''
Expand All @@ -94,7 +94,7 @@ def _parse_lines_from_buffer(buf):
bi += 1

if char in (b'\r', b'\n') and line:
yield bi, line
yield bi, line + b'\n'
line = b''
else:
line += char
Expand All @@ -114,105 +114,111 @@ def _read_lines_from_fno(fno, last_buf_list):
# keep remainder
last_buf_list[0] = buf[bi:]

def _dispatch_single_line(line, handler):
line = line.decode(defenc)
def _dispatch_single_line(line, handler, decode):
if decode:
line = line.decode(defenc)
if line and handler:
handler(line)
# end dispatch helper
# end single line helper

def _dispatch_lines(fno, handler, buf_list):
def _dispatch_lines(fno, handler, buf_list, decode):
lc = 0
for line in _read_lines_from_fno(fno, buf_list):
_dispatch_single_line(line, handler)
_dispatch_single_line(line, handler, decode)
lc += 1
# for each line
return lc
# end

def _deplete_buffer(fno, handler, buf_list, wg=None):
def _deplete_buffer(fno, handler, buf_list, decode):
lc = 0
while True:
line_count = _dispatch_lines(fno, handler, buf_list)
line_count = _dispatch_lines(fno, handler, buf_list, decode)
lc += line_count
if line_count == 0:
break
# end deplete buffer

if buf_list[0]:
_dispatch_single_line(buf_list[0], handler)
_dispatch_single_line(buf_list[0], handler, decode)
lc += 1
# end

if wg:
wg.done()

return lc
# end

if hasattr(select, 'poll'):
# poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
# an issue for us, as it matters how many handles our own process has
poll = select.poll()
READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
CLOSED = select.POLLHUP | select.POLLERR

poll.register(process.stdout, READ_ONLY)
poll.register(process.stderr, READ_ONLY)

closed_streams = set()
while True:
# no timeout

try:
poll_result = poll.poll()
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
# end handle poll exception

for fd, result in poll_result:
if result & CLOSED:
closed_streams.add(fd)
else:
_dispatch_lines(fd, *fdmap[fd])
# end handle closed stream
# end for each poll-result tuple

if len(closed_streams) == len(fdmap):
break
# end its all done
# end endless loop

# Depelete all remaining buffers
for fno, (handler, buf_list) in fdmap.items():
_deplete_buffer(fno, handler, buf_list)
# end for each file handle

for fno in fdmap.keys():
poll.unregister(fno)
# end don't forget to unregister !
else:
# Oh ... probably we are on windows. select.select() can only handle sockets, we have files
try:
outfn = process.stdout.fileno()
errfn = process.stderr.fileno()
poll = select.poll() # @UndefinedVariable
except (UnsupportedOperation, AttributeError):
# Oh ... probably we are on windows. or TC mockap provided for streams.
# Anyhow, select.select() can only handle sockets, we have files
# The only reliable way to do this now is to use threads and wait for both to finish
def _handle_lines(fd, handler):
def _handle_lines(fd, handler, decode):
for line in fd:
line = line.decode(defenc)
if line and handler:
if handler:
if decode:
line = line.decode(defenc)
handler(line)

threads = []
for fd, handler in zip((process.stdout, process.stderr),
(stdout_handler, stderr_handler)):
t = threading.Thread(target=_handle_lines, args=(fd, handler))
for fd, handler, decode in zip((process.stdout, process.stderr),
(stdout_handler, stderr_handler),
(decode_stdout, decode_stderr),):
t = threading.Thread(target=_handle_lines, args=(fd, handler, decode))
t.setDaemon(True)
t.start()
threads.append(t)

for t in threads:
t.join()
# end
else:
# poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
# an issue for us, as it matters how many handles our own process has
fdmap = {outfn: (stdout_handler, [b''], decode_stdout),
errfn: (stderr_handler, [b''], decode_stderr)}

READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR # @UndefinedVariable
CLOSED = select.POLLHUP | select.POLLERR # @UndefinedVariable

poll.register(process.stdout, READ_ONLY)
poll.register(process.stderr, READ_ONLY)

closed_streams = set()
while True:
# no timeout

try:
poll_result = poll.poll()
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
# end handle poll exception

for fd, result in poll_result:
if result & CLOSED:
closed_streams.add(fd)
else:
_dispatch_lines(fd, *fdmap[fd])
# end handle closed stream
# end for each poll-result tuple

if len(closed_streams) == len(fdmap):
break
# end its all done
# end endless loop

# Depelete all remaining buffers
for fno, (handler, buf_list, decode) in fdmap.items():
_deplete_buffer(fno, handler, buf_list, decode)
# end for each file handle

for fno in fdmap.keys():
poll.unregister(fno)
# end don't forget to unregister !

return finalizer(process)

Expand Down Expand Up @@ -458,6 +464,7 @@ def next(self):
line = self.readline()
if not line:
raise StopIteration

return line

def __del__(self):
Expand Down
32 changes: 21 additions & 11 deletions git/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
defenc,
PY3
)
from git.cmd import handle_process_output
from git.util import finalize_process

__all__ = ('Diffable', 'DiffIndex', 'Diff', 'NULL_TREE')

Expand Down Expand Up @@ -145,10 +147,10 @@ def diff(self, other=Index, paths=None, create_patch=False, **kwargs):
kwargs['as_process'] = True
proc = diff_cmd(*self._process_diff_args(args), **kwargs)

diff_method = Diff._index_from_raw_format
if create_patch:
diff_method = Diff._index_from_patch_format
index = diff_method(self.repo, proc.stdout)
diff_method = (Diff._index_from_patch_format
if create_patch
else Diff._index_from_raw_format)
index = diff_method(self.repo, proc)

proc.wait()
return index
Expand Down Expand Up @@ -397,13 +399,18 @@ def _pick_best_path(cls, path_match, rename_match, path_fallback_match):
return None

@classmethod
def _index_from_patch_format(cls, repo, stream):
def _index_from_patch_format(cls, repo, proc):
"""Create a new DiffIndex from the given text which must be in patch format
:param repo: is the repository we are operating on - it is required
:param stream: result of 'git diff' as a stream (supporting file protocol)
:return: git.DiffIndex """

## FIXME: Here SLURPING raw, need to re-phrase header-regexes linewise.
text = []
handle_process_output(proc, text.append, None, finalize_process, decode_stdout=False)

# for now, we have to bake the stream
text = stream.read()
text = b''.join(text)
index = DiffIndex()
previous_header = None
for header in cls.re_header.finditer(text):
Expand Down Expand Up @@ -450,17 +457,19 @@ def _index_from_patch_format(cls, repo, stream):
return index

@classmethod
def _index_from_raw_format(cls, repo, stream):
def _index_from_raw_format(cls, repo, proc):
"""Create a new DiffIndex from the given stream which must be in raw format.
:return: git.DiffIndex"""
# handles
# :100644 100644 687099101... 37c5e30c8... M .gitignore

index = DiffIndex()
for line in stream.readlines():

def handle_diff_line(line):
line = line.decode(defenc)
if not line.startswith(":"):
continue
# END its not a valid diff line
return

meta, _, path = line[1:].partition('\t')
old_mode, new_mode, a_blob_id, b_blob_id, change_type = meta.split(None, 4)
path = path.strip()
Expand Down Expand Up @@ -489,6 +498,7 @@ def _index_from_raw_format(cls, repo, stream):
diff = Diff(repo, a_path, b_path, a_blob_id, b_blob_id, old_mode, new_mode,
new_file, deleted_file, rename_from, rename_to, '', change_type)
index.append(diff)
# END for each line

handle_process_output(proc, handle_diff_line, None, finalize_process, decode_stdout=False)

return index
20 changes: 10 additions & 10 deletions git/test/test_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ def test_diff_with_staged_file(self, rw_dir):

def test_list_from_string_new_mode(self):
output = StringProcessAdapter(fixture('diff_new_mode'))
diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
diffs = Diff._index_from_patch_format(self.rorepo, output)
self._assert_diff_format(diffs)

assert_equal(1, len(diffs))
assert_equal(8, len(diffs[0].diff.splitlines()))

def test_diff_with_rename(self):
output = StringProcessAdapter(fixture('diff_rename'))
diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
diffs = Diff._index_from_patch_format(self.rorepo, output)
self._assert_diff_format(diffs)

assert_equal(1, len(diffs))
Expand All @@ -102,7 +102,7 @@ def test_diff_with_rename(self):
assert isinstance(str(diff), str)

output = StringProcessAdapter(fixture('diff_rename_raw'))
diffs = Diff._index_from_raw_format(self.rorepo, output.stdout)
diffs = Diff._index_from_raw_format(self.rorepo, output)
self.assertEqual(len(diffs), 1)
diff = diffs[0]
self.assertIsNotNone(diff.renamed_file)
Expand All @@ -113,7 +113,7 @@ def test_diff_with_rename(self):

def test_diff_of_modified_files_not_added_to_the_index(self):
output = StringProcessAdapter(fixture('diff_abbrev-40_full-index_M_raw_no-color'))
diffs = Diff._index_from_raw_format(self.rorepo, output.stdout)
diffs = Diff._index_from_raw_format(self.rorepo, output)

self.assertEqual(len(diffs), 1, 'one modification')
self.assertEqual(len(list(diffs.iter_change_type('M'))), 1, 'one modification')
Expand All @@ -126,7 +126,7 @@ def test_diff_of_modified_files_not_added_to_the_index(self):
)
def test_binary_diff(self, case):
method, file_name = case
res = method(None, StringProcessAdapter(fixture(file_name)).stdout)
res = method(None, StringProcessAdapter(fixture(file_name)))
self.assertEqual(len(res), 1)
self.assertEqual(len(list(res.iter_change_type('M'))), 1)
if res[0].diff:
Expand All @@ -137,7 +137,7 @@ def test_binary_diff(self, case):

def test_diff_index(self):
output = StringProcessAdapter(fixture('diff_index_patch'))
res = Diff._index_from_patch_format(None, output.stdout)
res = Diff._index_from_patch_format(None, output)
self.assertEqual(len(res), 6)
for dr in res:
self.assertTrue(dr.diff.startswith(b'@@'), dr)
Expand All @@ -149,7 +149,7 @@ def test_diff_index(self):

def test_diff_index_raw_format(self):
output = StringProcessAdapter(fixture('diff_index_raw'))
res = Diff._index_from_raw_format(None, output.stdout)
res = Diff._index_from_raw_format(None, output)
self.assertIsNotNone(res[0].deleted_file)
self.assertIsNone(res[0].b_path,)

Expand All @@ -171,7 +171,7 @@ def test_diff_initial_commit(self):

def test_diff_unsafe_paths(self):
output = StringProcessAdapter(fixture('diff_patch_unsafe_paths'))
res = Diff._index_from_patch_format(None, output.stdout)
res = Diff._index_from_patch_format(None, output)

# The "Additions"
self.assertEqual(res[0].b_path, u'path/ starting with a space')
Expand Down Expand Up @@ -207,12 +207,12 @@ def test_diff_patch_format(self):

for fixture_name in fixtures:
diff_proc = StringProcessAdapter(fixture(fixture_name))
Diff._index_from_patch_format(self.rorepo, diff_proc.stdout)
Diff._index_from_patch_format(self.rorepo, diff_proc)
# END for each fixture

def test_diff_with_spaces(self):
data = StringProcessAdapter(fixture('diff_file_with_spaces'))
diff_index = Diff._index_from_patch_format(self.rorepo, data.stdout)
diff_index = Diff._index_from_patch_format(self.rorepo, data)
self.assertIsNone(diff_index[0].a_path, repr(diff_index[0].a_path))
self.assertEqual(diff_index[0].b_path, u'file with spaces', repr(diff_index[0].b_path))

Expand Down

0 comments on commit a5db3d3

Please sign in to comment.