From c61a5ab7410cf4135d773d667fdc9016c5ead6f1 Mon Sep 17 00:00:00 2001 From: Mohammad Alisafaee Date: Wed, 15 Sep 2021 09:37:27 +0200 Subject: [PATCH] feat(core): add renku rerun command (#2319) --- renku/cli/rerun.py | 78 +------ renku/core/commands/options.py | 52 ----- renku/core/commands/rerun.py | 91 ++++++-- renku/core/commands/update.py | 23 +- .../management/interface/activity_gateway.py | 12 +- .../core/metadata/gateway/activity_gateway.py | 27 ++- .../core/metadata/gateway/database_gateway.py | 72 +++---- renku/core/models/provenance/activity.py | 6 + renku/core/utils/metadata.py | 16 ++ tests/cli/fixtures/cli_runner.py | 7 +- tests/cli/test_rerun.py | 196 ++++++------------ tests/core/fixtures/core_database.py | 43 +--- 12 files changed, 244 insertions(+), 379 deletions(-) diff --git a/renku/cli/rerun.py b/renku/cli/rerun.py index 3f0ced43dc..700dfa103b 100644 --- a/renku/cli/rerun.py +++ b/renku/cli/rerun.py @@ -36,92 +36,30 @@ $ renku rerun C -If you would like to recreate a file which was one of several produced by -a tool, then these files must be recreated as well. See the explanation in -:ref:`updating siblings `. +Note that all other outputs of the executed workflow will be recreated as well. """ -import os -import sys -from pathlib import Path - import click from renku.cli.utils.callback import ClickCallback from renku.core import errors -from renku.core.commands.options import option_siblings -from renku.core.commands.rerun import rerun_workflows -from renku.core.management.command_builder import inject -from renku.core.management.interface.client_dispatcher import IClientDispatcher - - -def show_inputs(workflow): - """Show workflow inputs and exit.""" - for input_ in workflow.inputs: - click.echo("{id}: {default}".format(id=input_._id, default=input_.consumes.path)) - sys.exit(0) - - -@inject.autoparams() -def edit_inputs(workflow, client_dispatcher: IClientDispatcher): - """Edit workflow inputs.""" - client = client_dispatcher.current_client - for input_ in workflow.inputs: - new_path = click.prompt("{0._id}".format(input_), default=input_.consumes.path) - input_.consumes.path = str(Path(os.path.abspath(new_path)).relative_to(client.path)) - - try: - input_.consumes.commit = client.find_previous_commit(input_.consumes.path) - except KeyError: - raise errors.DirtyRepository(f"Please commit {input_.consumes.path} before using it as an input.") - - input_.consumes._id = input_.consumes.default_id() - input_.consumes._label = input_.consumes.default_label() - - for step in workflow.subprocesses: - for parameter in step.process.parameters: - parameter.default_value = click.prompt("{0._id}".format(parameter), default=parameter.default_value) - - return workflow +from renku.core.commands.rerun import rerun_command @click.command() -@click.option("--revision", default="HEAD") @click.option( "--from", - "roots", + "sources", type=click.Path(exists=True, dir_okay=False), multiple=True, help="Start an execution from this file.", ) -@option_siblings -@click.option( - "--default-inputs", - "inputs", - default=True, - flag_value=lambda workflow: workflow, - help="Use default inputs.", - type=click.types.UnprocessedParamType(), -) -@click.option( - "--show-inputs", - "inputs", - flag_value=show_inputs, - help=show_inputs.__doc__, - type=click.types.UnprocessedParamType(), -) -@click.option( - "--edit-inputs", - "inputs", - flag_value=edit_inputs, - help=edit_inputs.__doc__, - type=click.types.UnprocessedParamType(), -) @click.argument("paths", type=click.Path(exists=True, dir_okay=True), nargs=-1, required=True) -def rerun(revision, roots, siblings, inputs, paths): +def rerun(sources, paths): """Recreate files generated by a sequence of ``run`` commands.""" communicator = ClickCallback() - rerun_workflows().with_communicator(communicator).build().execute( - revision=revision, roots=roots, siblings=siblings, inputs=inputs, paths=paths - ) + try: + rerun_command().with_communicator(communicator).build().execute(sources=sources, paths=paths) + except errors.NothingToExecuteError: + exit(1) diff --git a/renku/core/commands/options.py b/renku/core/commands/options.py index bbe291bc97..39bdcac2b7 100644 --- a/renku/core/commands/options.py +++ b/renku/core/commands/options.py @@ -19,8 +19,6 @@ import click -from renku.core.errors import RenkuException - from .git import set_git_isolation @@ -45,56 +43,6 @@ def install_completion(ctx, attr, value): # pragma: no cover ) -def check_siblings(graph, outputs): - """Check that all outputs have their siblings listed.""" - siblings = set() - for node in outputs: - siblings |= graph.siblings(node) - - siblings = {node.path for node in siblings} - missing = siblings - {node.path for node in outputs} - missing = {m for m in missing if all(not m.startswith(node.path) for node in outputs)} - - if missing: - msg = "Include the files above in the command " "or use the --with-siblings option." - raise RenkuException( - "There are missing output siblings:\n\n" - "\t{0}\n\n{1}".format("\n\t".join(click.style(path, fg="red") for path in missing), msg) - ) - return outputs - - -def with_siblings(graph, outputs): - """Include all missing siblings.""" - siblings = set() - for node in outputs: - siblings |= graph.siblings(node) - return siblings - - -option_check_siblings = click.option( - "--check-siblings", - "siblings", - flag_value=check_siblings, - default=True, - help=check_siblings.__doc__, - type=click.types.UnprocessedParamType(), -) -option_with_siblings = click.option( - "--with-siblings", - "siblings", - flag_value=with_siblings, - default=True, - help=with_siblings.__doc__, - type=click.types.UnprocessedParamType(), -) - - -def option_siblings(func): - """Combine siblings options.""" - return option_check_siblings(option_with_siblings(func)) - - option_external_storage_requested = click.option( "external_storage_requested", "--external-storage/--no-external-storage", diff --git a/renku/core/commands/rerun.py b/renku/core/commands/rerun.py index 4da0ed9626..0147bd31ff 100644 --- a/renku/core/commands/rerun.py +++ b/renku/core/commands/rerun.py @@ -15,17 +15,27 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Renku rerun command.""" +"""Renku ``rerun`` command.""" +from collections import defaultdict +from typing import List, Set + +from renku.core import errors +from renku.core.commands.update import execute_workflow from renku.core.management.command_builder.command import Command, inject +from renku.core.management.interface.activity_gateway import IActivityGateway from renku.core.management.interface.client_dispatcher import IClientDispatcher +from renku.core.models.provenance.activity import Activity +from renku.core.utils import communication +from renku.core.utils.metadata import add_activity_if_recent +from renku.core.utils.os import get_relative_paths -def rerun_workflows(): +def rerun_command(): """Recreate files generated by a sequence of ``run`` commands.""" return ( Command() - .command(_rerun_workflows) + .command(_rerun) .require_migration() .require_clean() .require_nodejs() @@ -35,23 +45,58 @@ def rerun_workflows(): @inject.autoparams() -def _rerun_workflows(revision, roots, siblings, inputs, paths, client_dispatcher: IClientDispatcher): - pass - # TODO: implement with new database - # graph = Graph(client) - # outputs = graph.build(paths=paths, revision=revision) - - # # Check or extend siblings of outputs. - # outputs = siblings(graph, outputs) - # output_paths = {node.path for node in outputs} - - # # Normalize and check all starting paths. - # roots = {graph.normalize_path(root) for root in roots} - # output_paths -= roots - # outputs = [o for o in outputs if o.path not in roots] - - # # Generate workflow and check inputs. - # # NOTE The workflow creation is done before opening a new file. - # workflow = inputs(graph.as_workflow(input_paths=roots, output_paths=output_paths, outputs=outputs)) - - # execute_workflow(workflow=workflow, output_paths=output_paths, command_name="rerun", update_commits=False) +def _rerun( + sources: List[str], paths: List[str], client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway +): + client = client_dispatcher.current_client + + sources = sources or [] + sources = get_relative_paths(base=client.path, paths=sources) + paths = paths or [] + paths = get_relative_paths(base=client.path, paths=paths) + + activities = _get_activities(paths, sources, activity_gateway) + + if len(activities) == 0: + raise errors.NothingToExecuteError() + + plans = [a.plan_with_values for a in activities] + + execute_workflow(plans=plans, command_name="rerun") + + +def _get_activities(paths: List[str], sources: List[str], activity_gateway: IActivityGateway) -> Set[Activity]: + all_activities = defaultdict(set) + + def include_newest_activity(activity): + existing_activities = all_activities[activity.association.plan.id] + add_activity_if_recent(activity=activity, activities=existing_activities) + + for path in paths: + activities = activity_gateway.get_activities_by_generation(path) + + if len(activities) == 0: + communication.warn(f"Path '{path}' is not generated by any workflows.") + continue + + latest_activity = max(activities, key=lambda a: a.ended_at_time) + + upstream_chains = activity_gateway.get_upstream_activity_chains(latest_activity) + + if sources: + # NOTE: Add the activity to check if it also matches the condition + upstream_chains.append((latest_activity,)) + # NOTE: Only include paths that is using at least one of the sources + upstream_chains = [c for c in upstream_chains if any(u.entity.path in sources for u in c[0].usages)] + + # NOTE: Include activity only if any of its upstream match the condition + if upstream_chains: + include_newest_activity(latest_activity) + else: + include_newest_activity(latest_activity) + + for chain in upstream_chains: + for activity in chain: + include_newest_activity(activity) + + return {a for activities in all_activities.values() for a in activities} diff --git a/renku/core/commands/update.py b/renku/core/commands/update.py index 82eb23241b..ab6d732153 100644 --- a/renku/core/commands/update.py +++ b/renku/core/commands/update.py @@ -37,7 +37,7 @@ from renku.core.models.workflow.plan import Plan from renku.core.utils.datetime8601 import local_now from renku.core.utils.git import add_to_git -from renku.core.utils.metadata import get_modified_activities +from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities from renku.core.utils.os import get_relative_paths from renku.version import __version__, version_url @@ -92,28 +92,9 @@ def _get_downstream_activities( """Return an ordered list of activities so that an activities comes before all its downstream activities.""" all_activities = defaultdict(set) - def have_identical_inputs_and_outputs(activity1, activity2): - return sorted(u.entity.path for u in activity1.usages) == sorted( - u.entity.path for u in activity2.usages - ) and sorted(g.entity.path for g in activity1.generations) == sorted( - g.entity.path for g in activity2.generations - ) - def include_newest_activity(activity): existing_activities = all_activities[activity.association.plan.id] - - if activity in existing_activities: - return - - for existing_activity in existing_activities: - if have_identical_inputs_and_outputs(activity, existing_activity): - if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer - existing_activities.remove(existing_activity) - existing_activities.add(activity) - return - - # No similar activity was found - existing_activities.add(activity) + add_activity_if_recent(activity=activity, activities=existing_activities) def does_activity_generate_any_paths(activity): is_same = any(g.entity.path in paths for g in activity.generations) diff --git a/renku/core/management/interface/activity_gateway.py b/renku/core/management/interface/activity_gateway.py index 4c26fa367e..902dcbaad6 100644 --- a/renku/core/management/interface/activity_gateway.py +++ b/renku/core/management/interface/activity_gateway.py @@ -18,7 +18,8 @@ """Renku activity gateway interface.""" from abc import ABC -from typing import Dict, List, Set, Tuple +from pathlib import Path +from typing import Dict, List, Set, Tuple, Union from renku.core.models.provenance.activity import Activity, ActivityCollection, Usage from renku.core.models.workflow.plan import AbstractPlan @@ -43,12 +44,21 @@ def get_all_generation_paths(self) -> List[str]: """Return all generation paths.""" raise NotImplementedError + def get_activities_by_generation(self, path: Union[Path, str]) -> List[Activity]: + """Return the list of all activities that generate a path.""" + raise NotImplementedError + def get_downstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]: """Get downstream activities that depend on this activity.""" raise NotImplementedError def get_downstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]: """Get a list of tuples of all downstream paths of this activity.""" + raise NotImplementedError + + def get_upstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]: + """Get a list of tuples of all upstream paths of this activity.""" + raise NotImplementedError def get_all_activities(self) -> List[Activity]: """Get all activities in the project.""" diff --git a/renku/core/metadata/gateway/activity_gateway.py b/renku/core/metadata/gateway/activity_gateway.py index 67cd64edab..9e997365d4 100644 --- a/renku/core/metadata/gateway/activity_gateway.py +++ b/renku/core/metadata/gateway/activity_gateway.py @@ -19,7 +19,7 @@ import os from pathlib import Path -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Set, Tuple, Union from persistent.list import PersistentList @@ -62,6 +62,12 @@ def get_all_generation_paths(self) -> List[str]: return list(database["activities-by-generation"].keys()) + def get_activities_by_generation(self, path: Union[Path, str]) -> List[Activity]: + """Return the list of all activities that generate a path.""" + by_generation = self.database_dispatcher.current_database["activities-by-generation"] + + return by_generation.get(str(path), []) + def get_downstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]: """Get downstream activities that depend on this activity.""" # NOTE: since indices are populated one way when adding an activity, we need to query two indices @@ -80,10 +86,21 @@ def get_downstream_activity_chains(self, activity: Activity) -> List[Tuple[Activ activity_catalog = database["activity-catalog"] tok = activity_catalog.tokenizeQuery downstream_chains = activity_catalog.findRelationChains(tok(upstream=activity)) - downstream_chains = [tuple(adr.downstream[0] for adr in d) for d in downstream_chains] + downstream_chains = [tuple(r.downstream for r in c) for c in downstream_chains] return downstream_chains + def get_upstream_activity_chains(self, activity: Activity) -> List[Tuple[Activity, ...]]: + """Get a list of tuples of all upstream paths of this activity.""" + database = self.database_dispatcher.current_database + + activity_catalog = database["activity-catalog"] + tok = activity_catalog.tokenizeQuery + upstream_chains = activity_catalog.findRelationChains(tok(downstream=activity)) + upstream_chains = [tuple(r.upstream for r in c) for c in upstream_chains] + + return upstream_chains + def get_all_activities(self) -> List[Activity]: """Get all activities in the project.""" return list(self.database_dispatcher.current_database["activities"].values()) @@ -138,10 +155,12 @@ def update_latest_activity_by_plan(plan): downstreams.extend(by_usage[generation.entity.path]) if upstreams: - database["activity-catalog"].index(ActivityDownstreamRelation(downstream=[activity], upstream=upstreams)) + for s in upstreams: + database["activity-catalog"].index(ActivityDownstreamRelation(downstream=activity, upstream=s)) if downstreams: - database["activity-catalog"].index(ActivityDownstreamRelation(downstream=downstreams, upstream=[activity])) + for s in downstreams: + database["activity-catalog"].index(ActivityDownstreamRelation(downstream=s, upstream=activity)) assert isinstance(activity.association.plan, Plan) diff --git a/renku/core/metadata/gateway/database_gateway.py b/renku/core/metadata/gateway/database_gateway.py index 00b68957b0..f21dce46c8 100644 --- a/renku/core/metadata/gateway/database_gateway.py +++ b/renku/core/metadata/gateway/database_gateway.py @@ -88,6 +88,38 @@ def load_downstream_relations(token, catalog, cache, database_dispatcher: IDatab return btree[token] +def initialize_database(database): + """Initialize an empty database with all required metadata.""" + database.add_index(name="activities", object_type=Activity, attribute="id") + database.add_index(name="latest-activity-by-plan", object_type=Activity, attribute="association.plan.id") + database.add_root_object(name="activities-by-usage", obj=BTrees.OOBTree.OOBTree()) + database.add_root_object(name="activities-by-generation", obj=BTrees.OOBTree.OOBTree()) + + database.add_index(name="activity-collections", object_type=ActivityCollection, attribute="id") + + database.add_root_object(name="_downstream_relations", obj=BTrees.OOBTree.OOBTree()) + + activity_catalog = Catalog(dump_downstream_relations, load_downstream_relations, btree=BTrees.family32.OO) + activity_catalog.addValueIndex( + IActivityDownstreamRelation["downstream"], dump_activity, load_activity, btree=BTrees.family32.OO + ) + activity_catalog.addValueIndex( + IActivityDownstreamRelation["upstream"], dump_activity, load_activity, btree=BTrees.family32.OO + ) + # NOTE: Transitive query factory is needed for transitive (follow more than 1 edge) queries + downstream_transitive_factory = TransposingTransitive("downstream", "upstream") + activity_catalog.addDefaultQueryFactory(downstream_transitive_factory) + + database.add_root_object(name="activity-catalog", obj=activity_catalog) + + database.add_index(name="plans", object_type=AbstractPlan, attribute="id") + database.add_index(name="plans-by-name", object_type=AbstractPlan, attribute="name") + + database.add_index(name="datasets", object_type=Dataset, attribute="name") + database.add_index(name="datasets-provenance-tails", object_type=Dataset, attribute="id") + database.add_index(name="datasets-tags", object_type=PersistentList) + + class DatabaseGateway(IDatabaseGateway): """Gateway for base database operations.""" @@ -97,46 +129,8 @@ def initialize(self) -> None: """Initialize the database.""" database = self.database_dispatcher.current_database - # NOTE: Transitive query factory is needed for transitive (follow more than 1 edge) queries - downstream_transitive_factory = TransposingTransitive("downstream", "upstream") - database.clear() - - database.add_index(name="activities", object_type=Activity, attribute="id") - database.add_index(name="latest-activity-by-plan", object_type=Activity, attribute="association.plan.id") - database.add_root_object(name="activities-by-usage", obj=BTrees.OOBTree.OOBTree()) - database.add_root_object(name="activities-by-generation", obj=BTrees.OOBTree.OOBTree()) - - database.add_index(name="activity-collections", object_type=ActivityCollection, attribute="id") - - database.add_root_object(name="_downstream_relations", obj=BTrees.OOBTree.OOBTree()) - - activity_catalog = Catalog(dump_downstream_relations, load_downstream_relations, btree=BTrees.family32.OO) - activity_catalog.addValueIndex( - IActivityDownstreamRelation["downstream"], - dump_activity, - load_activity, - btree=BTrees.family32.OO, - multiple=True, - ) - activity_catalog.addValueIndex( - IActivityDownstreamRelation["upstream"], - dump_activity, - load_activity, - btree=BTrees.family32.OO, - multiple=True, - ) - activity_catalog.addDefaultQueryFactory(downstream_transitive_factory) - - database.add_root_object(name="activity-catalog", obj=activity_catalog) - - database.add_index(name="plans", object_type=AbstractPlan, attribute="id") - database.add_index(name="plans-by-name", object_type=AbstractPlan, attribute="name") - - database.add_index(name="datasets", object_type=Dataset, attribute="name") - database.add_index(name="datasets-provenance-tails", object_type=Dataset, attribute="id") - database.add_index(name="datasets-tags", object_type=PersistentList) - + initialize_database(database) database.commit() def commit(self) -> None: diff --git a/renku/core/models/provenance/activity.py b/renku/core/models/provenance/activity.py index 4e3de755b3..cd39b5e4c1 100644 --- a/renku/core/models/provenance/activity.py +++ b/renku/core/models/provenance/activity.py @@ -228,6 +228,12 @@ def generate_id() -> str: # TODO: make id generation idempotent return f"/activities/{uuid4().hex}" + def has_identical_inputs_and_outputs_as(self, other: "Activity"): + """Return true if all input and outputs paths are identical regardless of the order.""" + return sorted(u.entity.path for u in self.usages) == sorted(u.entity.path for u in other.usages) and sorted( + g.entity.path for g in self.generations + ) == sorted(g.entity.path for g in other.generations) + class ActivityCollection(Persistent): """Represent a list of activities.""" diff --git a/renku/core/utils/metadata.py b/renku/core/utils/metadata.py index f5b7003f43..2b4d5f3c83 100644 --- a/renku/core/utils/metadata.py +++ b/renku/core/utils/metadata.py @@ -94,3 +94,19 @@ def get_modified_activities( modified.add((activity, entity)) return modified, deleted + + +def add_activity_if_recent(activity: Activity, activities: Set[Activity]): + """Add ``activity`` to ``activities`` if it's not in the set or is the latest executed instance.""" + if activity in activities: + return + + for existing_activity in activities: + if activity.has_identical_inputs_and_outputs_as(existing_activity): + if activity.ended_at_time > existing_activity.ended_at_time: # activity is newer + activities.remove(existing_activity) + activities.add(activity) + return + + # NOTE: No similar activity was found + activities.add(activity) diff --git a/tests/cli/fixtures/cli_runner.py b/tests/cli/fixtures/cli_runner.py index 196e24f41c..12bd83caea 100644 --- a/tests/cli/fixtures/cli_runner.py +++ b/tests/cli/fixtures/cli_runner.py @@ -17,10 +17,13 @@ # limitations under the License. """Renku CLI fixtures for execution management.""" +from collections import namedtuple from typing import List, Tuple, Union import pytest +Result = namedtuple("Result", "exit_code, activities") + @pytest.fixture def renku_cli(client, run, client_database_injection_manager): @@ -40,6 +43,8 @@ def _get_activities(activity_gateway: IActivityGateway): with client_database_injection_manager(client): activities_before = _get_activities() + args = [str(a) for a in args] + exit_code = run(args, **kwargs) with client_database_injection_manager(client): @@ -52,6 +57,6 @@ def _get_activities(activity_gateway: IActivityGateway): elif len(new_activities) == 1: new_activities = new_activities[0] - return exit_code, new_activities + return Result(exit_code, new_activities) return renku_cli_ diff --git a/tests/cli/test_rerun.py b/tests/cli/test_rerun.py index d05decf90b..04e592bc91 100644 --- a/tests/cli/test_rerun.py +++ b/tests/cli/test_rerun.py @@ -17,8 +17,6 @@ # limitations under the License. """Test ``rerun`` command.""" -from __future__ import absolute_import, print_function - import os import subprocess from pathlib import Path @@ -28,15 +26,36 @@ from click.testing import CliRunner from renku.cli import cli -from tests.utils import format_result_exception +from tests.utils import format_result_exception, write_and_commit_file + + +def test_rerun(project, renku_cli): + """Test rerun.""" + output = Path(project) / "output.txt" + + cmd = ["run", "python", "-S", "-c", "import random; print(random.random())"] + + assert 0 == renku_cli(*cmd, stdout=output).exit_code + + content = output.read_text().strip() + + def rerun(): + assert 0 == renku_cli("rerun", output).exit_code + return output.read_text().strip() + + for _ in range(10): + new_content = rerun() + if content != new_content: + break + + assert content != new_content, "Something is not random" -@pytest.mark.skip(reason="renku rerun not implemented with new metadata yet, reenable later") @pytest.mark.parametrize( - "source,selected", + "source, output", [ - ("coffee-orders-โ˜•-by-locationtest.csv", "๐Ÿ˜works.txt"), - ("source.txt", "selected.txt"), + ("input with space.txt", "output .txt"), + ("coffee-orders-โ˜•-by-location.csv", "๐Ÿ˜works.txt"), ("test-ๆ„›", "ๆˆๅŠŸ"), ("๊ทธ๋ž˜ํ”„", "์„ฑ๊ณต"), ("ูŠุญุงูˆู„", "ู†ุฌุงุญ.txt"), @@ -44,134 +63,63 @@ ("๐’ƒ.c", "๐’.txt"), ], ) -def test_simple_rerun(runner, project, run, no_lfs_warning, source, selected): - """Test simple file recreation.""" - greetings = {"hello", "hola", "ahoj"} - +def test_rerun_with_special_paths(project, renku_cli, source, output): + """Test rerun with unicode/whitespace filenames.""" cwd = Path(project) source = cwd / source - selected = cwd / selected + output = cwd / output - repo = git.Repo(project) + assert 0 == renku_cli("run", "python", "-S", "-c", "import random; print(random.random())", stdout=source).exit_code + assert 0 == renku_cli("run", "cat", source, stdout=output).exit_code - with source.open("w") as f: - f.write("\n".join(greetings)) + content = output.read_text().strip() - repo.git.add("--all") - repo.index.commit("Created greetings") - - cmd = ["run", "python", "-S", "-c", "import sys, random; print(random.choice(sys.stdin.readlines()))"] - - assert 0 == run(cmd, stdin=source, stdout=selected) - - with selected.open("r") as f: - greeting = f.read().strip() - assert greeting in greetings - - def _rerun(): - """Return greeting after reruning.""" - result = run(args=("rerun", str(selected))) - assert 0 == result - with selected.open("r") as fp: - greeting = fp.read().strip() - assert greeting in greetings - return greeting - - for _ in range(100): - new_greeting = _rerun() - if greeting != new_greeting: - break - - assert greeting != new_greeting, "Something is not random" + def rerun(): + assert 0 == renku_cli("rerun", output).exit_code + return output.read_text().strip() - for _ in range(100): - new_greeting = _rerun() - if greeting == new_greeting: + for _ in range(10): + new_content = rerun() + if content != new_content: break - assert greeting == new_greeting, "Something is not random" + assert content != output.read_text().strip(), "The output should have changed." -@pytest.mark.skip(reason="renku rerun not implemented with new metadata yet, reenable later") -def test_rerun_with_inputs(runner, project, run): - """Test file recreation with specified inputs.""" - cwd = Path(project) - first = cwd / "first.txt" - second = cwd / "second.txt" - inputs = (first, second) - - output = cwd / "output.txt" - - cmd = ["run", "python", "-S", "-c", "import random; print(random.random())"] - - for file_ in inputs: - assert 0 == run(args=cmd, stdout=file_), "Random number generation." - - cmd = ["run", "cat"] + [str(path) for path in inputs] - assert 0 == run(args=cmd, stdout=output) - - with output.open("r") as f: - initial_data = f.read() - - assert 0 == run(args=("rerun", str(output))) - - with output.open("r") as f: - assert f.read() != initial_data, "The output should have changed." - - -@pytest.mark.skip(reason="renku rerun not implemented with new metadata yet, reenable later") -def test_rerun_with_inputs_with_spaces(runner, project, run): +@pytest.mark.parametrize("source, content", [("input1", "input1 new-input2 old"), ("input2", "input1 old-input2 new")]) +def test_rerun_with_from(project, renku_cli, source, content): """Test file recreation with specified inputs.""" + repo = git.Repo(project) cwd = Path(project) - input_ = cwd / "foo bar.txt" - - output = cwd / "output.txt" - - cmd = ["run", "python", "-S", "-c", "import random; print(random.random())"] + input1 = cwd / "input1" + input2 = cwd / "input2" + intermediate1 = cwd / "intermediate1" + intermediate2 = cwd / "intermediate2" + output = cwd / "output" - assert 0 == run(args=cmd, stdout=input_), "Random number generation." + write_and_commit_file(repo, input1, "input1 old-") + write_and_commit_file(repo, input2, "input2 old") - cmd = ["run", "cat"] + [str(input_)] - assert 0 == run(args=cmd, stdout=output) + assert 0 == renku_cli("run", "cp", input1, intermediate1).exit_code + assert 0 == renku_cli("run", "cp", input2, intermediate2).exit_code - with output.open("r") as f: - initial_data = f.read() + assert 0 == renku_cli("run", "cat", intermediate1, intermediate2, stdout=output).exit_code - assert 0 == run(args=("rerun", str(output))) + # Update both inputs + write_and_commit_file(repo, input1, "input1 new-") + write_and_commit_file(repo, input2, "input2 new") - with output.open("r") as f: - assert f.read() != initial_data, "The output should have changed." + commit_sha_before = repo.head.object.hexsha + assert 0 == renku_cli("rerun", "--from", source, output).exit_code -@pytest.mark.skip(reason="renku rerun not implemented with new metadata yet, reenable later") -def test_rerun_with_inputs_with_from(runner, project, run): - """Test file recreation with specified inputs.""" - cwd = Path(project) - first = cwd / "first.txt" - second = cwd / "second.txt" - inputs = (first, second) - - output = cwd / "output.txt" - - cmd = ["run", "python", "-S", "-c", "import random; print(random.random())"] - - for file_ in inputs: - assert 0 == run(args=cmd, stdout=file_), "Random number generation." + assert content == output.read_text() - cmd = ["run", "cat"] + [str(path) for path in inputs] - assert 0 == run(args=cmd, stdout=output) + commit_sha_after = repo.head.object.hexsha + assert commit_sha_before != commit_sha_after - # Keep the first file unchanged. - with first.open("r") as f: - first_data = f.read() - assert 0 == run(args=("rerun", "--from", str(first), str(output))) - - with output.open("r") as f: - assert f.read().startswith(first_data) - - -@pytest.mark.skip(reason="renku rerun not implemented with new metadata yet, reenable later") +@pytest.mark.skip(reason="renku rerun not implemented with --edit-inputs yet, reenable later") def test_rerun_with_edited_inputs(project, run, no_lfs_warning): """Test input modification.""" runner = CliRunner(mix_stderr=False) @@ -222,23 +170,18 @@ def test_rerun_with_edited_inputs(project, run, no_lfs_warning): assert third_fp.read() == second_fp.read() -@pytest.mark.skip(reason="renku rerun not implemented with new metadata yet, reenable later") -@pytest.mark.parametrize("cmd, exit_code", (("update", 0), ("rerun", 1))) -def test_input_update_and_rerun(cmd, exit_code, runner, project, run): - """Test update and rerun of an input.""" +def test_rerun_with_no_execution(project, runner): + """Test rerun when no workflow is executed.""" repo = git.Repo(project) - cwd = Path(project) - input_ = cwd / "input.txt" - with input_.open("w") as f: - f.write("first") + input = os.path.join(project, "data", "input.txt") + write_and_commit_file(repo, input, "content") - repo.git.add("--all") - repo.index.commit("Created input.txt") + result = runner.invoke(cli, ["rerun", input], catch_exceptions=False) - assert exit_code == run(args=(cmd, input_.name)) + assert 1 == result.exit_code + assert "Path 'data/input.txt' is not generated by any workflows." in result.output -@pytest.mark.skip(reason="renku rerun not implemented with new metadata yet, reenable later") def test_output_directory(runner, project, run, no_lfs_size_limit): """Test detection of output directory.""" cwd = Path(project) @@ -279,11 +222,10 @@ def test_output_directory(runner, project, run, no_lfs_size_limit): assert 0 == run(args=("rerun", str(source_wc))) assert {data.name} == {path.name for path in destination.iterdir()} - cmd = ["export", "graph"] + cmd = ["graph", "export"] result = runner.invoke(cli, cmd, catch_exceptions=False) destination_data = str(Path("destination") / "data.txt") assert destination_data in result.output, cmd - assert " directory)" in result.output cmd = ["run", "cp", "-r", str(source), str(invalid_destination)] result = runner.invoke(cli, cmd, catch_exceptions=False) diff --git a/tests/core/fixtures/core_database.py b/tests/core/fixtures/core_database.py index 983f68bc53..ed114bb637 100644 --- a/tests/core/fixtures/core_database.py +++ b/tests/core/fixtures/core_database.py @@ -22,12 +22,11 @@ from typing import Tuple import pytest -from zc.relation.queryfactory import TransposingTransitive from renku.core import errors from renku.core.management.interface.database_dispatcher import IDatabaseDispatcher from renku.core.metadata.database import Database -from renku.core.models.provenance.activity import ActivityCollection +from renku.core.metadata.gateway.database_gateway import initialize_database class DummyStorage: @@ -92,48 +91,10 @@ def finalize_dispatcher(self) -> None: @pytest.fixture def database() -> Tuple[Database, DummyStorage]: """A Database with in-memory storage.""" - import BTrees - from zc.relation.catalog import Catalog - - from renku.core.metadata.gateway.database_gateway import ( - IActivityDownstreamRelation, - dump_activity, - dump_downstream_relations, - load_activity, - load_downstream_relations, - ) - from renku.core.models.dataset import Dataset - from renku.core.models.provenance.activity import Activity - from renku.core.models.workflow.plan import AbstractPlan - storage = DummyStorage() database = Database(storage=storage) - database.add_index(name="activities", object_type=Activity, attribute="id") - database.add_index(name="latest-activity-by-plan", object_type=Activity, attribute="association.plan.id") - database.add_root_object(name="activities-by-usage", obj=BTrees.OOBTree.OOBTree()) - database.add_root_object(name="activities-by-generation", obj=BTrees.OOBTree.OOBTree()) - - database.add_index(name="activity-collections", object_type=ActivityCollection, attribute="id") - - database.add_root_object(name="_downstream_relations", obj=BTrees.OOBTree.OOBTree()) - - activity_catalog = Catalog(dump_downstream_relations, load_downstream_relations, btree=BTrees.family32.OO) - activity_catalog.addValueIndex( - IActivityDownstreamRelation["downstream"], dump_activity, load_activity, btree=BTrees.family32.OO, multiple=True - ) - activity_catalog.addValueIndex( - IActivityDownstreamRelation["upstream"], dump_activity, load_activity, btree=BTrees.family32.OO, multiple=True - ) - downstream_transitive_factory = TransposingTransitive("downstream", "upstream") - activity_catalog.addDefaultQueryFactory(downstream_transitive_factory) - database.add_root_object(name="activity-catalog", obj=activity_catalog) - - database.add_index(name="plans", object_type=AbstractPlan, attribute="id") - database.add_index(name="plans-by-name", object_type=AbstractPlan, attribute="name") - - database.add_index(name="datasets", object_type=Dataset, attribute="name") - database.add_index(name="datasets-provenance-tails", object_type=Dataset, attribute="id") + initialize_database(database) yield database, storage