From 856c71ca1797d3fa5cf4eddf26d7bf0c5523f9e9 Mon Sep 17 00:00:00 2001 From: Barthelemy Date: Tue, 20 Jul 2021 14:08:16 +0200 Subject: [PATCH 1/4] parallel but no logger --- .../script/RepoCleaner/o2-qc-repo-cleaner | 94 ++++++++++++++----- 1 file changed, 71 insertions(+), 23 deletions(-) diff --git a/Framework/script/RepoCleaner/o2-qc-repo-cleaner b/Framework/script/RepoCleaner/o2-qc-repo-cleaner index 74255537a7..d2b65e6c12 100755 --- a/Framework/script/RepoCleaner/o2-qc-repo-cleaner +++ b/Framework/script/RepoCleaner/o2-qc-repo-cleaner @@ -31,6 +31,7 @@ import dryable import yaml import time import consul +import multiprocessing as mp import pidfile @@ -204,11 +205,7 @@ def storeSavedTimestamp(): f.close() -# **************** -# We start here ! -# **************** - -def main(): +def prepareLogger(logger): # Logging (split between stderr and stdout) formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S') h1 = logging.StreamHandler(sys.stdout) @@ -216,16 +213,13 @@ def main(): # filter out everything that is above INFO level (WARN, ERROR, ...) h1.addFilter(lambda record: record.levelno <= logging.INFO) h1.setFormatter(formatter) - logging.getLogger().addHandler(h1) + logger.addHandler(h1) h2 = logging.StreamHandler(sys.stderr) # take only warnings and error logs h2.setLevel(logging.WARNING) h2.setFormatter(formatter) - logging.getLogger().addHandler(h2) + logger.addHandler(h2) - # Parse arguments - args = parseArgs() - logging.getLogger().setLevel(int(args.log_level)) try: with pidfile.PIDFile(): @@ -235,7 +229,8 @@ def main(): print('Already running, stopping.') sys.exit(1) - # Read configuration + +def readConfig(args): path = args.config if args.config_consul: items = args.config_consul.split(':') @@ -246,7 +241,28 @@ def main(): config = parseConfig(path) rules: List[Rule] = config['rules'] ccdb_url = config['ccdb_url'] + return ccdb_url, rules + + +def process_object(object_path, rules, ccdb): + print(f"Processing {object_path}") + # logger = create_logger() + # logger = logging.getLogger() + # logger.info(f"Processing {object_path}") + # Take the first matching rule, if any + rule = findMatchingRule(rules, object_path) + if rule is None: + return + + # Apply rule on object (find the plug-in script and apply) + module = __import__(rule.policy) + stats = module.process(ccdb, object_path, int(rule.delay), # rule.migration == "True", + rule.extra_params) + # logger.info(f"{rule.policy} applied on {object_path}: {stats}") + print(f"{rule.policy} applied on {object_path}: {stats}") + +def run(args, ccdb_url, rules): # Get list of objects from CCDB ccdb = Ccdb(ccdb_url) paths = ccdb.getObjectsList(getTimestampLastExecution()) @@ -257,23 +273,55 @@ def main(): # For each object call the first matching rule logging.info("Loop through the objects and apply first matching rule.") - for object_path in paths: - logging.info(f"Processing {object_path}") - # Take the first matching rule, if any - rule = findMatchingRule(rules, object_path); - if rule == None: - continue - - # Apply rule on object (find the plug-in script and apply) - module = __import__(rule.policy) - stats = module.process(ccdb, object_path, int(rule.delay), # rule.migration == "True", - rule.extra_params) - logging.info(f"{rule.policy} applied on {object_path}: {stats}") + + pool = mp.Pool(8) + + [pool.apply(process_object, args=(object_path, rules, ccdb)) for object_path in paths] + + # for object_path in paths: + # process_object(object_path=object_path, rules=rules, ccdb=ccdb) + # logging.info(f"Processing {object_path}") + # # Take the first matching rule, if any + # rule = findMatchingRule(rules, object_path) + # if rule == None: + # continue + # + # # Apply rule on object (find the plug-in script and apply) + # module = __import__(rule.policy) + # stats = module.process(ccdb, object_path, int(rule.delay), # rule.migration == "True", + # rule.extra_params) + # logging.info(f"{rule.policy} applied on {object_path}: {stats}") + + pool.close() logging.info( f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})") storeSavedTimestamp() +# **************** +# We start here ! +# **************** + +def main(): + # logger = create_logger() + # logger.info('Starting pooling') + prepareLogger(logging.getLogger()) + # multiprocessing_logging.install_mp_handler() + # prepareLogger(mp.get_logger()) + + # Parse arguments + args = parseArgs() + logging.getLogger().setLevel(int(args.log_level)) + + try: + with PIDFile(filename='o2-qc-repo-cleaner.pid'): + ccdb_url, rules = readConfig(args) + run(args, ccdb_url, rules) + + except AlreadyRunningError: + print('Already running. Exiting.') + + if __name__ == "__main__": # to be able to run the test code above when not imported. main() From f61b5d4a85e1aae4cdd790a64a13a5432d61b35d Mon Sep 17 00:00:00 2001 From: Barthelemy Date: Tue, 20 Jul 2021 14:41:33 +0200 Subject: [PATCH 2/4] working --- .../script/RepoCleaner/o2-qc-repo-cleaner | 81 ++++++++----------- 1 file changed, 33 insertions(+), 48 deletions(-) diff --git a/Framework/script/RepoCleaner/o2-qc-repo-cleaner b/Framework/script/RepoCleaner/o2-qc-repo-cleaner index d2b65e6c12..2b7863c312 100755 --- a/Framework/script/RepoCleaner/o2-qc-repo-cleaner +++ b/Framework/script/RepoCleaner/o2-qc-repo-cleaner @@ -205,29 +205,36 @@ def storeSavedTimestamp(): f.close() -def prepareLogger(logger): +def prepare_main_logger(): + logger = logging.getLogger() # Logging (split between stderr and stdout) formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S') h1 = logging.StreamHandler(sys.stdout) h1.setLevel(logging.DEBUG) - # filter out everything that is above INFO level (WARN, ERROR, ...) - h1.addFilter(lambda record: record.levelno <= logging.INFO) + h1.addFilter(lambda record: record.levelno <= logging.INFO) # filter out everything that is above INFO level h1.setFormatter(formatter) logger.addHandler(h1) h2 = logging.StreamHandler(sys.stderr) - # take only warnings and error logs - h2.setLevel(logging.WARNING) + h2.setLevel(logging.WARNING) # take only warnings and error logs h2.setFormatter(formatter) logger.addHandler(h2) - try: - with pidfile.PIDFile(): - print('No other copies around, we go ahead.') - time.sleep(30) - except pidfile.AlreadyRunningError: - print('Already running, stopping.') - sys.exit(1) +def create_parallel_logger(): + # TODO merge with prepare_main_logger. It creates problems though. + logger = mp.get_logger() + logger.setLevel(logging.INFO) + formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S') + h1 = logging.StreamHandler(sys.stdout) + h1.setLevel(logging.DEBUG) + h1.addFilter(lambda record: record.levelno <= logging.INFO) # filter out everything that is above INFO level + h1.setFormatter(formatter) + h2 = logging.StreamHandler(sys.stderr) + h2.setLevel(logging.WARNING) # take only warnings and error logs + h2.setFormatter(formatter) + if not len(logger.handlers): + logger.addHandler(h2) + logger.addHandler(h1) def readConfig(args): @@ -245,21 +252,20 @@ def readConfig(args): def process_object(object_path, rules, ccdb): - print(f"Processing {object_path}") - # logger = create_logger() - # logger = logging.getLogger() - # logger.info(f"Processing {object_path}") + logger = create_parallel_logger() + logger.info(f"Processing {object_path}") + # Take the first matching rule, if any rule = findMatchingRule(rules, object_path) if rule is None: + logger.info("rulenone") return # Apply rule on object (find the plug-in script and apply) module = __import__(rule.policy) stats = module.process(ccdb, object_path, int(rule.delay), # rule.migration == "True", rule.extra_params) - # logger.info(f"{rule.policy} applied on {object_path}: {stats}") - print(f"{rule.policy} applied on {object_path}: {stats}") + logger.info(f"{rule.policy} applied on {object_path}: {stats}") def run(args, ccdb_url, rules): @@ -274,28 +280,11 @@ def run(args, ccdb_url, rules): # For each object call the first matching rule logging.info("Loop through the objects and apply first matching rule.") - pool = mp.Pool(8) - + pool = mp.Pool(4) [pool.apply(process_object, args=(object_path, rules, ccdb)) for object_path in paths] - - # for object_path in paths: - # process_object(object_path=object_path, rules=rules, ccdb=ccdb) - # logging.info(f"Processing {object_path}") - # # Take the first matching rule, if any - # rule = findMatchingRule(rules, object_path) - # if rule == None: - # continue - # - # # Apply rule on object (find the plug-in script and apply) - # module = __import__(rule.policy) - # stats = module.process(ccdb, object_path, int(rule.delay), # rule.migration == "True", - # rule.extra_params) - # logging.info(f"{rule.policy} applied on {object_path}: {stats}") - pool.close() - logging.info( - f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})") + logging.info(f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})") storeSavedTimestamp() @@ -304,23 +293,19 @@ def run(args, ccdb_url, rules): # **************** def main(): - # logger = create_logger() - # logger.info('Starting pooling') - prepareLogger(logging.getLogger()) - # multiprocessing_logging.install_mp_handler() - # prepareLogger(mp.get_logger()) + prepare_main_logger() # Parse arguments args = parseArgs() logging.getLogger().setLevel(int(args.log_level)) try: - with PIDFile(filename='o2-qc-repo-cleaner.pid'): - ccdb_url, rules = readConfig(args) - run(args, ccdb_url, rules) - - except AlreadyRunningError: - print('Already running. Exiting.') + with pidfile.PIDFile(): + print('No other copies around, we go ahead.') + time.sleep(30) + except pidfile.AlreadyRunningError: + print('Already running, stopping.') + sys.exit(1) if __name__ == "__main__": # to be able to run the test code above when not imported. From b932b20f36e249c6fe11db27326bca02e6c8de70 Mon Sep 17 00:00:00 2001 From: Barthelemy Date: Tue, 20 Jul 2021 15:34:06 +0200 Subject: [PATCH 3/4] async parallel and arguemnts --- Framework/script/RepoCleaner/o2-qc-repo-cleaner | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/Framework/script/RepoCleaner/o2-qc-repo-cleaner b/Framework/script/RepoCleaner/o2-qc-repo-cleaner index 2b7863c312..3fd45c048c 100755 --- a/Framework/script/RepoCleaner/o2-qc-repo-cleaner +++ b/Framework/script/RepoCleaner/o2-qc-repo-cleaner @@ -83,6 +83,8 @@ def parseArgs(): help='Dry run, no actual deletion nor modification to the CCDB.') parser.add_argument('--only-path', dest='only_path', action='store', default="", help='Only work on given path (omit the initial slash).') + parser.add_argument('--workers', dest='workers', action='store', default="1", + help='Number of parallel workers.') args = parser.parse_args() dryable.set(args.dry_run) logging.info(args) @@ -258,7 +260,6 @@ def process_object(object_path, rules, ccdb): # Take the first matching rule, if any rule = findMatchingRule(rules, object_path) if rule is None: - logger.info("rulenone") return # Apply rule on object (find the plug-in script and apply) @@ -280,9 +281,11 @@ def run(args, ccdb_url, rules): # For each object call the first matching rule logging.info("Loop through the objects and apply first matching rule.") - pool = mp.Pool(4) - [pool.apply(process_object, args=(object_path, rules, ccdb)) for object_path in paths] + logging.info(f"workers: {args.workers}") + pool = mp.Pool(int(args.workers)) + [pool.apply_async(process_object, args=(object_path, rules, ccdb)) for object_path in paths] pool.close() + pool.join() logging.info(f" *** DONE *** (total deleted: {ccdb.counter_deleted}, total updated: {ccdb.counter_validity_updated})") storeSavedTimestamp() From 3dbd9dceb3e993b8e857df4c4da23d688ba3573e Mon Sep 17 00:00:00 2001 From: Barthelemy Date: Thu, 22 Jul 2021 09:56:15 +0200 Subject: [PATCH 4/4] fix bad merge --- Framework/script/RepoCleaner/o2-qc-repo-cleaner | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Framework/script/RepoCleaner/o2-qc-repo-cleaner b/Framework/script/RepoCleaner/o2-qc-repo-cleaner index 3fd45c048c..b0b9c022f8 100755 --- a/Framework/script/RepoCleaner/o2-qc-repo-cleaner +++ b/Framework/script/RepoCleaner/o2-qc-repo-cleaner @@ -33,9 +33,8 @@ import time import consul import multiprocessing as mp -import pidfile - from Ccdb import Ccdb +from pidfile import PIDFile, AlreadyRunningError class Rule: @@ -237,6 +236,7 @@ def create_parallel_logger(): if not len(logger.handlers): logger.addHandler(h2) logger.addHandler(h1) + return logger def readConfig(args): @@ -277,7 +277,6 @@ def run(args, ccdb_url, rules): paths = [item for item in paths if item.startswith(args.only_path)] logging.debug(paths) logging.debug(len(paths)) - # For each object call the first matching rule logging.info("Loop through the objects and apply first matching rule.") @@ -303,12 +302,12 @@ def main(): logging.getLogger().setLevel(int(args.log_level)) try: - with pidfile.PIDFile(): - print('No other copies around, we go ahead.') - time.sleep(30) - except pidfile.AlreadyRunningError: - print('Already running, stopping.') - sys.exit(1) + with PIDFile(filename='o2-qc-repo-cleaner.pid'): + ccdb_url, rules = readConfig(args) + run(args, ccdb_url, rules) + + except AlreadyRunningError: + print('Already running. Exiting.') if __name__ == "__main__": # to be able to run the test code above when not imported.