Skip to content

Commit

Permalink
feat(workflow): workflow revert command (#2956)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Jun 29, 2022
1 parent 610e88a commit cb0e73d
Show file tree
Hide file tree
Showing 28 changed files with 1,036 additions and 216 deletions.
Binary file modified docs/_static/cheatsheet/cheatsheet.pdf
Binary file not shown.
2 changes: 1 addition & 1 deletion docs/cheatsheet_hash
@@ -1,2 +1,2 @@
3e7d7567c17eba64945b70a3c1009343 cheatsheet.tex
62d4f12d9be4d80c0dfbdef2957fde9a cheatsheet.tex
c70c179e07f04186ec05497564165f11 sdsc_cheatsheet.cls
34 changes: 5 additions & 29 deletions renku/command/checks/activities.py
Expand Up @@ -17,7 +17,7 @@
# limitations under the License.
"""Checks needed to determine integrity of datasets."""

from itertools import chain
import itertools

import click

Expand All @@ -33,7 +33,7 @@ def check_migrated_activity_ids(
client, fix, activity_gateway: IActivityGateway, database_dispatcher: IDatabaseDispatcher
):
"""Check that activity ids were correctly migrated in the past."""
activities = activity_gateway.get_all_activities()
activities = activity_gateway.get_all_activities(include_deleted=True)

wrong_activities = [a for a in activities if not a.id.startswith("/activities/")]

Expand All @@ -42,40 +42,16 @@ def check_migrated_activity_ids(
for activity in wrong_activities:
communication.info(f"Fixing activity '{activity.id}'")

old_id = activity.id

# NOTE: Remove activity relations
tok = current_database["activity-catalog"].tokenizeQuery
relations = chain(
list(current_database["activity-catalog"].findRelationChains(tok(downstream=activity))),
list(current_database["activity-catalog"].findRelationChains(tok(upstream=activity))),
)
for rel_collection in relations:
for r in list(rel_collection):
current_database["activity-catalog"].unindex(r)

current_database["activities"].pop(old_id)
activity_gateway.remove(activity, keep_reference=False)

# NOTE: Modify id on activity and children
activity.unfreeze()
activity.id = f"/activities/{activity.id}"
activity._p_oid = current_database.hash_id(activity.id)
activity.freeze()

for usage in activity.usages:
current_database["activities-by-usage"][usage.entity.path] = [
a for a in current_database["activities-by-usage"][usage.entity.path] if a != activity
]
object.__setattr__(usage, "id", f"/activities/{usage.id}")

for generation in activity.generations:
current_database["activities-by-generation"][generation.entity.path] = [
a for a in current_database["activities-by-generation"][generation.entity.path] if a != activity
]
object.__setattr__(generation, "id", f"/activities/{generation.id}")

for parameter in activity.parameters:
object.__setattr__(parameter, "id", f"/activities/{parameter.id}")
for attribute in itertools.chain(activity.usages, activity.generations, activity.parameters):
object.__setattr__(attribute, "id", f"/activities/{attribute.id}") # type: ignore

activity.association.id = f"/activities/{activity.association.id}"

Expand Down
5 changes: 4 additions & 1 deletion renku/command/graph.py
Expand Up @@ -160,7 +160,10 @@ def _get_graph_for_all_objects(
List of JSON-LD metadata.
"""
project = project_gateway.get_project()
objects: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan]] = activity_gateway.get_all_activities()
# NOTE: Include deleted activities when exporting graph
objects: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan]] = activity_gateway.get_all_activities(
include_deleted=True
)

processed_plans = set()

Expand Down
2 changes: 1 addition & 1 deletion renku/command/update.py
Expand Up @@ -62,7 +62,7 @@ def _update(
paths = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in paths])

modified, _ = get_all_modified_and_deleted_activities_and_entities(client.repository)
modified_activities = {a for a, _ in modified if is_activity_valid(a)}
modified_activities = {a for a, _ in modified if not a.deleted and is_activity_valid(a)}
modified_paths = {e.path for _, e in modified}

activities = get_downstream_generating_activities(
Expand Down
46 changes: 16 additions & 30 deletions renku/command/workflow.py
Expand Up @@ -17,14 +17,14 @@
# limitations under the License.
"""Renku workflow commands."""


import itertools
import re
from collections import defaultdict
from functools import reduce
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast

from renku.core.workflow.plan import remove_plan
from renku.domain_model.provenance.annotation import Annotation

if TYPE_CHECKING:
Expand All @@ -45,7 +45,12 @@
from renku.core.util import communication
from renku.core.util.datetime8601 import local_now
from renku.core.util.os import are_paths_related, get_relative_paths, safe_read_yaml
from renku.core.workflow.activity import create_activity_graph, get_activities_until_paths, sort_activities
from renku.core.workflow.activity import (
create_activity_graph,
get_activities_until_paths,
revert_activity,
sort_activities,
)
from renku.core.workflow.concrete_execution_graph import ExecutionGraph
from renku.core.workflow.plan_factory import delete_indirect_files_list
from renku.core.workflow.value_resolution import CompositePlanValueResolver, ValueResolver
Expand Down Expand Up @@ -110,35 +115,9 @@ def list_workflows_command():
return Command().command(_list_workflows).require_migration().with_database(write=False)


@inject.autoparams()
def _remove_workflow(name: str, force: bool, plan_gateway: IPlanGateway):
"""Remove the remote named <name>.
Args:
name (str): The name of the Plan to remove.
force (bool): Whether to force removal or not.
plan_gateway(IPlanGateway): The injected Plan gateway.
Raises:
errors.ParameterError: If the Plan doesn't exist or was already deleted.
"""
workflows = plan_gateway.get_newest_plans_by_names()
plan = None
if name.startswith("/plans/"):
plan = next(filter(lambda x: x.id == name, workflows.values()), None)
if not plan and name not in workflows:
raise errors.ParameterError(f'The specified workflow is "{name}" is not an active workflow.')

if not force:
prompt_text = f'You are about to remove the following workflow "{name}".' + "\n" + "\nDo you wish to continue?"
communication.confirm(prompt_text, abort=True, warning=True)

plan = plan or workflows[name]
plan.delete()


def remove_workflow_command():
def remove_plan_command():
"""Command that removes the workflow named <name>."""
return Command().command(_remove_workflow).require_clean().with_database(write=True).with_commit()
return Command().command(remove_plan).require_clean().with_database(write=True).with_commit()


def _show_workflow(name_or_id: str):
Expand Down Expand Up @@ -954,3 +933,10 @@ def iterate_workflow_command():
return (
Command().command(_iterate_workflow).require_migration().require_clean().with_database(write=True).with_commit()
)


def revert_activity_command():
"""Command that reverts an activity."""
return (
Command().command(revert_activity).require_migration().require_clean().with_database(write=True).with_commit()
)
8 changes: 8 additions & 0 deletions renku/core/errors.py
Expand Up @@ -33,6 +33,14 @@ class RenkuException(Exception):
"""


class ActivityDownstreamNotEmptyError(RenkuException):
"""Raised when an activity cannot be deleted because its downstream is not empty."""

def __init__(self, activity):
self.activity = activity
super().__init__(f"Activity '{activity.id}' has non-empty downstream")


class LockError(RenkuException):
"""Raise when a project cannot be locked."""

Expand Down
16 changes: 15 additions & 1 deletion renku/core/interface/activity_gateway.py
Expand Up @@ -27,6 +27,10 @@
class IActivityGateway(ABC):
"""Interface for the ActivityGateway."""

def get_by_id(self, id: str) -> Optional[Activity]:
"""Get an activity by id."""
raise NotImplementedError

def get_all_usage_paths(self) -> List[str]:
"""Return all usage paths."""
raise NotImplementedError
Expand Down Expand Up @@ -59,7 +63,7 @@ def get_upstream_activity_chains(self, activity: Activity) -> List[Tuple[Activit
"""Get a list of tuples of all upstream paths of this activity."""
raise NotImplementedError

def get_all_activities(self) -> List[Activity]:
def get_all_activities(self, include_deleted: bool = False) -> List[Activity]:
"""Get all activities in the project."""
raise NotImplementedError

Expand All @@ -74,3 +78,13 @@ def add_activity_collection(self, activity_collection: ActivityCollection):
def get_all_activity_collections(self) -> List[ActivityCollection]:
"""Get all activity collections in the project."""
raise NotImplementedError

def remove(self, activity: Activity, keep_reference: bool = True, force: bool = False):
"""Remove an activity from the storage.
Args:
activity(Activity): The activity to be removed.
keep_reference(bool): Whether to keep the activity in the ``activities`` index or not.
force(bool): Force-delete the activity even if it has downstream activities.
"""
raise NotImplementedError
6 changes: 3 additions & 3 deletions renku/core/interface/plan_gateway.py
Expand Up @@ -26,7 +26,7 @@
class IPlanGateway(ABC):
"""Interface for the PlanGateway."""

def get_by_id(self, id: str) -> Optional[AbstractPlan]:
def get_by_id(self, id: Optional[str]) -> Optional[AbstractPlan]:
"""Get a plan by id."""
raise NotImplementedError

Expand All @@ -38,8 +38,8 @@ def list_by_name(self, starts_with: str, ends_with: str = None) -> List[str]:
"""Search plans by name."""
raise NotImplementedError

def get_newest_plans_by_names(self, with_invalidated: bool = False) -> Dict[str, AbstractPlan]:
"""Return a list of all newest plans with their names."""
def get_newest_plans_by_names(self, include_deleted: bool = False) -> Dict[str, AbstractPlan]:
"""Return a mapping of all plan names to their newest plans."""
raise NotImplementedError

def get_all_plans(self) -> List[AbstractPlan]:
Expand Down

0 comments on commit cb0e73d

Please sign in to comment.