Skip to content

Commit

Permalink
Transfers: Add tests for preparer; rucio#4056
Browse files Browse the repository at this point in the history
Implement throttling in preparer and disable/remove throttler tests for now.
  • Loading branch information
bziemons committed Oct 22, 2020
1 parent a0881d1 commit e9da9ff
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 1,138 deletions.
8 changes: 4 additions & 4 deletions etc/docker/dev/docker-compose.yml
@@ -1,17 +1,17 @@
version: "2"
services:
rucio:
image: rucio/rucio-dev
image: rucio/rucio-dev:py3
hostname: rucio
ports:
- "443:443"
links:
- ruciodb:ruciodb
- graphite:graphite
volumes:
- ../../../tools:/opt/rucio/tools
- ../../../bin:/opt/rucio/bin
- ../../../lib:/opt/rucio/lib
- ../../../tools:/opt/rucio/tools:Z
- ../../../bin:/opt/rucio/bin:Z
- ../../../lib:/opt/rucio/lib:Z
environment:
- X509_USER_CERT=/opt/rucio/etc/usercert.pem
- X509_USER_KEY=/opt/rucio/etc/userkey.pem
Expand Down
7 changes: 3 additions & 4 deletions lib/rucio/core/request.py
Expand Up @@ -28,13 +28,12 @@
from six import string_types
from sqlalchemy import and_, or_, func, update
from sqlalchemy.exc import IntegrityError
# from sqlalchemy.sql import tuple_
from sqlalchemy.sql.expression import asc, false, true

from rucio.common.config import config_get_bool
from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, ConfigNotFound
from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.utils import generate_uuid, chunks, get_parsed_throttler_mode
from rucio.common.utils import generate_uuid, chunks
from rucio.core.message import add_message
from rucio.core.monitor import record_counter, record_timer
from rucio.core.rse import get_rse_name, get_rse_vo
Expand Down Expand Up @@ -126,7 +125,7 @@ def queue_requests(requests, session=None):

request_clause = []
rses = {}
preparer_enabled = config_get_bool('conveyor', 'use_preparer', default=False)
preparer_enabled = config_get_bool('conveyor', 'use_preparer', raise_exception=False, default=False)
for req in requests:

if isinstance(req['attributes'], string_types):
Expand Down
9 changes: 4 additions & 5 deletions lib/rucio/core/transfer.py
Expand Up @@ -49,19 +49,19 @@

from dogpile.cache import make_region
from dogpile.cache.api import NoValue
from sqlalchemy import and_, func
from sqlalchemy import and_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.expression import false, null
from sqlalchemy.sql.expression import false

from rucio.common import constants
from rucio.common.config import config_get
from rucio.common.constants import SUPPORTED_PROTOCOLS
from rucio.common.exception import (InvalidRSEExpression, NoDistance,
RequestNotFound, RSEProtocolNotSupported,
RucioException, UnsupportedOperation)
from rucio.common.config import config_get
from rucio.common.rse_attributes import get_rse_attributes
from rucio.common.types import InternalAccount
from rucio.common.utils import construct_surl
from rucio.common.constants import SUPPORTED_PROTOCOLS
from rucio.core import did, message as message_core, request as request_core
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
Expand All @@ -76,7 +76,6 @@
from rucio.rse import rsemanager as rsemgr
from rucio.transfertool.fts3 import FTS3Transfertool


# Extra modules: Only imported if available
EXTRA_MODULES = {'globus_sdk': False}

Expand Down
101 changes: 67 additions & 34 deletions lib/rucio/daemons/conveyor/preparer.py
Expand Up @@ -15,79 +15,112 @@
#
# Authors:
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
from time import sleep

import logging
import sys
import threading
from time import time

from sqlalchemy import update

import rucio.db.sqla.util
from rucio.common import exception
from rucio.common.config import config_get
from rucio.common.exception import ConfigNotFound
from rucio.common.utils import get_parsed_throttler_mode
from rucio.common.utils import get_parsed_throttler_mode, chunks
from rucio.core.config import get
from rucio.core.rse import get_rse_transfer_limits
from rucio.core.transfer import __list_transfer_requests_and_source_replicas
from rucio.db.sqla import models
from rucio.db.sqla.constants import RequestState
from rucio.db.sqla.session import read_session
from rucio.db.sqla.session import transactional_session


stopping = False
graceful_stop = threading.Event()


def stop():
stopping = True
"""
Graceful exit.
"""

graceful_stop.set()


def run(once=False):
def run(once=False, sleep_time=60):
"""
Running the preparer daemon either once or by default in a loop until stop is called.
"""
config_loglevel = config_get('common', 'loglevel', raise_exception=False, default='DEBUG').upper()
logging.basicConfig(stream=sys.stdout,
level=getattr(logging, config_loglevel),
format='%(asctime)s\t%(process)d\t%(levelname)s\t%(message)s')

if rucio.db.sqla.util.is_old_db():
raise exception.DatabaseException('Database was not updated, daemon won\'t start')

if once:
runn()
else:
while not stopping:
runn()
sleep(30)
graceful_stop.wait(10)
while not graceful_stop.is_set():
start_time = time()

