Skip to content

Commit

Permalink
Core & Internals: updated RSE export structure; Fix rucio#2237
Browse files Browse the repository at this point in the history
  • Loading branch information
hahahannes committed Mar 25, 2019
1 parent 52a441b commit c2bf09b
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 197 deletions.
2 changes: 1 addition & 1 deletion etc/rse_repository.json
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@
},
"srm": {
"impl": "rucio.rse.protocols.srm.Default",
"hostname": "srm://atlassrm-fzk.gridka.de.mock",
"hostname": "atlassrm-fzk.gridka.de.mock",
"prefix": "/pnfs/gridka.de/atlas/disk-only/atlasscratchdisk/",
"extended_attributes": {"web_service_path": "/srm/managerv2?SFN=", "space_token": "ATLASSCRATCHDISK"},
"domains": {
Expand Down
14 changes: 3 additions & 11 deletions lib/rucio/common/schema/atlas.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,16 +346,6 @@

SCOPE_NAME_REGEXP = '/(.*)/(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -369,7 +359,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
15 changes: 3 additions & 12 deletions lib/rucio/common/schema/cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,6 @@

SCOPE_NAME_REGEXP = '/([^/]*)(?=/)(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True
}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -368,7 +357,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
15 changes: 3 additions & 12 deletions lib/rucio/common/schema/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,6 @@

SCOPE_NAME_REGEXP = '/(.*)/(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True
}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -367,7 +356,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
19 changes: 17 additions & 2 deletions lib/rucio/core/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,29 @@
# limitations under the License.
#
# Authors:
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE

from rucio.core import rse as rse_module, distance as distance_module
from rucio.db.sqla.session import transactional_session


@transactional_session
def export_rses(session=None):
"""
Export RSE data.
:param session: database session in use.
"""
data = {}
for rse in rse_module.list_rses(session=session):
rse_name = rse['rse']
data[rse_name] = rse_module.export_rse(rse_name, session=session)

return data


@transactional_session
def export_data(session=None):
"""
Expand All @@ -29,7 +44,7 @@ def export_data(session=None):
:param session: database session in use.
"""
data = {
'rses': [rse_module.export_rse(rse['rse'], session=session) for rse in rse_module.list_rses(session=session)],
'rses': export_rses(session=session),
'distances': distance_module.export_distances(session=session)
}
return data
181 changes: 102 additions & 79 deletions lib/rucio/core/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,113 @@
# limitations under the License.
#
# Authors:
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE

try:
# PY2
from urlparse import urlparse
except ImportError:
# PY3
from urllib.parse import urlparse
from rucio.core import rse as rse_module, distance as distance_module
from rucio.db.sqla.session import transactional_session


@transactional_session
def import_rses(rses, session=None):
for rse_name in rses:
rse = rses[rse_name]
protocols = rse.get('protocols')
siteinfo = rse.get('siteinfo') or {}
if not rse_module.rse_exists(rse_name, session=session):
rse_module.add_rse(rse_name, deterministic=rse.get('deterministic'), volatile=rse.get('volatile'),
city=rse.get('city'), region_code=rse.get('region_code'), country_name=rse.get('country_name'),
staging_area=rse.get('staging_area'), continent=rse.get('continent'), time_zone=rse.get('time_zone') or siteinfo.get('timezone'),
ISP=rse.get('ISP'), rse_type=rse.get('rse_type'), latitude=rse.get('latitude') or siteinfo.get('latitude'),
longitude=rse.get('longitude') or siteinfo.get('longitude'), ASN=rse.get('ASN'), availability=rse.get('availability'),
session=session)
else:
rse_module.update_rse(rse_name, rse, session=session)

# Protocols
if protocols:
old_protocols = rse_module.get_rse_protocols(rse=rse_name, session=session)
for protocol in protocols:
protocol = protocols[protocol]
parsed_url = urlparse(protocol['endpoint'])
scheme = parsed_url.scheme
hostname = parsed_url.netloc.split(':')[0]
port = int(parsed_url.netloc.split(':')[1] or 0)
prefix = parsed_url.path
intersection = [old_protocol for old_protocol in old_protocols['protocols'] if old_protocol['scheme'] == scheme and old_protocol['hostname'] == hostname and
old_protocol['port'] == port]
if intersection:
data = {
'domains': protocol.get('domains', []),
'prefix': prefix,
'hostname': hostname,
'port': port,
'scheme': scheme,
'impl': 'TODO',
'extended_attributes': protocol.get('extended_attributes', [])
}
rse_module.update_protocols(rse=rse_name, scheme=scheme, data=data, hostname=hostname, port=port, session=session)
else:
data = {'scheme': scheme, 'hostname': hostname, 'port': port, 'prefix': prefix, 'impl': 'TODO'}
rse_module.add_protocol(rse=rse_name, parameter=data, session=session)

# Limits
limits = rse.get('limits')
if limits:
old_limits = rse_module.get_rse_limits(rse=rse_name, session=session)
for limit in limits:
if limit in old_limits:
rse_module.delete_rse_limit(rse=rse_name, name=limit, session=session)
rse_module.set_rse_limits(rse=rse_name, name=limit, value=limits[limit], session=session)

# Transfer limits
transfer_limits = rse.get('transfer_limits')
if transfer_limits:
for limit in transfer_limits:
old_transfer_limits = rse_module.get_rse_transfer_limits(rse=rse_name, activity=limit, session=session)
if limit in old_transfer_limits:
rse_module.delete_rse_transfer_limits(rse=rse_name, activity=limit, session=session)
max_transfers = transfer_limits[limit].items()[0][1]['max_transfers']
rse_module.set_rse_transfer_limits(rse=rse_name, activity=limit, max_transfers=max_transfers, session=session)

