From aaa4584d634f42cf07b347129eceb6e88e639df2 Mon Sep 17 00:00:00 2001 From: Christoph Ames Date: Fri, 13 Oct 2023 10:57:20 +0200 Subject: [PATCH] Recovery: Adjust suspicious replica recoverer creation of rules. #6337 --- bin/rucio-replica-recoverer | 4 +- etc/suspicious_replica_recoverer.json | 5 + lib/rucio/core/replica.py | 9 +- lib/rucio/daemons/auditor/__init__.py | 2 +- .../suspicious_replica_recoverer.py | 226 ++++++++++++------ tests/test_replica_recoverer.py | 134 +++++++---- 6 files changed, 258 insertions(+), 122 deletions(-) diff --git a/bin/rucio-replica-recoverer b/bin/rucio-replica-recoverer index cadc95677cf..05fe7d98028 100755 --- a/bin/rucio-replica-recoverer +++ b/bin/rucio-replica-recoverer @@ -228,8 +228,8 @@ Note that attempting the use the ``--vos`` argument when in single-VO mode will $ rucio-replica-recoverer --run-once --vos abc xyz 2020-07-28 15:21:33,349 5488 WARNING Ignoring argument vos, this is only applicable in a multi-VO setup. ''', formatter_class=argparse.RawDescriptionHelpFormatter) # NOQA: E501 - parser.add_argument("--nattempts", action="store", default=10, help='Minimum count of suspicious file replica appearance in bad_replicas table. Default value is 10.') - parser.add_argument("--younger-than", action="store", default=3, help='Consider all file replicas logged in bad_replicas table since speicified number of younger-than days. Default value is 3.') + parser.add_argument("--nattempts", action="store", default=5, help='Minimum count of suspicious file replica appearance in bad_replicas table. Default value is 5.') + parser.add_argument("--younger-than", action="store", default=5, help='Consider all file replicas logged in bad_replicas table since speicified number of younger-than days. Default value is 5.') parser.add_argument('--vos', nargs='+', type=str, help='Optional list of VOs to consider. 
Only used in multi-VO mode.') parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only.') parser.add_argument("--limit-suspicious-files-on-rse", action="store", default=5, help='Maximum number of suspicious replicas on an RSE before that RSE is considered problematic and the suspicious replicas on that RSE are declared "TEMPORARY_UNAVAILABLE". Default value is 5.') diff --git a/etc/suspicious_replica_recoverer.json b/etc/suspicious_replica_recoverer.json index 3086b8c0d4d..c3127ee0c8c 100644 --- a/etc/suspicious_replica_recoverer.json +++ b/etc/suspicious_replica_recoverer.json @@ -9,4 +9,9 @@ "datatype": ["RAW"], "scope": [] - } + }, + { + "action": "dry run", + "datatype": [], + "scope": ["mc.*"] + } ] diff --git a/lib/rucio/core/replica.py b/lib/rucio/core/replica.py index b406514abab..6aad802bc65 100644 --- a/lib/rucio/core/replica.py +++ b/lib/rucio/core/replica.py @@ -3406,7 +3406,7 @@ def get_replicas_state(scope=None, name=None, *, session: "Session"): @read_session -def get_suspicious_files(rse_expression, available_elsewhere, filter_=None, logger=logging.log, younger_than=10, nattempts=0, nattempts_exact=False, *, session: "Session", exclude_states=['B', 'R', 'D'], is_suspicious=False): +def get_suspicious_files(rse_expression, available_elsewhere, filter_=None, logger=logging.log, younger_than=5, nattempts=0, nattempts_exact=False, *, session: "Session", exclude_states=['B', 'R', 'D'], is_suspicious=False): """ Gets a list of replicas from bad_replicas table which are: declared more than times since date, present on the RSE specified by the and do not have a state in list. 
@@ -3539,7 +3539,12 @@ def get_suspicious_reason(rse_id, scope, name, nattempts=0, logger=logging.log, query = session.query(bad_replicas_alias.scope, bad_replicas_alias.name, bad_replicas_alias.reason, bad_replicas_alias.rse_id)\ .filter(bad_replicas_alias.rse_id == rse_id, bad_replicas_alias.scope == scope, - bad_replicas_alias.name == name) + bad_replicas_alias.name == name, + bad_replicas_alias.state == 'S', + ~exists(select(1).where(and_(bad_replicas_alias.rse_id == rse_id, + bad_replicas_alias.scope == scope, + bad_replicas_alias.name == name, + bad_replicas_alias.state != 'S',)))) count = query.count() query_result = query.group_by(bad_replicas_alias.rse_id, bad_replicas_alias.scope, bad_replicas_alias.name, bad_replicas_alias.reason).having(func.count() > nattempts).all() diff --git a/lib/rucio/daemons/auditor/__init__.py b/lib/rucio/daemons/auditor/__init__.py index fd1b2d2fd70..d60be16492f 100644 --- a/lib/rucio/daemons/auditor/__init__.py +++ b/lib/rucio/daemons/auditor/__init__.py @@ -155,7 +155,7 @@ def process_output(output, sanity_check=True, compress=True): rse = os.path.basename(output[:output.rfind('_')]) rse_id = get_rse_id(rse=rse) usage = get_rse_usage(rse_id=rse_id, source='rucio')[0] - threshold = config.config_get_float('auditor', 'threshold', False, 0.2) + threshold = config.config_get_float('auditor', 'threshold', False, 0.1) # Perform a basic sanity check by comparing the number of entries # with the total number of files on the RSE. 
If the percentage is diff --git a/lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py b/lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py index f73fa232632..104aa453e94 100755 --- a/lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py +++ b/lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py @@ -56,7 +56,32 @@ DAEMON_NAME = 'suspicious-replica-recoverer' -def declare_suspicious_replicas_bad(once: bool = False, younger_than: int = 3, nattempts: int = 10, vos: Optional[list[str]] = None, limit_suspicious_files_on_rse: int = 5, json_file_name: str = "/opt/rucio/etc/suspicious_replica_recoverer.json", sleep_time: int = 3600, active_mode: bool = False) -> None: +def check_suspicious_policy(policy: dict[str, str], file_metadata_datatype: str, file_metadata_scope: str) -> str: + match_scope = False + match_datatype = False + action = "" + + if not policy.get("scope", []): + match_scope = True + for scope in policy.get("scope", []): + if re.match(scope, file_metadata_scope): + match_scope = True + break + + if not policy.get("datatype", []): + match_datatype = True + for datatype in policy.get("datatype", []): + if re.match(datatype, file_metadata_datatype): + match_datatype = True + break + + if match_scope and match_datatype: + action = policy["action"] + + return action + + +def declare_suspicious_replicas_bad(once: bool = False, younger_than: int = 5, nattempts: int = 5, vos: Optional[list[str]] = None, limit_suspicious_files_on_rse: int = 5, json_file_name: str = "/opt/rucio/etc/suspicious_replica_recoverer.json", sleep_time: int = 3600, active_mode: bool = False) -> None: """ Main loop to check for available replicas which are labeled as suspicious. 
@@ -282,20 +307,71 @@ def run_once(heartbeat_handler: Any, younger_than: int, nattempts: int, vos: Opt logger(logging.INFO, 'Create rules for replicas with nattempts=1.') - for rse_key in replicas_nattempts_1[vo]: + # Create as many rules as necessary for the replicas to be picked up by the daemon on the next run + # Create rules only for replicas that can be declared bad. + # Replicas from the auditor should be declared bad regardless of suspicious declarations, so no rules necessary. + for rse_key in list(replicas_nattempts_1[vo].keys()): + if not replicas_nattempts_1[vo][rse_key]: + # This is needed for testing purposes. + continue + files_to_be_declared_bad_nattempts_1 = [] dids_nattempts_1 = [] - for replica_values in replicas_nattempts_1[vo][rse_key].values(): - dids = {'scope': replica_values['scope'], 'name': replica_values['name'], 'rse': rse_key} - dids_nattempts_1.append(dids) - + # Get the rse_id by going to one of the suspicious replicas from that RSE and reading it from there + rse_id = list(replicas_nattempts_1[vo][rse_key].values())[0]['rse_id'] + for replica_key in replicas_nattempts_1[vo][rse_key].keys(): + from_auditor = False + file_scope = replicas_nattempts_1[vo][rse_key][replica_key]["scope"] + file_name = replicas_nattempts_1[vo][rse_key][replica_key]["name"] + file_metadata = get_metadata(file_scope, file_name) + replicas_nattempts_1[vo][rse_key][replica_key]["datatype"] = str(file_metadata["datatype"]) + + # Auditor + suspicious_reason = get_suspicious_reason(replicas_nattempts_1[vo][rse_key][replica_key]["rse_id"], file_scope, file_name, nattempts) + for reason in suspicious_reason: + if "auditor" in reason["reason"].lower(): + from_auditor = True + files_to_be_declared_bad_nattempts_1.append(recoverable_replicas[vo][rse_key][replica_key]) + break + + # Bad + if not from_auditor: + if (file_name.startswith("log.")) or (file_name.startswith("user")): + # Don't keep log files or user files + 
files_to_be_declared_bad_nattempts_1.append(recoverable_replicas[vo][rse_key][replica_key]) + action = "" + else: + # Deal with replicas based on their metadata. + if file_metadata["datatype"] is None: # "None" type has no function "split()" + logger(logging.WARNING, "RSE: %s, replica name %s, surl %s: Replica does not have a data type associated with it. No action will be taken.", + rse_key, replica_key, replicas_nattempts_1[vo][rse_key][replica_key]['surl']) + continue + file_metadata_datatype = str(file_metadata["datatype"]) + file_metadata_scope = str(file_metadata["scope"]) + action = "" + if file_metadata_datatype: + # Some files don't have a datatype. They should be ignored. + for policy in json_data: + action = check_suspicious_policy(policy=policy, file_metadata_datatype=file_metadata_datatype, file_metadata_scope=file_metadata_scope) + if action: + logger(logging.INFO, "The action that will be performed is %s", action) + break + if action: + # Rules will be created for these replicas. 
+ dids = {'scope': file_scope, 'name': file_name, 'rse': rse_key} + dids_nattempts_1.append(dids) if active_mode: - # Create as many rules as necessary for the replicas to be picked up by the daemon on the next run if len(dids_nattempts_1) > 0: - add_rule(dids=dids_nattempts_1, account=InternalAccount('root', vo=vo), copies=nattempts, rse_expression='type=SCRATCHDISK', grouping=None, weight=None, lifetime=24 * 3600, locked=False, subscription_id=None) - + add_rule(dids=dids_nattempts_1, account=InternalAccount('root', vo=vo), copies=nattempts, rse_expression='type=SCRATCHDISK', grouping=None, weight=None, lifetime=5 * 24 * 3600, locked=False, subscription_id=None) logger(logging.INFO, 'Rules have been created for %i replicas on %s.', len(dids_nattempts_1), rse_key) else: - logger(logging.INFO, 'No replicas on %s with nattempts=1.', rse_key) + logger(logging.INFO, 'No rules have been created for replicas on %s.', rse_key) + if len(files_to_be_declared_bad_nattempts_1) > 0: + logger(logging.INFO, 'Ready to declare %s bad replica(s) with nattempts=1 on %s (RSE id: %s).', len(files_to_be_declared_bad_nattempts_1), rse_key, str(rse_id)) + declare_bad_file_replicas(replicas=files_to_be_declared_bad_nattempts_1, reason='Suspicious. 
Automatic recovery.', issuer=InternalAccount('root', vo=vo), session=None) + else: + logger(logging.INFO, 'No suspicious replica(s) with nattempts=1 on %s (RSE id: %s) have been declared bad.', rse_key, str(rse_id)) + else: + logger(logging.INFO, 'No replicas on %s with nattempts=1.', rse_key) logger(logging.INFO, 'Begin check for problematic RSEs.') time_start_check_probl = time.time() @@ -335,6 +411,7 @@ def run_once(heartbeat_handler: Any, younger_than: int, nattempts: int, vos: Opt for rse_key in list(recoverable_replicas[vo].keys()): files_to_be_declared_bad = [] files_to_be_ignored = [] + files_dry_run_monitoring = [] # Remove RSEs from dictionary that don't have any suspicious replicas if len(recoverable_replicas[vo][rse_key]) == 0: del recoverable_replicas[vo][rse_key] @@ -342,79 +419,76 @@ def run_once(heartbeat_handler: Any, younger_than: int, nattempts: int, vos: Opt # Get the rse_id by going to one of the suspicious replicas from that RSE and reading it from there rse_id = list(recoverable_replicas[vo][rse_key].values())[0]['rse_id'] for replica_key in list(recoverable_replicas[vo][rse_key].keys()): + from_auditor = False file_scope = recoverable_replicas[vo][rse_key][replica_key]["scope"] file_name = recoverable_replicas[vo][rse_key][replica_key]["name"] file_metadata = get_metadata(file_scope, file_name) recoverable_replicas[vo][rse_key][replica_key]["datatype"] = str(file_metadata["datatype"]) - if recoverable_replicas[vo][rse_key][replica_key]['available_elsewhere'] is True: - # Replicas with other copies on at least one other RSE can safely be labeled as bad - files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key]) - # Remove replica from dictionary - del recoverable_replicas[vo][rse_key][replica_key] - elif recoverable_replicas[vo][rse_key][replica_key]['available_elsewhere'] is False: - if (file_name.startswith("log.")) or (file_name.startswith("user")): - # Don't keep log files or user files - 
files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key]) - del recoverable_replicas[vo][rse_key][replica_key] - else: - # Deal with replicas based on their metadata. - if file_metadata["datatype"] is None: # "None" type has no function "split()" - files_to_be_ignored.append(recoverable_replicas[vo][rse_key][replica_key]) - logger(logging.WARNING, "RSE: %s, replica name %s, surl %s: Replica does not have a data type associated with it. No action will be taken.", - rse_key, replica_key, recoverable_replicas[vo][rse_key][replica_key]['surl']) - continue - file_metadata_datatype = str(file_metadata["datatype"]) - file_metadata_scope = str(file_metadata["scope"]) - action = "" - if file_metadata_datatype: - # Some files don't have a datatype. They should be ignored. - for policy in json_data: - match_scope = False - match_datatype = False - - if not policy.get("scope", []): - match_scope = True - for scope in policy.get("scope", []): - if re.match(scope, file_metadata_scope): - match_scope = True - break + suspicious_reason = get_suspicious_reason(recoverable_replicas[vo][rse_key][replica_key]["rse_id"], + file_scope, + file_name, + nattempts) + for reason in suspicious_reason: + if "auditor" in reason["reason"].lower(): + auditor += 1 + files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key]) + from_auditor = True + break - if not policy.get("datatype", []): - match_datatype = True - for datatype in policy.get("datatype", []): - if re.match(datatype, file_metadata_datatype): - match_datatype = True + if not from_auditor: + if recoverable_replicas[vo][rse_key][replica_key]['available_elsewhere'] is True: + # Replicas with other copies on at least one other RSE can safely be labeled as bad + files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key]) + # Remove replica from dictionary + del recoverable_replicas[vo][rse_key][replica_key] + elif 
recoverable_replicas[vo][rse_key][replica_key]['available_elsewhere'] is False: + if (file_name.startswith("log.")) or (file_name.startswith("user")): + # Don't keep log files or user files + files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key]) + del recoverable_replicas[vo][rse_key][replica_key] + else: + # Deal with replicas based on their metadata. + if file_metadata["datatype"] is None: # "None" type has no function "split()" + files_to_be_ignored.append(recoverable_replicas[vo][rse_key][replica_key]) + logger(logging.WARNING, "RSE: %s, replica name %s, surl %s: Replica does not have a data type associated with it. No action will be taken.", + rse_key, replica_key, recoverable_replicas[vo][rse_key][replica_key]['surl']) + continue + + file_metadata_datatype = str(file_metadata["datatype"]) + file_metadata_scope = str(file_metadata["scope"]) + action = "" + if file_metadata_datatype: + # Some files don't have a datatype. They should be ignored. + for policy in json_data: + action = check_suspicious_policy(policy=policy, file_metadata_datatype=file_metadata_datatype, file_metadata_scope=file_metadata_scope) + if action: + logger(logging.INFO, "The action that will be performed is %s", action) break - if match_scope and match_datatype: - action = policy["action"] - logger(logging.INFO, "The action that will be performed is %s", action) - break - - if not action: - logger(logging.WARNING, "No recognised actions (ignore/declare bad) found in policy file (etc/suspicious_replica_recoverer.json). Replica will be ignored by default.") - - if action: - if action == "ignore": + if not action: + logger(logging.WARNING, "No recognised actions (ignore/declare bad) found in policy file (etc/suspicious_replica_recoverer.json). Replica will be ignored by default.") + + if action: + if action == "dry run": + # Monitoring purposes: Will look like a file has been declared bad, even though no + # actions will be taken. 
+ files_dry_run_monitoring.append(recoverable_replicas[vo][rse_key][replica_key]) + elif action == "ignore": + files_to_be_ignored.append(recoverable_replicas[vo][rse_key][replica_key]) + elif action == "declare bad": + suspicious_reason = get_suspicious_reason(recoverable_replicas[vo][rse_key][replica_key]["rse_id"], + file_scope, + file_name, + nattempts) + for reason in suspicious_reason: + if "checksum" in reason["reason"].lower(): + checksum += 1 + files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key]) + break + else: + # If no policy has been set, default to ignoring the file (no action taken). files_to_be_ignored.append(recoverable_replicas[vo][rse_key][replica_key]) - elif action == "declare bad": - suspicious_reason = get_suspicious_reason(recoverable_replicas[vo][rse_key][replica_key]["rse_id"], - file_scope, - file_name, - nattempts) - for reason in suspicious_reason: - if "auditor" in reason["reason"].lower(): - auditor += 1 - files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key]) - break - elif "checksum" in reason["reason"].lower(): - checksum += 1 - files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key]) - break - else: - # If no policy has been set, default to ignoring the file (no action taken). 
- files_to_be_ignored.append(recoverable_replicas[vo][rse_key][replica_key]) logger(logging.INFO, '(%s) Remaining replicas (pfns) that will be ignored:', rse_key) for i in files_to_be_ignored: @@ -422,6 +496,8 @@ def run_once(heartbeat_handler: Any, younger_than: int, nattempts: int, vos: Opt logger(logging.INFO, '(%s) Remaining replica (pfns) that will be declared BAD:', rse_key) for i in files_to_be_declared_bad: logger(logging.INFO, 'Declare bad: RSE: %s Scope: %s Name: %s Datatype: %s PFN: %s', rse_key, i["scope"], i["name"], i["datatype"], i["surl"]) + for i in files_dry_run_monitoring: + logger(logging.INFO, 'Declare bad (dry run): RSE: %s Scope: %s Name: %s Datatype: %s PFN: %s', rse_key, i["scope"], i["name"], i["datatype"], i["surl"]) if files_to_be_declared_bad: logger(logging.INFO, 'Ready to declare %s bad replica(s) on %s (RSE id: %s).', len(files_to_be_declared_bad), rse_key, str(rse_id)) @@ -446,7 +522,7 @@ def run_once(heartbeat_handler: Any, younger_than: int, nattempts: int, vos: Opt return must_sleep -def run(once: bool = False, younger_than: int = 3, nattempts: int = 10, vos: list[str] = None, limit_suspicious_files_on_rse: int = 5, json_file_name: str = "/opt/rucio/etc/suspicious_replica_recoverer.json", sleep_time: int = 3600, active_mode: bool = False) -> None: +def run(once: bool = False, younger_than: int = 5, nattempts: int = 5, vos: list[str] = None, limit_suspicious_files_on_rse: int = 5, json_file_name: str = "/opt/rucio/etc/suspicious_replica_recoverer.json", sleep_time: int = 3600, active_mode: bool = False) -> None: """ Starts up the Suspicious-Replica-Recoverer threads. 
""" diff --git a/tests/test_replica_recoverer.py b/tests/test_replica_recoverer.py index f25a7bdf505..528c0d542f0 100644 --- a/tests/test_replica_recoverer.py +++ b/tests/test_replica_recoverer.py @@ -55,12 +55,14 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s self.tmp_file8 = file_factory.file_generator() self.tmp_file9 = file_factory.file_generator() self.tmp_file10 = file_factory.file_generator() - self.tmp_file11 = file_factory.file_generator() + self.tmp_file11 = file_factory.file_generator() # tmp_file11 shouldn't be declare as bad, as it doesn't have a data type. + self.tmp_file12 = file_factory.file_generator() # tmp_file12 is used to test the creation of rules by the daemon. + self.tmp_file13 = file_factory.file_generator() self.listdids_mock = [{'scope': mock_scope, 'name': f.name, 'type': DIDType.FILE} - for f in [self.tmp_file1, self.tmp_file2, self.tmp_file3, self.tmp_file4, self.tmp_file5, self.tmp_file6]] + for f in [self.tmp_file1, self.tmp_file2, self.tmp_file3, self.tmp_file4, self.tmp_file5, self.tmp_file6, self.tmp_file12]] self.listdids_declarebad = [{'scope': self.scope_declarebad, 'name': f.name, 'type': DIDType.FILE} - for f in [self.tmp_file7, self.tmp_file9, self.tmp_file11]] + for f in [self.tmp_file7, self.tmp_file9, self.tmp_file11, self.tmp_file13]] self.listdids_nopolicy = [{'scope': self.scope_nopolicy, 'name': f.name, 'type': DIDType.FILE} for f in [self.tmp_file8]] self.listdids_ignore = [{'scope': self.scope_ignore, 'name': f.name, 'type': DIDType.FILE} @@ -68,14 +70,14 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s for rse in [self.rse4suspicious, self.rse4recovery]: # Upload files with scope "mock_scope" - cmd = 'rucio -v upload --rse {0} --scope {1} {2} {3} {4} {5} {6} {7}'.format(rse, mock_scope.external, self.tmp_file1, self.tmp_file2, self.tmp_file3, self.tmp_file4, self.tmp_file5, self.tmp_file6) + cmd = 'rucio -v upload --rse {0} --scope {1} {2} {3} 
{4} {5} {6} {7} {8}'.format(rse, mock_scope.external, self.tmp_file1, self.tmp_file2, self.tmp_file3, self.tmp_file4, self.tmp_file5, self.tmp_file6, self.tmp_file12) exitcode, out, err = execute(cmd) print("mock_scope:", exitcode, out, err) # checking if Rucio upload went OK assert exitcode == 0 # Upload files with scope "scope_declarebad" - cmd = 'rucio -v upload --rse {0} --scope {1} {2} {3} {4}'.format(rse, self.scope_declarebad.external, self.tmp_file7, self.tmp_file9, self.tmp_file11) + cmd = 'rucio -v upload --rse {0} --scope {1} {2} {3} {4} {5}'.format(rse, self.scope_declarebad.external, self.tmp_file7, self.tmp_file9, self.tmp_file11, self.tmp_file13) exitcode, out, err = execute(cmd) print("scope_declarebad:", exitcode, out, err) # checking if Rucio upload went OK @@ -109,7 +111,8 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s set_metadata(self.scope_nopolicy, self.tmp_file8.name, 'datatype', 'testtypenopolicy') set_metadata(self.scope_declarebad, self.tmp_file9.name, 'datatype', 'testtypeignore') set_metadata(self.scope_ignore, self.tmp_file10.name, 'datatype', 'testtypedeclarebad') - # tmp_file11 doesn't have a datatype. + set_metadata(self.scope_declarebad, self.tmp_file13.name, 'datatype', 'testtypedryrun') + # tmp_file1, 2, 11 and 12 don't have a datatype. 
# Allow for the RSEs to be affected by the suspicious file recovery daemon add_rse_attribute(self.rse4suspicious_id, "enable_suspicious_file_recovery", True) @@ -127,15 +130,17 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s remove(self.tmp_file9) remove(self.tmp_file10) remove(self.tmp_file11) + remove(self.tmp_file12) + remove(self.tmp_file13) # Reset the cache to include the new RSEs rse_expression_parser.REGION.invalidate() # Gather replica info - replicalist_mock = list(list_replicas(dids=self.listdids_mock)) - replicalist_declarebad = list(list_replicas(dids=self.listdids_declarebad)) - replicalist_nopolicy = list(list_replicas(dids=self.listdids_nopolicy)) - replicalist_ignore = list(list_replicas(dids=self.listdids_ignore)) + replicalist_scope_mock = list(list_replicas(dids=self.listdids_mock)) + replicalist_scope_declarebad = list(list_replicas(dids=self.listdids_declarebad)) + replicalist_scope_nopolicy = list(list_replicas(dids=self.listdids_nopolicy)) + replicalist_scope_ignore = list(list_replicas(dids=self.listdids_ignore)) # Changing the replica statuses as follows: # ---------------------------------------------------------------------------------------------------------------------------------------------------- @@ -152,16 +157,26 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s # tmp_file9 unavailable suspicious (available) scope_declarebad testtypeignore # tmp_file10 unavailable suspicious (available) scope_ignore testtypedeclarebad # tmp_file11 unavailable suspicious (available) scope_declarebad + # tmp_file12 unavailable suspicious (available) mock_scope + # tmp_file13 unavailable suspicious (available) scope_declarebad testtypedryrun # ---------------------------------------------------------------------------------------------------------------------------------------------------- - for replica in replicalist_mock: + for replica in replicalist_scope_mock: suspicious_pfns = 
replica['rses'][self.rse4suspicious_id] - # Declare each file as suspicious multiple times - for i in range(3): + # Declare each file as suspicious multiple times, apart from tmp_file12, which + # should only be declared suspicious once. tmp_file12 is used to test the + # creation of rules by the daemon. + if replica['name'] == self.tmp_file12.name: print("Declaring suspicious file replica: " + suspicious_pfns[0]) # The reason must contain the word "checksum", so that the replica can be declared bad. replica_client.declare_suspicious_file_replicas([suspicious_pfns[0], ], 'checksum') sleep(1) + else: + for i in range(3): + print("Declaring suspicious file replica: " + suspicious_pfns[0]) + # The reason must contain the word "checksum", so that the replica can be declared bad. + replica_client.declare_suspicious_file_replicas([suspicious_pfns[0], ], 'checksum') + sleep(1) if replica['name'] == self.tmp_file2.name: print("Declaring bad file replica: " + suspicious_pfns[0]) replica_client.declare_bad_file_replicas([suspicious_pfns[0], ], 'checksum') @@ -177,8 +192,11 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s if replica['name'] == self.tmp_file6.name: print("Updating replica state as unavailable: " + replica['rses'][self.rse4recovery_id][0]) update_replica_state(self.rse4recovery_id, mock_scope, self.tmp_file6.name, ReplicaState.UNAVAILABLE) + if replica['name'] == self.tmp_file12.name: + print("Updating replica state as unavailable: " + replica['rses'][self.rse4recovery_id][0]) + update_replica_state(self.rse4recovery_id, mock_scope, self.tmp_file12.name, ReplicaState.UNAVAILABLE) - for replica in replicalist_declarebad: + for replica in replicalist_scope_declarebad: suspicious_pfns = replica['rses'][self.rse4suspicious_id] # Declare each file as suspicious multiple times for i in range(3): @@ -195,8 +213,11 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s if replica['name'] == 
self.tmp_file11.name: print("Updating replica state as unavailable: " + replica['rses'][self.rse4recovery_id][0]) update_replica_state(self.rse4recovery_id, self.scope_declarebad, self.tmp_file11.name, ReplicaState.UNAVAILABLE) + if replica['name'] == self.tmp_file13.name: + print("Updating replica state as unavailable: " + replica['rses'][self.rse4recovery_id][0]) + update_replica_state(self.rse4recovery_id, self.scope_declarebad, self.tmp_file13.name, ReplicaState.UNAVAILABLE) - for replica in replicalist_nopolicy: + for replica in replicalist_scope_nopolicy: suspicious_pfns = replica['rses'][self.rse4suspicious_id] # Declare each file as suspicious multiple times for i in range(3): @@ -208,7 +229,7 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s print("Updating replica state as unavailable: " + replica['rses'][self.rse4recovery_id][0]) update_replica_state(self.rse4recovery_id, self.scope_nopolicy, self.tmp_file8.name, ReplicaState.UNAVAILABLE) - for replica in replicalist_ignore: + for replica in replicalist_scope_ignore: suspicious_pfns = replica['rses'][self.rse4suspicious_id] # Declare each file as suspicious multiple times for i in range(3): @@ -221,13 +242,13 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s update_replica_state(self.rse4recovery_id, self.scope_ignore, self.tmp_file10.name, ReplicaState.UNAVAILABLE) # Gather replica info after setting initial replica statuses - replicalist_mock = list(list_replicas(dids=self.listdids_mock)) - replicalist_declarebad = list(list_replicas(dids=self.listdids_declarebad)) - replicalist_nopolicy = list(list_replicas(dids=self.listdids_nopolicy)) - replicalist_ignore = list(list_replicas(dids=self.listdids_ignore)) + replicalist_scope_mock = list(list_replicas(dids=self.listdids_mock)) + replicalist_scope_declarebad = list(list_replicas(dids=self.listdids_declarebad)) + replicalist_scope_nopolicy = 
list(list_replicas(dids=self.listdids_nopolicy)) + replicalist_scope_ignore = list(list_replicas(dids=self.listdids_ignore)) # Checking if the status changes were effective - for replica in replicalist_mock: + for replica in replicalist_scope_mock: if replica['name'] == self.tmp_file1.name: assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert replica['states'][self.rse4recovery_id] == 'AVAILABLE' @@ -246,8 +267,11 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s if replica['name'] == self.tmp_file6.name: assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False + if replica['name'] == self.tmp_file12.name: + assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' + assert (self.rse4recovery_id in replica['states']) is False - for replica in replicalist_declarebad: + for replica in replicalist_scope_declarebad: if replica['name'] == self.tmp_file7.name: assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False @@ -255,16 +279,19 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False if replica['name'] == self.tmp_file11.name: - # tmp_file11 should be ignored, as it doesn't have a datatype + # tmp_file11 should be ignored, as it doesn't have a datatype + assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' + assert (self.rse4recovery_id in replica['states']) is False + if replica['name'] == self.tmp_file13.name: assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False - for replica in replicalist_nopolicy: + for replica in replicalist_scope_nopolicy: if replica['name'] == self.tmp_file8.name: assert replica['states'][self.rse4suspicious_id] == 
'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False - for replica in replicalist_ignore: + for replica in replicalist_scope_ignore: if replica['name'] == self.tmp_file10.name: assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False @@ -285,6 +312,8 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s assert (self.tmp_file9.name, self.rse4suspicious_id, BadFilesStatus.BAD) not in bad_checklist assert (self.tmp_file10.name, self.rse4suspicious_id, BadFilesStatus.BAD) not in bad_checklist assert (self.tmp_file11.name, self.rse4suspicious_id, BadFilesStatus.BAD) not in bad_checklist + assert (self.tmp_file12.name, self.rse4suspicious_id, BadFilesStatus.BAD) not in bad_checklist + assert (self.tmp_file13.name, self.rse4suspicious_id, BadFilesStatus.BAD) not in bad_checklist bad_replicas_list = list_bad_replicas_status(rse_id=self.rse4recovery_id, younger_than=self.from_date, vo=vo) bad_checklist = [(badf['name'], badf['rse_id'], badf['state']) for badf in bad_replicas_list] @@ -300,6 +329,8 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s assert (self.tmp_file9.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist assert (self.tmp_file10.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist assert (self.tmp_file11.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist + assert (self.tmp_file12.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist + assert (self.tmp_file13.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist # Purposefully not checking for the 'SUSPICIOUS' status on rse4suspicious. 
# The only existing function (to date) gathering info about 'SUSPICIOUS' replicas @@ -318,10 +349,11 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s # "datatype": [] and "scope": [] are wildcards; they stand for every datatype or scope. json_testentry1 = {"action": "ignore", "datatype": ["testtypedeclarebad"], "scope": [str(self.scope_ignore)]} json_testentry2 = {"action": "declare bad", "datatype": ["testtypeignore"], "scope": [str(self.scope_declarebad)]} - json_testentry3 = {"action": "declare bad", "datatype": ["testtypedeclarebad"], "scope": []} - json_testentry4 = {"action": "ignore", "datatype": ["testtypeignore"], "scope": []} - json_testentry5 = {"action": "declare bad", "datatype": [], "scope": [str(self.scope_declarebad)]} - json_testentry6 = {"action": "ignore", "datatype": [], "scope": [str(self.scope_ignore)]} + json_testentry3 = {"action": "dry run", "datatype": ["testtypedryrun"], "scope": [str(self.scope_declarebad)]} + json_testentry4 = {"action": "declare bad", "datatype": ["testtypedeclarebad"], "scope": []} + json_testentry5 = {"action": "ignore", "datatype": ["testtypeignore"], "scope": []} + json_testentry6 = {"action": "declare bad", "datatype": [], "scope": [str(self.scope_declarebad)]} + json_testentry7 = {"action": "ignore", "datatype": [], "scope": [str(self.scope_ignore)]} json_data.append(json_HITS) json_data.append(json_RAW) json_data.append(json_testentry1) @@ -330,6 +362,7 @@ def setup_obj(self, vo, rse_factory, replica_client, mock_scope, file_factory, s json_data.append(json_testentry4) json_data.append(json_testentry5) json_data.append(json_testentry6) + json_data.append(json_testentry7) json.dump(json_data, json_file) def test_replica_recoverer(self, vo): @@ -354,6 +387,9 @@ def test_replica_recoverer(self, vo): # tmp_file8 unavailable suspicious (available) scope_nopolicy testtypenopolicy # tmp_file9 unavailable suspicious (available) scope_declarebad testtypeignore # tmp_file10 unavailable 
suspicious (available) scope_ignore testtypedeclarebad + # tmp_file11 unavailable suspicious (available) scope_declarebad + # tmp_file12 unavailable suspicious (available) mock_scope + # tmp_file13 unavailable suspicious (available) scope_declarebad testtypedryrun # ---------------------------------------------------------------------------------------------------------------------------------------------------- - Explaination: Suspicious replicas that are the last remaining copy (unavailable on rse4recovery) are handeled differently depending @@ -374,7 +410,7 @@ def test_replica_recoverer(self, vo): Concluding: - - checks that tmp_file1, tmp_file4, tmp_file7, tmp_file9 and tmp_file10 were declared as 'BAD' on rse4suspicious + - checks that tmp_file1, tmp_file2, tmp_file4, tmp_file7 and tmp_file9 were declared as 'BAD' on rse4suspicious """ @@ -384,12 +420,12 @@ def test_replica_recoverer(self, vo): stop() # Checking the outcome: - # We expect to see four changes: tmp_file1, tmp_file4, tmp_file7 and tmp_file9 should be declared as bad on rse4suspicious + # We expect to see five changes: tmp_file1, tmp_file2, tmp_file4, tmp_file7 and tmp_file9 should be declared as bad on rse4suspicious # ---------------------------------------------------------------------------------------------------------------------------------------------------- # Name State(s) declared on rse4recovery State(s) declared on rse4suspicious Scope Metadata "datatype" # ---------------------------------------------------------------------------------------------------------------------------------------------------- - # tmp_file1 available suspicious + bad (unavailable) mock_scope - # tmp_file2 available suspicious + bad (unavailable) mock_scope + # tmp_file1 available suspicious + bad (unavailable) mock_scope + # tmp_file2 available suspicious + bad (unavailable) mock_scope # tmp_file3 unavailable suspicious (available) mock_scope RAW # tmp_file4 unavailable suspicious + bad (unavailable) mock_scope
testtypedeclarebad # tmp_file5 unavailable suspicious (available) mock_scope testtypenopolicy @@ -398,15 +434,18 @@ def test_replica_recoverer(self, vo): # tmp_file8 unavailable suspicious (available) scope_nopolicy testtypenopolicy # tmp_file9 unavailable suspicious + bad (unavailable) scope_declarebad testtypeignore # tmp_file10 unavailable suspicious (available) scope_ignore testtypedeclarebad + # tmp_file11 unavailable suspicious (available) scope_declarebad + # tmp_file12 unavailable suspicious (available) mock_scope + # tmp_file13 unavailable suspicious (available) scope_declarebad testtypedryrun # ---------------------------------------------------------------------------------------------------------------------------------------------------- # Gather replica info after replica_recoverer has run. - replicalist_mock = list(list_replicas(dids=self.listdids_mock)) - replicalist_declarebad = list(list_replicas(dids=self.listdids_declarebad)) - replicalist_nopolicy = list(list_replicas(dids=self.listdids_nopolicy)) - replicalist_ignore = list(list_replicas(dids=self.listdids_ignore)) + replicalist_scope_mock = list(list_replicas(dids=self.listdids_mock)) + replicalist_scope_declarebad = list(list_replicas(dids=self.listdids_declarebad)) + replicalist_scope_nopolicy = list(list_replicas(dids=self.listdids_nopolicy)) + replicalist_scope_ignore = list(list_replicas(dids=self.listdids_ignore)) - for replica in replicalist_mock: + for replica in replicalist_scope_mock: if replica['name'] == self.tmp_file1.name or replica['name'] == self.tmp_file2.name: assert (self.rse4suspicious_id in replica['states']) is False assert replica['states'][self.rse4recovery_id] == 'AVAILABLE' @@ -419,24 +458,31 @@ def test_replica_recoverer(self, vo): if replica['name'] == self.tmp_file5.name: assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False + if replica['name'] == self.tmp_file12.name: + assert 
replica['states'][self.rse4suspicious_id] == 'AVAILABLE' + assert (self.rse4recovery_id in replica['states']) is False - for replica in replicalist_declarebad: + for replica in replicalist_scope_declarebad: if replica['name'] == self.tmp_file7.name: # The 'states' key doesn't exist if the replica isn't available on at least one RSE assert not replica.get('states') if replica['name'] == self.tmp_file9.name: assert not replica.get('states') if replica['name'] == self.tmp_file11.name: - # tmp_file11 should have been ignored, as it doesn't have a datatype + # tmp_file11 should have been ignored, as it doesn't have a datatype + assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' + assert (self.rse4recovery_id in replica['states']) is False + if replica['name'] == self.tmp_file13.name: + # tmp_file13 should have been ignored, as it is running as a dry run assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False - for replica in replicalist_nopolicy: + for replica in replicalist_scope_nopolicy: if replica['name'] == self.tmp_file8.name: assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False - for replica in replicalist_ignore: + for replica in replicalist_scope_ignore: if replica['name'] == self.tmp_file10.name: assert replica['states'][self.rse4suspicious_id] == 'AVAILABLE' assert (self.rse4recovery_id in replica['states']) is False @@ -456,6 +502,8 @@ def test_replica_recoverer(self, vo): assert (self.tmp_file9.name, self.rse4suspicious_id, BadFilesStatus.BAD) in bad_checklist assert (self.tmp_file10.name, self.rse4suspicious_id, BadFilesStatus.BAD) not in bad_checklist assert (self.tmp_file11.name, self.rse4suspicious_id, BadFilesStatus.BAD) not in bad_checklist + assert (self.tmp_file12.name, self.rse4suspicious_id, BadFilesStatus.BAD) not in bad_checklist + assert (self.tmp_file13.name, self.rse4suspicious_id, 
BadFilesStatus.BAD) not in bad_checklist bad_replicas_list = list_bad_replicas_status(rse_id=self.rse4recovery_id, younger_than=self.from_date, vo=vo) bad_checklist = [(badf['name'], badf['rse_id'], badf['state']) for badf in bad_replicas_list] @@ -471,3 +519,5 @@ def test_replica_recoverer(self, vo): assert (self.tmp_file9.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist assert (self.tmp_file10.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist assert (self.tmp_file11.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist + assert (self.tmp_file12.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist + assert (self.tmp_file13.name, self.rse4recovery_id, BadFilesStatus.BAD) not in bad_checklist