Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separation of concerns #2

Merged
merged 6 commits into from
Jul 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions git_hg_sync/__main__.py
Original file line number Diff line number Diff line change
@@ -6,8 +6,8 @@
from mozlog import commandline

from git_hg_sync import config
from git_hg_sync.pulse_consumer import Worker
from git_hg_sync.sync_repos import RepoSynchronyzer
from git_hg_sync.pulse_worker import PulseWorker
from git_hg_sync.repo_synchronizer import RepoSynchronyzer

HERE = Path(__file__).parent

@@ -52,7 +52,7 @@ def main():
repo_synchronyzer = RepoSynchronyzer(repos_config=repos_config)
with connection as conn:
logger.info(f"connected to {conn.host}")
worker = Worker(conn, queue, repo_synchronyzer=repo_synchronyzer)
worker = PulseWorker(conn, queue, repo_synchronyzer=repo_synchronyzer)
worker.run()


24 changes: 0 additions & 24 deletions git_hg_sync/pulse_consumer.py

This file was deleted.

43 changes: 43 additions & 0 deletions git_hg_sync/pulse_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from kombu.mixins import ConsumerMixin
from mozlog import get_proxy_logger

from git_hg_sync.repo_synchronizer import Push, Tag

logger = get_proxy_logger("pluse_consumer")


class EntityTypeError(Exception):
pass


class PulseWorker(ConsumerMixin):

def __init__(self, connection, queue, *, repo_synchronyzer):
self.connection = connection
self.task_queue = queue
self.repo_synchronyzer = repo_synchronyzer

@staticmethod
def parse_entity(raw_entity):
logger.debug(f"parse_entity: {raw_entity}")
message_type = raw_entity.pop("type")
match message_type:
case "push":
return Push(**raw_entity)
case "tag":
return Tag(**raw_entity)
case _:
raise EntityTypeError(f"unsupported type {message_type}")

def get_consumers(self, Consumer, channel):
consumer = Consumer(
self.task_queue, auto_declare=False, callbacks=[self.on_task]
)
return [consumer]

def on_task(self, body, message):
logger.info(f"Received message: {body}")
message.ack()
raw_entity = body["payload"]
parsed_message = PulseWorker.parse_entity(raw_entity)
self.repo_synchronyzer.sync(parsed_message)
28 changes: 6 additions & 22 deletions git_hg_sync/sync_repos.py → git_hg_sync/repo_synchronizer.py
Original file line number Diff line number Diff line change
@@ -7,13 +7,8 @@
logger = get_proxy_logger("sync_repo")


class EntityTypeError(Exception):
pass


@dataclass
class Push:
type: str
repo_url: str
heads: list[str]
commits: list[str]
@@ -25,7 +20,6 @@ class Push:

@dataclass
class Tag:
type: str
repo_url: str
tag: str
commit: str
@@ -40,16 +34,6 @@ class RepoSynchronyzer:
def __init__(self, repos_config):
self._repos_config = repos_config

def parse_entity(self, raw_entity):
logger.debug(f"parse_entity: {raw_entity}")
if raw_entity["type"] == "push":
entity = Push(**raw_entity)
elif raw_entity["type"] == "tag":
entity = Tag(**raw_entity)
else:
raise EntityTypeError(f"unsupported type {raw_entity['type']}")
return entity

def get_remote(self, repo, remote_url):
"""
get the repo if it exists, create it otherwise
@@ -70,17 +54,17 @@ def handle_commits(self, entity, clone_dir, remote_url, remote_target):
remote = self.get_remote(repo, remote_url)
# fetch new commits
remote.fetch()
if entity.type == "push":
remote.pull("branches/default/tip")
elif entity.type == "tag":
pass # TODO
match entity:
case Push():
remote.pull("branches/default/tip")
case _:
pass # TODO
# push on good repo/branch
remote = repo.remote(remote_target)
remote.push()
logger.info(f"Done for entity {entity.pushid}")

def sync(self, raw_entity):
entity = self.parse_entity(raw_entity)
def sync(self, entity: Push | Tag) -> None:
repo_config = self._repos_config.get(entity.repo_url)
if not repo_config:
logger.warning(f"repo {entity.repo_url} is not supported yet")
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import pytest
import mozlog


@pytest.fixture(autouse=True)
def mozlog_logging():
logger = mozlog.structuredlog.StructuredLogger("tests")
mozlog.structuredlog.set_default_logger(logger)
42 changes: 42 additions & 0 deletions tests/test_pulse_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest

from git_hg_sync.pulse_worker import PulseWorker, EntityTypeError
from git_hg_sync.repo_synchronizer import Push, Tag


def raw_push_entity():
return {
"type": "push",
"repo_url": "repo_url",
"heads": ["head"],
"commits": ["commit"],
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}


def raw_tag_entity():
return {
"type": "tag",
"repo_url": "repo_url",
"tag": "tag",
"commit": "commit",
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}


def test_parse_entity_valid():
push_entity = PulseWorker.parse_entity(raw_push_entity())
assert isinstance(push_entity, Push)
tag_entity = PulseWorker.parse_entity(raw_tag_entity())
assert isinstance(tag_entity, Tag)


def test_parse_invalid_type():
with pytest.raises(EntityTypeError):
PulseWorker.parse_entity({"type": "unknown"})
55 changes: 55 additions & 0 deletions tests/test_repo_synchronizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from pathlib import Path

import pytest

from git_hg_sync.__main__ import get_connection, get_queue
from git_hg_sync.repo_synchronizer import RepoSynchronyzer, Push


@pytest.fixture
def pulse_config():
return {
"userid": "test_user",
"host": "pulse.mozilla.org",
"port": 5671,
"exchange": "exchange/test_user/test",
"routing_key": "#",
"queue": "queue/test_user/test",
"password": "PULSE_PASSWORD",
}


@pytest.fixture
def repos_config():
return {
"repo_url": {
"clone": "bad_directory",
"remote": "remote",
"target": "target",
}
}


def test_sync_process_with_bad_repo(repos_config):
syncrepos = RepoSynchronyzer(repos_config=repos_config)
with pytest.raises(AssertionError) as e:
syncrepos.sync(
Push(
repo_url="repo_url",
heads=["head"],
commits=["commits"],
time=0,
pushid=0,
user="user",
push_json_url="push_json_url",
)
)
assert str(e.value) == f"clone {repos_config['repo_url']['clone']} doesn't exists"


def test_get_connection_and_queue(pulse_config):
connection = get_connection(pulse_config)
queue = get_queue(pulse_config)
assert connection.userid == pulse_config["userid"]
assert connection.host == f"{pulse_config['host']}:{pulse_config['port']}"
assert queue.name == pulse_config["queue"]
89 changes: 0 additions & 89 deletions tests/test_sync_repo.py

This file was deleted.

Loading