Merge pull request #348 from yarikoptic/nf-crcns
ENH: remove_obsolete so we could remove files no longer referenced
yarikoptic committed Feb 12, 2016
2 parents 2fd3268 + 5365840 commit 399c9e9
Showing 7 changed files with 170 additions and 20 deletions.
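
In short: each crawl run now marks every file it still cares about by querying the status DB, anything tracked in the repository but never queried during the run is reported by get_obsolete(), and the new annex.remove_obsolete() pipeline node git-removes those paths. A self-contained toy of that contract (all names below are hypothetical stand-ins for the real classes in the hunks that follow; filenames are borrowed from the test):

    seen = set()

    def query(path):
        """Mark a file as still referenced by the current crawl run."""
        seen.add(path)

    def get_obsolete(tracked):
        """Tracked files that were never queried this run are obsolete."""
        return [p for p in tracked if p not in seen]

    tracked = ['README.txt', 'ds666_R1.0.0.tar.gz', 'ds666_R2.0.0.tar.gz']
    query('README.txt')
    query('ds666_R2.0.0.tar.gz')
    print(get_obsolete(tracked))  # -> ['ds666_R1.0.0.tar.gz']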
45 changes: 40 additions & 5 deletions datalad/crawler/dbs/files.py
@@ -11,13 +11,16 @@
"""

import os
from os.path import join as opj, exists, lexists, islink, realpath, basename
from os.path import join as opj, exists, lexists, islink, realpath, basename, normpath
from os.path import isabs

from ...dochelpers import exc_str
from ...support.status import FileStatus
from ...support.exceptions import CommandError
from ...utils import auto_repr
from ...utils import swallow_logs
from ...utils import find_files
from ...consts import HANDLE_META_DIR

import logging
lgr = logging.getLogger('datalad.crawler.dbs')
@@ -51,6 +54,12 @@ def track_queried(self):
def queried_filepaths(self):
return self._queried_filepaths

def _get_fullpath(self, fpath):
if isabs(fpath):
return normpath(fpath)
else:
return normpath(opj(self.annex.path, fpath))

# TODO: think if default should be provided
def get(self, fpath):
"""Given a file (under annex) relative path, return its status record
@@ -62,11 +71,12 @@
fpath: str
Path (relative to the top of the repo) of the file to get stats of
"""
filepath = opj(self.annex.path, fpath)
filepath = self._get_fullpath(fpath)
if self._track_queried:
self._queried_filepaths.add(filepath)

assert(lexists(filepath)) # of check and return None?
if not lexists(filepath):
return None

# I wish I could just test using filesystem stats but that would not
# be reliable, and also file might not even be here.
@@ -96,8 +106,11 @@ def get(self, fpath):
mtime=mtime
)

def set(self, fpath, status):
# This DB doesn't implement it
def set(self, fpath, status=None):
# This DB doesn't implement much of it, besides marking internally that we do care about this file
filepath = self._get_fullpath(fpath)
if self._track_queried:
self._queried_filepaths.add(filepath)
pass

def is_different(self, fpath, status, url=None):
@@ -109,3 +122,25 @@
if status.filename and not old_status.filename:
old_status.filename = basename(fpath)
return old_status != status

def get_obsolete(self):
"""Returns paths which weren't queried, thus must have been deleted
Note that it doesn't track across branches etc.
"""
if not self._track_queried:
raise RuntimeError("Cannot determine which files were removed since track_queried was set to False")
obsolete = []
# those aren't tracked by annexificator
datalad_path = opj(self.annex.path, HANDLE_META_DIR)
for fpath in find_files('.*', topdir=self.annex.path):
filepath = self._get_fullpath(fpath)
if filepath.startswith(datalad_path):
continue
if fpath not in self._queried_filepaths:
obsolete.append(filepath)
return obsolete

def reset(self):
"""Reset internal state, e.g. about known queried filedpaths"""
self._queried_filepaths = set()
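
The bookkeeping above, condensed into a standalone sketch (plain Python, no annex; QueriedPathsDB is a hypothetical stand-in for AnnexFileAttributesDB and, unlike the real class, does not special-case the .datalad meta directory):

    import os
    from os.path import isabs, join as opj, normpath

    class QueriedPathsDB(object):
        """Toy stand-in: remember queried paths, report the rest as obsolete."""
        def __init__(self, topdir):
            self.topdir = topdir
            self._queried_filepaths = set()

        def _get_fullpath(self, fpath):
            # same normalization as above: relative paths are anchored at topdir
            return normpath(fpath if isabs(fpath) else opj(self.topdir, fpath))

        def get(self, fpath):
            # querying a file marks it as still referenced
            self._queried_filepaths.add(self._get_fullpath(fpath))

        def get_obsolete(self):
            # everything on disk that was never queried is obsolete
            obsolete = []
            for root, _, files in os.walk(self.topdir):
                for name in files:
                    path = normpath(opj(root, name))
                    if path not in self._queried_filepaths:
                        obsolete.append(path)
            return obsolete

        def reset(self):
            self._queried_filepaths = set()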
29 changes: 28 additions & 1 deletion datalad/crawler/dbs/tests/test_files.py
@@ -8,7 +8,7 @@
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##

import os
from os.path import join as opj
from os.path import join as opj, curdir, sep
from ..files import AnnexFileAttributesDB

from ....tests.utils import with_tree
@@ -70,3 +70,30 @@ def test_AnnexFileAttributesDB(path):
#with chpwd(opj(path, 'd')):
# status2_dir = db.get('./file2.txt')
# assert_equal(status2, status2_dir)

# since we asked about each file we added to DB/annex -- none should be
# known as "deleted"
assert_equal(db.get_obsolete(), [])

# but if we create another db which wasn't queried yet
db2 = AnnexFileAttributesDB(annex=annex)
# all files should be returned
assert_equal(
set(db2.get_obsolete()),
{opj(path, p) for p in ['file1.txt', filep2, '2git']})
# and if we query one, it shouldn't be listed as deleted any more
status2_ = db2.get(filep2)
assert_equal(status2, status2_)
assert_equal(
set(db2.get_obsolete()),
{opj(path, p) for p in ['file1.txt', '2git']})

# and if we queried with ./ prefix, should work still
db2.get(curdir + sep + 'file1.txt')
assert_equal(
set(db2.get_obsolete()),
{opj(path, p) for p in ['2git']})

# and if we queried with full path, should work still
db2.get(opj(path, '2git'))
assert_equal(db2.get_obsolete(), [])
59 changes: 48 additions & 11 deletions datalad/crawler/nodes/annex.py
@@ -6,7 +6,9 @@
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Nodes to interact with annex -- add files etc
"""Nodes to interact with annex -- initiate a new handle or operate with existing one
via Annexificator class, which could be used to add files, checkout branches etc
"""

import os
@@ -271,6 +273,9 @@ def register_url_in_db(self, url, filename):
# might need to go outside -- since has nothing to do with self
raise NotImplementedError()

def reset(self):
self.statusdb.reset()

@staticmethod
def _get_filename_from_url(url):
if url is None:
@@ -333,6 +338,10 @@ def __call__(self, data): # filename=None, get_disposition_filename=False):
if self.yield_non_updated:
yield updated_data # There might be more to it!
return
else:
# just to mark the file as still of interest to us so it doesn't get wiped out later,
# as would happen if we had intentionally stopped creating/tracking it
self.statusdb.get(fpath)

if not url:
lgr.debug("Adding %s to annex without url being provided" % (filepath))
@@ -396,14 +405,20 @@ def __call__(self, data): # filename=None, get_disposition_filename=False):

# So we have downloaded the beast
# Since annex doesn't care to set mtime for the symlink itself we better set it ourselves
if remote_status and lexists(filepath): # and islink(filepath):
# Set mtime of the symlink or git-added file itself
# utime dereferences!
# _call(os.utime, filepath, (time.time(), remote_status.mtime))
# *nix only! TODO
_call(lmtime, filepath, remote_status.mtime)
if lexists(filepath): # and islink(filepath):
if remote_status:
# Set mtime of the symlink or git-added file itself
# utime dereferences!
# _call(os.utime, filepath, (time.time(), remote_status.mtime))
# *nix only! TODO
_call(lmtime, filepath, remote_status.mtime)
_call(self.statusdb.set, filepath, remote_status)
else:
# we still need to inform db about this file so later it would signal to remove it
# if we no longer care about it
_call(self.statusdb.set, filepath)

self._states.add("Added files to git/annex")
self._states.add("Updated git/annex from a remote location")

# WiP: commented out to do testing before merge
# db_filename = self.db.get_filename(url)
@@ -851,7 +866,6 @@ def _finalize(data):
stats = data.get('datalad_stats', None)
if self.repo.dirty: # or self.tracker.dirty # for dry run
lgr.info("Repository found dirty -- adding and committing")
# # TODO: introduce activities tracker
_call(self.repo.annex_add, '.', options=self.options) # so everything is committed

stats_str = ('\n\n' + stats.as_str(mode='full')) if stats else ''
@@ -874,8 +888,31 @@
stats_str = "\n\n" + total_stats.as_str(mode='full')
if tag_ in self.repo.repo.tags:
# TODO: config.tag.allow_override
raise RuntimeError("There is already tag %s in the repository" % tag)
self.repo.repo.create_tag(tag_, message="Automatically crawled and tagged by datalad %s.%s" % (__version__, stats_str))
lgr.warning("There is already tag %s in the repository. Delete it first if you want it updated" % tag_)
else:
self.repo.repo.create_tag(tag_, message="Automatically crawled and tagged by datalad %s.%s" % (__version__, stats_str))
self._states = set()
yield data
return _finalize

def remove_obsolete(self):
"""Remove obsolete files which were not referenced in queries to db
Note that it doesn't reset any state within statusdb upon call, so shouldn't be
called multiple times for the same state.
"""
# made as a class so could be reset
class _remove_obsolete(object):
def __call__(self_, data):
obsolete = self.statusdb.get_obsolete()
if obsolete:
lgr.info('Removing %d obsolete files' % len(obsolete))
stats = data.get('datalad_stats', None)
_call(self.repo.git_remove, obsolete)
if stats:
_call(stats.increment, 'removed', len(obsolete))
yield data
def reset(self_):
self.statusdb.reset()

return _remove_obsolete()
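
A note on the pattern above: __call__ deliberately names its first parameter self_ so that the bare self inside the body keeps referring to the enclosing Annexificator (and thus its statusdb and repo) rather than to the inner _remove_obsolete instance. A minimal standalone model of this resettable-node idiom (all names hypothetical):

    class Outer(object):
        def __init__(self):
            self.seen = {'a', 'b'}

        def make_node(self):
            # the inner class closes over the *outer* self
            class _Node(object):
                def __call__(node, data):      # 'node' plays the role of self_
                    print(sorted(self.seen))   # 'self' is still the Outer instance
                    yield data
                def reset(node):
                    self.seen = set()
            return _Node()

    node = Outer().make_node()
    for out in node({'x': 1}):  # crawler nodes are generators over a data dict
        pass                    # prints ['a', 'b']
    node.reset()                # clears state on the enclosing Outer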
1 change: 1 addition & 0 deletions datalad/crawler/pipelines/openfmri.py
@@ -83,6 +83,7 @@ def pipeline(dataset, versioned_urls=True, topurl="https://openfmri.org/dataset/
# Now some true magic -- possibly multiple commits, 1 per each detected new version!
annex.commit_versions('_R(?P<version>\d+[\.\d]*)(?=[\._])'),
],
annex.remove_obsolete(), # should be called while still within incoming but only once
# TODO: since it is a very common pattern -- consider absorbing into e.g. add_archive_content?
[ # nested pipeline so we could skip it entirely if nothing new to be merged
{'loop': True}, # loop for multiple versions merges
42 changes: 41 additions & 1 deletion datalad/crawler/pipelines/tests/test_openfmri.py
@@ -25,6 +25,7 @@
from ....api import clean
from ....utils import chpwd
from ....utils import find_files
from ....utils import swallow_logs
from ....tests.utils import with_tree
from ....tests.utils import SkipTest
from ....tests.utils import eq_, assert_not_equal, ok_, assert_raises
@@ -68,6 +69,13 @@ def add_to_index(index_file, content):
f.write(old_index.replace(_PLUG_HERE, content + _PLUG_HERE))


def remove_from_index(index_file, regexp):
with open(index_file) as f:
old_index = f.read()
with open(index_file, 'w') as f:
f.write(re.sub(regexp, '', old_index))


@skip_if_no_network
@use_cassette('fixtures/vcr_cassettes/openfmri.yaml')
def __test_basic_openfmri_top_pipeline():
@@ -251,6 +259,11 @@ def hexsha(l):
'./.datalad/config.ttl', './.datalad/crawl/crawl.cfg',
'./.datalad/crawl/versions/incoming.json', './.datalad/datalad.ttl',
'./README.txt', './changelog.txt', './sub-1/anat/sub-1_T1w.dat', './sub-1/beh/responses.tsv'}
target_incoming_files = {
'README.txt', 'changelog.txt',
'ds666-beh_R1.0.1.tar.gz', 'ds666_R1.0.0.tar.gz', 'ds666_R1.0.1.tar.gz', 'ds666_R2.0.0.tar.gz',
'.datalad/crawl/versions/incoming.json'
}
eq_(set(all_files), target_files)

# check that -beh was committed in 2nd commit in incoming, not the first one
@@ -272,7 +285,8 @@

# rerun pipeline when new content is available
# add new revision, rerun pipeline and check that stuff was processed/added correctly
add_to_index(opj(ind, 'ds666', 'index.html'),
index_html = opj(ind, 'ds666', 'index.html')
add_to_index(index_html,
content = '<a href="ds666_R2.0.0.tar.gz">Raw data on AWS version 2.0.0</a>')

with chpwd(outd):
@@ -302,6 +316,32 @@ def hexsha(l):
merges=[['incoming', 'incoming-processed']]))

check_dropall_get(repo)

# Let's see if pipeline would remove files we stopped tracking
remove_from_index(index_html, '<a href=.ds666_R1.0.0[^<]*</a>')
with chpwd(outd):
with swallow_logs() as cml:
out = run_pipeline(pipeline)
# since files get removed in incoming, but reprocessed completely into
# incoming-processed and merged into master -- new commits will come
# They shouldn't have any difference but still should be new commits
assert_in("There is already tag 2.0.0 in the repository", cml.out)
eq_(len(out), 1)
incoming_files = repo.git_get_files('incoming')
target_incoming_files.remove('ds666_R1.0.0.tar.gz')
eq_(set(incoming_files), target_incoming_files)
commits_hexsha_removed = {b: list(repo.git_get_branch_commits(b, value='hexsha')) for b in branches}
# so no new changes to master/incoming-processed since older revision was removed
for b in 'master', 'incoming-processed':
eq_(repo.repo.branches[b].commit.diff(commits_hexsha_[b][0]), [])
dincoming = repo.repo.branches['incoming'].commit.diff(commits_hexsha_['incoming'][0])
eq_(len(dincoming), 1) # 1 diff object -- 1 file removed
# since it seems to diff "from current to the specified", it will be listed as new_file
assert dincoming[0].new_file

eq_(out[0]['datalad_stats'].get_total().removed, 1)
assert_not_equal(commits_hexsha_, commits_hexsha_removed)

test_openfmri_pipeline1.tags = ['integration']


10 changes: 9 additions & 1 deletion datalad/interface/crawl.py
@@ -89,4 +89,12 @@ def __call__(self, path=None, dry_run=False, is_pipeline=False, chdir=None):
pipeline = load_pipeline_from_config(path)

lgr.info("Running pipeline %s" % str(pipeline))
run_pipeline(pipeline)
# TODO: capture the state of all branches so in case of crash
# we could gracefully reset back
try:
run_pipeline(pipeline)
except Exception as exc:
# TODO: config.crawl.failure = full-reset | last-good-master
# probably ask via ui which action should be performed unless
# explicitly specified
raise
4 changes: 3 additions & 1 deletion datalad/support/stats.py
@@ -19,7 +19,7 @@
_COUNTS = (
'files', 'urls',
'add_git', 'add_annex',
'skipped', 'overwritten', 'renamed',
'skipped', 'overwritten', 'renamed', 'removed',
'downloaded', 'downloaded_size', 'downloaded_time'
)
_LISTS = (
@@ -139,6 +139,7 @@ def as_str(self, mode='full'):
Files processed: {files}
skipped: {skipped}
renamed: {renamed}
removed: {removed}
added to git: {add_git}
added to annex: {add_annex}
overwritten: {overwritten}
@@ -160,6 +161,7 @@
("Files processed", "files"),
(" skipped", "skipped"),
(" renamed", "renamed"),
(" removed", "removed"),
(" overwritten", "overwritten"),
(" +git", "add_git"),
(" +annex", "add_annex"),
