Skip to content

Commit

Permalink
Merge pull request #334 from yarikoptic/enh-batched-annex
Browse files Browse the repository at this point in the history
ENH: various enhancements, primarily starting to use --batch mode for some annex commands
  • Loading branch information
yarikoptic committed Jan 27, 2016
2 parents 9921d42 + 56334a2 commit 1ad42d6
Show file tree
Hide file tree
Showing 51 changed files with 1,671 additions and 424 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ On Debian-based systems we recommend to enable
backports of recent fixed external modules we depend upon:

```sh
apt-get install patool python-bs4 python-git python-joblib python-testtools python-mock python-nose git-annex-standalone
apt-get install patool python-bs4 python-git python-testtools python-mock python-nose git-annex-standalone
```

or otherwise you can use pip to install Python modules
Expand Down
16 changes: 14 additions & 2 deletions datalad/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""Proxy basic file operations (such as open) to obtain files automagically upon I/O
"""

import sys
from mock import patch
from six import PY2
import six.moves.builtins as __builtin__
Expand All @@ -24,6 +25,7 @@
from os.path import dirname, abspath, pardir, join as opj, exists, basename, lexists
from git.exc import InvalidGitRepositoryError

from .dochelpers import exc_str
from .support.annexrepo import AnnexRepo
from .support.gitrepo import GitRepo
from .support.exceptions import CommandError
Expand Down Expand Up @@ -53,6 +55,7 @@ def __init__(self, autoget=True, activate=False):
self._h5py_File = None
self._autoget = autoget
self._in_open = False
self._log_online = True
if activate:
self.activate()

Expand Down Expand Up @@ -113,7 +116,7 @@ def _proxy_open_name_mode(self, origname, origfunc, *args, **kwargs):
except Exception as e:
# If anything goes wrong -- we should complain and proceed
with patch(origname, origfunc):
lgr.warning("Failed proxying open with %r, %r: %s", args, kwargs, e)
lgr.warning("Failed proxying open with %r, %r: %s", args, kwargs, exc_str(e))
finally:
self._in_open = False
# finally give it back to stock open
Expand Down Expand Up @@ -151,9 +154,18 @@ def _handle_auto_get(self, filepath):
# either it has content
if not annex.file_has_content(filepath):
lgr.info("File %s has no content -- retrieving", filepath)
annex.annex_get(filepath)
annex.annex_get(filepath, log_online=self._log_online)

def activate(self):
# Some beasts (e.g. tornado used by IPython) override outputs, and
# provide fileno which throws exception. In such cases we should not log online
self._log_online = hasattr(sys.stdout, 'fileno') and hasattr(sys.stderr, 'fileno')
try:
if self._log_online:
sys.stdout.fileno()
sys.stderr.fileno()
except:
self._log_online = False
if self.active:
lgr.warning("%s already active. No action taken" % self)
return
Expand Down
13 changes: 10 additions & 3 deletions datalad/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import shutil
import shlex

from six import PY3
from six import PY3, PY2
from six import string_types, binary_type

from .dochelpers import exc_str
from .support.exceptions import CommandError
from .support.protocol import NullProtocol, DryRunProtocol
from .utils import on_windows
Expand All @@ -31,6 +32,11 @@

_TEMP_std = sys.stdout, sys.stderr

if PY2:
# TODO apparently there is a recommended substitution for Python2
# which is a backported implementation of python3 subprocess
# https://pypi.python.org/pypi/subprocess32/
pass

class Runner(object):
"""Provides a wrapper for calling functions and commands.
Expand Down Expand Up @@ -231,7 +237,7 @@ def run(self, cmd, log_stdout=True, log_stderr=True, log_online=False,
except Exception as e:
prot_exc = e
lgr.error("Failed to start %r%r: %s" %
(cmd, " under %r" % cwd if cwd else '', e))
(cmd, " under %r" % cwd if cwd else '', exc_str(e)))
raise

finally:
Expand Down Expand Up @@ -348,13 +354,14 @@ def link_file_load(src, dst, dry_run=False):

