Skip to content

Commit

Permalink
chore: remove unused code in renku lfs migrate (#3476)
Browse files: browse the repository at this point in the history
  • Loading branch information
Panaetius committed May 25, 2023
1 parent e02e5bf commit 5730ce5
Showing 1 changed file with 32 additions and 113 deletions.
145 changes: 32 additions & 113 deletions renku/core/storage.py
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
"""Logic for handling a data storage."""

import csv
import functools
import itertools
import os
Expand All @@ -26,7 +25,7 @@
from pathlib import Path
from shutil import move, which
from subprocess import PIPE, STDOUT, check_output, run
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union

import pathspec

Expand All @@ -39,7 +38,6 @@
from renku.domain_model.project_context import project_context

if TYPE_CHECKING:
from renku.domain_model.entity import Entity # type: ignore
from renku.infrastructure.repository import Repository


Expand Down Expand Up @@ -99,12 +97,12 @@ def wrapper(*args, **kwargs):


@functools.lru_cache
def storage_installed() -> bool:
    """Verify that git-lfs is installed and on system PATH.

    The result is memoized by ``functools.lru_cache``, so ``PATH`` is only
    searched on the first call.

    Returns:
        bool: ``True`` if ``shutil.which`` finds a ``git-lfs`` executable.
    """
    return bool(which("git-lfs"))


def storage_installed_locally() -> bool:
    """Verify that git-lfs is installed for the project.

    Returns:
        bool: ``True`` if the repository's local-scope configuration contains
        a ``filter "lfs"`` section.
    """
    repo_config = project_context.repository.get_configuration(scope="local")
    return repo_config.has_section('filter "lfs"')
Expand All @@ -129,7 +127,7 @@ def check_external_storage():
return is_storage_installed


def renku_lfs_ignore():
def renku_lfs_ignore() -> pathspec.PathSpec:
"""Gets pathspec for files to not add to LFS."""
ignore_path = project_context.path / RENKU_LFS_IGNORE_PATH

Expand All @@ -141,14 +139,14 @@ def renku_lfs_ignore():
return pathspec.PathSpec.from_lines("renku_gitwildmatch", lines)


def get_minimum_lfs_file_size() -> int:
    """The minimum size of a file in bytes to be added to lfs.

    Returns:
        int: The value obtained from ``get_value("renku", "lfs_threshold")``
        after conversion by ``parse_file_size``.
    """
    size = get_value("renku", "lfs_threshold")

    return parse_file_size(size)


def init_external_storage(force=False):
def init_external_storage(force: bool = False) -> None:
"""Initialize the external storage for data."""
try:
result = run(
Expand All @@ -166,13 +164,13 @@ def init_external_storage(force=False):


@check_external_storage_wrapper
def track_paths_in_storage(*paths):
def track_paths_in_storage(*paths: Union[Path, str]) -> Optional[List[str]]:
"""Track paths in the external storage."""
if not project_context.external_storage_requested or not check_external_storage():
return
return None

# Calculate which paths can be tracked in lfs
track_paths = []
track_paths: List[str] = []
attrs = project_context.repository.get_attributes(*paths)

for path in paths:
Expand Down Expand Up @@ -210,7 +208,7 @@ def track_paths_in_storage(*paths):
universal_newlines=True,
)

if result.returncode != 0:
if result and result.returncode != 0:
raise errors.GitLFSError(f"Error executing 'git lfs track: \n {result.stdout}")
except (KeyboardInterrupt, OSError) as e:
raise errors.ParameterError(f"Couldn't run 'git lfs':\n{e}")
Expand All @@ -227,7 +225,7 @@ def track_paths_in_storage(*paths):


@check_external_storage_wrapper
def untrack_paths_from_storage(*paths):
def untrack_paths_from_storage(*paths: Union[Path, str]) -> None:
"""Untrack paths from the external storage."""
try:
result = run_command(
Expand All @@ -239,25 +237,25 @@ def untrack_paths_from_storage(*paths):
universal_newlines=True,
)

if result.returncode != 0:
if result and result.returncode != 0:
raise errors.GitLFSError(f"Error executing 'git lfs untrack: \n {result.stdout}")
except (KeyboardInterrupt, OSError) as e:
raise errors.ParameterError(f"Couldn't run 'git lfs':\n{e}")


@check_external_storage_wrapper
def list_tracked_paths() -> List[Path]:
    """List paths tracked in lfs.

    Returns:
        List[Path]: One entry per output line of the ``_CMD_STORAGE_LIST``
        command, each joined onto ``project_context.path``.

    Raises:
        errors.ParameterError: If running the command raises ``OSError`` or is
            interrupted with ``KeyboardInterrupt``.
    """
    try:
        files = check_output(_CMD_STORAGE_LIST, cwd=project_context.path, encoding="UTF-8")
    except (KeyboardInterrupt, OSError) as e:
        raise errors.ParameterError(f"Couldn't run 'git lfs ls-files':\n{e}")
    files_split: List[Path] = [project_context.path / f for f in files.splitlines()]
    return files_split


@check_external_storage_wrapper
def list_unpushed_lfs_paths(repository: "Repository"):
def list_unpushed_lfs_paths(repository: "Repository") -> List[Path]:
"""List paths tracked in lfs for a repository."""

if len(repository.remotes) < 1 or (repository.active_branch and not repository.active_branch.remote_branch):
Expand All @@ -279,7 +277,7 @@ def list_unpushed_lfs_paths(repository: "Repository"):


@check_external_storage_wrapper
def pull_paths_from_storage(repository: "Repository", *paths):
def pull_paths_from_storage(repository: "Repository", *paths: Union[Path, str]):
"""Pull paths from LFS."""
project_dict = defaultdict(list)

Expand All @@ -304,19 +302,19 @@ def pull_paths_from_storage(repository: "Repository", *paths):
universal_newlines=True,
)

