Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ install(FILES example-default.json
streamerinfos_v017.root
DESTINATION etc)

install(PROGRAMS script/RepoCleaner/1_per_hour.py script/RepoCleaner/Ccdb.py script/RepoCleaner/skip.py
script/RepoCleaner/repoCleaner.py script/RepoCleaner/1_per_run.py script/RepoCleaner/none_kept.py
install(PROGRAMS script/RepoCleaner/rules/1_per_hour.py script/RepoCleaner/Ccdb.py script/RepoCleaner/rules/skip.py
script/RepoCleaner/repoCleaner.py script/RepoCleaner/rules/1_per_run.py script/RepoCleaner/rules/none_kept.py
script/o2-qc-functional-test.sh
DESTINATION bin)

Expand Down
61 changes: 44 additions & 17 deletions Framework/script/RepoCleaner/Ccdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,28 @@ class ObjectVersion:
This class represents a single version.
'''

def __init__(self, path, uuid, validFrom, validTo, metadata):
def __init__(self, path, validFrom, validTo, uuid=None, metadata=None):
'''
Construct an ObjectVersion.
:param path: path to the object
:param uuid: unique id of the object
:param validFromAsDatetime: validity range smaller limit (in ms)
:param validFrom: validity range smaller limit (in ms)
:param validTo: validity range bigger limit (in ms)
'''
self.path = path
self.uuid = uuid
self.validFromAsDatetime = datetime.datetime.fromtimestamp(validFrom / 1000) # /1000 because we get ms
self.validFrom = validFrom
# precomputed Datetime ("Dt") of the timestamp `validFrom`
self.validFromAsDt = datetime.datetime.fromtimestamp(int(validFrom) / 1000) # /1000 because we get ms
self.validTo = validTo
self.metadata = metadata

def __repr__(self):
if "Run" in self.metadata or "RunNumber" in self.metadata:
run_number = self.metadata["Run"] if "Run" in self.metadata else self.metadata["RunNumber"]
return f"Version of object {self.path} valid from {self.validFromAsDatetime} (uuid {self.uuid}, " \
f"ts {self.validFrom}), run {run_number}"
return f"Version of object {self.path} valid from {self.validFromAsDt}, run {run_number}"
else:
return f"Version of object {self.path} valid from {self.validFromAsDatetime} (uuid {self.uuid}, " \
return f"Version of object {self.path} valid from {self.validFromAsDt} (uuid {self.uuid}, " \
f"ts {self.validFrom})"


Expand Down Expand Up @@ -116,31 +116,58 @@ def deleteVersion(self, version: ObjectVersion):
sys.exit(1) # really ?

@dryable.Dryable()
def updateValidity(self, version: ObjectVersion, validFrom: int, validTo: int):
def updateValidity(self, version: ObjectVersion, valid_from: int, valid_to: int, metadata=None):
'''
Update the validity range of the specified version of an object.
:param version: The ObjectVersion to update.
:param validFrom: The new "from" validity.
:param validTo: The new "to" validity.
:param valid_from: The new "from" validity.
:param valid_to: The new "to" validity.
:param metadata: Add or modify metadata
'''
if version.validTo == validTo:
if version.validTo == valid_to:
logging.debug("The new timestamp for validTo is identical to the existing one. Skipping.")
return
url_update_validity = self.url + '/' + version.path + '/' + str(validFrom) + '/' + str(validTo)
logging.debug(f"Update end limit validity of {version.path} from {version.validTo} to {validTo}")
full_path = self.url + '/' + version.path + '/' + str(valid_from) + '/' + str(valid_to) + '/' + str(version.uuid) + '?'
logging.debug(f"Update end limit validity of {version.path} ({version.uuid}) from {version.validTo} to {valid_to}")
if metadata is not None:
logging.debug(f"{metadata}")
for key in metadata:
full_path += key + "=" + metadata[key] + "&"
try:
r = requests.put(url_update_validity)
r = requests.put(full_path)
r.raise_for_status()
self.counter_validity_updated += 1
except requests.exceptions.RequestException as e:
print(e)
sys.exit(1) # really ?



def putVersion(self, version: ObjectVersion, data):
'''
:param version: An ObjectVersion that describes the data to be uploaded.
:param data: the actual data to send. E.g.:{'somekey': 'somevalue'}
:return A list of ObjectVersion.
'''
full_path=self.url + "/" + version.path + "/" + str(version.validFrom) + "/" + str(version.validTo) + "/"
if version.metadata is not None:
for key in version.metadata:
full_path += key + "=" + version.metadata[key] + "/"
logging.debug(f"fullpath: {full_path}")
r = requests.post(full_path, files=data)
if r.ok:
logging.debug(f"Version pushed to {version.path}")
else:
logging.error(f"Could not post a new version of {version.path}: {r.text}")

def main():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

ccdb = Ccdb('http://ccdb-test.cern.ch:8080')
objectsList = ccdb.getObjectsList()
print(f"{objectsList}")

data = {'somekey': 'somevalue'}
metadata = {'RunNumber': '213564', 'test': 'on'}
version_info = ObjectVersion(path="qc/TST/MO/repo/test", validFrom=1605091858183, validTo=1920451858183, metadata=metadata)
ccdb.putVersion(version_info, data)


if __name__ == "__main__": # to be able to run the test code above when not imported.
Expand Down
8 changes: 7 additions & 1 deletion Framework/script/RepoCleaner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ The configuration for ccdb-test is described [here](../../../doc/DevelopersTips.
## Unit Tests
`cd QualityControl/Framework/script/RepoCleaner ; python3 -m unittest discover`

To run just one of the rules, do `python3 1_per_run.py`.
In particular there is a test for the `production` rule that is pretty extensive. It hits the ccdb though and it needs the following path to be truncated:
`
qc/TST/MO/repo/test*
`

## Other tests
Most of the classes and Rules have a main to help test them. To run do e.g. `python3 1_per_run.py`.

## Installation
CMake will install the python scripts in bin and the config file in etc.
2 changes: 1 addition & 1 deletion Framework/script/RepoCleaner/config-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ Rules:
policy: 1_per_hour

Ccdb:
Url: http://UPDATEME.cern.ch:8080
Url: http://ccdb-test.cern.ch:8080
42 changes: 23 additions & 19 deletions Framework/script/RepoCleaner/config.yaml
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
Rules:
- object_path: qc/ITS/.*
- object_path: qc/TST/MO/barth/hello_0
delay: 240
policy: 1_per_run
delete_when_no_run: True
- object_path: qc/TST_KEEP/.*
delay: 240
policy: 1_per_run
delete_when_no_run: True
- object_path: qc/.* # Path in the CCDB to a certain object
delay: 1440 # Delay in minutes during which a new object is not touched. (1 day)
policy: 1_per_hour # name of the policy to apply, must correspond to a python script.
- object_path: QcCheck/.*
delay: 60
policy: 1_per_hour
- object_path: Test
delay: 240
policy: none_kept
- object_path: .*
delay: 1440
policy: skip
migration: True
# - object_path: qc/ITS/.*
# delay: 240
# policy: 1_per_run
# delete_when_no_run: True
# - object_path: qc/TST_KEEP/.*
# delay: 240
# policy: 1_per_run
# delete_when_no_run: True
# - object_path: qc/.* # Path in the CCDB to a certain object
# delay: 1440 # Delay in minutes during which a new object is not touched. (1 day)
# policy: 1_per_hour # name of the policy to apply, must correspond to a python script.
# - object_path: QcCheck/.*
# delay: 60
# policy: 1_per_hour
# - object_path: Test
# delay: 240
# policy: none_kept
# - object_path: .*
# delay: 1440
# policy: skip
# - object_path: no_cleanup/.*
# delay: 60
# policy: skip
Expand Down Expand Up @@ -51,4 +55,4 @@ Rules:
# policy: 1_per_hour

Ccdb:
Url: http://UPDATEME.cern.ch:8080
Url: http://ccdb-test.cern.ch:8080
50 changes: 33 additions & 17 deletions Framework/script/RepoCleaner/repoCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
class Rule:
"""A class to hold information about a "rule" defined in the config file."""

def __init__(self, object_path=None, delay=None, policy=None, all_params=None):
def __init__(self, object_path=None, delay=None, policy=None, # migration=None,
all_params=None):
'''
Constructor.
:param object_path: path to the object, or pattern, to which a rule will apply.
Expand All @@ -48,13 +49,18 @@ def __init__(self, object_path=None, delay=None, policy=None, all_params=None):
self.object_path = object_path
self.delay = delay
self.policy = policy
# self.migration = migration

self.extra_params = all_params
self.extra_params.pop("object_path")
self.extra_params.pop("delay")
self.extra_params.pop("policy")
if all_params is not None:
self.extra_params.pop("object_path")
self.extra_params.pop("delay")
self.extra_params.pop("policy")
# self.extra_params.pop("migration")

def __repr__(self):
return 'Rule(object_path={.object_path}, delay={.delay}, policy={.policy}, extra_params={.extra_params})'.format(self, self, self, self)
return 'Rule(object_path={.object_path}, delay={.delay}, policy={.policy}, extra_params={.extra_params})'.format(
self, self, self, self, self)


def parseArgs():
Expand Down Expand Up @@ -86,22 +92,24 @@ def parseConfig(config_file_path):
:param config_file_path: Path to the config file
:raises yaml.YAMLError If the config file does not contain a valid yaml.
"""

logging.info(f"Parsing config file {config_file_path}")
with open(config_file_path, 'r') as stream:
config_content = yaml.safe_load(stream)

rules = []
logging.debug("Rules found in the config file:")
for rule_yaml in config_content["Rules"]:
rule = Rule(rule_yaml["object_path"], rule_yaml["delay"], rule_yaml["policy"], rule_yaml)
rule = Rule(rule_yaml["object_path"], rule_yaml["delay"], rule_yaml["policy"], # rule_yaml["migration"],
rule_yaml)
rules.append(rule)
logging.debug(f" * {rule}")

ccdb_url = config_content["Ccdb"]["Url"]

return {'rules': rules, 'ccdb_url': ccdb_url}


def downloadConfigFromGit():
"""
Download a config file from git.
Expand All @@ -110,7 +118,8 @@ def downloadConfigFromGit():
"""

logging.debug("Get it from git")
r = requests.get('https://raw.github.com/AliceO2Group/QualityControl/repo_cleaner/Framework/script/RepoCleaner/config.yaml')
r = requests.get(
'https://raw.github.com/AliceO2Group/QualityControl/repo_cleaner/Framework/script/RepoCleaner/config.yaml')
logging.debug(f"config file from git : \n{r.text}")
path = "/tmp/config.yaml"
with open(path, 'w') as f:
Expand All @@ -121,13 +130,13 @@ def downloadConfigFromGit():

def findMatchingRule(rules, object_path):
"""Return the first matching rule for the given path or None if none is found."""

logging.debug(f"findMatchingRule for {object_path}")

if object_path == None:
logging.error(f"findMatchingRule: object_path is None")
return None

for rule in rules:
pattern = re.compile(rule.object_path)
result = pattern.match(object_path)
Expand All @@ -137,9 +146,11 @@ def findMatchingRule(rules, object_path):
logging.debug(" No rule found, skipping.")
return None


filepath = tempfile.gettempdir() + "/repoCleaner.txt"
currentTimeStamp = int(time.time() * 1000)


def getTimestampLastExecution():
"""
Returns the timestamp of the last execution.
Expand All @@ -156,6 +167,7 @@ def getTimestampLastExecution():
f.close()
return timestamp


def storeSavedTimestamp():
"""
Store the timestamp we saved at the beginning of the execution of this script.
Expand All @@ -175,8 +187,9 @@ def storeSavedTimestamp():

def main():
# Logging (you can use funcName in the template)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')

# Parse arguments
args = parseArgs()
logging.getLogger().setLevel(int(args.log_level))
Expand Down Expand Up @@ -205,14 +218,17 @@ def main():
rule = findMatchingRule(rules, object_path);
if rule == None:
continue

# Apply rule on object (find the plug-in script and apply)
module = __import__(rule.policy)
stats = module.process(ccdb, object_path, int(rule.delay), rule.extra_params)
stats = module.process(ccdb, object_path, int(rule.delay), # rule.migration == "True",
rule.extra_params)
logging.info(f"{rule.policy} applied on {object_path}: {stats}")

logging.info(f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})")

logging.info(
f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})")
storeSavedTimestamp()


if __name__ == "__main__": # to be able to run the test code above when not imported.
main()
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from Ccdb import Ccdb, ObjectVersion


def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, str]):
def process(ccdb: Ccdb, object_path: str, delay: int, #migration: bool,
extra_params: Dict[str, str]):
'''
Process this deletion rule on the object. We use the CCDB passed by argument.
Objects who have been created recently are spared (delay is expressed in minutes).
Expand All @@ -17,6 +18,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st
:param ccdb: the ccdb in which objects are cleaned up.
:param object_path: path to the object, or pattern, to which a rule will apply.
:param delay: the grace period during which a new object is never deleted.
:param migration: whether the objects that have not been deleted should be migrated to the Grid. It does not apply to the objects spared because they are recent (see delay).
:param extra_params: a dictionary containing extra parameters for this rule.
:return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved.
'''

Expand All @@ -29,7 +32,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st
deletion_list: List[ObjectVersion] = []
update_list: List[ObjectVersion] = []
for v in versions:
if last_preserved == None or last_preserved.validFromAsDatetime < v.validFromAsDatetime - timedelta(hours=1):
if last_preserved == None or last_preserved.validFromAsDatetime < v.validFromAsDt - timedelta(hours=1):
# first extend validity of the previous preserved (should we take into account the run ?)
if last_preserved != None:
ccdb.updateValidity(last_preserved, last_preserved.validFrom, str(int(v.validFrom) - 1))
Expand All @@ -38,7 +41,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st
preservation_list.append(v)
else:
deletion_list.append(v)
if v.validFromAsDatetime < datetime.now() - timedelta(minutes=delay):
if v.validFromAsDt < datetime.now() - timedelta(minutes=delay):
logging.debug(f"not in the grace period, we delete {v}")
ccdb.deleteVersion(v)

Expand Down
Loading