Skip to content

Commit

Permalink
Transfers: Add preparer daemon; Fix rucio#4056
Browse files Browse the repository at this point in the history
Searches for minimum distance source and sets QUEUED or WAITING state
based on throttler mode and limits.
Add PREPARING request state.
Add conveyor.use_preparer config option.
Add tests for preparer.
  • Loading branch information
bziemons committed Dec 1, 2020
1 parent a8343c2 commit 6afac9d
Show file tree
Hide file tree
Showing 15 changed files with 1,805 additions and 1,387 deletions.
40 changes: 40 additions & 0 deletions bin/rucio-conveyor-preparer
@@ -0,0 +1,40 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020

import argparse
import signal

from rucio.daemons.conveyor.preparer import run, stop


def main(args):
    """Run the conveyor preparer daemon with the parsed CLI options.

    Installs a SIGTERM handler so the daemon shuts down gracefully, and
    translates Ctrl-C (KeyboardInterrupt) into the same stop() call.
    """
    # Graceful shutdown on SIGTERM (e.g. from the init system / container runtime).
    signal.signal(signal.SIGTERM, stop)
    options = {
        'once': args.run_once,
        'threads': args.threads,
        'sleep_time': args.sleep_time,
        'bulk': args.bulk,
    }
    try:
        run(**options)
    except KeyboardInterrupt:
        stop()


if __name__ == "__main__":
    # Command-line entry point: parse daemon options and hand off to main().
    parser = argparse.ArgumentParser()
    parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only')
    # The remaining options are all integer-valued knobs with a default.
    for flag, default, description in (
        ("--threads", 1, 'Concurrency control: total number of threads on this process'),
        ("--sleep-time", 60, 'Concurrency control: thread sleep time after each chunk of work'),
        ("--bulk", 100, 'Limit of requests per chunk'),
    ):
        parser.add_argument(flag, action="store", default=default, type=int, help=description)
    main(args=parser.parse_args())
2 changes: 1 addition & 1 deletion etc/sql/oracle/schema.sql
Expand Up @@ -930,7 +930,7 @@ PARTITION BY LIST(SCOPE)
CONSTRAINT REQUESTS_SCOPE_NN CHECK (SCOPE IS NOT NULL),
CONSTRAINT REQUESTS_NAME_NN CHECK (NAME IS NOT NULL),
CONSTRAINT REQUESTS_TYPE_CHK CHECK (request_type in ('U', 'D', 'T', 'I', 'O')),
CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M'))
CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M', 'P'))
) PCTFREE 3;


Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/alembicrevision.py
Expand Up @@ -17,4 +17,4 @@
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
# - Martin Barisits <martin.barisits@cern.ch>, 2020

ALEMBIC_REVISION = '8ea9122275b1' # the current alembic head revision
ALEMBIC_REVISION = 'd23453595260' # the current alembic head revision
9 changes: 6 additions & 3 deletions lib/rucio/core/distance.py
Expand Up @@ -20,8 +20,8 @@
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
# - Andrew Lister <andrew.lister@stfc.ac.uk>, 2019
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
#
# PY3K COMPATIBLE

from typing import TYPE_CHECKING

from sqlalchemy.exc import DatabaseError, IntegrityError
from sqlalchemy.orm import aliased
Expand All @@ -30,6 +30,9 @@
from rucio.db.sqla.models import Distance, RSE
from rucio.db.sqla.session import transactional_session, read_session

if TYPE_CHECKING:
from typing import List, Dict