if result.returncode != 0:
if result and result.returncode != 0:
raise errors.GitLFSError(f"Cannot pull LFS objects from server:\n {result.stdout}")


@check_external_storage_wrapper
def clean_storage_cache(*check_paths):
def clean_storage_cache(*check_paths: Union[Path, str]) -> Tuple[List[str], List[str]]:
"""Remove paths from lfs cache."""
project_dict = defaultdict(list)
repositories = {}
tracked_paths = {}
unpushed_paths = {}
untracked_paths = []
local_only_paths = []
repositories: Dict[Path, "Repository"] = {}
tracked_paths: Dict[Path, List[Path]] = {}
unpushed_paths: Dict[Path, List[Path]] = {}
untracked_paths: List[str] = []
local_only_paths: List[str] = []

repository = project_context.repository

Expand Down Expand Up @@ -386,7 +384,7 @@ def clean_storage_cache(*check_paths):


@check_external_storage_wrapper
def checkout_paths_from_storage(*paths):
def checkout_paths_from_storage(*paths: Union[Path, str]):
"""Checkout a paths from LFS."""
result = run_command(
_CMD_STORAGE_CHECKOUT,
Expand All @@ -397,18 +395,18 @@ def checkout_paths_from_storage(*paths):
universal_newlines=True,
)

if result.returncode != 0:
if result and result.returncode != 0:
raise errors.GitLFSError(f"Error executing 'git lfs checkout: \n {result.stdout}")


def check_requires_tracking(*paths):
def check_requires_tracking(*paths: Union[Path, str]) -> Optional[List[str]]:
"""Check paths and return a list of those that must be tracked."""

if not project_context.external_storage_requested:
return
return None

attrs = project_context.repository.get_attributes(*paths)
track_paths = []
track_paths: List[str] = []

for path in paths:
absolute_path = Path(os.path.abspath(project_context.path / path))
Expand Down Expand Up @@ -470,7 +468,7 @@ def add_migrate_pattern(pattern, collection):
return includes, excludes


