From e0dcdc688c1726495080e10160d4c1312834b19e Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Thu, 25 Jul 2024 14:30:00 +0200
Subject: [PATCH 1/4] feat: use a pydantic model to parse config file

---
 config.ini.example      |  8 --------
 config.toml.example     |  8 ++++++++
 git_hg_sync/__main__.py | 21 +++++++++++----------
 git_hg_sync/config.py   | 30 +++++++++++++++++++++++-------
 pyproject.toml          |  2 +-
 tests/pulse_utils.py    | 20 ++++++++++----------
 6 files changed, 53 insertions(+), 36 deletions(-)
 delete mode 100644 config.ini.example
 create mode 100644 config.toml.example

diff --git a/config.ini.example b/config.ini.example
deleted file mode 100644
index 860c76a..0000000
--- a/config.ini.example
+++ /dev/null
@@ -1,8 +0,0 @@
-[pulse]
-userid = <username>
-host = pulse.mozilla.org
-port = 5671
-exchange = exchange/<username>/test
-routing_key = #
-queue = queue/<username>/test
-password = <pulse-password>
diff --git a/config.toml.example b/config.toml.example
new file mode 100644
index 0000000..7049e39
--- /dev/null
+++ b/config.toml.example
@@ -0,0 +1,8 @@
+[pulse]
+userid = "<username>"
+host = "pulse.mozilla.org"
+port = 5671
+exchange = "exchange/<username>/test"
+routing_key = ""
+queue = "queue/<username>/test"
+password = "<pulse-password>"
diff --git a/git_hg_sync/__main__.py b/git_hg_sync/__main__.py
index a27b4c9..fb6d5df 100644
--- a/git_hg_sync/__main__.py
+++ b/git_hg_sync/__main__.py
@@ -5,7 +5,7 @@
 from kombu import Connection, Exchange, Queue
 from mozlog import commandline
 
-from git_hg_sync import config
+from git_hg_sync.config import Config, get_repos_config
 from git_hg_sync.pulse_worker import PulseWorker
 from git_hg_sync.repo_synchronizer import RepoSynchronyzer
 
@@ -19,21 +19,21 @@ def get_parser():
 
 def get_connection(config):
     return Connection(
-        hostname=config["host"],
-        port=config["port"],
-        userid=config["userid"],
-        password=config["password"],
+        hostname=config.host,
+        port=config.port,
+        userid=config.userid,
+        password=config.password,
         heartbeat=10,
         ssl=True,
     )
 
 
 def get_queue(config):
-    exchange = Exchange(config["exchange"], type="topic")
+    exchange = Exchange(config.exchange, type="topic")
     return Queue(
-        name=config["queue"],
+        name=config.queue,
         exchange=exchange,
-        routing_key=config["routing_key"],
+        routing_key=config.routing_key,
         exclusive=False,
     )
 
@@ -43,10 +43,11 @@ def main():
     commandline.add_logging_group(parser)
     args = parser.parse_args()
     logger = commandline.setup_logging("service", args, {"raw": sys.stdout})
-    pulse_config = config.get_pulse_config(HERE.parent / "config.ini")["pulse"]
+    config = Config.from_file(HERE.parent / "config.toml")
+    pulse_config = config.pulse
     connection = get_connection(pulse_config)
 
-    repos_config = config.get_repos_config(HERE.parent / "repos.json")
+    repos_config = get_repos_config(HERE.parent / "repos.json")
 
     queue = get_queue(pulse_config)
     repo_synchronyzer = RepoSynchronyzer(repos_config=repos_config)
diff --git a/git_hg_sync/config.py b/git_hg_sync/config.py
index 3735bad..8fc913c 100644
--- a/git_hg_sync/config.py
+++ b/git_hg_sync/config.py
@@ -1,15 +1,31 @@
-import configparser
+import pathlib 
+import tomllib
 import json
 
+import pydantic
 
