From 85ab14199c83815f6d3a63fd6dc721638b3b4c95 Mon Sep 17 00:00:00 2001
From: Ralf Grubenmann
Date: Wed, 1 Mar 2023 15:36:18 +0100
Subject: [PATCH] fix: parallelize dataset add for performance (#3338)

---
 renku/core/dataset/dataset.py     | 18 +++++++++++++-----
 renku/core/dataset/dataset_add.py | 27 ++++++++++++++++++++-------
 renku/core/util/git.py            | 11 ++++++++---
 renku/domain_model/dataset.py     | 12 ++++++++++--
 4 files changed, 51 insertions(+), 17 deletions(-)

diff --git a/renku/core/dataset/dataset.py b/renku/core/dataset/dataset.py
index 906e3269c3..28bf9fae57 100644
--- a/renku/core/dataset/dataset.py
+++ b/renku/core/dataset/dataset.py
@@ -802,9 +802,12 @@ def add_datadir_files_to_dataset(dataset: Dataset) -> None:
         # NOTE: Add existing files to dataset
         dataset_files: List[DatasetFile] = []
         files: List[Path] = []
-        for file in get_files(datadir):
+        existing_files: List[Union[Path, str]] = list(get_files(datadir))
+        checksums = project_context.repository.get_object_hashes(existing_files)
+
+        for file in cast(List[Path], existing_files):
             files.append(file)
-            dataset_files.append(DatasetFile.from_path(path=file, source=file))
+            dataset_files.append(DatasetFile.from_path(path=file, source=file, checksum=checksums.get(file)))
 
         if not dataset_files:
             return
@@ -907,11 +910,14 @@ def move_files(dataset_gateway: IDatasetGateway, files: Dict[Path, Path], to_dat
     progress_name = "Updating dataset metadata"
     communication.start_progress(progress_name, total=len(files))
     try:
+        checksums = project_context.repository.get_object_hashes(
+            [file.relative_to(project_context.path) for file in files.values()]
+        )
         for src, dst in files.items():
             src = src.relative_to(project_context.path)
             dst = dst.relative_to(project_context.path)
             # NOTE: Files are moved at this point, so, we can use dst
-            new_dataset_file = DatasetFile.from_path(dst)
+            new_dataset_file = DatasetFile.from_path(dst, checksum=checksums.get(dst))
 
             for dataset in datasets:
                 removed = dataset.unlink_file(src, missing_ok=True)
@@ -1007,9 +1013,11 @@ def update_dataset_local_files(
 
 
 def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_files: List[DynamicProxy], delete: bool):
     modified_datasets = {}
-
+    checksums = project_context.repository.get_object_hashes([file.entity.path for file in updated_files])
     for file in updated_files:
-        new_file = DatasetFile.from_path(path=file.entity.path, based_on=file.based_on, source=file.source)
+        new_file = DatasetFile.from_path(
+            path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path)
+        )
         modified_datasets[file.dataset.name] = file.dataset
         file.dataset.add_or_update_files(new_file)
diff --git a/renku/core/dataset/dataset_add.py b/renku/core/dataset/dataset_add.py
index ac047b5fc4..5654033b76 100644
--- a/renku/core/dataset/dataset_add.py
+++ b/renku/core/dataset/dataset_add.py
@@ -299,9 +299,9 @@ def get_upload_uri(dataset: Dataset, entity_path: Union[Path, str]) -> str:
 def move_files_to_dataset(dataset: Dataset, files: List[DatasetAddMetadata]):
     """Copy/Move files into a dataset's directory."""
 
-    def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]):
+    def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]) -> bool:
         if not file.has_action:
-            return
+            return False
 
         if file.action in (
             DatasetAddAction.COPY,
@@ -350,9 +350,6 @@ def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]):
            else:
                raise
 
-        if track_in_lfs and not dataset.storage:
-            track_paths_in_storage(file.destination)
-
         # NOTE: We always copy the files to the dataset's data dir. If dataset has a storage backend, we also upload the
         # file to the remote storage.
         if storage:
@@ -367,14 +364,22 @@ def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]):
 
             file.based_on = RemoteEntity(url=file_uri, path=file.entity_path, checksum=md5_hash)
 
+        return track_in_lfs
+
     dataset_storage = None
     if dataset.storage:
         provider = ProviderFactory.get_storage_provider(uri=dataset.storage)
         dataset_storage = provider.get_storage()
 
+    lfs_files = []
+
     for dataset_file in files:
         # TODO: Parallelize copy/download/upload
-        move_file(file=dataset_file, storage=dataset_storage)
+        if move_file(file=dataset_file, storage=dataset_storage):
+            lfs_files.append(dataset_file.destination)
+
+    if lfs_files and not dataset.storage:
+        track_paths_in_storage(*lfs_files)
 
 
 def add_files_to_repository(dataset: Dataset, files: List[DatasetAddMetadata]):
@@ -401,8 +406,16 @@ def add_files_to_repository(dataset: Dataset, files: List[DatasetAddMetadata]):
 def update_dataset_metadata(dataset: Dataset, files: List[DatasetAddMetadata], clear_files_before: bool):
     """Add newly-added files to the dataset's metadata."""
     dataset_files = []
+    repo_paths: List[Union[Path, str]] = [
+        file.entity_path for file in files if (project_context.path / file.entity_path).exists()
+    ]
+
+    checksums = project_context.repository.get_object_hashes(repo_paths)
+
     for file in files:
-        dataset_file = DatasetFile.from_path(path=file.entity_path, source=file.url, based_on=file.based_on)
+        dataset_file = DatasetFile.from_path(
+            path=file.entity_path, source=file.url, based_on=file.based_on, checksum=checksums.get(file.entity_path)
+        )
         dataset_files.append(dataset_file)
 
     if clear_files_before:
diff --git a/renku/core/util/git.py b/renku/core/util/git.py
index 1dedf37018..d69ae76001 100644
--- a/renku/core/util/git.py
+++ b/renku/core/util/git.py
@@ -365,7 +365,11 @@ def is_path_safe(path: Union[Path, str]) -> bool:
 
 
 def get_entity_from_revision(
-    repository: "Repository", path: Union[Path, str], revision: Optional[str] = None, bypass_cache: bool = False
+    repository: "Repository",
+    path: Union[Path, str],
+    revision: Optional[str] = None,
+    bypass_cache: bool = False,
+    checksum: Optional[str] = None,
 ) -> "Entity":
     """Return an Entity instance from given path and revision.
 
@@ -374,7 +378,7 @@ def get_entity_from_revision(
         path(Union[Path, str]): The path of the entity.
         revision(str, optional): The revision to check at (Default value = None).
         bypass_cache(bool): Whether to ignore cached entries and get information from disk (Default value = False).
-
+        checksum(str, optional): Pre-calculated checksum for performance reasons; will be calculated if not set.
 
     Returns:
         Entity: The Entity for the given path and revision.
@@ -407,7 +411,8 @@ def get_directory_members(absolute_path: Path) -> List[Entity]:
             return cached_entry
 
    # NOTE: For untracked directory the hash is None; make sure to stage them first before calling this function.
-    checksum = repository.get_object_hash(revision=revision, path=path)
+    if not checksum:
+        checksum = repository.get_object_hash(revision=revision, path=path)
     # NOTE: If object was not found at a revision it's either removed or exists in a different revision; keep the
     # entity and use revision as checksum
     if isinstance(revision, str) and revision == "HEAD":
diff --git a/renku/domain_model/dataset.py b/renku/domain_model/dataset.py
index b55ae2250d..e176cfd94e 100644
--- a/renku/domain_model/dataset.py
+++ b/renku/domain_model/dataset.py
@@ -259,7 +259,13 @@ def __init__(
         self.source: Optional[str] = str(source)
 
     @classmethod
-    def from_path(cls, path: Union[str, Path], source=None, based_on: Optional[RemoteEntity] = None) -> "DatasetFile":
+    def from_path(
+        cls,
+        path: Union[str, Path],
+        source=None,
+        based_on: Optional[RemoteEntity] = None,
+        checksum: Optional[str] = None,
+    ) -> "DatasetFile":
         """Return an instance from a path."""
         from renku.domain_model.entity import NON_EXISTING_ENTITY_CHECKSUM, Entity
 
@@ -269,7 +275,9 @@ def from_path(cls, path: Union[str, Path], source=None, based_on: Optional[Remot
             id = Entity.generate_id(checksum=checksum, path=path)
             entity = Entity(id=id, checksum=checksum, path=path)
         else:
-            entity = get_entity_from_revision(repository=project_context.repository, path=path, bypass_cache=True)
+            entity = get_entity_from_revision(
+                repository=project_context.repository, path=path, bypass_cache=True, checksum=checksum
+            )
 
         is_external = is_external_file(path=path, project_path=project_context.path)
         return cls(entity=entity, is_external=is_external, source=source, based_on=based_on)
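--
The speedup in this patch comes from batching: one `git` invocation computes
hashes for all files via Repository.get_object_hashes, and the result is
threaded through DatasetFile.from_path and get_entity_from_revision as the
optional checksum, instead of spawning one get_object_hash subprocess per
file. Below is a minimal standalone sketch of that batching pattern; the
helper name and its use of plain `git hash-object --stdin-paths` are
illustrative assumptions, not Renku's actual implementation.

import subprocess
from pathlib import Path
from typing import Dict, List, Union


def get_object_hashes_batched(paths: List[Union[Path, str]]) -> Dict[str, str]:
    """Return git blob hashes for many files from a single `git` invocation.

    `git hash-object --stdin-paths` reads one path per line on stdin and
    prints one object id per line, so hashing N files costs one process
    spawn instead of N.
    """
    if not paths:
        return {}
    path_strings = [str(path) for path in paths]
    result = subprocess.run(
        ["git", "hash-object", "--stdin-paths"],
        input="\n".join(path_strings) + "\n",
        capture_output=True,
        text=True,
        check=True,
    )
    # git prints hashes in input order, so the two lists zip cleanly.
    return dict(zip(path_strings, result.stdout.splitlines()))

Callers then look up each file's hash from the returned dict, which is what
the hunks above do with checksums.get(...) before building each DatasetFile.
The same batch-instead-of-per-file move is applied to LFS: move_file now
reports whether its file needs tracking, and track_paths_in_storage runs once
over all collected paths rather than once per file.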