From e61046f3d0c84af3360e5fbbabb63bb349d9b4fd Mon Sep 17 00:00:00 2001 From: Benedikt Ziemons Date: Mon, 9 Nov 2020 12:20:56 +0100 Subject: [PATCH] Transfers: Add preparer daemon; Fix #4056 Searches for minimum distance source and sets QUEUED or WAITING state based on throttler mode and limits. Add PREPARING request state. Add conveyor.use_preparer config option. Add tests for preparer. --- bin/rucio-conveyor-preparer | 37 + etc/docker/dev/docker-compose-storage.yml | 22 +- etc/docker/dev/docker-compose.yml | 8 +- etc/sql/oracle/schema.sql | 2 +- lib/rucio/alembicrevision.py | 2 +- lib/rucio/core/distance.py | 9 +- lib/rucio/core/request.py | 120 +- lib/rucio/core/transfer.py | 48 +- lib/rucio/daemons/conveyor/preparer.py | 154 +++ lib/rucio/db/sqla/constants.py | 29 +- ...95260_extend_request_state_for_preparer.py | 72 ++ lib/rucio/tests/test_daemons.py | 5 +- lib/rucio/tests/test_preparer.py | 162 +++ lib/rucio/tests/test_request.py | 1099 +---------------- lib/rucio/tests/test_throttler.py | 10 +- tools/add_header | 5 +- 16 files changed, 543 insertions(+), 1241 deletions(-) create mode 100755 bin/rucio-conveyor-preparer create mode 100644 lib/rucio/daemons/conveyor/preparer.py create mode 100644 lib/rucio/db/sqla/migrate_repo/versions/d23453595260_extend_request_state_for_preparer.py create mode 100644 lib/rucio/tests/test_preparer.py diff --git a/bin/rucio-conveyor-preparer b/bin/rucio-conveyor-preparer new file mode 100755 index 0000000000..0360279069 --- /dev/null +++ b/bin/rucio-conveyor-preparer @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# - Benedikt Ziemons , 2020 + +import argparse +import signal + +from rucio.daemons.conveyor.preparer import run, stop + + +def main(args): + signal.signal(signal.SIGTERM, stop) + try: + run(once=args.run_once) + except KeyboardInterrupt: + stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only') + main(args=parser.parse_args()) diff --git a/etc/docker/dev/docker-compose-storage.yml b/etc/docker/dev/docker-compose-storage.yml index d4bc844086..da0a489336 100644 --- a/etc/docker/dev/docker-compose-storage.yml +++ b/etc/docker/dev/docker-compose-storage.yml @@ -15,9 +15,9 @@ services: - xrd3:xrd3 - minio:minio volumes: - - ../../../tools:/opt/rucio/tools - - ../../../bin:/opt/rucio/bin - - ../../../lib:/opt/rucio/lib + - ../../../tools:/opt/rucio/tools:Z + - ../../../bin:/opt/rucio/bin:Z + - ../../../lib:/opt/rucio/lib:Z environment: - X509_USER_CERT=/opt/rucio/etc/usercert.pem - X509_USER_KEY=/opt/rucio/etc/userkey.pem @@ -59,8 +59,8 @@ services: ports: - "1094:1094" volumes: - - ../../certs/hostcert_xrd1.pem:/tmp/xrdcert.pem - - ../../certs/hostcert_xrd1.key.pem:/tmp/xrdkey.pem + - ../../certs/hostcert_xrd1.pem:/tmp/xrdcert.pem:Z + - ../../certs/hostcert_xrd1.key.pem:/tmp/xrdkey.pem:Z xrd2: image: rucio/xrootd hostname: xrd2 @@ -69,8 +69,8 @@ services: ports: - "1095:1095" volumes: - - ../../certs/hostcert_xrd2.pem:/tmp/xrdcert.pem - - ../../certs/hostcert_xrd2.key.pem:/tmp/xrdkey.pem + - 
../../certs/hostcert_xrd2.pem:/tmp/xrdcert.pem:Z + - ../../certs/hostcert_xrd2.key.pem:/tmp/xrdkey.pem:Z xrd3: image: rucio/xrootd hostname: xrd3 @@ -79,8 +79,8 @@ services: ports: - "1096:1096" volumes: - - ../../certs/hostcert_xrd3.pem:/tmp/xrdcert.pem - - ../../certs/hostcert_xrd3.key.pem:/tmp/xrdkey.pem + - ../../certs/hostcert_xrd3.pem:/tmp/xrdcert.pem:Z + - ../../certs/hostcert_xrd3.key.pem:/tmp/xrdkey.pem:Z minio: image: minio/minio hostname: minio @@ -90,6 +90,6 @@ services: ports: - "9000:9000" volumes: - - ../../certs/hostcert_minio.pem:/root/.minio/certs/public.crt - - ../../certs/hostcert_minio.key.pem:/root/.minio/certs/private.key + - ../../certs/hostcert_minio.pem:/root/.minio/certs/public.crt:Z + - ../../certs/hostcert_minio.key.pem:/root/.minio/certs/private.key:Z command: ["server", "/data"] diff --git a/etc/docker/dev/docker-compose.yml b/etc/docker/dev/docker-compose.yml index c49ea14d33..1d4fa22f34 100644 --- a/etc/docker/dev/docker-compose.yml +++ b/etc/docker/dev/docker-compose.yml @@ -1,7 +1,7 @@ version: "2" services: rucio: - image: rucio/rucio-dev + image: rucio/rucio-dev:py3 hostname: rucio ports: - "443:443" @@ -9,9 +9,9 @@ services: - ruciodb:ruciodb - graphite:graphite volumes: - - ../../../tools:/opt/rucio/tools - - ../../../bin:/opt/rucio/bin - - ../../../lib:/opt/rucio/lib + - ../../../tools:/opt/rucio/tools:Z + - ../../../bin:/opt/rucio/bin:Z + - ../../../lib:/opt/rucio/lib:Z environment: - X509_USER_CERT=/opt/rucio/etc/usercert.pem - X509_USER_KEY=/opt/rucio/etc/userkey.pem diff --git a/etc/sql/oracle/schema.sql b/etc/sql/oracle/schema.sql index 3a40a3223a..3c7447165c 100644 --- a/etc/sql/oracle/schema.sql +++ b/etc/sql/oracle/schema.sql @@ -930,7 +930,7 @@ PARTITION BY LIST(SCOPE) CONSTRAINT REQUESTS_SCOPE_NN CHECK (SCOPE IS NOT NULL), CONSTRAINT REQUESTS_NAME_NN CHECK (NAME IS NOT NULL), CONSTRAINT REQUESTS_TYPE_CHK CHECK (request_type in ('U', 'D', 'T', 'I', 'O')), - CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 
'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M')) + CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M', 'P')) ) PCTFREE 3; diff --git a/lib/rucio/alembicrevision.py b/lib/rucio/alembicrevision.py index b949f50715..c1f4fe9dac 100644 --- a/lib/rucio/alembicrevision.py +++ b/lib/rucio/alembicrevision.py @@ -17,4 +17,4 @@ # - Benedikt Ziemons , 2020 # - Martin Barisits , 2020 -ALEMBIC_REVISION = '8ea9122275b1' # the current alembic head revision +ALEMBIC_REVISION = 'd23453595260' # the current alembic head revision diff --git a/lib/rucio/core/distance.py b/lib/rucio/core/distance.py index ab2c0ef9b5..5a86b8d1e6 100644 --- a/lib/rucio/core/distance.py +++ b/lib/rucio/core/distance.py @@ -20,8 +20,8 @@ # - Hannes Hansen , 2018-2019 # - Andrew Lister , 2019 # - Benedikt Ziemons , 2020 -# -# PY3K COMPATIBLE + +from typing import TYPE_CHECKING from sqlalchemy.exc import DatabaseError, IntegrityError from sqlalchemy.orm import aliased @@ -30,6 +30,9 @@ from rucio.db.sqla.models import Distance, RSE from rucio.db.sqla.session import transactional_session, read_session +if TYPE_CHECKING: + from typing import List, Dict + @transactional_session def add_distance(src_rse_id, dest_rse_id, ranking=None, agis_distance=None, geoip_distance=None, @@ -77,7 +80,7 @@ def add_distance_short(src_rse_id, dest_rse_id, distance=None, session=None): @read_session -def get_distances(src_rse_id=None, dest_rse_id=None, session=None): +def get_distances(src_rse_id=None, dest_rse_id=None, session=None) -> "List[Dict]": """ Get distances between rses. 
diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index d408361704..cc5d93cfe6 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -1,23 +1,33 @@ -# Copyright European Organization for Nuclear Research (CERN) +# -*- coding: utf-8 -*- +# Copyright 2013-2020 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. +# you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 # -# Authors: -# - Mario Lassnig, , 2013-2015, 2017 -# - Vincent Garonne, , 2015-2017 -# - Martin Barisits, , 2014-2017 -# - Wen Guan, , 2014-2016 -# - Joaquin Bogado, , 2016 -# - Thomas Beermann, , 2016 -# - Cedric Serfon, , 2017-2020 -# - Hannes Hansen, , 2018-2019 -# - Andrew Lister, , 2019 -# - Brandon White, , 2019 +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# -# PY3K COMPATIBLE +# Authors: +# - Mario Lassnig , 2013-2017 +# - Vincent Garonne , 2013-2017 +# - Cedric Serfon , 2014-2020 +# - Martin Barisits , 2014-2020 +# - Wen Guan , 2014-2016 +# - Joaquín Bogado , 2015-2019 +# - Thomas Beermann , 2016 +# - Joaquin Bogado , 2017 +# - Igor Mandrichenko , 2018 +# - Robert Illingworth , 2018 +# - Hannes Hansen , 2018-2019 +# - Andrew Lister , 2019 +# - Brandon White , 2019 +# - Benedikt Ziemons , 2020 import datetime import json @@ -26,19 +36,17 @@ import traceback from six import string_types - from sqlalchemy import and_, or_, func, update from sqlalchemy.exc import IntegrityError -# from sqlalchemy.sql import tuple_ from sqlalchemy.sql.expression import asc, false, true -from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, ConfigNotFound +from rucio.common.config import config_get_bool +from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation from rucio.common.types import InternalAccount, InternalScope -from rucio.common.utils import generate_uuid, chunks, get_parsed_throttler_mode -from rucio.core.config import get +from rucio.common.utils import generate_uuid, chunks from rucio.core.message import add_message from rucio.core.monitor import record_counter, record_timer -from rucio.core.rse import get_rse_name, get_rse_transfer_limits, get_rse_vo +from rucio.core.rse import get_rse_name, get_rse_vo from rucio.db.sqla import models, filter_thread_work from rucio.db.sqla.constants import RequestState, RequestType, FTSState, ReplicaState, LockState, RequestErrMsg from rucio.db.sqla.session import read_session, transactional_session, stream_session @@ -126,7 +134,8 @@ def queue_requests(requests, session=None): logging.debug("queue requests") request_clause = [] - transfer_limits, rses = get_rse_transfer_limits(session=session), {} + rses = {} + preparer_enabled = config_get_bool('conveyor', 'use_preparer', raise_exception=False, default=False) for req in 
requests: if isinstance(req['attributes'], string_types): @@ -157,43 +166,14 @@ def queue_requests(requests, session=None): for request in query_existing_requests: existing_requests.append(request) - # Temporary disabled - source_rses = {} - # request_scopes_names = [(request['scope'], request['name']) for request in requests] - # for chunked_requests in chunks(request_scopes_names, 50): - # results = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.rse_id, models.Distance.dest_rse_id, models.Distance.ranking)\ - # .filter(tuple_(models.RSEFileAssociation.scope, models.RSEFileAssociation.name).in_(chunked_requests))\ - # .join(models.Distance, models.Distance.src_rse_id == models.RSEFileAssociation.rse_id)\ - # .all() - # for result in results: - # scope = result[0] - # name = result[1] - # src_rse_id = result[2] - # dest_rse_id = result[3] - # distance = result[4] - # if scope not in source_rses: - # source_rses[scope] = {} - # if name not in source_rses[scope]: - # source_rses[scope][name] = {} - # if dest_rse_id not in source_rses[scope][name]: - # source_rses[scope][name][dest_rse_id] = {} - # if src_rse_id not in source_rses[scope][name][dest_rse_id]: - # source_rses[scope][name][dest_rse_id][src_rse_id] = distance - - try: - throttler_mode = get('throttler', 'mode', default=None, use_cache=False, session=session) - direction, all_activities = get_parsed_throttler_mode(throttler_mode) - except ConfigNotFound: - throttler_mode = None - new_requests, sources, messages = [], [], [] for request in requests: dest_rse_name = get_rse_name(rse_id=request['dest_rse_id'], session=session) if req['request_type'] == RequestType.TRANSFER and (request['scope'], request['name'], request['dest_rse_id']) in existing_requests: - logging.warn('Request TYPE %s for DID %s:%s at RSE %s exists - ignoring' % (request['request_type'], - request['scope'], - request['name'], - dest_rse_name)) + logging.warning('Request TYPE %s for 
DID %s:%s at RSE %s exists - ignoring' % (request['request_type'], + request['scope'], + request['name'], + dest_rse_name)) continue def temp_serializer(obj): @@ -201,38 +181,13 @@ def temp_serializer(obj): return obj.internal raise TypeError('Could not serialise object %r' % obj) - source_rse_id = request.get('source_rse_id') - if not source_rse_id: - try: - source_rses_of_request = source_rses[request['scope']][request['name']][request['dest_rse_id']] - source_rse_id = min(source_rses_of_request, key=source_rses_of_request.get) - except KeyError: - pass - activity_limit = transfer_limits.get(request['attributes']['activity'], {}) - all_activities_limit = transfer_limits.get('all_activities', {}) - limit_found = False - if throttler_mode: - if direction == 'source': - if all_activities: - if all_activities_limit.get(source_rse_id): - limit_found = True - else: - if activity_limit.get(source_rse_id): - limit_found = True - elif direction == 'destination': - if all_activities: - if all_activities_limit.get(request['dest_rse_id']): - limit_found = True - else: - if activity_limit.get(request['dest_rse_id']): - limit_found = True - request['state'] = RequestState.WAITING if limit_found else RequestState.QUEUED + request['state'] = RequestState.PREPARING if preparer_enabled else RequestState.QUEUED new_request = {'request_type': request['request_type'], 'scope': request['scope'], 'name': request['name'], 'dest_rse_id': request['dest_rse_id'], - 'source_rse_id': source_rse_id, + 'source_rse_id': request.get('source_rse_id', None), 'attributes': json.dumps(request['attributes'], default=temp_serializer), 'state': request['state'], 'rule_id': request['rule_id'], @@ -295,6 +250,7 @@ def temp_serializer(obj): for messages_chunk in chunks(messages, 1000): session.bulk_insert_mappings(models.Message, messages_chunk) + return new_requests diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 587b4a0707..e9400fa153 100644 --- a/lib/rucio/core/transfer.py 
+++ b/lib/rucio/core/transfer.py @@ -32,9 +32,7 @@ # - Eli Chadwick , 2020 # - Nick Smith , 2020 # - Patrick Austin , 2020 -# -# PY3K COMPATIBLE - +# - Benedikt Ziemons , 2020 from __future__ import division @@ -54,14 +52,14 @@ from sqlalchemy.sql.expression import false from rucio.common import constants +from rucio.common.config import config_get +from rucio.common.constants import SUPPORTED_PROTOCOLS from rucio.common.exception import (InvalidRSEExpression, NoDistance, RequestNotFound, RSEProtocolNotSupported, RucioException, UnsupportedOperation) -from rucio.common.config import config_get from rucio.common.rse_attributes import get_rse_attributes from rucio.common.types import InternalAccount from rucio.common.utils import construct_surl -from rucio.common.constants import SUPPORTED_PROTOCOLS from rucio.core import did, message as message_core, request as request_core from rucio.core.config import get as core_config_get from rucio.core.monitor import record_counter, record_timer @@ -76,7 +74,6 @@ from rucio.rse import rsemanager as rsemgr from rucio.transfertool.fts3 import FTS3Transfertool - # Extra modules: Only imported if available EXTRA_MODULES = {'globus_sdk': False} @@ -650,6 +647,7 @@ def get_transfer_requests_and_source_replicas(total_workers=0, worker_number=0, activity=activity, older_than=older_than, rses=rses, + request_state=RequestState.QUEUED, session=session) unavailable_read_rse_ids = __get_unavailable_rse_ids(operation='read', session=session) @@ -1262,8 +1260,8 @@ def get_transfer_requests_and_source_replicas(total_workers=0, worker_number=0, @read_session -def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=0, - limit=None, activity=None, older_than=None, rses=None, session=None): +def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=0, limit=None, activity=None, + older_than=None, rses=None, request_state=None, session=None): """ List requests with source replicas :param 
total_workers: Number of total workers. @@ -1275,6 +1273,10 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number= :param session: Database session to use. :returns: List. """ + + if request_state is None: + request_state = RequestState.QUEUED + sub_requests = session.query(models.Request.id, models.Request.rule_id, models.Request.scope, @@ -1287,12 +1289,12 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number= models.Request.previous_attempt_id, models.Request.dest_rse_id, models.Request.retry_count, - models.Request.account)\ - .with_hint(models.Request, "INDEX(REQUESTS REQUESTS_TYP_STA_UPD_IDX)", 'oracle')\ - .filter(models.Request.state == RequestState.QUEUED)\ - .filter(models.Request.request_type == RequestType.TRANSFER)\ - .join(models.RSE, models.RSE.id == models.Request.dest_rse_id)\ - .filter(models.RSE.deleted == false())\ + models.Request.account) \ + .with_hint(models.Request, "INDEX(REQUESTS REQUESTS_TYP_STA_UPD_IDX)", 'oracle') \ + .filter(models.Request.state == request_state) \ + .filter(models.Request.request_type == RequestType.TRANSFER) \ + .join(models.RSE, models.RSE.id == models.Request.dest_rse_id) \ + .filter(models.RSE.deleted == false()) \ .filter(models.RSE.availability.in_((2, 3, 6, 7))) if isinstance(older_than, datetime.datetime): @@ -1328,19 +1330,19 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number= sub_requests.c.retry_count, models.Source.url, models.Source.ranking, - models.Distance.ranking)\ + models.Distance.ranking) \ .outerjoin(models.RSEFileAssociation, and_(sub_requests.c.scope == models.RSEFileAssociation.scope, sub_requests.c.name == models.RSEFileAssociation.name, models.RSEFileAssociation.state == ReplicaState.AVAILABLE, - sub_requests.c.dest_rse_id != models.RSEFileAssociation.rse_id))\ - .with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_PK)", 'oracle')\ - .outerjoin(models.RSE, and_(models.RSE.id == 
models.RSEFileAssociation.rse_id, - models.RSE.deleted == false()))\ + sub_requests.c.dest_rse_id != models.RSEFileAssociation.rse_id)) \ + .with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_PK)", 'oracle') \ + .join(models.RSE, and_(models.RSE.id == models.RSEFileAssociation.rse_id, + models.RSE.deleted == false())) \ .outerjoin(models.Source, and_(sub_requests.c.id == models.Source.request_id, - models.RSE.id == models.Source.rse_id))\ - .with_hint(models.Source, "+ index(sources SOURCES_PK)", 'oracle')\ - .outerjoin(models.Distance, and_(sub_requests.c.dest_rse_id == models.Distance.dest_rse_id, - models.RSEFileAssociation.rse_id == models.Distance.src_rse_id))\ + models.RSE.id == models.Source.rse_id)) \ + .with_hint(models.Source, "+ index(sources SOURCES_PK)", 'oracle') \ + .join(models.Distance, and_(sub_requests.c.dest_rse_id == models.Distance.dest_rse_id, + models.RSEFileAssociation.rse_id == models.Distance.src_rse_id)) \ .with_hint(models.Distance, "+ index(distances DISTANCES_PK)", 'oracle') if rses: diff --git a/lib/rucio/daemons/conveyor/preparer.py b/lib/rucio/daemons/conveyor/preparer.py new file mode 100644 index 0000000000..a999be0a6d --- /dev/null +++ b/lib/rucio/daemons/conveyor/preparer.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# Authors: +# - Benedikt Ziemons , 2020 + +import logging +import sys +import threading +from time import time + +import rucio.db.sqla.util +from rucio.common import exception +from rucio.common.config import config_get +from rucio.common.exception import ConfigNotFound +from rucio.common.utils import get_parsed_throttler_mode, chunks +from rucio.core.config import get +from rucio.core.rse import get_rse_transfer_limits +from rucio.core.transfer import __list_transfer_requests_and_source_replicas +from rucio.db.sqla import models +from rucio.db.sqla.constants import RequestState +from rucio.db.sqla.session import transactional_session + +graceful_stop = threading.Event() + + +def stop(): + """ + Graceful exit. + """ + + graceful_stop.set() + + +def run(once=False, sleep_time=60): + """ + Running the preparer daemon either once or by default in a loop until stop is called. + """ + config_loglevel = config_get('common', 'loglevel', raise_exception=False, default='DEBUG').upper() + logging.basicConfig(stream=sys.stdout, + level=getattr(logging, config_loglevel), + format='%(asctime)s\t%(process)d\t%(levelname)s\t%(message)s') + + if rucio.db.sqla.util.is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + + graceful_stop.wait(10) + while not graceful_stop.is_set(): + start_time = time() + + run_once() + + if once: + return + + end_time = time() + time_diff = end_time - start_time + if time_diff < sleep_time: + logging.info('Sleeping for a while : %.2f seconds', (sleep_time - time_diff)) + graceful_stop.wait(sleep_time - time_diff) + + +@transactional_session +def run_once(chunk_size=50, session=None): + req_sources = __list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, session=session) + if not req_sources: + return + + # important column indices from __list_transfer_requests_and_source_replicas + request_id_col = 0 + activity_col = 7 + dest_rse_id_col = 10 + source_rse_id_col = 12 + 
distance_col = 20 + + # sort by Request.id, should be pretty quick since the database should return it this way (tim sort) + req_sources = sorted(req_sources, key=lambda t: t[request_id_col]) + + def minimum_distance_request(): + # req_sources must be checked at this point, see above + cur_request_id = req_sources[0][request_id_col] + shortest_item = req_sources[0] + for idx in range(1, len(req_sources)): + if cur_request_id != req_sources[idx][request_id_col]: + yield shortest_item + cur_request_id = req_sources[idx][request_id_col] + shortest_item = req_sources[idx] + elif req_sources[idx][distance_col] < shortest_item[distance_col]: + shortest_item = req_sources[idx] + yield shortest_item + + req_sources = list(minimum_distance_request()) + + for chunk in chunks(req_sources, chunk_size): + # from rucio.core.transfer.get_transfer_requests_and_source_replicas + for req_source in chunk: + new_state = _prepare_request( + session=session, + activity=req_source[activity_col], + source_rse_id=req_source[source_rse_id_col], + dest_rse_id=req_source[dest_rse_id_col], + ) + session.query(models.Request).filter_by(id=req_source[request_id_col]).update({ + models.Request.state: new_state, + models.Request.source_rse_id: req_source[source_rse_id_col], + }, synchronize_session=False) + session.commit() + + +def _prepare_request(session, activity, source_rse_id, dest_rse_id) -> RequestState: + """ + Takes request attributes to return a new state for the request + based on throttler settings. Always returns QUEUED, + if the throttler mode is not set. 
+ """ + try: + throttler_mode = get('throttler', 'mode', default=None, use_cache=False, session=session) + except ConfigNotFound: + throttler_mode = None + + limit_found = False + if throttler_mode: + transfer_limits = get_rse_transfer_limits(session=session) + activity_limit = transfer_limits.get(activity, {}) + all_activities_limit = transfer_limits.get('all_activities', {}) + direction, all_activities = get_parsed_throttler_mode(throttler_mode) + if direction == 'source': + if all_activities: + if all_activities_limit.get(source_rse_id): + limit_found = True + else: + if activity_limit.get(source_rse_id): + limit_found = True + elif direction == 'destination': + if all_activities: + if all_activities_limit.get(dest_rse_id): + limit_found = True + else: + if activity_limit.get(dest_rse_id): + limit_found = True + + return RequestState.WAITING if limit_found else RequestState.QUEUED diff --git a/lib/rucio/db/sqla/constants.py b/lib/rucio/db/sqla/constants.py index ff55b36f26..e28063c4cc 100644 --- a/lib/rucio/db/sqla/constants.py +++ b/lib/rucio/db/sqla/constants.py @@ -1,19 +1,27 @@ -# Copyright European Organization for Nuclear Research (CERN) +# -*- coding: utf-8 -*- +# Copyright 2015-2020 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# # Authors: -# - Vincent Garonne, , 2013-2017 -# - Mario Lassnig, , 2014, 2017 -# - Martin Barisits, , 2014-2019 -# - Cedric Serfon, , 2015-2018 -# - Wen Guan, , 2016 +# - Vincent Garonne , 2015-2017 +# - Wen Guan , 2015-2016 +# - Cedric Serfon , 2016 +# - Martin Barisits , 2017-2019 +# - Hannes Hansen , 2018 # - Ruturaj Gujar , 2019 # - Jaroslav Guenther , 2019 -# -# PY3K COMPATIBLE +# - Benedikt Ziemons , 2020 """ Constants. @@ -153,6 +161,7 @@ class RequestState(DeclEnum): MISMATCH_SCHEME = 'M', 'MISMATCH_SCHEME' SUSPEND = 'U', 'SUSPEND' WAITING = 'W', 'WAITING' + PREPARING = 'P', 'PREPARING' class RequestType(DeclEnum): diff --git a/lib/rucio/db/sqla/migrate_repo/versions/d23453595260_extend_request_state_for_preparer.py b/lib/rucio/db/sqla/migrate_repo/versions/d23453595260_extend_request_state_for_preparer.py new file mode 100644 index 0000000000..9d3cc17e3f --- /dev/null +++ b/lib/rucio/db/sqla/migrate_repo/versions/d23453595260_extend_request_state_for_preparer.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# - Benedikt Ziemons , 2020 + +""" +Add PREPARING state to Request model. 
+""" + +from alembic import context +from alembic.op import execute, create_check_constraint, drop_constraint + +# Alembic revision identifiers +revision = 'd23453595260' +down_revision = '8ea9122275b1' + + +def upgrade(): + """ + Upgrade the database to this revision + """ + + schema = context.get_context().version_table_schema + '.' if context.get_context().version_table_schema else '' + create_check = False + + if context.get_context().dialect.name in ['oracle', 'postgresql']: + drop_constraint('REQUESTS_STATE_CHK', 'requests', type_='check') + create_check = True + elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 5: + create_check = True + elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 8: + execute('ALTER TABLE ' + schema + 'requests DROP CHECK REQUESTS_STATE_CHK') + create_check = True + + if create_check: + create_check_constraint(constraint_name='REQUESTS_STATE_CHK', table_name='requests', + condition="state in ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M', 'P')") + + +def downgrade(): + """ + Downgrade the database to the previous revision + """ + + schema = context.get_context().version_table_schema + '.' 
if context.get_context().version_table_schema else '' + create_check = False + + if context.get_context().dialect.name in ['oracle', 'postgresql']: + drop_constraint('REQUESTS_STATE_CHK', 'requests', type_='check') + create_check = True + elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 5: + create_check = True + elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 8: + execute('ALTER TABLE ' + schema + 'requests DROP CHECK REQUESTS_STATE_CHK') + create_check = True + + if create_check: + create_check_constraint(constraint_name='REQUESTS_STATE_CHK', table_name='requests', + condition="state in ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M')") diff --git a/lib/rucio/tests/test_daemons.py b/lib/rucio/tests/test_daemons.py index 34eae15425..2cf012e324 100644 --- a/lib/rucio/tests/test_daemons.py +++ b/lib/rucio/tests/test_daemons.py @@ -15,8 +15,6 @@ # # Authors: # - Benedikt Ziemons , 2020 -# -# PY3K COMPATIBLE import sys @@ -30,7 +28,7 @@ from rucio.daemons.badreplicas import minos, minos_temporary_expiration, necromancer from rucio.daemons.c3po import c3po from rucio.daemons.cache import consumer -from rucio.daemons.conveyor import finisher, fts_throttler, poller, poller_latest, receiver, stager, submitter, throttler +from rucio.daemons.conveyor import finisher, fts_throttler, poller, poller_latest, receiver, stager, submitter, throttler, preparer from rucio.daemons.follower import follower from rucio.daemons.hermes import hermes, hermes2 from rucio.daemons.judge import cleaner, evaluator, injector, repairer @@ -67,6 +65,7 @@ stager, submitter, throttler, + preparer, follower, hermes, hermes2, diff --git a/lib/rucio/tests/test_preparer.py b/lib/rucio/tests/test_preparer.py new file mode 100644 index 0000000000..7d0b228ced --- /dev/null +++ b/lib/rucio/tests/test_preparer.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 
CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# - Benedikt Ziemons , 2020 + +import pytest + +from rucio.common.types import InternalScope, InternalAccount +from rucio.common.utils import generate_uuid +from rucio.core import config +from rucio.core.distance import get_distances, add_distance +from rucio.core.replica import add_replicas, delete_replicas +from rucio.core.rse import get_rse_id, set_rse_transfer_limits, add_rse, del_rse +from rucio.core.transfer import __list_transfer_requests_and_source_replicas +from rucio.daemons.conveyor import preparer +from rucio.db.sqla import models, session +from rucio.db.sqla.constants import RequestState +from rucio.tests.common import rse_name_generator + + +@pytest.fixture(scope='module') +def dest_rse(vo): + dest_rse = 'MOCK' + dest_rse_id = get_rse_id(dest_rse, vo=vo) + return {'name': dest_rse, 'id': dest_rse_id} + + +def generate_rse(vo='def'): + rse_name = f'MOCK-{rse_name_generator()}' + rse_id = add_rse(rse_name, vo=vo) + return {'name': rse_name, 'id': rse_id} + + +@pytest.fixture +def source_rse(vo, dest_rse): + rse = generate_rse(vo=vo) + add_distance(rse['id'], dest_rse['id'], ranking=5) + yield rse + del_rse(rse['id']) + + +@pytest.fixture +def file(vo): + scope = InternalScope(scope='mock', vo=vo) + name = generate_uuid() + return {'scope': scope, 'name': name, 'bytes': 1, 'adler32': 'deadbeef'} + + +@pytest.fixture +def mock_request(vo, source_rse, dest_rse, file): + db_session = 
session.get_session() + account = InternalAccount('root', vo=vo) + + add_replicas(rse_id=source_rse['id'], files=[file], account=account, session=db_session) + + request = models.Request(state=RequestState.PREPARING, scope=file['scope'], name=file['name'], dest_rse_id=dest_rse['id'], account=account) + request.save(session=db_session) + db_session.commit() + + yield request + + request.delete(session=db_session) + delete_replicas(rse_id=source_rse['id'], files=[file], session=db_session) + db_session.commit() + + +@pytest.fixture +def dest_throttler(mock_request): + db_session = session.get_session() + config.set('throttler', 'mode', 'DEST_PER_ACT', session=db_session) + set_rse_transfer_limits(mock_request.dest_rse_id, activity=mock_request.activity, max_transfers=1, strategy='fifo', session=db_session) + db_session.commit() + yield + db_session.query(models.RSETransferLimit).filter_by(rse_id=mock_request.dest_rse_id).delete() + config.remove_option('throttler', 'mode', session=db_session) + db_session.commit() + + +def test_listing_preparing_transfers(mock_request): + req_sources = __list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING) + + assert len(req_sources) == 1 + req_id = req_sources[0][0] + assert req_id == mock_request.id + + +@pytest.mark.usefixtures('dest_throttler') +def test_preparer_setting_request_state_waiting(mock_request): + db_session = session.get_session() + preparer.run_once(session=db_session) + + updated_mock_request = db_session.query(models.Request).filter_by(id=mock_request.id).one() # type: models.Request + + assert updated_mock_request.state == RequestState.WAITING + + +def test_preparer_setting_request_state_queued(mock_request): + db_session = session.get_session() + preparer.run_once(session=db_session) + + updated_mock_request = db_session.query(models.Request).filter_by(id=mock_request.id).one() # type: models.Request + + assert updated_mock_request.state == RequestState.QUEUED + + +def 
test_preparer_setting_request_source(vo, source_rse, mock_request): + db_session = session.get_session() + + preparer.run_once(session=db_session) + + updated_mock_request = db_session.query(models.Request).filter_by(id=mock_request.id).one() # type: models.Request + + assert updated_mock_request.state == RequestState.QUEUED + assert updated_mock_request.source_rse_id == source_rse['id'] + + +@pytest.fixture +def source2_rse(vo, dest_rse): + rse = generate_rse(vo=vo) + add_distance(rse['id'], dest_rse['id'], ranking=2) + yield rse + del_rse(rse['id']) + + +def test_two_sources_one_destination(vo, file, source_rse, source2_rse, mock_request): + db_session = session.get_session() + + add_replicas(rse_id=source2_rse['id'], files=[file], account=mock_request.account, session=db_session) + try: + src1_distance, src2_distance = (get_distances( + src_rse_id=src_rse, + dest_rse_id=mock_request.dest_rse_id, + session=db_session + ) for src_rse in (source_rse['id'], source2_rse['id'])) + db_session.commit() + + assert src1_distance and len(src1_distance) == 1 and src1_distance[0]['ranking'] == 5 + assert src2_distance and len(src2_distance) == 1 and src2_distance[0]['ranking'] == 2 + + preparer.run_once(session=db_session) + + updated_mock_request = db_session.query(models.Request).filter_by(id=mock_request.id).one() # type: models.Request + + assert updated_mock_request.state == RequestState.QUEUED + assert updated_mock_request.source_rse_id == source2_rse['id'] # distance 2 < 5 + + finally: + delete_replicas(rse_id=source2_rse['id'], files=[file], session=db_session) + db_session.commit() diff --git a/lib/rucio/tests/test_request.py b/lib/rucio/tests/test_request.py index 5efb978d97..d8701ddf67 100644 --- a/lib/rucio/tests/test_request.py +++ b/lib/rucio/tests/test_request.py @@ -20,8 +20,6 @@ # - Eli Chadwick , 2020 # - Patrick Austin , 2020 # - Benedikt Ziemons , 2020 -# -# PY3K COMPATIBLE import os import unittest @@ -32,11 +30,8 @@ from rucio.common.config import 
config_get, config_get_bool from rucio.common.types import InternalAccount, InternalScope from rucio.common.utils import generate_uuid, parse_response -from rucio.core.config import set as config_set -from rucio.core.did import attach_dids, add_did from rucio.core.replica import add_replica -from rucio.core.request import release_all_waiting_requests, queue_requests, get_request_by_did, release_waiting_requests_per_free_volume, \ - release_waiting_requests_grouped_fifo, release_waiting_requests_fifo, list_requests, release_waiting_requests_per_deadline +from rucio.core.request import queue_requests, get_request_by_did, list_requests from rucio.core.rse import get_rse_id, set_rse_transfer_limits, add_rse_attribute from rucio.db.sqla import session, models, constants from rucio.tests.common import vohdr, hdrdict, headers, auth @@ -79,8 +74,8 @@ def setUp(self): def tearDown(self): self.db_session.commit() - def test_queue_requests_state_no_throttler(self): - """ REQUEST (CORE): queue requests with default throttler mode (None). """ + def test_queue_requests_state(self): + """ REQUEST (CORE): test queuing requests """ name = generate_uuid() name2 = generate_uuid() name3 = generate_uuid() @@ -150,1094 +145,6 @@ def test_queue_requests_state_no_throttler(self): request = get_request_by_did(self.scope, name3, self.dest_rse_id2, session=self.db_session) assert request['state'] == constants.RequestState.QUEUED - # def test_queue_requests_source_rse(self): - # """ REQUEST (CORE): queue requests and select correct source RSE. 
""" - # # test correct selection of source RSE - # name = generate_uuid() - # size = 8 - # add_replica(self.source_rse_id, self.scope, name, size, self.account, session=self.db_session) - # add_replica(self.source_rse_id2, self.scope, name, size, self.account, session=self.db_session) - # add_distance(self.source_rse_id, self.dest_rse_id, 1, session=self.db_session) - # add_distance(self.source_rse_id2, self.dest_rse_id, 2, session=self.db_session) - # requests = [{ - # 'dest_rse_id': self.dest_rse_id, - # 'request_type': constants.RequestType.TRANSFER, - # 'request_id': generate_uuid(), - # 'name': name, - # 'scope': self.scope, - # 'rule_id': generate_uuid(), - # 'retry_count': 1, - # 'requested_at': datetime.now().replace(year=2015), - # 'attributes': { - # 'activity': self.user_activity, - # 'bytes': size, - # 'md5': '', - # 'adler32': '' - # } - # }] - # queue_requests(requests, session=self.db_session) - # request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - # # select source RSE with smallest distance - # assert request['source_rse_id'] == self.source_rse_id - # assert request['name'] == name - # assert request['scope'] == self.scope - # assert request['state'] == constants.RequestState.QUEUED - - def test_queue_requests_state_1(self): - """ REQUEST (CORE): queue requests and set correct request state. 
""" - # test correct request state depending on throttler mode - config_set('throttler', 'mode', 'DEST_PER_ACT', session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, self.user_activity, max_transfers=1, session=self.db_session) - size = 1 - name = generate_uuid() - add_replica(self.source_rse_id2, self.scope, name, size, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id2, self.scope, name2, size, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': self.user_activity, - 'bytes': size, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': 'Activity without limit', - 'bytes': size, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - def test_queue_requests_state_2(self): - """ REQUEST (CORE): queue requests and set correct request state. 
""" - config_set('throttler', 'mode', 'SRC_PER_ACT', session=self.db_session) - size = 1 - name = generate_uuid() - add_replica(self.source_rse_id2, self.scope, name, size, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id2, self.scope, name2, size, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': self.user_activity, - 'bytes': size, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - -class TestRequestCoreRelease(unittest.TestCase): - - @classmethod - def setUpClass(cls): - if config_get_bool('common', 'multi_vo', raise_exception=False, default=False): - cls.vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - else: - cls.vo = {} - - cls.db_session = session.get_session() - cls.dialect = cls.db_session.bind.dialect.name - cls.dest_rse = 'MOCK' - cls.source_rse = 'MOCK4' - cls.source_rse2 = 'MOCK5' - cls.dest_rse_id = get_rse_id(cls.dest_rse, **cls.vo) - cls.source_rse_id = get_rse_id(cls.source_rse, **cls.vo) - cls.source_rse_id2 = get_rse_id(cls.source_rse2, **cls.vo) - cls.scope = InternalScope('mock', **cls.vo) - cls.account = InternalAccount('root', **cls.vo) - cls.user_activity = 'User Subscription' - cls.all_activities = 'all_activities' - - def setUp(self): - self.db_session.query(models.Request).delete() - self.db_session.query(models.RSETransferLimit).delete() - self.db_session.query(models.Distance).delete() - self.db_session.query(models.Config).delete() - # 
set transfer limits to put requests in waiting state - set_rse_transfer_limits(self.dest_rse_id, self.user_activity, max_transfers=1, session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, max_transfers=1, session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, 'ignore', max_transfers=1, session=self.db_session) - config_set('throttler', 'mode', 'DEST_PER_ACT', session=self.db_session) - self.db_session.commit() - - def tearDown(self): - self.db_session.commit() - - @skiplimitedsql - def test_release_waiting_requests_per_free_volume(self): - """ REQUEST (CORE): release waiting requests that fit grouped in available volume.""" - # release unattached requests that fit in available volume with respect to already submitted transfers - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - name3 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - request = models.Request(dest_rse_id=self.dest_rse_id, bytes=2, activity=self.all_activities, state=constants.RequestState.SUBMITTED) - request.save(session=self.db_session) - volume = 10 - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=volume, max_transfers=1, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': 'User Subscription', - 'bytes': 8, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': 
constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'requested_at': datetime.now().replace(year=2020), - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': 'User Subscription', - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': 'User Subscription', - 'bytes': 10, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_per_free_volume(self.dest_rse_id, volume=volume, session=self.db_session) - # released because small enough - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - # still waiting because requested later and to big - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - # still waiting because too big - request = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - # release attached requests that fit together with the dataset in available volume with respect to already submitted transfers - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - name3 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, 
session=self.db_session) - name4 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - dataset1_name = generate_uuid() - add_did(self.scope, dataset1_name, constants.DIDType.DATASET, self.account, session=self.db_session) - attach_dids(self.scope, dataset1_name, [{'name': name1, 'scope': self.scope}, {'name': name4, 'scope': self.scope}], self.account, session=self.db_session) - dataset2_name = generate_uuid() - add_did(self.scope, dataset2_name, constants.DIDType.DATASET, self.account, session=self.db_session) - attach_dids(self.scope, dataset2_name, [{'name': name2, 'scope': self.scope}, {'name': name3, 'scope': self.scope}], self.account, session=self.db_session) - request = models.Request(dest_rse_id=self.dest_rse_id, bytes=2, activity=self.all_activities, state=constants.RequestState.SUBMITTED) - request.save(session=self.db_session) - volume = 10 - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=volume, max_transfers=1, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 6, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'requested_at': datetime.now().replace(year=2020), - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, 
- 'request_id': generate_uuid(), - 'name': name3, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 10, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name4, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2030), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_per_free_volume(self.dest_rse_id, volume=volume, session=self.db_session) - # released because dataset fits in volume - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - # waiting because dataset is too big - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - request = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - # release requests with no available volume -> release nothing - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - add_replica(self.dest_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - volume = 0 - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=volume, max_transfers=1, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': 
self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 8, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_per_free_volume(self.dest_rse_id, volume=volume, session=self.db_session) - # waiting because no available volume - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - @skiplimitedsql - def test_release_waiting_requests_grouped_fifo(self): - """ REQUEST (CORE): release waiting requests based on grouped FIFO. """ - # set volume and deadline to 0 to check first without releasing extra requests - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=0, max_transfers=1, session=self.db_session) - - # one request with an unattached DID -> one request should be released - self.db_session.query(models.Request).delete() - self.db_session.commit() - name = generate_uuid() - add_replica(self.source_rse_id, self.scope, name, 1, self.account, session=self.db_session) - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, volume=0, deadline=0, session=self.db_session) - request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - 
- # one request with an attached DID -> one request should be released - self.db_session.query(models.Request).delete() - self.db_session.commit() - name = generate_uuid() - dataset_name = generate_uuid() - add_replica(self.source_rse_id, self.scope, name, 1, self.account, session=self.db_session) - add_did(self.scope, dataset_name, constants.DIDType.DATASET, self.account, session=self.db_session) - attach_dids(self.scope, dataset_name, [{'name': name, 'scope': self.scope}], self.account, session=self.db_session) - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'scope': self.scope, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, volume=0, deadline=0, session=self.db_session) - request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - # five requests with different requested_at and multiple attachments per collection -> release only one request -> two requests of one collection should be released - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - name2 = generate_uuid() - name3 = generate_uuid() - name4 = generate_uuid() - name5 = generate_uuid() - dataset_1_name = generate_uuid() - add_did(self.scope, dataset_1_name, constants.DIDType.DATASET, self.account, session=self.db_session) - dataset_2_name = generate_uuid() - add_did(self.scope, dataset_2_name, constants.DIDType.DATASET, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, 
name2, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name5, 1, self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name1, 'scope': self.scope}, {'name': name2, 'scope': self.scope}], self.account, session=self.db_session) - attach_dids(self.scope, dataset_2_name, [{'name': name3, 'scope': self.scope}, {'name': name4, 'scope': self.scope}], self.account, session=self.db_session) - - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'retry_count': 1, - 'rule_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'requested_at': datetime.now().replace(year=2015), - 'retry_count': 1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': 
constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name4, - 'requested_at': datetime.now().replace(year=2010), - 'retry_count': 1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name5, - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2018), - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, deadline=0, volume=0, session=self.db_session) - request_1 = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request_1['state'] == constants.RequestState.QUEUED - request_2 = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request_2['state'] == constants.RequestState.QUEUED - request_3 = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request_3['state'] == constants.RequestState.WAITING - request_4 = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request_4['state'] == constants.RequestState.WAITING - request_5 = get_request_by_did(self.scope, name5, self.dest_rse_id, session=self.db_session) - assert request_5['state'] == constants.RequestState.WAITING - - # with maximal volume check -> release one request -> three requests should be released because of attachments and free volume space - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - name2 = generate_uuid() - name3 = generate_uuid() - dataset_1_name = generate_uuid() - 
add_did(self.scope, dataset_1_name, constants.DIDType.DATASET, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name1, 'scope': self.scope}], self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name2, 'scope': self.scope}], self.account, session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=10, max_transfers=1, session=self.db_session) - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'bytes': 1, - 'scope': self.scope, - 'retry_count': 1, - 'rule_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'bytes': 2, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'bytes': 3, - 'requested_at': datetime.now().replace(year=2021), # requested after the request below but small enough for 
max_volume check - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 3, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name4, - 'bytes': 3000, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 3000, - 'md5': '', - 'adler32': '' - } - }] - - queue_requests(requests, session=self.db_session) - amount_updated_requests = release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, deadline=0, volume=10, session=self.db_session) - assert amount_updated_requests == 3 - # released because it got requested first - request_1 = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request_1['state'] == constants.RequestState.QUEUED - # released because the DID is attached to the same dataset - request_2 = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request_2['state'] == constants.RequestState.QUEUED - # released because of available volume - request_3 = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request_3['state'] == constants.RequestState.QUEUED - # still waiting because there is no free volume - request_4 = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request_4['state'] == constants.RequestState.WAITING - - # with maximal volume check -> release one request -> two requests should be released because of attachments - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - name2 = generate_uuid() - name3 = generate_uuid() - name4 = generate_uuid() - dataset_1_name = 
generate_uuid() - add_did(self.scope, dataset_1_name, constants.DIDType.DATASET, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name1, 'scope': self.scope}], self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name2, 'scope': self.scope}], self.account, session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=5, max_transfers=1, session=self.db_session) - request = models.Request(dest_rse_id=self.dest_rse_id, bytes=2, activity=self.all_activities, state=constants.RequestState.SUBMITTED) - request.save(session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'bytes': 1, - 'scope': self.scope, - 'retry_count': 1, - 'rule_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'bytes': 2, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': 
constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'bytes': 1, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name4, - 'bytes': 1, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - - queue_requests(requests, session=self.db_session) - release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, deadline=0, volume=5, session=self.db_session) - # released because it got requested first - request_1 = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request_1['state'] == constants.RequestState.QUEUED - # released because the DID is attached to the same dataset - request_2 = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request_2['state'] == constants.RequestState.QUEUED - # still waiting because there is no free volume after releasing the two requests above - request_3 = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request_3['state'] == constants.RequestState.WAITING - request_4 = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request_4['state'] == constants.RequestState.WAITING - - # with deadline check -> release 0 requests -> 1 request should be released nonetheless - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account) - 
name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account) - current_hour = datetime.now().hour - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now().replace(hour=current_hour - 2), - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now(), - 'request_id': generate_uuid(), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - release_waiting_requests_grouped_fifo(self.source_rse_id, count=0, deadline=1, volume=0, session=self.db_session) - # queued because of deadline - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - # waiting because count=0 - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - def test_release_waiting_requests_fifo(self): - """ REQUEST (CORE): release waiting requests based on FIFO. 
""" - # without account and activity check - # two requests -> release one request -> request with oldest requested_at date should be released - name1 = generate_uuid() - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2018), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_fifo(self.dest_rse_id, count=1, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request2 = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request2['state'] == constants.RequestState.WAITING - - # with activity and account check - # two requests -> release two request -> requests with correct account and activity should be released - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - name2 = generate_uuid() - name3 = generate_uuid() - name4 = generate_uuid() - 
add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'account': self.account, - 'requested_at': datetime.now().replace(year=2018), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'account': self.account, - 'attributes': { - 'activity': 'ignore', - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), - 'name': name3, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'account': InternalAccount('jdoe', **self.vo), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), # requested latest but account and activity are correct - 'name': name4, - 'scope': 
self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'account': self.account, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_fifo(self.dest_rse_id, count=2, account=self.account, activity=self.user_activity, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - request = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - request = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - def test_release_waiting_requests_all(self): - """ REQUEST (CORE): release all waiting requests. 
""" - name1 = generate_uuid() - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2018), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_all_waiting_requests(self.dest_rse_id, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - @skiplimitedsql - def test_release_waiting_requests_per_deadline(self): - """ REQUEST (CORE): release grouped waiting requests that exceeded waiting time.""" - # a request that exceeded the maximal waiting time to be released (1 hour) -> release one request -> only the exceeded request should be released - set_rse_transfer_limits(self.source_rse_id, activity=self.all_activities, strategy='grouped_fifo', session=self.db_session) - name1 = generate_uuid() - add_replica(self.source_rse_id, 
self.scope, name1, 1, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - current_hour = datetime.now().hour - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now().replace(hour=current_hour - 2), - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now(), - 'request_id': generate_uuid(), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - release_waiting_requests_per_deadline(self.source_rse_id, deadline=1, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - # a request that exceeded the maximal waiting time to be released (1 hour) -> release one request -> release all requests of the same dataset - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - name3 = generate_uuid() 
- add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - dataset_name = generate_uuid() - add_did(self.scope, dataset_name, constants.DIDType.DATASET, self.account, session=self.db_session) - attach_dids(self.scope, dataset_name, [{'name': name1, 'scope': self.scope}, {'name': name2, 'scope': self.scope}], self.account, session=self.db_session) - current_hour = datetime.now().hour - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now().replace(hour=current_hour - 2), - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now(), - 'request_id': generate_uuid(), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now(), - 'request_id': generate_uuid(), - 'name': name3, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_per_deadline(self.source_rse_id, deadline=1, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name2, self.dest_rse_id, 
session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - class TestRequestCoreList(unittest.TestCase): diff --git a/lib/rucio/tests/test_throttler.py b/lib/rucio/tests/test_throttler.py index 8d86d34caf..a86831ff2a 100644 --- a/lib/rucio/tests/test_throttler.py +++ b/lib/rucio/tests/test_throttler.py @@ -1,4 +1,5 @@ -# Copyright 2019-2020 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2019-2020 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,12 +19,12 @@ # - Eli Chadwick , 2020 # - Patrick Austin , 2020 # - Benedikt Ziemons , 2020 -# -# PY3K COMPATIBLE import unittest from datetime import datetime +import pytest + from rucio.common.config import config_get, config_get_bool from rucio.common.types import InternalAccount, InternalScope from rucio.common.utils import generate_uuid @@ -37,6 +38,9 @@ from rucio.tests.test_request import skiplimitedsql +pytest.skip(allow_module_level=True) + + class TestThrottlerGroupedFIFO(unittest.TestCase): """Throttler per destination RSE and on all activites per grouped FIFO """ diff --git a/tools/add_header b/tools/add_header index 13a97a2a74..6f875b0661 100755 --- a/tools/add_header +++ b/tools/add_header @@ -193,6 +193,7 @@ def main(arguments): header += '# - %(name)s %(mail)s, %(min)s\n' % authors[author] else: header += '# - %(name)s %(mail)s, %(min)s-%(max)s\n' % authors[author] + header += '\n' if not arguments.dry_run: if not arguments.in_place: @@ -217,16 +218,12 @@ def main(arguments): modified.write(lines[0]) modified.write(header) - first = True count = 50 # only deletes old comments in the first n lines or until the first non-comment for line in lines: not_comment_line = not line.startswith('#') and 
len(line.lstrip()) != 0 if count != 0 and not_comment_line: count = 0 if count == 0 or not_comment_line: - if first: - first = False - modified.write('\n') modified.write(line) else: