Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more logging and ensure that stages that use network are retried before failing. #20

Closed
wants to merge 10 commits into from
3 changes: 1 addition & 2 deletions git_hg_sync/__main__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import argparse
import sys
import logging
from pathlib import Path

@@ -73,7 +72,7 @@ def main() -> None:
parser = get_parser()
commandline.add_logging_group(parser)
args = parser.parse_args()
logger = commandline.setup_logging("service", args, {"raw": sys.stdout})
logger = commandline.setup_logging("service", args)
config = Config.from_file(args.config)

sentry_config = config.sentry
33 changes: 28 additions & 5 deletions git_hg_sync/application.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from dataclasses import asdict
import json
import signal
import sys
from types import FrameType
@@ -9,6 +11,7 @@
from git_hg_sync.mapping import Mapping, SyncOperation
from git_hg_sync.pulse_worker import PulseWorker
from git_hg_sync.repo_synchronizer import RepoSynchronizer
from git_hg_sync.retry import retry

logger = get_proxy_logger(__name__)

@@ -29,16 +32,18 @@ def __init__(
def run(self) -> None:
def signal_handler(sig: int, frame: Optional[FrameType]) -> None:
if self._worker.should_stop:
logger.info("Process killed by user")
logger.error("Process killed by user")
sys.exit(1)
self._worker.shoud_stop = True
self._worker.should_stop = True
logger.info("Process exiting gracefully")

signal.signal(signal.SIGINT, signal_handler)
self._worker.run()

def _handle_push_event(self, push_event: Push) -> None:
logger.info(f"Handling push event: {push_event.pushid}")
json_event = json.dumps(asdict(push_event))
logger.info(f"Handling push event. {json_event}")

synchronizer = self._repo_synchronizers[push_event.repo_url]
operations_by_destination: dict[str, list[SyncOperation]] = {}

@@ -50,11 +55,29 @@ def _handle_push_event(self, push_event: Push) -> None:
).append(match.operation)

for destination, operations in operations_by_destination.items():
synchronizer.sync(destination, operations)
try:
retry(
lambda: synchronizer.sync(destination, operations),
tries=3,
action="executing sync operations",
delay=5,
)
except Exception:
error_data = json.dumps(
{
"destination_url": destination,
"operations": [asdict(operation) for operation in operations],
}
)
logger.error(
f"An error prevented completion of the following sync operations. {error_data}",
exc_info=True,
)

def _handle_event(self, event: Push | Tag) -> None:
if event.repo_url not in self._repo_synchronizers:
logger.info("Ignoring event for untracked repository: %()s", event.repo_url)
ignored_event = json.dumps(asdict(event))
logger.info(f"Ignoring event for untracked repository. {ignored_event}")
return
match event:
case Push():
6 changes: 5 additions & 1 deletion git_hg_sync/mapping.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
from dataclasses import dataclass
from functools import cached_property
from typing import Sequence, TypeAlias
from typing import Sequence, TypeAlias, Literal

import pydantic

@@ -16,6 +16,8 @@ class SyncBranchOperation:
# Destination (hg)
destination_branch: str

type: Literal["SyncBranchOperation"] = "SyncBranchOperation"


@dataclass
class SyncTagOperation:
@@ -26,6 +28,8 @@ class SyncTagOperation:
tag: str
tags_destination_branch: str

type: Literal["SyncTagOperation"] = "SyncTagOperation"


SyncOperation: TypeAlias = SyncBranchOperation | SyncTagOperation

2 changes: 1 addition & 1 deletion git_hg_sync/pulse_worker.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@

from git_hg_sync.events import Push, Tag

logger = get_proxy_logger("pluse_consumer")
logger = get_proxy_logger(__name__)


class EventHandler(Protocol):
115 changes: 97 additions & 18 deletions git_hg_sync/repo_synchronizer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import json
from dataclasses import asdict
from pathlib import Path


from git import Repo, exc
from git_hg_sync.mapping import SyncOperation, SyncBranchOperation, SyncTagOperation
from git_hg_sync.retry import retry
from mozlog import get_proxy_logger

