Skip to content

Commit

Permalink
Merge pull request #4228 from mih/nf-annexjsonproto
Browse files Browse the repository at this point in the history
NF: WitlessProtocol for `annex ... --json` commands
  • Loading branch information
mih committed Mar 3, 2020
2 parents ce9420a + 54c70f4 commit 3ee322d
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 4 deletions.
7 changes: 5 additions & 2 deletions datalad/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def pipe_data_received(self, fd, data):
if self.buffer[fd - 1] is not None:
self.buffer[fd - 1].extend(data)

def process_exited(self):
def _prepare_result(self):
"""Prepares the final result to be returned to the runner
Note for derived classes overwriting this method:
Expand All @@ -315,8 +315,11 @@ class as kwargs on error. The Runner will overwrite 'cmd' and
for name, byt in zip(self.FD_NAMES[1:], self.buffer)
}
results['code'] = return_code
return results

def process_exited(self):
    """React to subprocess termination.

    Assembles the final result record and resolves the ``done`` future
    with it, which lets the runner's event loop stop waiting and hand
    the record back to the caller.
    """
    # build the result first, then fulfill the promise so the
    # runner's pending wait on the future completes
    result = self._prepare_result()
    self.done.set_result(result)


class NoCapture(WitlessProtocol):
Expand Down
110 changes: 109 additions & 1 deletion datalad/support/annexrepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
split_cmdline,
unlink,
)
from datalad.support.json_py import loads as json_loads
from datalad.log import log_progress
# must not be loads, because this one would log, and we need to log ourselves
from datalad.support.json_py import json_loads
from datalad.cmd import (
BatchedCommand,
GitRunner,
Expand Down Expand Up @@ -3494,6 +3496,112 @@ def _save_post(self, message, status, partial_commit):
)


class AnnexJsonProtocol(WitlessProtocol):
    """Subprocess communication protocol for `annex ... --json` commands

    Importantly, parsed JSON content is returned as a result, not string
    output (available under the ``stdout_json`` key of the result record).

    This protocol also handles git-annex's JSON-style progress reporting:
    progress records are fed into ``log_progress()`` and filtered out of
    the returned results.
    """
    # capture both streams and handle messaging completely
    proc_out = True
    proc_err = True

    def __init__(self, done_future):
        # to collect parsed JSON command output
        self.json_out = []
        super().__init__(done_future)

    def connection_made(self, transport):
        super().connection_made(transport)
        # IDs of progress bars started by this instance, so any still-open
        # ones can be taken down in process_exited()
        self._pbars = set()

    def pipe_data_received(self, fd, data):
        """Parse JSON records arriving on stdout; delegate anything else."""
        if fd != 1:
            # let the base class decide what to do with it
            super().pipe_data_received(fd, data)
            return
        # this is where the JSON records come in: git-annex emits one JSON
        # object per line, and a single chunk of data may carry several
        # complete records, so process the chunk line by line
        # NOTE(review): a record split across two chunks would still fail
        # to parse -- a proper fix would buffer an incomplete trailing line
        for line in data.splitlines():
            self._handle_json_line(line)

    def _handle_json_line(self, line):
        # parse a single JSON record and dispatch it; json_loads() does not
        # log decoding errors itself (see import comment), so we report them
        # here ourselves
        try:
            j = json_loads(line)
        except Exception:
            # TODO turn this into an error result, or put the exception
            # onto the result future -- needs more thought
            if line.strip():
                # do not complain on empty lines
                lgr.error('Received undecodable JSON output: %s', line)
            return
        # check for progress reports and act on them immediately
        # but only if there is something to build a progress report from
        if 'action' in j and 'byte-progress' in j:
            self._handle_progress_record(j)
            # do not let progress reports leak into the return value
            return
        # don't do anything to the results for now in terms of normalization
        # TODO the protocol could be made aware of the runner's CWD and
        # also any dataset the annex command is operating on. This would
        # enable 'file' property conversion to absolute paths
        self.json_out.append(j)

    def _handle_progress_record(self, j):
        # feed a git-annex progress record into log_progress(), starting
        # and stopping progress bars as transfers begin and complete
        for err_msg in j['action'].pop('error-messages', []):
            lgr.error(err_msg)
        # use the action report to build a stable progress bar ID
        # NOTE(review): frozenset(dict) hashes the action's *keys* only, so
        # concurrent actions with identical key sets would share one bar --
        # confirm whether hashing the action's items was intended
        pbar_id = 'annexprogress-{}-{}'.format(
            id(self),
            hash(frozenset(j['action'])))
        if pbar_id in self._pbars and \
                j.get('byte-progress', None) == j.get('total-size', None):
            # take a known pbar down, completion or broken report
            log_progress(
                lgr.info,
                pbar_id,
                'Finished annex action: {}'.format(j['action']),
            )
            self._pbars.discard(pbar_id)
            # we are done here
            return

        if pbar_id not in self._pbars and \
                j.get('byte-progress', None) != j.get('total-size', None):
            # init the pbar, there is some progress left to be made
            # worth it
            log_progress(
                lgr.info,
                pbar_id,
                'Start annex action: {}'.format(j['action']),
                # do not crash if no command is reported
                label=j['action'].get('command', ''),
                unit=' Bytes',
                total=float(j['total-size']),
            )
            self._pbars.add(pbar_id)
        log_progress(
            lgr.info,
            pbar_id,
            j['percent-progress'],
            update=float(j['byte-progress']),
        )

    def _prepare_result(self):
        # first let the base class do its thing
        results = super()._prepare_result()
        # now amend the results, make clear in the key-name that these records
        # came from stdout -- may not be important here or now, but it is easy
        # to imagine structured output on stderr at some point
        results['stdout_json'] = self.json_out
        return results

    def process_exited(self):
        # take down any progress bars that were not closed orderly
        for pbar_id in self._pbars:
            log_progress(
                lgr.info,
                pbar_id,
                'Finished',
            )
        super().process_exited()


# TODO: Why was this commented out?
# @auto_repr
class BatchedAnnexes(SafeDelCloseMixin, dict):
Expand Down
41 changes: 40 additions & 1 deletion datalad/support/tests/test_annexrepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
from unittest.mock import patch
import gc

from datalad.cmd import Runner
from datalad.cmd import (
Runner,
WitlessRunner,
)

from datalad.support.external_versions import external_versions
from datalad.support import path as op
Expand All @@ -64,6 +67,7 @@
assert_not_in,
assert_raises,
assert_re_in,
assert_repo_status,
assert_result_count,
assert_true,
create_tree,
Expand Down Expand Up @@ -116,6 +120,7 @@
from datalad.support.annexrepo import (
_get_size_from_perc_complete,
AnnexRepo,
AnnexJsonProtocol,
ProcessAnnexProgressIndicators,
)
from .utils import check_repo_deals_with_inode_change
Expand Down Expand Up @@ -2118,6 +2123,40 @@ def test_error_reporting(path):
)


@with_tree(tree={
    'file1': "content1",
    'dir1': {'file2': 'content2'},
})
def test_annexjson_protocol(path):
    """Exercise AnnexJsonProtocol against real `git annex find --json` runs."""
    ar = AnnexRepo(path, create=True)
    ar.save()
    assert_repo_status(path)
    runner = WitlessRunner(cwd=ar.path)
    # first an orderly execution
    res = runner.run(
        ['git', 'annex', 'find', '.', '--json'],
        protocol=AnnexJsonProtocol)
    for k in ('stdout', 'stdout_json', 'stderr'):
        assert_in(k, res)
    orig_j = res['stdout_json']
    # one record per annexed file in the tree
    eq_(len(orig_j), 2)
    # not meant as an exhaustive check for output structure,
    # just some assurance that it is not totally alien
    ok_(all(j['file'] for j in orig_j))
    # no complaints
    eq_(res['stderr'], '')

    # now the same, but with a forced error
    with assert_raises(CommandError) as e:
        runner.run(
            ['git', 'annex', 'find', '.', 'error', '--json'],
            protocol=AnnexJsonProtocol)
    # assert_raises is unittest's assertRaises context manager: the raised
    # CommandError is available as `e.exception`, not on `e` itself
    # normal operation is not impaired
    eq_(e.exception.stdout_json, orig_j)
    # we get a clue what went wrong
    assert_in('error not found', e.exception.stderr)


# http://git-annex.branchable.com/bugs/cannot_commit___34__annex_add__34__ed_modified_file_which_switched_its_largefile_status_to_be_committed_to_git_now/#comment-bf70dd0071de1bfdae9fd4f736fd1ec
# https://github.com/datalad/datalad/issues/1651
@known_failure_githubci_win
Expand Down

0 comments on commit 3ee322d

Please sign in to comment.