Skip to content

Commit

Permalink
fix(workflow): set modification date when deriving a plan
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Feb 8, 2023
1 parent 7b05493 commit 9ec09a4
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 23 deletions.
3 changes: 2 additions & 1 deletion renku/command/checks/__init__.py
Expand Up @@ -30,7 +30,7 @@
from .project import check_project_id_group
from .storage import check_lfs_info
from .validate_shacl import check_datasets_structure, check_project_structure
from .workflow import check_activity_catalog
from .workflow import check_activity_catalog, check_modification_date

# Checks are executed in the order in which they are listed in ``__all__``. They are mostly used by the ``doctor``
# command to inspect broken things. The order of operations matters when fixing issues, so don't sort this list.
Expand All @@ -48,4 +48,5 @@
"check_missing_files",
"check_project_id_group",
"check_project_structure",
"check_modification_date",
)
69 changes: 68 additions & 1 deletion renku/command/checks/workflow.py
Expand Up @@ -17,11 +17,14 @@
# limitations under the License.
"""Checks needed to determine integrity of workflows."""

from typing import Optional, Tuple
from typing import List, Optional, Tuple, cast

from renku.command.command_builder import inject
from renku.command.util import WARNING
from renku.core.interface.plan_gateway import IPlanGateway
from renku.core.util import communication
from renku.domain_model.project_context import project_context
from renku.domain_model.workflow.plan import AbstractPlan
from renku.infrastructure.gateway.activity_gateway import reindex_catalog


Expand Down Expand Up @@ -58,3 +61,67 @@ def check_activity_catalog(fix, force, **_) -> Tuple[bool, Optional[str]]:
communication.info("Workflow metadata was rebuilt")

return True, None


@inject.autoparams("plan_gateway")
def check_modification_date(fix, plan_gateway: IPlanGateway, **_) -> Tuple[bool, Optional[str]]:
    """Check if all plans have modification date set for them.

    Args:
        fix(bool): Whether to fix found issues.
        plan_gateway(IPlanGateway): Injected PlanGateway.
        _: keyword arguments.

    Returns:
        Tuple[bool, Optional[str]]: Tuple of whether the check passed (``True`` when no plan is left without a
            modification date, either because none was found or because they were fixed) and, when it did not
            pass, a message listing the IDs of the affected plans.
    """
    plans: List[AbstractPlan] = plan_gateway.get_all_plans()

    # NOTE: A plan is considered broken both when the attribute is missing altogether and when it is ``None``.
    to_be_processed = [plan for plan in plans if getattr(plan, "date_modified", None) is None]

    if not to_be_processed:
        # NOTE: Nothing to report; this is the same healthy state as the one reached after a successful fix below,
        # so it must report the same boolean.
        return True, None
    if not fix:
        ids = [plan.id for plan in to_be_processed]
        message = (
            WARNING
            + "The following workflows have incorrect modification date (use 'renku doctor --fix' to fix them).:\n\t"
            + "\n\t".join(ids)
        )
        # NOTE: Problems were found and left untouched, so the check did not pass.
        return False, message

    fix_plan_dates(plans=to_be_processed, plan_gateway=plan_gateway)
    # NOTE: Persist the modified plans before reporting success.
    project_context.database.commit()
    communication.info("Workflow modification dates were fixed")

    return True, None


def fix_plan_dates(plans: List[AbstractPlan], plan_gateway):
    """Set modification date on a list of plans and fix their creation date.

    For every given plan the derivation chain is followed (via ``derived_from`` and ``plan_gateway.get_by_id``) up
    to the plan that is not a derivation. Each plan visited for the first time gets its current ``date_created``
    stored as ``date_modified`` and its ``date_created`` replaced by the creation date found at the end of the
    chain. Plans are unfrozen for the update and frozen again afterwards.

    Args:
        plans(List[AbstractPlan]): Plans whose dates should be fixed; plans they derive from are updated as well.
        plan_gateway: Gateway used to look up a plan by the ID stored in ``derived_from``.
    """
    # NOTE: Remembers every plan that was already updated so that shared ancestors or repeated entries are not
    # updated twice (a second pass would overwrite ``date_modified`` with the already-corrected creation date).
    seen = set()

    for newest in plans:
        pending: List[AbstractPlan] = []
        if newest not in seen:
            seen.add(newest)
            pending.append(newest)

        # NOTE: Follow the chain to its start, tracking the creation date of the last plan reached
        current = newest
        original_date = current.date_created
        while current.is_derivation():
            current = cast(AbstractPlan, plan_gateway.get_by_id(current.derived_from))
            original_date = current.date_created
            if current in seen:
                continue
            seen.add(current)
            pending.append(current)

        # NOTE: switch creation date for modification date, starting from the oldest collected plan
        for item in reversed(pending):
            item.unfreeze()
            item.date_modified = item.date_created
            item.date_created = original_date
            item.freeze()
