diff --git a/README.rst b/README.rst
index 05c17f9..0f661f0 100644
--- a/README.rst
+++ b/README.rst
@@ -206,6 +206,18 @@ To work around API limitation, you must first generate a
 Changes
 =======
 
+1.3.0 (August 21, 2018)
+-----------------------
+
+* Improve configuration file parsing by mimicking
+  Kaptan's behavior of resolving handler by extension (#22)
+
+1.2.1 (July, 12, 2018)
+----------------------
+
+* show-closed-prs now displays merge status
+* Some documentation improvements
+
 1.2.0 (May, 17, 2017)
 ---------------------
 
@@ -243,6 +255,7 @@ Contributors
 * Jairo Llopis (Tecnativa_)
 * Stéphane Bidoul (ACSONE_)
 * Dave Lasley (LasLabs_)
+* Patric Tombez
 
 .. _ACSONE: https://www.acsone.eu
 .. _Tecnativa: https://www.tecnativa.com
diff --git a/git_aggregator/config.py b/git_aggregator/config.py
index 98c0869..ecb5cd4 100644
--- a/git_aggregator/config.py
+++ b/git_aggregator/config.py
@@ -134,8 +134,8 @@ def load_config(config, expand_env=False):
     if not os.path.exists(config):
         raise ConfigException('Unable to find configuration file: %s'
                               % config)
-    fExt = os.path.splitext(config)[-1]
-    conf = kaptan.Kaptan(handler=fExt.lstrip('.'))
+    file_extension = os.path.splitext(config)[1][1:]
+    conf = kaptan.Kaptan(handler=kaptan.HANDLER_EXT.get(file_extension))
 
     if expand_env:
         with open(config, 'r') as file_handler:
diff --git a/git_aggregator/log.py b/git_aggregator/log.py
index 931b482..c1cfe0e 100644
--- a/git_aggregator/log.py
+++ b/git_aggregator/log.py
@@ -40,8 +40,13 @@ def default_log_template(self, record):
         '%(name)s',
         Fore.RESET, Style.RESET_ALL, ' '
     ]
+    threadName = [
+        ' ', Fore.BLUE, Style.DIM, Style.BRIGHT,
+        '%(threadName)s ',
+        Fore.RESET, Style.RESET_ALL, ' '
+    ]
 
-    tpl = "".join(reset + levelname + asctime + name + reset)
+    tpl = "".join(reset + levelname + asctime + name + threadName + reset)
 
     return tpl
 
@@ -59,7 +64,7 @@ def format(self, record):
         except Exception as e:
             record.message = "Bad message (%r): %r" % (e, record.__dict__)
 
-        date_format = '%H:%m:%S'
+        date_format = '%H:%M:%S'
         record.asctime = time.strftime(
             date_format, self.converter(record.created)
         )
@@ -93,6 +98,11 @@ def debug_log_template(self, record):
         '%(name)s',
         Fore.RESET, Style.RESET_ALL, ' '
     ]
+    threadName = [
+        ' ', Fore.BLUE, Style.DIM, Style.BRIGHT,
+        '%(threadName)s ',
+        Fore.RESET, Style.RESET_ALL, ' '
+    ]
     module_funcName = [
         Fore.GREEN, Style.BRIGHT,
         '%(module)s.%(funcName)s()'
@@ -103,7 +113,8 @@ def debug_log_template(self, record):
     ]
 
     tpl = ''.join(
-        reset + levelname + asctime + name + module_funcName + lineno + reset
+        reset + levelname + asctime + name + threadName + module_funcName +
+        lineno + reset
     )
 
     return tpl
diff --git a/git_aggregator/main.py b/git_aggregator/main.py
index 36b0710..0848f37 100644
--- a/git_aggregator/main.py
+++ b/git_aggregator/main.py
@@ -4,11 +4,19 @@
 
 import logging
 import os
+import sys
+import threading
+import traceback
+try:
+    from Queue import Queue, Empty as EmptyQueue
+except ImportError:
+    from queue import Queue, Empty as EmptyQueue
 
 import argparse
 import argcomplete
 import fnmatch
 
