Skip to content

Commit

Permalink
fix(core): automatically cleanup dangling git processes (#2928)
Browse files Browse the repository at this point in the history
  • Loading branch information
Panaetius committed May 31, 2022
1 parent 5cc006c commit 56b06b5
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 64 deletions.
5 changes: 5 additions & 0 deletions renku/command/command_builder/command.py
Expand Up @@ -171,6 +171,7 @@ def __init__(self) -> None:
self._track_std_streams: bool = False
self._working_directory: Optional[str] = None
self._client: Optional["LocalClient"] = None
self._client_was_created: bool = False

def __getattr__(self, name: str) -> Any:
"""Bubble up attributes of wrapped builders."""
Expand Down Expand Up @@ -205,6 +206,7 @@ def _injection_pre_hook(self, builder: "Command", context: dict, *args, **kwargs
dispatcher.push_created_client_to_stack(self._client)
else:
self._client = dispatcher.push_client_to_stack(path=default_path(self._working_directory or "."))
self._client_was_created = True
ctx = click.Context(click.Command(builder._operation)) # type: ignore
else:
if not self._client:
Expand Down Expand Up @@ -237,6 +239,9 @@ def _post_hook(self, builder: "Command", context: dict, result: "CommandResult",
"""
remove_injector()

if self._client_was_created and self._client and self._client.repository is not None:
self._client.repository.close()

if result.error:
raise result.error

Expand Down
4 changes: 4 additions & 0 deletions renku/core/management/git.py
Expand Up @@ -295,6 +295,10 @@ def __attrs_post_init__(self):
except errors.GitError:
self.repository = None

def __del__(self):
    """Close the wrapped repository when this object is finalized.

    A finalizer can run on an object whose initialisation never finished, in
    which case the ``repository`` attribute may not exist. The attribute is
    therefore looked up defensively, so that finalization never fails with an
    ``AttributeError``.
    """
    repository = getattr(self, "repository", None)
    if repository is not None:
        repository.close()

@property
def modified_paths(self):
"""Return paths of modified files."""
Expand Down
71 changes: 65 additions & 6 deletions renku/infrastructure/repository.py
Expand Up @@ -86,6 +86,15 @@ def __init__(self, path: Union[Path, str] = ".", repository: Optional[git.Repo]
def __repr__(self) -> str:
    """Return a debug representation of the form ``<ClassName path>``."""
    class_name = self.__class__.__name__
    return "<{} {}>".format(class_name, self.path)

def __enter__(self):
    """Enter a ``with`` block; the object itself is what gets bound by ``as``."""
    return self

def __exit__(self, *args):
    """Leave a ``with`` block by calling ``close()``.

    The exception details received in ``args`` are ignored and nothing is
    returned, so an exception raised inside the block keeps propagating.
    """
    self.close()

def __del__(self):
    """Call ``close()`` when the object is finalized."""
    # NOTE: Python does not guarantee when (or whether) finalizers run; using the object in a ``with`` block or
    # calling ``close()`` explicitly is the deterministic way to release it.
    self.close()

@property
def path(self) -> Path:
"""Absolute path to the repository's root."""
Expand Down Expand Up @@ -675,6 +684,16 @@ def get_user(self) -> "Actor":
configuration = self.get_configuration()
return Repository._get_user_from_configuration(configuration)

def close(self) -> None:
    """Close the underlying repository and drop the reference to it.

    Cleans up dangling processes. Calling this when no repository is attached
    (attribute missing or already ``None``) does nothing.
    """
    wrapped = getattr(self, "_repository", None)
    if wrapped is None:
        return

    wrapped.close()  # type:ignore
    # NOTE: Rebinding to ``None`` releases this object's reference to the closed repository
    self._repository = None

@staticmethod
def get_global_user() -> "Actor":
"""Return the global git user."""
Expand Down Expand Up @@ -795,6 +814,7 @@ def __init__(
self, path: Union[Path, str] = ".", search_parent_directories: bool = False, repository: git.Repo = None
):
repo = repository or _create_repository(path, search_parent_directories)

super().__init__(path=Path(repo.working_dir).resolve(), repository=repo) # type: ignore

@classmethod
Expand Down Expand Up @@ -864,7 +884,7 @@ def __init__(self, parent: git.Repo, name: str, path: Union[Path, str], url: str
self._name: str = name
self._url: str = url
try:
self._repository: git.Repo = _create_repository(path, search_parent_directories=False)
self._repository: Optional[git.Repo] = _create_repository(path, search_parent_directories=False)
except errors.GitError:
# NOTE: Submodule directory doesn't exist yet, so, we ignore the error
pass
Expand All @@ -881,6 +901,12 @@ def __str__(self) -> str:
def __repr__(self) -> str:
    """Return a debug representation of the form ``<Submodule relative_path>``."""
    return "<Submodule {}>".format(self.relative_path)

def __del__(self) -> None:
    """Close the submodule's repository, if one was created, when the object is finalized."""
    wrapped = getattr(self, "_repository", None)
    if wrapped is None:
        # NOTE: The attribute can be missing or ``None`` when no repository is attached
        return

    wrapped.close()  # type:ignore
    self._repository = None

@property
def name(self) -> str:
"""Return submodule's name."""
Expand All @@ -901,42 +927,75 @@ class SubmoduleManager:
"""Manage submodules of a Repository."""

def __init__(self, repository: git.Repo):
self._repository = repository
self._repository: Optional[git.Repo] = repository
self._submodule_cache: Dict[str, Submodule] = {} # type: ignore
try:
self.update()
except errors.GitError:
# NOTE: Update fails if submodule repo cannot be cloned. Repository still works but submodules are broken.
pass

def _get_submodule(self, submodule: git.Submodule) -> Submodule:  # type: ignore
    """Return the wrapper for ``submodule``, creating and caching it on first use.

    Wrappers are cached by submodule name, so repeated lookups return the same object.

    Raises:
        errors.ParameterError: If no repository is set on this manager.
    """
    repository = self._repository
    if repository is None:
        raise errors.ParameterError("Repository not set.")

    cache = self._submodule_cache
    key = submodule.name
    if key in cache:
        return cache[key]

    wrapper = Submodule.from_submodule(repository, submodule)
    cache[key] = wrapper
    return wrapper

def __getitem__(self, name: str) -> Submodule:
if self._repository is None:
raise errors.ParameterError("Repository not set.")

try:
submodule = self._repository.submodules[name]
except IndexError:
raise errors.GitError(f"Submodule '{name}' not found")
else:
return Submodule.from_submodule(self._repository, submodule)
return self._get_submodule(submodule)

def __iter__(self):
return (Submodule.from_submodule(self._repository, s) for s in self._repository.submodules)
if self._repository is None:
raise errors.ParameterError("Repository not set.")

for s in self._repository.submodules:

yield self._get_submodule(s)

def __len__(self) -> int:
    """Return the number of submodules reported by the repository.

    Raises:
        errors.ParameterError: If no repository is set on this manager.
    """
    repository = self._repository
    if repository is None:
        raise errors.ParameterError("Repository not set.")

    submodules = repository.submodules
    return len(submodules)

def __repr__(self) -> str:
    """Return the string form of the list of items this object iterates over."""
    items = list(self)
    return str(items)

def remove(self, submodule: Union[Submodule, str], force: bool = False):
"""Remove an existing submodule."""
if self._repository is None:
raise errors.ParameterError("Repository not set.")

name = submodule if isinstance(submodule, str) else submodule.name

try:
submodule = self._repository.submodules[name]
submodule.remove(force=force)
git_submodule = self._repository.submodules[name]
git_submodule.remove(force=force)

if name in self._submodule_cache:
submodule = self._submodule_cache[name]
del self._submodule_cache[name]
submodule.close()
except git.GitError as e:
raise errors.GitError(f"Cannot delete submodule '{submodule}'") from e

def update(self, initialize: bool = True):
"""Update all submodule."""
if self._repository is None:
raise errors.ParameterError("Repository not set.")

# NOTE: Git complains if ``--init`` comes before ``update``
args = ("update", "--init") if initialize else ("update",)
_run_git_command(self._repository, "submodule", *args)
Expand Down
117 changes: 59 additions & 58 deletions renku/ui/service/controllers/api/mixins.py
Expand Up @@ -190,37 +190,37 @@ def execute_op(self):
ref = self.request_data.get("ref", None)

if ref:
repository = Repository(project.abs_path)
if ref != repository.active_branch.name:
# NOTE: Command called for different branch than the one used in cache, change branch
if len(repository.remotes) != 1:
raise RenkuException("Couldn't find remote for project in cache.")
origin = repository.remotes[0]
remote_branch = f"{origin}/{ref}"

with project.write_lock():
# NOTE: Add new ref to remote branches
repository.run_git_command("remote", "set-branches", "--add", origin, ref)
if self.migrate_project or self.clone_depth == PROJECT_CLONE_NO_DEPTH:
repository.fetch(origin, ref)
else:
repository.fetch(origin, ref, depth=self.clone_depth)

# NOTE: Switch to new ref
repository.run_git_command("checkout", "--track", "-f", "-b", ref, remote_branch)

# NOTE: cleanup remote branches in case a remote was deleted (fetch fails otherwise)
repository.run_git_command("remote", "prune", origin)

for branch in repository.branches:
if branch.remote_branch and not branch.remote_branch.is_valid():
repository.branches.remove(branch, force=True)
# NOTE: Remove left-over refspec
try:
with repository.get_configuration(writable=True) as config:
config.remove_value(f"remote.{origin}.fetch", f"origin.{branch}$")
except GitConfigurationError:
pass
with Repository(project.abs_path) as repository:
if ref != repository.active_branch.name:
# NOTE: Command called for different branch than the one used in cache, change branch
if len(repository.remotes) != 1:
raise RenkuException("Couldn't find remote for project in cache.")
origin = repository.remotes[0]
remote_branch = f"{origin}/{ref}"

with project.write_lock():
# NOTE: Add new ref to remote branches
repository.run_git_command("remote", "set-branches", "--add", origin, ref)
if self.migrate_project or self.clone_depth == PROJECT_CLONE_NO_DEPTH:
repository.fetch(origin, ref)
else:
repository.fetch(origin, ref, depth=self.clone_depth)

# NOTE: Switch to new ref
repository.run_git_command("checkout", "--track", "-f", "-b", ref, remote_branch)

# NOTE: cleanup remote branches in case a remote was deleted (fetch fails otherwise)
repository.run_git_command("remote", "prune", origin)

for branch in repository.branches:
if branch.remote_branch and not branch.remote_branch.is_valid():
repository.branches.remove(branch, force=True)
# NOTE: Remove left-over refspec
try:
with repository.get_configuration(writable=True) as config:
config.remove_value(f"remote.{origin}.fetch", f"origin.{branch}$")
except GitConfigurationError:
pass
else:
self.reset_local_repo(project)

Expand Down Expand Up @@ -250,33 +250,33 @@ def reset_local_repo(self, project):
# NOTE: return immediately in case of multiple writers waiting
return

repository = Repository(project.abs_path)
origin = None
tracking_branch = repository.active_branch.remote_branch
if tracking_branch:
origin = tracking_branch.remote
elif len(repository.remotes) == 1:
origin = repository.remotes[0]

if origin:
unshallow = self.migrate_project or self.clone_depth == PROJECT_CLONE_NO_DEPTH
if unshallow:
try:
# NOTE: It could happen that repository is already un-shallowed,
# in this case we don't want to leak git exception, but still want to fetch.
repository.fetch("origin", repository.active_branch, unshallow=True)
except GitCommandError:
repository.fetch("origin", repository.active_branch)

repository.reset(f"{origin}/{repository.active_branch}", hard=True)
else:
try:
# NOTE: it rarely happens that origin is not reachable. Try again if it fails.
repository.fetch("origin", repository.active_branch)
with Repository(project.abs_path) as repository:
origin = None
tracking_branch = repository.active_branch.remote_branch
if tracking_branch:
origin = tracking_branch.remote
elif len(repository.remotes) == 1:
origin = repository.remotes[0]

if origin:
unshallow = self.migrate_project or self.clone_depth == PROJECT_CLONE_NO_DEPTH
if unshallow:
try:
# NOTE: It could happen that repository is already un-shallowed,
# in this case we don't want to leak git exception, but still want to fetch.
repository.fetch("origin", repository.active_branch, unshallow=True)
except GitCommandError:
repository.fetch("origin", repository.active_branch)

repository.reset(f"{origin}/{repository.active_branch}", hard=True)
except GitCommandError as e:
project.purge()
raise IntermittentCacheError(e)
else:
try:
# NOTE: it rarely happens that origin is not reachable. Try again if it fails.
repository.fetch("origin", repository.active_branch)
repository.reset(f"{origin}/{repository.active_branch}", hard=True)
except GitCommandError as e:
project.purge()
raise IntermittentCacheError(e)
project.last_fetched_at = datetime.utcnow()
project.save()
except (portalocker.LockException, portalocker.AlreadyLocked) as e:
Expand Down Expand Up @@ -346,7 +346,8 @@ def sync(self, remote="origin"):
if self.project_path is None:
raise RenkuException("unable to sync with remote since no operation has been executed")

return push_changes(Repository(self.project_path), remote=remote)
with Repository(self.project_path) as repository:
return push_changes(repository, remote=remote)

def execute_and_sync(self, remote="origin"):
"""Execute operation which controller implements and sync with the remote."""
Expand Down

0 comments on commit 56b06b5

Please sign in to comment.