-def get_pulse_config(config_file_path):
-    assert config_file_path.exists(), f"config file {config_file_path} doesn't exists"
-    config = configparser.ConfigParser()
-    config.read(config_file_path)
-    return config
+class PulseConfig(pydantic.BaseModel):
+    userid: str
+    host: str
+    port: int
+    exchange: str
+    routing_key: str
+    queue: str
+    password: str
 
 
-def get_repos_config(repo_file_path):
+class Config(pydantic.BaseModel):
+    pulse: PulseConfig
+
+    @staticmethod
+    def from_file(file_path: pathlib.Path) -> "Config":
+        assert file_path.exists(), f"config file {file_path} doesn't exists"
+        with open(file_path, "rb") as config_file:
+            config = tomllib.load(config_file)
+        return Config(**config)
+
+
+def get_repos_config(repo_file_path: pathlib.Path):
     assert repo_file_path.exists(), f"config file {repo_file_path} doesn't exists"
     with open(repo_file_path) as f:
         repos = json.load(f)
diff --git a/pyproject.toml b/pyproject.toml
index 2c6d7c2..aa215b1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ name = "git-hg-sync"
 readme = "README.md"
 requires-python = ">=3.10"
 version = "0.1"
-dependencies = ['kombu', 'mozillapulse', 'GitPython', 'mozlog']
+dependencies = ['kombu', 'mozillapulse', 'GitPython', 'mozlog', "pydantic"]
 
 [tool.ruff]
 line-length = 100
diff --git a/tests/pulse_utils.py b/tests/pulse_utils.py
index a4b8aae..75010ce 100644
--- a/tests/pulse_utils.py
+++ b/tests/pulse_utils.py
@@ -3,7 +3,7 @@
 
 import kombu
 
-from git_hg_sync import config
+from git_hg_sync.config import Config 
 
 HERE = Path(__file__).parent
 
