diff --git a/etc/docker/dev/docker-compose.yml b/etc/docker/dev/docker-compose.yml index c49ea14d33..1d4fa22f34 100644 --- a/etc/docker/dev/docker-compose.yml +++ b/etc/docker/dev/docker-compose.yml @@ -1,7 +1,7 @@ version: "2" services: rucio: - image: rucio/rucio-dev + image: rucio/rucio-dev:py3 hostname: rucio ports: - "443:443" @@ -9,9 +9,9 @@ services: - ruciodb:ruciodb - graphite:graphite volumes: - - ../../../tools:/opt/rucio/tools - - ../../../bin:/opt/rucio/bin - - ../../../lib:/opt/rucio/lib + - ../../../tools:/opt/rucio/tools:Z + - ../../../bin:/opt/rucio/bin:Z + - ../../../lib:/opt/rucio/lib:Z environment: - X509_USER_CERT=/opt/rucio/etc/usercert.pem - X509_USER_KEY=/opt/rucio/etc/userkey.pem diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index f8055b187b..71e7d3e659 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -28,13 +28,12 @@ from six import string_types from sqlalchemy import and_, or_, func, update from sqlalchemy.exc import IntegrityError -# from sqlalchemy.sql import tuple_ from sqlalchemy.sql.expression import asc, false, true from rucio.common.config import config_get_bool -from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, ConfigNotFound +from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation from rucio.common.types import InternalAccount, InternalScope -from rucio.common.utils import generate_uuid, chunks, get_parsed_throttler_mode +from rucio.common.utils import generate_uuid, chunks from rucio.core.message import add_message from rucio.core.monitor import record_counter, record_timer from rucio.core.rse import get_rse_name, get_rse_vo @@ -126,7 +125,7 @@ def queue_requests(requests, session=None): request_clause = [] rses = {} - preparer_enabled = config_get_bool('conveyor', 'use_preparer', default=False) + preparer_enabled = config_get_bool('conveyor', 'use_preparer', raise_exception=False, default=False) for req in requests: if isinstance(req['attributes'], string_types): diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 5e12edac49..4b25fc2963 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -49,19 +49,19 @@ from dogpile.cache import make_region from dogpile.cache.api import NoValue -from sqlalchemy import and_, func +from sqlalchemy import and_ from sqlalchemy.exc import IntegrityError -from sqlalchemy.sql.expression import false, null +from sqlalchemy.sql.expression import false from rucio.common import constants +from rucio.common.config import config_get +from rucio.common.constants import SUPPORTED_PROTOCOLS from rucio.common.exception import (InvalidRSEExpression, NoDistance, RequestNotFound, RSEProtocolNotSupported, RucioException, UnsupportedOperation) -from rucio.common.config import config_get from rucio.common.rse_attributes import get_rse_attributes from rucio.common.types import InternalAccount from rucio.common.utils import construct_surl -from rucio.common.constants import SUPPORTED_PROTOCOLS from rucio.core import did, message as message_core, request as request_core from rucio.core.config import get as core_config_get from rucio.core.monitor import record_counter, record_timer @@ -76,7 +76,6 @@ from rucio.rse import rsemanager as rsemgr from rucio.transfertool.fts3 import FTS3Transfertool - # Extra modules: Only imported if available EXTRA_MODULES = {'globus_sdk': False} diff --git a/lib/rucio/daemons/conveyor/preparer.py b/lib/rucio/daemons/conveyor/preparer.py index aeeee12c61..be5cc74ec5 100644 --- a/lib/rucio/daemons/conveyor/preparer.py +++ b/lib/rucio/daemons/conveyor/preparer.py @@ -15,79 +15,112 @@ # # Authors: # - Benedikt Ziemons , 2020 -from time import sleep + +import logging +import sys +import threading +from time import time + +from sqlalchemy import update import rucio.db.sqla.util from rucio.common import exception +from rucio.common.config import config_get from rucio.common.exception import ConfigNotFound -from rucio.common.utils import get_parsed_throttler_mode +from rucio.common.utils import get_parsed_throttler_mode, chunks from rucio.core.config import get from rucio.core.rse import get_rse_transfer_limits from rucio.core.transfer import __list_transfer_requests_and_source_replicas +from rucio.db.sqla import models from rucio.db.sqla.constants import RequestState -from rucio.db.sqla.session import read_session +from rucio.db.sqla.session import transactional_session - -stopping = False +graceful_stop = threading.Event() def stop(): - stopping = True + """ + Graceful exit. + """ + + graceful_stop.set() -def run(once=False): +def run(once=False, sleep_time=60): + """ + Running the preparer daemon either once or by default in a loop until stop is called. + """ + config_loglevel = config_get('common', 'loglevel', raise_exception=False, default='DEBUG').upper() + logging.basicConfig(stream=sys.stdout, + level=getattr(logging, config_loglevel), + format='%(asctime)s\t%(process)d\t%(levelname)s\t%(message)s') + if rucio.db.sqla.util.is_old_db(): raise exception.DatabaseException('Database was not updated, daemon won\'t start') - if once: - runn() - else: - while not stopping: - runn() - sleep(30) + graceful_stop.wait(10) + while not graceful_stop.is_set(): + start_time = time() + + run_once() + + if once: + return + + end_time = time() + time_diff = end_time - start_time + if time_diff < sleep_time: + logging.info('Sleeping for a while : %.2f seconds', (sleep_time - time_diff)) + graceful_stop.wait(sleep_time - time_diff) -@read_session -def runn(session=None): +@transactional_session +def run_once(chunk_size=50, session=None): req_sources = __list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, minimum_distance=True, session=session) - for req_id, rule_id, scope, name, md5, adler32, bytes, activity, attributes, previous_attempt_id, dest_rse_id, account, source_rse_id, rse, deterministic, rse_type, path, retry_count, src_url, ranking, link_ranking in req_sources: - new_request = _prepare_request({ - 'attributes': attributes, - 'source_rse_id': source_rse_id, - 'dest_rse_id': dest_rse_id, - # TODO: fill object; core method? - }, session=session) - # TODO: update request + for chunk in chunks(req_sources, chunk_size): + # from rucio.core.transfer.get_transfer_requests_and_source_replicas + for req_id, rule_id, scope, name, md5, adler32, bytes, activity, attributes, previous_attempt_id, dest_rse_id, account, source_rse_id, rse, deterministic, rse_type, path, retry_count, src_url, ranking, link_ranking in chunk: + new_state = _prepare_request( + session=session, + activity=activity, + source_rse_id=source_rse_id, + dest_rse_id=dest_rse_id, + ) + session.query(models.Request).filter_by(id=req_id).update({models.Request.state: new_state}, synchronize_session=False) + session.commit() -@read_session -def _prepare_request(request, session=None): - transfer_limits = get_rse_transfer_limits(session=session) +def _prepare_request(session, activity, source_rse_id, dest_rse_id) -> RequestState: + """ + Takes request attributes to return a new state for the request + based on throttler settings. Always returns QUEUED, + if the throttler mode is not set. + """ try: throttler_mode = get('throttler', 'mode', default=None, use_cache=False, session=session) except ConfigNotFound: throttler_mode = None - activity_limit = transfer_limits.get(request['attributes']['activity'], {}) - all_activities_limit = transfer_limits.get('all_activities', {}) limit_found = False if throttler_mode: + transfer_limits = get_rse_transfer_limits(session=session) + activity_limit = transfer_limits.get(activity, {}) + all_activities_limit = transfer_limits.get('all_activities', {}) direction, all_activities = get_parsed_throttler_mode(throttler_mode) if direction == 'source': if all_activities: - if all_activities_limit.get(request['source_rse_id']): + if all_activities_limit.get(source_rse_id): limit_found = True else: - if activity_limit.get(request['source_rse_id']): + if activity_limit.get(source_rse_id): limit_found = True elif direction == 'destination': if all_activities: - if all_activities_limit.get(request['dest_rse_id']): + if all_activities_limit.get(dest_rse_id): limit_found = True else: - if activity_limit.get(request['dest_rse_id']): + if activity_limit.get(dest_rse_id): limit_found = True - request['state'] = RequestState.WAITING if limit_found else RequestState.QUEUED - return request + return RequestState.WAITING if limit_found else RequestState.QUEUED diff --git a/lib/rucio/tests/test_preparer.py b/lib/rucio/tests/test_preparer.py index ff53ab4e06..320c758ac6 100644 --- a/lib/rucio/tests/test_preparer.py +++ b/lib/rucio/tests/test_preparer.py @@ -18,8 +18,10 @@ import pytest -from rucio.core.rse import get_rse_id +from rucio.core import config +from rucio.core.rse import get_rse_id, set_rse_transfer_limits from rucio.core.transfer import __list_transfer_requests_and_source_replicas +from rucio.daemons.conveyor import preparer from rucio.db.sqla import models, session from rucio.db.sqla.constants import RequestState @@ -37,9 +39,40 @@ def mock_request(vo): db_session.commit() +@pytest.fixture +def dest_throttler(mock_request): + db_session = session.get_session() + config.set('throttler', 'mode', 'DEST_PER_ACT', session=db_session) + set_rse_transfer_limits(mock_request.dest_rse_id, activity=mock_request.activity, max_transfers=1, strategy='fifo', session=db_session) + db_session.commit() + yield + db_session.query(models.RSETransferLimit).filter_by(rse_id=mock_request.dest_rse_id).delete() + config.remove_option('throttler', 'mode', session=db_session) + db_session.commit() + + def test_listing_preparing_transfers(mock_request): req_sources = __list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, minimum_distance=True) assert len(req_sources) == 1 req_id = req_sources[0][0] assert req_id == mock_request.id + + +@pytest.mark.usefixtures('dest_throttler') +def test_preparer_setting_request_state_waiting(mock_request): + db_session = session.get_session() + preparer.run_once(session=db_session) + + updated_mock_request = db_session.query(models.Request).filter_by(id=mock_request.id).one() # type: models.Request + + assert updated_mock_request.state == RequestState.WAITING + + +def test_preparer_setting_request_state_queued(mock_request): + db_session = session.get_session() + preparer.run_once(session=db_session) + + updated_mock_request = db_session.query(models.Request).filter_by(id=mock_request.id).one() # type: models.Request + + assert updated_mock_request.state == RequestState.QUEUED diff --git a/lib/rucio/tests/test_request.py b/lib/rucio/tests/test_request.py index 5efb978d97..abb32e0d81 100644 --- a/lib/rucio/tests/test_request.py +++ b/lib/rucio/tests/test_request.py @@ -79,8 +79,8 @@ def setUp(self): def tearDown(self): self.db_session.commit() - def test_queue_requests_state_no_throttler(self): - """ REQUEST (CORE): queue requests with default throttler mode (None). """ + def test_queue_requests_state(self): + """ REQUEST (CORE): test queuing requests """ name = generate_uuid() name2 = generate_uuid() name3 = generate_uuid() @@ -150,1094 +150,6 @@ def test_queue_requests_state_no_throttler(self): request = get_request_by_did(self.scope, name3, self.dest_rse_id2, session=self.db_session) assert request['state'] == constants.RequestState.QUEUED - # def test_queue_requests_source_rse(self): - # """ REQUEST (CORE): queue requests and select correct source RSE. """ - # # test correct selection of source RSE - # name = generate_uuid() - # size = 8 - # add_replica(self.source_rse_id, self.scope, name, size, self.account, session=self.db_session) - # add_replica(self.source_rse_id2, self.scope, name, size, self.account, session=self.db_session) - # add_distance(self.source_rse_id, self.dest_rse_id, 1, session=self.db_session) - # add_distance(self.source_rse_id2, self.dest_rse_id, 2, session=self.db_session) - # requests = [{ - # 'dest_rse_id': self.dest_rse_id, - # 'request_type': constants.RequestType.TRANSFER, - # 'request_id': generate_uuid(), - # 'name': name, - # 'scope': self.scope, - # 'rule_id': generate_uuid(), - # 'retry_count': 1, - # 'requested_at': datetime.now().replace(year=2015), - # 'attributes': { - # 'activity': self.user_activity, - # 'bytes': size, - # 'md5': '', - # 'adler32': '' - # } - # }] - # queue_requests(requests, session=self.db_session) - # request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - # # select source RSE with smallest distance - # assert request['source_rse_id'] == self.source_rse_id - # assert request['name'] == name - # assert request['scope'] == self.scope - # assert request['state'] == constants.RequestState.QUEUED - - def test_queue_requests_state_1(self): - """ REQUEST (CORE): queue requests and set correct request state. """ - # test correct request state depending on throttler mode - config_set('throttler', 'mode', 'DEST_PER_ACT', session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, self.user_activity, max_transfers=1, session=self.db_session) - size = 1 - name = generate_uuid() - add_replica(self.source_rse_id2, self.scope, name, size, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id2, self.scope, name2, size, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': self.user_activity, - 'bytes': size, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': 'Activity without limit', - 'bytes': size, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - def test_queue_requests_state_2(self): - """ REQUEST (CORE): queue requests and set correct request state. """ - config_set('throttler', 'mode', 'SRC_PER_ACT', session=self.db_session) - size = 1 - name = generate_uuid() - add_replica(self.source_rse_id2, self.scope, name, size, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id2, self.scope, name2, size, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': self.user_activity, - 'bytes': size, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - -class TestRequestCoreRelease(unittest.TestCase): - - @classmethod - def setUpClass(cls): - if config_get_bool('common', 'multi_vo', raise_exception=False, default=False): - cls.vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - else: - cls.vo = {} - - cls.db_session = session.get_session() - cls.dialect = cls.db_session.bind.dialect.name - cls.dest_rse = 'MOCK' - cls.source_rse = 'MOCK4' - cls.source_rse2 = 'MOCK5' - cls.dest_rse_id = get_rse_id(cls.dest_rse, **cls.vo) - cls.source_rse_id = get_rse_id(cls.source_rse, **cls.vo) - cls.source_rse_id2 = get_rse_id(cls.source_rse2, **cls.vo) - cls.scope = InternalScope('mock', **cls.vo) - cls.account = InternalAccount('root', **cls.vo) - cls.user_activity = 'User Subscription' - cls.all_activities = 'all_activities' - - def setUp(self): - self.db_session.query(models.Request).delete() - self.db_session.query(models.RSETransferLimit).delete() - self.db_session.query(models.Distance).delete() - self.db_session.query(models.Config).delete() - # set transfer limits to put requests in waiting state - set_rse_transfer_limits(self.dest_rse_id, self.user_activity, max_transfers=1, session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, max_transfers=1, session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, 'ignore', max_transfers=1, session=self.db_session) - config_set('throttler', 'mode', 'DEST_PER_ACT', session=self.db_session) - self.db_session.commit() - - def tearDown(self): - self.db_session.commit() - - @skiplimitedsql - def test_release_waiting_requests_per_free_volume(self): - """ REQUEST (CORE): release waiting requests that fit grouped in available volume.""" - # release unattached requests that fit in available volume with respect to already submitted transfers - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - name3 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - request = models.Request(dest_rse_id=self.dest_rse_id, bytes=2, activity=self.all_activities, state=constants.RequestState.SUBMITTED) - request.save(session=self.db_session) - volume = 10 - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=volume, max_transfers=1, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': 'User Subscription', - 'bytes': 8, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'requested_at': datetime.now().replace(year=2020), - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': 'User Subscription', - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': 'User Subscription', - 'bytes': 10, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_per_free_volume(self.dest_rse_id, volume=volume, session=self.db_session) - # released because small enough - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - # still waiting because requested later and to big - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - # still waiting because too big - request = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - # release attached requests that fit together with the dataset in available volume with respect to already submitted transfers - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - name3 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - name4 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - dataset1_name = generate_uuid() - add_did(self.scope, dataset1_name, constants.DIDType.DATASET, self.account, session=self.db_session) - attach_dids(self.scope, dataset1_name, [{'name': name1, 'scope': self.scope}, {'name': name4, 'scope': self.scope}], self.account, session=self.db_session) - dataset2_name = generate_uuid() - add_did(self.scope, dataset2_name, constants.DIDType.DATASET, self.account, session=self.db_session) - attach_dids(self.scope, dataset2_name, [{'name': name2, 'scope': self.scope}, {'name': name3, 'scope': self.scope}], self.account, session=self.db_session) - request = models.Request(dest_rse_id=self.dest_rse_id, bytes=2, activity=self.all_activities, state=constants.RequestState.SUBMITTED) - request.save(session=self.db_session) - volume = 10 - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=volume, max_transfers=1, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 6, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'requested_at': datetime.now().replace(year=2020), - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 10, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name4, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2030), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_per_free_volume(self.dest_rse_id, volume=volume, session=self.db_session) - # released because dataset fits in volume - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - # waiting because dataset is too big - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - request = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - # release requests with no available volume -> release nothing - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - add_replica(self.dest_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - volume = 0 - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=volume, max_transfers=1, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2015), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 8, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_per_free_volume(self.dest_rse_id, volume=volume, session=self.db_session) - # waiting because no available volume - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - @skiplimitedsql - def test_release_waiting_requests_grouped_fifo(self): - """ REQUEST (CORE): release waiting requests based on grouped FIFO. """ - # set volume and deadline to 0 to check first without releasing extra requests - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=0, max_transfers=1, session=self.db_session) - - # one request with an unattached DID -> one request should be released - self.db_session.query(models.Request).delete() - self.db_session.commit() - name = generate_uuid() - add_replica(self.source_rse_id, self.scope, name, 1, self.account, session=self.db_session) - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, volume=0, deadline=0, session=self.db_session) - request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - # one request with an attached DID -> one request should be released - self.db_session.query(models.Request).delete() - self.db_session.commit() - name = generate_uuid() - dataset_name = generate_uuid() - add_replica(self.source_rse_id, self.scope, name, 1, self.account, session=self.db_session) - add_did(self.scope, dataset_name, constants.DIDType.DATASET, self.account, session=self.db_session) - attach_dids(self.scope, dataset_name, [{'name': name, 'scope': self.scope}], self.account, session=self.db_session) - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'scope': self.scope, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, volume=0, deadline=0, session=self.db_session) - request = get_request_by_did(self.scope, name, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - # five requests with different requested_at and multiple attachments per collection -> release only one request -> two requests of one collection should be released - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - name2 = generate_uuid() - name3 = generate_uuid() - name4 = generate_uuid() - name5 = generate_uuid() - dataset_1_name = generate_uuid() - add_did(self.scope, dataset_1_name, constants.DIDType.DATASET, self.account, session=self.db_session) - dataset_2_name = generate_uuid() - add_did(self.scope, dataset_2_name, constants.DIDType.DATASET, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name5, 1, self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name1, 'scope': self.scope}, {'name': name2, 'scope': self.scope}], self.account, session=self.db_session) - attach_dids(self.scope, dataset_2_name, [{'name': name3, 'scope': self.scope}, {'name': name4, 'scope': self.scope}], self.account, session=self.db_session) - - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'retry_count': 1, - 'rule_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'requested_at': datetime.now().replace(year=2015), - 'retry_count': 1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name4, - 'requested_at': datetime.now().replace(year=2010), - 'retry_count': 1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name5, - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2018), - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, deadline=0, volume=0, session=self.db_session) - request_1 = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request_1['state'] == constants.RequestState.QUEUED - request_2 = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request_2['state'] == constants.RequestState.QUEUED - request_3 = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request_3['state'] == constants.RequestState.WAITING - request_4 = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request_4['state'] == constants.RequestState.WAITING - request_5 = get_request_by_did(self.scope, name5, self.dest_rse_id, session=self.db_session) - assert request_5['state'] == constants.RequestState.WAITING - - # with maximal volume check -> release one request -> three requests should be released because of attachments and free volume space - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - name2 = generate_uuid() - name3 = generate_uuid() - dataset_1_name = generate_uuid() - add_did(self.scope, dataset_1_name, constants.DIDType.DATASET, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name1, 'scope': self.scope}], self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name2, 'scope': self.scope}], self.account, session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=10, max_transfers=1, session=self.db_session) - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'bytes': 1, - 'scope': self.scope, - 'retry_count': 1, - 'rule_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'bytes': 2, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'bytes': 3, - 'requested_at': datetime.now().replace(year=2021), # requested after the request below but small enough for max_volume check - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 3, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name4, - 'bytes': 3000, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 3000, - 'md5': '', - 'adler32': '' - } - }] - - queue_requests(requests, session=self.db_session) - amount_updated_requests = release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, deadline=0, volume=10, session=self.db_session) - assert amount_updated_requests == 3 - # released because it got requested first - request_1 = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request_1['state'] == constants.RequestState.QUEUED - # released because the DID is attached to the same dataset - request_2 = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request_2['state'] == constants.RequestState.QUEUED - # released because of available volume - request_3 = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request_3['state'] == constants.RequestState.QUEUED - # still waiting because there is no free volume - request_4 = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request_4['state'] == constants.RequestState.WAITING - - # with maximal volume check -> release one request -> two requests should be released because of attachments - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - name2 = generate_uuid() - name3 = generate_uuid() - name4 = generate_uuid() - dataset_1_name = generate_uuid() - add_did(self.scope, dataset_1_name, constants.DIDType.DATASET, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name1, 'scope': self.scope}], self.account, session=self.db_session) - attach_dids(self.scope, dataset_1_name, [{'name': name2, 'scope': self.scope}], self.account, session=self.db_session) - set_rse_transfer_limits(self.dest_rse_id, self.all_activities, volume=5, max_transfers=1, session=self.db_session) - request = models.Request(dest_rse_id=self.dest_rse_id, bytes=2, activity=self.all_activities, state=constants.RequestState.SUBMITTED) - request.save(session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'bytes': 1, - 'scope': self.scope, - 'retry_count': 1, - 'rule_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2000), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name2, - 'bytes': 2, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 2, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name3, - 'bytes': 1, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name4, - 'bytes': 1, - 'requested_at': datetime.now().replace(year=2020), - 'rule_id': generate_uuid(), - 'scope': self.scope, - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - - queue_requests(requests, session=self.db_session) - release_waiting_requests_grouped_fifo(self.dest_rse_id, count=1, deadline=0, volume=5, session=self.db_session) - # released because it got requested first - request_1 = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request_1['state'] == constants.RequestState.QUEUED - # released because the DID is attached to the same dataset - request_2 = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request_2['state'] == constants.RequestState.QUEUED - # still waiting because there is no free volume after releasing the two requests above - request_3 = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request_3['state'] == constants.RequestState.WAITING - request_4 = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request_4['state'] == constants.RequestState.WAITING - - # with deadline check -> release 0 requests -> 1 request should be released nonetheless - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account) - current_hour = datetime.now().hour - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now().replace(hour=current_hour - 2), - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now(), - 'request_id': generate_uuid(), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - release_waiting_requests_grouped_fifo(self.source_rse_id, count=0, deadline=1, volume=0, session=self.db_session) - # queued because of deadline - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - # waiting because count=0 - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - def test_release_waiting_requests_fifo(self): - """ REQUEST (CORE): release waiting requests based on FIFO. """ - # without account and activity check - # two requests -> release one request -> request with oldest requested_at date should be released - name1 = generate_uuid() - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2018), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_fifo(self.dest_rse_id, count=1, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request2 = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request2['state'] == constants.RequestState.WAITING - - # with activity and account check - # two requests -> release two request -> requests with correct account and activity should be released - self.db_session.query(models.Request).delete() - self.db_session.commit() - name1 = generate_uuid() - name2 = generate_uuid() - name3 = generate_uuid() - name4 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name4, 1, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'account': self.account, - 'requested_at': datetime.now().replace(year=2018), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'account': self.account, - 'attributes': { - 'activity': 'ignore', - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), - 'name': name3, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'account': InternalAccount('jdoe', **self.vo), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), # requested latest but account and activity are correct - 'name': name4, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'account': self.account, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_fifo(self.dest_rse_id, count=2, account=self.account, activity=self.user_activity, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - request = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - request = get_request_by_did(self.scope, name4, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - def test_release_waiting_requests_all(self): - """ REQUEST (CORE): release all waiting requests. """ - name1 = generate_uuid() - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - requests = [{ - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'requested_at': datetime.now().replace(year=2018), - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'dest_rse_id': self.dest_rse_id, - 'source_rse_id': self.source_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'request_id': generate_uuid(), - 'requested_at': datetime.now().replace(year=2020), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_all_waiting_requests(self.dest_rse_id, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - - @skiplimitedsql - def test_release_waiting_requests_per_deadline(self): - """ REQUEST (CORE): release grouped waiting requests that exceeded waiting time.""" - # a request that exceeded the maximal waiting time to be released (1 hour) -> release one request -> only the exceeded request should be released - set_rse_transfer_limits(self.source_rse_id, activity=self.all_activities, strategy='grouped_fifo', session=self.db_session) - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - current_hour = datetime.now().hour - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now().replace(hour=current_hour - 2), - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now(), - 'request_id': generate_uuid(), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - release_waiting_requests_per_deadline(self.source_rse_id, deadline=1, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - - # a request that exceeded the maximal waiting time to be released (1 hour) -> release one request -> release all requests of the same dataset - name1 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name1, 1, self.account, session=self.db_session) - name2 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name2, 1, self.account, session=self.db_session) - name3 = generate_uuid() - add_replica(self.source_rse_id, self.scope, name3, 1, self.account, session=self.db_session) - dataset_name = generate_uuid() - add_did(self.scope, dataset_name, constants.DIDType.DATASET, self.account, session=self.db_session) - attach_dids(self.scope, dataset_name, [{'name': name1, 'scope': self.scope}, {'name': name2, 'scope': self.scope}], self.account, session=self.db_session) - current_hour = datetime.now().hour - requests = [{ - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now().replace(hour=current_hour - 2), - 'request_id': generate_uuid(), - 'name': name1, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now(), - 'request_id': generate_uuid(), - 'name': name2, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }, { - 'source_rse_id': self.source_rse_id, - 'dest_rse_id': self.dest_rse_id, - 'request_type': constants.RequestType.TRANSFER, - 'requested_at': datetime.now(), - 'request_id': generate_uuid(), - 'name': name3, - 'scope': self.scope, - 'rule_id': generate_uuid(), - 'retry_count': 1, - 'attributes': { - 'activity': self.user_activity, - 'bytes': 1, - 'md5': '', - 'adler32': '' - } - }] - queue_requests(requests, session=self.db_session) - release_waiting_requests_per_deadline(self.source_rse_id, deadline=1, session=self.db_session) - request = get_request_by_did(self.scope, name1, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name2, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.QUEUED - request = get_request_by_did(self.scope, name3, self.dest_rse_id, session=self.db_session) - assert request['state'] == constants.RequestState.WAITING - class TestRequestCoreList(unittest.TestCase): diff --git a/lib/rucio/tests/test_throttler.py b/lib/rucio/tests/test_throttler.py index 8d86d34caf..7016440f71 100644 --- a/lib/rucio/tests/test_throttler.py +++ b/lib/rucio/tests/test_throttler.py @@ -24,6 +24,8 @@ import unittest from datetime import datetime +import pytest + from rucio.common.config import config_get, config_get_bool from rucio.common.types import InternalAccount, InternalScope from rucio.common.utils import generate_uuid @@ -37,6 +39,9 @@ from rucio.tests.test_request import skiplimitedsql +pytest.skip(allow_module_level=True) + + class TestThrottlerGroupedFIFO(unittest.TestCase): """Throttler per destination RSE and on all activites per grouped FIFO """