Skip to content

Commit 0191d95

Browse files
committedJul 22, 2024
refacto: use kombu mixin
1 parent 29d7e8d commit 0191d95

10 files changed

+215
-208
lines changed
 

‎git_hg_sync/config.py

+5-11
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
11
import configparser
22
import json
3-
from pathlib import Path
43

5-
HERE = Path(__file__).parent
64

7-
8-
def get_pulse_config():
5+
def get_pulse_config(config_file_path):
6+
assert config_file_path.exists(), f"config file {config_file_path} doesn't exists"
97
config = configparser.ConfigParser()
10-
config.read(HERE.parent / "config.ini")
8+
config.read(config_file_path)
119
return config
1210

1311

14-
def get_repos_config():
15-
with open(HERE.parent / "repos.json") as f:
12+
def get_repos_config(repo_file_path):
13+
with open(repo_file_path) as f:
1614
repos = json.load(f)
1715
return repos
18-
19-
20-
if __name__ == "__main__":
21-
get_pulse_config()

‎git_hg_sync/git_cinnabar.py

-27
This file was deleted.

‎git_hg_sync/pulse_consumer.py

+26-33
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,26 @@
1-
from kombu import Connection, Exchange, Queue
2-
3-
4-
def get_connection(pulse_conf):
5-
return Connection(
6-
hostname=pulse_conf["host"],
7-
port=pulse_conf["port"],
8-
userid=pulse_conf["userid"],
9-
password=pulse_conf["password"],
10-
ssl=True,
11-
)
12-
13-
14-
def get_consumer(pulse_conf, connection, callbacks):
15-
exchange = pulse_conf["exchange"]
16-
exchange = Exchange(exchange, type="topic")
17-
exchange(connection).declare(
18-
passive=True
19-
) # raise an error if exchange doesn't exist
20-
21-
queue = Queue(
22-
name=pulse_conf["queue"],
23-
exchange=exchange,
24-
routing_key=pulse_conf["routing_key"],
25-
durable=True,
26-
exclusive=False,
27-
auto_delete=False,
28-
)
29-
30-
consumer = connection.Consumer(queue, auto_declare=False, callbacks=callbacks)
31-
consumer.queues[0].queue_declare()
32-
consumer.queues[0].queue_bind()
33-
return consumer
1+
import logging
2+
3+
from kombu.mixins import ConsumerMixin
4+
5+
from git_hg_sync import sync_repos
6+
7+
logger = logging.getLogger()
8+
9+
10+
class Worker(ConsumerMixin):
11+
12+
def __init__(self, connection, queue):
13+
self.connection = connection
14+
self.task_queue = queue
15+
16+
def get_consumers(self, Consumer, channel):
17+
consumer = Consumer(
18+
self.task_queue, auto_declare=False, callbacks=[self.on_task]
19+
)
20+
return [consumer]
21+
22+
def on_task(self, body, message):
23+
logger.info(f"Received message: {body}")
24+
message.ack()
25+
raw_entity = body["payload"]
26+
sync_repos.process(raw_entity)

‎git_hg_sync/service.py

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import logging
2+
from pathlib import Path
3+
4+
from kombu import Connection, Exchange, Queue
5+
6+
from git_hg_sync import config
7+
from git_hg_sync.pulse_consumer import Worker
8+
9+
HERE = Path(__file__).parent
10+
11+
12+
def get_connection(config):
13+
return Connection(
14+
hostname=config["host"],
15+
port=config["port"],
16+
userid=config["userid"],
17+
password=config["password"],
18+
ssl=True,
19+
)
20+
21+
22+
def get_queue(config):
23+
exchange = Exchange(config["exchange"], type="topic")
24+
return Queue(
25+
name=config["queue"],
26+
exchange=exchange,
27+
routing_key=config["routing_key"],
28+
exclusive=False,
29+
)
30+
31+
32+
def main():
33+
logging.basicConfig(level=logging.INFO)
34+
pulse_config = config.get_pulse_config(HERE.parent / "config.ini")["pulse"]
35+
connection = get_connection(pulse_config)
36+
queue = get_queue(pulse_config)
37+
with connection as conn:
38+
worker = Worker(conn, queue)
39+
worker.run()
40+
41+
42+
if __name__ == "__main__":
43+
main()

‎git_hg_sync/sync.py

-69
This file was deleted.

‎git_hg_sync/sync_repos.py

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import logging
2+
from dataclasses import dataclass
3+
from pathlib import Path
4+
5+
from git import Repo
6+
7+
from git_hg_sync import config
8+
9+
logger = logging.getLogger()
10+
HERE = Path(__file__).parent
11+
12+
13+
@dataclass
14+
class Push:
15+
type: str
16+
repo_url: str
17+
heads: list[str]
18+
commits: list[str]
19+
time: int
20+
pushid: int
21+
user: str
22+
push_json_url: str
23+
24+
25+
@dataclass
26+
class Tag:
27+
type: str
28+
repo_url: str
29+
tag: str
30+
commit: str
31+
time: int
32+
pushid: int
33+
user: str
34+
push_json_url: str
35+
36+
37+
def parse_entity(raw_entity):
38+
if raw_entity["type"] == "push":
39+
entity = Push(**raw_entity)
40+
elif raw_entity["type"] == "tag":
41+
entity = Tag(**raw_entity)
42+
else:
43+
raise AttributeError(f"unsupported type {raw_entity['type']}")
44+
return entity
45+
46+
47+
def process(raw_entity):
48+
entity = parse_entity(raw_entity)
49+
repo_config = config.get_repos_config(HERE.parent / "repos.json").get(
50+
entity.repo_url
51+
)
52+
if not repo_config:
53+
logger.warning(f"repo {entity.repo_url} is not supported yet")
54+
else:
55+
repo = Repo(repo_config["clone"])
56+
# fetch new commits
57+
remote = repo.remote(repo_config["remote"])
58+
remote.fetch()
59+
# add commits to the good branch
60+
for commit_sha in entity.commits:
61+
logger.info(f"handle commit {commit_sha}")
62+
repo.git.cherry_pick(commit_sha)
63+
# push on good repo/branch
64+
remote = repo.remote(repo_config["target"])
65+
remote.push()

‎tests/config_test.ini

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[pulse]
2+
userid = test
3+
host = localhost
4+
port = 5671
5+
exchange = exchange/test/test
6+
routing_key = #
7+
queue = queue/test/test
8+
password = pass_test

‎tests/pulse_utils.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import logging
22
from datetime import datetime
3+
from pathlib import Path
34

45
import kombu
56

67
from git_hg_sync import config
78

89
logger = logging.getLogger()
10+
HERE = Path(__file__).parent
911

1012

1113
def send_pulse_message(pulse_config, payload):
@@ -76,5 +78,5 @@ def send_pulse_message(pulse_config, payload):
7678
"user": "user",
7779
"push_json_url": "push_json_url",
7880
}
79-
pulse_conf = config.get_pulse_config()["pulse"]
81+
pulse_conf = config.get_pulse_config(HERE.parent / "config.ini")["pulse"]
8082
send_pulse_message(pulse_conf, payload)

‎tests/test_pulse.py

-67
This file was deleted.

0 commit comments

Comments
 (0)
Failed to load comments.