Skip to content

Commit

Permalink
Core & Internals: updated RSE export structure; Fix rucio#2237
Browse files Browse the repository at this point in the history
  • Loading branch information
hahahannes committed Jun 13, 2019
1 parent 52a441b commit d635234
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 271 deletions.
4 changes: 2 additions & 2 deletions etc/rse_repository.json
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@
},
"srm": {
"impl": "rucio.rse.protocols.srm.Default",
"hostname": "srm://atlassrm-fzk.gridka.de.mock",
"hostname": "atlassrm-fzk.gridka.de.mock",
"prefix": "/pnfs/gridka.de/atlas/disk-only/atlasscratchdisk/",
"extended_attributes": {"web_service_path": "/srm/managerv2?SFN=", "space_token": "ATLASSCRATCHDISK"},
"domains": {
Expand Down Expand Up @@ -528,7 +528,7 @@
"supported": {
"rfio": {
"impl": "rucio.rse.protocols.rfio.Default",
"hostname": "castoratlas.cern.ch.mock",
"hostname": "castoratlas.cern.ch.bmock",
"port": 9002,
"prefix": "/castor/cern.ch/grid/atlas/tzero/",
"domains": {
Expand Down
7 changes: 3 additions & 4 deletions lib/rucio/client/importclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
# limitations under the License.
#
# Authors:
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE

from json import dumps
from requests.status_codes import codes

from rucio.client.baseclient import BaseClient
from rucio.client.baseclient import choice
from rucio.common.utils import build_url
from rucio.common.utils import build_url, render_json


class ImportClient(BaseClient):
Expand All @@ -44,7 +43,7 @@ def import_data(self, data):
path = '/'.join([self.IMPORT_BASEURL])
url = build_url(choice(self.list_hosts), path=path)

r = self._send_request(url, type='POST', data=dumps(data))
r = self._send_request(url, type='POST', data=render_json(**data))
if r.status_code == codes.created:
return r.text
else:
Expand Down
14 changes: 3 additions & 11 deletions lib/rucio/common/schema/atlas.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,16 +346,6 @@

SCOPE_NAME_REGEXP = '/(.*)/(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -369,7 +359,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
15 changes: 3 additions & 12 deletions lib/rucio/common/schema/cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,6 @@

SCOPE_NAME_REGEXP = '/([^/]*)(?=/)(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True
}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -368,7 +357,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
15 changes: 3 additions & 12 deletions lib/rucio/common/schema/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,6 @@

SCOPE_NAME_REGEXP = '/(.*)/(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True
}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -367,7 +356,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
19 changes: 17 additions & 2 deletions lib/rucio/core/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,29 @@
# limitations under the License.
#
# Authors:
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE

from rucio.core import rse as rse_module, distance as distance_module
from rucio.db.sqla.session import transactional_session


@transactional_session
def export_rses(session=None):
"""
Export RSE data.
:param session: database session in use.
"""
data = {}
for rse in rse_module.list_rses(session=session):
rse_name = rse['rse']
data[rse_name] = rse_module.export_rse(rse_name, session=session)

return data


@transactional_session
def export_data(session=None):
"""
Expand All @@ -29,7 +44,7 @@ def export_data(session=None):
:param session: database session in use.
"""
data = {
'rses': [rse_module.export_rse(rse['rse'], session=session) for rse in rse_module.list_rses(session=session)],
'rses': export_rses(session=session),
'distances': distance_module.export_distances(session=session)
}
return data
168 changes: 89 additions & 79 deletions lib/rucio/core/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,100 @@
# limitations under the License.
#
# Authors:
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE

try:
# PY2
from urlparse import urlparse
except ImportError:
# PY3
from urllib.parse import urlparse
from six import string_types
from rucio.core import rse as rse_module, distance as distance_module
from rucio.db.sqla.constants import RSEType
from rucio.db.sqla.session import transactional_session


@transactional_session
def import_rses(rses, session=None):
for rse_name in rses:
rse = rses[rse_name]
if isinstance(rse.get('rse_type'), string_types):
rse['rse_type'] = RSEType.from_string(str(rse['rse_type']))
if not rse_module.rse_exists(rse_name, session=session):
rse_module.add_rse(rse_name, deterministic=rse.get('deterministic'), volatile=rse.get('volatile'),
city=rse.get('city'), region_code=rse.get('region_code'), country_name=rse.get('country_name'),
staging_area=rse.get('staging_area'), continent=rse.get('continent'), time_zone=rse.get('time_zone'),
ISP=rse.get('ISP'), rse_type=rse.get('rse_type'), latitude=rse.get('latitude'),
longitude=rse.get('longitude'), ASN=rse.get('ASN'), availability=rse.get('availability'),
session=session)
else:
rse_module.update_rse(rse_name, rse, session=session)

# Protocols
protocols = rse.get('protocols')
if protocols:
old_protocols = rse_module.get_rse_protocols(rse=rse_name, session=session)
for protocol in protocols:
scheme = protocol.get('scheme')
hostname = protocol.get('hostname')
port = protocol.get('port')
intersection = [old_protocol for old_protocol in old_protocols['protocols'] if old_protocol['scheme'] == scheme and old_protocol['hostname'] == hostname and
old_protocol['port'] == port]
if intersection:
del protocol['scheme']
del protocol['hostname']
del protocol['port']
rse_module.update_protocols(rse=rse_name, scheme=scheme, data=protocol, hostname=hostname, port=port, session=session)
else:
rse_module.add_protocol(rse=rse_name, parameter=protocol, session=session)

# Limits
old_limits = rse_module.get_rse_limits(rse=rse_name, session=session)
for limit_name in ['MaxBeingDeletedFiles', 'MinFreeSpace']:
limit = rse.get(limit_name)
if limit:
if limit_name in old_limits:
rse_module.delete_rse_limit(rse=rse_name, name=limit_name, session=session)
rse_module.set_rse_limits(rse=rse_name, name=limit_name, value=limit, session=session)

# Attributes
attributes = rse.get('attributes', {})
attributes['lfn2pfn_algorithm'] = rse.get('lfn2pfn_algorithm')
attributes['verify_checksum'] = rse.get('verify_checksum')

if attributes:
old_attributes = rse_module.list_rse_attributes(rse=rse_name, session=session)
for attr in attributes:
if attr:
if attr in old_attributes:
rse_module.del_rse_attribute(rse=rse_name, key=attr, session=session)
rse_module.add_rse_attribute(rse=rse_name, key=attr, value=attributes[attr], session=session)


@transactional_session
def import_distances(distances, session=None):
for src_rse_name in distances:
src = rse_module.get_rse_id(src_rse_name, session=session)
for dest_rse_name in distances[src_rse_name]:
dest = rse_module.get_rse_id(dest_rse_name, session=session)
distance = distances[src_rse_name][dest_rse_name]
del distance['src_rse_id']
del distance['dest_rse_id']

old_distance = distance_module.get_distances(src_rse_id=src, dest_rse_id=dest, session=session)
if old_distance:
distance_module.update_distances(src_rse_id=src, dest_rse_id=dest, parameters=distance, session=session)
else:
distance_module.add_distance(src_rse_id=src, dest_rse_id=dest, ranking=distance.get('ranking'),
agis_distance=distance.get('agis_distance'), geoip_distance=distance.get('geoip_distance'),
active=distance.get('active'), submitted=distance.get('submitted'),
transfer_speed=distance.get('transfer_speed'), finished=distance.get('finished'),
failed=distance.get('failed'), session=session)


@transactional_session
def import_data(data, session=None):
"""
Expand All @@ -32,85 +118,9 @@ def import_data(data, session=None):
# RSEs
rses = data.get('rses')
if rses:
for rse in rses:
protocols = rse.get('protocols')
if protocols:
protocols = protocols.get('protocols')
del rse['protocols']
rse_name = rse['rse']
del rse['rse']
if not rse_module.rse_exists(rse_name, session=session):
rse_module.add_rse(rse_name, deterministic=rse.get('deterministic'), volatile=rse.get('volatile'),
city=rse.get('city'), region_code=rse.get('region_code'), country_name=rse.get('country_name'),
staging_area=rse.get('staging_area'), continent=rse.get('continent'), time_zone=rse.get('time_zone'),
ISP=rse.get('ISP'), rse_type=rse.get('rse_type'), latitude=rse.get('latitude'),
longitude=rse.get('longitude'), ASN=rse.get('ASN'), availability=rse.get('availability'),
session=session)
else:
rse_module.update_rse(rse_name, rse, session=session)

# Protocols
if protocols:
old_protocols = rse_module.get_rse_protocols(rse=rse_name, session=session)
for protocol in protocols:
scheme = protocol.get('scheme')
hostname = protocol.get('hostname')
port = protocol.get('port')
intersection = [old_protocol for old_protocol in old_protocols['protocols'] if old_protocol['scheme'] == scheme and old_protocol['hostname'] == hostname and
old_protocol['port'] == port]
if intersection:
del protocol['scheme']
del protocol['hostname']
del protocol['port']
rse_module.update_protocols(rse=rse_name, scheme=scheme, data=protocol, hostname=hostname, port=port, session=session)
else:
rse_module.add_protocol(rse=rse_name, parameter=protocol, session=session)

# Limits
limits = rse.get('limits')
if limits:
old_limits = rse_module.get_rse_limits(rse=rse_name, session=session)
for limit in limits:
if limit in old_limits:
rse_module.delete_rse_limit(rse=rse_name, name=limit, session=session)
rse_module.set_rse_limits(rse=rse_name, name=limit, value=limits[limit], session=session)

# Transfer limits
transfer_limits = rse.get('transfer_limits')
if transfer_limits:
for limit in transfer_limits:
old_transfer_limits = rse_module.get_rse_transfer_limits(rse=rse_name, activity=limit, session=session)
if limit in old_transfer_limits:
rse_module.delete_rse_transfer_limits(rse=rse_name, activity=limit, session=session)
max_transfers = transfer_limits[limit].items()[0][1]['max_transfers']
rse_module.set_rse_transfer_limits(rse=rse_name, activity=limit, max_transfers=max_transfers, session=session)

# Attributes
attributes = rse.get('attributes')
if attributes:
old_attributes = rse_module.list_rse_attributes(rse=rse_name, session=session)
for attr in attributes:
if attr in old_attributes:
rse_module.del_rse_attribute(rse=rse_name, key=attr, session=session)
rse_module.add_rse_attribute(rse=rse_name, key=attr, value=attributes[attr], session=session)
import_rses(rses, session=session)

# Distances
distances = data.get('distances')
if distances:
for src_rse_name in distances:
src = rse_module.get_rse_id(src_rse_name, session=session)
for dest_rse_name in distances[src_rse_name]:
dest = rse_module.get_rse_id(dest_rse_name, session=session)
distance = distances[src_rse_name][dest_rse_name]
del distance['src_rse_id']
del distance['dest_rse_id']

old_distance = distance_module.get_distances(src_rse_id=src, dest_rse_id=dest, session=session)
if old_distance:
distance_module.update_distances(src_rse_id=src, dest_rse_id=dest, parameters=distance, session=session)
else:
distance_module.add_distance(src_rse_id=src, dest_rse_id=dest, ranking=distance.get('ranking'),
agis_distance=distance.get('agis_distance'), geoip_distance=distance.get('geoip_distance'),
active=distance.get('active'), submitted=distance.get('submitted'),
transfer_speed=distance.get('transfer_speed'), finished=distance.get('finished'),
failed=distance.get('failed'), session=session)
import_distances(distances, session=session)

0 comments on commit d635234

Please sign in to comment.