run_once()

if once:
return

end_time = time()
time_diff = end_time - start_time
if time_diff < sleep_time:
logging.info('Sleeping for a while : %.2f seconds', (sleep_time - time_diff))
graceful_stop.wait(sleep_time - time_diff)


@read_session
def runn(session=None):
@transactional_session
def run_once(chunk_size=50, session=None):
req_sources = __list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, minimum_distance=True, session=session)
for req_id, rule_id, scope, name, md5, adler32, bytes, activity, attributes, previous_attempt_id, dest_rse_id, account, source_rse_id, rse, deterministic, rse_type, path, retry_count, src_url, ranking, link_ranking in req_sources:
new_request = _prepare_request({
'attributes': attributes,
'source_rse_id': source_rse_id,
'dest_rse_id': dest_rse_id,
# TODO: fill object; core method?
}, session=session)
# TODO: update request

for chunk in chunks(req_sources, chunk_size):
# from rucio.core.transfer.get_transfer_requests_and_source_replicas
for req_id, rule_id, scope, name, md5, adler32, bytes, activity, attributes, previous_attempt_id, dest_rse_id, account, source_rse_id, rse, deterministic, rse_type, path, retry_count, src_url, ranking, link_ranking in chunk:
new_state = _prepare_request(
session=session,
activity=activity,
source_rse_id=source_rse_id,
dest_rse_id=dest_rse_id,
)
session.query(models.Request).filter_by(id=req_id).update({models.Request.state: new_state}, synchronize_session=False)
session.commit()

@read_session
def _prepare_request(request, session=None):
transfer_limits = get_rse_transfer_limits(session=session)

def _prepare_request(session, activity, source_rse_id, dest_rse_id) -> RequestState:
"""
Takes request attributes to return a new state for the request
based on throttler settings. Always returns QUEUED,
if the throttler mode is not set.
"""
try:
throttler_mode = get('throttler', 'mode', default=None, use_cache=False, session=session)
except ConfigNotFound:
throttler_mode = None

activity_limit = transfer_limits.get(request['attributes']['activity'], {})
all_activities_limit = transfer_limits.get('all_activities', {})
limit_found = False
if throttler_mode:
transfer_limits = get_rse_transfer_limits(session=session)
activity_limit = transfer_limits.get(activity, {})
all_activities_limit = transfer_limits.get('all_activities', {})
direction, all_activities = get_parsed_throttler_mode(throttler_mode)
if direction == 'source':
if all_activities:
if all_activities_limit.get(request['source_rse_id']):
if all_activities_limit.get(source_rse_id):
limit_found = True
else:
if activity_limit.get(request['source_rse_id']):
if activity_limit.get(source_rse_id):
limit_found = True
elif direction == 'destination':
if all_activities:
if all_activities_limit.get(request['dest_rse_id']):
if all_activities_limit.get(dest_rse_id):
limit_found = True
else:
if activity_limit.get(request['dest_rse_id']):
if activity_limit.get(dest_rse_id):
limit_found = True

request['state'] = RequestState.WAITING if limit_found else RequestState.QUEUED
return request
return RequestState.WAITING if limit_found else RequestState.QUEUED
35 changes: 34 additions & 1 deletion lib/rucio/tests/test_preparer.py
Expand Up @@ -18,8 +18,10 @@

import pytest

from rucio.core.rse import get_rse_id
from rucio.core import config
from rucio.core.rse import get_rse_id, set_rse_transfer_limits
from rucio.core.transfer import __list_transfer_requests_and_source_replicas
from rucio.daemons.conveyor import preparer
from rucio.db.sqla import models, session
from rucio.db.sqla.constants import RequestState

Expand All @@ -37,9 +39,40 @@ def mock_request(vo):
db_session.commit()


@pytest.fixture
def dest_throttler(mock_request):
db_session = session.get_session()
config.set('throttler', 'mode', 'DEST_PER_ACT', session=db_session)
set_rse_transfer_limits(mock_request.dest_rse_id, activity=mock_request.activity, max_transfers=1, strategy='fifo', session=db_session)
db_session.commit()
yield
db_session.query(models.RSETransferLimit).filter_by(rse_id=mock_request.dest_rse_id).delete()
config.remove_option('throttler', 'mode', session=db_session)
db_session.commit()


def test_listing_preparing_transfers(mock_request):
req_sources = __list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, minimum_distance=True)

assert len(req_sources) == 1
req_id = req_sources[0][0]
assert req_id == mock_request.id


@pytest.mark.usefixtures('dest_throttler')
def test_preparer_setting_request_state_waiting(mock_request):
db_session = session.get_session()
preparer.run_once(session=db_session)

updated_mock_request = db_session.query(models.Request).filter_by(id=mock_request.id).one() # type: models.Request

assert updated_mock_request.state == RequestState.WAITING


def test_preparer_setting_request_state_queued(mock_request):
db_session = session.get_session()
preparer.run_once(session=db_session)

updated_mock_request = db_session.query(models.Request).filter_by(id=mock_request.id).one() # type: models.Request

assert updated_mock_request.state == RequestState.QUEUED

0 comments on commit e9da9ff

Please sign in to comment.