# Attributes
attributes = rse.get('attributes')
if attributes:
old_attributes = rse_module.list_rse_attributes(rse=rse_name, session=session)
for attr in attributes:
if attr in old_attributes:
rse_module.del_rse_attribute(rse=rse_name, key=attr, session=session)
rse_module.add_rse_attribute(rse=rse_name, key=attr, value=attributes[attr], session=session)


@transactional_session
def import_distances(distances, session=None):
for src_rse_name in distances:
src = rse_module.get_rse_id(src_rse_name, session=session)
for dest_rse_name in distances[src_rse_name]:
dest = rse_module.get_rse_id(dest_rse_name, session=session)
distance = distances[src_rse_name][dest_rse_name]
del distance['src_rse_id']
del distance['dest_rse_id']

old_distance = distance_module.get_distances(src_rse_id=src, dest_rse_id=dest, session=session)
if old_distance:
distance_module.update_distances(src_rse_id=src, dest_rse_id=dest, parameters=distance, session=session)
else:
distance_module.add_distance(src_rse_id=src, dest_rse_id=dest, ranking=distance.get('ranking'),
agis_distance=distance.get('agis_distance'), geoip_distance=distance.get('geoip_distance'),
active=distance.get('active'), submitted=distance.get('submitted'),
transfer_speed=distance.get('transfer_speed'), finished=distance.get('finished'),
failed=distance.get('failed'), session=session)


@transactional_session
def import_data(data, session=None):
"""
Expand All @@ -32,85 +131,9 @@ def import_data(data, session=None):
# RSEs
rses = data.get('rses')
if rses:
for rse in rses:
protocols = rse.get('protocols')
if protocols:
protocols = protocols.get('protocols')
del rse['protocols']
rse_name = rse['rse']
del rse['rse']
if not rse_module.rse_exists(rse_name, session=session):
rse_module.add_rse(rse_name, deterministic=rse.get('deterministic'), volatile=rse.get('volatile'),
city=rse.get('city'), region_code=rse.get('region_code'), country_name=rse.get('country_name'),
staging_area=rse.get('staging_area'), continent=rse.get('continent'), time_zone=rse.get('time_zone'),
ISP=rse.get('ISP'), rse_type=rse.get('rse_type'), latitude=rse.get('latitude'),
longitude=rse.get('longitude'), ASN=rse.get('ASN'), availability=rse.get('availability'),
session=session)
else:
rse_module.update_rse(rse_name, rse, session=session)

# Protocols
if protocols:
old_protocols = rse_module.get_rse_protocols(rse=rse_name, session=session)
for protocol in protocols:
scheme = protocol.get('scheme')
hostname = protocol.get('hostname')
port = protocol.get('port')
intersection = [old_protocol for old_protocol in old_protocols['protocols'] if old_protocol['scheme'] == scheme and old_protocol['hostname'] == hostname and
old_protocol['port'] == port]
if intersection:
del protocol['scheme']
del protocol['hostname']
del protocol['port']
rse_module.update_protocols(rse=rse_name, scheme=scheme, data=protocol, hostname=hostname, port=port, session=session)
else:
rse_module.add_protocol(rse=rse_name, parameter=protocol, session=session)

# Limits
limits = rse.get('limits')
if limits:
old_limits = rse_module.get_rse_limits(rse=rse_name, session=session)
for limit in limits:
if limit in old_limits:
rse_module.delete_rse_limit(rse=rse_name, name=limit, session=session)
rse_module.set_rse_limits(rse=rse_name, name=limit, value=limits[limit], session=session)

# Transfer limits
transfer_limits = rse.get('transfer_limits')
if transfer_limits:
for limit in transfer_limits:
old_transfer_limits = rse_module.get_rse_transfer_limits(rse=rse_name, activity=limit, session=session)
if limit in old_transfer_limits:
rse_module.delete_rse_transfer_limits(rse=rse_name, activity=limit, session=session)
max_transfers = transfer_limits[limit].items()[0][1]['max_transfers']
rse_module.set_rse_transfer_limits(rse=rse_name, activity=limit, max_transfers=max_transfers, session=session)

# Attributes
attributes = rse.get('attributes')
if attributes:
old_attributes = rse_module.list_rse_attributes(rse=rse_name, session=session)
for attr in attributes:
if attr in old_attributes:
rse_module.del_rse_attribute(rse=rse_name, key=attr, session=session)
rse_module.add_rse_attribute(rse=rse_name, key=attr, value=attributes[attr], session=session)
import_rses(rses, session=session)

# Distances
distances = data.get('distances')
if distances:
for src_rse_name in distances:
src = rse_module.get_rse_id(src_rse_name, session=session)
for dest_rse_name in distances[src_rse_name]:
dest = rse_module.get_rse_id(dest_rse_name, session=session)
distance = distances[src_rse_name][dest_rse_name]
del distance['src_rse_id']
del distance['dest_rse_id']

old_distance = distance_module.get_distances(src_rse_id=src, dest_rse_id=dest, session=session)
if old_distance:
distance_module.update_distances(src_rse_id=src, dest_rse_id=dest, parameters=distance, session=session)
else:
distance_module.add_distance(src_rse_id=src, dest_rse_id=dest, ranking=distance.get('ranking'),
agis_distance=distance.get('agis_distance'), geoip_distance=distance.get('geoip_distance'),
active=distance.get('active'), submitted=distance.get('submitted'),
transfer_speed=distance.get('transfer_speed'), finished=distance.get('finished'),
failed=distance.get('failed'), session=session)
import_distances(distances, session=session)

0 comments on commit c2bf09b

Please sign in to comment.