Speed-up GitRepo.get_content_info() (fixes gh-3300) #3301

Merged
merged 14 commits into from Apr 9, 2019
Changes from 8 commits
149 changes: 149 additions & 0 deletions datalad/cmd.py
@@ -44,6 +44,7 @@
assure_unicode,
assure_bytes,
unlink,
auto_repr,
)
from .dochelpers import borrowdoc

@@ -692,3 +693,151 @@ def run(self, cmd, env=None, *args, **kwargs):
# All communication here will be returned as unicode
# TODO: do that instead within the super's run!
return assure_unicode(out), assure_unicode(err)


def readline_rstripped(stdout):
#return iter(stdout.readline, b'').next().rstrip()
return stdout.readline().rstrip()
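
`readline_rstripped` is the default `output_proc`: a callable that receives the process's stdout stream and must consume exactly one response per query. For illustration only (not part of this diff), a JSON-decoding processor with the same interface could look like the sketch below; `readline_json` in annexrepo.py plays this role for `--json` output.

import json

def readline_json_sketch(stdout):
    # Consume one line of output and decode it as JSON; mapping an empty
    # line to an empty dict is an assumption of this sketch, not
    # necessarily datalad's exact behavior.
    line = stdout.readline().strip()
    return json.loads(line) if line else {}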


@auto_repr
class BatchedCommand(object):
"""Container for an annex process which would allow for persistent communication
kyleam marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, cmd, path=None, output_proc=None):
if not isinstance(cmd, list):
cmd = [cmd]
self.cmd = cmd
self.path = path
self.output_proc = output_proc if output_proc else readline_rstripped
self._process = None
self._stderr_out = None
self._stderr_out_fname = None

def _initialize(self):
lgr.debug("Initiating a new process for %s" % repr(self))
lgr.log(5, "Command: %s" % self.cmd)
        # According to the internet wisdom there is no easy way to do this
        # with subprocess while avoiding deadlocks; we would need to start a
        # thread/subprocess to implement timeouts etc.
# kwargs = dict(bufsize=1, universal_newlines=True) if PY3 else {}
self._stderr_out, self._stderr_out_fname = tempfile.mkstemp()
self._process = subprocess.Popen(
self.cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=self._stderr_out,
env=GitRunner.get_git_environ_adjusted(),
cwd=self.path,
bufsize=1,
universal_newlines=True # **kwargs
)

def _check_process(self, restart=False):
"""Check if the process was terminated and restart if restart

Returns
-------
bool
True if process was alive.
str
stderr if any recorded if was terminated
"""
process = self._process
ret = True
ret_stderr = None
if process and process.poll():
lgr.warning("Process %s was terminated with returncode %s" % (process, process.returncode))
ret_stderr = self.close(return_stderr=True)
ret = False
if self._process is None and restart:
lgr.warning("Restarting the process due to previous failure")
self._initialize()
return ret, ret_stderr

def __call__(self, cmds):
"""

Parameters
----------
cmds : str or tuple or list of (str or tuple)

Returns
-------
str or list
            Output received from the command; a list if `cmds` was a list
        """
        # TODO: add checks -- maybe the process died and needs to be reinitialized
if not self._process:
self._initialize()

input_multiple = isinstance(cmds, list)
if not input_multiple:
cmds = [cmds]

output = []

for entry in cmds:
if not isinstance(entry, string_types):
entry = ' '.join(entry)
entry = entry + '\n'
lgr.log(5, "Sending %r to batched command %s" % (entry, self))
# apparently communicate is just a one time show
# stdout, stderr = self._process.communicate(entry)
# according to the internet wisdom there is no easy way with subprocess
self._check_process(restart=True)
process = self._process # _check_process might have restarted it
process.stdin.write(assure_bytes(entry) if PY2 else entry)
process.stdin.flush()
lgr.log(5, "Done sending.")
still_alive, stderr = self._check_process(restart=False)
            # TODO: we might want to handle still_alive, e.g. to allow for
            # a number of restarts/resends, but it should be per command
            # since for some we cannot just resend the same query. But if
            # it is just a "get"ter -- we could resend it a few times
# We are expecting a single line output
# TODO: timeouts etc
stdout = assure_unicode(self.output_proc(process.stdout)) \
if not process.stdout.closed else None
if stderr:
lgr.warning("Received output in stderr: %r", stderr)
lgr.log(5, "Received output: %r" % stdout)
output.append(stdout)

return output if input_multiple else output[0]
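
The two queries below are equivalent, since a tuple entry is space-joined before being newline-terminated and sent (a sketch; `cat` stands in for any line-oriented batch helper):

bc = BatchedCommand('cat')
assert bc(('some', 'query')) == bc('some query')
bc.close()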

def __del__(self):
self.close()

def close(self, return_stderr=False):
"""Close communication and wait for process to terminate

Returns
-------
str
            stderr output if `return_stderr` is set and the stderr file
            exists; None otherwise
"""
ret = None
if self._stderr_out:
# close possibly still open fd
os.fdopen(self._stderr_out).close()
self._stderr_out = None
if self._process:
process = self._process
lgr.debug(
"Closing stdin of %s and waiting process to finish", process)
process.stdin.close()
process.stdout.close()
process.wait()
self._process = None
lgr.debug("Process %s has finished", process)
if self._stderr_out_fname and os.path.exists(self._stderr_out_fname):
if return_stderr:
with open(self._stderr_out_fname, 'r') as f:
ret = f.read()
# remove the file where we kept dumping stderr
unlink(self._stderr_out_fname)
self._stderr_out_fname = None
return ret
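
Putting it together — a minimal end-to-end sketch of `BatchedCommand`, assuming a POSIX `cat` as a stand-in for a long-lived line-oriented helper (the subprocess is only spawned on the first call):

from datalad.cmd import BatchedCommand

bc = BatchedCommand('cat')   # no process started yet
print(bc('one'))             # 'one' -- spawns the process and sends one line
print(bc(['two', 'three']))  # ['two', 'three'] -- list in, list out
bc.close()                   # close stdin/stdout and wait for termination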
166 changes: 17 additions & 149 deletions datalad/support/annexrepo.py
@@ -63,6 +63,7 @@
from datalad.utils import unlink
from datalad.support.json_py import loads as json_loads
from datalad.cmd import GitRunner
from datalad.cmd import BatchedCommand

# imports from same module:
from .repo import RepoInterface
@@ -3386,11 +3387,6 @@ def __del__(self):
self.close()


def readline_rstripped(stdout):
#return iter(stdout.readline, b'').next().rstrip()
return stdout.readline().rstrip()


def readlines_until_ok_or_failed(stdout, maxlines=100):
"""Read stdout until line ends with ok or failed"""
out = ''
@@ -3415,156 +3411,28 @@ def readline_json(stdout):


@auto_repr
class BatchedAnnex(object):
class BatchedAnnex(BatchedCommand):
"""Container for an annex process which would allow for persistent communication
"""

def __init__(self, annex_cmd, git_options=None, annex_options=None, path=None,
json=False,
output_proc=None):
json=False, output_proc=None):
if not isinstance(annex_cmd, list):
annex_cmd = [annex_cmd]
self.annex_cmd = annex_cmd
self.git_options = git_options if git_options else []
annex_options = annex_options if annex_options else []
self.annex_options = annex_options + (['--json'] if json else [])
self.path = path
if output_proc is None:
output_proc = readline_json if json else readline_rstripped
self.output_proc = output_proc
self._process = None
self._stderr_out = None
self._stderr_out_fname = None

def _initialize(self):
# TODO -- should get all those options about --debug and --backend which are used/composed
# in AnnexRepo class
lgr.debug("Initiating a new process for %s" % repr(self))
cmd = ['git'] + self.git_options + \
['annex'] + self.annex_cmd + self.annex_options + ['--batch'] # , '--debug']
lgr.log(5, "Command: %s" % cmd)
# TODO: look into _run_annex_command to support default options such as --debug
#
        # According to the internet wisdom there is no easy way to do this
        # with subprocess while avoiding deadlocks; we would need to start a
        # thread/subprocess to implement timeouts etc.
# kwargs = dict(bufsize=1, universal_newlines=True) if PY3 else {}
self._stderr_out, self._stderr_out_fname = tempfile.mkstemp()
self._process = Popen(
cmd, stdin=PIPE, stdout=PIPE, stderr=self._stderr_out,
env=GitRunner.get_git_environ_adjusted(),
cwd=self.path,
bufsize=1,
universal_newlines=True # **kwargs
)

def _check_process(self, restart=False):
"""Check if the process was terminated and restart if restart

Returns
-------
bool
True if process was alive.
str
stderr if any recorded if was terminated
"""
process = self._process
ret = True
ret_stderr = None
if process and process.poll():
lgr.warning("Process %s was terminated with returncode %s" % (process, process.returncode))
ret_stderr = self.close(return_stderr=True)
ret = False
if self._process is None and restart:
lgr.warning("Restarting the process due to previous failure")
self._initialize()
return ret, ret_stderr

def __call__(self, cmds):
"""

Parameters
----------
cmds : str or tuple or list of (str or tuple)

Returns
-------
str or list
            Output received from annex; a list if `cmds` was a list
        """
        # TODO: add checks -- maybe the process died and needs to be reinitialized
if not self._process:
self._initialize()

input_multiple = isinstance(cmds, list)
if not input_multiple:
cmds = [cmds]

output = []

for entry in cmds:
if not isinstance(entry, string_types):
entry = ' '.join(entry)
entry = entry + '\n'
lgr.log(5, "Sending %r to batched annex %s" % (entry, self))
# apparently communicate is just a one time show
# stdout, stderr = self._process.communicate(entry)
# according to the internet wisdom there is no easy way with subprocess
self._check_process(restart=True)
process = self._process # _check_process might have restarted it
process.stdin.write(assure_bytes(entry) if PY2 else entry)
process.stdin.flush()
lgr.log(5, "Done sending.")
still_alive, stderr = self._check_process(restart=False)
            # TODO: we might want to handle still_alive, e.g. to allow for
            # a number of restarts/resends, but it should be per command
            # since for some we cannot just resend the same query. But if
            # it is just a "get"ter -- we could resend it a few times
# We are expecting a single line output
# TODO: timeouts etc
stdout = assure_unicode(self.output_proc(process.stdout)) \
if not process.stdout.closed else None
if stderr:
lgr.warning("Received output in stderr: %r", stderr)
lgr.log(5, "Received output: %r" % stdout)
output.append(stdout)

return output if input_multiple else output[0]

def __del__(self):
self.close()

def close(self, return_stderr=False):
"""Close communication and wait for process to terminate

Returns
-------
str
            stderr output if `return_stderr` is set and the stderr file
            exists; None otherwise
"""
ret = None
if self._stderr_out:
# close possibly still open fd
os.fdopen(self._stderr_out).close()
self._stderr_out = None
if self._process:
process = self._process
lgr.debug(
"Closing stdin of %s and waiting process to finish", process)
process.stdin.close()
process.stdout.close()
process.wait()
self._process = None
lgr.debug("Process %s has finished", process)
if self._stderr_out_fname and os.path.exists(self._stderr_out_fname):
if return_stderr:
with open(self._stderr_out_fname, 'r') as f:
ret = f.read()
# remove the file where we kept dumping stderr
unlink(self._stderr_out_fname)
self._stderr_out_fname = None
return ret
cmd = \
['git'] + \
(git_options if git_options else []) + \
['annex'] + \
annex_cmd + \
(annex_options if annex_options else []) + \
(['--json'] if json else []) + \
['--batch'] # , '--debug']
output_proc = \
output_proc if output_proc else readline_json if json else None
super(BatchedAnnex, self).__init__(
cmd,
path=path,
output_proc=output_proc)
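
After this refactoring, `BatchedAnnex` only assembles the git-annex command line and delegates all process handling to `BatchedCommand`. A hypothetical use (`lookupkey` does support `--batch` in git-annex; the path is illustrative):

ba = BatchedAnnex('lookupkey', path='/path/to/repo')
key = ba('myfile.dat')  # runs: git annex lookupkey --batch
ba.close()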


def _get_size_from_perc_complete(count, perc):