Skip to content

Commit

Permalink
fix(core): make status and update consider all relevant activities (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Panaetius committed Nov 24, 2021
1 parent 6780b59 commit c7e2d66
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 123 deletions.
10 changes: 7 additions & 3 deletions renku/core/commands/status.py
Expand Up @@ -26,7 +26,7 @@
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.models.entity import Entity
from renku.core.models.provenance.activity import Activity
from renku.core.utils.metadata import get_modified_activities
from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities
from renku.core.utils.os import get_relative_path_to_cwd, get_relative_paths


Expand Down Expand Up @@ -87,7 +87,11 @@ def mark_generations_as_stale(activity):

def _get_modified_paths(activity_gateway, repository) -> Tuple[Set[Tuple[Activity, Entity]], Set[str]]:
"""Get modified and deleted usages/inputs of a list of activities."""
latest_activities = activity_gateway.get_latest_activity_per_plan().values()
modified, deleted = get_modified_activities(activities=latest_activities, repository=repository)
all_activities = activity_gateway.get_all_activities()

relevant_activities = set()
for activity in all_activities:
add_activity_if_recent(activity, relevant_activities)
modified, deleted = get_modified_activities(activities=list(relevant_activities), repository=repository)

return modified, {e.path for _, e in deleted}
8 changes: 5 additions & 3 deletions renku/core/commands/update.py
Expand Up @@ -108,9 +108,11 @@ def _is_activity_valid(activity: Activity, plan_gateway: IPlanGateway, client_di

def _get_modified_activities_and_paths(repository, activity_gateway) -> Tuple[Set[Activity], Set[str]]:
"""Return latest activities that one of their inputs is modified."""
latest_activities = activity_gateway.get_latest_activity_per_plan().values()
modified, _ = get_modified_activities(activities=latest_activities, repository=repository)

all_activities = activity_gateway.get_all_activities()
relevant_activities = set()
for activity in all_activities:
add_activity_if_recent(activity, relevant_activities)
modified, _ = get_modified_activities(activities=list(relevant_activities), repository=repository)
return {a for a, _ in modified if _is_activity_valid(a)}, {e.path for _, e in modified}


Expand Down
3 changes: 2 additions & 1 deletion renku/core/commands/workflow.py
Expand Up @@ -480,7 +480,8 @@ def execute_workflow(
for plan in plans:
# NOTE: Update plans are copies of Plan objects. We need to use the original Plan objects to avoid duplicates.
original_plan = plan_gateway.get_by_id(plan.id)
activity = Activity.from_plan(plan=original_plan, started_at_time=started_at_time, ended_at_time=ended_at_time)
activity = Activity.from_plan(plan=plan, started_at_time=started_at_time, ended_at_time=ended_at_time)
activity.association.plan = original_plan
activity_gateway.add(activity)
activities.append(activity)

Expand Down
13 changes: 2 additions & 11 deletions renku/core/management/interface/activity_gateway.py
Expand Up @@ -19,23 +19,14 @@

from abc import ABC
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Union
from typing import List, Optional, Set, Tuple, Union

from renku.core.models.provenance.activity import Activity, ActivityCollection, Usage
from renku.core.models.workflow.plan import AbstractPlan
from renku.core.models.provenance.activity import Activity, ActivityCollection


class IActivityGateway(ABC):
"""Interface for the ActivityGateway."""

def get_latest_activity_per_plan(self):
"""Get latest activity for each plan."""
raise NotImplementedError

def get_plans_and_usages_for_latest_activities(self) -> Dict[AbstractPlan, List[Usage]]:
"""Get all usages associated with a plan by its latest activity."""
raise NotImplementedError

def get_all_usage_paths(self) -> List[str]:
"""Return all usage paths."""
raise NotImplementedError
Expand Down
16 changes: 16 additions & 0 deletions renku/core/metadata/database.py
Expand Up @@ -41,6 +41,8 @@
TYPE_TYPE = "type"
FUNCTION_TYPE = "function"
REFERENCE_TYPE = "reference"
SET_TYPE = "set"
FROZEN_SET_TYPE = "frozenset"
MARKER = object()

"""NOTE: These are used as _p_serial to mark if an object was read from storage or is new"""
Expand Down Expand Up @@ -619,6 +621,16 @@ def _serialize_helper(self, object):
return object
elif isinstance(object, list):
return [self._serialize_helper(value) for value in object]
elif isinstance(object, set):
return {
"@renku_data_type": SET_TYPE,
"@renku_data_value": [self._serialize_helper(value) for value in object],
}
elif isinstance(object, frozenset):
return {
"@renku_data_type": FROZEN_SET_TYPE,
"@renku_data_value": [self._serialize_helper(value) for value in object],
}
elif isinstance(object, dict):
result = dict()
items = sorted(object.items(), key=lambda x: x[0])
Expand Down Expand Up @@ -747,6 +759,10 @@ def _deserialize_helper(self, data, create=True):
elif object_type == REFERENCE_TYPE:
# NOTE: we had a circular reference, we return the (not yet finalized) class here
return self._deserialization_cache[data["@renku_data_value"]]
elif object_type == SET_TYPE:
return set([self._deserialize_helper(value) for value in data["@renku_data_value"]])
elif object_type == FROZEN_SET_TYPE:
return frozenset([self._deserialize_helper(value) for value in data["@renku_data_value"]])

cls = self._get_class(object_type)

Expand Down
26 changes: 3 additions & 23 deletions renku/core/metadata/gateway/activity_gateway.py
Expand Up @@ -18,7 +18,7 @@
"""Renku activity database gateway implementation."""

from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Union
from typing import List, Optional, Set, Tuple, Union

from persistent.list import PersistentList

Expand All @@ -27,8 +27,8 @@
from renku.core.management.interface.database_dispatcher import IDatabaseDispatcher
from renku.core.management.interface.plan_gateway import IPlanGateway
from renku.core.metadata.gateway.database_gateway import ActivityDownstreamRelation
from renku.core.models.provenance.activity import Activity, ActivityCollection, Usage
from renku.core.models.workflow.plan import AbstractPlan, Plan
from renku.core.models.provenance.activity import Activity, ActivityCollection
from renku.core.models.workflow.plan import Plan
from renku.core.utils.os import are_paths_related


Expand All @@ -37,18 +37,6 @@ class ActivityGateway(IActivityGateway):

database_dispatcher = inject.attr(IDatabaseDispatcher)

def get_latest_activity_per_plan(self) -> Dict[AbstractPlan, Activity]:
"""Get latest activity for each plan."""
plan_activities = self.database_dispatcher.current_database["latest-activity-by-plan"].values()

return {a.association.plan: a for a in plan_activities}

def get_plans_and_usages_for_latest_activities(self) -> Dict[AbstractPlan, List[Usage]]:
"""Get all usages associated with a plan by its latest activity."""
plan_activities = self.database_dispatcher.current_database["latest-activity-by-plan"].values()

return {a.association.plan: a.usages for a in plan_activities}

def get_all_usage_paths(self) -> List[str]:
"""Return all usage paths."""
database = self.database_dispatcher.current_database
Expand Down Expand Up @@ -119,12 +107,6 @@ def get_all_activities(self) -> List[Activity]:
def add(self, activity: Activity):
"""Add an ``Activity`` to storage."""

def update_latest_activity_by_plan(plan):
existing_activity = database["latest-activity-by-plan"].get(plan.id)

if not existing_activity or existing_activity.ended_at_time < activity.ended_at_time:
database["latest-activity-by-plan"].add(activity, key=plan.id, verify=False)

database = self.database_dispatcher.current_database

database["activities"].add(activity)
Expand Down Expand Up @@ -166,8 +148,6 @@ def update_latest_activity_by_plan(plan):
plan_gateway = inject.instance(IPlanGateway)
plan_gateway.add(activity.association.plan)

update_latest_activity_by_plan(activity.association.plan)

def add_activity_collection(self, activity_collection: ActivityCollection):
"""Add an ``ActivityCollection`` to storage."""
database = self.database_dispatcher.current_database
Expand Down
1 change: 0 additions & 1 deletion renku/core/metadata/gateway/database_gateway.py
Expand Up @@ -90,7 +90,6 @@ def load_downstream_relations(token, catalog, cache, database_dispatcher: IDatab
def initialize_database(database):
"""Initialize an empty database with all required metadata."""
database.add_index(name="activities", object_type=Activity, attribute="id")
database.add_index(name="latest-activity-by-plan", object_type=Activity, attribute="association.plan.id")
database.add_root_object(name="activities-by-usage", obj=BTrees.OOBTree.OOBTree())
database.add_root_object(name="activities-by-generation", obj=BTrees.OOBTree.OOBTree())

Expand Down
4 changes: 3 additions & 1 deletion renku/core/utils/metadata.py
Expand Up @@ -87,10 +87,12 @@ def get_modified_activities(
modified = set()
deleted = set()

checksum_cache = {}

for activity in activities:
for usage in activity.usages:
entity = usage.entity
current_checksum = repository.get_object_hash(path=entity.path)
current_checksum = checksum_cache.setdefault(entity.path, repository.get_object_hash(path=entity.path))
if current_checksum is None:
deleted.add((activity, entity))
elif current_checksum != entity.checksum:
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/fixtures/cli_runner.py
Expand Up @@ -38,7 +38,7 @@ def renku_cli(client, run, client_database_injection_manager):
def renku_cli_(*args, **kwargs) -> Tuple[int, Union[None, Activity, List[Activity]]]:
@inject.autoparams()
def _get_activities(activity_gateway: IActivityGateway):
return {a.id: a for a in activity_gateway.get_latest_activity_per_plan().values()}
return {a.id: a for a in activity_gateway.get_all_activities()}

with client_database_injection_manager(client):
activities_before = _get_activities()
Expand Down
59 changes: 58 additions & 1 deletion tests/cli/test_update.py
Expand Up @@ -199,7 +199,9 @@ def test_update_workflow_without_outputs(runner, project, run):

write_and_commit_file(repo, source, "content")

assert 0 == runner.invoke(cli, ["run", "cat", "--no-output", source]).exit_code
result = runner.invoke(cli, ["run", "cat", "--no-output", source])

assert 0 == result.exit_code, format_result_exception(result)

write_and_commit_file(repo, source, "changes")

Expand Down Expand Up @@ -415,3 +417,58 @@ def test_update_multiple_paths_common_output(project, renku_cli, runner):
assert "r2" not in result.output
assert "r3" in result.output
assert "r4" in result.output


def test_update_with_execute(runner, client, renku_cli, client_database_injection_manager):
"""Test output is updated when source changes."""
source1 = Path("source.txt")
output1 = Path("output.txt")
source2 = Path("source2.txt")
output2 = Path("output2.txt")
script = Path("cp.sh")

write_and_commit_file(client.repository, source1, "content_a")
write_and_commit_file(client.repository, source2, "content_b")
write_and_commit_file(client.repository, script, "cp $1 $2")

result = runner.invoke(cli, ["run", "--name", "test", "bash", str(script), str(source1), str(output1)])
assert 0 == result.exit_code, format_result_exception(result)

assert (
0
== renku_cli(
"workflow", "execute", "--set", f"input-2={source2}", "--set", f"output-3={output2}", "test"
).exit_code
)

assert "content_a" == (client.path / output1).read_text()
assert "content_b" == (client.path / output2).read_text()

result = runner.invoke(cli, ["status"])
assert 0 == result.exit_code, format_result_exception(result)

write_and_commit_file(client.repository, script, "cp $1 $2\necho 'modified' >> $2")

result = runner.invoke(cli, ["status"])
assert 1 == result.exit_code

assert 0 == renku_cli("update", "--all").exit_code

result = runner.invoke(cli, ["status"])
assert 0 == result.exit_code

assert "content_amodified\n" == (client.path / output1).read_text()
assert "content_bmodified\n" == (client.path / output2).read_text()

write_and_commit_file(client.repository, script, "cp $1 $2\necho 'even more modified' >> $2")

result = runner.invoke(cli, ["status"])
assert 1 == result.exit_code

assert 0 == renku_cli("update", "--all").exit_code

result = runner.invoke(cli, ["status"])
assert 0 == result.exit_code

assert "content_aeven more modified\n" == (client.path / output1).read_text()
assert "content_beven more modified\n" == (client.path / output2).read_text()
78 changes: 0 additions & 78 deletions tests/core/metadata/test_activity_gateway.py
Expand Up @@ -17,90 +17,12 @@
# limitations under the License.
"""Test activity database gateways."""

from datetime import datetime, timedelta

from renku.core.metadata.gateway.activity_gateway import ActivityGateway
from renku.core.models.provenance.activity import Activity, Association
from renku.core.models.workflow.plan import Plan
from tests.utils import create_dummy_activity


def test_activity_gateway_get_latest_activity(dummy_database_injection_manager):
"""Test getting latest activity for a plan."""

plan = Plan(id=Plan.generate_id(), name="plan")
plan2 = Plan(id=Plan.generate_id(), name="plan2")

activity1_id = Activity.generate_id()
activity1 = Activity(
id=activity1_id,
ended_at_time=datetime.utcnow() - timedelta(hours=1),
association=Association(id=Association.generate_id(activity1_id), plan=plan),
)

activity2_id = Activity.generate_id()
activity2 = Activity(
id=activity2_id,
ended_at_time=datetime.utcnow(),
association=Association(id=Association.generate_id(activity2_id), plan=plan),
)

activity3_id = Activity.generate_id()
activity3 = Activity(
id=activity3_id,
ended_at_time=datetime.utcnow(),
association=Association(id=Association.generate_id(activity3_id), plan=plan2),
)

with dummy_database_injection_manager(None):
activity_gateway = ActivityGateway()

activity_gateway.add(activity1)

latest_activities = activity_gateway.get_latest_activity_per_plan()
assert len(latest_activities) == 1
assert {activity1_id} == {a.id for a in latest_activities.values()}
assert {plan.id} == {p.id for p in latest_activities.keys()}

activity_gateway.add(activity3)

latest_activities = activity_gateway.get_latest_activity_per_plan()
assert len(latest_activities) == 2
assert {activity1_id, activity3_id} == {a.id for a in latest_activities.values()}
assert {plan.id, plan2.id} == {p.id for p in latest_activities.keys()}

activity_gateway.add(activity2)

latest_activities = activity_gateway.get_latest_activity_per_plan()
assert len(latest_activities) == 2
assert {activity2_id, activity3_id} == {a.id for a in latest_activities.values()}
assert {plan.id, plan2.id} == {p.id for p in latest_activities.keys()}


def test_activity_gateway_get_latest_plan_usages(dummy_database_injection_manager):
"""Test getting latest activity for a plan."""

plan = Plan(id=Plan.generate_id(), name="plan")
plan2 = Plan(id=Plan.generate_id(), name="plan2")

activity1 = create_dummy_activity(plan=plan, usages=["in1"])
activity2 = create_dummy_activity(plan=plan, usages=["in1"])
activity3 = create_dummy_activity(plan=plan2, usages=["in2"])

with dummy_database_injection_manager(None):
activity_gateway = ActivityGateway()

activity_gateway.add(activity1)
activity_gateway.add(activity2)
activity_gateway.add(activity3)

latest_usages = activity_gateway.get_plans_and_usages_for_latest_activities()

assert len(latest_usages) == 2
assert latest_usages[plan] == activity2.usages
assert latest_usages[plan2] == activity3.usages


def test_activity_gateway_downstream_activities(dummy_database_injection_manager):
"""test getting downstream activities work."""
plan = Plan(id=Plan.generate_id(), name="plan")
Expand Down

0 comments on commit c7e2d66

Please sign in to comment.