Skip to content

Commit

Permalink
Core & Internals: updated RSE export structure; Fix rucio#2237
Browse files Browse the repository at this point in the history
  • Loading branch information
hahahannes committed Jun 27, 2019
1 parent 0ae62f2 commit 93e9ac7
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 271 deletions.
7 changes: 3 additions & 4 deletions lib/rucio/client/importclient.py
Expand Up @@ -13,16 +13,15 @@
# limitations under the License.
#
# Authors:
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE

from json import dumps
from requests.status_codes import codes

from rucio.client.baseclient import BaseClient
from rucio.client.baseclient import choice
from rucio.common.utils import build_url
from rucio.common.utils import build_url, render_json


class ImportClient(BaseClient):
Expand All @@ -44,7 +43,7 @@ def import_data(self, data):
path = '/'.join([self.IMPORT_BASEURL])
url = build_url(choice(self.list_hosts), path=path)

r = self._send_request(url, type='POST', data=dumps(data))
r = self._send_request(url, type='POST', data=render_json(**data))
if r.status_code == codes.created:
return r.text
else:
Expand Down
14 changes: 3 additions & 11 deletions lib/rucio/common/schema/atlas.py
Expand Up @@ -346,16 +346,6 @@

SCOPE_NAME_REGEXP = '/(.*)/(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -369,7 +359,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
15 changes: 3 additions & 12 deletions lib/rucio/common/schema/cms.py
Expand Up @@ -350,17 +350,6 @@

SCOPE_NAME_REGEXP = '/([^/]*)(?=/)(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True
}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -374,7 +363,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
15 changes: 3 additions & 12 deletions lib/rucio/common/schema/generic.py
Expand Up @@ -347,17 +347,6 @@

SCOPE_NAME_REGEXP = '/(.*)/(.*)'

RSES = {"description": "list of RSEs",
"type": "array",
"items": {
"type": "object",
"properties": {
"rse": RSE
},
"required": ["rse"],
"additionalProperties": True
}}