+from .utils import ThreadNameKeeper
 from .log import DebugLogFormatter
 from .log import LogFormatter
 from .config import load_config
@@ -99,6 +107,16 @@ def get_parser():
         help='Expand environment variables in configuration file',
     )
 
+    main_parser.add_argument(
+        '-j', '--jobs',
+        dest='jobs',
+        default=1,
+        type=int,
+        help='Number of threads to use when aggregating repos. '
+             'This is useful when there are a lot of large repos. '
+             'Set `1` or less to disable multithreading (default).',
+    )
+
     main_parser.add_argument(
         'command',
         nargs='?',
@@ -159,20 +177,70 @@ def load_aggregate(args):
         r.push()
 
 
+def aggregate_repo(repo, args, sem, err_queue):
+    """Aggregate one repo according to the args.
+
+    Args:
+        repo (Repo): The repository to aggregate.
+        args (argparse.Namespace): CLI arguments.
+    """
+    try:
+        logger.debug('%s' % repo)
+        dirmatch = args.dirmatch
+        if not match_dir(repo.cwd, dirmatch):
+            logger.info("Skip %s", repo.cwd)
+            return
+        if args.command == 'aggregate':
+            repo.aggregate()
+            if args.do_push:
+                repo.push()
+        elif args.command == 'show-closed-prs':
+            repo.show_closed_prs()
+    except Exception:
+        err_queue.put_nowait(sys.exc_info())
+    finally:
+        sem.release()
+
+
 def run(args):
     """Load YAML and JSON configs and run the command specified
     in args.command"""
+
     repos = load_config(args.config, args.expand_env)
-    dirmatch = args.dirmatch
+
+    jobs = max(args.jobs, 1)
+    threads = []
+    sem = threading.Semaphore(jobs)
+    err_queue = Queue()
+
     for repo_dict in repos:
+        if not err_queue.empty():
+            break
+
+        sem.acquire()
         r = Repo(**repo_dict)
-        logger.debug('%s' % r)
-        if not match_dir(r.cwd, dirmatch):
-            logger.info("Skip %s", r.cwd)
-            continue
-        if args.command == 'aggregate':
-            r.aggregate()
-            if args.do_push:
-                r.push()
-        elif args.command == 'show-closed-prs':
-            r.show_closed_prs()
+        tname = os.path.basename(repo_dict['cwd'])
+
+        if jobs > 1:
+            t = threading.Thread(
+                target=aggregate_repo, args=(r, args, sem, err_queue))
+            t.daemon = True
+            t.name = tname
+            threads.append(t)
+            t.start()
+        else:
+            with ThreadNameKeeper():
+                threading.current_thread().name = tname
+                aggregate_repo(r, args, sem, err_queue)
+
+    for t in threads:
+        t.join()
+
+    if not err_queue.empty():
+        while True:
+            try:
+                exc_type, exc_obj, exc_trace = err_queue.get_nowait()
+            except EmptyQueue:
+                break
+            traceback.print_exception(exc_type, exc_obj, exc_trace)
+        sys.exit(1)
diff --git a/git_aggregator/repo.py b/git_aggregator/repo.py
index 86d5d68..c13b176 100644
--- a/git_aggregator/repo.py
+++ b/git_aggregator/repo.py
@@ -11,7 +11,6 @@
 
 import requests
 
-from .utils import working_directory_keeper
 from .exception import GitAggregatorException
 from ._compat import console_to_str
 
@@ -123,7 +122,7 @@ def init_git_version(cls, v_str):
         try:
             version = cls._git_version = tuple(
                 int(x) for x in v_str.split()[2].split('.')[:3])
-        except:
+        except Exception:
            raise ValueError("Could not parse git version output %r. Please "
                             "report this" % v_str)
         return version
@@ -166,25 +165,23 @@ def aggregate(self):
         logger.info('Start aggregation of %s', self.cwd)
         target_dir = self.cwd
 
-        with working_directory_keeper:
-            is_new = not os.path.exists(target_dir)
-            if is_new:
-                self.init_repository(target_dir)
-
-            os.chdir(target_dir)
-            self._switch_to_branch(self.target['branch'])
-            for r in self.remotes:
-                self._set_remote(**r)
-            self.fetch()
-            merges = self.merges
-            if not is_new:
-                # reset to the first merge
-                origin = merges[0]
-                merges = merges[1:]
-                self._reset_to(origin["remote"], origin["ref"])
-            for merge in merges:
-                self._merge(merge)
-            self._execute_shell_command_after()
+        is_new = not os.path.exists(target_dir)
+        if is_new:
+            self.init_repository(target_dir)
+
+        self._switch_to_branch(self.target['branch'])
+        for r in self.remotes:
+            self._set_remote(**r)
+        self.fetch()
+        merges = self.merges
+        if not is_new:
+            # reset to the first merge
+            origin = merges[0]
+            merges = merges[1:]
+            self._reset_to(origin["remote"], origin["ref"])
+        for merge in merges:
+            self._merge(merge)
+        self._execute_shell_command_after()
         logger.info('End aggregation of %s', self.cwd)
 
     def init_repository(self, target_dir):
@@ -204,9 +201,7 @@ def push(self):
         remote = self.target['remote']
         branch = self.target['branch']
         logger.info("Push %s to %s", branch, remote)
-        with working_directory_keeper:
-            os.chdir(self.cwd)
-            self.log_call(['git', 'push', '-f', remote, branch])
+        self.log_call(['git', 'push', '-f', remote, branch], cwd=self.cwd)
 
     def _fetch_options(self, merge):
         """Get the fetch options from the given merge dict."""
@@ -227,17 +222,17 @@ def _reset_to(self, remote, ref):
         cmd = ['git', 'reset', '--hard', sha]
         if logger.getEffectiveLevel() != logging.DEBUG:
             cmd.insert(2, '--quiet')
-        self.log_call(cmd)
+        self.log_call(cmd, cwd=self.cwd)
 
     def _switch_to_branch(self, branch_name):
         # check if the branch already exists
         logger.info("Switch to branch %s", branch_name)
-        self.log_call(['git', 'checkout', '-B', branch_name])
+        self.log_call(['git', 'checkout', '-B', branch_name], cwd=self.cwd)
 
     def _execute_shell_command_after(self):
         logger.info('Execute shell after commands')
         for cmd in self.shell_command_after:
-            self.log_call(cmd, shell=True)
+            self.log_call(cmd, shell=True, cwd=self.cwd)
 
     def _merge(self, merge):
         logger.info("Pull %s, %s", merge["remote"], merge["ref"])
@@ -250,12 +245,13 @@ def _merge(self, merge):
         if logger.getEffectiveLevel() != logging.DEBUG:
             cmd += ('--quiet',)
         cmd += self._fetch_options(merge) + (merge["remote"], merge["ref"])
-        self.log_call(cmd)
+        self.log_call(cmd, cwd=self.cwd)
 
     def _get_remotes(self):
         lines = self.log_call(
             ['git', 'remote', '-v'],
-            callwith=subprocess.check_output).splitlines()
+            callwith=subprocess.check_output,
+            cwd=self.cwd).splitlines()
         remotes = {}
         for line in lines:
             name, url = line.split('\t')
