diff --git a/etc/sql/oracle/schema.sql b/etc/sql/oracle/schema.sql index 3a40a3223a..3c7447165c 100644 --- a/etc/sql/oracle/schema.sql +++ b/etc/sql/oracle/schema.sql @@ -930,7 +930,7 @@ PARTITION BY LIST(SCOPE) CONSTRAINT REQUESTS_SCOPE_NN CHECK (SCOPE IS NOT NULL), CONSTRAINT REQUESTS_NAME_NN CHECK (NAME IS NOT NULL), CONSTRAINT REQUESTS_TYPE_CHK CHECK (request_type in ('U', 'D', 'T', 'I', 'O')), - CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M')) + CONSTRAINT REQUESTS_STATE_CHK CHECK (state IN ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M', 'P')) ) PCTFREE 3; diff --git a/lib/rucio/alembicrevision.py b/lib/rucio/alembicrevision.py index b949f50715..c1f4fe9dac 100644 --- a/lib/rucio/alembicrevision.py +++ b/lib/rucio/alembicrevision.py @@ -17,4 +17,4 @@ # - Benedikt Ziemons , 2020 # - Martin Barisits , 2020 -ALEMBIC_REVISION = '8ea9122275b1' # the current alembic head revision +ALEMBIC_REVISION = 'd23453595260' # the current alembic head revision diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index 2ad4cbf449..f8055b187b 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -31,6 +31,7 @@ # from sqlalchemy.sql import tuple_ from sqlalchemy.sql.expression import asc, false, true +from rucio.common.config import config_get_bool from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, ConfigNotFound from rucio.common.types import InternalAccount, InternalScope from rucio.common.utils import generate_uuid, chunks, get_parsed_throttler_mode @@ -125,7 +126,7 @@ def queue_requests(requests, session=None): request_clause = [] rses = {} - preparer_enabled = True # FIXME: config variable? + preparer_enabled = config_get_bool('conveyor', 'use_preparer', default=False) for req in requests: if isinstance(req['attributes'], string_types): diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index ee31f55772..ece1d2689e 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -1317,8 +1317,6 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number= sub_requests = sub_requests.subquery() - distance_ranking = func.min(models.Distance.ranking) if minimum_distance else models.Distance.ranking - query = session.query(sub_requests.c.id, sub_requests.c.rule_id, sub_requests.c.scope, @@ -1339,7 +1337,7 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number= sub_requests.c.retry_count, models.Source.url, models.Source.ranking, - distance_ranking) \ + models.Distance.ranking) \ .outerjoin(models.RSEFileAssociation, and_(sub_requests.c.scope == models.RSEFileAssociation.scope, sub_requests.c.name == models.RSEFileAssociation.name, models.RSEFileAssociation.state == ReplicaState.AVAILABLE, @@ -1354,11 +1352,8 @@ def __list_transfer_requests_and_source_replicas(total_workers=0, worker_number= models.RSEFileAssociation.rse_id == models.Distance.src_rse_id)) \ .with_hint(models.Distance, "+ index(distances DISTANCES_PK)", 'oracle') - if minimum_distance: - # FIXME: get minimum distance per src,dest pair - query = query \ - .filter(models.RSEFileAssociation.rse_id != null()) \ - .group_by(sub_requests.c.dest_rse_id, models.RSEFileAssociation.rse_id) + # if minimum_distance: + # query = query.group_by(sub_requests.c.id) if rses: result = [] diff --git a/lib/rucio/db/sqla/migrate_repo/versions/d23453595260_extend_request_state_for_preparer.py b/lib/rucio/db/sqla/migrate_repo/versions/d23453595260_extend_request_state_for_preparer.py new file mode 100644 index 0000000000..9d3cc17e3f --- /dev/null +++ b/lib/rucio/db/sqla/migrate_repo/versions/d23453595260_extend_request_state_for_preparer.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# - Benedikt Ziemons , 2020 + +""" +Add PREPARING state to Request model. +""" + +from alembic import context +from alembic.op import execute, create_check_constraint, drop_constraint + +# Alembic revision identifiers +revision = 'd23453595260' +down_revision = '8ea9122275b1' + + +def upgrade(): + """ + Upgrade the database to this revision + """ + + schema = context.get_context().version_table_schema + '.' if context.get_context().version_table_schema else '' + create_check = False + + if context.get_context().dialect.name in ['oracle', 'postgresql']: + drop_constraint('REQUESTS_STATE_CHK', 'requests', type_='check') + create_check = True + elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 5: + create_check = True + elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 8: + execute('ALTER TABLE ' + schema + 'requests DROP CHECK REQUESTS_STATE_CHK') + create_check = True + + if create_check: + create_check_constraint(constraint_name='REQUESTS_STATE_CHK', table_name='requests', + condition="state in ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M', 'P')") + + +def downgrade(): + """ + Downgrade the database to the previous revision + """ + + schema = context.get_context().version_table_schema + '.' if context.get_context().version_table_schema else '' + create_check = False + + if context.get_context().dialect.name in ['oracle', 'postgresql']: + drop_constraint('REQUESTS_STATE_CHK', 'requests', type_='check') + create_check = True + elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 5: + create_check = True + elif context.get_context().dialect.name == 'mysql' and context.get_context().dialect.server_version_info[0] == 8: + execute('ALTER TABLE ' + schema + 'requests DROP CHECK REQUESTS_STATE_CHK') + create_check = True + + if create_check: + create_check_constraint(constraint_name='REQUESTS_STATE_CHK', table_name='requests', + condition="state in ('Q', 'G', 'S', 'D', 'F', 'L', 'N', 'O', 'A', 'U', 'W', 'M')") diff --git a/lib/rucio/tests/test_preparer.py b/lib/rucio/tests/test_preparer.py index 5ee2663ed4..ff53ab4e06 100644 --- a/lib/rucio/tests/test_preparer.py +++ b/lib/rucio/tests/test_preparer.py @@ -16,11 +16,30 @@ # Authors: # - Benedikt Ziemons , 2020 +import pytest + +from rucio.core.rse import get_rse_id from rucio.core.transfer import __list_transfer_requests_and_source_replicas +from rucio.db.sqla import models, session from rucio.db.sqla.constants import RequestState -def test_listing_preparing_transfers(): - from pprint import pprint - pprint(__list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, minimum_distance=True)) - assert False +@pytest.fixture +def mock_request(vo): + db_session = session.get_session() + dest_rse = 'MOCK' + dest_rse_id = get_rse_id(dest_rse, vo=vo, session=db_session) + request = models.Request(state=RequestState.PREPARING, dest_rse_id=dest_rse_id) + request.save(session=db_session) + db_session.commit() + yield request + request.delete(session=db_session) + db_session.commit() + + +def test_listing_preparing_transfers(mock_request): + req_sources = __list_transfer_requests_and_source_replicas(request_state=RequestState.PREPARING, minimum_distance=True) + + assert len(req_sources) == 1 + req_id = req_sources[0][0] + assert req_id == mock_request.id