diff --git a/lib/rucio/daemons/reaper/reaper.py b/lib/rucio/daemons/reaper/reaper.py index 4d48772e16..a8f3425245 100644 --- a/lib/rucio/daemons/reaper/reaper.py +++ b/lib/rucio/daemons/reaper/reaper.py @@ -38,7 +38,7 @@ from rucio.common.exception import (DatabaseException, RSENotFound, ReplicaUnAvailable, ReplicaNotFound, ServiceUnavailable, RSEAccessDenied, ResourceTemporaryUnavailable, SourceNotFound, - VONotFound) + VONotFound, RSEProtocolNotSupported) from rucio.common.logging import setup_logging from rucio.common.types import InternalAccount from rucio.common.stopwatch import Stopwatch @@ -238,14 +238,18 @@ def delete_from_storage(heartbeat_handler, hb_payload, replicas, prot, rse_info, return deleted_files -def _rse_deletion_hostname(rse: RseData) -> "Optional[str]": +def _rse_deletion_hostname(rse: RseData, scheme: "Optional[str]") -> "Optional[str]": """ Retrieves the hostname of the default deletion protocol """ rse.ensure_loaded(load_info=True) for prot in rse.info['protocols']: - if prot['domains']['wan']['delete'] == 1: - return prot['hostname'] + if scheme: + if prot['scheme'] == scheme and prot['domains']['wan']['delete'] != 0: + return prot['hostname'] + else: + if prot['domains']['wan']['delete'] == 1: + return prot['hostname'] return None @@ -559,9 +563,12 @@ def _run_once(rses_to_process, chunk_size, greedy, scheme, percent = needed_free_space / tot_needed_free_space * 100 logger(logging.DEBUG, 'Working on %s. Percentage of the total space needed %.2f', rse.name, percent) - rse_hostname = _rse_deletion_hostname(rse) + rse_hostname = _rse_deletion_hostname(rse, scheme) if not rse_hostname: - logger(logging.WARNING, 'No default delete protocol for %s', rse.name) + if scheme: + logger(logging.WARNING, 'Protocol %s not supported on %s', scheme, rse.name) + else: + logger(logging.WARNING, 'No default delete protocol for %s', rse.name) REGION.set('pause_deletion_%s' % rse.id, True) continue @@ -641,6 +648,8 @@ def _run_once(rses_to_process, chunk_size, greedy, scheme, delete_replicas(rse_id=rse.id, files=deleted_files) logger(logging.DEBUG, 'delete_replicas successed on %s : %s replicas in %s seconds', rse.name, len(deleted_files), time.time() - del_start) METRICS.counter('deletion.done').inc(len(deleted_files)) + except RSEProtocolNotSupported: + logger(logging.WARNING, 'Protocol %s not supported on %s', scheme, rse.name) except Exception: logger(logging.CRITICAL, 'Exception', exc_info=True) diff --git a/lib/rucio/tests/test_reaper.py b/lib/rucio/tests/test_reaper.py index 4703735d25..3e08362868 100644 --- a/lib/rucio/tests/test_reaper.py +++ b/lib/rucio/tests/test_reaper.py @@ -459,3 +459,34 @@ def test_archive_of_deleted_dids(vo, did_factory, root_account, core_config_mock print(did) deleted_dids.append(did) assert len(deleted_dids) == len(dids) + + +@pytest.mark.parametrize("file_config_mock", [ + # Run test twice: with, and without, temp tables + {"overrides": [('core', 'use_temp_tables', 'True')]}, + {"overrides": [('core', 'use_temp_tables', 'False')]}, +], indirect=True) +@pytest.mark.parametrize("caches_mock", [{"caches_to_mock": [ + 'rucio.daemons.reaper.reaper.REGION' +]}], indirect=True) +def test_run_on_non_existing_scheme(vo, caches_mock, file_config_mock): + """ REAPER (DAEMON): Mock test the reaper daemon with a speficied scheme.""" + [cache_region] = caches_mock + scope = InternalScope('data13_hip', vo=vo) + + nb_files = 250 + file_size = 200 # 2G + rse_name, rse_id, dids = __add_test_rse_and_replicas(vo=vo, scope=scope, rse_name=rse_name_generator(), + names=['lfn' + generate_uuid() for _ in range(nb_files)], file_size=file_size) + + rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=50 * file_size) + assert len(list(replica_core.list_replicas(dids=dids, rse_expression=rse_name))) == nb_files + + # Now put it over threshold and delete + # Nothing should be deleted since the protocol doesn't exists for this RSE + # The reaper will set a flag pause_deletion_ + cache_region.invalidate() + rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=1) + reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='https') + assert len(list(replica_core.list_replicas(dids, rse_expression=rse_name))) == 250 + assert cache_region.get('pause_deletion_%s' % rse_id)