@@ -14,13 +14,13 @@ def send_pulse_message(pulse_config, payload):
     The Pulse message will be constructed from the specified payload
     and sent to the requested exchange.
     """
-    userid = pulse_config["userid"]
-    password = pulse_config["password"]
-    routing_key = pulse_config["routing_key"]
-    host = pulse_config["host"]
-    port = pulse_config["port"]
-    exchange = pulse_config["exchange"]
-    queue = pulse_config["queue"]
+    userid = pulse_config.userid
+    password = pulse_config.password
+    routing_key = pulse_config.routing_key
+    host = pulse_config.host
+    port = pulse_config.port
+    exchange = pulse_config.exchange
+    queue = pulse_config.queue
     print(f"connecting to pulse at {host}:{port} as {userid}")
 
     connection = kombu.Connection(
@@ -74,5 +74,5 @@ def send_pulse_message(pulse_config, payload):
         "user": "user",
         "push_json_url": "push_json_url",
     }
-    pulse_conf = config.get_pulse_config(HERE.parent / "config.ini")["pulse"]
-    send_pulse_message(pulse_conf, payload)
+    config = Config.from_file(HERE.parent / "config.toml")
+    send_pulse_message(config.pulse, payload)

From ef9a5135a94ffc3b54c8a7c7b5f741257bdafc99 Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Fri, 26 Jul 2024 12:17:50 +0200
Subject: [PATCH 2/4] feat: move mappings to config.toml

---
 config.toml.example              | 10 ++++
 git_hg_sync/__main__.py          | 10 ++--
 git_hg_sync/config.py            | 41 +++++++++++---
 git_hg_sync/repo_synchronizer.py | 94 +++++++++++++++++++++++---------
 repos.json                       |  6 --
 tests/pulse_utils.py             | 41 ++++++++++----
 tests/test_pulse_worker.py       |  3 +-
 tests/test_repo_synchronizer.py  | 54 +++++++++---------
 8 files changed, 174 insertions(+), 85 deletions(-)
 delete mode 100644 repos.json

diff --git a/config.toml.example b/config.toml.example
index 7049e39..76cbb44 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -6,3 +6,13 @@ exchange = "exchange/<username>/test"
 routing_key = ""
 queue = "queue/<username>/test"
 password = "<pulse-password>"
+
+[clones]
+directory = "../git_hg_sync_repos/"
+
+[mappings.test]
+git_repository = "/home/fbessou/dev/MOZI/fake-forge/git/test"
+
+[mappings.test.rules.esr1]
+branch_pattern = "test-esr1"
+mercurial_repository = "/home/fbessou/dev/MOZI/fake-forge/hg/test-esr1"
\ No newline at end of file
diff --git a/git_hg_sync/__main__.py b/git_hg_sync/__main__.py
index fb6d5df..ec7c762 100644
--- a/git_hg_sync/__main__.py
+++ b/git_hg_sync/__main__.py
@@ -5,7 +5,7 @@
 from kombu import Connection, Exchange, Queue
 from mozlog import commandline
 
-from git_hg_sync.config import Config, get_repos_config
+from git_hg_sync.config import Config
 from git_hg_sync.pulse_worker import PulseWorker
 from git_hg_sync.repo_synchronizer import RepoSynchronyzer
 
@@ -38,7 +38,7 @@ def get_queue(config):
     )
 
 
-def main():
+def main() -> None:
     parser = get_parser()
     commandline.add_logging_group(parser)
     args = parser.parse_args()
@@ -47,10 +47,10 @@ def main():
     pulse_config = config.pulse
     connection = get_connection(pulse_config)
 
-    repos_config = get_repos_config(HERE.parent / "repos.json")
-
     queue = get_queue(pulse_config)
-    repo_synchronyzer = RepoSynchronyzer(repos_config=repos_config)
+    repo_synchronyzer = RepoSynchronyzer(
+        clones_directory=config.clones.directory, mappings=config.mappings
+    )
     with connection as conn:
         logger.info(f"connected to {conn.host}")
         worker = PulseWorker(conn, queue, repo_synchronyzer=repo_synchronyzer)
diff --git a/git_hg_sync/config.py b/git_hg_sync/config.py
index 8fc913c..8cb430b 100644
--- a/git_hg_sync/config.py
+++ b/git_hg_sync/config.py
@@ -1,9 +1,10 @@
-import pathlib 
+import pathlib
 import tomllib
-import json
+from collections import Counter
 
 import pydantic
 
+
 class PulseConfig(pydantic.BaseModel):
     userid: str
     host: str
@@ -14,8 +15,37 @@ class PulseConfig(pydantic.BaseModel):
     password: str
 
 
+class MappingRule(pydantic.BaseModel):
+    branch_pattern: str
+    mercurial_repository: str
+
+
+class MappingConfig(pydantic.BaseModel):
+    git_repository: str
+    rules: dict[str, MappingRule]
+
+
+class ClonesConfig(pydantic.BaseModel):
+    directory: pathlib.Path
+
+
 class Config(pydantic.BaseModel):
     pulse: PulseConfig
+    clones: ClonesConfig
+    mappings: dict[str, MappingConfig]
+
+    @pydantic.field_validator("mappings")
+    @staticmethod
+    def no_duplicate_git_repositories(
+        mappings: dict[str, MappingConfig]
+    ) -> dict[str, MappingConfig]:
+        counter = Counter([mapping.git_repository for mapping in mappings.values()])
+        for git_repo, count in counter.items():
+            if count > 1:
+                raise ValueError(
+                    f"Found {count} different mappings for the same git repository."
+                )
+        return mappings
 
     @staticmethod
     def from_file(file_path: pathlib.Path) -> "Config":
@@ -23,10 +53,3 @@ def from_file(file_path: pathlib.Path) -> "Config":
         with open(file_path, "rb") as config_file:
             config = tomllib.load(config_file)
         return Config(**config)
-
-
-def get_repos_config(repo_file_path: pathlib.Path):
-    assert repo_file_path.exists(), f"config file {repo_file_path} doesn't exists"
-    with open(repo_file_path) as f:
-        repos = json.load(f)
-    return repos
diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py
index d019a65..567796c 100644
--- a/git_hg_sync/repo_synchronizer.py
+++ b/git_hg_sync/repo_synchronizer.py
@@ -1,17 +1,21 @@
 from dataclasses import dataclass
 from pathlib import Path
+from typing import Literal
 
 from git import Repo
 from mozlog import get_proxy_logger
 
+from git_hg_sync.config import MappingConfig
+
 logger = get_proxy_logger("sync_repo")
 
 
 @dataclass
 class Push:
     repo_url: str
-    heads: list[str]
-    commits: list[str]
+    branches: dict[
+        str, str
+    ]  # Mapping between branch names (key) and corresponding commit sha (value)
     time: int
     pushid: int
     user: str
@@ -31,45 +35,81 @@ class Tag:
 
 class RepoSynchronyzer:
 
-    def __init__(self, repos_config):
-        self._repos_config = repos_config
+    def __init__(self, clones_directory: Path, mappings: dict[str, MappingConfig]):
+        self._clones_directory = clones_directory
+        self._mappings = mappings
 
-    def get_remote(self, repo, remote_url):
+    def get_remote(self, repo, remote_name: Literal["git", "hg"], remote_url: str):
         """
         get the repo if it exists, create it otherwise
         the repo name is the last part of the url
         """
-        remote_name = remote_url.split("/")[-1]
         for rem in repo.remotes:
             if rem.name == remote_name:
-                return repo.remote(remote_name)
-                break
+                remote = repo.remote(remote_name)
+                remote.set_url(remote_url, allow_unsafe_protocols=True)
+                return remote
         else:
-            return repo.create_remote(remote_name, remote_url)
+            return repo.create_remote(
+                remote_name, remote_url, allow_unsafe_protocols=True
+            )
 
-    def handle_commits(self, entity, clone_dir, remote_url, remote_target):
-        logger.info(f"Handle entity {entity.pushid}")
+    def handle_commits(
+        self, push_message: Push, clone_dir: Path, mapping: MappingConfig
+    ):
+        logger.info(f"Handle entity {push_message.pushid}")
         assert Path(clone_dir).exists(), f"clone {clone_dir} doesn't exists"
         repo = Repo(clone_dir)
-        remote = self.get_remote(repo, remote_url)
+        remote = self.get_remote(repo, "git", mapping.git_repository)
         # fetch new commits
-        remote.fetch()
-        match entity:
-            case Push():
-                for head in entity.heads:
-                    remote.pull(head)
-            case _:
-                pass  # TODO
-        # push on good repo/branch
-        remote = repo.remote(remote_target)
-        remote.push()
-        logger.info(f"Done for entity {entity.pushid}")
+        for branch_name, commit in push_message.branches.items():
+            remote.fetch(commit)
+            if branch_name in repo.branches:
+                branch = repo.branches[branch_name]
+                branch.commit = commit
+            else:
+                branch = repo.create_head(branch_name, commit)
+            breakpoint()
+            for rule_name, rule in mapping.rules.items():
+                if rule.branch_pattern == branch_name:
+                    remote = self.get_remote(
+                        repo, "hg", "hg::" + rule.mercurial_repository
+                    )
+                    remote.push(branch.name)
+
+        # match push_message:
+        #    case Push():
+        #        for head in push_message.heads:
+        #            remote.pull(head)
+        #    case _:
+        #        pass  # TODO
+        ## push on good repo/branch
+        # remote = self.get_remote(repo, "hg", "hg::" + mapping.rules.mercurial_repository)
+        # remote.push(push_message.heads)
+        # logger.info(f"Done for entity {push_message.pushid}")
 
     def sync(self, entity: Push | Tag) -> None:
-        repo_config = self._repos_config.get(entity.repo_url)
-        if not repo_config:
-            logger.warning(f"repo {entity.repo_url} is not supported yet")
+        if isinstance(entity, Tag):
+            logger.warning("Tag message not handled not implemented yet")
+            return
+
+        matching_mappings = [
+            (mapping_name, mapping)
+            for mapping_name, mapping in self._mappings.items()
+            if mapping.git_repository == entity.repo_url
+        ]
+        if not matching_mappings:
+            logger.warning(f"No mapping found for git repository {entity.repo_url} ")
             return
+
+        if len(matching_mappings) > 1:
+            logger.warning(f"No mapping found for git repository {entity.repo_url} ")
+            return
+
+        mapping_name, mapping = matching_mappings[0]
+        clone_directory = self._clones_directory / mapping_name
         self.handle_commits(
-            entity, repo_config["clone"], entity.repo_url, repo_config["target"]
+            entity,
+            clone_directory,
+            mapping,
         )
diff --git a/repos.json b/repos.json
deleted file mode 100644
index 7960814..0000000
--- a/repos.json
+++ /dev/null
@@ -1,6 +0,0 @@
-{
-  "https://github.com/djangoliv/chatzilla.git": {
-    "clone": "clones/chatzilla_cinnabar",
-    "target": "origin"
-  }
-}
diff --git a/tests/pulse_utils.py b/tests/pulse_utils.py
index 75010ce..ab67585 100644
--- a/tests/pulse_utils.py
+++ b/tests/pulse_utils.py
@@ -1,9 +1,10 @@
 from datetime import datetime
 from pathlib import Path
+import sys
 
 import kombu
 
-from git_hg_sync.config import Config 
+from git_hg_sync.config import Config
 
 HERE = Path(__file__).parent
 
@@ -64,15 +65,33 @@ def send_pulse_message(pulse_config, payload):
 
 
 if __name__ == "__main__":
-    payload = {
-        "type": "tag",
-        "repo_url": "https://github.com/djangoliv/chatzilla.git",
-        "tag": "truc",
-        "commit": "88949ac3ad633e92cf52354d91857074e264ad12",
-        "time": 0,
-        "pushid": 0,
-        "user": "user",
-        "push_json_url": "push_json_url",
-    }
     config = Config.from_file(HERE.parent / "config.toml")
+    config.mappings
+    message_type = sys.argv[1]
+    mapping = config.mappings[sys.argv[2]]
+    match message_type:
+        case "push":
+            payload = {
+                "type": "push",
+                "repo_url": mapping.git_repository,
+                "branches": {sys.argv[3]: sys.argv[4]},
+                "time": 0,
+                "pushid": 0,
+                "user": "user",
+                "push_json_url": "push_json_url",
+            }
+        case "tag":
+            payload = {
+                "type": "tag",
+                "repo_url": "/home/fbessou/dev/MOZI/fake-forge/git/chatzilla",
+                "tag": "tag",
+                "commit": "88949ac3ad633e92cf52354d91857074e264ad12",
+                "time": 0,
+                "pushid": 0,
+                "user": "user",
+                "push_json_url": "push_json_url",
+            }
+        case _:
+            raise NotImplementedError()
+    print(payload)
     send_pulse_message(config.pulse, payload)
diff --git a/tests/test_pulse_worker.py b/tests/test_pulse_worker.py
index 43ae8a5..dc314f1 100644
--- a/tests/test_pulse_worker.py
+++ b/tests/test_pulse_worker.py
@@ -8,8 +8,7 @@ def raw_push_entity():
     return {
         "type": "push",
         "repo_url": "repo_url",
-        "heads": ["head"],
-        "commits": ["commit"],
+        "branches": {"mybranch": "acommitsha"},
         "time": 0,
         "pushid": 0,
         "user": "user",
diff --git a/tests/test_repo_synchronizer.py b/tests/test_repo_synchronizer.py
index c1c2630..1aa1362 100644
--- a/tests/test_repo_synchronizer.py
+++ b/tests/test_repo_synchronizer.py
@@ -1,12 +1,13 @@
 import pytest
 
+from git_hg_sync.config import PulseConfig, MappingConfig
 from git_hg_sync.__main__ import get_connection, get_queue
 from git_hg_sync.repo_synchronizer import Push, RepoSynchronyzer
 
 
 @pytest.fixture
 def pulse_config():
-    return {
+    return PulseConfig(**{
         "userid": "test_user",
         "host": "pulse.mozilla.org",
         "port": 5671,
@@ -14,40 +15,43 @@ def pulse_config():
         "routing_key": "#",
         "queue": "queue/test_user/test",
         "password": "PULSE_PASSWORD",
-    }
+    })
 
 
 @pytest.fixture
-def repos_config():
+def mappings():
     return {
-        "repo_url": {
-            "clone": "bad_directory",
-            "remote": "remote",
-            "target": "target",
-        }
+        "myrepo": MappingConfig(**{
+            "git_repository": "myforge/myrepo.git",
+            "rules": {
+                "rule1": {
+                    "branch_pattern": "branch_name",
+                    "mercurial_repository": "myforge/myhgrepo"
+
+                }
+            }
+        })
     }
 
 
-def test_sync_process_with_bad_repo(repos_config):
-    syncrepos = RepoSynchronyzer(repos_config=repos_config)
-    with pytest.raises(AssertionError) as e:
-        syncrepos.sync(
-            Push(
-                repo_url="repo_url",
-                heads=["head"],
-                commits=["commits"],
-                time=0,
-                pushid=0,
-                user="user",
-                push_json_url="push_json_url",
-            )
+def test_sync_process_with_bad_repo(tmp_path, mappings):
+    syncrepos = RepoSynchronyzer(tmp_path / "clones", mappings)
+    syncrepos.sync(
+        Push(
+            repo_url="repo_url",
+            branches={"branch1": "anothercommitsha"},
+            time=0,
+            pushid=0,
+            user="user",
+            push_json_url="push_json_url",
         )
-    assert str(e.value) == f"clone {repos_config['repo_url']['clone']} doesn't exists"
+    )
+    # TODO finish that test (check that warning was triggered)
 
 
 def test_get_connection_and_queue(pulse_config):
     connection = get_connection(pulse_config)
     queue = get_queue(pulse_config)
-    assert connection.userid == pulse_config["userid"]
-    assert connection.host == f"{pulse_config['host']}:{pulse_config['port']}"
-    assert queue.name == pulse_config["queue"]
+    assert connection.userid == pulse_config.userid
+    assert connection.host == f"{pulse_config.host}:{pulse_config.port}"
+    assert queue.name == pulse_config.queue

From 7bfd31ef525d885d56bf1bc0b23a7a0c6885fdec Mon Sep 17 00:00:00 2001
From: Olivier Giorgis <ogiorgis@logilab.fr>
Date: Fri, 26 Jul 2024 15:34:41 +0200
Subject: [PATCH 3/4] fix lint

---
 .github/workflows/test.yml      |  2 +-
 tests/test_repo_synchronizer.py | 39 ++++++++++++++++++---------------
 2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 0508d64..df20149 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -8,7 +8,7 @@ jobs:
       - name: Set up Python 3.10
         uses: actions/setup-python@v5
         with:
-          python-version: '3.10'
+          python-version: '3.11'
           cache: 'pip'
       - name: Install dependencies
         run: |
diff --git a/tests/test_repo_synchronizer.py b/tests/test_repo_synchronizer.py
index 1aa1362..2579405 100644
--- a/tests/test_repo_synchronizer.py
+++ b/tests/test_repo_synchronizer.py
@@ -7,30 +7,33 @@
 
 @pytest.fixture
 def pulse_config():
-    return PulseConfig(**{
-        "userid": "test_user",
-        "host": "pulse.mozilla.org",
-        "port": 5671,
-        "exchange": "exchange/test_user/test",
-        "routing_key": "#",
-        "queue": "queue/test_user/test",
-        "password": "PULSE_PASSWORD",
-    })
+    return PulseConfig(
+        **{
+            "userid": "test_user",
+            "host": "pulse.mozilla.org",
+            "port": 5671,
+            "exchange": "exchange/test_user/test",
+            "routing_key": "#",
+            "queue": "queue/test_user/test",
+            "password": "PULSE_PASSWORD",
+        }
+    )
 
 
 @pytest.fixture
 def mappings():
     return {
-        "myrepo": MappingConfig(**{
-            "git_repository": "myforge/myrepo.git",
-            "rules": {
-                "rule1": {
-                    "branch_pattern": "branch_name",
-                    "mercurial_repository": "myforge/myhgrepo"
-
-                }
+        "myrepo": MappingConfig(
+            **{
+                "git_repository": "myforge/myrepo.git",
+                "rules": {
+                    "rule1": {
+                        "branch_pattern": "branch_name",
+                        "mercurial_repository": "myforge/myhgrepo",
+                    }
+                },
             }
-        })
+        )
     }
 
 

From b112e9353274c0fccc3175c3c84148dee83b3ffc Mon Sep 17 00:00:00 2001
From: Olivier Giorgis <ogiorgis@logilab.fr>
Date: Thu, 25 Jul 2024 12:18:00 +0200
Subject: [PATCH 4/4] add rabbitMQ in docker-compose

---
 .gitignore                      |  2 +-
 docker-compose.yaml             | 23 ++++++++++++++++++++
 tests/pulse_utils.py            |  8 +++----
 tests/test_integration.py       | 38 +++++++++++++++++++++++++++++++++
 tests/test_repo_synchronizer.py |  2 +-
 5 files changed, 67 insertions(+), 6 deletions(-)
 create mode 100644 tests/test_integration.py

diff --git a/.gitignore b/.gitignore
index ed8a54d..974453b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,5 @@
 clones
 .tox
 **/__pycache__/
-config.ini
+config.toml
 .coverage
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 0d12bd5..b392d3c 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -2,3 +2,26 @@ services:
   sync:
     build: .
     command: ['pytest', 'tests', '-p', 'no:cacheprovider']
+    environment:
+      - RABBITMQ=true
+    depends_on:
+      pulse:
+        condition: service_healthy
+    networks:
+      - pulse_network
+
+  pulse:
+    image: rabbitmq:3-management-alpine
+    healthcheck:
+      test: rabbitmq-diagnostics -q ping
+      interval: 5s
+      timeout: 2s
+      retries: 1
+    ports:
+      - 15672:15672
+    networks:
+      - pulse_network
+
+networks:
+  pulse_network:
+    driver: bridge
diff --git a/tests/pulse_utils.py b/tests/pulse_utils.py
index ab67585..2829223 100644
--- a/tests/pulse_utils.py
+++ b/tests/pulse_utils.py
@@ -1,6 +1,6 @@
+import sys
 from datetime import datetime
 from pathlib import Path
-import sys
 
 import kombu
 
@@ -9,7 +9,7 @@
 HERE = Path(__file__).parent
 
 
-def send_pulse_message(pulse_config, payload):
+def send_pulse_message(pulse_config, payload, ssl=True):
     """Send a pulse message
     The routing key will be constructed from the repository URL.
     The Pulse message will be constructed from the specified payload
