Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separation of concerns #2

Merged
merged 6 commits into from
Jul 25, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: move json message parsing to pulse_worker
  • Loading branch information
fbessou committed Jul 24, 2024
commit 6b81ddf2fbbae9aa81af2e3d9997ba46e0838334
21 changes: 20 additions & 1 deletion git_hg_sync/pulse_worker.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
from kombu.mixins import ConsumerMixin
from mozlog import get_proxy_logger

from git_hg_sync.repo_synchronizer import Push, Tag

logger = get_proxy_logger("pluse_consumer")


class EntityTypeError(Exception):
pass


class PulseWorker(ConsumerMixin):

def __init__(self, connection, queue, *, repo_synchronyzer):
self.connection = connection
self.task_queue = queue
self.repo_synchronyzer = repo_synchronyzer

@staticmethod
def parse_entity(raw_entity):
logger.debug(f"parse_entity: {raw_entity}")
message_type = raw_entity.pop("type")
match message_type:
case "push":
return Push(**raw_entity)
case "tag":
return Tag(**raw_entity)
case _:
raise EntityTypeError(f"unsupported type {message_type}")

def get_consumers(self, Consumer, channel):
consumer = Consumer(
self.task_queue, auto_declare=False, callbacks=[self.on_task]
@@ -21,4 +39,5 @@ def on_task(self, body, message):
logger.info(f"Received message: {body}")
message.ack()
raw_entity = body["payload"]
self.repo_synchronyzer.sync(raw_entity)
parsed_message = PulseWorker.parse_entity(raw_entity)
self.repo_synchronyzer.sync(parsed_message)
18 changes: 1 addition & 17 deletions git_hg_sync/repo_synchronizer.py
Original file line number Diff line number Diff line change
@@ -7,10 +7,6 @@
logger = get_proxy_logger("sync_repo")


class EntityTypeError(Exception):
pass


@dataclass
class Push:
repo_url: str
@@ -38,17 +34,6 @@ class RepoSynchronyzer:
def __init__(self, repos_config):
self._repos_config = repos_config

def parse_entity(self, raw_entity):
logger.debug(f"parse_entity: {raw_entity}")
message_type = raw_entity.pop("type")
match message_type:
case "push":
return Push(**raw_entity)
case "tag":
return Tag(**raw_entity)
case _:
raise EntityTypeError(f"unsupported type {message_type}")

def get_remote(self, repo, remote_url):
"""
get the repo if it exists, create it otherwise
@@ -79,8 +64,7 @@ def handle_commits(self, entity, clone_dir, remote_url, remote_target):
remote.push()
logger.info(f"Done for entity {entity.pushid}")

def sync(self, raw_entity):
entity = self.parse_entity(raw_entity)
def sync(self, entity: Push | Tag) -> None:
repo_config = self._repos_config.get(entity.repo_url)
if not repo_config:
logger.warning(f"repo {entity.repo_url} is not supported yet")
40 changes: 40 additions & 0 deletions tests/test_pulse_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest

from git_hg_sync.pulse_worker import PulseWorker, EntityTypeError
from git_hg_sync.repo_synchronizer import Push, Tag

def raw_push_entity():
return {
"type": "push",
"repo_url": "repo_url",
"heads": ["head"],
"commits": ["commit"],
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}


def raw_tag_entity():
return {
"type": "tag",
"repo_url": "repo_url",
"tag": "tag",
"commit": "commit",
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}

def test_parse_entity_valid():
push_entity = PulseWorker.parse_entity(raw_push_entity())
assert isinstance(push_entity, Push)
tag_entity = PulseWorker.parse_entity(raw_tag_entity())
assert isinstance(tag_entity, Tag)


def test_parse_invalid_type():
with pytest.raises(EntityTypeError):
PulseWorker.parse_entity({"type": "unknown"})
53 changes: 6 additions & 47 deletions tests/test_repo_synchronizer.py
Original file line number Diff line number Diff line change
@@ -2,10 +2,8 @@

import pytest

from git_hg_sync import __main__, repo_synchronizer

HERE = Path(__file__).parent

from git_hg_sync.__main__ import get_connection, get_queue
from git_hg_sync.repo_synchronizer import RepoSynchronyzer, Push

@pytest.fixture
def pulse_config():
@@ -30,56 +28,17 @@ def repos_config():
}
}

def raw_push_entity():
return {
"type": "push",
"repo_url": "repo_url",
"heads": ["head"],
"commits": ["commit"],
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}


def raw_tag_entity():
return {
"type": "tag",
"repo_url": "repo_url",
"tag": "tag",
"commit": "commit",
"time": 0,
"pushid": 0,
"user": "user",
"push_json_url": "push_json_url",
}


def test_parse_entity():
syncrepos = repo_synchronizer.RepoSynchronyzer(None)
push_entity = syncrepos.parse_entity(raw_push_entity())
assert isinstance(push_entity, repo_synchronizer.Push)
tag_entity = syncrepos.parse_entity(raw_tag_entity())
assert isinstance(tag_entity, repo_synchronizer.Tag)


def test_sync_process_with_bad_type():
syncrepos = repo_synchronizer.RepoSynchronyzer(None)
with pytest.raises(repo_synchronizer.EntityTypeError):
syncrepos.sync({"type": "badType"})


def test_sync_process_with_bad_repo(repos_config):
syncrepos = repo_synchronizer.RepoSynchronyzer(repos_config=repos_config)
syncrepos = RepoSynchronyzer(repos_config=repos_config)
with pytest.raises(AssertionError) as e:
syncrepos.sync(raw_push_entity())
syncrepos.sync(Push(repo_url="repo_url", heads=["head"], commits=["commits"], time=0, pushid=0, user="user", push_json_url="push_json_url"))
assert str(e.value) == f"clone {repos_config['repo_url']['clone']} doesn't exists"


def test_get_connection_and_queue(pulse_config):
connection = __main__.get_connection(pulse_config)
queue = __main__.get_queue(pulse_config)
connection = get_connection(pulse_config)
queue = get_queue(pulse_config)
assert connection.userid == pulse_config["userid"]
assert connection.host == f"{pulse_config['host']}:{pulse_config['port']}"
assert queue.name == pulse_config["queue"]