@@ -281,12 +277,12 @@ def _set_remote(self, name, url):
             return
         if not exising_url:
             logger.info('Adding remote %s <%s>', name, url)
-            self.log_call(['git', 'remote', 'add', name, url])
+            self.log_call(['git', 'remote', 'add', name, url], cwd=self.cwd)
         else:
             logger.info('Remote remote %s <%s> -> <%s>',
                         name, exising_url, url)
-            self.log_call(['git', 'remote', 'rm', name])
-            self.log_call(['git', 'remote', 'add', name, url])
+            self.log_call(['git', 'remote', 'rm', name], cwd=self.cwd)
+            self.log_call(['git', 'remote', 'add', name, url], cwd=self.cwd)
 
     def _github_api_get(self, path):
         url = 'https://api.github.com' + path
@@ -297,8 +293,8 @@ def _github_api_get(self, path):
 
     def show_closed_prs(self):
         REPO_RE = re.compile(
-            '^(https://github.com/|git@github.com:)'
-            '(?P<owner>.*?)/(?P<repo>.*?)(.git)?$')
+            '^(https://github.com/|git@github.com:)'
+            '(?P<owner>.*?)/(?P<repo>.*?)(.git)?$')
         PULL_RE = re.compile(
             '^(refs/)?pull/(?P<pr>[0-9]+)/head$')
         remotes = {r['name']: r['url'] for r in self.remotes}