@@ -30,7 +30,7 @@ def send_pulse_message(pulse_config, payload):
         userid=userid,
         password=password,
         connect_timeout=100,
-        ssl=True,
+        ssl=ssl,
     )
     connection.connect()
 
@@ -56,7 +56,7 @@ def send_pulse_message(pulse_config, payload):
                 "exchange": exchange,
                 "routing_key": routing_key,
                 "serializer": "json",
-                "sent": datetime.utcnow().isoformat(),
+                "sent": datetime.now(),
             },
         }
 
diff --git a/tests/test_integration.py b/tests/test_integration.py
new file mode 100644
index 0000000..d7fd580
--- /dev/null
+++ b/tests/test_integration.py
@@ -0,0 +1,38 @@
+import os
+
+import pulse_utils
+import pytest
+
+from git_hg_sync.config import PulseConfig
+
+NO_RABBITMQ = not (os.getenv("RABBITMQ") == "true")
+
+
+@pytest.fixture
+def pulse_config():
+    return PulseConfig(
+        **{
+            "userid": "guest",
+            "host": "pulse",
+            "port": 5672,
+            "exchange": "exchange/guest/test",
+            "routing_key": "#",
+            "queue": "queue/guest/test",
+            "password": "guest",
+        }
+    )
+
+
+@pytest.mark.skipif(NO_RABBITMQ, reason="Test doesn't work without rabbitMq")
+def test_send(pulse_config):
+    payload = {
+        "type": "tag",
+        "repo_url": "repo.git",
+        "tag": "Tag",
+        "commit": "sha",
+        "time": 0,
+        "pushid": 0,
+        "user": "user",
+        "push_json_url": "push_json_url",
+    }
+    pulse_utils.send_pulse_message(pulse_config, payload, ssl=False)
diff --git a/tests/test_repo_synchronizer.py b/tests/test_repo_synchronizer.py
index 2579405..f81475c 100644
--- a/tests/test_repo_synchronizer.py
+++ b/tests/test_repo_synchronizer.py
@@ -1,7 +1,7 @@
 import pytest
 
-from git_hg_sync.config import PulseConfig, MappingConfig
 from git_hg_sync.__main__ import get_connection, get_queue
+from git_hg_sync.config import MappingConfig, PulseConfig
 from git_hg_sync.repo_synchronizer import Push, RepoSynchronyzer