From ef55ffb0eea47f0c7004338137fff5e729abf72f Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 11:52:00 +0100
Subject: [PATCH 01/10] fix: typo on should_stop when handling SIGINT

---
 git_hg_sync/application.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py
index a4e8a6b..bbf922c 100644
--- a/git_hg_sync/application.py
+++ b/git_hg_sync/application.py
@@ -31,7 +31,7 @@ def signal_handler(sig: int, frame: Optional[FrameType]) -> None:
             if self._worker.should_stop:
                 logger.info("Process killed by user")
                 sys.exit(1)
-            self._worker.shoud_stop = True
+            self._worker.should_stop = True
             logger.info("Process exiting gracefully")
 
         signal.signal(signal.SIGINT, signal_handler)

From af57cb596e8cc4408ca8cf268c4c896110c8bcbb Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 18:51:07 +0100
Subject: [PATCH 02/10] feat: add operation type (as literal) into each
 operation

---
 git_hg_sync/mapping.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/git_hg_sync/mapping.py b/git_hg_sync/mapping.py
index 99e1dd6..d2b67b9 100644
--- a/git_hg_sync/mapping.py
+++ b/git_hg_sync/mapping.py
@@ -1,7 +1,7 @@
 import re
 from dataclasses import dataclass
 from functools import cached_property
-from typing import Sequence, TypeAlias
+from typing import Sequence, TypeAlias, Literal
 
 import pydantic
 
@@ -16,6 +16,8 @@ class SyncBranchOperation:
     # Destination (hg)
     destination_branch: str
 
+    type: Literal["SyncBranchOperation"] = "SyncBranchOperation"
+
 
 @dataclass
 class SyncTagOperation:
@@ -26,6 +28,8 @@ class SyncTagOperation:
     tag: str
     tags_destination_branch: str
 
+    type: Literal["SyncTagOperation"] = "SyncTagOperation"
+
 
 SyncOperation: TypeAlias = SyncBranchOperation | SyncTagOperation
 

From 758493f4e6389cb3ff16e2b0a3cebd131494711e Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 19:02:55 +0100
Subject: [PATCH 03/10] feat: use module name as component for logger

---
 git_hg_sync/pulse_worker.py      | 2 +-
 git_hg_sync/repo_synchronizer.py | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py
index 0ec8ebb..badc40d 100644
--- a/git_hg_sync/pulse_worker.py
+++ b/git_hg_sync/pulse_worker.py
@@ -5,7 +5,7 @@
 
 from git_hg_sync.events import Push, Tag
 
-logger = get_proxy_logger("pluse_consumer")
+logger = get_proxy_logger(__name__)
 
 
 class EventHandler(Protocol):
diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py
index 2317b7f..53a85f9 100644
--- a/git_hg_sync/repo_synchronizer.py
+++ b/git_hg_sync/repo_synchronizer.py
@@ -5,7 +5,8 @@
 from git_hg_sync.mapping import SyncOperation, SyncBranchOperation, SyncTagOperation
 from mozlog import get_proxy_logger
 
-logger = get_proxy_logger("sync_repo")
+
+logger = get_proxy_logger(__name__)
 
 
 class RepoSyncError(Exception):

From 889cad303451f4198d11be5a0c1c0b364838e685 Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 19:04:05 +0100
Subject: [PATCH 04/10] tests: enable logging to stdout in tests

---
 tests/conftest.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index cb84835..e8cf94a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,12 +1,17 @@
+import sys
+
 import mozlog
 import pytest
 
 from git_hg_sync.config import PulseConfig
 
 
-@pytest.fixture(autouse=True)
+@pytest.fixture(autouse=True, scope="session")
 def mozlog_logging() -> None:
     logger = mozlog.structuredlog.StructuredLogger("tests")
+    logger.add_handler(
+        mozlog.handlers.StreamHandler(sys.stdout, mozlog.formatters.JSONFormatter())
+    )
     mozlog.structuredlog.set_default_logger(logger)
 
 

From a7d16ccb9a0894fca167bf05aa56bfcfa12f691d Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 19:04:30 +0100
Subject: [PATCH 05/10] feat: add logging to repo synchronizer