try:
os.link(src_realpath, dst)
except AttributeError as e:
except AttributeError as e:
lgr.warn("Linking of %s failed (%s), copying file" % (src, e))
shutil.copyfile(src_realpath, dst)
shutil.copystat(src_realpath, dst)
else:
lgr.log(1, "Hardlinking finished")


def get_runner(*args, **kwargs):
# TODO: this is all crawl specific -- should be moved away
if cfg.getboolean('crawl', 'dryrun', default=False):
Expand Down
6 changes: 4 additions & 2 deletions datalad/crawler/dbs/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"""

import os
from os.path import join as opj, exists, lexists, islink, realpath
from os.path import join as opj, exists, lexists, islink, realpath, basename

from ...dochelpers import exc_str
from ...support.status import FileStatus
Expand Down Expand Up @@ -74,7 +74,7 @@ def get(self, fpath):
with swallow_logs():
info = self.annex.annex_info(fpath)
size = info['size']
except CommandError as exc:
except (CommandError, TypeError) as exc:
# must be under git or a plain file
lgr.debug("File %s must be not under annex, since info failed: %s" % (filepath, exc_str(exc)))
size = filestat.st_size
Expand All @@ -100,4 +100,6 @@ def is_different(self, fpath, status, url=None):
# TODO: make use of URL -- we should validate that url is among those associated
# with the file
old_status = self.get(fpath)
if status.filename and not old_status.filename:
old_status.filename = basename(fpath)
return old_status != status
8 changes: 8 additions & 0 deletions datalad/crawler/dbs/tests/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ....tests.utils import with_tree
from ....tests.utils import assert_equal
from ....tests.utils import assert_false
from ....tests.utils import assert_true
from ....tests.utils import chpwd
from ....support.annexrepo import AnnexRepo

Expand Down Expand Up @@ -42,6 +43,13 @@ def test_AnnexFileAttributesDB(path):
status1_ = db.get('file1.txt')
assert_equal(status1, status1_)
assert_false(db.is_different('file1.txt', status1))
assert_false(db.is_different('file1.txt', status1_))
# even if we add a filename specification
status1_.filename = 'file1.txt'
assert_false(db.is_different('file1.txt', status1_))
status1_.filename = 'different.txt'
assert_true(db.is_different('file1.txt', status1_))


os.unlink(filepath1) # under annex- - we don't have unlock yet and thus can't inplace augment
with open(filepath1, 'a') as f:
Expand Down
124 changes: 71 additions & 53 deletions datalad/crawler/nodes/annex.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import time
from os.path import expanduser, join as opj, exists, isabs, lexists, islink, realpath
from os.path import split as ops
from os import unlink, makedirs

from ...api import add_archive_content
Expand Down Expand Up @@ -146,11 +147,12 @@ def __call__(self, data={}):
if exists(handle_path):
if self.existing == 'skip':
yield data
return
elif self.existing == 'raise':
raise RuntimeError("%s already exists" % handle_path)
elif self.existing == 'replace':
_call(rmtree, handle_path)
else:
else: # TODO: 'crawl' ;)
raise ValueError(self.existing)
_call(self._initiate_handle, handle_path, handle_name)
_call(self._save_crawl_config, handle_path, handle_name, data)
Expand Down Expand Up @@ -218,17 +220,17 @@ def __init__(self, path=None, mode='full', options=None,
self.statusdb = statusdb


def add(self, filename, url=None):
# TODO: modes
self.repo.annex_addurl_to_file(filename, url #, TODO backend
)
raise NotImplementedError()

def addurl(self, url, filename):
raise NotImplementedError()
# TODO: register url within "The DB" after it was added
self.register_url_in_db(url, filename)

# def add(self, filename, url=None):
# # TODO: modes
# self.repo.annex_addurl_to_file(filename, url, batch=True #, TODO backend
# )
# raise NotImplementedError()
#
# def addurl(self, url, filename):
# raise NotImplementedError()
# # TODO: register url within "The DB" after it was added
# self.register_url_in_db(url, filename)
#
def register_url_in_db(self, url, filename):
# might need to go outside -- since has nothing to do with self
raise NotImplementedError()
Expand Down Expand Up @@ -256,31 +258,13 @@ def __call__(self, data): # filename=None, get_disposition_filename=False):
if url:
stats.urls += 1

# figure out the filename. If disposition one was needed, pipeline should
# have had it explicitly
fpath = filename = \
data['filename'] if 'filename' in data else self._get_filename_from_url(url)

stats.files += 1
if filename is None:
stats.skipped += 1
raise ValueError("No filename were provided or could be deduced from url=%r" % url)
elif isabs(filename):
stats.skipped += 1
raise ValueError("Got absolute filename %r" % filename)

path_ = data.get('path', None)
if path_:
# TODO: test all this handling of provided paths
if isabs(path_):
stats.skipped += 1
raise ValueError("Absolute path %s was provided" % path_)
fpath = opj(path_, fpath)
fpath = self._get_fpath(data, stats, url)
filepath = opj(self.repo.path, fpath)

lgr.debug("Request to annex %(url)s to %(fpath)s", locals())

updated_data = updated(data, {'filename': filename,
# since filename could have come from url -- let's update with it
updated_data = updated(data, {'filename': ops(fpath)[1],
#TODO? 'filepath': filepath
})
remote_status = None
Expand Down Expand Up @@ -318,9 +302,9 @@ def __call__(self, data): # filename=None, get_disposition_filename=False):
if not url:
lgr.debug("Adding %s directly into git since no url was provided" % (filepath))
# So we have only filename
assert(filename)
# Thus add directly into git
_call(self.repo.git_add, filename)
assert(fpath)
# Just add into annex without addurl
_call(self.repo.annex_add, fpath)
_call(stats.increment, 'add_git')
elif self.mode == 'full':
# Since addurl ignores annex.largefiles we need first to download that file and then
Expand All @@ -336,7 +320,7 @@ def _download_and_git_annex_add(url, fpath):
# TODO: better function which explicitly checks if file is under annex or either under git
if self.repo.file_has_content(fpath):
stats.add_annex += 1
self.repo.annex_addurl_to_file(fpath, url)
self.repo.annex_addurl_to_file(fpath, url, batch=True)
else:
stats.add_git += 1
_call(_download_and_git_annex_add, url, fpath)
Expand All @@ -349,7 +333,7 @@ def _download_and_git_annex_add(url, fpath):
lgr.debug("Removing %s since it exists before fetching a new copy" % filepath)
_call(unlink, filepath)
_call(stats.increment, 'overwritten')
_call(self.repo.annex_addurl_to_file, fpath, url, options=annex_options)
_call(self.repo.annex_addurl_to_file, fpath, url, options=annex_options, batch=True)
_call(stats.increment, 'add_annex')

if remote_status and lexists(filepath): # and islink(filepath):
Expand Down Expand Up @@ -377,6 +361,33 @@ def _download_and_git_annex_add(url, fpath):
# with subsequent "drop" leaves no record that it ever was here
yield updated_data # There might be more to it!

def _get_fpath(self, data, stats, url=None):
    """Deduce the relative path (fpath) of the target file from `data` or `url`.

    Counts the file in `stats`; bumps ``stats.skipped`` and raises
    ``ValueError`` whenever no usable relative filename can be determined.
    """
    # Prefer an explicit 'filename' carried by the pipeline; a disposition
    # filename, if needed, should have been placed there already. Otherwise
    # fall back to deriving one from the url.
    if 'filename' in data:
        fname = data['filename']
    else:
        fname = self._get_filename_from_url(url)

    stats.files += 1

    # Guard clauses: we must end up with a non-empty, relative filename.
    if fname is None:
        stats.skipped += 1
        raise ValueError("No filename were provided or could be deduced from url=%r" % url)
    if isabs(fname):
        stats.skipped += 1
        raise ValueError("Got absolute filename %r" % fname)

    rpath = fname
    subdir = data.get('path', None)
    if subdir:
        # TODO: test all this handling of provided paths
        if isabs(subdir):
            stats.skipped += 1
            raise ValueError("Absolute path %s was provided" % subdir)
        # Nest the filename under the provided (relative) sub-path.
        rpath = opj(subdir, rpath)

    return rpath

def switch_branch(self, branch, parent=None):
"""Node generator to switch branch, returns actual node
Expand Down Expand Up @@ -433,6 +444,8 @@ def merge_branch(self, branch, strategy=None, commit=True, skip_no_changes=None)
assert(strategy in (None, 'theirs'))

