Skip to content

Commit

Permalink
Client : Bulk get_metadata method : Closes rucio#3645
Browse files Browse the repository at this point in the history
  • Loading branch information
cserf committed May 29, 2020
1 parent bb9abdd commit bc6de81
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 8 deletions.
17 changes: 16 additions & 1 deletion lib/rucio/api/did.py
Expand Up @@ -10,7 +10,7 @@
- Vincent Garonne, <vincent.garonne@cern.ch>, 2012-2017
- Mario Lassnig, <mario.lassnig@cern.ch>, 2012-2015
- Yun-Pin Sun, <yun-pin.sun@cern.ch>, 2013
- Cedric Serfon, <cedric.serfon@cern.ch>, 2013-2014
- Cedric Serfon, <cedric.serfon@cern.ch>, 2013-2020
- Martin Barisits, <martin.barisits@cern.ch>, 2014-2015
- Hannes Hansen, <hannes.jakob.hansen@cern.ch>, 2018-2019
- Andrew Lister, <andrew.lister@stfc.ac.uk>, 2019
Expand Down Expand Up @@ -379,6 +379,21 @@ def get_metadata(scope, name):
return api_update_return_dict(d)


def get_metadata_bulk(dids, session=None):
"""
Get metadata for a list of dids
:param dids: A list of dids.
:param session: The database session in use.
"""

validate_schema(name='dids', obj=dids)
for entry in dids:
entry['scope'] = InternalScope(entry['scope'])
meta = did.get_metadata_bulk(dids)
for met in meta:
yield api_update_return_dict(met)


def get_did_meta(scope, name):
"""
Get all metadata for a given did
Expand Down
16 changes: 15 additions & 1 deletion lib/rucio/client/didclient.py
Expand Up @@ -19,7 +19,7 @@
# - Martin Barisits <martin.barisits@cern.ch>, 2013-2018
# - Yun-Pin Sun <winter0128@gmail.com>, 2013
# - Thomas Beermann <thomas.beermann@cern.ch>, 2013
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2015
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2020
# - Joaquin Bogado <jbogado@linti.unlp.edu.ar>, 2014-2018
# - Brian Bockelman <bbockelm@cse.unl.edu>, 2018
# - Eric Vaandering <ericvaandering@gmail.com>, 2018
Expand Down Expand Up @@ -407,6 +407,20 @@ def get_metadata(self, scope, name):
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)

def get_metadata_bulk(self, dids):
"""
Bulk get data identifier metadata
:param dids: A list of dids.
"""
data = {'dids': dids}
path = '/'.join([self.DIDS_BASEURL, 'bulkmeta'])
url = build_url(choice(self.list_hosts), path=path)
r = self._send_request(url, type='POST', data=dumps(data))
if r.status_code == codes.ok:
return self._load_json_data(r)
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)

def set_metadata(self, scope, name, key, value, recursive=False):
"""
Set data identifier metadata
Expand Down
23 changes: 23 additions & 0 deletions lib/rucio/core/did.py
Expand Up @@ -1325,6 +1325,29 @@ def get_metadata(scope, name, session=None):
raise exception.DataIdentifierNotFound("Data identifier '%(scope)s:%(name)s' not found" % locals())


@stream_session
def get_metadata_bulk(dids, session=None):
"""
Get metadata for a list of dids
:param dids: A list of dids.
:param session: The database session in use.
"""
condition = []
for did in dids:
condition.append(and_(models.DataIdentifier.scope == did['scope'],
models.DataIdentifier.name == did['name']))

try:
for chunk in chunks(condition, 50):
for row in session.query(models.DataIdentifier).with_hint(models.DataIdentifier, "INDEX(DIDS DIDS_PK)", 'oracle').filter(or_(*chunk)):
data = {}
for column in row.__table__.columns:
data[column.name] = getattr(row, column.name)
yield data
except NoResultFound:
raise exception.DataIdentifierNotFound('No Data Identifiers found')


