diff --git a/config.ini.example b/config.ini.example deleted file mode 100644 index 860c76a..0000000 --- a/config.ini.example +++ /dev/null @@ -1,8 +0,0 @@ -[pulse] -userid = <username> -host = pulse.mozilla.org -port = 5671 -exchange = exchange/<username>/test -routing_key = # -queue = queue/<username>/test -password = <pulse-password> diff --git a/config.toml.example b/config.toml.example new file mode 100644 index 0000000..76cbb44 --- /dev/null +++ b/config.toml.example @@ -0,0 +1,18 @@ +[pulse] +userid = "<username>" +host = "pulse.mozilla.org" +port = 5671 +exchange = "exchange/<username>/test" +routing_key = "" +queue = "queue/<username>/test" +password = "<pulse-password>" + +[clones] +directory = "../git_hg_sync_repos/" + +[mappings.test] +git_repository = "/home/fbessou/dev/MOZI/fake-forge/git/test" + +[mappings.test.rules.esr1] +branch_pattern = "test-esr1" +mercurial_repository = "/home/fbessou/dev/MOZI/fake-forge/hg/test-esr1" \ No newline at end of file diff --git a/git_hg_sync/__main__.py b/git_hg_sync/__main__.py index a27b4c9..ec7c762 100644 --- a/git_hg_sync/__main__.py +++ b/git_hg_sync/__main__.py @@ -5,7 +5,7 @@ from kombu import Connection, Exchange, Queue from mozlog import commandline -from git_hg_sync import config +from git_hg_sync.config import Config from git_hg_sync.pulse_worker import PulseWorker from git_hg_sync.repo_synchronizer import RepoSynchronyzer @@ -19,37 +19,38 @@ def get_parser(): def get_connection(config): return Connection( - hostname=config["host"], - port=config["port"], - userid=config["userid"], - password=config["password"], + hostname=config.host, + port=config.port, + userid=config.userid, + password=config.password, heartbeat=10, ssl=True, ) def get_queue(config): - exchange = Exchange(config["exchange"], type="topic") + exchange = Exchange(config.exchange, type="topic") return Queue( - name=config["queue"], + name=config.queue, exchange=exchange, - routing_key=config["routing_key"], + routing_key=config.routing_key, exclusive=False, ) -def main(): +def main() -> None: parser = get_parser() commandline.add_logging_group(parser) args = parser.parse_args() logger = commandline.setup_logging("service", args, {"raw": sys.stdout}) - pulse_config = config.get_pulse_config(HERE.parent / "config.ini")["pulse"] + config = Config.from_file(HERE.parent / "config.toml") + pulse_config = config.pulse connection = get_connection(pulse_config) - repos_config = config.get_repos_config(HERE.parent / "repos.json") - queue = get_queue(pulse_config) - repo_synchronyzer = RepoSynchronyzer(repos_config=repos_config) + repo_synchronyzer = RepoSynchronyzer( + clones_directory=config.clones.directory, mappings=config.mappings + ) with connection as conn: logger.info(f"connected to {conn.host}") worker = PulseWorker(conn, queue, repo_synchronyzer=repo_synchronyzer) diff --git a/git_hg_sync/config.py b/git_hg_sync/config.py index 3735bad..8cb430b 100644 --- a/git_hg_sync/config.py +++ b/git_hg_sync/config.py @@ -1,16 +1,55 @@ -import configparser -import json +import pathlib +import tomllib +from collections import Counter +import pydantic -def get_pulse_config(config_file_path): - assert config_file_path.exists(), f"config file {config_file_path} doesn't exists" - config = configparser.ConfigParser() - config.read(config_file_path) - return config +class PulseConfig(pydantic.BaseModel): + userid: str + host: str + port: int + exchange: str + routing_key: str + queue: str + password: str -def get_repos_config(repo_file_path): - assert repo_file_path.exists(), f"config file {repo_file_path} doesn't exists" - with open(repo_file_path) as f: - repos = json.load(f) - return repos + +class MappingRule(pydantic.BaseModel): + branch_pattern: str + mercurial_repository: str + + +class MappingConfig(pydantic.BaseModel): + git_repository: str + rules: dict[str, MappingRule] + + +class ClonesConfig(pydantic.BaseModel): + directory: pathlib.Path + + +class Config(pydantic.BaseModel): + pulse: PulseConfig + clones: ClonesConfig + mappings: dict[str, MappingConfig] + + @pydantic.field_validator("mappings") + @staticmethod + def no_duplicate_git_repositories( + mappings: dict[str, MappingConfig] + ) -> dict[str, MappingConfig]: + counter = Counter([mapping.git_repository for mapping in mappings.values()]) + for git_repo, count in counter.items(): + if count > 1: + raise ValueError( + f"Found {count} different mappings for the same git repository." + ) + return mappings + + @staticmethod + def from_file(file_path: pathlib.Path) -> "Config": + assert file_path.exists(), f"config file {file_path} doesn't exists" + with open(file_path, "rb") as config_file: + config = tomllib.load(config_file) + return Config(**config) diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py index d019a65..567796c 100644 --- a/git_hg_sync/repo_synchronizer.py +++ b/git_hg_sync/repo_synchronizer.py @@ -1,17 +1,21 @@ from dataclasses import dataclass from pathlib import Path +from typing import Literal from git import Repo from mozlog import get_proxy_logger +from git_hg_sync.config import MappingConfig + logger = get_proxy_logger("sync_repo") @dataclass class Push: repo_url: str - heads: list[str] - commits: list[str] + branches: dict[ + str, str + ] # Mapping between branch names (key) and corresponding commit sha (value) time: int pushid: int user: str @@ -31,45 +35,81 @@ class Tag: class RepoSynchronyzer: - def __init__(self, repos_config): - self._repos_config = repos_config + def __init__(self, clones_directory: Path, mappings: dict[str, MappingConfig]): + self._clones_directory = clones_directory + self._mappings = mappings - def get_remote(self, repo, remote_url): + def get_remote(self, repo, remote_name: Literal["git", "hg"], remote_url: str): """ get the repo if it exists, create it otherwise the repo name is the last part of the url """ - remote_name = remote_url.split("/")[-1] for rem in repo.remotes: if rem.name == remote_name: - return repo.remote(remote_name) - break + remote = repo.remote(remote_name) + remote.set_url(remote_url, allow_unsafe_protocols=True) + return remote else: - return repo.create_remote(remote_name, remote_url) + return repo.create_remote( + remote_name, remote_url, allow_unsafe_protocols=True + ) - def handle_commits(self, entity, clone_dir, remote_url, remote_target): - logger.info(f"Handle entity {entity.pushid}") + def handle_commits( + self, push_message: Push, clone_dir: Path, mapping: MappingConfig + ): + logger.info(f"Handle entity {push_message.pushid}") assert Path(clone_dir).exists(), f"clone {clone_dir} doesn't exists" repo = Repo(clone_dir) - remote = self.get_remote(repo, remote_url) + remote = self.get_remote(repo, "git", mapping.git_repository) # fetch new commits - remote.fetch() - match entity: - case Push(): - for head in entity.heads: - remote.pull(head) - case _: - pass # TODO - # push on good repo/branch - remote = repo.remote(remote_target) - remote.push() - logger.info(f"Done for entity {entity.pushid}") + for branch_name, commit in push_message.branches.items(): + remote.fetch(commit) + if branch_name in repo.branches: + branch = repo.branches[branch_name] + branch.commit = commit + else: + branch = repo.create_head(branch_name, commit) + breakpoint() + for rule_name, rule in mapping.rules.items(): + if rule.branch_pattern == branch_name: + remote = self.get_remote( + repo, "hg", "hg::" + rule.mercurial_repository + ) + remote.push(branch.name) + + # match push_message: + # case Push(): + # for head in push_message.heads: + # remote.pull(head) + # case _: + # pass # TODO + ## push on good repo/branch + # remote = self.get_remote(repo, "hg", "hg::" + mapping.rules.mercurial_repository) + # remote.push(push_message.heads) + # logger.info(f"Done for entity {push_message.pushid}") def sync(self, entity: Push | Tag) -> None: - repo_config = self._repos_config.get(entity.repo_url) - if not repo_config: - logger.warning(f"repo {entity.repo_url} is not supported yet") + if isinstance(entity, Tag): + logger.warning("Tag message not handled not implemented yet") + return + + matching_mappings = [ + (mapping_name, mapping) + for mapping_name, mapping in self._mappings.items() + if mapping.git_repository == entity.repo_url + ] + if not matching_mappings: + logger.warning(f"No mapping found for git repository {entity.repo_url} ") return + + if len(matching_mappings) > 1: + logger.warning(f"No mapping found for git repository {entity.repo_url} ") + return + + mapping_name, mapping = matching_mappings[0] + clone_directory = self._clones_directory / mapping_name self.handle_commits( - entity, repo_config["clone"], entity.repo_url, repo_config["target"] + entity, + clone_directory, + mapping, ) diff --git a/pyproject.toml b/pyproject.toml index 2c6d7c2..aa215b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "git-hg-sync" readme = "README.md" requires-python = ">=3.10" version = "0.1" -dependencies = ['kombu', 'mozillapulse', 'GitPython', 'mozlog'] +dependencies = ['kombu', 'mozillapulse', 'GitPython', 'mozlog', "pydantic"] [tool.ruff] line-length = 100 diff --git a/repos.json b/repos.json deleted file mode 100644 index 7960814..0000000 --- a/repos.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "https://github.com/djangoliv/chatzilla.git": { - "clone": "clones/chatzilla_cinnabar", - "target": "origin" - } -} diff --git a/tests/pulse_utils.py b/tests/pulse_utils.py index a4b8aae..ab67585 100644 --- a/tests/pulse_utils.py +++ b/tests/pulse_utils.py @@ -1,9 +1,10 @@ from datetime import datetime from pathlib import Path +import sys import kombu -from git_hg_sync import config +from git_hg_sync.config import Config HERE = Path(__file__).parent @@ -14,13 +15,13 @@ def send_pulse_message(pulse_config, payload): The Pulse message will be constructed from the specified payload and sent to the requested exchange. """ - userid = pulse_config["userid"] - password = pulse_config["password"] - routing_key = pulse_config["routing_key"] - host = pulse_config["host"] - port = pulse_config["port"] - exchange = pulse_config["exchange"] - queue = pulse_config["queue"] + userid = pulse_config.userid + password = pulse_config.password + routing_key = pulse_config.routing_key + host = pulse_config.host + port = pulse_config.port + exchange = pulse_config.exchange + queue = pulse_config.queue print(f"connecting to pulse at {host}:{port} as {userid}") connection = kombu.Connection( @@ -64,15 +65,33 @@ def send_pulse_message(pulse_config, payload): if __name__ == "__main__": - payload = { - "type": "tag", - "repo_url": "https://github.com/djangoliv/chatzilla.git", - "tag": "truc", - "commit": "88949ac3ad633e92cf52354d91857074e264ad12", - "time": 0, - "pushid": 0, - "user": "user", - "push_json_url": "push_json_url", - } - pulse_conf = config.get_pulse_config(HERE.parent / "config.ini")["pulse"] - send_pulse_message(pulse_conf, payload) + config = Config.from_file(HERE.parent / "config.toml") + config.mappings + message_type = sys.argv[1] + mapping = config.mappings[sys.argv[2]] + match message_type: + case "push": + payload = { + "type": "push", + "repo_url": mapping.git_repository, + "branches": {sys.argv[3]: sys.argv[4]}, + "time": 0, + "pushid": 0, + "user": "user", + "push_json_url": "push_json_url", + } + case "tag": + payload = { + "type": "tag", + "repo_url": "/home/fbessou/dev/MOZI/fake-forge/git/chatzilla", + "tag": "tag", + "commit": "88949ac3ad633e92cf52354d91857074e264ad12", + "time": 0, + "pushid": 0, + "user": "user", + "push_json_url": "push_json_url", + } + case _: + raise NotImplementedError() + print(payload) + send_pulse_message(config.pulse, payload) diff --git a/tests/test_pulse_worker.py b/tests/test_pulse_worker.py index 43ae8a5..dc314f1 100644 --- a/tests/test_pulse_worker.py +++ b/tests/test_pulse_worker.py @@ -8,8 +8,7 @@ def raw_push_entity(): return { "type": "push", "repo_url": "repo_url", - "heads": ["head"], - "commits": ["commit"], + "branches": {"mybranch": "acommitsha"}, "time": 0, "pushid": 0, "user": "user", diff --git a/tests/test_repo_synchronizer.py b/tests/test_repo_synchronizer.py index c1c2630..1aa1362 100644 --- a/tests/test_repo_synchronizer.py +++ b/tests/test_repo_synchronizer.py @@ -1,12 +1,13 @@ import pytest +from git_hg_sync.config import PulseConfig, MappingConfig from git_hg_sync.__main__ import get_connection, get_queue from git_hg_sync.repo_synchronizer import Push, RepoSynchronyzer @pytest.fixture def pulse_config(): - return { + return PulseConfig(**{ "userid": "test_user", "host": "pulse.mozilla.org", "port": 5671, @@ -14,40 +15,43 @@ def pulse_config(): "routing_key": "#", "queue": "queue/test_user/test", "password": "PULSE_PASSWORD", - } + }) @pytest.fixture -def repos_config(): +def mappings(): return { - "repo_url": { - "clone": "bad_directory", - "remote": "remote", - "target": "target", - } + "myrepo": MappingConfig(**{ + "git_repository": "myforge/myrepo.git", + "rules": { + "rule1": { + "branch_pattern": "branch_name", + "mercurial_repository": "myforge/myhgrepo" + + } + } + }) } -def test_sync_process_with_bad_repo(repos_config): - syncrepos = RepoSynchronyzer(repos_config=repos_config) - with pytest.raises(AssertionError) as e: - syncrepos.sync( - Push( - repo_url="repo_url", - heads=["head"], - commits=["commits"], - time=0, - pushid=0, - user="user", - push_json_url="push_json_url", - ) +def test_sync_process_with_bad_repo(tmp_path, mappings): + syncrepos = RepoSynchronyzer(tmp_path / "clones", mappings) + syncrepos.sync( + Push( + repo_url="repo_url", + branches={"branch1": "anothercommitsha"}, + time=0, + pushid=0, + user="user", + push_json_url="push_json_url", ) - assert str(e.value) == f"clone {repos_config['repo_url']['clone']} doesn't exists" + ) + # TODO finish that test (check that warning was triggered) def test_get_connection_and_queue(pulse_config): connection = get_connection(pulse_config) queue = get_queue(pulse_config) - assert connection.userid == pulse_config["userid"] - assert connection.host == f"{pulse_config['host']}:{pulse_config['port']}" - assert queue.name == pulse_config["queue"] + assert connection.userid == pulse_config.userid + assert connection.host == f"{pulse_config.host}:{pulse_config.port}" + assert queue.name == pulse_config.queue