diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 5aa8de9441..9fa49b6b4f 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -400,8 +400,8 @@ install(FILES example-default.json streamerinfos_v017.root DESTINATION etc) -install(PROGRAMS script/RepoCleaner/1_per_hour.py script/RepoCleaner/Ccdb.py script/RepoCleaner/skip.py - script/RepoCleaner/repoCleaner.py script/RepoCleaner/1_per_run.py script/RepoCleaner/none_kept.py +install(PROGRAMS script/RepoCleaner/rules/1_per_hour.py script/RepoCleaner/Ccdb.py script/RepoCleaner/rules/skip.py + script/RepoCleaner/repoCleaner.py script/RepoCleaner/rules/1_per_run.py script/RepoCleaner/rules/none_kept.py script/o2-qc-functional-test.sh DESTINATION bin) diff --git a/Framework/script/RepoCleaner/Ccdb.py b/Framework/script/RepoCleaner/Ccdb.py index 9f95d3c270..954fb2faae 100644 --- a/Framework/script/RepoCleaner/Ccdb.py +++ b/Framework/script/RepoCleaner/Ccdb.py @@ -15,28 +15,28 @@ class ObjectVersion: This class represents a single version. ''' - def __init__(self, path, uuid, validFrom, validTo, metadata): + def __init__(self, path, validFrom, validTo, uuid=None, metadata=None): ''' Construct an ObjectVersion. 
:param path: path to the object :param uuid: unique id of the object - :param validFromAsDatetime: validity range smaller limit (in ms) + :param validFrom: validity range smaller limit (in ms) :param validTo: validity range bigger limit (in ms) ''' self.path = path self.uuid = uuid - self.validFromAsDatetime = datetime.datetime.fromtimestamp(validFrom / 1000) # /1000 because we get ms self.validFrom = validFrom + # precomputed Datetime ("Dt") of the timestamp `validFrom` + self.validFromAsDt = datetime.datetime.fromtimestamp(int(validFrom) / 1000) # /1000 because we get ms self.validTo = validTo self.metadata = metadata def __repr__(self): if "Run" in self.metadata or "RunNumber" in self.metadata: run_number = self.metadata["Run"] if "Run" in self.metadata else self.metadata["RunNumber"] - return f"Version of object {self.path} valid from {self.validFromAsDatetime} (uuid {self.uuid}, " \ - f"ts {self.validFrom}), run {run_number}" + return f"Version of object {self.path} valid from {self.validFromAsDt}, run {run_number}" else: - return f"Version of object {self.path} valid from {self.validFromAsDatetime} (uuid {self.uuid}, " \ + return f"Version of object {self.path} valid from {self.validFromAsDt} (uuid {self.uuid}, " \ f"ts {self.validFrom})" @@ -116,31 +116,58 @@ def deleteVersion(self, version: ObjectVersion): sys.exit(1) # really ? @dryable.Dryable() - def updateValidity(self, version: ObjectVersion, validFrom: int, validTo: int): + def updateValidity(self, version: ObjectVersion, valid_from: int, valid_to: int, metadata=None): ''' Update the validity range of the specified version of an object. :param version: The ObjectVersion to update. - :param validFrom: The new "from" validity. - :param validTo: The new "to" validity. + :param valid_from: The new "from" validity. + :param valid_to: The new "to" validity. 
+ :param metadata: Add or modify metadata ''' - if version.validTo == validTo: + if version.validTo == valid_to: logging.debug("The new timestamp for validTo is identical to the existing one. Skipping.") return - url_update_validity = self.url + '/' + version.path + '/' + str(validFrom) + '/' + str(validTo) - logging.debug(f"Update end limit validity of {version.path} from {version.validTo} to {validTo}") + full_path = self.url + '/' + version.path + '/' + str(valid_from) + '/' + str(valid_to) + '/' + str(version.uuid) + '?' + logging.debug(f"Update end limit validity of {version.path} ({version.uuid}) from {version.validTo} to {valid_to}") + if metadata is not None: + logging.debug(f"{metadata}") + for key in metadata: + full_path += key + "=" + metadata[key] + "&" try: - r = requests.put(url_update_validity) + r = requests.put(full_path) r.raise_for_status() self.counter_validity_updated += 1 except requests.exceptions.RequestException as e: print(e) sys.exit(1) # really ? - - + + def putVersion(self, version: ObjectVersion, data): + ''' + :param version: An ObjectVersion that describes the data to be uploaded. + :param data: the actual data to send. E.g.:{'somekey': 'somevalue'} + :return A list of ObjectVersion. 
+ ''' + full_path=self.url + "/" + version.path + "/" + str(version.validFrom) + "/" + str(version.validTo) + "/" + if version.metadata is not None: + for key in version.metadata: + full_path += key + "=" + version.metadata[key] + "/" + logging.debug(f"fullpath: {full_path}") + r = requests.post(full_path, files=data) + if r.ok: + logging.debug(f"Version pushed to {version.path}") + else: + logging.error(f"Could not post a new version of {version.path}: {r.text}") + def main(): + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + ccdb = Ccdb('http://ccdb-test.cern.ch:8080') - objectsList = ccdb.getObjectsList() - print(f"{objectsList}") + + data = {'somekey': 'somevalue'} + metadata = {'RunNumber': '213564', 'test': 'on'} + version_info = ObjectVersion(path="qc/TST/MO/repo/test", validFrom=1605091858183, validTo=1920451858183, metadata=metadata) + ccdb.putVersion(version_info, data) if __name__ == "__main__": # to be able to run the test code above when not imported. diff --git a/Framework/script/RepoCleaner/README.md b/Framework/script/RepoCleaner/README.md index d6202bd826..eefb7caca1 100644 --- a/Framework/script/RepoCleaner/README.md +++ b/Framework/script/RepoCleaner/README.md @@ -32,7 +32,13 @@ The configuration for ccdb-test is described [here](../../../doc/DevelopersTips. ## Unit Tests `cd QualityControl/Framework/script/RepoCleaner ; python3 -m unittest discover` -To run just one of the rules, do `python3 1_per_run.py`. +In particular there is a test for the `production` rule that is pretty extensive. It hits the ccdb though and it needs the following path to be truncated: +` +qc/TST/MO/repo/test* +` + +## Other tests +Most of the classes and Rules have a main to help test them. To run do e.g. `python3 1_per_run.py`. ## Installation CMake will install the python scripts in bin and the config file in etc. 
diff --git a/Framework/script/RepoCleaner/config-test.yaml b/Framework/script/RepoCleaner/config-test.yaml index 7bee2e219c..5f3e5bc96c 100644 --- a/Framework/script/RepoCleaner/config-test.yaml +++ b/Framework/script/RepoCleaner/config-test.yaml @@ -7,4 +7,4 @@ Rules: policy: 1_per_hour Ccdb: - Url: http://UPDATEME.cern.ch:8080 \ No newline at end of file + Url: http://ccdb-test.cern.ch:8080 \ No newline at end of file diff --git a/Framework/script/RepoCleaner/config.yaml b/Framework/script/RepoCleaner/config.yaml index 875072bab6..f0589ec9bf 100644 --- a/Framework/script/RepoCleaner/config.yaml +++ b/Framework/script/RepoCleaner/config.yaml @@ -1,24 +1,28 @@ Rules: - - object_path: qc/ITS/.* + - object_path: qc/TST/MO/barth/hello_0 delay: 240 - policy: 1_per_run - delete_when_no_run: True - - object_path: qc/TST_KEEP/.* - delay: 240 - policy: 1_per_run - delete_when_no_run: True - - object_path: qc/.* # Path in the CCDB to a certain object - delay: 1440 # Delay in minutes during which a new object is not touched. (1 day) - policy: 1_per_hour # name of the policy to apply, must correspond to a python script. - - object_path: QcCheck/.* - delay: 60 policy: 1_per_hour - - object_path: Test - delay: 240 - policy: none_kept - - object_path: .* - delay: 1440 - policy: skip + migration: True +# - object_path: qc/ITS/.* +# delay: 240 +# policy: 1_per_run +# delete_when_no_run: True +# - object_path: qc/TST_KEEP/.* +# delay: 240 +# policy: 1_per_run +# delete_when_no_run: True +# - object_path: qc/.* # Path in the CCDB to a certain object +# delay: 1440 # Delay in minutes during which a new object is not touched. (1 day) +# policy: 1_per_hour # name of the policy to apply, must correspond to a python script. 
+# - object_path: QcCheck/.* +# delay: 60 +# policy: 1_per_hour +# - object_path: Test +# delay: 240 +# policy: none_kept +# - object_path: .* +# delay: 1440 +# policy: skip # - object_path: no_cleanup/.* # delay: 60 # policy: skip @@ -51,4 +55,4 @@ Rules: # policy: 1_per_hour Ccdb: - Url: http://UPDATEME.cern.ch:8080 + Url: http://ccdb-test.cern.ch:8080 diff --git a/Framework/script/RepoCleaner/repoCleaner.py b/Framework/script/RepoCleaner/repoCleaner.py index 24f8cfb0fa..c179dc0206 100755 --- a/Framework/script/RepoCleaner/repoCleaner.py +++ b/Framework/script/RepoCleaner/repoCleaner.py @@ -36,7 +36,8 @@ class Rule: """A class to hold information about a "rule" defined in the config file.""" - def __init__(self, object_path=None, delay=None, policy=None, all_params=None): + def __init__(self, object_path=None, delay=None, policy=None, # migration=None, + all_params=None): ''' Constructor. :param object_path: path to the object, or pattern, to which a rule will apply. @@ -48,13 +49,18 @@ def __init__(self, object_path=None, delay=None, policy=None, all_params=None): self.object_path = object_path self.delay = delay self.policy = policy + # self.migration = migration + self.extra_params = all_params - self.extra_params.pop("object_path") - self.extra_params.pop("delay") - self.extra_params.pop("policy") + if all_params is not None: + self.extra_params.pop("object_path") + self.extra_params.pop("delay") + self.extra_params.pop("policy") + # self.extra_params.pop("migration") def __repr__(self): - return 'Rule(object_path={.object_path}, delay={.delay}, policy={.policy}, extra_params={.extra_params})'.format(self, self, self, self) + return 'Rule(object_path={.object_path}, delay={.delay}, policy={.policy}, extra_params={.extra_params})'.format( + self, self, self, self, self) def parseArgs(): @@ -86,7 +92,7 @@ def parseConfig(config_file_path): :param config_file_path: Path to the config file :raises yaml.YAMLError If the config file does not contain a valid yaml. 
""" - + logging.info(f"Parsing config file {config_file_path}") with open(config_file_path, 'r') as stream: config_content = yaml.safe_load(stream) @@ -94,7 +100,8 @@ def parseConfig(config_file_path): rules = [] logging.debug("Rules found in the config file:") for rule_yaml in config_content["Rules"]: - rule = Rule(rule_yaml["object_path"], rule_yaml["delay"], rule_yaml["policy"], rule_yaml) + rule = Rule(rule_yaml["object_path"], rule_yaml["delay"], rule_yaml["policy"], # rule_yaml["migration"], + rule_yaml) rules.append(rule) logging.debug(f" * {rule}") @@ -102,6 +109,7 @@ def parseConfig(config_file_path): return {'rules': rules, 'ccdb_url': ccdb_url} + def downloadConfigFromGit(): """ Download a config file from git. @@ -110,7 +118,8 @@ def downloadConfigFromGit(): """ logging.debug("Get it from git") - r = requests.get('https://raw.github.com/AliceO2Group/QualityControl/repo_cleaner/Framework/script/RepoCleaner/config.yaml') + r = requests.get( + 'https://raw.github.com/AliceO2Group/QualityControl/repo_cleaner/Framework/script/RepoCleaner/config.yaml') logging.debug(f"config file from git : \n{r.text}") path = "/tmp/config.yaml" with open(path, 'w') as f: @@ -121,13 +130,13 @@ def downloadConfigFromGit(): def findMatchingRule(rules, object_path): """Return the first matching rule for the given path or None if none is found.""" - + logging.debug(f"findMatchingRule for {object_path}") - + if object_path == None: logging.error(f"findMatchingRule: object_path is None") return None - + for rule in rules: pattern = re.compile(rule.object_path) result = pattern.match(object_path) @@ -137,9 +146,11 @@ def findMatchingRule(rules, object_path): logging.debug(" No rule found, skipping.") return None + filepath = tempfile.gettempdir() + "/repoCleaner.txt" currentTimeStamp = int(time.time() * 1000) + def getTimestampLastExecution(): """ Returns the timestamp of the last execution. 
@@ -156,6 +167,7 @@ def getTimestampLastExecution(): f.close() return timestamp + def storeSavedTimestamp(): """ Store the timestamp we saved at the beginning of the execution of this script. @@ -175,8 +187,9 @@ def storeSavedTimestamp(): def main(): # Logging (you can use funcName in the template) - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S') - + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + # Parse arguments args = parseArgs() logging.getLogger().setLevel(int(args.log_level)) @@ -205,14 +218,17 @@ def main(): rule = findMatchingRule(rules, object_path); if rule == None: continue - + # Apply rule on object (find the plug-in script and apply) module = __import__(rule.policy) - stats = module.process(ccdb, object_path, int(rule.delay), rule.extra_params) + stats = module.process(ccdb, object_path, int(rule.delay), # rule.migration == "True", + rule.extra_params) logging.info(f"{rule.policy} applied on {object_path}: {stats}") - - logging.info(f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})") + + logging.info( + f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})") storeSavedTimestamp() + if __name__ == "__main__": # to be able to run the test code above when not imported. 
main() diff --git a/Framework/script/RepoCleaner/1_per_hour.py b/Framework/script/RepoCleaner/rules/1_per_hour.py similarity index 82% rename from Framework/script/RepoCleaner/1_per_hour.py rename to Framework/script/RepoCleaner/rules/1_per_hour.py index f87055f611..c6645b05f5 100644 --- a/Framework/script/RepoCleaner/1_per_hour.py +++ b/Framework/script/RepoCleaner/rules/1_per_hour.py @@ -6,7 +6,8 @@ from Ccdb import Ccdb, ObjectVersion -def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, str]): +def process(ccdb: Ccdb, object_path: str, delay: int, #migration: bool, + extra_params: Dict[str, str]): ''' Process this deletion rule on the object. We use the CCDB passed by argument. Objects who have been created recently are spared (delay is expressed in minutes). @@ -17,6 +18,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st :param ccdb: the ccdb in which objects are cleaned up. :param object_path: path to the object, or pattern, to which a rule will apply. :param delay: the grace period during which a new object is never deleted. + :param migration: whether the objects that have not been deleted should be migrated to the Grid. It does not apply to the objects spared because they are recent (see delay). + :param extra_params: a dictionary containing extra parameters for this rule. :return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved. ''' @@ -29,7 +32,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st deletion_list: List[ObjectVersion] = [] update_list: List[ObjectVersion] = [] for v in versions: - if last_preserved == None or last_preserved.validFromAsDatetime < v.validFromAsDatetime - timedelta(hours=1): + if last_preserved == None or last_preserved.validFromAsDatetime < v.validFromAsDt - timedelta(hours=1): # first extend validity of the previous preserved (should we take into account the run ?) 
if last_preserved != None: ccdb.updateValidity(last_preserved, last_preserved.validFrom, str(int(v.validFrom) - 1)) @@ -38,7 +41,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st preservation_list.append(v) else: deletion_list.append(v) - if v.validFromAsDatetime < datetime.now() - timedelta(minutes=delay): + if v.validFromAsDt < datetime.now() - timedelta(minutes=delay): logging.debug(f"not in the grace period, we delete {v}") ccdb.deleteVersion(v) diff --git a/Framework/script/RepoCleaner/rules/1_per_run.py b/Framework/script/RepoCleaner/rules/1_per_run.py new file mode 100755 index 0000000000..ff3387a9f8 --- /dev/null +++ b/Framework/script/RepoCleaner/rules/1_per_run.py @@ -0,0 +1,106 @@ +from datetime import datetime +from datetime import timedelta +import logging +from collections import defaultdict +from Ccdb import Ccdb, ObjectVersion +import dryable +from typing import Dict + +def in_grace_period(version: ObjectVersion, delay: int): + return not (version.validFromAsDt < datetime.now() - timedelta(minutes=delay)) + +def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, str]): + ''' + Process this deletion rule on the object. We use the CCDB passed by argument. + Objects which have been created recently are spared (delay is expressed in minutes). + This specific policy, 1_per_run, keeps only the most recent version for a given run based on the validity_from. + + It is implemented like this : + Go through all objects: if a run is set, add the object to the corresponding map element. + Go through the map: for each run keep the most recent object and delete the rest. + Files without runs are preserved if delete_when_no_run is set to false. Otherwise only the last one is preserved. + + TODO: how to handle the validity period ? + + :param ccdb: the ccdb in which objects are cleaned up. + :param object_path: path to the object, or pattern, to which a rule will apply. 
+ :param delay: the grace period during which a new object is never deleted. + :param extra_params: a dictionary containing extra parameters for this rule. + :return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved. + ''' + + logging.debug(f"Plugin 1_per_run processing {object_path}") + + preservation_list: List[ObjectVersion] = [] + deletion_list: List[ObjectVersion] = [] + update_list: List[ObjectVersion] = [] + runs_dict: DefaultDict[str, List[ObjectVersion]] = defaultdict(list) + + delete_when_no_run = (extra_params.get("delete_when_no_run", False) == True) + logging.debug(f"delete_when_no_run : {delete_when_no_run}") + + # Find all the runs and group the versions + versions = ccdb.getVersionsList(object_path) + for v in versions: + logging.debug(f"Processing {v}") + if "Run" in v.metadata: + runs_dict[v.metadata['Run']].append(v) + else: + runs_dict[-1].append(v) # the ones with no run specified + + logging.debug(f"Number of runs : {len(runs_dict)}") + logging.debug(f"Number of versions without runs : {len(runs_dict[-1])}") + + # if we should not touch the files with no runs, let's just remove them from the map + if not delete_when_no_run: + del runs_dict[-1] + + # Dispatch the versions to deletion and preservation lists + for r, run_versions in runs_dict.items(): + # logging.debug(f"- run {r}") + + freshest: ObjectVersion = None + for v in run_versions: + # logging.debug(f" - version {v}") + if freshest is None or freshest.validFromAsDatetime < v.validFromAsDt: + if freshest is not None: + if in_grace_period(freshest, delay): + preservation_list.append(freshest) + else: + deletion_list.append(freshest) + freshest = v + else: + if in_grace_period(freshest, delay): + preservation_list.append(v) + else: + deletion_list.append(v) + preservation_list.append(freshest) + + # actual deletion + for d in deletion_list: + ccdb.deleteVersion(d) + + logging.debug(f"deleted ({len(deletion_list)}) : ") + for v in 
deletion_list: + logging.debug(f" {v}") + + logging.debug(f"preserved ({len(preservation_list)}) : ") + for v in preservation_list: + logging.debug(f" {v}") + + logging.debug(f"updated ({len(update_list)}) : ") + for v in update_list: + logging.debug(f" {v}") + + return {"deleted" : len(deletion_list), "preserved": len(preservation_list), "updated" : len(update_list)} + + +def main(): + logging.getLogger().setLevel(int(10)) + dryable.set( True ) + ccdb = Ccdb('http://ccdb-test.cern.ch:8080') + process(ccdb, "qc/testRunCleanup", 0) + + +if __name__ == "__main__": # to be able to run the test code above when not imported. + main() diff --git a/Framework/script/RepoCleaner/rules/__init__.py b/Framework/script/RepoCleaner/rules/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/Framework/script/RepoCleaner/last_only.py b/Framework/script/RepoCleaner/rules/last_only.py similarity index 88% rename from Framework/script/RepoCleaner/last_only.py rename to Framework/script/RepoCleaner/rules/last_only.py index f43f6a67f6..5b8b7be011 100644 --- a/Framework/script/RepoCleaner/last_only.py +++ b/Framework/script/RepoCleaner/rules/last_only.py @@ -15,6 +15,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st :param ccdb: the ccdb in which objects are cleaned up. :param object_path: path to the object, or pattern, to which a rule will apply. :param delay: the grace period during which a new object is never deleted. + :param extra_params: a dictionary containing extra parameters (unused in this rule) :return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved. 
''' @@ -27,7 +28,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st deletion_list: List[ObjectVersion] = [] # find the earliest for v in versions: - if earliest == None or v.validFromAsDatetime > earliest.validFromAsDatetime: + if earliest == None or v.validFromAsDt > earliest.validFromAsDatetime: earliest = v logging.debug(f"earliest : {earliest}") @@ -37,7 +38,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st preservation_list.append(v) continue - if v.validFromAsDatetime < datetime.now() - timedelta(minutes=delay): + if v.validFromAsDt < datetime.now() - timedelta(minutes=delay): deletion_list.append(v) ccdb.deleteVersion(v) else: diff --git a/Framework/script/RepoCleaner/none_kept.py b/Framework/script/RepoCleaner/rules/none_kept.py similarity index 91% rename from Framework/script/RepoCleaner/none_kept.py rename to Framework/script/RepoCleaner/rules/none_kept.py index c1e81b6475..b65010ee98 100644 --- a/Framework/script/RepoCleaner/none_kept.py +++ b/Framework/script/RepoCleaner/rules/none_kept.py @@ -15,6 +15,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st :param ccdb: the ccdb in which objects are cleaned up. :param object_path: path to the object, or pattern, to which a rule will apply. :param delay: the grace period in minutes during which a new object is never deleted. + :param extra_params: a dictionary containing extra parameters (unused in this rule) :return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved. 
''' @@ -25,7 +26,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st deletion_list: List[ObjectVersion] = [] for v in versions: - if v.validFromAsDatetime < datetime.now() - timedelta(minutes=delay): + if v.validFromAsDt < datetime.now() - timedelta(minutes=delay): logging.debug(f"not in the grace period, we delete {v}") deletion_list.append(v) ccdb.deleteVersion(v) diff --git a/Framework/script/RepoCleaner/rules/production.py b/Framework/script/RepoCleaner/rules/production.py new file mode 100644 index 0000000000..662c3628e0 --- /dev/null +++ b/Framework/script/RepoCleaner/rules/production.py @@ -0,0 +1,224 @@ +import logging +from typing import Dict +from collections import defaultdict +from datetime import datetime +import time +from datetime import timedelta, date + +from Ccdb import Ccdb, ObjectVersion + + +def in_grace_period(version: ObjectVersion, delay: int): + return version.validFromAsDt + timedelta(minutes=delay) > datetime.now() + +eor_dict = {} # to have fake eor numbers + + +def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, str]): + ''' + Process this deletion rule on the object. We use the CCDB passed by argument. + + This is the rule we use in production for the objects that need to be migrated. + + What it does: + - Versions without run number -> delete after the `delay` + - (The run number is set in "RunNumber" metadata) + - For a given run + - Keep everything for 30 minutes (configurable: delay_first_trimming) + - Keep 1 per 10 minutes (configurable: period_btw_versions_first) after this delay. + - Keep 1 per 1 hour (configurable: period_btw_versions_final) + as well as first and last at EOR+3h (configurable: delay_final_trimming) + - What has not been deleted at this stage is marked to be migrated (preservation = true) + + Extra parameters: + - delay_first_trimming: Delay in minutes before first trimming. 
(default: 30) + - period_btw_versions_first: Period in minutes between the versions we will keep after first trimming. (default: 10) + - delay_final_trimming: Delay in minutes, counted from the EOR, before we do the final cleanup and mark for migration. (default: 180) + - period_btw_versions_final: Period in minutes between the versions we will migrate. (default: 60) + + Implementation : + - Go through all objects: + - if a run is set, add the object to the corresponding map element. + - if not, if the delay has passed, delete. + - Go through the map: for each run + - Check if run has finished and get the time of EOR if so. + - if run is over for more than 3 hours + - Final trimming + - else + - For each version + - First trimming (1 per 10 minutes) + During the first trimming we mark (trim1=true) the versions we have already treated to avoid redoing the work. + + + :param ccdb: the ccdb in which objects are cleaned up. + :param object_path: path to the object, or pattern, to which a rule will apply. + :param delay: the grace period during which a new object is not deleted although it has no run number. + :param extra_params: a dictionary containing extra parameters for this rule. + :return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved. 
+ ''' + + logging.debug(f"Plugin 'production' processing {object_path}") + + # Variables + preservation_list: List[ObjectVersion] = [] + deletion_list: List[ObjectVersion] = [] + update_list: List[ObjectVersion] = [] + runs_dict: DefaultDict[str, List[ObjectVersion]] = defaultdict(list) + + # Extra parameters + delay_first_trimming = int(extra_params.get("delay_first_trimming", 30)) + logging.debug(f"delay_first_trimming : {delay_first_trimming}") + period_btw_versions_first = int(extra_params.get("period_btw_versions_first", 10)) + logging.debug(f"period_btw_versions_first : {period_btw_versions_first}") + delay_final_trimming = int(extra_params.get("delay_final_trimming", 180)) + logging.debug(f"delay_final_trimming : {delay_final_trimming}") + period_btw_versions_final = int(extra_params.get("period_btw_versions_final", 60)) + logging.debug(f"period_btw_versions_final : {period_btw_versions_final}") + + # Find all the runs and group the versions + versions = ccdb.getVersionsList(object_path) + logging.debug(f"Dispatching versions to runs") + for v in versions: + if "RunNumber" in v.metadata: + runs_dict[v.metadata['RunNumber']].append(v) + else: + runs_dict[-1].append(v) # the ones with no run specified + logging.debug(f" Number of runs : {len(runs_dict)}") + logging.debug(f" Number of versions without runs : {len(runs_dict[-1])}") + + # Versions without runs: spare if more recent than the delay + logging.debug(f"Eliminating versions without runs if older than the grace period") + for run_version in runs_dict[-1]: + if in_grace_period(run_version, delay): + preservation_list.append(run_version) + else: + logging.debug(f" delete {run_version}") + deletion_list.append(run_version) + ccdb.deleteVersion(run_version) + del runs_dict[-1] # remove this "run" from the list + + # For each run + logging.debug(f"Trimming the versions with a run number") + for run, run_versions in runs_dict.items(): + logging.debug(f" Processing run {run}") + # TODO get the EOR if it 
happened, meanwhile we use `eor_dict` or compute first object time + 15 hours + eor = eor_dict.get(int(run), run_versions[0].validFromAsDt + timedelta(hours=15)) + logging.debug(f" EOR : {eor}") + + # run is finished for long enough + if eor is not None and datetime.now() > eor + timedelta(minutes=delay_final_trimming): + logging.debug(" Run is over for long enough, let's do the final trimming") + final_trimming(ccdb, period_btw_versions_final, run_versions, preservation_list, update_list, deletion_list) + else: # trim the versions as the run is ongoing or too fresh + logging.debug(" Run is too fresh or still ongoing, we do the light trimming") + first_trimming(ccdb, delay_first_trimming, period_btw_versions_first, run_versions, + preservation_list, update_list, deletion_list) + + # Print result + logging.debug("*** Results ***") + logging.debug(f"deleted ({len(deletion_list)}) : ") + for v in deletion_list: + logging.debug(f" {v}") + logging.debug(f"preserved ({len(preservation_list)}) : ") + for v in preservation_list: + logging.debug(f" {v}") + logging.debug(f"updated ({len(update_list)}) : ") + for v in update_list: + logging.debug(f" {v}") + + return {"deleted": len(deletion_list), "preserved": len(preservation_list), "updated": len(update_list)} + + +def first_trimming(ccdb, delay_first_trimming, period_btw_versions_first, run_versions, preservation_list, + update_list, deletion_list): + last_preserved: ObjectVersion = None + limit_first_trimming = datetime.now() - timedelta(minutes=delay_first_trimming) + metadata = {'trim1': 'done'} + for v in run_versions: + logging.debug(f" Processing {v} ") + + if 'trim1' in v.metadata: # check if it is already in the cache + logging.debug(f" Already processed - skip") + last_preserved = v + preservation_list.append(v) + continue + + if v.validFromAsDt < limit_first_trimming: # delay for 1st trimming is exhausted + # if it is the first or if it is "far enough" from the previous one + if last_preserved is None or \ + 
last_preserved.validFromAsDt < v.validFromAsDt - timedelta(minutes=period_btw_versions_first): + if last_preserved is not None: + # first extend validity of the previous preserved and set flag + ccdb.updateValidity(last_preserved, last_preserved.validFrom, str(int(v.validFrom) - 1), metadata) + update_list.append(last_preserved) + logging.debug(f" Extension of {last_preserved}") + last_preserved = v + preservation_list.append(v) + else: # too close to the previous one, delete + deletion_list.append(v) + logging.debug(f" Deletion of {v}") + ccdb.deleteVersion(v) + else: + preservation_list.append(v) + + +def final_trimming(ccdb, period_btw_versions_final, run_versions, preservation_list, update_list, deletion_list): + # go through the whole run, keep only one version every period_btw_versions_final minutes + last_preserved: ObjectVersion = None + metadata = {'trim1': '', 'preservation':'true'} + for v in run_versions: + logging.debug(f" Processing {v} ") + # if it is the first or if the last_preserved is older than `period_btw_versions_final` + if last_preserved is None \ + or last_preserved.validFromAsDt < v.validFromAsDt - timedelta(minutes=period_btw_versions_final) \ + or v == run_versions[-1]: # v is last element, which we must preserve + if last_preserved is not None: # first extend validity of the previous preserved and set flag + ccdb.updateValidity(last_preserved, last_preserved.validFrom, str(int(v.validFrom) - 1), metadata) + update_list.append(last_preserved) + logging.debug(f" Extension of {last_preserved}") + last_preserved = v + preservation_list.append(v) + else: # too close to the previous one, delete + deletion_list.append(v) + logging.debug(f" Deletion of {v}") + ccdb.deleteVersion(v) + + +def main(): + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + + ccdb = Ccdb('http://ccdb-test.cern.ch:8080') + extra = {"delay_first_trimming": 
"25", "period_btw_versions_first": "11", "delay_final_trimming": "179", + "period_btw_versions_final": "15"} + path = "qc/TST/MO/repo/test" + run = 123456 + + prepare_test_data(ccdb, path, run) + + process(ccdb, path, 15, extra) + + +def prepare_test_data(ccdb, path, run): + current_timestamp = int(time.time() * 1000) + data = {'part': 'part'} + metadata = {'RunNumber': str(run)} + # 1 version every 1 minutes starting 1 hour ago + for x in range(60): + from_ts = current_timestamp - (60 - x) * 60 * 1000 + to_ts = from_ts + 24 * 60 * 60 * 1000 # a day + version_info = ObjectVersion(path=path, validFrom=from_ts, validTo=to_ts, metadata=metadata) + ccdb.putVersion(version=version_info, data=data) + # 1 version every 1 minutes starting 1/2 hour ago WITHOUT run + current_timestamp = int(time.time() * 1000) + metadata = {} + for x in range(30): + from_ts = current_timestamp - (60 - x) * 60 * 1000 + to_ts = from_ts + 24 * 60 * 60 * 1000 # a day + version_info = ObjectVersion(path=path, validFrom=from_ts, validTo=to_ts, metadata=metadata) + ccdb.putVersion(version=version_info, data=data) + + +if __name__ == "__main__": # to be able to run the test code above when not imported. + main() diff --git a/Framework/script/RepoCleaner/skip.py b/Framework/script/RepoCleaner/rules/skip.py similarity index 92% rename from Framework/script/RepoCleaner/skip.py rename to Framework/script/RepoCleaner/rules/skip.py index 9071bb1bb5..557138edd1 100644 --- a/Framework/script/RepoCleaner/skip.py +++ b/Framework/script/RepoCleaner/rules/skip.py @@ -15,6 +15,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st :param ccdb: the ccdb in which objects are cleaned up. :param object_path: path to the object, or pattern, to which a rule will apply. :param delay: the grace period during which a new object is never deleted. 
class TestProduction(unittest.TestCase):
    """
    Integration tests for the Production rule.

    Each test pushes data to the CCDB, runs the Production rule and then checks the reported
    statistics and the versions left in the repository. Several use cases are covered.
    One should truncate /qc/TST/MO/repo/test before running it.
    """

    def setUp(self):
        """Create the CCDB client and the default rule parameters, then enable debug logging."""
        self.ccdb = Ccdb('http://ccdb-test.cern.ch:8080')
        self.extra = {"delay_first_trimming": "30", "period_btw_versions_first": "10", "delay_final_trimming": "60",
                      "period_btw_versions_final": "60"}
        self.path = "qc/TST/MO/repo/test"
        self.run = 124321
        # Done last so that the client above is built before logging is reconfigured,
        # as it was when every test configured logging itself.
        self._enable_debug_logging()

    @staticmethod
    def _enable_debug_logging():
        """Install the log format (no-op once the root logger has handlers) and force DEBUG level."""
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
                            datefmt='%d-%b-%y %H:%M:%S')
        logging.getLogger().setLevel(logging.DEBUG)

    def test_start_run(self):
        """
        Ongoing run (25').
        0'-25': not trimmed
        Expected output: nothing trimmed, then change parameter delay_first_trimming to 5' and trim
        """
        # Prepare data: no pre-trimmed part, one version per minute since the start of run
        test_path = self.path + "/test_start_run"
        self.prepare_data(test_path, 25, 30, True, 60, False)

        # Make sure no end-of-run entry is registered for this run
        production.eor_dict.pop(int(self.run), None)
        stats = production.process(self.ccdb, test_path, 30, self.extra)
        self.assertEqual(stats["deleted"], 0)
        self.assertEqual(stats["preserved"], 26)
        self.assertEqual(stats["updated"], 0)

        objects_versions = self.ccdb.getVersionsList(test_path)
        self.assertEqual(len(objects_versions), 26)

        # Second pass with a shorter delay before the first trimming
        self.extra = {"delay_first_trimming": "5", "period_btw_versions_first": "10", "delay_final_trimming": "60",
                      "period_btw_versions_final": "15"}
        stats = production.process(self.ccdb, test_path, 30, self.extra)
        self.assertEqual(stats["deleted"], 19)
        self.assertEqual(stats["preserved"], 7)
        self.assertEqual(stats["updated"], 1)

    def test_mid_run(self):
        """
        Ongoing run (1h30).
        0-30' : already trimmed
        30-90': not trimmed
        Expected output: 0-60' trimmed
        """
        # Prepare data
        test_path = self.path + "/test_mid_run"
        self.prepare_data(test_path, 90)

        # Make sure no end-of-run entry is registered for this run
        production.eor_dict.pop(int(self.run), None)
        stats = production.process(self.ccdb, test_path, 30, self.extra)
        self.assertEqual(stats["deleted"], 28)
        self.assertEqual(stats["preserved"], 35)
        self.assertEqual(stats["updated"], 3)

        objects_versions = self.ccdb.getVersionsList(test_path)
        self.assertEqual(len(objects_versions), 35)
        self.assertIn("trim1", objects_versions[0].metadata)

    def test_run_finished(self):
        """
        Finished run (3h10), started 290' ago.
        0'-190': one version every 10 minutes, created with the trim1 flag set
        The rule runs after the delay for final trimming.
        Expected output: 4 versions preserved, 15 deleted, 3 updated
        """
        # Prepare data: only the first part, no per-minute versions
        test_path = self.path + "/test_run_finished"
        self.prepare_data(test_path, 290, 190, False, 0, True)

        # Register an entry for this run dated 100 minutes ago
        # (presumably the end-of-run time - confirm against production.eor_dict)
        production.eor_dict[int(self.run)] = datetime.now() - timedelta(minutes=100)
        stats = production.process(self.ccdb, test_path, 30, self.extra)
        self.assertEqual(stats["deleted"], 15)
        self.assertEqual(stats["preserved"], 4)
        self.assertEqual(stats["updated"], 3)

        objects_versions = self.ccdb.getVersionsList(test_path)
        self.assertEqual(len(objects_versions), 4)
        self.assertNotIn("trim1", objects_versions[0].metadata)
        self.assertIn("preservation", objects_versions[0].metadata)

    def prepare_data(self, path, minutes_since_sor, duration_first_part=30, skip_first_part=False,
                     minutes_second_part=60, skip_second_part=False):
        """
        Prepare a data set starting `minutes_since_sor` in the past.
        The data is laid out in two parts
           1. `duration_first_part` minutes of data flagged with trim1 (one version every 10 minutes)
           2. `minutes_second_part` minutes of data without the flag (one version every minute)
        Each part can be skipped with the respective parameter.
        Depending how far in the past one starts, different outputs of the production rule are expected.
        If `minutes_since_sor` is shorter than the run, we only create data in the past, never in the future:
        the method returns as soon as a version would start after the current time.

        :param path: path under which the versions are stored.
        :param minutes_since_sor: how long ago, in minutes, the data set starts.
        :param duration_first_part: length in minutes of the first part.
        :param skip_first_part: if True, the first part is not created and takes no time.
        :param minutes_second_part: length in minutes of the second part.
        :param skip_second_part: if True, the second part is not created.
        """

        current_timestamp = int(time.time() * 1000)
        sor = current_timestamp - minutes_since_sor * 60 * 1000
        data = {'part': 'part'}
        metadata = {'RunNumber': str(self.run), 'trim1': 'true'}
        cursor = sor

        # 1 version every 10 minutes starting at sor, lasting `duration_first_part` minutes, with trim1 flag set
        if not skip_first_part:
            for x in range(int(round(duration_first_part / 10))):
                from_ts = cursor + x * 10 * 60 * 1000
                if from_ts > current_timestamp:
                    return
                to_ts = from_ts + 24 * 60 * 60 * 1000  # a day
                version_info = ObjectVersion(path=path, validFrom=from_ts, validTo=to_ts, metadata=metadata)
                self.ccdb.putVersion(version=version_info, data=data)
            # the second part starts where the first one ends (only when the first one exists)
            cursor = cursor + duration_first_part * 60 * 1000

        # 1 version every 1 minutes starting after and lasting `minutes_second_part` minutes
        if not skip_second_part:
            current_timestamp = int(time.time() * 1000)
            metadata = {'RunNumber': str(self.run)}
            for x in range(minutes_second_part):
                from_ts = cursor + x * 60 * 1000
                if from_ts > current_timestamp:
                    return
                to_ts = from_ts + 24 * 60 * 60 * 1000  # a day
                version_info = ObjectVersion(path=path, validFrom=from_ts, validTo=to_ts, metadata=metadata)
                self.ccdb.putVersion(version=version_info, data=data)