def check_lfs_migrate_info(everything=False, use_size_filter=True):
def check_lfs_migrate_info(everything: bool = False, use_size_filter: bool = True) -> List[str]:
"""Return list of file groups in history should be in LFS."""
ref = (
["--everything"]
Expand Down Expand Up @@ -510,7 +508,7 @@ def check_lfs_migrate_info(everything=False, use_size_filter=True):
if lfs_output.returncode != 0:
raise errors.GitLFSError(f"Error executing 'git lfs migrate info: \n {lfs_output.stdout}")

groups = []
groups: List[str] = []
files_re = re.compile(r"(.*\s+[\d.]+\s+\S+).*")

for line in lfs_output.stdout.split("\n"):
Expand All @@ -526,19 +524,15 @@ def check_lfs_migrate_info(everything=False, use_size_filter=True):
return groups


def migrate_files_to_lfs(paths):
def migrate_files_to_lfs(paths: List[str]):
"""Migrate files to Git LFS."""
if paths:
includes: List[str] = ["--include", ",".join(paths)]
excludes: List[str] = []
else:
includes, excludes = get_lfs_migrate_filters()

tempdir = Path(tempfile.mkdtemp())
map_path = tempdir / "objectmap.csv"
object_map = [f"--object-map={map_path}"]

command = _CMD_STORAGE_MIGRATE_IMPORT + includes + excludes + object_map
command = _CMD_STORAGE_MIGRATE_IMPORT + includes + excludes

try:
lfs_output = run(command, stdout=PIPE, stderr=STDOUT, cwd=project_context.path, text=True)
Expand All @@ -547,78 +541,3 @@ def migrate_files_to_lfs(paths):

if lfs_output.returncode != 0:
raise errors.GitLFSError(f"Error executing 'git lfs migrate import: \n {lfs_output.stdout}")

with open(map_path, newline="") as csvfile:
reader = csv.reader(csvfile, delimiter=",")

commit_sha_mapping = [(r[0], r[1]) for r in reader]

os.remove(map_path)

sha_mapping = dict()

repo_root = Path(".")
repository = project_context.repository

for old_commit_sha, new_commit_sha in commit_sha_mapping:
old_commit = repository.get_commit(old_commit_sha)
new_commit = repository.get_commit(new_commit_sha)
processed = set()

for diff in old_commit.get_changes():
path_obj = Path(diff.b_path)

# NOTE: Get git object hash mapping for files and parent folders
while path_obj != repo_root:
if path_obj in processed:
break

path_str = str(path_obj)
old_sha = old_commit.tree[path_str].hexsha
new_sha = new_commit.tree[path_str].hexsha

sha_mapping[old_sha] = new_sha

processed.add(path_obj)
path_obj = path_obj.parent

def _map_checksum(entity, checksum_mapping) -> Optional["Entity"]:
"""Update the checksum and id of an entity based on a mapping."""
from renku.domain_model.entity import Entity
from renku.domain_model.provenance.activity import Collection

if entity.checksum not in checksum_mapping:
return None

new_checksum = checksum_mapping[entity.checksum]

if isinstance(entity, Collection) and entity.members:
members = []
for member in entity.members:
new_member = _map_checksum(member, checksum_mapping)
if new_member:
members.append(new_member)
else:
members.append(member)
new_entity: Entity = Collection(checksum=new_checksum, path=entity.path, members=members)
else:
new_entity = Entity(checksum=new_checksum, path=entity.path)

return new_entity

def _map_checksum_old(entity, checksum_mapping):
"""Update the checksum and id of an entity based on a mapping."""
# TODO: Remove this method once moved to Entity with 'id' field
from renku.domain_model.provenance.activity import Collection

if entity.checksum not in checksum_mapping:
return

new_checksum = checksum_mapping[entity.checksum]

entity._id = entity._id.replace(entity.checksum, new_checksum)
entity.checksum = new_checksum

if isinstance(entity, Collection) and entity.members:
for member in entity.members:
_map_checksum_old(member, checksum_mapping)

0 comments on commit 5730ce5

Please sign in to comment.