diff --git a/git_hg_sync/__main__.py b/git_hg_sync/__main__.py index 6197915..a27b4c9 100644 --- a/git_hg_sync/__main__.py +++ b/git_hg_sync/__main__.py @@ -6,8 +6,8 @@ from mozlog import commandline from git_hg_sync import config -from git_hg_sync.pulse_consumer import Worker -from git_hg_sync.sync_repos import RepoSynchronyzer +from git_hg_sync.pulse_worker import PulseWorker +from git_hg_sync.repo_synchronizer import RepoSynchronyzer HERE = Path(__file__).parent @@ -52,7 +52,7 @@ def main(): repo_synchronyzer = RepoSynchronyzer(repos_config=repos_config) with connection as conn: logger.info(f"connected to {conn.host}") - worker = Worker(conn, queue, repo_synchronyzer=repo_synchronyzer) + worker = PulseWorker(conn, queue, repo_synchronyzer=repo_synchronyzer) worker.run() diff --git a/git_hg_sync/pulse_consumer.py b/git_hg_sync/pulse_consumer.py deleted file mode 100644 index 1d6a42b..0000000 --- a/git_hg_sync/pulse_consumer.py +++ /dev/null @@ -1,24 +0,0 @@ -from kombu.mixins import ConsumerMixin -from mozlog import get_proxy_logger - -logger = get_proxy_logger("pluse_consumer") - - -class Worker(ConsumerMixin): - - def __init__(self, connection, queue, *, repo_synchronyzer): - self.connection = connection - self.task_queue = queue - self.repo_synchronyzer = repo_synchronyzer - - def get_consumers(self, Consumer, channel): - consumer = Consumer( - self.task_queue, auto_declare=False, callbacks=[self.on_task] - ) - return [consumer] - - def on_task(self, body, message): - logger.info(f"Received message: {body}") - message.ack() - raw_entity = body["payload"] - self.repo_synchronyzer.sync(raw_entity) diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py new file mode 100644 index 0000000..47c8319 --- /dev/null +++ b/git_hg_sync/pulse_worker.py @@ -0,0 +1,43 @@ +from kombu.mixins import ConsumerMixin +from mozlog import get_proxy_logger + +from git_hg_sync.repo_synchronizer import Push, Tag + +logger = get_proxy_logger("pluse_consumer") + + +class EntityTypeError(Exception): + pass + + +class PulseWorker(ConsumerMixin): + + def __init__(self, connection, queue, *, repo_synchronyzer): + self.connection = connection + self.task_queue = queue + self.repo_synchronyzer = repo_synchronyzer + + @staticmethod + def parse_entity(raw_entity): + logger.debug(f"parse_entity: {raw_entity}") + message_type = raw_entity.pop("type") + match message_type: + case "push": + return Push(**raw_entity) + case "tag": + return Tag(**raw_entity) + case _: + raise EntityTypeError(f"unsupported type {message_type}") + + def get_consumers(self, Consumer, channel): + consumer = Consumer( + self.task_queue, auto_declare=False, callbacks=[self.on_task] + ) + return [consumer] + + def on_task(self, body, message): + logger.info(f"Received message: {body}") + message.ack() + raw_entity = body["payload"] + parsed_message = PulseWorker.parse_entity(raw_entity) + self.repo_synchronyzer.sync(parsed_message) diff --git a/git_hg_sync/sync_repos.py b/git_hg_sync/repo_synchronizer.py similarity index 73% rename from git_hg_sync/sync_repos.py rename to git_hg_sync/repo_synchronizer.py index 06f0a82..82b8883 100644 --- a/git_hg_sync/sync_repos.py +++ b/git_hg_sync/repo_synchronizer.py @@ -7,13 +7,8 @@ logger = get_proxy_logger("sync_repo") -class EntityTypeError(Exception): - pass - - @dataclass class Push: - type: str repo_url: str heads: list[str] commits: list[str] @@ -25,7 +20,6 @@ class Push: @dataclass class Tag: - type: str repo_url: str tag: str commit: str @@ -40,16 +34,6 @@ class RepoSynchronyzer: def __init__(self, repos_config): self._repos_config = repos_config - def parse_entity(self, raw_entity): - logger.debug(f"parse_entity: {raw_entity}") - if raw_entity["type"] == "push": - entity = Push(**raw_entity) - elif raw_entity["type"] == "tag": - entity = Tag(**raw_entity) - else: - raise EntityTypeError(f"unsupported type {raw_entity['type']}") - return entity - def get_remote(self, repo, remote_url): """ get the repo if it exists, create it otherwise @@ -70,17 +54,17 @@ def handle_commits(self, entity, clone_dir, remote_url, remote_target): remote = self.get_remote(repo, remote_url) # fetch new commits remote.fetch() - if entity.type == "push": - remote.pull("branches/default/tip") - elif entity.type == "tag": - pass # TODO + match entity: + case Push(): + remote.pull("branches/default/tip") + case _: + pass # TODO # push on good repo/branch remote = repo.remote(remote_target) remote.push() logger.info(f"Done for entity {entity.pushid}") - def sync(self, raw_entity): - entity = self.parse_entity(raw_entity) + def sync(self, entity: Push | Tag) -> None: repo_config = self._repos_config.get(entity.repo_url) if not repo_config: logger.warning(f"repo {entity.repo_url} is not supported yet") diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..347e4ed --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,8 @@ +import pytest +import mozlog + + +@pytest.fixture(autouse=True) +def mozlog_logging(): + logger = mozlog.structuredlog.StructuredLogger("tests") + mozlog.structuredlog.set_default_logger(logger) diff --git a/tests/test_pulse_worker.py b/tests/test_pulse_worker.py new file mode 100644 index 0000000..b5d464b --- /dev/null +++ b/tests/test_pulse_worker.py @@ -0,0 +1,42 @@ +import pytest + +from git_hg_sync.pulse_worker import PulseWorker, EntityTypeError +from git_hg_sync.repo_synchronizer import Push, Tag + + +def raw_push_entity(): + return { + "type": "push", + "repo_url": "repo_url", + "heads": ["head"], + "commits": ["commit"], + "time": 0, + "pushid": 0, + "user": "user", + "push_json_url": "push_json_url", + } + + +def raw_tag_entity(): + return { + "type": "tag", + "repo_url": "repo_url", + "tag": "tag", + "commit": "commit", + "time": 0, + "pushid": 0, + "user": "user", + "push_json_url": "push_json_url", + } + + +def test_parse_entity_valid(): + push_entity = PulseWorker.parse_entity(raw_push_entity()) + assert isinstance(push_entity, Push) + tag_entity = PulseWorker.parse_entity(raw_tag_entity()) + assert isinstance(tag_entity, Tag) + + +def test_parse_invalid_type(): + with pytest.raises(EntityTypeError): + PulseWorker.parse_entity({"type": "unknown"}) diff --git a/tests/test_repo_synchronizer.py b/tests/test_repo_synchronizer.py new file mode 100644 index 0000000..16c03dd --- /dev/null +++ b/tests/test_repo_synchronizer.py @@ -0,0 +1,55 @@ +from pathlib import Path + +import pytest + +from git_hg_sync.__main__ import get_connection, get_queue +from git_hg_sync.repo_synchronizer import RepoSynchronyzer, Push + + +@pytest.fixture +def pulse_config(): + return { + "userid": "test_user", + "host": "pulse.mozilla.org", + "port": 5671, + "exchange": "exchange/test_user/test", + "routing_key": "#", + "queue": "queue/test_user/test", + "password": "PULSE_PASSWORD", + } + + +@pytest.fixture +def repos_config(): + return { + "repo_url": { + "clone": "bad_directory", + "remote": "remote", + "target": "target", + } + } + + +def test_sync_process_with_bad_repo(repos_config): + syncrepos = RepoSynchronyzer(repos_config=repos_config) + with pytest.raises(AssertionError) as e: + syncrepos.sync( + Push( + repo_url="repo_url", + heads=["head"], + commits=["commits"], + time=0, + pushid=0, + user="user", + push_json_url="push_json_url", + ) + ) + assert str(e.value) == f"clone {repos_config['repo_url']['clone']} doesn't exists" + + +def test_get_connection_and_queue(pulse_config): + connection = get_connection(pulse_config) + queue = get_queue(pulse_config) + assert connection.userid == pulse_config["userid"] + assert connection.host == f"{pulse_config['host']}:{pulse_config['port']}" + assert queue.name == pulse_config["queue"] diff --git a/tests/test_sync_repo.py b/tests/test_sync_repo.py deleted file mode 100644 index 18bdf2b..0000000 --- a/tests/test_sync_repo.py +++ /dev/null @@ -1,89 +0,0 @@ -from pathlib import Path - -import mozlog -import pytest - -from git_hg_sync import __main__, sync_repos - -HERE = Path(__file__).parent - - -@pytest.fixture -def pulse_config(): - return { - "userid": "test_user", - "host": "pulse.mozilla.org", - "port": 5671, - "exchange": "exchange/test_user/test", - "routing_key": "#", - "queue": "queue/test_user/test", - "password": "PULSE_PASSWORD", - } - - -@pytest.fixture -def repos_config(): - return { - "repo_url": { - "clone": "bad_directory", - "remote": "remote", - "target": "target", - } - } - - -raw_push_entity = { - "type": "push", - "repo_url": "repo_url", - "heads": ["head"], - "commits": ["commit"], - "time": 0, - "pushid": 0, - "user": "user", - "push_json_url": "push_json_url", -} - -raw_tag_entity = { - "type": "tag", - "repo_url": "repo_url", - "tag": "tag", - "commit": "commit", - "time": 0, - "pushid": 0, - "user": "user", - "push_json_url": "push_json_url", -} - - -def setup_module(): - logger = mozlog.structuredlog.StructuredLogger("tests") - mozlog.structuredlog.set_default_logger(logger) - - -def test_parse_entity(): - syncrepos = sync_repos.RepoSynchronyzer(None) - push_entity = syncrepos.parse_entity(raw_push_entity) - assert isinstance(push_entity, sync_repos.Push) - tag_entity = syncrepos.parse_entity(raw_tag_entity) - assert isinstance(tag_entity, sync_repos.Tag) - - -def test_sync_process_with_bad_type(): - syncrepos = sync_repos.RepoSynchronyzer(None) - with pytest.raises(sync_repos.EntityTypeError): - syncrepos.sync({"type": "badType"}) - - -def test_sync_process_with_bad_repo(repos_config): - syncrepos = sync_repos.RepoSynchronyzer(repos_config=repos_config) - with pytest.raises(AssertionError) as e: - syncrepos.sync(raw_push_entity) - assert str(e.value) == f"clone {repos_config['repo_url']['clone']} doesn't exists" - - -def test_get_connection_and_queue(pulse_config): - connection = __main__.get_connection(pulse_config) - queue = __main__.get_queue(pulse_config) - assert connection.userid == pulse_config["userid"] - assert connection.host == f"{pulse_config['host']}:{pulse_config['port']}" - assert queue.name == pulse_config["queue"]