def merge_branch(data):
if self.repo.dirty:
raise RuntimeError("Requested to merge another branch while current state is dirty")
last_merged_checksum = self.repo.git_get_merge_base([self.repo.git_get_active_branch(), branch])
if last_merged_checksum == self.repo.git_get_hexsha(branch):
lgr.debug("Branch %s doesn't provide any new commits for current HEAD" % branch)
Expand All @@ -453,7 +466,7 @@ def merge_branch(data):
self.repo.cmd_call_wrapper.run("git read-tree -m -u %s" % branch)
self.repo.annex_add('.', options=self.options) # so everything is staged to be committed
if commit:
self._commit("Merged %s using strategy %s" % (branch, strategy), options="-a")
self._commit("Merged %s using strategy %s" % (branch, strategy), options=["-a"])
else:
# Record into our activity stats
stats = data.get('datalad_stats', None)
Expand All @@ -462,17 +475,18 @@ def merge_branch(data):
yield data
return merge_branch

def _commit(self, msg=None, options=''):
def _commit(self, msg=None, options=[]):
# We need a custom commit due to "fancy" merges and GitPython
# not supporting that ATM
# https://github.com/gitpython-developers/GitPython/issues/361
if msg is not None:
options += " -m %r" % msg
self.repo.cmd_call_wrapper.run("git commit %s" % options)
options = options + ["-m", msg]
self.repo.precommit() # so that all batched annexes stop
self.repo.cmd_call_wrapper.run(["git", "commit"] + options)