DISTANCE = {"description": "RSE distance",
"type": "object",
"properties": {
Expand All @@ -371,7 +360,9 @@
IMPORT = {"description": "import data into rucio.",
"type": "object",
"properties": {
"rses": RSES,
"rses": {
"type": "object"
},
"distances": {
"type": "object"
}
Expand Down
19 changes: 17 additions & 2 deletions lib/rucio/core/exporter.py
Expand Up @@ -13,14 +13,29 @@
# limitations under the License.
#
# Authors:
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE

from rucio.core import rse as rse_module, distance as distance_module
from rucio.db.sqla.session import transactional_session


@transactional_session
def export_rses(session=None):
"""
Export RSE data.
:param session: database session in use.
"""
data = {}
for rse in rse_module.list_rses(session=session):
rse_name = rse['rse']
data[rse_name] = rse_module.export_rse(rse_name, session=session)

return data


@transactional_session
def export_data(session=None):
"""
Expand All @@ -29,7 +44,7 @@ def export_data(session=None):
:param session: database session in use.
"""
data = {
'rses': [rse_module.export_rse(rse['rse'], session=session) for rse in rse_module.list_rses(session=session)],
'rses': export_rses(session=session),
'distances': distance_module.export_distances(session=session)
}
return data
183 changes: 104 additions & 79 deletions lib/rucio/core/importer.py
Expand Up @@ -13,14 +13,115 @@
# limitations under the License.
#
# Authors:
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE

from six import string_types
from rucio.common.exception import RSEOperationNotSupported
from rucio.core import rse as rse_module, distance as distance_module
from rucio.db.sqla.constants import RSEType
from rucio.db.sqla.session import transactional_session


@transactional_session
def import_rses(rses, session=None):
new_rses = []
for rse_name in rses:
new_rses.append(rse_name)
rse = rses[rse_name]
if isinstance(rse.get('rse_type'), string_types):
rse['rse_type'] = RSEType.from_string(str(rse['rse_type']))
if not rse_module.rse_exists(rse_name, session=session):
rse_module.add_rse(rse_name, deterministic=rse.get('deterministic'), volatile=rse.get('volatile'),
city=rse.get('city'), region_code=rse.get('region_code'), country_name=rse.get('country_name'),
staging_area=rse.get('staging_area'), continent=rse.get('continent'), time_zone=rse.get('time_zone'),
ISP=rse.get('ISP'), rse_type=rse.get('rse_type'), latitude=rse.get('latitude'),
longitude=rse.get('longitude'), ASN=rse.get('ASN'), availability=rse.get('availability'),
session=session)
else:
rse_module.update_rse(rse_name, rse, session=session)

# Protocols
new_protocols = rse.get('protocols')
if new_protocols:
# update existing, add missing and remove left over protocols
old_protocols = [{'scheme': protocol['scheme'], 'hostname': protocol['hostname'], 'port': protocol['port']} for protocol in rse_module.get_rse_protocols(rse=rse_name, session=session)['protocols']]
missing_protocols = [new_protocol for new_protocol in new_protocols if {'scheme': new_protocol['scheme'], 'hostname': new_protocol['hostname'], 'port': new_protocol['port']} not in old_protocols]
outdated_protocols = [new_protocol for new_protocol in new_protocols if {'scheme': new_protocol['scheme'], 'hostname': new_protocol['hostname'], 'port': new_protocol['port']} in old_protocols]
new_protocols = [{'scheme': protocol['scheme'], 'hostname': protocol['hostname'], 'port': protocol['port']} for protocol in new_protocols]
to_be_removed_protocols = [old_protocol for old_protocol in old_protocols if old_protocol not in new_protocols]
for protocol in outdated_protocols:
scheme = protocol['scheme']
port = protocol['port']
hostname = protocol['hostname']
del protocol['scheme']
del protocol['hostname']
del protocol['port']
rse_module.update_protocols(rse=rse_name, scheme=scheme, data=protocol, hostname=hostname, port=port, session=session)

for protocol in missing_protocols:
rse_module.add_protocol(rse=rse_name, parameter=protocol, session=session)

for protocol in to_be_removed_protocols:
scheme = protocol['scheme']
port = protocol['port']
hostname = protocol['hostname']
rse_module.del_protocols(rse=rse_name, scheme=scheme, port=port, hostname=hostname, session=session)

# Limits
old_limits = rse_module.get_rse_limits(rse=rse_name, session=session)
for limit_name in ['MaxBeingDeletedFiles', 'MinFreeSpace']:
limit = rse.get(limit_name)
if limit:
if limit_name in old_limits:
rse_module.delete_rse_limit(rse=rse_name, name=limit_name, session=session)
rse_module.set_rse_limits(rse=rse_name, name=limit_name, value=limit, session=session)

# Attributes
attributes = rse.get('attributes', {})
attributes['lfn2pfn_algorithm'] = rse.get('lfn2pfn_algorithm')
attributes['verify_checksum'] = rse.get('verify_checksum')

old_attributes = rse_module.list_rse_attributes(rse=rse_name, session=session)
for attr in attributes:
value = attributes[attr]
if value is not None:
if attr in old_attributes:
rse_module.del_rse_attribute(rse=rse_name, key=attr, session=session)
rse_module.add_rse_attribute(rse=rse_name, key=attr, value=value, session=session)

# set deleted flag to RSEs that are missing in the import data
old_rses = [old_rse['rse'] for old_rse in rse_module.list_rses(session=session)]
for old_rse in old_rses:
if old_rse not in new_rses:
try:
rse_module.del_rse(rse=old_rse, session=session)
except RSEOperationNotSupported:
pass


@transactional_session
def import_distances(distances, session=None):
for src_rse_name in distances:
src = rse_module.get_rse_id(src_rse_name, session=session)
for dest_rse_name in distances[src_rse_name]:
dest = rse_module.get_rse_id(dest_rse_name, session=session)
distance = distances[src_rse_name][dest_rse_name]
del distance['src_rse_id']
del distance['dest_rse_id']

old_distance = distance_module.get_distances(src_rse_id=src, dest_rse_id=dest, session=session)
if old_distance:
distance_module.update_distances(src_rse_id=src, dest_rse_id=dest, parameters=distance, session=session)
else:
distance_module.add_distance(src_rse_id=src, dest_rse_id=dest, ranking=distance.get('ranking'),
agis_distance=distance.get('agis_distance'), geoip_distance=distance.get('geoip_distance'),
active=distance.get('active'), submitted=distance.get('submitted'),
transfer_speed=distance.get('transfer_speed'), finished=distance.get('finished'),
failed=distance.get('failed'), session=session)


@transactional_session
def import_data(data, session=None):
"""
Expand All @@ -32,85 +133,9 @@ def import_data(data, session=None):
# RSEs
rses = data.get('rses')
if rses:
for rse in rses:
protocols = rse.get('protocols')
if protocols:
protocols = protocols.get('protocols')
del rse['protocols']
rse_name = rse['rse']
del rse['rse']
if not rse_module.rse_exists(rse_name, session=session):
rse_module.add_rse(rse_name, deterministic=rse.get('deterministic'), volatile=rse.get('volatile'),
city=rse.get('city'), region_code=rse.get('region_code'), country_name=rse.get('country_name'),
staging_area=rse.get('staging_area'), continent=rse.get('continent'), time_zone=rse.get('time_zone'),
ISP=rse.get('ISP'), rse_type=rse.get('rse_type'), latitude=rse.get('latitude'),
longitude=rse.get('longitude'), ASN=rse.get('ASN'), availability=rse.get('availability'),
session=session)
else:
rse_module.update_rse(rse_name, rse, session=session)

# Protocols
if protocols:
old_protocols = rse_module.get_rse_protocols(rse=rse_name, session=session)
for protocol in protocols:
scheme = protocol.get('scheme')
hostname = protocol.get('hostname')
port = protocol.get('port')
intersection = [old_protocol for old_protocol in old_protocols['protocols'] if old_protocol['scheme'] == scheme and old_protocol['hostname'] == hostname and
old_protocol['port'] == port]
if intersection:
del protocol['scheme']
del protocol['hostname']
del protocol['port']
rse_module.update_protocols(rse=rse_name, scheme=scheme, data=protocol, hostname=hostname, port=port, session=session)
else:
rse_module.add_protocol(rse=rse_name, parameter=protocol, session=session)

# Limits
limits = rse.get('limits')
if limits:
old_limits = rse_module.get_rse_limits(rse=rse_name, session=session)
for limit in limits:
if limit in old_limits:
rse_module.delete_rse_limit(rse=rse_name, name=limit, session=session)
rse_module.set_rse_limits(rse=rse_name, name=limit, value=limits[limit], session=session)

# Transfer limits
transfer_limits = rse.get('transfer_limits')
if transfer_limits:
for limit in transfer_limits:
old_transfer_limits = rse_module.get_rse_transfer_limits(rse=rse_name, activity=limit, session=session)
if limit in old_transfer_limits:
rse_module.delete_rse_transfer_limits(rse=rse_name, activity=limit, session=session)
max_transfers = transfer_limits[limit].items()[0][1]['max_transfers']
rse_module.set_rse_transfer_limits(rse=rse_name, activity=limit, max_transfers=max_transfers, session=session)

# Attributes
attributes = rse.get('attributes')
if attributes:
old_attributes = rse_module.list_rse_attributes(rse=rse_name, session=session)
for attr in attributes:
if attr in old_attributes:
rse_module.del_rse_attribute(rse=rse_name, key=attr, session=session)
rse_module.add_rse_attribute(rse=rse_name, key=attr, value=attributes[attr], session=session)
import_rses(rses, session=session)

# Distances
distances = data.get('distances')
if distances:
for src_rse_name in distances:
src = rse_module.get_rse_id(src_rse_name, session=session)
for dest_rse_name in distances[src_rse_name]:
dest = rse_module.get_rse_id(dest_rse_name, session=session)
distance = distances[src_rse_name][dest_rse_name]
del distance['src_rse_id']
del distance['dest_rse_id']

old_distance = distance_module.get_distances(src_rse_id=src, dest_rse_id=dest, session=session)
if old_distance:
distance_module.update_distances(src_rse_id=src, dest_rse_id=dest, parameters=distance, session=session)
else:
distance_module.add_distance(src_rse_id=src, dest_rse_id=dest, ranking=distance.get('ranking'),
agis_distance=distance.get('agis_distance'), geoip_distance=distance.get('geoip_distance'),
active=distance.get('active'), submitted=distance.get('submitted'),
transfer_speed=distance.get('transfer_speed'), finished=distance.get('finished'),
failed=distance.get('failed'), session=session)
import_distances(distances, session=session)

0 comments on commit 93e9ac7

Please sign in to comment.