Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 76 additions & 41 deletions Framework/script/RepoCleaner/o2-qc-repo-cleaner
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import dryable
import yaml
import time
import consul

import pidfile
import multiprocessing as mp

from Ccdb import Ccdb
from pidfile import PIDFile, AlreadyRunningError


class Rule:
Expand Down Expand Up @@ -82,6 +82,8 @@ def parseArgs():
help='Dry run, no actual deletion nor modification to the CCDB.')
parser.add_argument('--only-path', dest='only_path', action='store', default="",
help='Only work on given path (omit the initial slash).')
parser.add_argument('--workers', dest='workers', action='store', default="1",
help='Number of parallel workers.')
args = parser.parse_args()
dryable.set(args.dry_run)
logging.info(args)
Expand Down Expand Up @@ -204,38 +206,40 @@ def storeSavedTimestamp():
f.close()


# ****************
# We start here !
# ****************

def main():
def prepare_main_logger():
    """Attach the stdout/stderr handlers to the root logger.

    Records up to and including INFO are written to stdout, records from
    WARNING upwards are written to stderr. Both handlers share the same
    timestamped format. The level of the root logger itself is not touched
    here.
    """
    logger = logging.getLogger()
    # Logging (split between stderr and stdout)
    formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')

    h1 = logging.StreamHandler(sys.stdout)
    h1.setLevel(logging.DEBUG)
    # filter out everything that is above INFO level (WARN, ERROR, ...)
    h1.addFilter(lambda record: record.levelno <= logging.INFO)
    h1.setFormatter(formatter)
    logger.addHandler(h1)

    h2 = logging.StreamHandler(sys.stderr)
    # take only warnings and error logs
    h2.setLevel(logging.WARNING)
    h2.setFormatter(formatter)
    logger.addHandler(h2)

# Parse arguments
args = parseArgs()
logging.getLogger().setLevel(int(args.log_level))

try:
with pidfile.PIDFile():
print('No other copies around, we go ahead.')
time.sleep(30)
except pidfile.AlreadyRunningError:
print('Already running, stopping.')
sys.exit(1)

# Read configuration
def create_parallel_logger():
    """Return the multiprocessing logger, configuring it on first use.

    The logger level is set to INFO on every call. The handlers (stdout for
    records up to INFO, stderr for WARNING and above) are created and attached
    only when the logger has no handler yet, so repeated calls neither stack
    duplicate handlers nor build throwaway ones.

    Returns:
        The logger obtained from ``multiprocessing.get_logger()``.
    """
    # TODO merge with prepare_main_logger. It creates problems though.
    logger = mp.get_logger()
    logger.setLevel(logging.INFO)
    if logger.handlers:
        # Already configured by an earlier call.
        return logger

    formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')

    h1 = logging.StreamHandler(sys.stdout)
    h1.setLevel(logging.DEBUG)
    # filter out everything that is above INFO level
    h1.addFilter(lambda record: record.levelno <= logging.INFO)
    h1.setFormatter(formatter)

    h2 = logging.StreamHandler(sys.stderr)
    # take only warnings and error logs
    h2.setLevel(logging.WARNING)
    h2.setFormatter(formatter)

    logger.addHandler(h2)
    logger.addHandler(h1)
    return logger


def readConfig(args):
path = args.config
if args.config_consul:
items = args.config_consul.split(':')
Expand All @@ -246,34 +250,65 @@ def main():
config = parseConfig(path)
rules: List[Rule] = config['rules']
ccdb_url = config['ccdb_url']
return ccdb_url, rules


def process_object(object_path, rules, ccdb):
    """Apply the first matching rule, if any, to a single object path.

    Args:
        object_path: path of the object to process.
        rules: rules handed to ``findMatchingRule`` together with the path.
        ccdb: object forwarded as first argument to the policy's ``process``.

    Returns:
        None. Returns early, after logging the path, when no rule matches.
    """
    # Local import so that this block does not depend on the file-level imports.
    import importlib

    logger = create_parallel_logger()
    logger.info(f"Processing {object_path}")

    # Take the first matching rule, if any
    rule = findMatchingRule(rules, object_path)
    if rule is None:
        return

    # Apply rule on object (find the plug-in script and apply).
    # import_module returns the named module itself, whereas __import__ returns
    # the top-level package when the name contains dots.
    module = importlib.import_module(rule.policy)
    stats = module.process(ccdb, object_path, int(rule.delay),  # rule.migration == "True",
                           rule.extra_params)
    logger.info(f"{rule.policy} applied on {object_path}: {stats}")


def run(args, ccdb_url, rules):
    """List the objects from the CCDB and process each one in a worker pool.

    Args:
        args: parsed command-line arguments; ``only_path`` and ``workers`` are
            read here.
        ccdb_url: URL passed to the ``Ccdb`` constructor.
        rules: rules forwarded to ``process_object`` for every object path.

    Each path is processed exactly once, by ``process_object`` running in a
    pool of ``int(args.workers)`` processes. An exception raised in a worker
    is logged here rather than silently dropped. The timestamp is stored at
    the end, whether or not some objects failed.
    """
    # Get list of objects from CCDB
    ccdb = Ccdb(ccdb_url)
    paths = ccdb.getObjectsList(getTimestampLastExecution())
    if args.only_path != '':
        paths = [item for item in paths if item.startswith(args.only_path)]
    logging.debug(paths)
    logging.debug(len(paths))

    # For each object call the first matching rule
    logging.info("Loop through the objects and apply first matching rule.")
    logging.info(f"workers: {args.workers}")
    with mp.Pool(int(args.workers)) as pool:
        # Keep the AsyncResult of every task: it is the only place where an
        # exception raised in a worker can be observed.
        pending = [(object_path, pool.apply_async(process_object, args=(object_path, rules, ccdb)))
                   for object_path in paths]
        pool.close()
        pool.join()

    # All tasks are finished after join(); get() either returns at once or
    # re-raises the exception of the worker.
    for object_path, result in pending:
        try:
            result.get()
        except Exception:
            logging.exception(f"Processing of {object_path} failed")

    # NOTE(review): the arguments of apply_async are pickled, so the workers
    # act on copies of ccdb; presumably the counters below do not include what
    # the workers did - verify against Ccdb.
    logging.info(f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})")
    # NOTE(review): the timestamp is stored even if some objects failed, as
    # before - confirm that failed objects do not need to be retried.
    storeSavedTimestamp()


# ****************
# We start here !
# ****************

def main():
    """Entry point: set up logging, parse the arguments and run the cleaner.

    The configuration is read and the run is performed inside the
    'o2-qc-repo-cleaner.pid' PIDFile context. When AlreadyRunningError is
    raised a message is printed and the function returns.
    """
    prepare_main_logger()

    # The command-line options decide the level of the root logger.
    options = parseArgs()
    root_logger = logging.getLogger()
    root_logger.setLevel(int(options.log_level))

    try:
        with PIDFile(filename='o2-qc-repo-cleaner.pid'):
            # readConfig returns the pair (ccdb_url, rules), in the order run expects.
            run(options, *readConfig(options))
    except AlreadyRunningError:
        print('Already running. Exiting.')


if __name__ == "__main__":  # run the cleaner only when executed as a script, not when imported
    main()