From 72c4cbb2d10fd694810d0d899a1897b5115d1130 Mon Sep 17 00:00:00 2001 From: Martin Barisits Date: Thu, 14 Jan 2021 16:51:23 +0100 Subject: [PATCH] Deletion: Reaper1 deprecation; Closes #4213 --- bin/rucio-reaper | 105 ----- doc/source/man/rucio-reaper.rst | 6 - doc/source/man/rucio-reaper2.rst | 6 + lib/rucio/daemons/reaper/reaper.py | 436 ------------------ lib/rucio/daemons/reaper/reaper2.py | 5 +- lib/rucio/tests/test_abacus_account.py | 10 +- .../tests/test_abacus_collection_replica.py | 16 +- lib/rucio/tests/test_abacus_rse.py | 17 +- .../tests/test_api_external_representation.py | 20 +- lib/rucio/tests/test_daemons.py | 6 +- lib/rucio/tests/test_multi_vo.py | 79 +++- lib/rucio/tests/test_reaper.py | 79 ---- lib/rucio/tests/test_reaper2.py | 36 +- 13 files changed, 132 insertions(+), 689 deletions(-) delete mode 100755 bin/rucio-reaper delete mode 100644 doc/source/man/rucio-reaper.rst create mode 100644 doc/source/man/rucio-reaper2.rst delete mode 100644 lib/rucio/daemons/reaper/reaper.py delete mode 100644 lib/rucio/tests/test_reaper.py diff --git a/bin/rucio-reaper b/bin/rucio-reaper deleted file mode 100755 index 3f3d379b96..0000000000 --- a/bin/rucio-reaper +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2012-2019 CERN for the benefit of the ATLAS collaboration. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Authors: -# - Vincent Garonne , 2012-2018 -# - Wen Guan , 2014 -# - Mario Lassnig , 2019 -# - Patrick Austin , 2020 - -""" -Reaper is a daemon to manage file deletion -""" - -import argparse -import signal - -from rucio.daemons.reaper.reaper import run, stop - - -def get_parser(): - """ - Returns the argparse parser. - """ - parser = argparse.ArgumentParser(description="The Reaper daemon is responsible for replica deletion. It deletes them by checking if there are replicas that are not locked and have a tombstone to indicate that they can be deleted.", epilog=''' -Upload a file and prepare the rules and replicas for deletion by using the judge-cleaner daemon:: - - $ rucio upload --rse MOCK --scope mock --name file filename.txt - $ rucio add-rule mock:file 1 MOCK2 --lifetime 1 - $ rucio-judge-cleaner --run-once - -Check if the replica was created:: - - $ rucio list-file-replica mock:file - +---------+--------+------------+-----------+---------------------------------------------------------+ - | SCOPE | NAME | FILESIZE | ADLER32 | RSE: REPLICA | - |---------+--------+------------+-----------+---------------------------------------------------------| - | mock | file | 1.542 kB | 1268ee71 | MOCK: file://localhost:0/tmp/rucio_rse/mock/15/58/file | - +---------+--------+------------+-----------+---------------------------------------------------------+ - -Run the daemon:: - - $ rucio-reaper --run-once - -Check if the replica exists:: - - $ rucio list-file-replica mock:file - +---------+--------+------------+-----------+---------------------------------------------------------+ - | SCOPE | NAME | FILESIZE | ADLER32 | RSE: REPLICA | - |---------+--------+------------+-----------+---------------------------------------------------------| - +---------+--------+------------+-----------+---------------------------------------------------------+ - -When run in multi-VO mode, by default the Reaper will run on RSEs from all VOs:: - - $ rucio-reaper --run-once - 2020-07-28 15:15:14,110 5461 INFO main: starting processes - 2020-07-28 15:15:14,151 5461 INFO Reaper: This instance will work on VOs: def, abc, xyz, 123 - -By using the ``--vos`` argument only the VO or VOs specified will be affected by the Reaper:: - - $ rucio-reaper --run-once --vos abc xyz - 2020-07-28 15:16:36,022 5474 INFO main: starting processes - 2020-07-28 15:16:36,066 5474 INFO Reaper: This instance will work on VOs: abc, xyz - -Note that attempting the use the ``--vos`` argument when in single-VO mode will have no affect:: - - $ rucio-reaper --run-once --vos abc xyz - 2020-07-28 15:21:33,348 5488 INFO main: starting processes - 2020-07-28 15:21:33,349 5488 WARNING Ignoring argument vos, this is only applicable in a multi-VO setup. - ''') - parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only') - parser.add_argument("--total-workers", action="store", default=1, type=int, help='Total number of workers per process') - parser.add_argument("--threads-per-worker", action="store", default=None, type=int, help='Total number of threads created by each worker') - parser.add_argument("--chunk-size", action="store", default=10, type=int, help='Chunk size') - parser.add_argument("--scheme", action="store", default=None, type=str, help='Force the reaper to use a particular protocol, e.g., mock.') - parser.add_argument('--greedy', action='store_true', default=False, help='Greedy mode') - parser.add_argument('--exclude-rses', action="store", default=None, type=str, help='RSEs expression to exclude RSEs') - parser.add_argument('--include-rses', action="store", default=None, type=str, help='RSEs expression to include RSEs') - parser.add_argument('--rses', nargs='+', type=str, help='List of RSEs') - parser.add_argument('--vos', nargs='+', type=str, help='Optional list of VOs to consider. Only used in multi-VO mode.') - parser.add_argument('--delay-seconds', action="store", default=3600, type=int, help='Delay to retry failed deletion') - return parser - - -if __name__ == '__main__': - signal.signal(signal.SIGTERM, stop) - parser = get_parser() - args = parser.parse_args() - try: - run(total_workers=args.total_workers, chunk_size=args.chunk_size, greedy=args.greedy, - once=args.run_once, scheme=args.scheme, rses=args.rses, threads_per_worker=args.threads_per_worker, - exclude_rses=args.exclude_rses, include_rses=args.include_rses, vos=args.vos, delay_seconds=args.delay_seconds) - except KeyboardInterrupt: - stop() diff --git a/doc/source/man/rucio-reaper.rst b/doc/source/man/rucio-reaper.rst deleted file mode 100644 index 67fe0b8efa..0000000000 --- a/doc/source/man/rucio-reaper.rst +++ /dev/null @@ -1,6 +0,0 @@ -Daemon rucio-reaper -******************* -.. argparse:: - :filename: bin/rucio-reaper - :func: get_parser - :prog: rucio-reaper diff --git a/doc/source/man/rucio-reaper2.rst b/doc/source/man/rucio-reaper2.rst new file mode 100644 index 0000000000..aeeb8c445d --- /dev/null +++ b/doc/source/man/rucio-reaper2.rst @@ -0,0 +1,6 @@ +Daemon rucio-reaper2 +******************** +.. argparse:: + :filename: bin/rucio-reaper2 + :func: get_parser + :prog: rucio-reaper2 diff --git a/lib/rucio/daemons/reaper/reaper.py b/lib/rucio/daemons/reaper/reaper.py deleted file mode 100644 index fd25ff04a8..0000000000 --- a/lib/rucio/daemons/reaper/reaper.py +++ /dev/null @@ -1,436 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016-2020 CERN -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Authors: -# - Vincent Garonne , 2016-2018 -# - Martin Barisits , 2016-2020 -# - Thomas Beermann , 2016-2021 -# - Wen Guan , 2016 -# - Hannes Hansen , 2018-2019 -# - Dimitrios Christidis , 2019 -# - James Perry , 2019 -# - Andrew Lister , 2019 -# - Brandon White , 2019 -# - Patrick Austin , 2020 -# - Benedikt Ziemons , 2020 - -''' -Reaper is a daemon to manage file deletion. -''' - -from __future__ import print_function, division - -import datetime -import hashlib -import logging -import math -import os -import random -import socket -import sys -import threading -import time -import traceback - -import rucio.db.sqla.util -from rucio.common.config import config_get_bool -from rucio.common.exception import (SourceNotFound, ServiceUnavailable, RSEAccessDenied, - ReplicaUnAvailable, ResourceTemporaryUnavailable, - DatabaseException, UnsupportedOperation, - ReplicaNotFound, RSENotFound, VONotFound) -from rucio.common.logging import setup_logging -from rucio.common.utils import chunks -from rucio.core import monitor -from rucio.core import rse as rse_core -from rucio.core.credential import get_signed_url -from rucio.core.heartbeat import live, die, sanity_check -from rucio.core.message import add_message -from rucio.core.replica import (list_unlocked_replicas, update_replicas_states, - delete_replicas) -from rucio.core.rse import get_rse_attribute, sort_rses, get_rse_name -from rucio.core.rse_expression_parser import parse_expression -from rucio.core.vo import list_vos -from rucio.db.sqla.constants import ReplicaState -from rucio.rse import rsemanager as rsemgr - -logging.getLogger("requests").setLevel(logging.CRITICAL) - -GRACEFUL_STOP = threading.Event() - - -def __check_rse_usage(rse_id): - """ - Internal method to check RSE usage and limits. - - :param rse_id: the rse id. - - :returns : max_being_deleted_files, needed_free_space, used, free. - """ - max_being_deleted_files, needed_free_space, used, free = None, None, None, None - - rse = get_rse_name(rse_id=rse_id) - # Get RSE limits - limits = rse_core.get_rse_limits(rse_id=rse_id) - if not limits and 'MinFreeSpace' not in limits and 'MaxBeingDeletedFiles' not in limits: - return max_being_deleted_files, needed_free_space, used, free - - min_free_space = limits.get('MinFreeSpace') - max_being_deleted_files = limits.get('MaxBeingDeletedFiles') - - # Check from which sources to get used and total spaces - # Default is storage - source_for_total_space, source_for_used_space = 'storage', 'storage' - values = get_rse_attribute(rse_id=rse_id, key='source_for_total_space') - if values: - source_for_total_space = values[0] - values = get_rse_attribute(rse_id=rse_id, key='source_for_used_space') - if values: - source_for_used_space = values[0] - - logging.debug('RSE: %(rse)s, source_for_total_space: %(source_for_total_space)s, ' - 'source_for_used_space: %(source_for_used_space)s' % locals()) - - # Get total and used space - usage = rse_core.get_rse_usage(rse_id=rse_id, source=source_for_total_space) - if not usage: - return max_being_deleted_files, needed_free_space, used, free - for var in usage: - total, used = var['total'], var['used'] - break - - if source_for_total_space != source_for_used_space: - usage = rse_core.get_rse_usage(rse_id=rse_id, source=source_for_used_space) - if not usage: - return max_being_deleted_files, needed_free_space, None, free - for var in usage: - used = var['used'] - break - - free = total - used - if min_free_space: - needed_free_space = min_free_space - free - - return max_being_deleted_files, needed_free_space, used, free - - -def reaper(rses, worker_number=0, child_number=0, total_children=1, chunk_size=100, - once=False, greedy=False, scheme=None, delay_seconds=0): - """ - Main loop to select and delete files. - - :param rses: List of RSEs the reaper should work against. If empty, it considers all RSEs. - :param worker_number: The worker number. - :param child_number: The child number. - :param total_children: The total number of children created per worker. - :param chunk_size: the size of chunk for deletion. - :param once: If True, only runs one iteration of the main loop. - :param greedy: If True, delete right away replicas with tombstone. - :param scheme: Force the reaper to use a particular protocol, e.g., mock. - """ - logging.info('Starting Reaper: Worker %(worker_number)s, ' - 'child %(child_number)s will work on RSEs: ' % locals() + ', '.join([rse['rse'] for rse in rses])) - - pid = os.getpid() - thread = threading.current_thread() - hostname = socket.gethostname() - executable = ' '.join(sys.argv) - # Generate a hash just for the subset of RSEs - rse_names = [rse['rse'] for rse in rses] - hash_executable = hashlib.sha256((sys.argv[0] + ''.join(rse_names)).encode()).hexdigest() - sanity_check(executable=None, hostname=hostname) - - nothing_to_do = {} - while not GRACEFUL_STOP.is_set(): - try: - # heartbeat - heartbeat = live(executable=executable, hostname=hostname, pid=pid, thread=thread, hash_executable=hash_executable) - checkpoint_time = datetime.datetime.now() - # logging.info('Reaper({0[worker_number]}/{0[child_number]}): Live gives {0[heartbeat]}'.format(locals())) - - max_deleting_rate = 0 - for rse in sort_rses(rses): - try: - if checkpoint_time + datetime.timedelta(minutes=1) < datetime.datetime.now(): - heartbeat = live(executable=executable, hostname=hostname, pid=pid, thread=thread, hash_executable=hash_executable) - # logging.info('Reaper({0[worker_number]}/{0[child_number]}): Live gives {0[heartbeat]}'.format(locals())) - checkpoint_time = datetime.datetime.now() - - if rse['id'] in nothing_to_do and nothing_to_do[rse['id']] > datetime.datetime.now(): - continue - logging.info('Reaper %s-%s: Running on RSE %s %s', worker_number, child_number, - rse['rse'], nothing_to_do.get(rse['id'])) - - rse_info = rsemgr.get_rse_info(rse_id=rse['id']) - rse_protocol = rse_core.get_rse_protocols(rse_id=rse['id']) - - if not rse_protocol['availability_delete']: - logging.info('Reaper %s-%s: RSE %s is not available for deletion', worker_number, child_number, rse_info['rse']) - nothing_to_do[rse['id']] = datetime.datetime.now() + datetime.timedelta(minutes=30) - continue - - # Temporary hack to force gfal for deletion - for protocol in rse_info['protocols']: - if protocol['impl'] == 'rucio.rse.protocols.srm.Default' or protocol['impl'] == 'rucio.rse.protocols.gsiftp.Default': - protocol['impl'] = 'rucio.rse.protocols.gfal.Default' - - needed_free_space, max_being_deleted_files = None, 100 - needed_free_space_per_child = None - if not greedy: - max_being_deleted_files, needed_free_space, used, free = __check_rse_usage(rse_id=rse['id']) - logging.info('Reaper %(worker_number)s-%(child_number)s: Space usage for RSE %(rse)s - max_being_deleted_files: %(max_being_deleted_files)s, needed_free_space: %(needed_free_space)s, used: %(used)s, free: %(free)s' % locals()) - if needed_free_space <= 0: - needed_free_space, needed_free_space_per_child = 0, 0 - logging.info('Reaper %s-%s: free space is above minimum limit for %s', worker_number, child_number, rse['rse']) - else: - if total_children and total_children > 0: - needed_free_space_per_child = needed_free_space / float(total_children) - - start = time.time() - with monitor.record_timer_block('reaper.list_unlocked_replicas'): - replicas = list_unlocked_replicas(rse_id=rse['id'], - bytes=needed_free_space_per_child, - limit=max_being_deleted_files, - worker_number=child_number, - total_workers=total_children, - delay_seconds=delay_seconds) - logging.debug('Reaper %s-%s: list_unlocked_replicas on %s for %s bytes in %s seconds: %s replicas', worker_number, child_number, rse['rse'], needed_free_space_per_child, time.time() - start, len(replicas)) - - if not replicas: - nothing_to_do[rse['id']] = datetime.datetime.now() + datetime.timedelta(minutes=30) - logging.info('Reaper %s-%s: No replicas to delete %s. The next check will occur at %s', - worker_number, child_number, rse['rse'], - nothing_to_do[rse['id']]) - continue - - prot = rsemgr.create_protocol(rse_info, 'delete', scheme=scheme) - for files in chunks(replicas, chunk_size): - logging.debug('Reaper %s-%s: Running on : %s', worker_number, child_number, str(files)) - try: - update_replicas_states(replicas=[dict(list(replica.items()) + [('state', ReplicaState.BEING_DELETED), ('rse_id', rse['id'])]) for replica in files], nowait=True) - for replica in files: - try: - replica['pfn'] = str(list(rsemgr.lfns2pfns(rse_settings=rse_info, - lfns=[{'scope': replica['scope'].external, 'name': replica['name'], 'path': replica['path']}], - operation='delete', scheme=scheme).values())[0]) - except (ReplicaUnAvailable, ReplicaNotFound) as error: - err_msg = 'Failed to get pfn UNAVAILABLE replica %s:%s on %s with error %s' % (replica['scope'], replica['name'], rse['rse'], str(error)) - logging.warning('Reaper %s-%s: %s', worker_number, child_number, err_msg) - replica['pfn'] = None - - monitor.record_counter(counters='reaper.deletion.being_deleted', delta=len(files)) - - try: - deleted_files = [] - prot.connect() - for replica in files: - try: - deletion_dict = {'scope': replica['scope'].external, - 'name': replica['name'], - 'rse': rse_info['rse'], - 'rse_id': rse_info['id'], - 'file-size': replica['bytes'], - 'bytes': replica['bytes'], - 'url': replica['pfn'], - 'protocol': prot.attributes['scheme']} - if replica['scope'].vo != 'def': - deletion_dict['vo'] = replica['scope'].vo - logging.info('Reaper %s-%s: Deletion ATTEMPT of %s:%s as %s on %s', worker_number, child_number, replica['scope'], replica['name'], replica['pfn'], rse['rse']) - start = time.time() - if rse['staging_area'] or rse['rse'].endswith("STAGING"): - logging.warning('Reaper %s-%s: Deletion STAGING of %s:%s as %s on %s, will only delete the catalog and not do physical deletion', - worker_number, child_number, replica['scope'], replica['name'], replica['pfn'], rse['rse']) - else: - if replica['pfn']: - pfn = replica['pfn'] - # sign the URL if necessary - if prot.attributes['scheme'] == 'https' and rse_info['sign_url'] is not None: - pfn = get_signed_url(rse['id'], rse_info['sign_url'], 'delete', pfn) - prot.delete(pfn) - else: - logging.warning('Reaper %s-%s: Deletion UNAVAILABLE of %s:%s as %s on %s', worker_number, child_number, replica['scope'], replica['name'], replica['pfn'], rse['rse']) - monitor.record_timer('daemons.reaper.delete.%s.%s' % (prot.attributes['scheme'], rse['rse']), (time.time() - start) * 1000) - duration = time.time() - start - - deleted_files.append({'scope': replica['scope'], 'name': replica['name']}) - - deletion_dict['duration'] = duration - add_message('deletion-done', deletion_dict) - logging.info('Reaper %s-%s: Deletion SUCCESS of %s:%s as %s on %s in %s seconds', worker_number, child_number, replica['scope'], replica['name'], replica['pfn'], rse['rse'], duration) - except SourceNotFound: - err_msg = 'Deletion NOTFOUND of %s:%s as %s on %s' % (replica['scope'], replica['name'], replica['pfn'], rse['rse']) - logging.warning(err_msg) - deleted_files.append({'scope': replica['scope'], 'name': replica['name']}) - if replica['state'] == ReplicaState.AVAILABLE: - deletion_dict['reason'] = str(err_msg) - add_message('deletion-failed', deletion_dict) - except (ServiceUnavailable, RSEAccessDenied, ResourceTemporaryUnavailable) as error: - logging.warning('Reaper %s-%s: Deletion NOACCESS of %s:%s as %s on %s: %s', worker_number, child_number, replica['scope'], replica['name'], replica['pfn'], rse['rse'], str(error)) - deletion_dict['reason'] = str(error) - add_message('deletion-failed', deletion_dict) - except Exception as error: - logging.critical('Reaper %s-%s: Deletion CRITICAL of %s:%s as %s on %s: %s', worker_number, child_number, replica['scope'], replica['name'], replica['pfn'], rse['rse'], str(traceback.format_exc())) - deletion_dict['reason'] = str(error) - add_message('deletion-failed', deletion_dict) - except: - logging.critical('Reaper %s-%s: Deletion CRITICAL of %s:%s as %s on %s: %s', worker_number, child_number, replica['scope'], replica['name'], replica['pfn'], rse['rse'], str(traceback.format_exc())) - except (ServiceUnavailable, RSEAccessDenied, ResourceTemporaryUnavailable) as error: - for replica in files: - logging.warning('Reaper %s-%s: Deletion NOACCESS of %s:%s as %s on %s: %s', worker_number, child_number, replica['scope'], replica['name'], replica['pfn'], rse['rse'], str(error)) - payload = {'scope': replica['scope'].external, - 'name': replica['name'], - 'rse': rse_info['rse'], - 'rse_id': rse_info['id'], - 'file-size': replica['bytes'], - 'bytes': replica['bytes'], - 'url': replica['pfn'], - 'reason': str(error), - 'protocol': prot.attributes['scheme']} - if replica['scope'].vo != 'def': - deletion_dict['vo'] = replica['scope'].vo - add_message('deletion-failed', payload) - break - finally: - prot.close() - start = time.time() - with monitor.record_timer_block('reaper.delete_replicas'): - delete_replicas(rse_id=rse['id'], files=deleted_files) - logging.debug('Reaper %s-%s: delete_replicas successes %s %s %s', worker_number, child_number, rse['rse'], len(deleted_files), time.time() - start) - monitor.record_counter(counters='reaper.deletion.done', delta=len(deleted_files)) - - except DatabaseException as error: - logging.warning('Reaper %s-%s: DatabaseException %s', worker_number, child_number, str(error)) - except UnsupportedOperation as error: - logging.warning('Reaper %s-%s: UnsupportedOperation %s', worker_number, child_number, str(error)) - except: - logging.critical(traceback.format_exc()) - - except RSENotFound as error: - logging.warning('Reaper %s-%s: RSE not found %s', worker_number, child_number, str(error)) - - except: - logging.critical(traceback.format_exc()) - - if once: - break - - time.sleep(1) - - except DatabaseException as error: - logging.warning('Reaper: %s', str(error)) - except: - logging.critical(traceback.format_exc()) - - die(executable=executable, hostname=hostname, pid=pid, thread=thread, hash_executable=hash_executable) - logging.info('Graceful stop requested') - logging.info('Graceful stop done') - return - - -def stop(signum=None, frame=None): - """ - Graceful exit. - """ - GRACEFUL_STOP.set() - - -def run(total_workers=1, chunk_size=100, threads_per_worker=None, once=False, greedy=False, rses=[], scheme=None, exclude_rses=None, include_rses=None, vos=None, delay_seconds=0): - """ - Starts up the reaper threads. - - :param total_workers: The total number of workers. - :param chunk_size: the size of chunk for deletion. - :param threads_per_worker: Total number of threads created by each worker. - :param once: If True, only runs one iteration of the main loop. - :param greedy: If True, delete right away replicas with tombstone. - :param rses: List of RSEs the reaper should work against. If empty, it considers all RSEs. - :param scheme: Force the reaper to use a particular protocol/scheme, e.g., mock. - :param exclude_rses: RSE expression to exclude RSEs from the Reaper. - :param include_rses: RSE expression to include RSEs. - :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. - If None, we either use all VOs if run from "def", or the current VO otherwise. - """ - setup_logging() - - if rucio.db.sqla.util.is_old_db(): - raise DatabaseException('Database was not updated, daemon won\'t start') - - logging.info('Reaper1 daemon will be deprecated and replaced by reaper2 with Rucio release 1.25 (~March 2021)!') - logging.info('main: starting processes') - - multi_vo = config_get_bool('common', 'multi_vo', raise_exception=False, default=False) - if not multi_vo: - if vos: - logging.warning('Ignoring argument vos, this is only applicable in a multi-VO setup.') - vos = ['def'] - else: - if vos: - invalid = set(vos) - set([v['vo'] for v in list_vos()]) - if invalid: - msg = 'VO{} {} cannot be found'.format('s' if len(invalid) > 1 else '', ', '.join([repr(v) for v in invalid])) - raise VONotFound(msg) - else: - vos = [v['vo'] for v in list_vos()] - logging.info('Reaper: This instance will work on VO%s: %s' % ('s' if len(vos) > 1 else '', ', '.join([v for v in vos]))) - - all_rses = [] - for vo in vos: - all_rses.extend(rse_core.list_rses(filters={'vo': vo})) - - if rses: - invalid = set(rses) - set([rse['rse'] for rse in all_rses]) - if invalid: - msg = 'RSE{} {} cannot be found'.format('s' if len(invalid) > 1 else '', - ', '.join([repr(rse) for rse in invalid])) - raise RSENotFound(msg) - rses = [rse for rse in all_rses if rse['rse'] in rses] - else: - rses = all_rses - - if exclude_rses: - excluded_rses = parse_expression(exclude_rses) - rses = [rse for rse in rses if rse not in excluded_rses] - - if include_rses: - included_rses = parse_expression(include_rses) - rses = [rse for rse in rses if rse in included_rses] - - if not rses: - logging.error('Reaper: No RSEs found. Exiting.') - return - - logging.info('Reaper: This instance will work on RSEs: ' + ', '.join([rse['rse'] for rse in rses])) - - threads = [] - nb_rses_per_worker = int(math.ceil(len(rses) / float(total_workers))) or 1 - rses = random.sample(rses, len(rses)) - for worker in range(total_workers): - for child in range(threads_per_worker or 1): - rses_list = rses[worker * nb_rses_per_worker: worker * nb_rses_per_worker + nb_rses_per_worker] - if not rses_list: - logging.warning('Reaper: Empty RSEs list for worker %(worker)s' % locals()) - continue - kwargs = {'worker_number': worker, - 'child_number': child, - 'total_children': threads_per_worker or 1, - 'once': once, - 'chunk_size': chunk_size, - 'greedy': greedy, - 'rses': rses_list, - 'delay_seconds': delay_seconds, - 'scheme': scheme} - threads.append(threading.Thread(target=reaper, kwargs=kwargs, name='Worker: %s, child: %s' % (worker, child))) - [t.start() for t in threads] - while threads[0].is_alive(): - [t.join(timeout=3.14) for t in threads] diff --git a/lib/rucio/daemons/reaper/reaper2.py b/lib/rucio/daemons/reaper/reaper2.py index 0eed4ebce6..f1218f965d 100644 --- a/lib/rucio/daemons/reaper/reaper2.py +++ b/lib/rucio/daemons/reaper/reaper2.py @@ -91,7 +91,7 @@ def get_rses_to_process(rses, include_rses, exclude_rses, vos): :param exclude_rses: RSE expression to exclude RSEs from the Reaper. :param include_rses: RSE expression to include RSEs. :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. - If None, we either use all VOs if run from "def", + If None, we either use all VOs if run from "def" :returns: A list of RSEs to process """ @@ -113,6 +113,7 @@ def get_rses_to_process(rses, include_rses, exclude_rses, vos): cache_key = 'rses_to_process' if multi_vo: cache_key += '@%s' % '-'.join(vo for vo in vos) + result = REGION.get(cache_key) if result is not NO_VALUE: return result @@ -238,7 +239,7 @@ def get_rses_to_hostname_mapping(): """ Return a dictionaries mapping the RSEs to the hostname of the SE - :returns: Dictionary with RSE_id as key and (hostname, rse_info) as value + :returns: Dictionary with RSE_id as key and (hostname, rse_info) as value """ result = REGION.get('rse_hostname_mapping') diff --git a/lib/rucio/tests/test_abacus_account.py b/lib/rucio/tests/test_abacus_account.py index 86657bf13e..363e16eef5 100644 --- a/lib/rucio/tests/test_abacus_account.py +++ b/lib/rucio/tests/test_abacus_account.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2018-2020 CERN +# Copyright 2018-2021 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ # # Authors: # - Hannes Hansen , 2018-2019 -# - Martin Barisits , 2019 +# - Martin Barisits , 2019-2021 # - Cedric Serfon , 2019 # - Andrew Lister , 2019 # - Patrick Austin , 2020 @@ -36,7 +36,7 @@ from rucio.core.rse import get_rse_id from rucio.daemons.abacus import account from rucio.daemons.judge import cleaner -from rucio.daemons.reaper import reaper +from rucio.daemons.reaper import reaper2 from rucio.daemons.undertaker import undertaker from rucio.db.sqla import models from rucio.db.sqla.session import get_session @@ -66,9 +66,9 @@ def tearDownClass(cls): undertaker.run(once=True) cleaner.run(once=True) if cls.vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) + reaper2.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) else: - reaper.run(once=True, include_rses=cls.rse, greedy=True) + reaper2.run(once=True, include_rses=cls.rse, greedy=True) def test_abacus_account(self): """ ABACUS (ACCOUNT): Test update of account usage """ diff --git a/lib/rucio/tests/test_abacus_collection_replica.py b/lib/rucio/tests/test_abacus_collection_replica.py index 76a2c5bf90..b95fc97bbb 100644 --- a/lib/rucio/tests/test_abacus_collection_replica.py +++ b/lib/rucio/tests/test_abacus_collection_replica.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2018-2020 CERN +# Copyright 2018-2021 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ # - Patrick Austin , 2020 # - Eli Chadwick , 2020 # - Benedikt Ziemons , 2020 -# - Martin Barisits , 2020 +# - Martin Barisits , 2020-2021 # - Mario Lassnig , 2020 import os @@ -37,7 +37,7 @@ from rucio.core.rse import get_rse_id, add_rse from rucio.daemons.abacus import collection_replica from rucio.daemons.judge import cleaner -from rucio.daemons.reaper import reaper +from rucio.daemons.reaper import reaper2 from rucio.daemons.undertaker import undertaker from rucio.db.sqla import models, session from rucio.db.sqla.constants import DIDType, ReplicaState @@ -70,9 +70,9 @@ def tearDownClass(cls): undertaker.run(once=True) cleaner.run(once=True) if cls.vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) + reaper2.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) else: - reaper.run(once=True, include_rses=cls.rse, greedy=True) + reaper2.run(once=True, include_rses=cls.rse, greedy=True) def test_abacus_collection_replica_cleanup(self): """ ABACUS (COLLECTION REPLICA): Test if the cleanup procedure works correctly. """ @@ -140,11 +140,13 @@ def test_abacus_collection_replica(self): assert str(dataset_replica['state']) == 'UNAVAILABLE' # Delete all files -> collection replica should be deleted + from rucio.daemons.reaper.reaper2 import REGION + REGION.invalidate() cleaner.run(once=True) if self.vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], self.rse), greedy=True) + reaper2.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], self.rse), greedy=True) else: - reaper.run(once=True, include_rses=self.rse, greedy=True) + reaper2.run(once=True, include_rses=self.rse, greedy=True) self.rule_client.add_replication_rule([{'scope': self.scope, 'name': self.dataset}], 1, self.rse, lifetime=-1) collection_replica.run(once=True) dataset_replica = [replica for replica in self.replica_client.list_dataset_replicas(self.scope, self.dataset)] diff --git a/lib/rucio/tests/test_abacus_rse.py b/lib/rucio/tests/test_abacus_rse.py index 34d79293a4..b938e4d449 100644 --- a/lib/rucio/tests/test_abacus_rse.py +++ b/lib/rucio/tests/test_abacus_rse.py @@ -1,5 +1,6 @@ + # -*- coding: utf-8 -*- -# Copyright 2018-2020 CERN +# Copyright 2018-2021 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +16,7 @@ # # Authors: # - Hannes Hansen , 2018-2019 -# - Martin Barisits , 2019 +# - Martin Barisits , 2019-2021 # - Andrew Lister , 2019 # - Patrick Austin , 2020 # - Eli Chadwick , 2020 @@ -30,7 +31,7 @@ from rucio.core.rse import get_rse_id, get_rse_usage from rucio.daemons.abacus import rse from rucio.daemons.judge import cleaner -from rucio.daemons.reaper import reaper +from rucio.daemons.reaper import reaper2 from rucio.daemons.undertaker import undertaker from rucio.db.sqla import models from rucio.db.sqla.session import get_session @@ -59,9 +60,9 @@ def tearDownClass(cls): undertaker.run(once=True) cleaner.run(once=True) if cls.vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) + reaper2.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) else: - reaper.run(once=True, include_rses=cls.rse, greedy=True) + reaper2.run(once=True, include_rses=cls.rse, greedy=True) def test_abacus_rse(self): """ ABACUS (RSE): Test update of RSE usage. """ @@ -83,11 +84,13 @@ def test_abacus_rse(self): assert len(rse_usage_from_unavailable) == 0 # Delete files -> rse usage should decrease + from rucio.daemons.reaper.reaper2 import REGION + REGION.invalidate() cleaner.run(once=True) if self.vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], self.rse), greedy=True) + reaper2.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], self.rse), greedy=True) else: - reaper.run(once=True, include_rses=self.rse, greedy=True) + reaper2.run(once=True, include_rses=self.rse, greedy=True) rse.run(once=True) rse_usage = get_rse_usage(rse_id=self.rse_id)[0] assert rse_usage['used'] == 0 diff --git a/lib/rucio/tests/test_api_external_representation.py b/lib/rucio/tests/test_api_external_representation.py index 47d257614a..5a234648e1 100644 --- a/lib/rucio/tests/test_api_external_representation.py +++ b/lib/rucio/tests/test_api_external_representation.py @@ -1,4 +1,5 @@ -# Copyright 2012-2020 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2020-2021 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,19 +14,10 @@ # limitations under the License. # # Authors: -# - Thomas Beermann, , 2012 -# - Angelos Molfetas, , 2012 -# - Mario Lassnig, , 2012-2013 -# - Vincent Garonne, , 2012-2015 -# - Joaquin Bogado, , 2015 -# - Cedric Serfon, , 2015, 2017 -# - Hannes Hansen, , 2018-2019 -# - Andrew Lister, , 2019 # - Eli Chadwick , 2020 # - Patrick Austin , 2020 # - Benedikt Ziemons , 2020 -# -# PY3K COMPATIBLE +# - Martin Barisits , 2021 import random import string @@ -53,7 +45,7 @@ from rucio.core.vo import add_vo, vo_exists from rucio.daemons.abacus import rse as abacus_rse from rucio.daemons.judge import cleaner -from rucio.daemons.reaper import reaper +from rucio.daemons.reaper import reaper2 from rucio.db.sqla import constants from rucio.tests.common import rse_name_generator @@ -375,9 +367,9 @@ def test_api_rse(self): # clean up files cleaner.run(once=True) if self.multi_vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], rse_mock), greedy=True) + reaper2.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], rse_mock), greedy=True) else: - reaper.run(once=True, include_rses=rse_mock, greedy=True) + reaper2.run(once=True, include_rses=rse_mock, greedy=True) abacus_rse.run(once=True) out = api_rse.parse_rse_expression('%s|%s' % (self.rse_name, self.rse2_name), **self.vo) diff --git a/lib/rucio/tests/test_daemons.py b/lib/rucio/tests/test_daemons.py index bc29114457..1eb4ae0ae3 100644 --- a/lib/rucio/tests/test_daemons.py +++ b/lib/rucio/tests/test_daemons.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2020 CERN +# Copyright 2020-2021 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ # # Authors: # - Benedikt Ziemons , 2020 +# - Martin Barisits , 2021 import sys @@ -33,7 +34,7 @@ from rucio.daemons.hermes import hermes, hermes2 from rucio.daemons.judge import cleaner, evaluator, injector, repairer from rucio.daemons.oauthmanager import oauthmanager -from rucio.daemons.reaper import dark_reaper, light_reaper, reaper, reaper2 +from rucio.daemons.reaper import dark_reaper, light_reaper, reaper2 from rucio.daemons.replicarecoverer import suspicious_replica_recoverer from rucio.daemons.sonar.distribution import distribution_daemon from rucio.daemons.tracer import kronos @@ -76,7 +77,6 @@ oauthmanager, dark_reaper, light_reaper, - reaper, reaper2, suspicious_replica_recoverer, distribution_daemon, diff --git a/lib/rucio/tests/test_multi_vo.py b/lib/rucio/tests/test_multi_vo.py index 93293a84f2..a3c378bf1a 100644 --- a/lib/rucio/tests/test_multi_vo.py +++ b/lib/rucio/tests/test_multi_vo.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2020 CERN +# Copyright 2020-2021 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,8 +17,8 @@ # - Patrick Austin , 2020 # - Eli Chadwick , 2020 # - Benedikt Ziemons , 2020 -# - Mario Lassnig , 2020 -# - Martin Barisits , 2020 +# - Mario Lassnig , 2020 +# - Martin Barisits , 2020-2021 import os import sys @@ -65,7 +65,7 @@ from rucio.core.rule import add_rule from rucio.core.vo import add_vo, vo_exists from rucio.daemons.automatix.automatix import automatix -from rucio.daemons.reaper.reaper import run as run_reaper +from rucio.daemons.reaper.reaper2 import reaper from rucio.db.sqla import models, session as db_session from rucio.tests.common import execute, headers, hdrdict, vohdr, auth, loginhdr from rucio.tests.test_authentication import PRIVATE_KEY, PUBLIC_KEY @@ -1143,11 +1143,70 @@ def test_reaper(self): assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.new_vo))) == nb_files # Check we reap all VOs by default - run_reaper(once=True, rses=[rse_name]) - assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.vo))) == nb_files - 5 - assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.new_vo))) == nb_files - 5 + from rucio.daemons.reaper.reaper2 import REGION + REGION.invalidate() + reaper(once=True, rses=[rse_name], include_rses=rse_names[0], exclude_rses=[]) + # reaper(once=True, rses=[rse_name]) + assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.vo))) == 0 + assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.new_vo))) == 0 + + def test_reaper_affect_other_vo(self): + """ MULTI VO (DAEMON): Test that reaper runs on the specified VO(s) and does not reap others""" + rse_str = ''.join(choice(ascii_uppercase) for x in range(10)) + rse_name = 'SHR_%s' % rse_str + rse_id_tst = add_rse(rse_name, 'root', **self.vo) + rse_id_new = add_rse(rse_name, 'root', **self.new_vo) + + mock_protocol = {'scheme': 'MOCK', + 'hostname': 'localhost', + 'port': 123, + 'prefix': '/test/reaper', + 'impl': 'rucio.rse.protocols.mock.Default', + 'domains': { + 'lan': {'read': 1, + 'write': 1, + 'delete': 1}, + 'wan': {'read': 1, + 'write': 1, + 'delete': 1}}} + add_protocol(rse=rse_name, data=mock_protocol, issuer='root', **self.vo) + add_protocol(rse=rse_name, data=mock_protocol, issuer='root', **self.new_vo) + + scope_uuid = str(generate_uuid()).lower()[:16] + scope_name = 'shr_%s' % scope_uuid + scope_tst = InternalScope(scope_name, **self.vo) + scope_new = InternalScope(scope_name, **self.new_vo) + add_scope(scope_name, 'root', 'root', **self.vo) + add_scope(scope_name, 'root', 'root', **self.new_vo) + + nb_files = 30 + file_size = 2147483648 # 2G + + names = [] + for i in range(nb_files): + name = 'lfn%s' % generate_uuid() + names.append(name) + add_replica(rse_id=rse_id_tst, scope=scope_tst, name=name, bytes=file_size, account=InternalAccount('root', **self.vo), + adler32=None, md5=None, tombstone=datetime.utcnow()) + add_replica(rse_id=rse_id_new, scope=scope_new, name=name, bytes=file_size, account=InternalAccount('root', **self.new_vo), + adler32=None, md5=None, tombstone=datetime.utcnow()) + + set_rse_usage(rse=rse_name, source='storage', used=nb_files * file_size, free=800, issuer='root', **self.vo) + set_rse_limits(rse=rse_name, name='MinFreeSpace', value=10737418240, issuer='root', **self.vo) + set_rse_limits(rse=rse_name, name='MaxBeingDeletedFiles', value=10, issuer='root', **self.vo) + + set_rse_usage(rse=rse_name, source='storage', used=nb_files * file_size, free=800, issuer='root', **self.new_vo) + set_rse_limits(rse=rse_name, name='MinFreeSpace', value=10737418240, issuer='root', **self.new_vo) + set_rse_limits(rse=rse_name, name='MaxBeingDeletedFiles', value=10, issuer='root', **self.new_vo) + + # Check we start of with the expected number of replicas + assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.vo))) == nb_files + assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.new_vo))) == nb_files # Check we don't affect a second VO that isn't specified - run_reaper(once=True, rses=[rse_name], vos=['new']) - assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.vo))) == nb_files - 5 - assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.new_vo))) == nb_files - 10 + from rucio.daemons.reaper.reaper2 import REGION + REGION.invalidate() + # run_reaper(once=True, rses=[rse_name], vos=['new']) + reaper(once=True, rses=[rse_name], vos=['new'], include_rses=rse_names[0], exclude_rses=[]) + assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.vo))) == nb_files + assert len(list(list_replicas([{'scope': scope_name, 'name': n} for n in names], rse_expression=rse_name, **self.new_vo))) == 0 diff --git a/lib/rucio/tests/test_reaper.py b/lib/rucio/tests/test_reaper.py deleted file mode 100644 index 2de75ed6cc..0000000000 --- a/lib/rucio/tests/test_reaper.py +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright 2013-2020 CERN for the benefit of the ATLAS collaboration. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Authors: -# - Vincent Garonne , 2013-2019 -# - Martin Barisits , 2016 -# - Joaquin Bogado , 2018 -# - Andrew Lister , 2019 -# - Mario Lassnig , 2019 -# - Eli Chadwick , 2020 -# - Patrick Austin , 2020 -# - Benedikt Ziemons , 2020 - -from datetime import datetime, timedelta - -from rucio.common.config import config_get, config_get_bool -from rucio.common.types import InternalAccount, InternalScope -from rucio.common.utils import generate_uuid -from rucio.core import replica as replica_core -from rucio.core import rse as rse_core -from rucio.daemons.reaper.reaper import reaper -from rucio.tests.common import rse_name_generator - - -def test_reaper(): - """ REAPER (DAEMON): Test the reaper daemon.""" - if config_get_bool('common', 'multi_vo', raise_exception=False, default=False): - vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - else: - vo = {} - - rse_name = rse_name_generator() - rse_id = rse_core.add_rse(rse_name, **vo) - - mock_protocol = {'scheme': 'MOCK', - 'hostname': 'localhost', - 'port': 123, - 'prefix': '/test/reaper', - 'impl': 'rucio.rse.protocols.mock.Default', - 'domains': { - 'lan': {'read': 1, - 'write': 1, - 'delete': 1}, - 'wan': {'read': 1, - 'write': 1, - 'delete': 1}}} - rse_core.add_protocol(rse_id=rse_id, parameter=mock_protocol) - - nb_files = 30 - file_size = 2147483648 # 2G - - file_names = [] - for i in range(nb_files): - file_name = 'lfn' + generate_uuid() - file_names.append(file_name) - replica_core.add_replica(rse_id=rse_id, scope=InternalScope('data13_hip', **vo), - name=file_name, bytes=file_size, - tombstone=datetime.utcnow() - timedelta(days=1), - account=InternalAccount('root', **vo), adler32=None, md5=None) - - rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=800) - rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=10737418240) - rse_core.set_rse_limits(rse_id=rse_id, name='MaxBeingDeletedFiles', value=10) - - rses = [rse_core.get_rse(rse_id), ] - reaper(once=True, rses=rses) - reaper(once=True, rses=rses) - assert len(list(replica_core.list_replicas(dids=[{'scope': InternalScope('data13_hip', **vo), 'name': n} for n in file_names], rse_expression=rse_name))) == nb_files - 10 diff --git a/lib/rucio/tests/test_reaper2.py b/lib/rucio/tests/test_reaper2.py index de36b78cea..efb0db8d3f 100644 --- a/lib/rucio/tests/test_reaper2.py +++ b/lib/rucio/tests/test_reaper2.py @@ -60,7 +60,7 @@ def test_reaper(): 'write': 1, 'delete': 1}}} - nb_files = 30 + nb_files = 250 file_size = 2147483648 # 2G rse_names = [] @@ -90,30 +90,36 @@ def test_reaper(): all_file_names.append(file_names) rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=800) - rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=10737418240) - rse_core.set_rse_limits(rse_id=rse_id, name='MaxBeingDeletedFiles', value=10) + rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=322122547200) + # rse_core.set_rse_limits(rse_id=rse_id, name='MaxBeingDeletedFiles', value=10) if new_vo: rse_core.set_rse_usage(rse_id=rse_id_new, source='storage', used=nb_files * file_size, free=800) - rse_core.set_rse_limits(rse_id=rse_id_new, name='MinFreeSpace', value=10737418240) - rse_core.set_rse_limits(rse_id=rse_id_new, name='MaxBeingDeletedFiles', value=10) + rse_core.set_rse_limits(rse_id=rse_id_new, name='MinFreeSpace', value=322122547200) + # rse_core.set_rse_limits(rse_id=rse_id_new, name='MaxBeingDeletedFiles', value=10) + from rucio.daemons.reaper.reaper2 import REGION + REGION.invalidate() if not vo: + assert len(list(replica_core.list_replicas(dids=[{'scope': InternalScope('data13_hip', **vo), 'name': n} for n in all_file_names[0]], + rse_expression=rse_name))) == nb_files + # Check first if the reaper does not delete anything if no space is needed + rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=323000000000) + reaper(once=True, rses=[], include_rses=rse_names[0], exclude_rses=[]) + assert len(list(replica_core.list_replicas(dids=[{'scope': InternalScope('data13_hip', **vo), 'name': n} for n in all_file_names[0]], + rse_expression=rse_name))) == nb_files + # Now put it over threshold and delete + rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=800) + from rucio.daemons.reaper.reaper2 import REGION + REGION.invalidate() reaper(once=True, rses=[], include_rses=rse_names[0], exclude_rses=[]) reaper(once=True, rses=[], include_rses=rse_names[0], exclude_rses=[]) assert len(list(replica_core.list_replicas(dids=[{'scope': InternalScope('data13_hip', **vo), 'name': n} for n in all_file_names[0]], - rse_expression=rse_name))) == nb_files - 5 + rse_expression=rse_name))) == 0 else: # Check we reap all VOs by default reaper(once=True, rses=[], include_rses=rse_names[0], exclude_rses=[]) reaper(once=True, rses=[], include_rses=rse_names[0], exclude_rses=[]) assert len(list(replica_core.list_replicas(dids=[{'scope': InternalScope('data13_hip', **vo), 'name': n} for n in all_file_names[0]], - rse_expression=rse_names[0]))) == nb_files - 5 + rse_expression=rse_names[0]))) == 0 assert len(list(replica_core.list_replicas(dids=[{'scope': InternalScope('data13_hip', **new_vo), 'name': n} for n in all_file_names[0]], - rse_expression=rse_names[0]))) == nb_files - 5 - # Check we don't affect a second VO that isn't specified - reaper(once=True, rses=[], include_rses=rse_names[1], exclude_rses=[], vos=['new']) - reaper(once=True, rses=[], include_rses=rse_names[1], exclude_rses=[], vos=['new']) - assert len(list(replica_core.list_replicas(dids=[{'scope': InternalScope('data13_hip', **vo), 'name': n} for n in all_file_names[1]], - rse_expression=rse_names[1]))), nb_files - assert len(list(replica_core.list_replicas(dids=[{'scope': InternalScope('data13_hip', **new_vo), 'name': n} for n in all_file_names[1]], - rse_expression=rse_names[1]))), nb_files - 5 + rse_expression=rse_names[0]))) == 0