@transactional_session
def add_distance(src_rse_id, dest_rse_id, ranking=None, agis_distance=None, geoip_distance=None,
Expand Down Expand Up @@ -77,7 +80,7 @@ def add_distance_short(src_rse_id, dest_rse_id, distance=None, session=None):


@read_session
def get_distances(src_rse_id=None, dest_rse_id=None, session=None):
def get_distances(src_rse_id=None, dest_rse_id=None, session=None) -> "List[Dict]":
"""
Get distances between rses.
Expand Down
151 changes: 91 additions & 60 deletions lib/rucio/core/request.py
Expand Up @@ -27,32 +27,37 @@
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
# - Andrew Lister <andrew.lister@stfc.ac.uk>, 2019
# - Brandon White <bjwhite@fnal.gov>, 2019
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020

import datetime
import json
import logging
import time
import traceback
from typing import TYPE_CHECKING

from six import string_types

from sqlalchemy import and_, or_, func, update
from sqlalchemy.exc import IntegrityError
# from sqlalchemy.sql import tuple_
from sqlalchemy.sql.expression import asc, false, true

from rucio.common.config import config_get_bool
from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, ConfigNotFound
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.utils import generate_uuid, chunks, get_parsed_throttler_mode
from rucio.core.config import get
from rucio.core.message import add_message
from rucio.core.monitor import record_counter, record_timer
from rucio.core.rse import get_rse_name, get_rse_transfer_limits, get_rse_vo
from rucio.core.rse import get_rse_name, get_rse_vo, get_rse_transfer_limits
from rucio.db.sqla import models, filter_thread_work
from rucio.db.sqla.constants import RequestState, RequestType, FTSState, ReplicaState, LockState, RequestErrMsg
from rucio.db.sqla.session import read_session, transactional_session, stream_session
from rucio.transfertool.fts3 import FTS3Transfertool

if TYPE_CHECKING:
from typing import List, Tuple, Iterable, Iterator, Optional
from sqlalchemy.orm import Session

"""
The core request.py is specifically for handling requests.
Requests accessed by external_id (So called transfers), are covered in the core transfer.py
Expand Down Expand Up @@ -135,7 +140,8 @@ def queue_requests(requests, session=None):
logging.debug("queue requests")

request_clause = []
transfer_limits, rses = get_rse_transfer_limits(session=session), {}
rses = {}
preparer_enabled = config_get_bool('conveyor', 'use_preparer', raise_exception=False, default=False)
for req in requests:

if isinstance(req['attributes'], string_types):
Expand Down Expand Up @@ -166,35 +172,6 @@ def queue_requests(requests, session=None):
for request in query_existing_requests:
existing_requests.append(request)

# Temporary disabled
source_rses = {}
# request_scopes_names = [(request['scope'], request['name']) for request in requests]
# for chunked_requests in chunks(request_scopes_names, 50):
# results = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.rse_id, models.Distance.dest_rse_id, models.Distance.ranking)\
# .filter(tuple_(models.RSEFileAssociation.scope, models.RSEFileAssociation.name).in_(chunked_requests))\
# .join(models.Distance, models.Distance.src_rse_id == models.RSEFileAssociation.rse_id)\
# .all()
# for result in results:
# scope = result[0]
# name = result[1]
# src_rse_id = result[2]
# dest_rse_id = result[3]
# distance = result[4]
# if scope not in source_rses:
# source_rses[scope] = {}
# if name not in source_rses[scope]:
# source_rses[scope][name] = {}
# if dest_rse_id not in source_rses[scope][name]:
# source_rses[scope][name][dest_rse_id] = {}
# if src_rse_id not in source_rses[scope][name][dest_rse_id]:
# source_rses[scope][name][dest_rse_id][src_rse_id] = distance

try:
throttler_mode = get('throttler', 'mode', default=None, use_cache=False, session=session)
direction, all_activities = get_parsed_throttler_mode(throttler_mode)
except ConfigNotFound:
throttler_mode = None

new_requests, sources, messages = [], [], []
for request in requests:
dest_rse_name = get_rse_name(rse_id=request['dest_rse_id'], session=session)
Expand All @@ -210,38 +187,13 @@ def temp_serializer(obj):
return obj.internal
raise TypeError('Could not serialise object %r' % obj)

source_rse_id = request.get('source_rse_id')
if not source_rse_id:
try:
source_rses_of_request = source_rses[request['scope']][request['name']][request['dest_rse_id']]
source_rse_id = min(source_rses_of_request, key=source_rses_of_request.get)
except KeyError:
pass
activity_limit = transfer_limits.get(request['attributes']['activity'], {})
all_activities_limit = transfer_limits.get('all_activities', {})
limit_found = False
if throttler_mode:
if direction == 'source':
if all_activities:
if all_activities_limit.get(source_rse_id):
limit_found = True
else:
if activity_limit.get(source_rse_id):
limit_found = True
elif direction == 'destination':
if all_activities:
if all_activities_limit.get(request['dest_rse_id']):
limit_found = True
else:
if activity_limit.get(request['dest_rse_id']):
limit_found = True
request['state'] = RequestState.WAITING if limit_found else RequestState.QUEUED
request['state'] = RequestState.PREPARING if preparer_enabled else RequestState.QUEUED

new_request = {'request_type': request['request_type'],
'scope': request['scope'],
'name': request['name'],
'dest_rse_id': request['dest_rse_id'],
'source_rse_id': source_rse_id,
'source_rse_id': request.get('source_rse_id', None),
'attributes': json.dumps(request['attributes'], default=temp_serializer),
'state': request['state'],
'rule_id': request['rule_id'],
Expand Down Expand Up @@ -304,6 +256,7 @@ def temp_serializer(obj):

for messages_chunk in chunks(messages, 1000):
session.bulk_insert_mappings(models.Message, messages_chunk)

return new_requests


Expand Down Expand Up @@ -1539,3 +1492,81 @@ def list_requests(src_rse_ids, dst_rse_ids, states=[RequestState.WAITING], sessi
models.Request.dest_rse_id.in_(dst_rse_ids))
for request in query.yield_per(500):
yield request


# Important column indices into the rows returned by
# __list_transfer_requests_and_source_replicas; the preparer helpers below
# address those row tuples by position.
request_id_col = 0  # request id, used to match models.Request.id
activity_col = 7  # activity name of the request
dest_rse_id_col = 10  # destination RSE id
source_rse_id_col = 12  # candidate source RSE id
distance_col = 20  # distance/ranking of this source towards the destination


@transactional_session
def preparer_update_requests(source_iter: "Iterable[Tuple]", session: "Optional[Session]" = None) -> int:
    """
    Update each given request with its throttler-derived state and chosen source.

    For every row in source_iter (tuple layout described by the *_col module
    constants above) the request's state is recomputed via
    __throttler_request_state and its source_rse_id is set to the row's source.

    :param source_iter: request/source rows to process.
    :param session: database session, injected by @transactional_session.
    :returns: number of requests updated.
    """
    updated = 0
    for row in source_iter:
        request_id = row[request_id_col]
        source_rse_id = row[source_rse_id_col]
        new_state = __throttler_request_state(
            activity=row[activity_col],
            source_rse_id=source_rse_id,
            dest_rse_id=row[dest_rse_id_col],
            session=session,
        )
        values = {
            models.Request.state: new_state,
            models.Request.source_rse_id: source_rse_id,
        }
        # synchronize_session=False: bulk UPDATE, no loaded instances to sync.
        session.query(models.Request).filter_by(id=request_id).update(values, synchronize_session=False)
        updated += 1
    return updated


def __throttler_request_state(activity, source_rse_id, dest_rse_id, session: "Optional[Session]" = None) -> RequestState:
    """
    Compute the initial request state from the configured throttler settings.

    Returns WAITING when a transfer limit exists for the RSE selected by the
    throttler direction ('source' or 'destination'); otherwise returns QUEUED.
    Always returns QUEUED when no throttler mode is configured.
    """
    try:
        throttler_mode = get('throttler', 'mode', default=None, use_cache=False, session=session)
    except ConfigNotFound:
        throttler_mode = None

    limit_found = False
    if throttler_mode:
        direction, all_activities = get_parsed_throttler_mode(throttler_mode)
        transfer_limits = get_rse_transfer_limits(session=session)
        # Limits are keyed by activity; 'all_activities' aggregates every activity.
        limits_for_activity = transfer_limits.get('all_activities' if all_activities else activity, {})
        if direction == 'source':
            limit_found = bool(limits_for_activity.get(source_rse_id))
        elif direction == 'destination':
            limit_found = bool(limits_for_activity.get(dest_rse_id))

    return RequestState.WAITING if limit_found else RequestState.QUEUED


def minimum_distance_requests(req_sources: "List[Tuple]") -> "Iterator[Tuple]":
    """
    Yield, for each distinct request id, the row with the smallest distance.

    The input rows (tuple layout described by the *_col module constants) are
    grouped by request id and for each group only the row with the minimum
    distance_col value is yielded.

    :param req_sources: request/source rows; may be empty (nothing is yielded).
    :returns: an iterator producing exactly one row per distinct request id.
    """
    # Fix: the original assumed a non-empty list and raised IndexError on [].
    if not req_sources:
        return

    # Sort by Request.id; should be fast since the database typically returns
    # rows already in this order (timsort exploits pre-sorted runs).
    req_sources.sort(key=lambda t: t[request_id_col])

    # req_sources is non-empty and sorted by request id at this point
    cur_request_id = req_sources[0][request_id_col]
    shortest_item = req_sources[0]
    for idx in range(1, len(req_sources)):
        if cur_request_id != req_sources[idx][request_id_col]:
            # New request id: emit the best row of the finished group.
            yield shortest_item
            cur_request_id = req_sources[idx][request_id_col]
            shortest_item = req_sources[idx]
        elif req_sources[idx][distance_col] < shortest_item[distance_col]:
            shortest_item = req_sources[idx]
    yield shortest_item

0 comments on commit 6afac9d

Please sign in to comment.