Merge pull request #325 from yarikoptic/enh-annexing-openfmri
Various fixes and enhancements which came about while crawling the first openfmri datasets
yarikoptic committed Dec 21, 2015
2 parents 0bfc48f + cf5ee9f commit 2bde97e
Showing 11 changed files with 147 additions and 50 deletions.
13 changes: 12 additions & 1 deletion datalad/crawler/nodes/annex.py
@@ -40,7 +40,7 @@ class initiate_handle(object):
"""Action to initiate a handle following one of the known templates
"""
def __init__(self, template, handle_name=None, collection_name=None,
path=None,
path=None, branch=None,
data_fields=[], add_fields={}, existing='raise'):
"""
Parameters
@@ -54,6 +54,8 @@ def __init__(self, template, handle_name=None, collection_name=None,
path : str, optional
Path where to initiate the handle. If not specified, would use
default path for all new handles (DATALAD_CRAWL_COLLECTIONSPATH)
branch : str, optional
Which branch to initialize
data_fields : list or tuple of str, optional
Additional fields from data to store into configuration for
the handle crawling options -- would be passed into the corresponding
@@ -73,8 +75,17 @@ def __init__(self, template, handle_name=None, collection_name=None,
self.add_fields = add_fields
self.existing = existing
self.path = path
self.branch = branch

def _initiate_handle(self, path, name):
if self.branch is not None:
# Because all the 'create' magic is stuffed into the constructor ATM
# we first need to initiate a git repository
git_repo = GitRepo(path, create=True)
# since we are initializing, that branch shouldn't exist yet, thus --orphan
git_repo.git_checkout(self.branch, options="--orphan")
# TODO: RF whenever create becomes a dedicated factory/method
# and/or branch becomes an option for the "creator"
return HandleRepo(
path,
direct=cfg.getboolean('crawl', 'init direct', default=False),
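For orientation, the new branch handling boils down to creating an empty repository and immediately checking out an orphan branch in it. A minimal sketch of that sequence using plain git via subprocess (the path and branch name are examples, not values taken from this changeset):

import subprocess

def init_repo_with_orphan_branch(path, branch):
    # create an empty git repository at `path`
    subprocess.check_call(["git", "init", path])
    # --orphan starts a branch with no parent commit; since the repository
    # was just initialized, the branch cannot exist yet
    subprocess.check_call(["git", "checkout", "--orphan", branch], cwd=path)

init_repo_with_orphan_branch("/tmp/example-handle", "incoming")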
15 changes: 9 additions & 6 deletions datalad/crawler/pipelines/openfmri.py
@@ -42,10 +42,6 @@ def pipeline(dataset):
return [
annex.switch_branch('incoming'),
crawl_url(dataset_url),
# TODO: needs fixing of the openfmri bucket
# email sent out
#func_to_node(get_versioned_url, kwargs={'guarantee_versioned': True,
# 'verify': True})
[ # changelog
a_href_match(".*release_history.txt"), # , limit=1
assign({'filename': 'changelog.txt'}),
@@ -67,15 +63,22 @@ def pipeline(dataset):
# xpath('//h4[contains(text(), "Data:")]')
# so let's just select all the ones going to /tarballs/
a_href_match('.*/tarballs/.*\.(tgz|tar.*|zip)', min_count=1),
# TODO: needs fixing of the openfmri bucket
# email sent out
func_to_node(get_versioned_url,
data_args=['url'],
outputs=['url'],
kwargs={'guarantee_versioned': True,
'verify': True}),

# TODO: we need to "version" those urls which we can version, e.g.,
# if coming from versioned S3 buckets
# version_url,
# TODO TEMP -- too heavy, use some bogie for now
#assign({'url': 'http://www.onerussian.com/tmp/ds005_raw_boogie.tgz'}),
#assign({'url': 'http://www.onerussian.com/tmp/ds005_raw_boogie_2.tgz'}),
assign({'url': 'http://www.onerussian.com/tmp/ds005_raw_boogie_4.tgz'}),
assign({'filename': 'ds005_raw_boogie.tgz'}),
#assign({'url': 'http://www.onerussian.com/tmp/ds005_raw_boogie_4.tgz'}),
#assign({'filename': 'ds005_raw_boogie.tgz'}),

annex,
],
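The func_to_node(...) call above wraps get_versioned_url into a pipeline node that reads 'url' from the flowing data dict and writes the (possibly rewritten) URL back. A rough, hedged sketch of what such a wrapper amounts to, assuming DataLad's convention that a node is a callable taking a data dict and yielding updated dicts; this is an illustration, not the actual implementation:

def func_to_node_sketch(func, data_args=(), outputs=(), kwargs=None):
    """Wrap `func` so it can sit in a crawler pipeline (illustrative only)."""
    kwargs = kwargs or {}

    def node(data):
        # pull positional arguments out of the data dict
        args = [data[field] for field in data_args]
        result = func(*args, **kwargs)
        out = dict(data)
        if outputs:
            # store the single return value under the first output name
            out[outputs[0]] = result
        yield out

    return node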
8 changes: 5 additions & 3 deletions datalad/crawler/pipelines/openfmri_collection.py
@@ -31,13 +31,15 @@ def pipeline(
# Should return a list representing a pipeline
# TODO: get to 'incoming branch'
return [
crawl_url("https://openfmri.org/data-sets"),
crawl_url("https://openfmri.org/dataset/"),
#a_href_match("(?P<url>.*/dataset/(?P<dataset>ds0*(?P<dataset_index>[1-9][0-9a-z]*)))$"),
a_href_match("(?P<url>.*/dataset/(?P<dataset>ds0*(?P<dataset_index>05)))$"),
a_href_match("(?P<url>.*/dataset/(?P<dataset>ds0*(?P<dataset_index>[0-9]*)))/*$"),
# https://openfmri.org/dataset/ds000001/
assign({'handle_name': '%(dataset)s'}, interpolate=True),
initiate_handle(
template="openfmri",
data_fields=['dataset']
data_fields=['dataset'],
branch='incoming', # there will be archives etc
# further any additional options
)
]
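The broadened a_href_match pattern now accepts any dataset index (not just ds005) and tolerates a trailing slash. A quick hedged check of which links it captures and which named groups it yields (the URLs are examples):

import re

pattern = r"(?P<url>.*/dataset/(?P<dataset>ds0*(?P<dataset_index>[0-9]*)))/*$"

for href in ("https://openfmri.org/dataset/ds000001/",
             "https://openfmri.org/dataset/ds005",
             "https://openfmri.org/data-sets"):
    m = re.match(pattern, href)
    print(href, "->", m.groupdict() if m else "no match")
# the first URL yields dataset='ds000001' and dataset_index='1';
# the last one does not match, so non-dataset pages are skipped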
9 changes: 9 additions & 0 deletions datalad/downloaders/configs/openfmri.cfg
@@ -2,3 +2,12 @@
url_re = https?://openfmri.org/.*
# does not require any, good folks
authentication_type = none

# TODO: make it proper catch-all handling
# see https://github.com/datalad/datalad/issues/322
[provider:openfmri-s3]
url_re = s3://openfmri/.*
credential = datalad-test-s3
authentication_type = aws-s3
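Each provider section is selected by matching its url_re against the requested URL. A hedged sketch of that selection logic (the helper and the example key are illustrative, not DataLad's actual resolver):

import re

providers = {
    "openfmri": {"url_re": r"https?://openfmri.org/.*",
                 "authentication_type": "none"},
    "openfmri-s3": {"url_re": r"s3://openfmri/.*",
                    "authentication_type": "aws-s3",
                    "credential": "datalad-test-s3"},
}

def pick_provider(url):
    # return the first provider whose url_re matches the URL
    for name, spec in providers.items():
        if re.match(spec["url_re"], url):
            return name
    return None

print(pick_provider("s3://openfmri/ds005_raw.tgz"))  # -> 'openfmri-s3' (example key)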


65 changes: 36 additions & 29 deletions datalad/interface/crawl.py
@@ -11,7 +11,7 @@
__docformat__ = 'restructuredtext'


from os.path import exists
from os.path import exists, isdir
from .base import Interface
from datalad.support.param import Parameter
from datalad.support.constraints import EnsureStr, EnsureChoice, EnsureNone
@@ -38,48 +38,55 @@ class Crawl(Interface):
args=("--is-pipeline",),
action="store_true",
doc="""Flag if provided file is a Python script which defines pipeline()"""),
chdir=Parameter(
args=("-C", "--chdir"),
constraints=EnsureStr() | EnsureNone(),
doc="""Directory to chrdir to for crawling"""),
path=Parameter(
args=('path',),
metavar='file',
nargs='?',
constraints=EnsureStr() | EnsureNone(),
doc="""Configuration (or pipeline if --is-pipeline) file defining crawling"""),
doc="""Configuration (or pipeline if --is-pipeline) file defining crawling, or a directory
of a handle on which to perform crawling using its standard crawling specification"""),
)

def __call__(self, path=None, dry_run=False, is_pipeline=False):
def __call__(self, path=None, dry_run=False, is_pipeline=False, chdir=None):
from datalad.crawler.pipeline import (
load_pipeline_from_config, load_pipeline_from_script,
get_repo_pipeline_config_path, get_repo_pipeline_script_path
)
from datalad.crawler.pipeline import run_pipeline
from datalad.utils import chpwd # import late so we could mock during tests
with chpwd(chdir):
# TODO: centralize via _params_ handling
if dry_run:
if not 'crawl' in cfg.sections():
cfg.add_section('crawl')
cfg.set('crawl', 'dryrun', "True")

if path is None:
# get config from the current repository/handle
if is_pipeline:
raise ValueError("You must specify the file if --pipeline")
# Let's see if there is a config or pipeline in this repo
path = get_repo_pipeline_config_path()
if not path or not exists(path):
# Check if there may be the pipeline provided
path = get_repo_pipeline_script_path()
if path and exists(path):
is_pipeline = True

if not path:
raise RuntimeError("Cannot locate crawler config or pipeline file")

if is_pipeline:
lgr.info("Loading pipeline definition from %s" % path)
pipeline = load_pipeline_from_script(path)
else:
lgr.info("Loading pipeline specification from %s" % path)
pipeline = load_pipeline_from_config(path)

lgr.info("Running pipeline %s" % str(pipeline))
run_pipeline(pipeline)
lgr.info("Running pipeline %s" % str(pipeline))
run_pipeline(pipeline)
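With the new -C/--chdir option the crawler can be pointed at a handle directory and will temporarily switch into it for the duration of the run. A hedged usage sketch via the Python API (the directory name is an example):

from datalad.api import crawl

# run the handle's own crawling specification from inside 'openfmri/ds005',
# returning to the original working directory afterwards
crawl(chdir='openfmri/ds005')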
30 changes: 30 additions & 0 deletions datalad/interface/tests/test_crawl.py
@@ -0,0 +1,30 @@
# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Tests for crawl command
"""

__docformat__ = 'restructuredtext'

from mock import patch
from ...api import crawl

from ...tests.utils import assert_cwd_unchanged

@assert_cwd_unchanged(ok_to_chdir=True)
@patch('datalad.utils.chpwd')
@patch('datalad.crawler.pipeline.load_pipeline_from_config', return_value=['pipeline'])
@patch('datalad.crawler.pipeline.run_pipeline')
# Note: @patch decorators apply bottom-up, so the mocks arrive as arguments in reverse order
def test_crawl_api_chdir(run_pipeline_, load_pipeline_from_config_, chpwd_):
crawl('some_path_not_checked', chdir='somedir')

chpwd_.assert_called_with('somedir')
load_pipeline_from_config_.assert_called_with('some_path_not_checked')
run_pipeline_.assert_called_with(['pipeline'])
12 changes: 10 additions & 2 deletions datalad/interface/tests/test_get.py
@@ -15,15 +15,23 @@
from mock import patch
from ...api import get

from ...tests.utils import with_testrepos
from ...support.annexrepo import AnnexRepo
#from ...tests.utils import with_testrepos
from ...tests.utils import with_tempfile
from ...tests.utils import assert_cwd_unchanged
from ...utils import chpwd

@with_testrepos('basic_annex', flavors=['clone'])
#@with_testrepos('basic_annex', flavors=['clone'], count=1)
@with_tempfile(mkdir=True)
@assert_cwd_unchanged(ok_to_chdir=True)
@patch('datalad.support.annexrepo.AnnexRepo.annex_get')
def test_get_basic(repo_path, annex_get_mocked):
hndl = AnnexRepo(repo_path, create=True)
chpwd(repo_path)
with open('test-annex.dat', 'w') as f:
f.write('123')
hndl.annex_add('test-annex.dat')
hndl.commit("test")
get(['test-annex.dat'])
annex_get_mocked.assert_called_with(['test-annex.dat'])

6 changes: 5 additions & 1 deletion datalad/support/cookies.py
@@ -12,6 +12,7 @@
import shelve
import appdirs
import os.path
from six import PY2
from .network import get_tld

# FIXME should make into a decorator so that it closes the cookie_db upon exiting whatever func uses it
@@ -47,7 +48,10 @@ def _load(self):
def _get_provider(self, url):
if self._cookies_db is None:
self._load()
return get_tld(url)
tld = get_tld(url)
if PY2:
return tld.encode()
return tld

def __getitem__(self, url):
return self._cookies_db[self._get_provider(url)]
18 changes: 13 additions & 5 deletions datalad/support/s3.py
@@ -33,8 +33,8 @@


# TODO: should become a config option and managed along with the rest
S3_ADMIN_CREDENTIAL = "datalad-s3-admin"
S3_TEST_CREDENTIAL = "datalad-s3-test"
S3_ADMIN_CREDENTIAL = "datalad-datalad-admin-s3"
S3_TEST_CREDENTIAL = "datalad-datalad-test-s3"


def get_bucket_connection(credential):
@@ -210,8 +210,16 @@ def get_versioned_url(url, guarantee_versioned=False, return_all=False, verify=F
was_versioned = False
all_versions = []
if s3_bucket:
s3conn = s3conn or get_bucket_connection(S3_TEST_CREDENTIAL)
bucket = s3conn.get_bucket(s3_bucket) # TODO cache
# TODO: cache
if s3conn is None:
# we need to reuse our providers
from ..downloaders.providers import Providers
providers = Providers.from_config_files()
s3url = "s3://%s/" % s3_bucket
s3provider = providers.get_provider(s3url)
bucket = s3provider.authenticator.authenticate(s3_bucket, s3provider.credential) # s3conn or get_bucket_connection(S3_TEST_CREDENTIAL)
else:
bucket = s3conn.get_bucket(s3_bucket)
supports_versioning = True # assume that it does
try:
supports_versioning = bucket.get_versioning_status() # TODO cache
@@ -270,4 +278,4 @@ def get_versioned_url(url, guarantee_versioned=False, return_all=False, verify=F
else:
locals()['gen_bucket_%s' % name]()
else:
print("nothing todo")
print("nothing todo")
6 changes: 6 additions & 0 deletions datalad/tests/test_utils.py
@@ -181,6 +181,12 @@ def test_getpwd_basic():
ok_(isabs(pwd))
eq_(os.getcwd(), abspath(pwd))

# check that we do not chdir anywhere if None is provided
with patch('os.chdir') as oschdir:
with chpwd(None):
eq_(getpwd(), pwd)
assert_false(oschdir.called)


@skip_if_on_windows
@with_tempfile(mkdir=True)
15 changes: 12 additions & 3 deletions datalad/utils.py
@@ -559,14 +559,22 @@ class chpwd(object):
to the given path
"""
def __init__(self, path, mkdir=False):

if path:
pwd = getpwd()
self._prev_pwd = pwd
else:
self._prev_pwd = None
return

if not isabs(path):
path = normpath(opj(getpwd(), path))
path = normpath(opj(pwd, path))
if not os.path.exists(path) and mkdir:
self._mkdir = True
os.mkdir(path)
else:
self._mkdir = False
self._prev_pwd = getpwd()

os.chdir(path) # for grep people -- ok, to chdir here!
os.environ['PWD'] = path

@@ -575,5 +583,6 @@ def __enter__(self):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
chpwd(self._prev_pwd)
if self._prev_pwd:
chpwd(self._prev_pwd)
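A short hedged usage sketch of the updated context manager: passing None now makes chpwd a no-op, which is what lets crawl accept an optional --chdir (directory names are examples):

from datalad.utils import chpwd, getpwd

with chpwd(None):      # no-op: stays wherever we are
    print(getpwd())

with chpwd('/tmp'):    # temporarily switch; the previous directory is restored on exit
    print(getpwd())
print(getpwd())        # back to the original working directory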
