From ef55ffb0eea47f0c7004338137fff5e729abf72f Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 11:52:00 +0100
Subject: [PATCH 01/10] fix: typo on should_stop when handling SIGINT

---
 git_hg_sync/application.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py
index a4e8a6b..bbf922c 100644
--- a/git_hg_sync/application.py
+++ b/git_hg_sync/application.py
@@ -31,7 +31,7 @@ def signal_handler(sig: int, frame: Optional[FrameType]) -> None:
             if self._worker.should_stop:
                 logger.info("Process killed by user")
                 sys.exit(1)
-            self._worker.shoud_stop = True
+            self._worker.should_stop = True
             logger.info("Process exiting gracefully")
 
         signal.signal(signal.SIGINT, signal_handler)

From af57cb596e8cc4408ca8cf268c4c896110c8bcbb Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 18:51:07 +0100
Subject: [PATCH 02/10] feat: add operation type (as literal) into each
 operation

---
 git_hg_sync/mapping.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/git_hg_sync/mapping.py b/git_hg_sync/mapping.py
index 99e1dd6..d2b67b9 100644
--- a/git_hg_sync/mapping.py
+++ b/git_hg_sync/mapping.py
@@ -1,7 +1,7 @@
 import re
 from dataclasses import dataclass
 from functools import cached_property
-from typing import Sequence, TypeAlias
+from typing import Sequence, TypeAlias, Literal
 
 import pydantic
 
@@ -16,6 +16,8 @@ class SyncBranchOperation:
     # Destination (hg)
     destination_branch: str
 
+    type: Literal["SyncBranchOperation"] = "SyncBranchOperation"
+
 
 @dataclass
 class SyncTagOperation:
@@ -26,6 +28,8 @@ class SyncTagOperation:
     tag: str
     tags_destination_branch: str
 
+    type: Literal["SyncTagOperation"] = "SyncTagOperation"
+
 
 SyncOperation: TypeAlias = SyncBranchOperation | SyncTagOperation
 

From 758493f4e6389cb3ff16e2b0a3cebd131494711e Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 19:02:55 +0100
Subject: [PATCH 03/10] feat: use module name as component for logger

---
 git_hg_sync/pulse_worker.py      | 2 +-
 git_hg_sync/repo_synchronizer.py | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py
index 0ec8ebb..badc40d 100644
--- a/git_hg_sync/pulse_worker.py
+++ b/git_hg_sync/pulse_worker.py
@@ -5,7 +5,7 @@
 
 from git_hg_sync.events import Push, Tag
 
-logger = get_proxy_logger("pluse_consumer")
+logger = get_proxy_logger(__name__)
 
 
 class EventHandler(Protocol):
diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py
index 2317b7f..53a85f9 100644
--- a/git_hg_sync/repo_synchronizer.py
+++ b/git_hg_sync/repo_synchronizer.py
@@ -5,7 +5,8 @@
 from git_hg_sync.mapping import SyncOperation, SyncBranchOperation, SyncTagOperation
 from mozlog import get_proxy_logger
 
-logger = get_proxy_logger("sync_repo")
+
+logger = get_proxy_logger(__name__)
 
 
 class RepoSyncError(Exception):

From 889cad303451f4198d11be5a0c1c0b364838e685 Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 19:04:05 +0100
Subject: [PATCH 04/10] tests: enable logging to stdout in tests

---
 tests/conftest.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index cb84835..e8cf94a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,12 +1,17 @@
+import sys
+
 import mozlog
 import pytest
 
 from git_hg_sync.config import PulseConfig
 
 
-@pytest.fixture(autouse=True)
+@pytest.fixture(autouse=True, scope="session")
 def mozlog_logging() -> None:
     logger = mozlog.structuredlog.StructuredLogger("tests")
+    logger.add_handler(
+        mozlog.handlers.StreamHandler(sys.stdout, mozlog.formatters.JSONFormatter())
+    )
     mozlog.structuredlog.set_default_logger(logger)
 
 

From a7d16ccb9a0894fca167bf05aa56bfcfa12f691d Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 19:04:30 +0100
Subject: [PATCH 05/10] feat: add logging to repo synchronizer

---
 git_hg_sync/repo_synchronizer.py | 61 ++++++++++++++++++++++++++++----
 1 file changed, 55 insertions(+), 6 deletions(-)

diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py
index 53a85f9..ea0589d 100644
--- a/git_hg_sync/repo_synchronizer.py
+++ b/git_hg_sync/repo_synchronizer.py
@@ -1,3 +1,5 @@
+import json
+from dataclasses import asdict
 from pathlib import Path
 
@@ -18,7 +20,6 @@ class MercurialMetadataNotFoundError(RepoSyncError):
 
 
 class RepoSynchronizer:
-
     def __init__(
         self,
         clone_directory: Path,
@@ -27,12 +28,21 @@ def __init__(
         self._clone_directory = clone_directory
         self._src_remote = url
 
+    def _repo_config_as_dict(self, repo: Repo):
+        with repo.config_reader() as reader:
+            return {
+                section: dict(reader.items(section)) for section in reader.sections()
+            }
+
     def _get_clone_repo(self) -> Repo:
         """Get a GitPython Repo object pointing to a git clone of the source
         remote."""
+        log_data = json.dumps({"clone_directory": str(self._clone_directory)})
         if self._clone_directory.exists():
+            logger.debug(f"Clone directory exists. Using it. {log_data}")
             repo = Repo(self._clone_directory)
         else:
+            logger.debug(f"Clone directory does not exist. Creating it. {log_data}")
             repo = Repo.init(self._clone_directory)
 
         # Ensure that the clone repository is well configured
@@ -50,18 +60,43 @@ def _fetch_all_from_remote(self, repo: Repo, remote: str) -> None:
             repo.git.fetch([remote])
         except exc.GitCommandError as e:
             # can't fetch if repo is empty
-            if "fatal: couldn't find remote ref HEAD" not in e.stderr:
-                raise e
+            if "fatal: couldn't find remote ref HEAD" in e.stderr:
+                return
+            raise
+
+    def _log_data(self, **kwargs):
+        return json.dumps(
+            {
+                "clone_directory": str(self._clone_directory),
+                "source_remote": self._src_remote,
+                **kwargs,
+            }
+        )
 
     def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
-        repo = self._get_clone_repo()
         destination_remote = f"hg::{destination_url}"
 
+        json_operations = self._log_data(
+            destination_remote=destination_remote,
+            operations=[asdict(op) for op in operations],
+        )
+        logger.info(f"Synchronizing. {json_operations}")
+
+        repo = self._get_clone_repo()
+
+        logger.debug(
+            f"Git clone configuration. {self._log_data(configuration=self._repo_config_as_dict(repo))}"
+        )
+
         # Ensure we have all commits from destination repository
+        logger.debug(f"Fetching all commits from destination. {self._log_data()}")
         self._fetch_all_from_remote(repo, destination_remote)
 
         # Get commits we want to send to destination repository
         commits_to_fetch = [operation.source_commit for operation in operations]
+        logger.debug(
+            f"Fetching source commits. {self._log_data(commits=commits_to_fetch)}"
+        )
        repo.git.fetch([self._src_remote, *commits_to_fetch])
 
         push_args = [destination_remote]
@@ -80,6 +115,9 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
         # tagging can only be done on a commit that already have mercurial
         # metadata
         if branch_ops:
+            logger.debug(
+                f"Adding mercurial metadata to git commits. {self._log_data(args=push_args)}"
+            )
+            )
             repo.git.execute(
                 ["git", "-c", "cinnabar.data=force", "push", "--dry-run", *push_args]
             )
@@ -92,6 +130,9 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
         # Create tag branches locally
         tag_branches = set([op.tags_destination_branch for op in tag_ops])
         for tag_branch in tag_branches:
+            logger.debug(
+                f"Get tag branch from destination. {self._log_data(tag_branch=tag_branch)}."
+            )
             repo.git.fetch(
                 [
                     "-f",
@@ -103,10 +144,14 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
 
         # Create tags
         for tag_operation in tag_ops:
-            if not self._commit_has_mercurial_metadata(
+            commit_has_metadata = self._commit_has_mercurial_metadata(
                 repo, tag_operation.source_commit
-            ):
+            )
+            if not commit_has_metadata:
                 raise MercurialMetadataNotFoundError()
+            logger.debug(
+                f"Creating tag. {self._log_data(operation=asdict(tag_operation))}"
+            )
             repo.git.cinnabar(
                 [
                     "tag",
@@ -118,4 +163,8 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
             )
 
         # Push commits, branches and tags to destination
+        logged_command = ["git", "push", *push_args]
+        logger.debug(
+            f"Pushing branches and tags to destination. {self._log_data(command=logged_command)}"
+        )
         repo.git.push(*push_args)

From 3c4aa03d40b1bb04f4879e35b60c31eea512d99d Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 19:10:11 +0100
Subject: [PATCH 06/10] feat: improve logging of application

---
 git_hg_sync/application.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py
index bbf922c..d6f8d7b 100644
--- a/git_hg_sync/application.py
+++ b/git_hg_sync/application.py
@@ -1,3 +1,5 @@
+from dataclasses import asdict
+import json
 import signal
 import sys
 from types import FrameType
@@ -29,7 +31,7 @@ def __init__(
     def run(self) -> None:
         def signal_handler(sig: int, frame: Optional[FrameType]) -> None:
             if self._worker.should_stop:
-                logger.info("Process killed by user")
+                logger.error("Process killed by user")
                 sys.exit(1)
             self._worker.should_stop = True
             logger.info("Process exiting gracefully")
@@ -38,7 +40,9 @@ def signal_handler(sig: int, frame: Optional[FrameType]) -> None:
         self._worker.run()
 
     def _handle_push_event(self, push_event: Push) -> None:
-        logger.info(f"Handling push event: {push_event.pushid}")
+        json_event = json.dumps(asdict(push_event))
+        logger.info(f"Handling push event. {json_event}")
+
         synchronizer = self._repo_synchronizers[push_event.repo_url]
         operations_by_destination: dict[str, list[SyncOperation]] = {}
 

From 3f3de3b90b37e174cfabf1108efdb8b9d1efc5e3 Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 20:14:10 +0100
Subject: [PATCH 07/10] feat: add a retry function

---
 git_hg_sync/retry.py | 47 +++++++++++++++++++++++++++++
 tests/test_retry.py  | 70 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 117 insertions(+)
 create mode 100644 git_hg_sync/retry.py
 create mode 100644 tests/test_retry.py

diff --git a/git_hg_sync/retry.py b/git_hg_sync/retry.py
new file mode 100644
index 0000000..766a812
--- /dev/null
+++ b/git_hg_sync/retry.py
@@ -0,0 +1,47 @@
+from collections.abc import Callable
+import time
+from typing import TypeVar
+
+from mozlog import get_proxy_logger
+
+TResult = TypeVar("TResult")
+
+logger = get_proxy_logger(__name__)
+
+
+def retry(
+    func: Callable[[], TResult], tries: int, action: str = "", delay: float = 0.0
+) -> TResult:
+    """
+    Retry a function on failure.
+    Args:
+        func (Callable): The function to retry.
+        tries (int): The number of attempts to make before failing.
+        action (str): A description of the action being performed for better logging context (e.g. "fetching commits").
+        delay (float): The delay in seconds between attempts.
+
+    Returns:
+        TResult: The return value of the function if successful.
+
+    Raises:
+        Exception: The last exception raised if all attempts fail.
+    """
+    for attempt in range(1, tries + 1):
+        try:
+            return func()
+        except Exception as exc:
+            action_text = f" while {action}" if action else ""
+            if attempt < tries:
+                logger.error(
+                    f"Attempt {attempt}/{tries} failed{action_text} with error: {type(exc).__name__}: {exc}. Retrying...",
+                    exc_info=True,
+                )
+                if delay > 0:
+                    time.sleep(delay)
+            else:
+                logger.error(
+                    f"Attempt {attempt}/{tries} failed{action_text}. Aborting."
+                )
+                raise
+
+    assert False, "unreachable"
diff --git a/tests/test_retry.py b/tests/test_retry.py
new file mode 100644
index 0000000..4cc5880
--- /dev/null
+++ b/tests/test_retry.py
@@ -0,0 +1,70 @@
+import unittest.mock as mock
+from typing import Literal, Never
+
+import pytest
+from git_hg_sync.retry import retry, logger
+
+
+def test_retry_does_not_log_error_on_immediate_success():
+    def true_on_first_call():
+        return True
+
+    with mock.patch.object(logger, "error", wraps=logger.error) as error_spy:
+        assert retry(true_on_first_call, tries=2) is True
+
+    error_spy.assert_not_called()
+
+
+def test_retry_logs_error_on_first_failing_try():
+    count = 0
+
+    def true_on_second_call() -> Literal[True]:
+        nonlocal count
+        if count == 0:
+            count += 1
+            raise Exception("Error on first call")
+        return True
+
+    with mock.patch.object(logger, "error", wraps=logger.error) as error_spy:
+        assert retry(true_on_second_call, tries=2) is True
+    assert "Retrying" in error_spy.call_args.args[0]
+
+
+def test_retry_abort_on_failing_last_try():
+    def always_raising() -> Never:
+        raise Exception("Error on first call")
+
+    with mock.patch.object(logger, "error", wraps=logger.error) as error_spy:
+        with pytest.raises(Exception):
+            retry(always_raising, tries=2)
+
+    assert "Aborting" in error_spy.call_args.args[0]
+
+
+def test_retry_raises_inner_exception_on_last_failure():
+    count = 0
+
+    class MyCustomError(Exception):
+        pass
+
+    def always_raising():
+        nonlocal count
+        count += 1
+        raise MyCustomError(f"Called {count} times")
+
+    with pytest.raises(MyCustomError) as exc_info:
+        retry(always_raising, tries=3)
+
+    assert exc_info.value.args[0] == "Called 3 times"
+
+
+def test_retry_shows_action_on_failed_attempt(caplog):
+
+    def always_raising():
+        raise ValueError("Commits too old")
+
+    with mock.patch.object(logger, "error", wraps=logger.error) as error_spy:
+        with pytest.raises(ValueError):
+            retry(always_raising, tries=2, action="fetching old commits")
+
+    assert "while fetching old commits" in error_spy.call_args.args[0]

From 65fe56ff8d37534ec5d3f3f75dce957c1b295b7d Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 20:28:38 +0100
Subject: [PATCH 08/10] feat: retry git commands that use network in case of
 failure

---
 git_hg_sync/repo_synchronizer.py | 51 +++++++++++++++++++++++++-------
 1 file changed, 40 insertions(+), 11 deletions(-)

diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py
index ea0589d..f504a52 100644
--- a/git_hg_sync/repo_synchronizer.py
+++ b/git_hg_sync/repo_synchronizer.py
@@ -5,6 +5,7 @@
 from git import Repo, exc
 
 from git_hg_sync.mapping import SyncOperation, SyncBranchOperation, SyncTagOperation
+from git_hg_sync.retry import retry
 from mozlog import get_proxy_logger
 
 
@@ -90,14 +91,22 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
 
         # Ensure we have all commits from destination repository
         logger.debug(f"Fetching all commits from destination. {self._log_data()}")
-        self._fetch_all_from_remote(repo, destination_remote)
+        retry(
+            lambda: self._fetch_all_from_remote(repo, destination_remote),
+            tries=2,
+            action="fetching commits from destination",
+        )
 
         # Get commits we want to send to destination repository
         commits_to_fetch = [operation.source_commit for operation in operations]
         logger.debug(
             f"Fetching source commits. {self._log_data(commits=commits_to_fetch)}"
         )
-        repo.git.fetch([self._src_remote, *commits_to_fetch])
+        retry(
+            lambda: repo.git.fetch([self._src_remote, *commits_to_fetch]),
+            tries=2,
+            action="fetching source commits",
+        )
 
         push_args = [destination_remote]
 
@@ -118,8 +127,19 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
             logger.debug(
                 f"Adding mercurial metadata to git commits. {self._log_data(args=push_args)}"
             )
-            repo.git.execute(
-                ["git", "-c", "cinnabar.data=force", "push", "--dry-run", *push_args]
+            retry(
+                lambda: repo.git.execute(
+                    [
+                        "git",
+                        "-c",
+                        "cinnabar.data=force",
+                        "push",
+                        "--dry-run",
+                        *push_args,
+                    ]
+                ),
+                tries=2,
+                action="adding mercurial metadata to git commits",
             )
 
         # Handle tag operations
@@ -133,12 +153,16 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
             logger.debug(
                 f"Get tag branch from destination. {self._log_data(tag_branch=tag_branch)}."
             )
-            repo.git.fetch(
-                [
-                    "-f",
-                    destination_remote,
-                    f"refs/heads/branches/{tag_branch}/tip:{tag_branch}",
-                ]
+            retry(
+                lambda: repo.git.fetch(
+                    [
+                        "-f",
+                        destination_remote,
+                        f"refs/heads/branches/{tag_branch}/tip:{tag_branch}",
+                    ]
+                ),
+                tries=2,
+                action="getting tag branch from destination",
             )
             push_args.append(f"{tag_branch}:refs/heads/branches/{tag_branch}/tip")
 
@@ -167,4 +187,9 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
         logger.debug(
             f"Pushing branches and tags to destination. {self._log_data(command=logged_command)}"
         )
-        repo.git.push(*push_args)
+        retry(
+            lambda: repo.git.push(*push_args),
+            tries=2,
+            delay=5,
+            action="pushing branch and tags to destination",
+        )

From 9f033466ffcc0b656f74135b96fb4e620dc9d64a Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 21:04:39 +0100
Subject: [PATCH 09/10] fix: take into account logger in cli

It seems that specifying default args in `setup_logging` bypasses the
user-provided logger.
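
A minimal sketch of the resulting call sequence, assuming mozlog's
`commandline.setup_logging(suite, args, defaults=None)` signature (the
`get_parser()` helper below is the one defined in git_hg_sync/__main__.py):

    from mozlog import commandline

    parser = get_parser()
    commandline.add_logging_group(parser)  # registers the --log-* CLI options
    args = parser.parse_args()

    # Previously: commandline.setup_logging("service", args, {"raw": sys.stdout})
    # The hard-coded {"raw": sys.stdout} default appeared to take precedence
    # over the handlers requested via --log-*, so it is no longer passed.
    logger = commandline.setup_logging("service", args)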
---
 git_hg_sync/__main__.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/git_hg_sync/__main__.py b/git_hg_sync/__main__.py
index 6cba536..272e46e 100644
--- a/git_hg_sync/__main__.py
+++ b/git_hg_sync/__main__.py
@@ -1,5 +1,4 @@
 import argparse
-import sys
 import logging
 from pathlib import Path
 
@@ -73,7 +72,7 @@ def main() -> None:
     parser = get_parser()
     commandline.add_logging_group(parser)
     args = parser.parse_args()
-    logger = commandline.setup_logging("service", args, {"raw": sys.stdout})
+    logger = commandline.setup_logging("service", args)
 
     config = Config.from_file(args.config)
     sentry_config = config.sentry

From 578b084283f629c657df0514c718d54834c107ca Mon Sep 17 00:00:00 2001
From: Frank Bessou <frank.bessou@logilab.fr>
Date: Tue, 7 Jan 2025 21:06:18 +0100
Subject: [PATCH 10/10] feat: retry whole execution of destination/operations
 syncing on failure

---
 git_hg_sync/application.py | 23 +++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py
index d6f8d7b..1e7b42d 100644
--- a/git_hg_sync/application.py
+++ b/git_hg_sync/application.py
@@ -11,6 +11,7 @@
 from git_hg_sync.mapping import Mapping, SyncOperation
 from git_hg_sync.pulse_worker import PulseWorker
 from git_hg_sync.repo_synchronizer import RepoSynchronizer
+from git_hg_sync.retry import retry
 
 logger = get_proxy_logger(__name__)
 
@@ -54,11 +55,29 @@ def _handle_push_event(self, push_event: Push) -> None:
             ).append(match.operation)
 
         for destination, operations in operations_by_destination.items():
-            synchronizer.sync(destination, operations)
+            try:
+                retry(
+                    lambda: synchronizer.sync(destination, operations),
+                    tries=3,
+                    action="executing sync operations",
+                    delay=5,
+                )
+            except Exception:
+                error_data = json.dumps(
+                    {
+                        "destination_url": destination,
+                        "operations": [asdict(operation) for operation in operations],
+                    }
+                )
+                logger.error(
+                    f"An error prevented completion of the following sync operations. {error_data}",
+                    exc_info=True,
+                )
 
     def _handle_event(self, event: Push | Tag) -> None:
         if event.repo_url not in self._repo_synchronizers:
-            logger.info("Ignoring event for untracked repository: %()s", event.repo_url)
+            ignored_event = json.dumps(asdict(event))
+            logger.info(f"Ignoring event for untracked repository. {ignored_event}")
             return
         match event:
             case Push():