logger = get_proxy_logger("sync_repo")

logger = get_proxy_logger(__name__)


class RepoSyncError(Exception):
@@ -17,7 +21,6 @@ class MercurialMetadataNotFoundError(RepoSyncError):


class RepoSynchronizer:

def __init__(
self,
clone_directory: Path,
@@ -26,12 +29,21 @@ def __init__(
self._clone_directory = clone_directory
self._src_remote = url

def _repo_config_as_dict(self, repo: Repo):
    """Return the repository's git configuration as a nested dict.

    Reads every section via the repo's config reader and returns a mapping
    of section name -> dict of that section's options, so the configuration
    can be JSON-serialized for logging.
    """
    with repo.config_reader() as reader:
        return {
            section: dict(reader.items(section)) for section in reader.sections()
        }

def _get_clone_repo(self) -> Repo:
"""Get a GitPython Repo object pointing to a git clone of the source
remote."""
log_data = json.dumps({"clone_directory": str(self._clone_directory)})
if self._clone_directory.exists():
logger.debug(f"Clone directory exists. Using it .{log_data}")
repo = Repo(self._clone_directory)
else:
logger.debug(f"Clone directory does not exist. Creating it. {log_data}")
repo = Repo.init(self._clone_directory)

# Ensure that the clone repository is well configured
@@ -49,19 +61,52 @@ def _fetch_all_from_remote(self, repo: Repo, remote: str) -> None:
repo.git.fetch([remote])
except exc.GitCommandError as e:
# can't fetch if repo is empty
if "fatal: couldn't find remote ref HEAD" not in e.stderr:
raise e
if "fatal: couldn't find remote ref HEAD" in e.stderr:
return
raise

def _log_data(self, **kwargs) -> str:
    """Build a JSON string of standard log context for this synchronizer.

    Always includes the clone directory and source remote; any extra
    keyword arguments are merged in (overriding the defaults on key clash,
    since they are spread last).
    """
    return json.dumps(
        {
            "clone_directory": str(self._clone_directory),
            "source_remote": self._src_remote,
            **kwargs,
        }
    )

def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
repo = self._get_clone_repo()
destination_remote = f"hg::{destination_url}"

json_operations = self._log_data(
destination_remote=destination_remote,
operations=[asdict(op) for op in operations],
)
logger.info(f"Synchronizing. {json_operations}")

repo = self._get_clone_repo()

logger.debug(
f"Git clone configuration. {self._log_data(configuration=self._repo_config_as_dict(repo))}"
)

# Ensure we have all commits from destination repository
self._fetch_all_from_remote(repo, destination_remote)
logger.debug(f"Fetching all commits from destination. {self._log_data()}")
retry(
lambda: self._fetch_all_from_remote(repo, destination_remote),
tries=2,
action="fetching commits from destination",
)

# Get commits we want to send to destination repository
commits_to_fetch = [operation.source_commit for operation in operations]
repo.git.fetch([self._src_remote, *commits_to_fetch])
logger.debug(
f"Fetching source commits. {self._log_data(commits=commits_to_fetch)}"
)
retry(
lambda: repo.git.fetch([self._src_remote, *commits_to_fetch]),
tries=2,
action="fetching source commits",
)

push_args = [destination_remote]

@@ -79,8 +124,22 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
# tagging can only be done on a commit that already have mercurial
# metadata
if branch_ops:
repo.git.execute(
["git", "-c", "cinnabar.data=force", "push", "--dry-run", *push_args]
logger.debug(
f"Adding mercurial metadata to git commits. {self._log_data(args=push_args)}"
)
retry(
lambda: repo.git.execute(
[
"git",
"-c",
"cinnabar.data=force",
"push",
"--dry-run",
*push_args,
]
),
tries=2,
action="adding mercurial metadata to git commits",
)

# Handle tag operations
@@ -91,21 +150,32 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
# Create tag branches locally
tag_branches = set([op.tags_destination_branch for op in tag_ops])
for tag_branch in tag_branches:
repo.git.fetch(
[
"-f",
destination_remote,
f"refs/heads/branches/{tag_branch}/tip:{tag_branch}",
]
logger.debug(
f"Get tag branch from destination. {self._log_data(tag_branch=tag_branch)}."
)
retry(
lambda: repo.git.fetch(
[
"-f",
destination_remote,
f"refs/heads/branches/{tag_branch}/tip:{tag_branch}",
]
),
tries=2,
action="getting tag branch from destination",
)
push_args.append(f"{tag_branch}:refs/heads/branches/{tag_branch}/tip")

# Create tags
for tag_operation in tag_ops:
if not self._commit_has_mercurial_metadata(
commit_has_metadata = self._commit_has_mercurial_metadata(
repo, tag_operation.source_commit
):
)
if not commit_has_metadata:
raise MercurialMetadataNotFoundError()
logger.debug(
f"Creating tag. {self._log_data(operation=asdict(tag_operation))}"
)
repo.git.cinnabar(
[
"tag",
@@ -117,4 +187,13 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None:
)

# Push commits, branches and tags to destination
repo.git.push(*push_args)
logged_command = ["git", "push", *push_args]
logger.debug(
f"Pushing branches and tags to destination. {self._log_data(command=logged_command)}"
)
retry(
lambda: repo.git.push(*push_args),
tries=2,
delay=5,
action="pushing branch and tags to destination",
)
47 changes: 47 additions & 0 deletions git_hg_sync/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from collections.abc import Callable
import time
from typing import TypeVar

from mozlog import get_proxy_logger

TResult = TypeVar("TResult")

logger = get_proxy_logger(__name__)


def retry(
    func: Callable[[], TResult], tries: int, action: str = "", delay: float = 0.0
) -> TResult:
    """
    Retry a function on failure.

    Args:
        func (Callable): The zero-argument function to retry.
        tries (int): The number of attempts to make before failing (>= 1).
        action (str): A description of the action being performed for better
            logging context (eg. "fetching commits").
        delay (float): The delay in seconds between attempts.

    Returns:
        TResult: The return value of the function if successful.

    Raises:
        ValueError: If `tries` is less than 1.
        Exception: The last exception raised if all attempts fail.
    """
    # Validate explicitly rather than falling through to the unreachable
    # guard below (an `assert` would be stripped under `python -O`).
    if tries < 1:
        raise ValueError(f"tries must be at least 1, got {tries}")

    # Loop-invariant: compute the log suffix once, not per attempt.
    action_text = f" while {action}" if action else ""

    for attempt in range(1, tries + 1):
        try:
            return func()
        except Exception as exc:
            if attempt < tries:
                logger.error(
                    f"Attempt {attempt}/{tries} failed{action_text} with error: {type(exc).__name__}: {exc}. Retrying...",
                    exc_info=True,
                )
                if delay > 0:
                    time.sleep(delay)
            else:
                # Final attempt: log and let the original exception propagate.
                logger.error(
                    f"Attempt {attempt}/{tries} failed{action_text}. Aborting."
                )
                raise

    # tries >= 1 guarantees the loop either returned or re-raised.
    raise AssertionError("unreachable")
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import sys

import mozlog
import pytest

from git_hg_sync.config import PulseConfig


@pytest.fixture(autouse=True)
@pytest.fixture(autouse=True, scope="session")
def mozlog_logging() -> None:
    """Install a session-wide default mozlog logger for the test suite.

    Creates a StructuredLogger named "tests" that emits JSON-formatted
    records to stdout, and registers it as mozlog's default logger so code
    under test can log without any per-test setup.
    """
    logger = mozlog.structuredlog.StructuredLogger("tests")
    logger.add_handler(
        mozlog.handlers.StreamHandler(sys.stdout, mozlog.formatters.JSONFormatter())
    )
    mozlog.structuredlog.set_default_logger(logger)


Loading
Oops, something went wrong.
Loading
Oops, something went wrong.