---
 git_hg_sync/repo_synchronizer.py | 61 ++++++++++++++++++++++++++++----
 1 file changed, 55 insertions(+), 6 deletions(-)

diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py
index 53a85f9..ea0589d 100644
--- a/git_hg_sync/repo_synchronizer.py
+++ b/git_hg_sync/repo_synchronizer.py
@@ -1,3 +1,5 @@
+import json
+from dataclasses import asdict
 from pathlib import Path
 
 
@@ -18,7 +20,6 @@ class MercurialMetadataNotFoundError(RepoSyncError):
 
 
 class RepoSynchronizer:
-
     def __init__(
         self,
         clone_directory: Path,
@@ -27,12 +28,21 @@ def __init__(
         self._clone_directory = clone_directory
         self._src_remote = url
 
+    def _repo_config_as_dict(self, repo: Repo):
+        with repo.config_reader() as reader:
+            return {
+                section: dict(reader.items(section)) for section in reader.sections()
+            }
+
     def _get_clone_repo(self) -> Repo:
         """Get a GitPython Repo object pointing to a git clone of the source
         remote."""
+        log_data = json.dumps({"clone_directory": str(self._clone_directory)})
         if self._clone_directory.exists():
+            logger.debug(f"Clone directory exists. Using it. {log_data}")
             repo = Repo(self._clone_directory)
         else:
+            logger.debug(f"Clone directory does not exist. Creating it. {log_data}")
             repo = Repo.init(self._clone_directory)
 
         # Ensure that the clone repository is well configured
@@ -50,18 +60,43 @@ def _fetch_all_from_remote(self, repo: Repo, remote: str) -> None:
             repo.git.fetch([remote])
         except exc.GitCommandError as e:
             # can't fetch if repo is empty
-            if "fatal: couldn't find remote ref HEAD" not in e.stderr:
-                raise e
+            if "fatal: couldn't find remote ref HEAD" in e.stderr:
+                return
+            raise
+
+    def _log_data(self, **kwargs):
+        return json.dumps(
+            {
+                "clone_directory": str(self._clone_directory),
+                "source_remote": self._src_remote,
+                **kwargs,
+            }
+        )
 
     def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
-        repo = self._get_clone_repo()
         destination_remote = f"hg::{destination_url}"
 
+        json_operations = self._log_data(
+            destination_remote=destination_remote,
+            operations=[asdict(op) for op in operations],
+        )
+        logger.info(f"Synchronizing. {json_operations}")
+
+        repo = self._get_clone_repo()
+
+        logger.debug(
+            f"Git clone configuration. {self._log_data(configuration=self._repo_config_as_dict(repo))}"
+        )
+
         # Ensure we have all commits from destination repository
+        logger.debug(f"Fetching all commits from destination. {self._log_data()}")
         self._fetch_all_from_remote(repo, destination_remote)
 
         # Get commits we want to send to destination repository
         commits_to_fetch = [operation.source_commit for operation in operations]
+        logger.debug(
+            f"Fetching source commits. {self._log_data(commits=commits_to_fetch)}"
+        )
         repo.git.fetch([self._src_remote, *commits_to_fetch])
 
         push_args = [destination_remote]
@@ -80,6 +115,9 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
         # tagging can only be done on a commit that already have mercurial
         # metadata
         if branch_ops:
+            logger.debug(
+                f"Adding mercurial metadata to git commits. {self._log_data(args=push_args)}"
+            )
             repo.git.execute(
                 ["git", "-c", "cinnabar.data=force", "push", "--dry-run", *push_args]
             )
@@ -92,6 +130,9 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
         # Create tag branches locally
         tag_branches = set([op.tags_destination_branch for op in tag_ops])
         for tag_branch in tag_branches:
+            logger.debug(
+                f"Get tag branch from destination. {self._log_data(tag_branch=tag_branch)}"
+            )
             repo.git.fetch(
                 [
                     "-f",
@@ -103,10 +144,14 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
 
         # Create tags
         for tag_operation in tag_ops:
-            if not self._commit_has_mercurial_metadata(
+            commit_has_metadata = self._commit_has_mercurial_metadata(
                 repo, tag_operation.source_commit
-            ):
+            )
+            if not commit_has_metadata:
                 raise MercurialMetadataNotFoundError()
+            logger.debug(
+                f"Creating tag. {self._log_data(operation=asdict(tag_operation))}"
+            )
             repo.git.cinnabar(
                 [
                     "tag",
@@ -118,4 +163,8 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
             )
 
         # Push commits, branches and tags to destination
+        logged_command = ["git", "push", *push_args]
+        logger.debug(
+            f"Pushing branches and tags to destination. {self._log_data(command=logged_command)}"
+        )
         repo.git.push(*push_args)

From 3c4aa03d40b1bb04f4879e35b60c31eea512d99d Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 19:10:11 +0100
Subject: [PATCH 06/10] feat: improve logging of application

---
 git_hg_sync/application.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py
index bbf922c..d6f8d7b 100644
--- a/git_hg_sync/application.py
+++ b/git_hg_sync/application.py
@@ -1,3 +1,5 @@
+from dataclasses import asdict
+import json
 import signal
 import sys
 from types import FrameType
@@ -29,7 +31,7 @@ def __init__(
     def run(self) -> None:
         def signal_handler(sig: int, frame: Optional[FrameType]) -> None:
             if self._worker.should_stop:
-                logger.info("Process killed by user")
+                logger.error("Process killed by user")
                 sys.exit(1)
             self._worker.should_stop = True
             logger.info("Process exiting gracefully")
@@ -38,7 +40,9 @@ def signal_handler(sig: int, frame: Optional[FrameType]) -> None:
         self._worker.run()
 
     def _handle_push_event(self, push_event: Push) -> None:
-        logger.info(f"Handling push event: {push_event.pushid}")
+        json_event = json.dumps(asdict(push_event))
+        logger.info(f"Handling push event. {json_event}")
+
         synchronizer = self._repo_synchronizers[push_event.repo_url]
         operations_by_destination: dict[str, list[SyncOperation]] = {}
 

From 3f3de3b90b37e174cfabf1108efdb8b9d1efc5e3 Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 20:14:10 +0100
Subject: [PATCH 07/10] feat: add a retry function

---
 git_hg_sync/retry.py | 47 +++++++++++++++++++++++++++++
 tests/test_retry.py  | 70 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 117 insertions(+)
 create mode 100644 git_hg_sync/retry.py
 create mode 100644 tests/test_retry.py

diff --git a/git_hg_sync/retry.py b/git_hg_sync/retry.py
new file mode 100644
index 0000000..766a812
--- /dev/null
+++ b/git_hg_sync/retry.py
@@ -0,0 +1,47 @@
+from collections.abc import Callable
+import time
+from typing import TypeVar
+
+from mozlog import get_proxy_logger
+
+TResult = TypeVar("TResult")
+
+logger = get_proxy_logger(__name__)
+
+
+def retry(
+    func: Callable[[], TResult], tries: int, action: str = "", delay: float = 0.0
+) -> TResult:
+    """
+    Retry a function on failure.
+    Args:
+        func (Callable): The function to retry.
+        tries (int): The number of attempts to make before failing.
+        action (str): A description of the action being performed for better logging context (e.g. "fetching commits")
+        delay (float): The delay in seconds between attempts.
+
+    Returns:
+        TResult: The return value of the function if successful.
+
+    Raises:
+        Exception: The last exception raised if all attempts fail.
+    """
+    for attempt in range(1, tries + 1):
+        try:
+            return func()
+        except Exception as exc:
+            action_text = f" while {action}" if action else ""
+            if attempt < tries:
+                logger.error(
+                    f"Attempt {attempt}/{tries} failed{action_text} with error: {type(exc).__name__}: {exc}. Retrying...",
+                    exc_info=True,
+                )
+                if delay > 0:
+                    time.sleep(delay)
+            else:
+                logger.error(
+                    f"Attempt {attempt}/{tries} failed{action_text}. Aborting."
+                )
+                raise
+
+    assert False, "unreachable"
diff --git a/tests/test_retry.py b/tests/test_retry.py
new file mode 100644
index 0000000..4cc5880
--- /dev/null
+++ b/tests/test_retry.py
@@ -0,0 +1,70 @@
+import unittest.mock as mock
+from typing import Literal, Never
+
+import pytest
+from git_hg_sync.retry import retry, logger
+
+
+def test_retry_does_not_log_error_on_immediate_success():
+    def true_on_first_call():
+        return True
+
+    with mock.patch.object(logger, "error", wraps=logger.error) as error_spy:
+        assert retry(true_on_first_call, tries=2) is True
+
+        error_spy.assert_not_called()
+
+
+def test_retry_logs_error_on_first_failing_try():
+    count = 0
+
+    def true_on_second_call() -> Literal[True]:
+        nonlocal count
+        if count == 0:
+            count += 1
+            raise Exception("Error on first call")
+        return True
+
+    with mock.patch.object(logger, "error", wraps=logger.error) as error_spy:
+        assert retry(true_on_second_call, tries=2) is True
+        assert "Retrying" in error_spy.call_args.args[0]
+
+
+def test_retry_abort_on_failing_last_try():
+    def always_raising() -> Never:
+        raise Exception("Error on first call")
+
+    with mock.patch.object(logger, "error", wraps=logger.error) as error_spy:
+        with pytest.raises(Exception):
+            retry(always_raising, tries=2)
+
+        assert "Aborting" in error_spy.call_args.args[0]
+
+
+def test_retry_raises_inner_exception_on_last_failure():
+    count = 0
+
+    class MyCustomError(Exception):
+        pass
+
+    def always_raising():
+        nonlocal count
+        count += 1
+        raise MyCustomError(f"Called {count} times")
+
+    with pytest.raises(MyCustomError) as exc_info:
+        retry(always_raising, tries=3)
+
+    assert exc_info.value.args[0] == "Called 3 times"
+
+
+def test_retry_shows_action_on_failed_attempt(caplog):
+
+    def always_raising():
+        raise ValueError("Commits too old")
+
+    with mock.patch.object(logger, "error", wraps=logger.error) as error_spy:
+        with pytest.raises(ValueError):
+            retry(always_raising, tries=2, action="fetching old commits")
+
+    assert "while fetching old commits" in error_spy.call_args.args[0]

From 65fe56ff8d37534ec5d3f3f75dce957c1b295b7d Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 20:28:38 +0100
Subject: [PATCH 08/10] feat: retry git commands that use network in case of
 failure

---
 git_hg_sync/repo_synchronizer.py | 51 +++++++++++++++++++++++++-------
 1 file changed, 40 insertions(+), 11 deletions(-)

diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py
index ea0589d..f504a52 100644
--- a/git_hg_sync/repo_synchronizer.py
+++ b/git_hg_sync/repo_synchronizer.py
@@ -5,6 +5,7 @@
 
 from git import Repo, exc
 from git_hg_sync.mapping import SyncOperation, SyncBranchOperation, SyncTagOperation
+from git_hg_sync.retry import retry
 from mozlog import get_proxy_logger
 
 
@@ -90,14 +91,22 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
 
         # Ensure we have all commits from destination repository
         logger.debug(f"Fetching all commits from destination. {self._log_data()}")
-        self._fetch_all_from_remote(repo, destination_remote)
+        retry(
+            lambda: self._fetch_all_from_remote(repo, destination_remote),
+            tries=2,
+            action="fetching commits from destination",
+        )
 
         # Get commits we want to send to destination repository
         commits_to_fetch = [operation.source_commit for operation in operations]
         logger.debug(
             f"Fetching source commits. {self._log_data(commits=commits_to_fetch)}"
         )
-        repo.git.fetch([self._src_remote, *commits_to_fetch])
+        retry(
+            lambda: repo.git.fetch([self._src_remote, *commits_to_fetch]),
+            tries=2,
+            action="fetching source commits",
+        )
 
         push_args = [destination_remote]
 
@@ -118,8 +127,19 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
             logger.debug(
                 f"Adding mercurial metadata to git commits. {self._log_data(args=push_args)}"
             )
-            repo.git.execute(
-                ["git", "-c", "cinnabar.data=force", "push", "--dry-run", *push_args]
+            retry(
+                lambda: repo.git.execute(
+                    [
+                        "git",
+                        "-c",
+                        "cinnabar.data=force",
+                        "push",
+                        "--dry-run",
+                        *push_args,
+                    ]
+                ),
+                tries=2,
+                action="adding mercurial metadata to git commits",
             )
 
         # Handle tag operations
@@ -133,12 +153,16 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
             logger.debug(
                 f"Get tag branch from destination. {self._log_data(tag_branch=tag_branch)}."
             )
-            repo.git.fetch(
-                [
-                    "-f",
-                    destination_remote,
-                    f"refs/heads/branches/{tag_branch}/tip:{tag_branch}",
-                ]
+            retry(
+                lambda: repo.git.fetch(
+                    [
+                        "-f",
+                        destination_remote,
+                        f"refs/heads/branches/{tag_branch}/tip:{tag_branch}",
+                    ]
+                ),
+                tries=2,
+                action="getting tag branch from destination",
             )
             push_args.append(f"{tag_branch}:refs/heads/branches/{tag_branch}/tip")
 
@@ -167,4 +191,9 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
         logger.debug(
             f"Pushing branches and tags to destination. {self._log_data(command=logged_command)}"
         )
-        repo.git.push(*push_args)
+        retry(
+            lambda: repo.git.push(*push_args),
+            tries=2,
+            delay=5,
+            action="pushing branch and tags to destination",
+        )

From 9f033466ffcc0b656f74135b96fb4e620dc9d64a Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 21:04:39 +0100
Subject: [PATCH 09/10] fix: take into account logger in cli

It seems that specifying default args in `setup_logging` bypasses the user-provided logger.
---
 git_hg_sync/__main__.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/git_hg_sync/__main__.py b/git_hg_sync/__main__.py
index 6cba536..272e46e 100644
--- a/git_hg_sync/__main__.py
+++ b/git_hg_sync/__main__.py
@@ -1,5 +1,4 @@
 import argparse
-import sys
 import logging
 from pathlib import Path
 
@@ -73,7 +72,7 @@ def main() -> None:
     parser = get_parser()
     commandline.add_logging_group(parser)
     args = parser.parse_args()
-    logger = commandline.setup_logging("service", args, {"raw": sys.stdout})
+    logger = commandline.setup_logging("service", args)
     config = Config.from_file(args.config)
 
     sentry_config = config.sentry

From 578b084283f629c657df0514c718d54834c107ca Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 21:06:18 +0100
Subject: [PATCH 10/10] feat: retry whole execution of destination/operations
 syncing on failure

---
 git_hg_sync/application.py | 23 +++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py
index d6f8d7b..1e7b42d 100644
--- a/git_hg_sync/application.py
+++ b/git_hg_sync/application.py
@@ -11,6 +11,7 @@
 from git_hg_sync.mapping import Mapping, SyncOperation
 from git_hg_sync.pulse_worker import PulseWorker
 from git_hg_sync.repo_synchronizer import RepoSynchronizer
+from git_hg_sync.retry import retry
 
 logger = get_proxy_logger(__name__)
 
@@ -54,11 +55,29 @@ def _handle_push_event(self, push_event: Push) -> None:
                     ).append(match.operation)
 
         for destination, operations in operations_by_destination.items():
-            synchronizer.sync(destination, operations)
+            try:
+                retry(
+                    lambda: synchronizer.sync(destination, operations),
+                    tries=3,
+                    action="executing sync operations",
+                    delay=5,
+                )
+            except Exception:
+                error_data = json.dumps(
+                    {
+                        "destination_url": destination,
+                        "operations": [asdict(operation) for operation in operations],
+                    }
+                )
+                logger.error(
+                    f"An error prevented completion of the following sync operations. {error_data}",
+                    exc_info=True,
+                )
 
     def _handle_event(self, event: Push | Tag) -> None:
         if event.repo_url not in self._repo_synchronizers:
-            logger.info("Ignoring event for untracked repository: %()s", event.repo_url)
+            ignored_event = json.dumps(asdict(event))
+            logger.info(f"Ignoring event for untracked repository. {ignored_event}")
             return
         match event:
             case Push():