@@ -325,6 +321,7 @@ def show_closed_prs(self):
                     format(**locals()))
                 continue
             state = r.json().get('state')
+            merged = (not r.json().get('merged') and 'not ' or '') + 'merged'
             if state != 'open':
                 logger.info('https://github.com/{owner}/{repo}/pull/{pr} '
-                            'in state {state}'.format(**locals()))
+                            'in state {state} ({merged})'.format(**locals()))
diff --git a/git_aggregator/utils.py b/git_aggregator/utils.py
index ab242c2..063e787 100644
--- a/git_aggregator/utils.py
+++ b/git_aggregator/utils.py
@@ -3,11 +3,12 @@
 # © ANYBOX https://github.com/anybox/anybox.recipe.odoo
 # License AGPLv3 (http://www.gnu.org/licenses/agpl-3.0-standalone.html)
 import os
+import threading
 import logging
 logger = logging.getLogger(__name__)
 
 
-class WorkingDirectoryKeeper(object):
+class WorkingDirectoryKeeper(object):  # DEPRECATED
     """A context manager to get back the working directory as it was before.
     If you want to stack working directory keepers, you need a new instance
     for each stage.
@@ -27,3 +28,15 @@ def __exit__(self, *exc_args):
 
 
 working_directory_keeper = WorkingDirectoryKeeper()
+
+
+class ThreadNameKeeper(object):
+    """A context manager to get back the thread name as it was before. It
+    is meant to be used when renaming the 'MainThread' thread.
+    """
+
+    def __enter__(self):
+        self._name = threading.current_thread().name
+
+    def __exit__(self, *exc_args):
+        threading.current_thread().name = self._name
diff --git a/tests/test_log.py b/tests/test_log.py
new file mode 100644
index 0000000..b588e2a
--- /dev/null
+++ b/tests/test_log.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+# License AGPLv3 (http://www.gnu.org/licenses/agpl-3.0-standalone.html)
+
+import logging
+import threading
+import unittest
+
+from git_aggregator import main
+from git_aggregator.utils import ThreadNameKeeper
+
+logger = logging.getLogger(__name__)
+
+
+def reset_logger():
+    for handler in logger.handlers[:]:
+        logger.removeHandler(handler)
+
+
+class TestLog(unittest.TestCase):
+
+    def setUp(self):
+        """ Setup """
+        super(TestLog, self).setUp()
+        reset_logger()
+
+    def test_info(self):
+        """ Test log.LogFormatter. """
+        main.setup_logger(logger, level=logging.INFO)
+        # self._suite = unittest.TestLoader().loadTestsFromName(
+        #     'tests.test_repo.TestRepo.test_multithreading')
+        # unittest.TextTestRunner(verbosity=0).run(self._suite)
+        logger.debug('This message SHOULD NOT be visible.')
+        logger.info('Message from MainThread.')
+        with ThreadNameKeeper():
+            name = threading.current_thread().name = 'repo_name'
+            logger.info('Message from %s.', name)
+        logger.info('Hello again from MainThread.')
+
+    def test_debug(self):
+        """ Test log.DebugLogFormatter. """
+        main.setup_logger(logger, level=logging.DEBUG)
+        logger.debug('This message SHOULD be visible.')
+        logger.info('Message from MainThread.')
+        with ThreadNameKeeper():
+            name = threading.current_thread().name = 'repo_name'
+            logger.info('Message from %s.', name)
+        logger.info('Hello again from MainThread.')
+
+    def test_colors(self):
+        """ Test log.LEVEL_COLORS. """
+        main.setup_logger(logger, level=logging.DEBUG)
+        logger.debug('Color: Fore.BLUE')
+        logger.info('Color: Fore.GREEN')
+        logger.warning('Color: Fore.YELLOW')
+        logger.error('Color: Fore.RED')
+        logger.critical('Color: Fore.RED')
diff --git a/tests/test_repo.py b/tests/test_repo.py
index 12e8daa..b839900 100644
--- a/tests/test_repo.py
+++ b/tests/test_repo.py
@@ -3,6 +3,7 @@
 # License AGPLv3 (http://www.gnu.org/licenses/agpl-3.0-standalone.html)
 # Parts of the code comes from ANYBOX
 # https://github.com/anybox/anybox.recipe.odoo
+import argparse
 import os
 import shutil
 import unittest
@@ -17,6 +18,8 @@
 from urllib.request import pathname2url
 import logging
 from tempfile import mkdtemp
+from textwrap import dedent
+
 from git_aggregator.utils import WorkingDirectoryKeeper,\
     working_directory_keeper
 
@@ -46,7 +49,8 @@ def git_write_commit(repo_dir, filepath, contents, msg="Unit test commit"):
         with open(filepath, 'w') as f:
             f.write(contents)
         subprocess.call(['git', 'add', filepath])
-        subprocess.call(['git', 'commit', '-m', msg])
+        # Ignore local hooks with '-n'
+        subprocess.call(['git', 'commit', '-n', '-m', msg])
         return subprocess.check_output(
             ['git', 'rev-parse', '--verify', 'HEAD']).strip()
 
@@ -280,3 +284,58 @@ def test_depth(self):
         self.assertEqual(len(log_r1.splitlines()), 2)
         # Full fetch: all 3 commits
         self.assertEqual(len(log_r2.splitlines()), 2)
+
+    def test_multithreading(self):
+        """Clone two repos and do a merge in a third one, simultaneously."""
+        config_yaml = os.path.join(self.sandbox, 'config.yaml')
+        with open(config_yaml, 'w') as f:
+            f.write(dedent("""
+                ./repo1:
+                    remotes:
+                        r1: %(r1_remote_url)s
+                    merges:
+                        - r1 tag1
+                    target: r1 agg
+                ./repo2:
+                    remotes:
+                        r2: %(r2_remote_url)s
+                    merges:
+                        - r2 b2
+                    target: r2 agg
+                ./repo3:
+                    remotes:
+                        r1: %(r1_remote_url)s
+                        r2: %(r2_remote_url)s
+                    merges:
+                        - r1 tag1
+                        - r2 b2
+                    target: r1 agg
+            """ % {
+                'r1_remote_url': self.url_remote1,
+                'r2_remote_url': self.url_remote2,
+            }))
+
+        args = argparse.Namespace(
+            command='aggregate',
+            config=config_yaml,
+            jobs=3,
+            dirmatch=None,
+            do_push=False,
+            expand_env=False)
+
+        with working_directory_keeper:
+            os.chdir(self.sandbox)
+            main.run(args)
+
+        repo1_dir = os.path.join(self.sandbox, 'repo1')
+        repo2_dir = os.path.join(self.sandbox, 'repo2')
+        repo3_dir = os.path.join(self.sandbox, 'repo3')
+
+        self.assertTrue(os.path.isfile(os.path.join(repo1_dir, 'tracked')))
+        self.assertFalse(os.path.isfile(os.path.join(repo1_dir, 'tracked2')))
+
+        self.assertTrue(os.path.isfile(os.path.join(repo2_dir, 'tracked')))
+        self.assertTrue(os.path.isfile(os.path.join(repo2_dir, 'tracked2')))
+
+        self.assertTrue(os.path.isfile(os.path.join(repo3_dir, 'tracked')))
+        self.assertTrue(os.path.isfile(os.path.join(repo3_dir, 'tracked2')))