diff --git a/renku/core/storage.py b/renku/core/storage.py index d2429c589d..c42df05e2f 100644 --- a/renku/core/storage.py +++ b/renku/core/storage.py @@ -15,7 +15,6 @@ # limitations under the License. """Logic for handling a data storage.""" -import csv import functools import itertools import os @@ -26,7 +25,7 @@ from pathlib import Path from shutil import move, which from subprocess import PIPE, STDOUT, check_output, run -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union import pathspec @@ -39,7 +38,6 @@ from renku.domain_model.project_context import project_context if TYPE_CHECKING: - from renku.domain_model.entity import Entity # type: ignore from renku.infrastructure.repository import Repository @@ -99,12 +97,12 @@ def wrapper(*args, **kwargs): @functools.lru_cache -def storage_installed(): +def storage_installed() -> bool: """Verify that git-lfs is installed and on system PATH.""" return bool(which("git-lfs")) -def storage_installed_locally(): +def storage_installed_locally() -> bool: """Verify that git-lfs is installed for the project.""" repo_config = project_context.repository.get_configuration(scope="local") return repo_config.has_section('filter "lfs"') @@ -129,7 +127,7 @@ def check_external_storage(): return is_storage_installed -def renku_lfs_ignore(): +def renku_lfs_ignore() -> pathspec.PathSpec: """Gets pathspec for files to not add to LFS.""" ignore_path = project_context.path / RENKU_LFS_IGNORE_PATH @@ -141,14 +139,14 @@ def renku_lfs_ignore(): return pathspec.PathSpec.from_lines("renku_gitwildmatch", lines) -def get_minimum_lfs_file_size(): +def get_minimum_lfs_file_size() -> int: """The minimum size of a file in bytes to be added to lfs.""" size = get_value("renku", "lfs_threshold") return parse_file_size(size) -def init_external_storage(force=False): +def init_external_storage(force: bool = False) -> None: """Initialize the external storage for data.""" try: result = run( @@ -166,13 +164,13 @@ def init_external_storage(force=False): @check_external_storage_wrapper -def track_paths_in_storage(*paths): +def track_paths_in_storage(*paths: Union[Path, str]) -> Optional[List[str]]: """Track paths in the external storage.""" if not project_context.external_storage_requested or not check_external_storage(): - return + return None # Calculate which paths can be tracked in lfs - track_paths = [] + track_paths: List[str] = [] attrs = project_context.repository.get_attributes(*paths) for path in paths: @@ -210,7 +208,7 @@ def track_paths_in_storage(*paths): universal_newlines=True, ) - if result.returncode != 0: + if result and result.returncode != 0: raise errors.GitLFSError(f"Error executing 'git lfs track: \n {result.stdout}") except (KeyboardInterrupt, OSError) as e: raise errors.ParameterError(f"Couldn't run 'git lfs':\n{e}") @@ -227,7 +225,7 @@ def track_paths_in_storage(*paths): @check_external_storage_wrapper -def untrack_paths_from_storage(*paths): +def untrack_paths_from_storage(*paths: Union[Path, str]) -> None: """Untrack paths from the external storage.""" try: result = run_command( @@ -239,25 +237,25 @@ def untrack_paths_from_storage(*paths): universal_newlines=True, ) - if result.returncode != 0: + if result and result.returncode != 0: raise errors.GitLFSError(f"Error executing 'git lfs untrack: \n {result.stdout}") except (KeyboardInterrupt, OSError) as e: raise errors.ParameterError(f"Couldn't run 'git lfs':\n{e}") @check_external_storage_wrapper -def list_tracked_paths(): +def list_tracked_paths() -> List[Path]: """List paths tracked in lfs.""" try: files = check_output(_CMD_STORAGE_LIST, cwd=project_context.path, encoding="UTF-8") except (KeyboardInterrupt, OSError) as e: raise errors.ParameterError(f"Couldn't run 'git lfs ls-files':\n{e}") - files_split = [project_context.path / f for f in files.splitlines()] + files_split: List[Path] = [project_context.path / f for f in files.splitlines()] return files_split @check_external_storage_wrapper -def list_unpushed_lfs_paths(repository: "Repository"): +def list_unpushed_lfs_paths(repository: "Repository") -> List[Path]: """List paths tracked in lfs for a repository.""" if len(repository.remotes) < 1 or (repository.active_branch and not repository.active_branch.remote_branch): @@ -279,7 +277,7 @@ def list_unpushed_lfs_paths(repository: "Repository"): @check_external_storage_wrapper -def pull_paths_from_storage(repository: "Repository", *paths): +def pull_paths_from_storage(repository: "Repository", *paths: Union[Path, str]): """Pull paths from LFS.""" project_dict = defaultdict(list) @@ -304,19 +302,19 @@ def pull_paths_from_storage(repository: "Repository", *paths): universal_newlines=True, ) - if result.returncode != 0: + if result and result.returncode != 0: raise errors.GitLFSError(f"Cannot pull LFS objects from server:\n {result.stdout}") @check_external_storage_wrapper -def clean_storage_cache(*check_paths): +def clean_storage_cache(*check_paths: Union[Path, str]) -> Tuple[List[str], List[str]]: """Remove paths from lfs cache.""" project_dict = defaultdict(list) - repositories = {} - tracked_paths = {} - unpushed_paths = {} - untracked_paths = [] - local_only_paths = [] + repositories: Dict[Path, "Repository"] = {} + tracked_paths: Dict[Path, List[Path]] = {} + unpushed_paths: Dict[Path, List[Path]] = {} + untracked_paths: List[str] = [] + local_only_paths: List[str] = [] repository = project_context.repository @@ -386,7 +384,7 @@ def clean_storage_cache(*check_paths): @check_external_storage_wrapper -def checkout_paths_from_storage(*paths): +def checkout_paths_from_storage(*paths: Union[Path, str]): """Checkout a paths from LFS.""" result = run_command( _CMD_STORAGE_CHECKOUT, @@ -397,18 +395,18 @@ def checkout_paths_from_storage(*paths): universal_newlines=True, ) - if result.returncode != 0: + if result and result.returncode != 0: raise errors.GitLFSError(f"Error executing 'git lfs checkout: \n {result.stdout}") -def check_requires_tracking(*paths): +def check_requires_tracking(*paths: Union[Path, str]) -> Optional[List[str]]: """Check paths and return a list of those that must be tracked.""" if not project_context.external_storage_requested: - return + return None attrs = project_context.repository.get_attributes(*paths) - track_paths = [] + track_paths: List[str] = [] for path in paths: absolute_path = Path(os.path.abspath(project_context.path / path)) @@ -470,7 +468,7 @@ def add_migrate_pattern(pattern, collection): return includes, excludes -def check_lfs_migrate_info(everything=False, use_size_filter=True): +def check_lfs_migrate_info(everything: bool = False, use_size_filter: bool = True) -> List[str]: """Return list of file groups in history should be in LFS.""" ref = ( ["--everything"] @@ -510,7 +508,7 @@ def check_lfs_migrate_info(everything=False, use_size_filter=True): if lfs_output.returncode != 0: raise errors.GitLFSError(f"Error executing 'git lfs migrate info: \n {lfs_output.stdout}") - groups = [] + groups: List[str] = [] files_re = re.compile(r"(.*\s+[\d.]+\s+\S+).*") for line in lfs_output.stdout.split("\n"): @@ -526,7 +524,7 @@ def check_lfs_migrate_info(everything=False, use_size_filter=True): return groups -def migrate_files_to_lfs(paths): +def migrate_files_to_lfs(paths: List[str]): """Migrate files to Git LFS.""" if paths: includes: List[str] = ["--include", ",".join(paths)] @@ -534,11 +532,7 @@ def migrate_files_to_lfs(paths): else: includes, excludes = get_lfs_migrate_filters() - tempdir = Path(tempfile.mkdtemp()) - map_path = tempdir / "objectmap.csv" - object_map = [f"--object-map={map_path}"] - - command = _CMD_STORAGE_MIGRATE_IMPORT + includes + excludes + object_map + command = _CMD_STORAGE_MIGRATE_IMPORT + includes + excludes try: lfs_output = run(command, stdout=PIPE, stderr=STDOUT, cwd=project_context.path, text=True) @@ -547,78 +541,3 @@ def migrate_files_to_lfs(paths): if lfs_output.returncode != 0: raise errors.GitLFSError(f"Error executing 'git lfs migrate import: \n {lfs_output.stdout}") - - with open(map_path, newline="") as csvfile: - reader = csv.reader(csvfile, delimiter=",") - - commit_sha_mapping = [(r[0], r[1]) for r in reader] - - os.remove(map_path) - - sha_mapping = dict() - - repo_root = Path(".") - repository = project_context.repository - - for old_commit_sha, new_commit_sha in commit_sha_mapping: - old_commit = repository.get_commit(old_commit_sha) - new_commit = repository.get_commit(new_commit_sha) - processed = set() - - for diff in old_commit.get_changes(): - path_obj = Path(diff.b_path) - - # NOTE: Get git object hash mapping for files and parent folders - while path_obj != repo_root: - if path_obj in processed: - break - - path_str = str(path_obj) - old_sha = old_commit.tree[path_str].hexsha - new_sha = new_commit.tree[path_str].hexsha - - sha_mapping[old_sha] = new_sha - - processed.add(path_obj) - path_obj = path_obj.parent - - def _map_checksum(entity, checksum_mapping) -> Optional["Entity"]: - """Update the checksum and id of an entity based on a mapping.""" - from renku.domain_model.entity import Entity - from renku.domain_model.provenance.activity import Collection - - if entity.checksum not in checksum_mapping: - return None - - new_checksum = checksum_mapping[entity.checksum] - - if isinstance(entity, Collection) and entity.members: - members = [] - for member in entity.members: - new_member = _map_checksum(member, checksum_mapping) - if new_member: - members.append(new_member) - else: - members.append(member) - new_entity: Entity = Collection(checksum=new_checksum, path=entity.path, members=members) - else: - new_entity = Entity(checksum=new_checksum, path=entity.path) - - return new_entity - - def _map_checksum_old(entity, checksum_mapping): - """Update the checksum and id of an entity based on a mapping.""" - # TODO: Remove this method once moved to Entity with 'id' field - from renku.domain_model.provenance.activity import Collection - - if entity.checksum not in checksum_mapping: - return - - new_checksum = checksum_mapping[entity.checksum] - - entity._id = entity._id.replace(entity.checksum, new_checksum) - entity.checksum = new_checksum - - if isinstance(entity, Collection) and entity.members: - for member in entity.members: - _map_checksum_old(member, checksum_mapping)