21 changes: 2 additions & 19 deletions renku/core/migration/m_0010__metadata_fixes.py
Expand Up @@ -26,6 +26,7 @@

import zstandard as zstd

from renku.command.checks.workflow import fix_plan_dates
from renku.command.command_builder import inject
from renku.core.interface.activity_gateway import IActivityGateway
from renku.core.interface.dataset_gateway import IDatasetGateway
Expand Down Expand Up @@ -257,25 +258,7 @@ def fix_plan_times(activity_gateway: IActivityGateway, plan_gateway: IPlanGatewa
plan.date_created = activity_map[plan.id].started_at_time
plan.freeze()

# NOTE: switch creation date for modification date
for tail in plan_gateway.get_newest_plans_by_names(include_deleted=True).values():
stack: List[AbstractPlan] = []
stack.append(tail)
creation_date = tail.date_created
plan = tail

while plan.is_derivation():
plan = cast(AbstractPlan, plan_gateway.get_by_id(plan.derived_from))
creation_date = plan.date_created
stack.append(plan)

while stack:
plan = stack.pop()
plan.unfreeze()
plan.date_modified = plan.date_created
plan.date_created = creation_date
plan.freeze

fix_plan_dates(plans=plans, plan_gateway=plan_gateway)
database.commit()


Expand Down
2 changes: 1 addition & 1 deletion renku/domain_model/workflow/plan.py
Expand Up @@ -317,7 +317,7 @@ def derive(self, creator: Optional[Person] = None) -> "Plan":
"""Create a new ``Plan`` that is derived from self."""
derived = copy.copy(self)
derived.derived_from = self.id
derived.date_created = local_now()
derived.date_modified = local_now()
derived.parameters = self.parameters.copy()
derived.inputs = self.inputs.copy()
derived.keywords = copy.deepcopy(self.keywords)
Expand Down
8 changes: 7 additions & 1 deletion tests/cli/test_workflow.py
Expand Up @@ -25,6 +25,7 @@
import shutil
import sys
import tempfile
import time
import uuid
from pathlib import Path

Expand All @@ -36,6 +37,7 @@
from renku.core.plugin.provider import available_workflow_providers
from renku.core.util.git import with_commit
from renku.core.util.yaml import write_yaml
from renku.domain_model.workflow.plan import Plan
from renku.infrastructure.database import Database
from renku.infrastructure.gateway.activity_gateway import ActivityGateway
from renku.infrastructure.gateway.plan_gateway import PlanGateway
Expand Down Expand Up @@ -428,17 +430,21 @@ def _get_plan_id(output):
database = Database.from_path(project.database_path)
test_plan = database["plans-by-name"][workflow_name]

time.sleep(1)

cmd = ["workflow", "edit", workflow_name, "--name", "first"]
result = runner.invoke(cli, cmd)
assert 0 == result.exit_code, format_result_exception(result)

workflow_name = "first"
database = Database.from_path(project.database_path)
first_plan = database["plans-by-name"]["first"]
first_plan: Plan = database["plans-by-name"]["first"]

assert first_plan
assert first_plan.name == "first"
assert first_plan.derived_from == test_plan.id
assert first_plan.date_created == test_plan.date_created
assert (first_plan.date_modified - first_plan.date_created).total_seconds() >= 1

cmd = ["workflow", "edit", workflow_name, "--description", "Test workflow"]
result = runner.invoke(cli, cmd)
Expand Down
21 changes: 21 additions & 0 deletions tests/core/test_plan.py
Expand Up @@ -16,9 +16,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku plan management tests."""
from datetime import datetime

import pytest

from renku.command.checks import check_modification_date
from renku.core import errors
from renku.core.workflow.plan import (
get_activities,
Expand Down Expand Up @@ -160,3 +162,22 @@ def test_get_activities(project_with_injection):
plan_activities = set(get_activities(plan))

assert set(activities[0:5]) == plan_activities


def test_modification_date_fix(project_with_injection):
    """Check that the modification-date check repairs plans whose modification date is missing."""
    _, _, derived_plan, _, _, standalone_plan = create_dummy_plans()

    placeholder_date = datetime(2023, 2, 8, 0, 42, 0)
    expected_creation_date = derived_plan.date_created

    # Break the plans: one gets a different creation date and loses the attribute, the other one has it set to None
    derived_plan.date_created = placeholder_date
    del derived_plan.date_modified
    standalone_plan.date_modified = None

    check_modification_date(fix=True)

    # The overwritten creation date becomes the modification date and the initial creation date is back in place
    assert placeholder_date == derived_plan.date_modified
    assert standalone_plan.date_created == standalone_plan.date_modified
    assert expected_creation_date == derived_plan.date_created

0 comments on commit 9ec09a4

Please sign in to comment.