Skip to content

Commit

Permalink
Transfers: Skip submitting requests from expired rules rucio#6505
Browse files Browse the repository at this point in the history
When the Cleaner handles an expired rule, it deletes requests in state
QUEUED (and states prior to it) and it tries to cancel (to FTS) requests
in state SUBMITTED.  The Submitter, however, tries to submit QUEUED
requests.  This can create two problems: (1) contention between the two
daemons and (2) inefficiency (submitting a request only to cancel it
shortly after).

This commit adjusts the underlying database query to skip requests whose
directly-associated rule has expired.

There are two somewhat special cases that won’t be fully handled:
(1) requests without any rule associated to them (e.g. multi-hop
transfers, replica recovery) and (2) requests that are reused (if there
are two or more rules that result in a transfer of the same file to the
same destination, then the first request to be created by one rule is
reused by the others).
  • Loading branch information
dchristidis committed Mar 28, 2024
1 parent 1b05476 commit 9076672
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
10 changes: 8 additions & 2 deletions lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import aliased
from sqlalchemy.sql.expression import asc, false, func, null, true
from sqlalchemy.sql.functions import coalesce

from rucio.common.config import config_get_bool, config_get_int
from rucio.common.exception import InvalidRSEExpression, RequestNotFound, RucioException, UnsupportedOperation
Expand Down Expand Up @@ -451,13 +452,13 @@ def list_and_mark_transfer_requests_and_source_replicas(

if partition_hash_var is None:
partition_hash_var = 'requests.id'

if request_state is None:
request_state = RequestState.QUEUED

if request_type is None:
request_type = [RequestType.TRANSFER]

now = datetime.datetime.utcnow()

sub_requests = select(
models.Request.id,
models.Request.request_type,
Expand All @@ -483,6 +484,11 @@ def list_and_mark_transfer_requests_and_source_replicas(
).where(
models.Request.state == request_state,
models.Request.request_type.in_(request_type)
).outerjoin(
models.ReplicationRule,
models.Request.rule_id == models.ReplicationRule.id
).where(
coalesce(models.ReplicationRule.expires_at, now) >= now
).join(
models.RSE,
models.RSE.id == models.Request.dest_rse_id
Expand Down
21 changes: 21 additions & 0 deletions tests/test_conveyor_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ def _forge_requests_creation_time(*, session=None):
assert requests_id_in_submission_order == [r['id'] for r in requests]


@pytest.mark.noparallel(groups=[NoParallelGroups.SUBMITTER])
def test_skip_requests_from_expired_rules(rse_factory, did_factory, root_account):
    """Verify that one submitter pass leaves an expired rule's request QUEUED.

    A file is uploaded to a source RSE and a rule with ``lifetime=-1`` is
    created towards a destination RSE. The request found for that DID on the
    destination must be QUEUED both before and after a single submitter run.
    """

    def _current_state():
        # Re-read the request each time so the check reflects the stored state.
        return request_core.get_request(request_id=request['id'])['state']

    # Two POSIX RSEs joined by a distance.
    src_rse_name, src_rse_id = rse_factory.make_posix_rse()
    dst_rse_name, dst_rse_id = rse_factory.make_posix_rse()
    distance_core.add_distance(src_rse_id, dst_rse_id, distance=10)

    did = did_factory.upload_test_file(rse_name=src_rse_name)

    # NOTE(review): the negative lifetime is presumably what makes the rule
    # count as expired right away, going by the test name -- confirm against
    # add_rule.
    _expired_rule = rule_core.add_rule(
        dids=[did],
        account=root_account,
        copies=1,
        rse_expression=dst_rse_name,
        grouping='ALL',
        weight=None,
        lifetime=-1,
        locked=False,
        subscription_id=None,
    )[0]

    request = request_core.get_request_by_did(rse_id=dst_rse_id, **did)
    assert _current_state() == RequestState.QUEUED

    # One submitter pass restricted to the two RSEs created above.
    submitter(
        once=True,
        rses=[{'id': src_rse_id}, {'id': dst_rse_id}],
        partition_wait_time=None,
        transfertools=['mock'],
        transfertype='single',
        filter_transfertool=None,
    )

    # The request must not have left the QUEUED state.
    assert _current_state() == RequestState.QUEUED


@pytest.mark.noparallel(groups=[NoParallelGroups.SUBMITTER])
@pytest.mark.parametrize("core_config_mock", [
# Run test twice: with, and without, temp tables
Expand Down

0 comments on commit 9076672

Please sign in to comment.