Skip to content

Commit

Permalink
Transfers: Add preparer daemon; Fix rucio#4056
Browse files Browse the repository at this point in the history
Searches for minimum distance source and sets QUEUED or WAITING state
based on throttler mode and limits.
Add PREPARING request state.
Add conveyor.use_preparer config option.
Add tests for preparer.
  • Loading branch information
bziemons committed Nov 10, 2020
1 parent 703f027 commit 09897a3
Show file tree
Hide file tree
Showing 15 changed files with 1,781 additions and 1,419 deletions.
37 changes: 37 additions & 0 deletions bin/rucio-conveyor-preparer
@@ -0,0 +1,37 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright 2020 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020

import argparse
import signal

from rucio.daemons.conveyor.preparer import run, stop


def main(args):
signal.signal(signal.SIGTERM, stop)
try:
run(once=args.run_once)
except KeyboardInterrupt:
stop()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only')
main(args=parser.parse_args())
2 changes: 1 addition & 1 deletion etc/sql/oracle/schema.sql
Expand Up @@ -930,7 +930,7 @@ PARTITION BY LIST(SCOPE)
CONSTRAINT REQUESTS_SCOPE_NN CHECK (SCOPE IS NOT NULL),
CONSTRAINT REQUESTS_NAME_NN CHECK (NAME IS NOT NULL),
CONSTRAINT REQUESTS_TYPE_CHK CHECK (request_type in ('U', 'D', 'T', 'I', 'O')),
CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M'))
CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M', 'P'))
) PCTFREE 3;


Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/alembicrevision.py
Expand Up @@ -17,4 +17,4 @@
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
# - Martin Barisits <martin.barisits@cern.ch>, 2020

ALEMBIC_REVISION = '8ea9122275b1' # the current alembic head revision
ALEMBIC_REVISION = 'd23453595260' # the current alembic head revision
9 changes: 6 additions & 3 deletions lib/rucio/core/distance.py
Expand Up @@ -20,8 +20,8 @@
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
# - Andrew Lister <andrew.lister@stfc.ac.uk>, 2019
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
#
# PY3K COMPATIBLE

from typing import TYPE_CHECKING

from sqlalchemy.exc import DatabaseError, IntegrityError
from sqlalchemy.orm import aliased
Expand All @@ -30,6 +30,9 @@
from rucio.db.sqla.models import Distance, RSE
from rucio.db.sqla.session import transactional_session, read_session

if TYPE_CHECKING:
from typing import List, Dict


