Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): improve renku status performance #2482

Merged
merged 5 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
150 changes: 51 additions & 99 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions renku/core/commands/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.models.entity import Entity
from renku.core.models.provenance.activity import Activity
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.metadata import filter_overridden_activities, get_modified_activities
from renku.core.utils.os import get_relative_path_to_cwd, get_relative_paths


Expand Down Expand Up @@ -89,9 +89,8 @@ def _get_modified_paths(activity_gateway, repository) -> Tuple[Set[Tuple[Activit
"""Get modified and deleted usages/inputs of a list of activities."""
all_activities = activity_gateway.get_all_activities()

relevant_activities = set()
for activity in all_activities:
add_activity_if_recent(activity, relevant_activities)
modified, deleted = get_modified_activities(activities=list(relevant_activities), repository=repository)
relevant_activities = filter_overridden_activities(all_activities)

modified, deleted = get_modified_activities(activities=relevant_activities, repository=repository)

return modified, {e.path for _, e in deleted}
6 changes: 2 additions & 4 deletions renku/core/commands/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from renku.core.management.workflow.activity import sort_activities
from renku.core.management.workflow.concrete_execution_graph import ExecutionGraph
from renku.core.models.provenance.activity import Activity
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.metadata import add_activity_if_recent, filter_overridden_activities, get_modified_activities
from renku.core.utils.os import get_relative_paths


Expand Down Expand Up @@ -117,9 +117,7 @@ def _is_activity_valid(activity: Activity, plan_gateway: IPlanGateway, client_di
def _get_modified_activities_and_paths(repository, activity_gateway) -> Tuple[Set[Activity], Set[str]]:
    """Return latest activities that one of their inputs is modified.

    Returns:
        Tuple of (valid activities that have at least one modified input,
        set of paths of those modified inputs).
    """
    all_activities = activity_gateway.get_all_activities()
    # Drop activities whose outputs are fully re-generated by a newer activity.
    relevant_activities = filter_overridden_activities(all_activities)
    modified, _ = get_modified_activities(activities=list(relevant_activities), repository=repository)
    # NOTE(review): ``_is_activity_valid`` is declared with more parameters but called with one
    # here — presumably the rest are dependency-injected; confirm against its decorator.
    return {a for a, _ in modified if _is_activity_valid(a)}, {e.path for _, e in modified}

Expand Down
13 changes: 11 additions & 2 deletions renku/core/management/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,9 @@ def update_dataset_local_files(self, records: List[DynamicProxy], delete=False):

try:
communication.start_progress(progress_text, len(records))
check_paths = []
records_to_check = []

for file in records:
communication.update_progress(progress_text, 1)

Expand All @@ -813,7 +816,13 @@ def update_dataset_local_files(self, records: List[DynamicProxy], delete=False):
deleted_files.append(file)
continue

current_checksum = self.repository.get_object_hash(revision="HEAD", path=file.entity.path)
check_paths.append(file.entity.path)
records_to_check.append(file)

checksums = self.repository.get_object_hashes(check_paths)

for file in records_to_check:
current_checksum = checksums.get(file.entity.path)
if not current_checksum:
deleted_files.append(file)
elif current_checksum != file.entity.checksum:
Expand Down Expand Up @@ -959,7 +968,7 @@ def _create_pointer_file(self, target, checksum=None):

def _calculate_checksum(self, filepath):
    """Return the git object hash of ``filepath``.

    The file doesn't need to be committed; hashing is delegated to the repository.

    Raises:
        errors.GitCommandError: If the underlying git invocation fails (propagated
            unchanged from ``hash_objects``).
    """
    # NOTE: a previous ``except errors.GitCommandError: raise`` clause was a no-op
    # (bare re-raise of the same exception) and was removed; behavior is identical.
    return self.repository.hash_objects([filepath])[0]

Expand Down
152 changes: 148 additions & 4 deletions renku/core/metadata/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import tempfile
from collections import defaultdict
from datetime import datetime
from functools import lru_cache
from itertools import zip_longest
from pathlib import Path
from typing import Any, BinaryIO, Callable, Dict, Generator, List, NamedTuple, Optional, Tuple, Union
from typing import Any, BinaryIO, Callable, Dict, Generator, List, NamedTuple, Optional, Set, Tuple, Union

import git

Expand Down Expand Up @@ -97,6 +98,7 @@ def remotes(self) -> "RemoteManager":
return RemoteManager(self._repository)

@property
@lru_cache()
# NOTE(review): ``lru_cache`` on an instance-method getter keys on ``self`` and keeps every
# repository instance alive for the lifetime of the global cache (ruff B019); the cached
# value also never invalidates if submodules are added/removed. Consider
# ``functools.cached_property`` instead — confirm no caller mutates submodules mid-session.
def submodules(self) -> "SubmoduleManager":
    """Return the submodules of this repository (cached ``SubmoduleManager``)."""
    return SubmoduleManager(self._repository)
Expand Down Expand Up @@ -419,6 +421,92 @@ def get_content_from_submodules():

raise errors.ExportError(f"File not found in the repository: '{revision}/{checksum}:{path}'")

def get_object_hashes(self, paths: List[Union[Path, str]], revision: Optional[str] = None) -> Dict[str, Optional[str]]:
    """Return git hashes for many objects in a Repo or its submodules, in one pass.

    Keys of the returned mapping are the paths exactly as passed in ``paths``; a value is
    ``None`` when the path does not exist in the inspected revision.

    NOTE: path must be relative to the repo's root regardless if this function is called from a subdirectory or not.
    """

    def _get_uncommitted_file_hashes(paths: Set[Union[Path, str]]) -> Dict[str, str]:
        """Get hashes for all modified/uncommitted/staged files."""
        # Staged changes can only be diffed when HEAD exists (non-empty repository).
        staged_files = [d.a_path for d in self.staged_changes] if self.head.is_valid() else []
        modified_files = [item.b_path for item in self.unstaged_changes if not item.deleted]
        dirty_files = {os.path.join(self.path, p) for p in self.untracked_files + modified_files + staged_files}
        # Only hash dirty files that were actually requested; directories cannot be hashed.
        dirty_files = {p for p in dirty_files if p in paths and not os.path.isdir(p)}
        dirty_files = list(dirty_files)

        # NOTE: ``hash_objects`` returns hashes in input order, so zipping is safe.
        dirty_files_hashes = Repository.hash_objects(dirty_files)
        return dict(zip(dirty_files, dirty_files_hashes))

    def _get_hashes_from_revision(
        paths: Set[Union[Path, str]], revision: str, repository: BaseRepository
    ) -> Dict[str, Optional[str]]:
        """Get hashes for paths in a specific revision."""
        # Pre-filter so ``rev-parse`` isn't called on paths missing from the revision.
        existing_paths = repository.get_existing_paths_in_revision(paths, revision=revision)
        result = {}
        # Batching keeps each git invocation under OS command-line length limits.
        for batch in split_paths(*existing_paths):
            hashes = self.run_git_command("rev-parse", *[f"{revision}:{relative_path}" for relative_path in batch])
            result.update(zip(batch, hashes.splitlines()))

        # Paths absent from the revision map to ``None`` for the caller to detect deletions.
        for path in paths:
            if path not in result:
                result[path] = None

        return result

    # Map absolute paths back to the caller-supplied spelling so returned keys match input.
    path_mapping = {get_absolute_path(path, self.path): path for path in paths}
    absolute_paths = set(path_mapping.keys())

    hashes = {}
    # NOTE: If revision is not specified, we use hash-object to hash the (possibly) modified object
    if not revision:
        uncommitted_hashes = _get_uncommitted_file_hashes(absolute_paths)

        hashes.update({path_mapping[p]: h for p, h in uncommitted_hashes.items()})

        if len(hashes) == len(absolute_paths):
            # NOTE: there were only uncommitted files
            return hashes

        # Remaining (clean) paths are resolved against HEAD below.
        revision = "HEAD"
        absolute_paths = {p for p in absolute_paths if p not in uncommitted_hashes}

    submodule_paths = defaultdict(list)
    main_repo_paths = []

    if len(self.submodules) > 0:
        # NOTE: filter paths belonging to main repo from those belonging to submodules
        for absolute_path in absolute_paths:
            found = False
            for submodule in self.submodules:
                try:
                    # ``relative_to`` raises ValueError when the path lies outside the submodule.
                    # NOTE(review): assumes ``submodule.path`` is absolute — confirm.
                    Path(absolute_path).relative_to(submodule.path)
                    submodule_paths[submodule].append(absolute_path)
                    found = True
                    break
                except ValueError:
                    continue

            if not found:
                main_repo_paths.append(os.path.relpath(absolute_path, start=self.path))
    else:
        main_repo_paths = list(map(lambda p: os.path.relpath(p, start=self.path), absolute_paths))

    if main_repo_paths:
        # NOTE: Get hashes for paths in the main repository
        revision_hashes = _get_hashes_from_revision(main_repo_paths, revision, self)
        hashes.update({path_mapping[get_absolute_path(p, self.path)]: h for p, h in revision_hashes.items()})

    if not submodule_paths:
        return hashes

    # NOTE: Get hashes for paths in submodules
    # NOTE(review): the loop target shadows the ``submodule_paths`` dict; harmless because
    # ``.items()`` is evaluated once before the first iteration, but a rename would be clearer.
    for submodule, submodule_paths in submodule_paths.items():
        submodule_hashes = submodule.get_object_hashes(paths=submodule_paths, revision="HEAD")
        hashes.update({path_mapping[get_absolute_path(p, self.path)]: h for p, h in submodule_hashes.items()})

    return hashes

def get_object_hash(self, path: Union[Path, str], revision: Union["Commit", str] = None) -> Optional[str]:
"""Return git hash of an object in a Repo or its submodule.

Expand Down Expand Up @@ -513,11 +601,59 @@ def get_global_configuration(writable: bool = False) -> "Configuration":
"""Return global git configuration."""
return Configuration(repository=None, writable=writable)

def get_existing_paths_in_revision(
    self, paths: Union[List[Union[Path, str]], Set[Union[Path, str]]] = None, revision: str = "HEAD"
) -> List[str]:
    """List all paths that exist in a revision.

    Args:
        paths: Paths to check; when falsy, all paths present in ``revision`` are returned.
        revision: Git revision to inspect (defaults to ``HEAD``).

    Returns:
        The subset of ``paths`` (or all tracked paths/dirs) that exist in ``revision``.

    Raises:
        errors.GitCommandError: If the underlying git invocation fails.
    """
    # Always run git inside the repository, independent of the caller's CWD.
    git_command = git.Git(working_dir=self.path)

    try:
        if paths:
            # NOTE(review): directory detection uses the *working tree* (``os.path.isdir``),
            # not the revision's tree — confirm this matches callers' expectations.
            dirs = []
            files = []

            for path in paths:
                if os.path.isdir(path):
                    dirs.append(path)
                else:
                    files.append(path)

            result = []

            if files:
                # NOTE: check existing files; batched to respect command-line length limits
                for batch in split_paths(*files):
                    existing_paths = git_command.ls_tree(*batch, r=revision, name_only=True)
                    result.extend(existing_paths.splitlines())

            if dirs:
                # NOTE: check existing dirs
                for batch in split_paths(*dirs):
                    existing_paths = git_command.ls_tree(*batch, d=True, r=revision, name_only=True)
                    result.extend(existing_paths.splitlines())

            return result
        else:
            # FIX: this branch previously used ``git.Git()`` (i.e. the process CWD) instead of
            # the repository path, breaking callers running from outside the repository root.
            existing_files = git_command.ls_tree(r=revision, name_only=True).splitlines()
            existing_dirs = git_command.ls_tree(r=revision, name_only=True, d=True).splitlines()
            return existing_dirs + existing_files
    except git.GitCommandError as e:
        raise errors.GitCommandError(
            message=f"Git command failed: {str(e)}",
            command=e.command,
            stdout=e.stdout,
            stderr=e.stderr,
            status=e.status,
        ) from e

@staticmethod
def hash_object(path: Union[Path, str]) -> str:
"""Create a git hash for a path. The path doesn't need to be in a repository."""
def hash_objects(paths: List[Union[Path, str]]) -> List[str]:
"""Create a git hash for a list of paths. The paths don't need to be in a repository."""
hashes = []
try:
return git.Git().hash_object(path)
for batch in split_paths(*paths):
calculated_hashes = git.Git().hash_object(*batch)
hashes.extend(calculated_hashes.splitlines())
return hashes
except git.GitCommandError as e:
raise errors.GitCommandError(
message=f"Git command failed: {str(e)}",
Expand All @@ -527,6 +663,14 @@ def hash_object(path: Union[Path, str]) -> str:
status=e.status,
) from e

@staticmethod
def hash_object(path: Union[Path, str]) -> Optional[str]:
    """Create a git hash for a path. The path doesn't need to be in a repository.

    Returns:
        The hash of the object, or ``None`` if no hash was produced (preserves the
        previous implicit-``None`` behavior; the annotation now reflects it).
    """
    result = BaseRepository.hash_objects([path])
    # NOTE: ``result and len(result) > 0`` was redundant — a non-empty list is truthy.
    return result[0] if result else None


class Repository(BaseRepository):
"""Abstract Base repository."""
Expand Down
45 changes: 43 additions & 2 deletions renku/core/utils/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,18 @@ def get_modified_activities(
modified = set()
deleted = set()

checksum_cache = {}
paths = []

for activity in activities:
for usage in activity.usages:
paths.append(usage.entity.path)

hashes = repository.get_object_hashes(paths=paths, revision="HEAD")

for activity in activities:
for usage in activity.usages:
entity = usage.entity
current_checksum = checksum_cache.setdefault(entity.path, repository.get_object_hash(path=entity.path))
current_checksum = hashes.get(entity.path, None)
if current_checksum is None:
deleted.add((activity, entity))
elif current_checksum != entity.checksum:
Expand All @@ -101,6 +107,41 @@ def get_modified_activities(
return modified, deleted


def filter_overridden_activities(activities: List["Activity"]) -> List["Activity"]:
    """Filter out overridden activities from a list of activities.

    An activity is considered overridden when a newer kept activity re-generates all of
    its output paths. Activities are scanned newest-first (reverse input order) and the
    survivors are keyed by the frozen set of paths they generate.
    """
    survivors = {}  # frozenset of generated paths -> kept Activity

    for candidate in reversed(activities):
        generated = frozenset(generation.entity.path for generation in candidate.generations)

        covering = []  # kept activities whose outputs include everything ``candidate`` generates
        covered = []  # kept activities whose outputs are fully re-generated by ``candidate``
        for paths, kept in survivors.items():
            if generated.issubset(paths):
                covering.append(kept)
            elif generated.issuperset(paths):
                covered.append((paths, kept))

        # A newer kept activity already re-generates all of this activity's outputs: ignore it.
        if any(candidate.ended_at_time < kept.ended_at_time for kept in covering):
            continue

        # Evict kept activities that are older and whose outputs this activity fully covers.
        stale_keys = [paths for paths, kept in covered if candidate.ended_at_time > kept.ended_at_time]
        for key in stale_keys:
            del survivors[key]

        survivors[generated] = candidate

    return list(survivors.values())


def add_activity_if_recent(activity: "Activity", activities: Set["Activity"]):
"""Add ``activity`` to ``activities`` if it's not in the set or is the latest executed instance."""
if activity in activities:
Expand Down