Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): improve renku status performance #2482

Merged
merged 5 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 4 additions & 5 deletions renku/core/commands/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.models.entity import Entity
from renku.core.models.provenance.activity import Activity
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.metadata import filter_overridden_activities, get_modified_activities
from renku.core.utils.os import get_relative_path_to_cwd, get_relative_paths


Expand Down Expand Up @@ -89,9 +89,8 @@ def _get_modified_paths(activity_gateway, repository) -> Tuple[Set[Tuple[Activit
"""Get modified and deleted usages/inputs of a list of activities."""
all_activities = activity_gateway.get_all_activities()

relevant_activities = set()
for activity in all_activities:
add_activity_if_recent(activity, relevant_activities)
modified, deleted = get_modified_activities(activities=list(relevant_activities), repository=repository)
relevant_activities = filter_overridden_activities(all_activities)

modified, deleted = get_modified_activities(activities=relevant_activities, repository=repository)

return modified, {e.path for _, e in deleted}
6 changes: 2 additions & 4 deletions renku/core/commands/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from renku.core.management.interface.plan_gateway import IPlanGateway
from renku.core.management.workflow.activity import sort_activities
from renku.core.models.provenance.activity import Activity
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.metadata import add_activity_if_recent, filter_overridden_activities, get_modified_activities
from renku.core.utils.os import get_relative_paths


Expand Down Expand Up @@ -109,9 +109,7 @@ def _is_activity_valid(activity: Activity, plan_gateway: IPlanGateway, client_di
def _get_modified_activities_and_paths(repository, activity_gateway) -> Tuple[Set[Activity], Set[str]]:
"""Return latest activities that one of their inputs is modified."""
all_activities = activity_gateway.get_all_activities()
relevant_activities = set()
for activity in all_activities:
add_activity_if_recent(activity, relevant_activities)
relevant_activities = filter_overridden_activities(all_activities)
modified, _ = get_modified_activities(activities=list(relevant_activities), repository=repository)
return {a for a, _ in modified if _is_activity_valid(a)}, {e.path for _, e in modified}

Expand Down
13 changes: 11 additions & 2 deletions renku/core/management/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,9 @@ def update_dataset_local_files(self, records: List[DynamicProxy], delete=False):

try:
communication.start_progress(progress_text, len(records))
check_paths = []
records_to_check = []

for file in records:
communication.update_progress(progress_text, 1)

Expand All @@ -813,7 +816,13 @@ def update_dataset_local_files(self, records: List[DynamicProxy], delete=False):
deleted_files.append(file)
continue

current_checksum = self.repository.get_object_hash(revision="HEAD", path=file.entity.path)
check_paths.append(file.entity.path)
records_to_check.append(file)

checksums = self.repository.get_object_hashes(check_paths)

for file in records_to_check:
current_checksum = checksums.get(file.entity.path)
if not current_checksum:
deleted_files.append(file)
elif current_checksum != file.entity.checksum:
Expand Down Expand Up @@ -959,7 +968,7 @@ def _create_pointer_file(self, target, checksum=None):

def _calculate_checksum(self, filepath):
try:
return self.repository.hash_object(filepath)
return self.repository.hash_objects([filepath])[0]
except errors.GitCommandError:
raise

Expand Down
126 changes: 123 additions & 3 deletions renku/core/metadata/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tempfile
from collections import defaultdict
from datetime import datetime
from functools import lru_cache
from itertools import zip_longest
from pathlib import Path
from typing import Any, BinaryIO, Callable, Dict, Generator, List, NamedTuple, Optional, Tuple, Union
Expand Down Expand Up @@ -97,6 +98,7 @@ def remotes(self) -> "RemoteManager":
return RemoteManager(self._repository)

@property
def submodules(self) -> "SubmoduleManager":
    """Return the repository's submodules.

    The manager is cached per instance on first access. NOTE: the previous
    ``@lru_cache()`` on this method keyed the (unbounded) cache on ``self``,
    keeping every repository instance alive for the process lifetime (B019);
    a plain instance attribute gives the same memoization without the leak.
    """
    if getattr(self, "_submodule_manager_cache", None) is None:
        self._submodule_manager_cache = SubmoduleManager(self._repository)
    return self._submodule_manager_cache
Expand Down Expand Up @@ -423,6 +425,65 @@ def get_content_from_submodules():

raise errors.ExportError(f"File not found in the repository: '{revision}/{checksum}:{path}'")

def get_object_hashes(self, paths: List[Union[Path, str]], revision: str = None) -> Dict[str, str]:
    """Return git hashes of multiple objects in a Repo or its submodules.

    NOTE: paths must be relative to the repo's root regardless if this function is called
    from a subdirectory or not.

    Returns a mapping of path -> git hash; keys are relative to the repository root so that
    callers can look results up by the same relative paths they passed in. Paths that do not
    exist in the given revision are simply absent from the result.
    """
    absolute_paths = set(get_absolute_path(path, self.path) for path in paths)

    hashes = {}
    # NOTE: If revision is not specified, we use hash-object to hash the (possibly) modified object
    if not revision:
        staged_files = [d.a_path for d in self.staged_changes] if self.head.is_valid() else []
        modified_files = [item.b_path for item in self.unstaged_changes if item.b_path]

        dirty_files = {os.path.join(self.path, p) for p in self.untracked_files + modified_files + staged_files}
        dirty_files = {p for p in dirty_files if p in absolute_paths and not os.path.isdir(p)}
        clean_files = absolute_paths - dirty_files
        dirty_files = list(dirty_files)

        dirty_files_hashes = Repository.hash_objects(dirty_files)
        # NOTE: Key by path relative to the repo root, consistent with the clean-file hashes
        # below; callers look hashes up by relative path.
        hashes.update(
            (os.path.relpath(path, start=self.path), hash_)
            for path, hash_ in zip(dirty_files, dirty_files_hashes)
        )

        if not clean_files:
            return hashes
        # NOTE: Clean files are unchanged in the working tree, so HEAD has their hashes.
        revision = "HEAD"
        absolute_paths = clean_files

    submodule_paths = defaultdict(list)
    main_repo_paths = []

    if self.submodules:
        for absolute_path in absolute_paths:
            for submodule in self.submodules:
                try:
                    # NOTE: assumes submodule.path is absolute so relative_to can match
                    # absolute paths -- TODO confirm against SubmoduleManager.
                    Path(absolute_path).relative_to(submodule.path)
                    submodule_paths[submodule].append(absolute_path)
                    break
                except ValueError:
                    continue
            else:
                # NOTE: Path belongs to no submodule. Appending here (instead of inside the
                # ``except``) avoids adding the path once per non-matching submodule.
                main_repo_paths.append(os.path.relpath(absolute_path, start=self.path))
    else:
        # NOTE: A list, not a ``map`` object: a map is always truthy, which would defeat
        # the emptiness check below.
        main_repo_paths = [os.path.relpath(p, start=self.path) for p in absolute_paths]

    if main_repo_paths:
        # NOTE: Filter out paths that don't exist in the revision; ``rev-parse`` would fail
        # on any missing path otherwise.
        main_repo_paths = Repository.list_paths_in_revision(
            main_repo_paths, revision=revision, working_dir=self.path
        )

        # NOTE: Batch paths to stay under the OS command-line length limit.
        for batch in split_paths(*main_repo_paths):
            main_repo_hashes = self.run_git_command(
                "rev-parse", *[f"{revision}:{relative_path}" for relative_path in batch]
            )
            hashes.update(zip(batch, main_repo_hashes.splitlines()))

    for submodule, paths_in_submodule in submodule_paths.items():
        submodule_hashes = submodule.get_object_hashes(paths=paths_in_submodule, revision="HEAD")
        hashes.update(submodule_hashes)

    return hashes

def get_object_hash(self, path: Union[Path, str], revision: str = None) -> Optional[str]:
"""Return git hash of an object in a Repo or its submodule.

Expand All @@ -433,7 +494,7 @@ def get_object_hash(self, path: Union[Path, str], revision: str = None) -> Optio
# NOTE: If revision is not specified, we use hash-object to hash the (possibly) modified object
if not revision:
try:
return Repository.hash_object(absolute_path)
return Repository.hash_objects([absolute_path])[0]
Panaetius marked this conversation as resolved.
Show resolved Hide resolved
except errors.GitCommandError:
# NOTE: If object does not exist anymore, hash-object doesn't work, fall back to rev-parse
revision = "HEAD"
Expand Down Expand Up @@ -518,10 +579,69 @@ def get_global_configuration(writable: bool = False) -> "Configuration":
return Configuration(repository=None, writable=writable)

@staticmethod
def list_paths_in_revision(
    paths: List[Union[Path, str]] = None, revision: str = "HEAD", working_dir="."
) -> List[str]:
    """List all paths that exist in a revision.

    Warning: ls-tree is sensitive to the current working directory, make sure to set working_dir to repo root.

    If ``paths`` is None or empty, every file and directory in the revision is returned.
    """

    def run_ls_tree(*tree_paths, **kwargs) -> List[str]:
        # NOTE: Single place that runs ls-tree and wraps GitPython failures in our own
        # error type (the original duplicated this try/except twice).
        try:
            output = git.Git(working_dir=working_dir).ls_tree(*tree_paths, r=revision, name_only=True, **kwargs)
            return output.splitlines()
        except git.GitCommandError as e:
            raise errors.GitCommandError(
                message=f"Git command failed: {str(e)}",
                command=e.command,
                stdout=e.stdout,
                stderr=e.stderr,
                status=e.status,
            ) from e

    if not paths:
        # NOTE: No filter given: list everything. Guarding here also fixes the crash the
        # default ``paths=None`` caused in the loop below, and honors ``working_dir``
        # (the original used a bare ``git.Git()`` in this branch).
        return run_ls_tree(d=True) + run_ls_tree()

    dirs = []
    files = []

    for path in paths:
        if os.path.isdir(path):
            dirs.append(path)
        else:
            files.append(path)

    result = []

    # NOTE: Batch paths to stay under the OS command-line length limit; keep the emptiness
    # guards so ls-tree is never invoked with zero paths (which would list everything).
    if files:
        for batch in split_paths(*files):
            result.extend(run_ls_tree(*batch))

    if dirs:
        for batch in split_paths(*dirs):
            result.extend(run_ls_tree(*batch, d=True))

    return result

@staticmethod
def hash_objects(paths: List[Union[Path, str]]) -> List[str]:
"""Create a git hash for a path. The path doesn't need to be in a repository."""
hashes = []
try:
return git.Git().hash_object(path)
for batch in split_paths(*paths):
calculated_hashes = git.Git().hash_object(*batch)
hashes.extend(calculated_hashes.splitlines())
return hashes
except git.GitCommandError as e:
raise errors.GitCommandError(
message=f"Git command failed: {str(e)}",
Expand Down
46 changes: 44 additions & 2 deletions renku/core/utils/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,18 @@ def get_modified_activities(
modified = set()
deleted = set()

checksum_cache = {}
paths = []

for activity in activities:
for usage in activity.usages:
paths.append(usage.entity.path)

hashes = repository.get_object_hashes(paths=paths, revision="HEAD")

for activity in activities:
for usage in activity.usages:
entity = usage.entity
current_checksum = checksum_cache.setdefault(entity.path, repository.get_object_hash(path=entity.path))
current_checksum = hashes.get(entity.path, None)
if current_checksum is None:
deleted.add((activity, entity))
elif current_checksum != entity.checksum:
Expand All @@ -101,6 +107,42 @@ def get_modified_activities(
return modified, deleted


def filter_overridden_activities(activities: List["Activity"]) -> List["Activity"]:
    """Filter out overridden activities from a list of activities.

    An activity is overridden when a newer activity generated a superset or subset of its
    generated paths. Activities are processed newest-first so overridden older ones are
    dropped.

    Returns a list (matching the annotation; the previous version returned a ``dict_values``
    view).
    """
    # NOTE: Maps frozenset-of-generated-paths -> the latest relevant activity for them.
    relevant_activities = {}

    for activity in reversed(activities):
        outputs = frozenset(g.entity.path for g in activity.generations)

        # NOTE: Already-kept activities whose outputs overlap (subset or superset) with this
        # one's. The original computed a subset/superset partition first and then discarded
        # it by recomputing the union; compute the union once instead.
        overlapping = [
            (k, a) for k, a in relevant_activities.items() if outputs.issubset(k) or outputs.issuperset(k)
        ]

        if not overlapping:
            relevant_activities[outputs] = activity
            continue

        if any(activity.ended_at_time < a.ended_at_time for _, a in overlapping):
            # NOTE: activity overlaps a newer activity: it is overridden, ignore it.
            continue

        # NOTE: Drop kept activities whose outputs are a proper subset of this (newer)
        # activity's outputs; equal-output entries are simply overwritten below.
        overridden_keys = [
            k for k, a in overlapping if outputs > k and activity.ended_at_time > a.ended_at_time
        ]

        for key in overridden_keys:
            del relevant_activities[key]

        relevant_activities[outputs] = activity

    return list(relevant_activities.values())


def add_activity_if_recent(activity: "Activity", activities: Set["Activity"]):
"""Add ``activity`` to ``activities`` if it's not in the set or is the latest executed instance."""
if activity in activities:
Expand Down
14 changes: 7 additions & 7 deletions tests/core/metadata/test_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ def test_hash_objects(git_repository, path):

assert committed_object_hash == git_repository.get_object_hash(path, revision="HEAD")
assert committed_object_hash == git_repository.get_object_hash(git_repository.path / path, revision="HEAD")
assert committed_object_hash == Repository.hash_object(path)
assert committed_object_hash == Repository.hash_object(git_repository.path / path)
assert committed_object_hash == Repository.hash_objects([path])[0]
assert committed_object_hash == Repository.hash_objects([git_repository.path / path])[0]


def test_hash_modified_objects(git_repository):
Expand All @@ -181,14 +181,14 @@ def test_hash_modified_objects(git_repository):
# current object's hash if revision is None
assert committed_object_hash == git_repository.get_object_hash("A", revision="HEAD")
assert modified_object_hash == git_repository.get_object_hash("A", revision=None)
assert modified_object_hash == Repository.hash_object("A")
assert modified_object_hash == Repository.hash_objects(["A"])[0]

# NOTE: Returned results are the same if object is staged
git_repository.add("A")

assert committed_object_hash == git_repository.get_object_hash("A", revision="HEAD")
assert modified_object_hash == git_repository.get_object_hash("A")
assert modified_object_hash == Repository.hash_object("A")
assert modified_object_hash == Repository.hash_objects(["A"])[0]


def test_hash_deleted_objects(git_repository):
Expand All @@ -197,7 +197,7 @@ def test_hash_deleted_objects(git_repository):
assert git_repository.get_object_hash("B") is None

with pytest.raises(errors.GitCommandError):
Repository.hash_object("B")
Repository.hash_objects(["B"])[0]


def test_hash_directories(git_repository):
Expand All @@ -209,7 +209,7 @@ def test_hash_directories(git_repository):
assert git_repository.get_object_hash("X") is None

with pytest.raises(errors.GitCommandError):
Repository.hash_object("X")
Repository.hash_objects(["X"])[0]

# NOTE: When staging a directory then the hash can be calculated
git_repository.add("X")
Expand All @@ -226,4 +226,4 @@ def test_hash_directories(git_repository):
assert directory_hash == git_repository.get_object_hash("X")

with pytest.raises(errors.GitCommandError):
Repository.hash_object("X")
Repository.hash_objects(["X"])[0]