@read_session
def get_did_meta(scope, name, session=None):
"""
Expand Down
45 changes: 44 additions & 1 deletion lib/rucio/tests/test_did.py
Expand Up @@ -16,7 +16,7 @@
# - Vincent Garonne <vgaronne@gmail.com>, 2013-2018
# - Martin Barisits <martin.barisits@cern.ch>, 2013-2018
# - Mario Lassnig <mario.lassnig@cern.ch>, 2013-2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2013-2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2013-2020
# - Ralph Vigne <ralph.vigne@cern.ch>, 2013
# - Yun-Pin Sun <winter0128@gmail.com>, 2013
# - Thomas Beermann <thomas.beermann@cern.ch>, 2013-2018
Expand Down Expand Up @@ -1000,3 +1000,46 @@ def test_open(self):

# Add a third file replica
self.did_client.set_status(scope=tmp_scope, name=tmp_dataset, open=True)

def test_bulk_get_meta(self):
""" DATA IDENTIFIERS (CLIENT): Add a new meta data for a list of DIDs and try to retrieve them back"""
key = 'project'
rse = 'MOCK'
scope = 'mock'
files = ['file_%s' % generate_uuid() for _ in range(4)]
dst = ['dst_%s' % generate_uuid() for _ in range(4)]
cnt = ['cnt_%s' % generate_uuid() for _ in range(4)]
meta_mapping = {}
list_dids = []
for idx in range(4):
self.replica_client.add_replica(rse, scope, files[idx], 1, '0cc737eb')
self.did_client.set_metadata(scope, files[idx], key, 'file_%s' % idx)
list_dids.append({'scope': scope, 'name': files[idx]})
meta_mapping['%s:%s' % (scope, files[idx])] = (key, 'file_%s' % idx)
for idx in range(4):
self.did_client.add_did(scope, dst[idx], 'DATASET', statuses=None, meta={key: 'dsn_%s' % idx}, rules=None)
list_dids.append({'scope': scope, 'name': dst[idx]})
meta_mapping['%s:%s' % (scope, dst[idx])] = (key, 'dsn_%s' % idx)
for idx in range(4):
self.did_client.add_did(scope, cnt[idx], 'CONTAINER', statuses=None, meta={key: 'cnt_%s' % idx}, rules=None)
list_dids.append({'scope': scope, 'name': cnt[idx]})
meta_mapping['%s:%s' % (scope, cnt[idx])] = (key, 'cnt_%s' % idx)
list_meta = [_ for _ in self.did_client.get_metadata_bulk(list_dids)]
res_list_dids = [{'scope': entry['scope'], 'name': entry['name']} for entry in list_meta]
res_list_dids.sort()
list_dids.sort()
assert_equal(list_dids, res_list_dids)
for meta in list_meta:
did = '%s:%s' % (meta['scope'], meta['name'])
met = meta_mapping[did]
assert_equal((key, meta[key]), met)
cnt = ['cnt_%s' % generate_uuid() for _ in range(4)]
for idx in range(4):
list_dids.append({'scope': scope, 'name': cnt[idx]})
list_meta = [_ for _ in self.did_client.get_metadata_bulk(list_dids)]
assert_equal(len(list_meta), 12)
list_dids = []
for idx in range(4):
list_dids.append({'scope': scope, 'name': cnt[idx]})
list_meta = [_ for _ in self.did_client.get_metadata_bulk(list_dids)]
assert_equal(len(list_meta), 0)
45 changes: 43 additions & 2 deletions lib/rucio/web/rest/flaskapi/v1/did.py
Expand Up @@ -19,7 +19,7 @@
# - Vincent Garonne <vincent.garonne@cern.ch>, 2012-2016
# - Mario Lassnig <mario.lassnig@cern.ch>, 2012-2018
# - Yun-Pin Sun <yun-pin.sun@cern.ch>, 2013
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2020
# - Martin Baristis <martin.barisits@cern.ch>, 2014-2015
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
#
Expand All @@ -34,7 +34,7 @@

from rucio.api.did import (add_did, add_dids, list_content, list_content_history,
list_dids, list_files, scope_list, get_did, set_metadata,
get_metadata, set_status, attach_dids, detach_dids,
get_metadata, get_metadata_bulk, set_status, attach_dids, detach_dids,
attach_dids_to_dids, get_dataset_by_guid, list_parent_dids,
create_did_sample, list_new_dids, resurrect)
from rucio.api.rule import list_replication_rules, list_associated_replication_rules_for_file
Expand Down Expand Up @@ -809,6 +809,45 @@ def post(self, scope, name, key):
return "Created", 201


class BulkMeta(MethodView):

@check_accept_header_wrapper_flask(['application/x-json-stream'])
def post(self):
"""
List all meta of a list of data identifiers.
HTTP Success:
200 OK
HTTP Error:
400 Bad Request
401 Unauthorized
404 DataIdentifierNotFound
500 InternalError
:returns: A list of dictionaries containing all meta.
"""
json_data = request.data
try:
params = loads(json_data)
dids = params['dids']
except KeyError as error:
raise generate_http_error_flask(400, 'ValueError', 'Cannot find mandatory parameter : %s' % str(error))
except ValueError:
raise generate_http_error_flask(400, 'ValueError', 'Cannot decode json parameter list')
try:
data = ""
for meta in get_metadata_bulk(dids):
data += render_json(**meta) + '\n'
return Response(data, content_type='application/x-json-stream')
except ValueError:
return generate_http_error_flask(400, 'ValueError', 'Cannot decode json parameter list')
except DataIdentifierNotFound as error:
return generate_http_error_flask(404, 'DataIdentifierNotFound', error.args[0])
except RucioException as error:
return generate_http_error_flask(500, error.__class__.__name__, error.args[0])
except Exception as error:
print(format_exc())
return error, 500


class Rules(MethodView):

@check_accept_header_wrapper_flask(['application/x-json-stream'])
Expand Down Expand Up @@ -1064,6 +1103,8 @@ def post(self):
bp.add_url_rule('/new', view_func=new_dids_view, methods=['get', ])
resurrect_view = Resurrect.as_view('resurrect')
bp.add_url_rule('/resurrect', view_func=resurrect_view, methods=['post', ])
bulkmeta_view = BulkMeta.as_view('bulkmeta')
bp.add_url_rule('/bulkmeta', view_func=bulkmeta_view, methods=['post', ])

application = Flask(__name__)
application.register_blueprint(bp)
Expand Down
45 changes: 42 additions & 3 deletions lib/rucio/web/rest/webpy/v1/did.py
Expand Up @@ -19,7 +19,7 @@
# - Vincent Garonne <vincent.garonne@cern.ch>, 2012-2016
# - Mario Lassnig <mario.lassnig@cern.ch>, 2012-2018
# - Yun-Pin Sun <yun-pin.sun@cern.ch>, 2013
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2020
# - Martin Baristis <martin.barisits@cern.ch>, 2014-2020
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
# - Ruturaj Gujar, <ruturaj.gujar23@gmail.com>, 2019
Expand All @@ -37,7 +37,7 @@

from rucio.api.did import (add_did, add_dids, list_content, list_content_history,
list_dids, list_files, scope_list, get_did, set_metadata,
get_metadata, set_status, attach_dids, detach_dids,
get_metadata, get_metadata_bulk, set_status, attach_dids, detach_dids,
attach_dids_to_dids, get_dataset_by_guid, list_parent_dids,
create_did_sample, list_new_dids, resurrect, get_did_meta,
add_did_meta, list_dids_by_meta, delete_did_meta, add_did_to_followed,
Expand All @@ -51,7 +51,7 @@
RSENotFound, RucioException, RuleNotFound,
InvalidMetadata)
from rucio.common.schema import SCOPE_NAME_REGEXP
from rucio.common.utils import generate_http_error, render_json, APIEncoder
from rucio.common.utils import generate_http_error, render_json, APIEncoder, parse_response
from rucio.web.rest.common import rucio_loadhook, RucioController, check_accept_header_wrapper

URLS = (
Expand All @@ -76,6 +76,7 @@
'/resurrect', 'Resurrect',
'/list_dids_by_meta', 'ListByMeta',
'%s/follow' % SCOPE_NAME_REGEXP, 'Follow',
'/bulkmeta', 'BulkMeta',
)


Expand Down Expand Up @@ -682,6 +683,44 @@ def GET(self, scope, name):
raise InternalError(error)


class BulkMeta(RucioController):

@check_accept_header_wrapper(['application/x-json-stream'])
def POST(self):
"""
List all meta of a list of data identifiers.
HTTP Success:
200 OK
HTTP Error:
400 Bad Request
401 Unauthorized
404 DataIdentifierNotFound
500 InternalError
:returns: A list of dictionaries containing all meta.
"""
header('Content-Type', 'application/x-json-stream')
json_data = data()
try:
params = parse_response(json_data)
dids = params['dids']
except KeyError as error:
raise generate_http_error(400, 'ValueError', 'Cannot find mandatory parameter : %s' % str(error))
except ValueError:
raise generate_http_error(400, 'ValueError', 'Cannot decode json parameter list')
try:
for meta in get_metadata_bulk(dids):
yield render_json(**meta) + '\n'
except ValueError:
raise generate_http_error(400, 'ValueError', 'Cannot decode json parameter list')
except DataIdentifierNotFound as error:
raise generate_http_error(404, 'DataIdentifierNotFound', error.args[0])
except RucioException as error:
raise generate_http_error(500, error.__class__.__name__, error.args[0])
except Exception as error:
print(format_exc())
raise InternalError(error)


class AssociatedRules(RucioController):

@check_accept_header_wrapper(['application/x-json-stream'])
Expand Down

0 comments on commit bc6de81

Please sign in to comment.