Merge pull request #341 from yarikoptic/enh-batched-annex
ENH: use addurl in any annex mode so we can quickly tell whether a file was added to git or to annex
yarikoptic committed Feb 1, 2016
2 parents dc57d31 + c52c55f commit a5a4dd8
Showing 9 changed files with 125 additions and 58 deletions.
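The core of the change: rather than downloading content ourselves in every mode, git-annex's addurl is used throughout, and its --json output reveals whether a file landed in the annex or directly in git. A minimal standalone sketch of that mechanism (hypothetical helper, not DataLad API; assumes git-annex emits one JSON record per file and includes a "key" field only for annexed content):

    import json
    import subprocess

    def addurl_and_classify(url, path):
        # Run addurl with --json inside an existing git-annex repository
        out = subprocess.check_output(
            ['git', 'annex', 'addurl', '--json', '--file', path, url])
        # One JSON record per line; take the record for our file
        record = json.loads(out.decode().splitlines()[-1])
        # 'key' is present only when the content went to the annex
        return 'add_annex' if 'key' in record else 'add_git'
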
84 changes: 54 additions & 30 deletions datalad/crawler/nodes/annex.py
@@ -15,6 +15,8 @@
from os.path import split as ops
from os import unlink, makedirs

from humanize import naturalsize

from ...api import add_archive_content
from ...consts import CRAWLER_META_DIR, CRAWLER_META_CONFIG_FILENAME
from ...utils import rmtree, updated
@@ -271,6 +273,8 @@ def __call__(self, data): # filename=None, get_disposition_filename=False):
if url:
downloader = self._providers.get_provider(url).get_downloader(url)

# request status since we would need it in either mode
remote_status = downloader.get_status(url)
if lexists(filepath):
# Check if URL provides us updated content. If not -- we should do nothing
# APP1: in this one it would depend on local_status being asked first BUT
@@ -282,7 +286,7 @@ def __call__(self, data): # filename=None, get_disposition_filename=False):
# APP2: no explicit local_status
# if self.mode != 'full' and fpath == '1-copy.dat':
# import pdb; pdb.set_trace()
remote_status = downloader.get_status(url)

# TODO: what if the file came from another url bearing the same mtime and size????
# unlikely but possible. We would need to provide URL for comparison(s)
if self.mode == 'relaxed' or not self.statusdb.is_different(fpath, remote_status, url):
@@ -295,47 +299,67 @@ def __call__(self, data): # filename=None, get_disposition_filename=False):
if self.yield_non_updated:
yield updated_data # There might be more to it!
return
elif self.mode != 'full':
# we would need remote_status to set mtime of the symlink
remote_status = downloader.get_status(url)

if not url:
lgr.debug("Adding %s directly into git since no url was provided" % (filepath))
lgr.debug("Adding %s to annex without url being provided" % (filepath))
# So we have only filename
assert(fpath)
# Just add into annex without addurl
_call(self.repo.annex_add, fpath)
_call(stats.increment, 'add_git')
elif self.mode == 'full':
# Since addurl ignores annex.largefiles we need first to download that file and then
# annex add it
# see http://git-annex.branchable.com/todo/make_addurl_respect_annex.largefiles_option
lgr.debug("Downloading %s into %s and adding to annex" % (url, filepath))
def _download_and_git_annex_add(url, fpath):
# Just to feed into _call for dry-run
filepath_downloaded = downloader.download(url, filepath, overwrite=True, stats=stats)
assert(filepath_downloaded == filepath)
self.repo.annex_add(fpath, options=self.options)
# and if the file ended up under annex, and not directly under git -- addurl
# TODO: better function which explicitly checks if file is under annex or directly under git
if self.repo.file_has_content(fpath):
stats.add_annex += 1
self.repo.annex_addurl_to_file(fpath, url, batch=True)
else:
stats.add_git += 1
_call(_download_and_git_annex_add, url, fpath)
# Just run annex_add for now; its json output tells whether the file went to git or annex
# TODO: tune annex_add so we could make better use of its json output, and maybe even batch it
out_json = _call(self.repo.annex_add, fpath, options=self.options)
_call(stats.increment, 'add_annex' if 'key' in out_json else 'add_git')
# elif self.mode == 'full':
# # Since addurl ignores annex.largefiles we need first to download that file and then
# # annex add it
# # see http://git-annex.branchable.com/todo/make_addurl_respect_annex.largefiles_option
# lgr.debug("Downloading %s into %s and adding to annex" % (url, filepath))
# def _download_and_git_annex_add(url, fpath):
# # Just to feed into _call for dry-run
# filepath_downloaded = downloader.download(url, filepath, overwrite=True, stats=stats)
# assert(filepath_downloaded == filepath)
# self.repo.annex_add(fpath, options=self.options)
# # and if the file ended up under annex, and not directly under git -- addurl
# # TODO: better function which explicitly checks if file is under annex or directly under git
# if self.repo.file_has_content(fpath):
# stats.add_annex += 1
# self.repo.annex_addurl_to_file(fpath, url, batch=True)
# else:
# stats.add_git += 1
# _call(_download_and_git_annex_add, url, fpath)
else:
# !!!! If file shouldn't get under annex due to largefile setting -- we must download it!!!
# TODO: http://git-annex.branchable.com/todo/make_addurl_respect_annex.largefiles_option/#comment-b43ef555564cc78c6dee2092f7eb9bac
annex_options = self.options + ["--%s" % self.mode]
lgr.debug("Pointing %s to %s within annex in %s mode" % (url, filepath, self.mode))
# we should make use of the matchexpression command, but that might reincarnate
# the above code, so it is left commented out for now
annex_options = self.options
if self.mode == 'full':
lgr.debug("Downloading %s into %s and adding to annex" % (url, filepath))
else:
annex_options = annex_options + ["--%s" % self.mode]
lgr.debug("Pointing %s to %s within annex in %s mode" % (url, filepath, self.mode))

if lexists(filepath):
lgr.debug("Removing %s since it exists before fetching a new copy" % filepath)
_call(unlink, filepath)
_call(stats.increment, 'overwritten')
_call(self.repo.annex_addurl_to_file, fpath, url, options=annex_options, batch=True)
_call(stats.increment, 'add_annex')

# TODO: We need to implement our special remote here since no downloaders used
if self.mode == 'full' and remote_status and remote_status.size: # > 1024**2:
lgr.info("Need to download %s from %s. No progress indication will be reported"
% (naturalsize(remote_status.size), url))
out_json = _call(self.repo.annex_addurl_to_file, fpath, url, options=annex_options, batch=True)
added_to_annex = 'key' in out_json

if self.mode == 'full' or not added_to_annex:
# we need to adjust our download stats since addurl doesn't do that and we do not use our downloaders here
_call(stats.increment, 'downloaded')
_call(stats.increment, 'downloaded_size', _call(lambda: os.stat(filepath).st_size))

if out_json: # if not dry-run -- it should be here!
_call(stats.increment, 'add_annex' if added_to_annex else 'add_git')

# So we have downloaded the beast
# Since annex doesn't care to set mtime for the symlink itself we better set it ourselves
if remote_status and lexists(filepath): # and islink(filepath):
# Set mtime of the symlink or git-added file itself
# utime dereferences!
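A note on the stats handling above: when addurl itself fetches the content (full mode, or when the file ends up in git), DataLad's downloaders are bypassed, so download statistics are patched up manually. A hedged sketch of that bookkeeping (hypothetical helper; uses the extended increment() from stats.py further down):

    import os

    def record_manual_download(stats, filepath):
        # addurl downloaded the file itself, so our downloaders never saw it
        stats.increment('downloaded')
        stats.increment('downloaded_size', os.stat(filepath).st_size)
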
40 changes: 26 additions & 14 deletions datalad/crawler/nodes/tests/test_annex.py
@@ -51,24 +51,26 @@ def _test_annex_file(mode, topdir, topurl, outdir):
expected_output = [input.copy()] # nothing to be added/changed
output = list(annex(input))
assert_equal(output, expected_output)

# addurl is batched, and we haven't forced annex flushing so there should
# be a batched process
assert_equal(len(annex.repo._batched), 1)
assert_raises(AssertionError, ok_file_under_git, tfile, annexed=True)
# if we finalize, it should flush batched annexes and commit
list(annex.finalize({}))
assert(lexists(tfile))

ok_file_under_git(tfile, annexed=True)
if mode == 'full':
ok_file_under_git(tfile, annexed=True)
ok_file_has_content(tfile, '1.dat load')
else:
# addurl is batched, and we haven't forced annex flushing so there should
# be a batched process
assert_equal(len(annex.repo._batched), 1)
assert_raises(AssertionError, ok_file_under_git, tfile, annexed=True)
# if we finalize, it should flush batched annexes and commit
list(annex.finalize({}))
# in fast or relaxed mode there must not be any content
assert_raises(AssertionError, ok_file_has_content, tfile, '1.dat load')
assert(lexists(tfile))

whereis = annex.repo.annex_whereis(tfile)
assert_in(annex.repo.WEB_UUID, whereis) # url must have been added
assert_equal(len(whereis), 1 + int(mode=='full'))
assert_equal(len(whereis), 1 + int(mode == 'full'))
# TODO: check the url

# Neither file should be downloaded again, since nothing changed
# and by default we do use files db
output = list(annex(input))
@@ -94,16 +96,16 @@ def _test_annex_file(mode, topdir, topurl, outdir):
assert_equal(stats, ActivityStats(files=2, urls=2, **kwargs))

# Download into a file which will be added to git
# TODO: for now added to git only in full mode; in --fast or --relaxed it still goes to annex
# http://git-annex.branchable.com/bugs/treatment_of_largefiles_is_not_working_for_addurl_--fast___40__or_--relaxed__41__/
input = {'url': "%sd1/1.dat" % topurl, 'filename': '1.txt', 'datalad_stats': ActivityStats()}
tfile = opj(outdir, '1.txt')
output = list(annex(input))
annexed = mode not in {'full'}
list(annex.finalize({}))
if not annexed:
ok_file_has_content(tfile, '1.dat load+')
else:
# TODO: unfortunately we can't decide to add .txt without providing
# our own parser for annex.largefiles ATM
list(annex.finalize({}))
assert_raises(AssertionError, ok_file_has_content, tfile, '1.dat load+')
ok_file_under_git(tfile, annexed=annexed)
assert_equal(len(output), 1)
Expand All @@ -112,6 +114,16 @@ def _test_annex_file(mode, topdir, topurl, outdir):
stats.downloaded_time = 0
assert_equal(stats, ActivityStats(files=1, urls=1, add_git=1-int(annexed), add_annex=int(annexed), **download_stats))

# Let's add a file without specifying URL
sfilepath = opj(outdir, 'sample.txt')
with open(sfilepath, 'w') as f:
f.write("sample")
ok_file_has_content(sfilepath, "sample")
output = list(annex({'filename': 'sample.txt', 'datalad_stats': ActivityStats()}))
ok_file_under_git(sfilepath, annexed=False)
assert(output)
assert_equal(output[0]['datalad_stats'], ActivityStats(files=1, add_git=1))


def test_annex_file():
for mode in ('full', 'fast', 'relaxed',):
@@ -127,7 +139,7 @@ def _test_add_archive_content_tar(direct, repo_path):
allow_dirty=True,
mode=mode,
direct=direct,
options=["-c", "annex.largefiles=exclude=*.txt"])
options=["-c", "annex.largefiles=exclude=*.txt and exclude=SOMEOTHER"])
output_add = list(annex({'filename': '1.tar'})) # adding it to annex
assert_equal(output_add, [{'filename': '1.tar'}])

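The test relies on git-annex's annex.largefiles matchexpression to route small text files into git instead of the annex. A hedged configuration example (the path keyword is an assumption here; the other kwargs mirror the test above):

    annex = Annexificator(path=repo_path,  # hypothetical kwarg
                          allow_dirty=True, mode='full',
                          options=["-c", "annex.largefiles=exclude=*.txt"])
    # With that option, 'notes.txt' would be committed straight to git,
    # while e.g. 'data.bin' would be annexed.
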
11 changes: 9 additions & 2 deletions datalad/crawler/pipelines/openfmri.py
@@ -8,6 +8,9 @@
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""A pipeline for crawling openfmri dataset"""

import os
from os.path import lexists

# Import necessary nodes
from ..nodes.crawl_url import crawl_url
from ..nodes.matches import css_match, a_href_match
@@ -26,6 +29,8 @@

def extract_readme(data):
# TODO - extract data from the page/response
if lexists("README.txt"):
os.unlink("README.txt")
with open("README.txt", "w") as f:
f.write("OpenfMRI dataset from %(url)s" % data)
lgr.info("Generated README.txt")
@@ -38,7 +43,7 @@ def pipeline(dataset, versioned_urls=True):
lgr.info("Creating a pipeline for the openfmri dataset %s" % dataset)
annex = Annexificator(
create=False, # must be already initialized etc
options=["-c", "annex.largefiles=exclude=*.txt and exclude=README"])
options=["-c", "annex.largefiles=exclude=*.txt and exclude=*.json and exclude=README* and exclude=*.[mc]"])

return [
annex.switch_branch('incoming'),
@@ -111,7 +116,9 @@ def pipeline(dataset, versioned_urls=True):
[ # nested pipeline so we could skip it entirely if nothing new to be merged
annex.merge_branch('incoming', strategy='theirs', commit=False),
[ # Pipeline to augment content of the incoming and commit it to master
find_files("\.(tgz|tar(\..+)?)$", fail_if_none=True), # So we fail if none found -- there must be some! ;)),
# There might be archives within archives, so we need to loop
{'loop': True},
find_files("\.(zip|tgz|tar(\..+)?)$", fail_if_none=True), # we fail if none found -- there must be some! ;)),
annex.add_archive_content(
#rename=[
# r"|^[^/]*/(.*)|\1" # e.g. to strip leading dir, or could prepend etc
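The {'loop': True} element added above asks the crawler to re-run the nested sub-pipeline until it yields no new output, so archives extracted out of other archives get processed as well. Schematically (a hedged sketch reusing the nodes from this pipeline; the variable name is illustrative):

    subpipeline = [
        {'loop': True},  # repeat until no new files match
        find_files("\.(zip|tgz|tar(\..+)?)$", fail_if_none=True),
        annex.add_archive_content(),
    ]
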
8 changes: 4 additions & 4 deletions datalad/support/annexrepo.py
@@ -364,8 +364,7 @@ def annex_add(self, files, backend=None, options=None):
"""
options = options[:] if options else []

self._run_annex_command('add', annex_options=options + files,
backend=backend)
return list(self._run_annex_command_json('add', args=options + files, backend=backend))

def annex_proxy(self, git_cmd, **kwargs):
"""Use git-annex as a proxy to git
@@ -741,14 +740,15 @@ def _whereis_json_to_dict(self, j):
return remotes


def _run_annex_command_json(self, command, args=[]):
def _run_annex_command_json(self, command, args=[], **kwargs):
"""Run an annex command with --json and load output results into a tuple of dicts
"""
try:
# TODO: refactor to account for possible --batch ones
out, err = self._run_annex_command(
command,
annex_options=['--json'] + args)
annex_options=['--json'] + args,
**kwargs)
except CommandError as e:
# if multiple files, whereis may technically fail,
# but still returns correct response
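With this change annex_add surfaces git-annex's --json records instead of discarding them. A hedged usage sketch (repo is an AnnexRepo instance; assumes a single-file add yields one record):

    out_json = repo.annex_add('1.dat')
    # be defensive about list-vs-dict shape of the return value
    rec = out_json[0] if isinstance(out_json, list) else out_json
    if 'key' in rec:
        print('annexed with key %s' % rec['key'])
    else:
        print('added directly to git')
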
4 changes: 2 additions & 2 deletions datalad/support/stats.py
@@ -96,9 +96,9 @@ def _get_updated_total(self):
out[k] = out[k] + v
return out

def increment(self, k):
def increment(self, k, v=1):
"""Helper for incrementing counters"""
self._current[k] += 1
self._current[k] += v

def _reset_values(self, d, vals={}):
for c in _COUNTS:
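The new optional argument lets a counter grow by an arbitrary amount rather than always one, which the crawler uses to accumulate downloaded bytes. For example:

    stats = ActivityStats()
    stats.increment('downloaded')             # +1, as before
    stats.increment('downloaded_size', 2048)  # +2048
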
24 changes: 20 additions & 4 deletions datalad/support/status.py
@@ -33,13 +33,29 @@ def __eq__(self, other):
if other.size is None and other.mtime is None and other.filename is None:
return NotImplemented

same = \
self.size == other.size and \
self.filename == other.filename
if not same:
return False

# now deal with time.

# TODO: provide a config option for mtime comparison precision
# we might want to claim times equal up to a second precision
# since e.g. some file systems do not even store sub-sec timing
return \
self.size == other.size and \
self.mtime == other.mtime and \
self.filename == other.filename
# TODO: config crawl.mtime_delta

# if one of them is an int and the other a float -- trim the float to an int
if self.mtime == other.mtime:
return True
elif self.mtime is None or other.mtime is None:
return False

# neither is None here, and they are not exactly equal
if isinstance(self.mtime, int) or isinstance(other.mtime, int):
return int(self.mtime) == int(other.mtime)
return False

def __ne__(self, other):
out = self == other
6 changes: 6 additions & 0 deletions datalad/support/tests/test_status.py
@@ -14,6 +14,12 @@
def test_FileStatus_basic():
assert_equal(FileStatus(size=0), FileStatus(size=0))
assert_not_equal(FileStatus(size=0), FileStatus(size=1))
# mtimes are compared at int precision if either is an int
assert_equal(FileStatus(mtime=0), FileStatus(mtime=0.9999))
assert_equal(FileStatus(mtime=0), FileStatus(mtime=0.0001))
assert_not_equal(FileStatus(mtime=0.2), FileStatus(mtime=0.1))
assert_not_equal(FileStatus(mtime=0.2), FileStatus(mtime=None))
assert_not_equal(FileStatus(mtime=1), FileStatus(mtime=None))
# adding more information would result in not-equal
assert_not_equal(FileStatus(size=0), FileStatus(size=0, mtime=123))
# empty ones can't be compared
4 changes: 3 additions & 1 deletion datalad/tests/test_annexrepo.py
@@ -151,15 +151,17 @@ def test_AnnexRepo_annex_add(src, annex_path):
f = open(filename_abs, 'w')
f.write("What to write?")
f.close()
ar.annex_add(filename)
out_json = ar.annex_add(filename)
if not ar.is_direct_mode():
assert_true(os.path.islink(filename_abs),
"Annexed file is not a link.")
else:
assert_false(os.path.islink(filename_abs),
"Annexed file is link in direct mode.")
assert_in('key', out_json)
key = ar.get_file_key(filename)
assert_false(key == '')
assert_equal(key, out_json['key'])
# could test for the actual key, but if there's something
# and no exception raised, it's fine anyway.

2 changes: 1 addition & 1 deletion datalad/utils.py
@@ -273,7 +273,7 @@ def lmtime(filepath, mtime):
os.utime(filepath, (time.time(), mtime))
else:
def lmtime(filepath, mtime):
"""Set mtime for files, while de-referencing symlinks.
"""Set mtime for files, while not de-referencing symlinks.
To overcome absence of os.lutime
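For reference, on Python 3 the fallback branch could rely on os.utime's follow_symlinks flag; a hedged sketch (hypothetical name, requires platform support per os.supports_follow_symlinks):

    import os
    import time

    def lmtime_py3(filepath, mtime):
        """Set atime/mtime on a file or symlink without following the link."""
        os.utime(filepath, (time.time(), mtime), follow_symlinks=False)
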
