Skip to content

Commit

Permalink
fix(core): various migration issues (#2488)
Browse files — browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Nov 30, 2021
1 parent 3025340 commit ac93b18
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 63 deletions.
2 changes: 1 addition & 1 deletion renku/core/commands/run.py
Expand Up @@ -61,7 +61,7 @@ def _run_command(
client = client_dispatcher.current_client

if name:
valid_name = get_slug(name, invalid_chars=["."])
valid_name = get_slug(name, invalid_chars=["."], lowercase=False)
if name != valid_name:
raise errors.ParameterError(f"Invalid name: '{name}' (Hint: '{valid_name}' is valid).")

Expand Down
2 changes: 1 addition & 1 deletion renku/core/management/datasets.py
Expand Up @@ -150,7 +150,7 @@ def create_dataset(
raise errors.ParameterError("Dataset name must be provided.")

if not is_dataset_name_valid(name):
valid_name = get_slug(name)
valid_name = get_slug(name, lowercase=False)
raise errors.ParameterError(f'Dataset name "{name}" is not valid (Hint: "{valid_name}" is valid).')

if self.get_dataset(name=name):
Expand Down
2 changes: 1 addition & 1 deletion renku/core/management/migrations/m_0005__2_cwl.py
Expand Up @@ -454,7 +454,7 @@ def from_client(cls, client):
if file.deleted:
continue

path = file.a_path
path = file.b_path
cache[path].append(commit)

if path.startswith(f"{client.renku_home}/workflow/") and path.endswith(".cwl"):
Expand Down
63 changes: 38 additions & 25 deletions renku/core/management/migrations/m_0009__new_metadata_storage.py
Expand Up @@ -21,6 +21,7 @@
import shutil
import traceback
import uuid
from collections import defaultdict
from itertools import chain
from pathlib import Path, PurePosixPath
from typing import List, Optional, Union
Expand Down Expand Up @@ -65,12 +66,10 @@ def migrate(migration_context):
"""Migration function."""
client = migration_context.client
committed = _commit_previous_changes(client)
# TODO: set remove=True once the migration to the new metadata is finalized
# NOTE: Initialize submodules
_ = client.repository.submodules
generate_new_metadata(
remove=False,
committed=committed,
strict=migration_context.options.strict,
migration_type=migration_context.options.type,
committed=committed, strict=migration_context.options.strict, migration_type=migration_context.options.type
)
_remove_dataset_metadata_files(client)

Expand Down Expand Up @@ -191,7 +190,7 @@ def generate_new_metadata(
try:
# NOTE: Don't migrate workflows for dataset-only migrations
if MigrationType.WORKFLOWS in migration_type:
_process_workflows(client=client, activity_gateway=activity_gateway, commit=commit, remove=remove)
_process_workflows(activity_gateway=activity_gateway, commit=commit, remove=remove, client=client)
_process_datasets(
client=client, commit=commit, datasets_provenance=datasets_provenance, is_last_commit=is_last_commit
)
Expand All @@ -212,7 +211,7 @@ def generate_new_metadata(
database_gateway.commit()


def _convert_run_to_plan(run: old_schema.Run, client: LocalClient) -> Plan:
def _convert_run_to_plan(run: old_schema.Run, project_id) -> Plan:
"""Create a Plan from a Run."""
assert not run.subprocesses, f"Cannot create a Plan from a Run with subprocesses: {run._id}"

Expand Down Expand Up @@ -289,7 +288,7 @@ def convert_output(output: old_schema.CommandOutput) -> CommandOutput:
name=run.name,
outputs=[convert_output(o) for o in run.outputs],
parameters=[convert_argument(a) for a in run.arguments],
project_id=client.project.id,
project_id=project_id,
success_codes=run.successcodes,
)

Expand Down Expand Up @@ -324,7 +323,7 @@ def _process_workflows(client: LocalClient, activity_gateway: IActivityGateway,
if file.deleted:
continue

path: str = file.a_path
path: str = file.b_path

if not path.startswith(".renku/workflow") or not path.endswith(".yaml"):
continue
Expand All @@ -341,26 +340,21 @@ def _process_workflows(client: LocalClient, activity_gateway: IActivityGateway,
activities = [workflow]

for old_activity in activities:
new_activities = _process_run_to_new_activity(process_run=old_activity)
new_activities = _process_run_to_new_activity(process_run=old_activity, client=client)
for new_activity in new_activities:
activity_gateway.add(new_activity)

if remove:
try:
os.remove(file.a_path)
os.remove(file.b_path)
except FileNotFoundError:
pass


@inject.autoparams("client_dispatcher")
def _process_run_to_new_activity(
process_run: old_schema.ProcessRun, client_dispatcher: IClientDispatcher
) -> List[Activity]:
def _process_run_to_new_activity(process_run: old_schema.ProcessRun, client: LocalClient) -> List[Activity]:
"""Convert a ProcessRun to a new Activity."""
assert not isinstance(process_run, old_schema.WorkflowRun)

client = client_dispatcher.current_client

run = process_run.association.plan

if run.subprocesses:
Expand All @@ -372,7 +366,7 @@ def _process_run_to_new_activity(
for run in runs:
activity_id = Activity.generate_id()

plan = _convert_run_to_plan(run, client)
plan = _convert_run_to_plan(run, project_id=client.project.id)

agents = [_old_agent_to_new_agent(a) for a in process_run.agents or []]
association_agent = _old_agent_to_new_agent(process_run.association.agent)
Expand Down Expand Up @@ -448,7 +442,7 @@ def _convert_used_entity(entity: old_schema.Entity, revision: str, activity_id:

checksum = client.repository.get_object_hash(revision=revision, path=entity.path)
if not checksum:
communication.warn(f"Entity '{entity.path}' not found at '{revision}'")
_EntityWarningCache.warn(path=entity.path, revision=revision)
checksum = NON_EXISTING_ENTITY_CHECKSUM

if isinstance(entity, old_schema.Collection):
Expand Down Expand Up @@ -476,7 +470,7 @@ def _convert_generated_entity(entity: old_schema.Entity, revision: str, activity
assert isinstance(entity, old_schema.Entity)

try:
entity_commit = client.repository.get_previous_commit(path=entity.path, revision=revision)
entity_commit = client.repository.get_previous_commit(path=entity.path, revision=revision, submodule=True)
except errors.GitCommitNotFoundError:
return None

Expand All @@ -485,7 +479,7 @@ def _convert_generated_entity(entity: old_schema.Entity, revision: str, activity

checksum = client.repository.get_object_hash(revision=revision, path=entity.path)
if not checksum:
communication.warn(f"Entity '{entity.path}' not found at '{revision}'")
_EntityWarningCache.warn(path=entity.path, revision=revision)
checksum = NON_EXISTING_ENTITY_CHECKSUM

if isinstance(entity, old_schema.Collection):
Expand All @@ -511,14 +505,14 @@ def _convert_invalidated_entity(entity: old_schema.Entity, client) -> Optional[E
assert not isinstance(entity, old_schema.Collection), f"Collection passed as invalidated: {entity._id}"

commit_sha = _extract_commit_sha(entity_id=entity._id)
commit = client.repository.get_previous_commit(revision=commit_sha, path=entity.path)
commit = client.repository.get_previous_commit(revision=commit_sha, path=entity.path, submodule=True)
revision = commit.hexsha
checksum = client.repository.get_object_hash(revision=revision, path=entity.path)
if not checksum:
# Entity was deleted at revision; get the one before it to have object_id
checksum = client.repository.get_object_hash(revision=f"{revision}~", path=entity.path)
if not checksum:
communication.warn(f"Entity '{entity.path}' not found at '{revision}'")
_EntityWarningCache.warn(path=entity.path, revision=revision)
checksum = NON_EXISTING_ENTITY_CHECKSUM

new_entity = Entity(checksum=checksum, path=entity.path)
Expand Down Expand Up @@ -584,7 +578,7 @@ def _old_agent_to_new_agent(

def _process_datasets(client: LocalClient, commit: Commit, datasets_provenance: DatasetsProvenance, is_last_commit):
changes = commit.get_changes(paths=".renku/datasets/*/*.yml")
changed_paths = [c.a_path for c in changes]
changed_paths = [c.b_path for c in changes if not c.deleted]
paths = [p for p in changed_paths if len(Path(p).parents) == 4] # Exclude files that are not in the right place
deleted_paths = [c.a_path for c in changes if c.deleted]

Expand Down Expand Up @@ -652,7 +646,7 @@ def copy_and_migrate_datasets():
for path in paths:
rev = revision
if path in deleted_paths:
rev = client.repository.get_previous_commit(path, revision=f"{revision}~")
rev = client.repository.get_previous_commit(path, revision=f"{revision}~", submodule=True)
identifier = get_dataset_identifier(path)
if not identifier:
continue
Expand Down Expand Up @@ -739,3 +733,22 @@ def _remove_dataset_metadata_files(client: LocalClient):
shutil.rmtree(os.path.join(client.renku_path, "refs", OLD_DATASETS_PATH))
except FileNotFoundError:
pass


class _EntityWarningCache:
    """Cache warning messages to avoid re-prints."""

    # Maps str(path) -> set of str(revision) values already warned about.
    cache = defaultdict(set)

    @staticmethod
    def warn(path, revision):
        """Print a warning if it's not already printed for the path/revision."""
        path, revision = str(path), str(revision)

        seen_revisions = _EntityWarningCache.cache[path]
        if revision not in seen_revisions:
            seen_revisions.add(revision)
            communication.warn(f"Entity '{path}' not found at '{revision}'")
6 changes: 3 additions & 3 deletions renku/core/management/migrations/models/v9.py
Expand Up @@ -166,7 +166,7 @@ def __attrs_post_init__(self):
if not self.creator and self.client:
if self.client.database_path.exists():
self.creator = Person.from_commit(
self.client.repository.get_previous_commit(self.client.database_path, return_first=True)
self.client.repository.get_previous_commit(self.client.database_path, first=True)
)
else:
# this assumes the project is being newly created
Expand Down Expand Up @@ -294,7 +294,7 @@ def from_revision(cls, client, path, revision="HEAD", parent=None, find_previous
if path != "." and path_.is_dir():
entity = Collection(client=client, commit=commit, path=path, members=[], parent=parent)

files_in_commit = [c.a_path for c in commit.get_changes() if not c.deleted]
files_in_commit = [c.b_path for c in commit.get_changes() if not c.deleted]

# update members with commits
for member in path_.iterdir():
Expand Down Expand Up @@ -1558,7 +1558,7 @@ def name_validator(self, attribute, value):
"""Validate name."""
# name might have been escaped and have '%' in it
if value and not is_dataset_name_valid(value):
raise errors.ParameterError('Invalid "name": {}'.format(value))
raise errors.ParameterError(f"Invalid name: `{value}`")

@property
def short_id(self):
Expand Down
2 changes: 1 addition & 1 deletion renku/core/management/storage.py
Expand Up @@ -516,7 +516,7 @@ def migrate_files_to_lfs(self, paths):
processed = set()

for diff in old_commit.get_changes():
path_obj = Path(diff.a_path)
path_obj = Path(diff.b_path)

# NOTE: Get git object hash mapping for files and parent folders
while path_obj != repo_root:
Expand Down
2 changes: 1 addition & 1 deletion renku/core/management/workflow/plan_factory.py
Expand Up @@ -558,7 +558,7 @@ def watch(self, client_dispatcher: IClientDispatcher, no_output=False):
candidates |= {file_ for file_ in repository.untracked_files}

# Capture modified files through redirects.
candidates |= {o.a_path for o in repository.unstaged_changes if not o.deleted}
candidates |= {o.b_path for o in repository.unstaged_changes if not o.deleted}

# Include explicit outputs
candidates |= {str(path.relative_to(self.working_dir)) for path in self.explicit_outputs}
Expand Down
57 changes: 31 additions & 26 deletions renku/core/metadata/repository.py
Expand Up @@ -109,14 +109,7 @@ def staged_changes(self) -> List["Diff"]:
"""
try:
diff = self._repository.index.diff("HEAD", ignore_submodules=True)
return [
Diff(
a_path=git_unicode_unescape(d.a_path),
b_path=git_unicode_unescape(d.b_path),
change_type=d.change_type,
)
for d in diff
]
return [Diff.from_diff(d) for d in diff]
except (git.BadName, git.BadObject, git.GitError) as e:
raise errors.GitError("Cannot get staged changes") from e

Expand All @@ -125,14 +118,7 @@ def unstaged_changes(self) -> List["Diff"]:
"""Return a list of changes that are not staged."""
try:
diff = self._repository.index.diff(None, ignore_submodules=True)
return [
Diff(
a_path=git_unicode_unescape(d.a_path),
b_path=git_unicode_unescape(d.b_path),
change_type=d.change_type,
)
for d in diff
]
return [Diff.from_diff(d) for d in diff]
except (git.BadName, git.BadObject, git.GitError) as e:
raise errors.GitError("Cannot get modified changes") from e

Expand Down Expand Up @@ -287,14 +273,24 @@ def get_attributes(self, *paths: Union[Path, str]) -> Dict[str, Dict[str, str]]:
return attributes

def get_previous_commit(
self, path: Union[Path, str], revision: Union["Commit", str] = None, full_history: bool = False
self,
path: Union[Path, str],
revision: Union["Commit", str] = None,
first: bool = False,
full_history: bool = True,
submodule: bool = False,
) -> Optional["Commit"]:
"""Return a previous commit for a given path starting from ``revision``."""
revision = revision or "HEAD"
assert isinstance(revision, (Commit, str)), f"'revision' must be Commit/str not '{type(revision)}'"

commit = _find_previous_commit_helper(
repository=self, path=path, revision=str(revision), full_history=full_history
repository=self,
path=path,
revision=str(revision),
first=first,
full_history=full_history,
submodules=submodule,
)
if not commit:
raise errors.GitCommitNotFoundError(f"Cannot find previous commit for '{path}' from '{revision}'")
Expand Down Expand Up @@ -423,7 +419,7 @@ def get_content_from_submodules():

raise errors.ExportError(f"File not found in the repository: '{revision}/{checksum}:{path}'")

def get_object_hash(self, path: Union[Path, str], revision: str = None) -> Optional[str]:
def get_object_hash(self, path: Union[Path, str], revision: Union["Commit", str] = None) -> Optional[str]:
"""Return git hash of an object in a Repo or its submodule.
NOTE: path must be relative to the repo's root regardless if this function is called from a subdirectory or not.
Expand Down Expand Up @@ -715,6 +711,7 @@ def __hash__(self) -> int:
class Diff(NamedTuple):
"""A single diff object between two trees."""

# NOTE: In case a rename, a_path and b_path have different values. Make sure to use the correct one.
a_path: str
b_path: str
"""
Expand All @@ -727,6 +724,18 @@ class Diff(NamedTuple):
"""
change_type: str

@classmethod
def from_diff(cls, diff: git.Diff):
"""Create an instance from a git object."""
a_path = git_unicode_unescape(diff.a_path)
b_path = git_unicode_unescape(diff.b_path)

# NOTE: Make sure a_path or b_path are the same in case of addition or deletion
a_path = a_path or b_path
b_path = b_path or a_path

return cls(a_path=a_path, b_path=b_path, change_type=diff.change_type)

@property
def deleted(self) -> bool:
"""True if file was deleted."""
Expand Down Expand Up @@ -826,12 +835,7 @@ def get_changes(
# NOTE: A merge commit, so there is no clear diff
return []

return [
Diff(
a_path=git_unicode_unescape(d.a_path), b_path=git_unicode_unescape(d.b_path), change_type=d.change_type
)
for d in diff
]
return [Diff.from_diff(d) for d in diff]

def traverse(self) -> Generator[Object, None, None]:
"""Traverse over all objects that are present in this commit."""
Expand Down Expand Up @@ -1180,7 +1184,7 @@ def _find_previous_commit_helper(
repository: BaseRepository,
path: Union[Path, str],
revision: str = None,
first=False,
first: bool = False,
full_history: bool = False,
submodules: bool = False,
) -> Optional[Commit]:
Expand Down Expand Up @@ -1208,6 +1212,7 @@ def get_previous_commit_from_submodules() -> Optional[Commit]:
revision=revision,
first=first,
full_history=full_history,
submodules=submodules,
)
if commit:
return commit
Expand Down
2 changes: 1 addition & 1 deletion renku/core/models/dataset.py
Expand Up @@ -44,7 +44,7 @@

def is_dataset_name_valid(name):
    """Check if name is a valid slug."""
    # NOTE: A falsy name (None/"") short-circuits and is returned as-is, matching ``name and ...``
    if not name:
        return name
    return name == get_slug(name, lowercase=False)


def generate_default_name(dataset_title, dataset_version=None):
Expand Down

0 comments on commit ac93b18

Please sign in to comment.