#TODO: @borrow_kwargs from api_add_...
def add_archive_content(self, **aac_kwargs):
def add_archive_content(self, commit=False, **aac_kwargs):
"""
Parameters
Expand All @@ -481,33 +495,37 @@ def add_archive_content(self, **aac_kwargs):
Options to pass into api.add_archive_content
"""
def _add_archive_content(data):
archive = data['path']
# if no stats -- they will be brand new each time :-/
stats = data.get('datalad_stats', ActivityStats())
archive = self._get_fpath(data, stats)
# TODO: may be adjust annex_options
#import pdb; pdb.set_trace()
annex = add_archive_content(
archive, annex=self.repo,
delete=True, key=False, commit=False, allow_dirty=True,
delete=True, key=False, commit=commit, allow_dirty=True,
annex_options=self.options,
stats=data.get('datalad_stats', None),
stats=stats,
**aac_kwargs
)
assert(annex is self.repo) # must be the same annex, and no new created
# TODO: how to propagate statistics from this call into commit msg
# since we commit=False here
# Probably we should carry though 'data' somehow so it gets accumulated
# until commit...?
yield data
# to propagate statistics from this call into commit msg since we commit=False here
# we update data with stats which gets a new instance if wasn't present
yield updated(data, {'datalad_stats': stats})
return _add_archive_content

# TODO: either separate out commit or allow to pass a custom commit msg?
def finalize(self, data):
"""Finalize operations -- commit uncommitted, prune abandoned? etc"""
self.repo.precommit()
if self.repo.dirty: # or self.tracker.dirty # for dry run
lgr.info("Repository found dirty -- adding and committing")
# # TODO: introduce activities tracker
_call(self.repo.annex_add, '.', options=self.options) # so everything is committed
_call(self.repo.annex_add, '.', options=self.options) # so everything is committed
stats = data.get('datalad_stats', None)
stats_str = stats.as_str(mode='line') if stats else ''
_call(self._commit, "Finalizing %s %s" % (','.join(self._states), stats_str), options="-a")
_call(self._commit, "Finalizing %s %s" % (','.join(self._states), stats_str), options=["-a"])
if stats:
_call(stats.reset)
else:
lgr.info("Found branch non-dirty - nothing is committed")
self._states = []
Expand Down

0 comments on commit 1ad42d6

Please sign in to comment.