Skip to content

Commit

Permalink
Recovery: Implement checksum comparison to suspicious replica recoverer
Browse files Browse the repository at this point in the history
rucio#5334

Before a suspicious file which is the last remaining copy is declared bad, its checksum should be compared to verify that it is corrupt.
A new method is added to /core/replica.py which retrieves the reason for which a replica was declared bad. If the reason is related
to a checksum problem, then the replica is declared bad.
  • Loading branch information
ChristophAmes committed May 16, 2022
1 parent 4bb2c21 commit c40c399
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
31 changes: 31 additions & 0 deletions lib/rucio/core/replica.py
Expand Up @@ -3920,6 +3920,37 @@ def get_suspicious_files(rse_expression, available_elsewhere, filter_=None, logg
return result


@read_session
def get_suspicious_reason(rse_id, scope, name, nattempts=0, logger=logging.log, session=None):
    """
    Returns the error message(s) which led to the replica being declared suspicious.

    The bad_replicas entries of the given replica are grouped by reason, and a reason
    is only returned if it was recorded more than ``nattempts`` times.

    :param rse_id: ID of RSE.
    :param scope: Scope of the replica DID.
    :param name: Name of the replica DID.
    :param nattempts: Only reasons that were recorded more than this number of times are returned. Default value = 0.
    :param logger: Logging callable. Currently unused; kept so that existing callers keep working.
    :param session: The database session in use. Default value = None.
    :returns: List of dictionaries with the keys 'scope', 'name', 'rse', 'rse_id' and 'reason';
              one entry per distinct reason that passed the ``nattempts`` threshold.
    """
    # Alias for bad replicas
    bad_replicas_alias = aliased(models.BadReplicas, name='bad_replicas_alias')

    # NOTE(review): the rows are not filtered by their state, so reasons of entries in any
    # state are returned - confirm that this is intended.
    # Exactly one replica is queried, so plain equality is used instead of single-element IN lists.
    query = session.query(bad_replicas_alias.scope,
                          bad_replicas_alias.name,
                          bad_replicas_alias.reason,
                          bad_replicas_alias.rse_id)\
        .filter(bad_replicas_alias.rse_id == rse_id,
                bad_replicas_alias.scope == scope,
                bad_replicas_alias.name == name)\
        .group_by(bad_replicas_alias.rse_id,
                  bad_replicas_alias.scope,
                  bad_replicas_alias.name,
                  bad_replicas_alias.reason)\
        .having(func.count() > nattempts)

    query_result = query.all()
    if not query_result:
        return []

    # Every row belongs to the same RSE (the query is pinned to rse_id), so the
    # RSE name only has to be resolved once, and only if there is something to return.
    rse = get_rse_name(rse_id=rse_id, session=session)

    return [{'scope': scope_, 'name': name_, 'rse': rse, 'rse_id': rse_id_, 'reason': reason}
            for scope_, name_, reason, rse_id_ in query_result]


@transactional_session
def set_tombstone(rse_id, scope, name, tombstone=OBSOLETE, session=None):
"""
Expand Down
22 changes: 20 additions & 2 deletions lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py
Expand Up @@ -46,7 +46,7 @@
from rucio.core.heartbeat import live, die, sanity_check
from rucio.core.monitor import record_counter
from rucio.core.did import get_metadata
from rucio.core.replica import list_replicas, get_suspicious_files, add_bad_pfns, declare_bad_file_replicas
from rucio.core.replica import list_replicas, get_suspicious_files, add_bad_pfns, declare_bad_file_replicas, get_suspicious_reason
from rucio.core.rse_expression_parser import parse_expression

from rucio.core.vo import list_vos
Expand All @@ -61,6 +61,8 @@ def declare_suspicious_replicas_bad(once=False, younger_than=3, nattempts=10, vo
Gets a list of suspicious replicas that are listed as AVAILABLE in 'replicas' table
and available on other RSE. Finds surls of these replicas and declares them as bad.
Replicas that are the last remaining copy of a file have additional checks (checksum
comparison, etc.) before being declared bad.
:param once: If True, the loop is run just once, otherwise the daemon continues looping until stopped.
:param younger_than: The number of days since which bad_replicas table will be searched
Expand Down Expand Up @@ -261,6 +263,9 @@ def declare_suspicious_replicas_bad(once=False, younger_than=3, nattempts=10, vo
for rse in list_problematic_rses:
logger(logging.INFO, "%s", rse)

auditor = 0
checksum = 0

# Label suspicious replicas as bad if they have other copies on other RSEs (that aren't also marked as suspicious).
# If they are the last remaining copies, deal with them differently.
for rse_key in list(recoverable_replicas[vo].keys()):
Expand Down Expand Up @@ -296,7 +301,16 @@ def declare_suspicious_replicas_bad(once=False, younger_than=3, nattempts=10, vo
if action == "ignore":
files_to_be_ignored.append(recoverable_replicas[vo][rse_key][replica_key])
elif action == "declare bad":
files_to_be_declared_bad.append(recoverable_replicas[vo][rse_key][replica_key])
# Only declare the replica bad if at least one of the recorded reasons points to the
# auditor or to a checksum problem; the first matching reason decides.
candidate_replica = recoverable_replicas[vo][rse_key][replica_key]
suspicious_reason = get_suspicious_reason(candidate_replica["rse_id"], candidate_replica["scope"], candidate_replica["name"], nattempts)
for reason in suspicious_reason:
    # The stored reason may be empty/None; treat that as "no match" instead of raising.
    reason_text = (reason["reason"] or "").lower()
    if "auditor" in reason_text:
        # Count the replica so that the summary log reports the real number.
        auditor += 1
        files_to_be_declared_bad.append(candidate_replica)
        break
    elif "checksum" in reason_text:
        # Count the replica so that the summary log reports the real number.
        checksum += 1
        files_to_be_declared_bad.append(candidate_replica)
        break
else:
logger(logging.WARNING, "RSE: %s, replica name %s, surl %s: Match for the metadata 'datatype' (%s) of replica found in json file, but no match for 'action' (%s)",
rse_key, replica_key, recoverable_replicas[vo][rse_key][replica_key]['surl'], i["datatype"], i["action"])
Expand All @@ -314,11 +328,15 @@ def declare_suspicious_replicas_bad(once=False, younger_than=3, nattempts=10, vo

if files_to_be_declared_bad:
logger(logging.INFO, 'Ready to declare %s bad replica(s) on %s (RSE id: %s).', len(files_to_be_declared_bad), rse_key, str(rse_id))
logger(logging.INFO, 'Number of replicas with checksum problems: %i', checksum)
logger(logging.INFO, 'Number of replicas that were declared suspicious by the auditor: %i', auditor)

if active_mode:
declare_bad_file_replicas(pfns=files_to_be_declared_bad, reason='Suspicious. Automatic recovery.', issuer=InternalAccount('root', vo=vo), session=None)

logger(logging.INFO, 'Finished declaring bad replicas on %s.\n', rse_key)
else:
logger(logging.INFO, 'No files were declared bad on %s.\n', rse_key)

logger(logging.INFO, 'Finished checking for problematic RSEs and declaring bad replicas. Total time: %s seconds.', time.time() - time_start_check_probl)

Expand Down
7 changes: 4 additions & 3 deletions lib/rucio/tests/test_replica_recoverer.py
Expand Up @@ -105,11 +105,12 @@ def setUp(self):
suspicious_pfns = replica['rses'][self.rse4suspicious_id]
for i in range(3):
print("Declaring suspicious file replica: " + suspicious_pfns[0])
self.replica_client.declare_suspicious_file_replicas([suspicious_pfns[0], ], 'This is a good reason.')
# The reason must contain the word "checksum", so that the replica can be declared bad.
self.replica_client.declare_suspicious_file_replicas([suspicious_pfns[0], ], 'checksum')
sleep(1)
if replica['name'] == path.basename(self.tmp_file2):
print("Declaring bad file replica: " + suspicious_pfns[0])
self.replica_client.declare_bad_file_replicas([suspicious_pfns[0], ], 'This is a good reason')
self.replica_client.declare_bad_file_replicas([suspicious_pfns[0], ], 'checksum')
if replica['name'] == path.basename(self.tmp_file3):
print("Updating replica state as unavailable: " + replica['rses'][self.rse4recovery_id][0])
update_replica_state(self.rse4recovery_id, self.internal_scope, path.basename(self.tmp_file3), ReplicaState.UNAVAILABLE)
Expand Down Expand Up @@ -185,7 +186,7 @@ def test_replica_recoverer(self):
# ----------------------------------------------------------------------------------------------------------------------------------
- Explanation: Suspicious replicas that are the last remaining copy (unavailable on MOCK_RECOVERY) are handled differently depending
by their metadata "datatype". RAW files have the policy to be ignored. testtype_declare_bad files are of a fictional
on their metadata "datatype". RAW files have the policy to be ignored. testtype_declare_bad files are of a fictional
type that has the policy of being declared bad. testtype_nopolicy files are of a fictional type that doesn't have a
policy specified, meaning they should be ignored by default.
Expand Down

0 comments on commit c40c399

Please sign in to comment.