Skip to content

Commit

Permalink
fix: parallelize dataset add for performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralf Grubenmann committed Feb 28, 2023
1 parent 994077f commit 0f8886e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 11 deletions.
18 changes: 13 additions & 5 deletions renku/core/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,12 @@ def add_datadir_files_to_dataset(dataset: Dataset) -> None:
# NOTE: Add existing files to dataset
dataset_files: List[DatasetFile] = []
files: List[Path] = []
for file in get_files(datadir):
existing_files: List[Union[Path, str]] = list(get_files(datadir))
checksums = project_context.repository.get_object_hashes(existing_files)

for file in cast(List[Path], existing_files):
files.append(file)
dataset_files.append(DatasetFile.from_path(path=file, source=file))
dataset_files.append(DatasetFile.from_path(path=file, source=file, checksum=checksums.get(file)))

if not dataset_files:
return
Expand Down Expand Up @@ -907,11 +910,14 @@ def move_files(dataset_gateway: IDatasetGateway, files: Dict[Path, Path], to_dat
progress_name = "Updating dataset metadata"
communication.start_progress(progress_name, total=len(files))
try:
checksums = project_context.repository.get_object_hashes(
[file.relative_to(project_context.path) for file in files.values()]
)
for src, dst in files.items():
src = src.relative_to(project_context.path)
dst = dst.relative_to(project_context.path)
# NOTE: Files are moved at this point, so, we can use dst
new_dataset_file = DatasetFile.from_path(dst)
new_dataset_file = DatasetFile.from_path(dst, checksum=checksums.get(dst))

for dataset in datasets:
removed = dataset.unlink_file(src, missing_ok=True)
Expand Down Expand Up @@ -1007,9 +1013,11 @@ def update_dataset_local_files(

def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_files: List[DynamicProxy], delete: bool):
modified_datasets = {}

checksums = project_context.repository.get_object_hashes([file.entity.path for file in updated_files])
for file in updated_files:
new_file = DatasetFile.from_path(path=file.entity.path, based_on=file.based_on, source=file.source)
new_file = DatasetFile.from_path(
path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path)
)
modified_datasets[file.dataset.name] = file.dataset
file.dataset.add_or_update_files(new_file)

Expand Down
10 changes: 9 additions & 1 deletion renku/core/dataset/dataset_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,16 @@ def add_files_to_repository(dataset: Dataset, files: List[DatasetAddMetadata]):
def update_dataset_metadata(dataset: Dataset, files: List[DatasetAddMetadata], clear_files_before: bool):
"""Add newly-added files to the dataset's metadata."""
dataset_files = []
repo_paths: List[Union[Path, str]] = [
file.entity_path for file in files if (project_context.path / file.entity_path).exists()
]

checksums = project_context.repository.get_object_hashes(repo_paths)

for file in files:
dataset_file = DatasetFile.from_path(path=file.entity_path, source=file.url, based_on=file.based_on)
dataset_file = DatasetFile.from_path(
path=file.entity_path, source=file.url, based_on=file.based_on, checksum=checksums.get(file.entity_path)
)
dataset_files.append(dataset_file)

if clear_files_before:
Expand Down
11 changes: 8 additions & 3 deletions renku/core/util/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,11 @@ def is_path_safe(path: Union[Path, str]) -> bool:


def get_entity_from_revision(
repository: "Repository", path: Union[Path, str], revision: Optional[str] = None, bypass_cache: bool = False
repository: "Repository",
path: Union[Path, str],
revision: Optional[str] = None,
bypass_cache: bool = False,
checksum: Optional[str] = None,
) -> "Entity":
"""Return an Entity instance from given path and revision.
Expand All @@ -374,7 +378,7 @@ def get_entity_from_revision(
path(Union[Path, str]): The path of the entity.
revision(str, optional): The revision to check at (Default value = None).
bypass_cache(bool): Whether to ignore cached entries and get information from disk (Default value = False).
checksum(str, optional): Pre-calculated checksum for performance reasons, will be calculated if not set.
Returns:
Entity: The Entity for the given path and revision.
Expand Down Expand Up @@ -407,7 +411,8 @@ def get_directory_members(absolute_path: Path) -> List[Entity]:
return cached_entry

# NOTE: For untracked directory the hash is None; make sure to stage them first before calling this function.
checksum = repository.get_object_hash(revision=revision, path=path)
if not checksum:
checksum = repository.get_object_hash(revision=revision, path=path)
# NOTE: If object was not found at a revision it's either removed or exists in a different revision; keep the
# entity and use revision as checksum
if isinstance(revision, str) and revision == "HEAD":
Expand Down
12 changes: 10 additions & 2 deletions renku/domain_model/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,13 @@ def __init__(
self.source: Optional[str] = str(source)

@classmethod
def from_path(cls, path: Union[str, Path], source=None, based_on: Optional[RemoteEntity] = None) -> "DatasetFile":
def from_path(
cls,
path: Union[str, Path],
source=None,
based_on: Optional[RemoteEntity] = None,
checksum: Optional[str] = None,
) -> "DatasetFile":
"""Return an instance from a path."""
from renku.domain_model.entity import NON_EXISTING_ENTITY_CHECKSUM, Entity

Expand All @@ -269,7 +275,9 @@ def from_path(cls, path: Union[str, Path], source=None, based_on: Optional[Remot
id = Entity.generate_id(checksum=checksum, path=path)
entity = Entity(id=id, checksum=checksum, path=path)
else:
entity = get_entity_from_revision(repository=project_context.repository, path=path, bypass_cache=True)
entity = get_entity_from_revision(
repository=project_context.repository, path=path, bypass_cache=True, checksum=checksum
)

is_external = is_external_file(path=path, project_path=project_context.path)
return cls(entity=entity, is_external=is_external, source=source, based_on=based_on)
Expand Down

0 comments on commit 0f8886e

Please sign in to comment.