@transactional_session
def add_distance(src_rse_id, dest_rse_id, ranking=None, agis_distance=None, geoip_distance=None,
Expand Down Expand Up @@ -77,7 +80,7 @@ def add_distance_short(src_rse_id, dest_rse_id, distance=None, session=None):


@read_session
def get_distances(src_rse_id=None, dest_rse_id=None, session=None):
def get_distances(src_rse_id=None, dest_rse_id=None, session=None) -> "List[Dict]":
"""
Get distances between rses.
Expand Down
120 changes: 38 additions & 82 deletions lib/rucio/core/request.py
@@ -1,23 +1,33 @@
# Copyright European Organization for Nuclear Research (CERN)
# -*- coding: utf-8 -*-
# Copyright 2013-2020 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Mario Lassnig, <mario.lassnig@cern.ch>, 2013-2015, 2017
# - Vincent Garonne, <vincent.garonne@cern.ch>, 2015-2017
# - Martin Barisits, <martin.barisits@cern.ch>, 2014-2017
# - Wen Guan, <wen.guan@cern.ch>, 2014-2016
# - Joaquin Bogado, <jbogadog@cern.ch>, 2016
# - Thomas Beermann, <thomas.beermann@cern.ch>, 2016
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2017-2020
# - Hannes Hansen, <hannes.jakob.hansen@cern.ch>, 2018-2019
# - Andrew Lister, <andrew.lister@stfc.ac.uk>, 2019
# - Brandon White, <bjwhite@fnal.gov>, 2019
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# PY3K COMPATIBLE
# Authors:
# - Mario Lassnig <mario.lassnig@cern.ch>, 2013-2017
# - Vincent Garonne <vincent.garonne@cern.ch>, 2013-2017
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2020
# - Martin Barisits <martin.barisits@cern.ch>, 2014-2020
# - Wen Guan <wen.guan@cern.ch>, 2014-2016
# - Joaquín Bogado <jbogado@linti.unlp.edu.ar>, 2015-2019
# - Thomas Beermann <thomas.beermann@cern.ch>, 2016
# - Joaquin Bogado <jbogadog@cern.ch>, 2017
# - Igor Mandrichenko <rucio@fermicloud055.fnal.gov>, 2018
# - Robert Illingworth <illingwo@fnal.gov>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
# - Andrew Lister <andrew.lister@stfc.ac.uk>, 2019
# - Brandon White <bjwhite@fnal.gov>, 2019
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020

import datetime
import json
Expand All @@ -26,19 +36,17 @@
import traceback

from six import string_types

from sqlalchemy import and_, or_, func, update
from sqlalchemy.exc import IntegrityError
# from sqlalchemy.sql import tuple_
from sqlalchemy.sql.expression import asc, false, true

from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, ConfigNotFound
from rucio.common.config import config_get_bool
from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.utils import generate_uuid, chunks, get_parsed_throttler_mode
from rucio.core.config import get
from rucio.common.utils import generate_uuid, chunks
from rucio.core.message import add_message
from rucio.core.monitor import record_counter, record_timer
from rucio.core.rse import get_rse_name, get_rse_transfer_limits, get_rse_vo
from rucio.core.rse import get_rse_name, get_rse_vo
from rucio.db.sqla import models, filter_thread_work
from rucio.db.sqla.constants import RequestState, RequestType, FTSState, ReplicaState, LockState, RequestErrMsg
from rucio.db.sqla.session import read_session, transactional_session, stream_session
Expand Down Expand Up @@ -126,7 +134,8 @@ def queue_requests(requests, session=None):
logging.debug("queue requests")

request_clause = []
transfer_limits, rses = get_rse_transfer_limits(session=session), {}
rses = {}
preparer_enabled = config_get_bool('conveyor', 'use_preparer', raise_exception=False, default=False)
for req in requests:

if isinstance(req['attributes'], string_types):
Expand Down Expand Up @@ -157,82 +166,28 @@ def queue_requests(requests, session=None):
for request in query_existing_requests:
existing_requests.append(request)

# Temporary disabled
source_rses = {}
# request_scopes_names = [(request['scope'], request['name']) for request in requests]
# for chunked_requests in chunks(request_scopes_names, 50):
# results = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.rse_id, models.Distance.dest_rse_id, models.Distance.ranking)\
# .filter(tuple_(models.RSEFileAssociation.scope, models.RSEFileAssociation.name).in_(chunked_requests))\
# .join(models.Distance, models.Distance.src_rse_id == models.RSEFileAssociation.rse_id)\
# .all()
# for result in results:
# scope = result[0]
# name = result[1]
# src_rse_id = result[2]
# dest_rse_id = result[3]
# distance = result[4]
# if scope not in source_rses:
# source_rses[scope] = {}
# if name not in source_rses[scope]:
# source_rses[scope][name] = {}
# if dest_rse_id not in source_rses[scope][name]:
# source_rses[scope][name][dest_rse_id] = {}
# if src_rse_id not in source_rses[scope][name][dest_rse_id]:
# source_rses[scope][name][dest_rse_id][src_rse_id] = distance

try:
throttler_mode = get('throttler', 'mode', default=None, use_cache=False, session=session)
direction, all_activities = get_parsed_throttler_mode(throttler_mode)
except ConfigNotFound:
throttler_mode = None

new_requests, sources, messages = [], [], []
for request in requests:
dest_rse_name = get_rse_name(rse_id=request['dest_rse_id'], session=session)
if req['request_type'] == RequestType.TRANSFER and (request['scope'], request['name'], request['dest_rse_id']) in existing_requests:
logging.warn('Request TYPE %s for DID %s:%s at RSE %s exists - ignoring' % (request['request_type'],
request['scope'],
request['name'],
dest_rse_name))
logging.warning('Request TYPE %s for DID %s:%s at RSE %s exists - ignoring' % (request['request_type'],
request['scope'],
request['name'],
dest_rse_name))
continue

def temp_serializer(obj):
if isinstance(obj, (InternalAccount, InternalScope)):
return obj.internal
raise TypeError('Could not serialise object %r' % obj)

source_rse_id = request.get('source_rse_id')
if not source_rse_id:
try:
source_rses_of_request = source_rses[request['scope']][request['name']][request['dest_rse_id']]
source_rse_id = min(source_rses_of_request, key=source_rses_of_request.get)
except KeyError:
pass
activity_limit = transfer_limits.get(request['attributes']['activity'], {})
all_activities_limit = transfer_limits.get('all_activities', {})
limit_found = False
if throttler_mode:
if direction == 'source':
if all_activities:
if all_activities_limit.get(source_rse_id):
limit_found = True
else:
if activity_limit.get(source_rse_id):
limit_found = True
elif direction == 'destination':
if all_activities:
if all_activities_limit.get(request['dest_rse_id']):
limit_found = True
else:
if activity_limit.get(request['dest_rse_id']):
limit_found = True
request['state'] = RequestState.WAITING if limit_found else RequestState.QUEUED
request['state'] = RequestState.PREPARING if preparer_enabled else RequestState.QUEUED

new_request = {'request_type': request['request_type'],
'scope': request['scope'],
'name': request['name'],
'dest_rse_id': request['dest_rse_id'],
'source_rse_id': source_rse_id,
'source_rse_id': request.get('source_rse_id', None),
'attributes': json.dumps(request['attributes'], default=temp_serializer),
'state': request['state'],
'rule_id': request['rule_id'],
Expand Down Expand Up @@ -295,6 +250,7 @@ def temp_serializer(obj):

for messages_chunk in chunks(messages, 1000):
session.bulk_insert_mappings(models.Message, messages_chunk)

return new_requests


Expand Down
44 changes: 23 additions & 21 deletions lib/rucio/core/transfer.py
Expand Up @@ -32,9 +32,7 @@
# - Eli Chadwick <eli.chadwick@stfc.ac.uk>, 2020
# - Nick Smith <nick.smith@cern.ch>, 2020
# - Patrick Austin <patrick.austin@stfc.ac.uk>, 2020
#
# PY3K COMPATIBLE

# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020

from __future__ import division

Expand All @@ -54,14 +52,14 @@
from sqlalchemy.sql.expression import false

from rucio.common import constants
from rucio.common.config import config_get
from rucio.common.constants import SUPPORTED_PROTOCOLS
from rucio.common.exception import (InvalidRSEExpression, NoDistance,
RequestNotFound, RSEProtocolNotSupported,
RucioException, UnsupportedOperation)
from rucio.common.config import config_get
from rucio.common.rse_attributes import get_rse_attributes
from rucio.common.types import InternalAccount
from rucio.common.utils import construct_surl
from rucio.common.constants import SUPPORTED_PROTOCOLS
from rucio.core import did, message as message_core, request as request_core
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
Expand All @@ -76,7 +74,6 @@
from rucio.rse import rsemanager as rsemgr
from rucio.transfertool.fts3 import FTS3Transfertool


# Extra modules: Only imported if available
EXTRA_MODULES = {'globus_sdk': False}

Expand Down Expand Up @@ -650,6 +647,7 @@ def get_transfer_requests_and_source_replicas(total_workers=0, worker_number=0,
activity=activity,
older_than=older_than,
rses=rses,
request_state=RequestState.QUEUED,
session=session)

unavailable_read_rse_ids = __get_unavailable_rse_ids(operation='read', session=session)
Expand Down Expand Up @@ -1260,8 +1258,8 @@ def get_transfer_requests_and_source_replicas(total_workers=0, worker_number=0,


@read_session
def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=0,
limit=None, activity=None, older_than=None, rses=None, session=None):
def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=0, limit=None, activity=None,
older_than=None, rses=None, request_state=None, session=None):
"""
List requests with source replicas
:param total_workers: Number of total workers.
Expand All @@ -1273,6 +1271,10 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=
:param session: Database session to use.
:returns: List.
"""

if request_state is None:
request_state = RequestState.QUEUED

sub_requests = session.query(models.Request.id,
models.Request.rule_id,
models.Request.scope,
Expand All @@ -1285,12 +1287,12 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=
models.Request.previous_attempt_id,
models.Request.dest_rse_id,
models.Request.retry_count,
models.Request.account)\
.with_hint(models.Request, "INDEX(REQUESTS REQUESTS_TYP_STA_UPD_IDX)", 'oracle')\
.filter(models.Request.state == RequestState.QUEUED)\
.filter(models.Request.request_type == RequestType.TRANSFER)\
.join(models.RSE, models.RSE.id == models.Request.dest_rse_id)\
.filter(models.RSE.deleted == false())\
models.Request.account) \
.with_hint(models.Request, "INDEX(REQUESTS REQUESTS_TYP_STA_UPD_IDX)", 'oracle') \
.filter(models.Request.state == request_state) \
.filter(models.Request.request_type == RequestType.TRANSFER) \
.join(models.RSE, models.RSE.id == models.Request.dest_rse_id) \
.filter(models.RSE.deleted == false()) \
.filter(models.RSE.availability.in_((2, 3, 6, 7)))

if isinstance(older_than, datetime.datetime):
Expand Down Expand Up @@ -1326,19 +1328,19 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=
sub_requests.c.retry_count,
models.Source.url,
models.Source.ranking,
models.Distance.ranking)\
models.Distance.ranking) \
.outerjoin(models.RSEFileAssociation, and_(sub_requests.c.scope == models.RSEFileAssociation.scope,
sub_requests.c.name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state == ReplicaState.AVAILABLE,
sub_requests.c.dest_rse_id != models.RSEFileAssociation.rse_id))\
.with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_PK)", 'oracle')\
sub_requests.c.dest_rse_id != models.RSEFileAssociation.rse_id)) \
.with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_PK)", 'oracle') \
.outerjoin(models.RSE, and_(models.RSE.id == models.RSEFileAssociation.rse_id,
models.RSE.deleted == false()))\
models.RSE.deleted == false())) \
.outerjoin(models.Source, and_(sub_requests.c.id == models.Source.request_id,
models.RSE.id == models.Source.rse_id))\
.with_hint(models.Source, "+ index(sources SOURCES_PK)", 'oracle')\
models.RSE.id == models.Source.rse_id)) \
.with_hint(models.Source, "+ index(sources SOURCES_PK)", 'oracle') \
.outerjoin(models.Distance, and_(sub_requests.c.dest_rse_id == models.Distance.dest_rse_id,
models.RSEFileAssociation.rse_id == models.Distance.src_rse_id))\
models.RSEFileAssociation.rse_id == models.Distance.src_rse_id)) \
.with_hint(models.Distance, "+ index(distances DISTANCES_PK)", 'oracle')

if rses:
Expand Down

0 comments on commit 09897a3

Please sign in to comment.