NF: WitlessProtocol for annex ... --json commands #4228

Merged (1 commit) on Mar 3, 2020
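
A minimal usage sketch of what this PR enables, assuming the import locations shown in the diff below (it mirrors the new test at the bottom of this page): run an annex --json command through the WitlessRunner with the new AnnexJsonProtocol and read parsed records from the 'stdout_json' key of the result.

from datalad.cmd import WitlessRunner
from datalad.support.annexrepo import AnnexJsonProtocol

# run an annex --json command in an existing annex repository
# ('/path/to/annex/repo' is a placeholder)
runner = WitlessRunner(cwd='/path/to/annex/repo')
res = runner.run(
    ['git', 'annex', 'find', '.', '--json'],
    protocol=AnnexJsonProtocol)

# 'stdout_json' holds one parsed dict per JSON record emitted by git-annex,
# while 'stdout', 'stderr', and 'code' behave as for other protocols
for rec in res['stdout_json']:
    print(rec['file'])
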
7 changes: 5 additions & 2 deletions datalad/cmd.py
@@ -292,7 +292,7 @@ def pipe_data_received(self, fd, data):
if self.buffer[fd - 1] is not None:
self.buffer[fd - 1].extend(data)

-    def process_exited(self):
+    def _prepare_result(self):
"""Prepares the final result to be returned to the runner

Note for derived classes overwriting this method:
@@ -315,8 +315,11 @@ class as kwargs on error. The Runner will overwrite 'cmd' and
for name, byt in zip(self.FD_NAMES[1:], self.buffer)
}
results['code'] = return_code
return results

def process_exited(self):
# actually fulfill the future promise and let the execution finish
-        self.done.set_result(results)
+        self.done.set_result(self._prepare_result())


class NoCapture(WitlessProtocol):
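
The change above splits result preparation out of process_exited() so that derived protocol classes can amend the result dict before the future is resolved. A minimal, hypothetical sketch of such a subclass (the LineCountProtocol name and the 'stdout_lines' key are made up for illustration; WitlessProtocol and the proc_out flag are taken from the surrounding code):

from datalad.cmd import WitlessProtocol


class LineCountProtocol(WitlessProtocol):
    # capture stdout so that _prepare_result() has something to report on
    proc_out = True

    def _prepare_result(self):
        # start from the base result ('stdout', 'stderr', 'code')
        results = super()._prepare_result()
        # amend it with a derived value; process_exited() delivers it as-is
        results['stdout_lines'] = results['stdout'].count('\n')
        return results
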
110 changes: 109 additions & 1 deletion datalad/support/annexrepo.py
@@ -58,7 +58,9 @@
split_cmdline,
unlink,
)
-from datalad.support.json_py import loads as json_loads
+from datalad.log import log_progress
+# must not be `loads`, because that one does its own error logging,
+# and we need to do the logging ourselves here
+from datalad.support.json_py import json_loads
from datalad.cmd import (
BatchedCommand,
GitRunner,
@@ -3494,6 +3496,112 @@ def _save_post(self, message, status, partial_commit):
)


class AnnexJsonProtocol(WitlessProtocol):
"""Subprocess communication protocol for `annex ... --json` commands

    Importantly, parsed JSON content is returned as a result, not as
    unparsed string output.

This protocol also handles git-annex's JSON-style progress reporting.
"""
# capture both streams and handle messaging completely
proc_out = True
proc_err = True

def __init__(self, done_future):
# to collect parsed JSON command output
self.json_out = []
super().__init__(done_future)

def connection_made(self, transport):
super().connection_made(transport)
self._pbars = set()

def pipe_data_received(self, fd, data):
if fd != 1:
# let the base class decide what to do with it
super().pipe_data_received(fd, data)
return
# this is where the JSON records come in
        # json_loads() does not log errors itself (which is why it is used
        # instead of `loads`, see import above), so decoding failures are
        # logged here; broken JSON is not expected under normal circumstances
try:
j = json_loads(data)
except Exception:
# TODO turn this into an error result, or put the exception
# onto the result future -- needs more thought
if data.strip():
# do not complain on empty lines
lgr.error('Received undecodable JSON output: %s', data)
return
# check for progress reports and act on them immediately
# but only if there is something to build a progress report from
if 'action' in j and 'byte-progress' in j:
for err_msg in j['action'].pop('error-messages', []):
lgr.error(err_msg)
# use the action report to build a stable progress bar ID
pbar_id = 'annexprogress-{}-{}'.format(
id(self),
hash(frozenset(j['action'])))
if pbar_id in self._pbars and \
j.get('byte-progress', None) == j.get('total-size', None):
# take a known pbar down, completion or broken report
log_progress(
lgr.info,
pbar_id,
'Finished annex action: {}'.format(j['action']),
)
self._pbars.discard(pbar_id)
# we are done here
return

if pbar_id not in self._pbars and \
j.get('byte-progress', None) != j.get('total-size', None):
                # init the pbar, there is some progress left to be made
                # that is worth reporting
log_progress(
lgr.info,
pbar_id,
'Start annex action: {}'.format(j['action']),
# do not crash if no command is reported
label=j['action'].get('command', ''),
unit=' Bytes',
total=float(j['total-size']),
)
self._pbars.add(pbar_id)
log_progress(
lgr.info,
pbar_id,
j['percent-progress'],
update=float(j['byte-progress']),
)
# do not let progress reports leak into the return value
return
# don't do anything to the results for now in terms of normalization
# TODO the protocol could be made aware of the runner's CWD and
# also any dataset the annex command is operating on. This would
# enable 'file' property conversion to absolute paths
self.json_out.append(j)

def _prepare_result(self):
# first let the base class do its thing
results = super()._prepare_result()
# now amend the results, make clear in the key-name that these records
# came from stdout -- may not be important here or now, but it is easy
# to imagine structured output on stderr at some point
results['stdout_json'] = self.json_out
return results

def process_exited(self):
# take down any progress bars that were not closed orderly
for pbar_id in self._pbars:
log_progress(
lgr.info,
pbar_id,
'Finished',
)
super().process_exited()
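
# Illustration only, not part of the diff: the approximate shape of a
# git-annex --json progress record that the protocol above reacts to
# (field values are made up for this sketch)
example_progress_record = {
    'action': {'command': 'get', 'file': 'file1'},
    'byte-progress': 1024,       # bytes transferred so far
    'total-size': 4096,          # full size of the item being processed
    'percent-progress': '25%',   # passed verbatim as the progress message
}
# a later record for the same action with byte-progress == total-size
# takes the corresponding progress bar down again in pipe_data_received()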


# TODO: Why was this commented out?
# @auto_repr
class BatchedAnnexes(SafeDelCloseMixin, dict):
41 changes: 40 additions & 1 deletion datalad/support/tests/test_annexrepo.py
@@ -38,7 +38,10 @@
from unittest.mock import patch
import gc

-from datalad.cmd import Runner
+from datalad.cmd import (
+    Runner,
+    WitlessRunner,
+)

from datalad.support.external_versions import external_versions
from datalad.support import path as op
@@ -64,6 +67,7 @@
assert_not_in,
assert_raises,
assert_re_in,
assert_repo_status,
assert_result_count,
assert_true,
create_tree,
@@ -116,6 +120,7 @@
from datalad.support.annexrepo import (
_get_size_from_perc_complete,
AnnexRepo,
AnnexJsonProtocol,
ProcessAnnexProgressIndicators,
)
from .utils import check_repo_deals_with_inode_change
@@ -2118,6 +2123,40 @@ def test_error_reporting(path):
)


@with_tree(tree={
'file1': "content1",
'dir1': {'file2': 'content2'},
})
def test_annexjson_protocol(path):
ar = AnnexRepo(path, create=True)
ar.save()
assert_repo_status(path)
runner = WitlessRunner(cwd=ar.path)
# first an orderly execution
res = runner.run(
['git', 'annex', 'find', '.', '--json'],
protocol=AnnexJsonProtocol)
for k in ('stdout', 'stdout_json', 'stderr'):
assert_in(k, res)
orig_j = res['stdout_json']
eq_(len(orig_j), 2)
# not meant as an exhaustive check for output structure,
# just some assurance that it is not totally alien
ok_(all(j['file'] for j in orig_j))
# no complaints
eq_(res['stderr'], '')

# now the same, but with a forced error
with assert_raises(CommandError) as e:
res = runner.run(
['git', 'annex', 'find', '.', 'error', '--json'],
protocol=AnnexJsonProtocol)
    # normal operation is not impaired
    eq_(e.exception.stdout_json, orig_j)
    # we get a clue what went wrong
    assert_in('error not found', e.exception.stderr)


# http://git-annex.branchable.com/bugs/cannot_commit___34__annex_add__34__ed_modified_file_which_switched_its_largefile_status_to_be_committed_to_git_now/#comment-bf70dd0071de1bfdae9fd4f736fd1ec
# https://github.com/datalad/datalad/issues/1651
@known_failure_githubci_win