Skip to content

Commit

Permalink
Transfers: Prepare database, add simple test; rucio#4056
Browse files Browse the repository at this point in the history
  • Loading branch information
bziemons committed Nov 5, 2020
1 parent a2ee1fb commit a80bbfe
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 15 deletions.
2 changes: 1 addition & 1 deletion etc/sql/oracle/schema.sql
Expand Up @@ -930,7 +930,7 @@ PARTITION BY LIST(SCOPE)
CONSTRAINT REQUESTS_SCOPE_NN CHECK (SCOPE IS NOT NULL),
CONSTRAINT REQUESTS_NAME_NN CHECK (NAME IS NOT NULL),
CONSTRAINT REQUESTS_TYPE_CHK CHECK (request_type in ('U', 'D', 'T', 'I', 'O')),
CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M'))
CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M', 'P'))
) PCTFREE 3;


Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/alembicrevision.py
Expand Up @@ -17,4 +17,4 @@
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
# - Martin Barisits <martin.barisits@cern.ch>, 2020

ALEMBIC_REVISION = '8ea9122275b1' # the current alembic head revision
ALEMBIC_REVISION = 'd23453595260' # the current alembic head revision
3 changes: 2 additions & 1 deletion lib/rucio/core/request.py
Expand Up @@ -31,6 +31,7 @@
# from sqlalchemy.sql import tuple_
from sqlalchemy.sql.expression import asc, false, true

from rucio.common.config import config_get_bool
from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, ConfigNotFound
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.utils import generate_uuid, chunks, get_parsed_throttler_mode
Expand Down Expand Up @@ -125,7 +126,7 @@ def queue_requests(requests, session=None):

request_clause = []
rses = {}
preparer_enabled = True # FIXME: config variable?
preparer_enabled = config_get_bool('conveyor', 'use_preparer', default=False)
for req in requests:

if isinstance(req['attributes'], string_types):
Expand Down
11 changes: 3 additions & 8 deletions lib/rucio/core/transfer.py
Expand Up @@ -1317,8 +1317,6 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=

sub_requests = sub_requests.subquery()

distance_ranking = func.min(models.Distance.ranking) if minimum_distance else models.Distance.ranking

query = session.query(sub_requests.c.id,
sub_requests.c.rule_id,
sub_requests.c.scope,
Expand All @@ -1339,7 +1337,7 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=
sub_requests.c.retry_count,
models.Source.url,
models.Source.ranking,
distance_ranking) \
models.Distance.ranking) \
.outerjoin(models.RSEFileAssociation, and_(sub_requests.c.scope == models.RSEFileAssociation.scope,
sub_requests.c.name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state == ReplicaState.AVAILABLE,
Expand All @@ -1354,11 +1352,8 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number=
models.RSEFileAssociation.rse_id == models.Distance.src_rse_id)) \
.with_hint(models.Distance, "+ index(distances DISTANCES_PK)", 'oracle')

if minimum_distance:
# FIXME: get minimum distance per src,dest pair
query = query \
.filter(models.RSEFileAssociation.rse_id != null()) \
.group_by(sub_requests.c.dest_rse_id, models.RSEFileAssociation.rse_id)
# if minimum_distance:
# query = query.group_by(sub_requests.c.id)

if rses:
result = []
Expand Down
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
# Copyright 2020 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020

"""
Add PREPARING state to Request model.
"""

from alembic import context
from alembic.op import execute, create_check_constraint, drop_constraint

# Alembic revision identifiers
revision = 'd23453595260'
down_revision = '8ea9122275b1'


def upgrade():
"""
Upgrade the database to this revision
"""

schema = context.get_context().version_table_schema + '.' if context.get_context().version_table_schema else ''
create_check = False

if context.get_context().dialect.name in ['oracle', 'postgresql']:
drop_constraint('REQUESTS_STATE_CHK', 'requests', type_='check')
create_check = True
elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 5:
create_check = True
elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 8:
execute('ALTER TABLE ' + schema + 'requests DROP CHECK REQUESTS_STATE_CHK')
create_check = True

if create_check:
create_check_constraint(constraint_name='REQUESTS_STATE_CHK', table_name='requests',
condition="state in ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M', 'P')")


def downgrade():
"""
Downgrade the database to the previous revision
"""

schema = context.get_context().version_table_schema + '.' if context.get_context().version_table_schema else ''
create_check = False

if context.get_context().dialect.name in ['oracle', 'postgresql']:
drop_constraint('REQUESTS_STATE_CHK', 'requests', type_='check')
create_check = True
elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 5:
create_check = True
elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 8:
execute('ALTER TABLE ' + schema + 'requests DROP CHECK REQUESTS_STATE_CHK')
create_check = True

if create_check:
create_check_constraint(constraint_name='REQUESTS_STATE_CHK', table_name='requests',
condition="state in ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M')")
27 changes: 23 additions & 4 deletions lib/rucio/tests/test_preparer.py
Expand Up @@ -16,11 +16,30 @@
# Authors:
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020

import pytest

from rucio.core.rse import get_rse_id
from rucio.core.transfer import __list_transfer_requests_and_source_replicas
from rucio.db.sqla import models, session
from rucio.db.sqla.constants import RequestState


def test_listing_preparing_transfers():
from pprint import pprint
pprint(__list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, minimum_distance=True))
assert False
@pytest.fixture
def mock_request(vo):
db_session = session.get_session()
dest_rse = 'MOCK'
dest_rse_id = get_rse_id(dest_rse, vo=vo, session=db_session)
request = models.Request(state=RequestState.PREPARING, dest_rse_id=dest_rse_id)
request.save(session=db_session)
db_session.commit()
yield request
request.delete(session=db_session)
db_session.commit()


def test_listing_preparing_transfers(mock_request):
req_sources = __list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, minimum_distance=True)

assert len(req_sources) == 1
req_id = req_sources[0][0]
assert req_id == mock_request.id

0 comments on commit a80bbfe

Please sign in to comment.