From ec3bc7b19cd9e16659de73761f9ad7e437bc3095 Mon Sep 17 00:00:00 2001 From: Mohammad Alisafaee Date: Fri, 27 Aug 2021 19:22:39 +0200 Subject: [PATCH] feat(core): make update work with new storage --- renku/cli/update.py | 41 +-- renku/core/commands/status.py | 2 +- renku/core/commands/update.py | 237 ++++++++---- .../management/interface/activity_gateway.py | 2 +- .../workflow/concrete_execution_graph.py | 13 +- .../management/workflow/converters/cwl.py | 40 ++- renku/core/metadata/database.py | 16 +- .../core/metadata/gateway/activity_gateway.py | 39 +- renku/core/models/provenance/activity.py | 71 +++- renku/core/models/provenance/parameter.py | 5 + renku/core/models/workflow/composite_plan.py | 90 +++-- renku/core/models/workflow/plan.py | 4 +- renku/core/utils/git.py | 20 +- tests/cli/fixtures/cli_runner.py | 2 +- tests/cli/test_indirect.py | 18 +- tests/cli/test_output_option.py | 42 ++- tests/cli/test_update.py | 339 ++++++------------ 17 files changed, 550 insertions(+), 431 deletions(-) diff --git a/renku/cli/update.py b/renku/cli/update.py index 3cd56a985c..248cb14158 100644 --- a/renku/cli/update.py +++ b/renku/cli/update.py @@ -20,12 +20,14 @@ Recreating outdated files ~~~~~~~~~~~~~~~~~~~~~~~~~ -The information about dependencies for each file in the repository is generated -from information stored in the underlying Git repository. +The information about dependencies for each file in a Renku project is stored +in various metadata. -A minimal dependency graph is generated for each outdated file stored in the -repository. It means that only the necessary steps will be executed and the -workflow used to orchestrate these steps is stored in the repository. +When an update command is executed, Renku looks into the most recent execution +of each workflow and checks which one is outdated (i.e. at least one of its +inputs is modified). It generates a minimal dependency graph for each outdated +file stored in the repository. It means that only the necessary steps will be +executed. Assume that the following history for the file ``H`` exists. @@ -53,7 +55,7 @@ .. code-block:: console - $ renku update E + $ renku update E H * Update all files by simply running @@ -96,23 +98,13 @@ \ (D) -An attempt to update a single file would fail with the following error. - -.. code-block:: console - - $ renku update C - Error: There are missing output siblings: - - B - D - - Include the files above in the command or use --with-siblings option. +An attempt to update a single file would updates its siblings as well. The following commands will produce the same result. .. code-block:: console - $ renku update --with-siblings C + $ renku update C $ renku update B C D """ @@ -120,20 +112,13 @@ import click from renku.cli.utils.callback import ClickCallback -from renku.core.commands.options import option_siblings -from renku.core.commands.update import update_workflows +from renku.core.commands.update import update_command @click.command() -@click.option("--revision", default="HEAD") -@click.option("--no-output", is_flag=True, default=False, help="Display commands without output files.") @click.option("--all", "-a", "update_all", is_flag=True, default=False, help="Update all outdated files.") -@option_siblings @click.argument("paths", type=click.Path(exists=True, dir_okay=True), nargs=-1) -def update(revision, no_output, update_all, siblings, paths): +def update(update_all, paths): """Update existing files by rerunning their outdated workflow.""" communicator = ClickCallback() - - update_workflows().with_communicator(communicator).build().execute( - revision=revision, no_output=no_output, update_all=update_all, siblings=siblings, paths=paths - ) + update_command().with_communicator(communicator).build().execute(update_all=update_all, paths=paths) diff --git a/renku/core/commands/status.py b/renku/core/commands/status.py index ff01ee5fbd..e5888bc8da 100644 --- a/renku/core/commands/status.py +++ b/renku/core/commands/status.py @@ -15,7 +15,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Renku show command.""" +"""Renku ``status`` command.""" from collections import defaultdict from typing import List, Set, Tuple diff --git a/renku/core/commands/update.py b/renku/core/commands/update.py index f6dc3ad508..4ed57ff9fe 100644 --- a/renku/core/commands/update.py +++ b/renku/core/commands/update.py @@ -15,29 +15,34 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Renku update command.""" +"""Renku ``update`` command.""" import uuid +from typing import Generator, List, Union +import networkx from git import Actor -from renku.core.commands.cwl_runner import execute from renku.core.errors import ParameterError from renku.core.management.command_builder import inject from renku.core.management.command_builder.command import Command from renku.core.management.interface.activity_gateway import IActivityGateway from renku.core.management.interface.client_dispatcher import IClientDispatcher -from renku.core.management.workflow.converters.cwl import CWLConverter from renku.core.management.workflow.plan_factory import delete_indirect_files_list -from renku.core.utils.git import add_to_git +from renku.core.models.provenance.activity import Activity +from renku.core.models.workflow.composite_plan import CompositePlan, PlanCollection +from renku.core.models.workflow.plan import Plan +from renku.core.utils.datetime8601 import local_now +from renku.core.utils.git import add_to_git, get_modified_entities +from renku.core.utils.os import get_relative_paths from renku.version import __version__, version_url -def update_workflows(): +def update_command(): """Update existing files by rerunning their outdated workflow.""" return ( Command() - .command(_update_workflows) + .command(_update) .require_migration() .require_clean() .require_nodejs() @@ -46,96 +51,192 @@ def update_workflows(): ) -def _update_workflows(revision, no_output, update_all, siblings, paths): +@inject.autoparams() +def _update(update_all, client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway, paths=None): if not paths and not update_all: raise ParameterError("Either PATHS or --all/-a should be specified.") if paths and update_all: raise ParameterError("Cannot use PATHS and --all/-a at the same time.") - # TODO: Implement this properly with new database - # graph = Graph() - # outputs = graph.build(revision=revision, can_be_cwl=no_output, paths=paths) - # outputs = {node for node in outputs if graph.need_update(node)} - # if not outputs: - # communication.echo("All files were generated from the latest inputs.") - # sys.exit(0) + client = client_dispatcher.current_client - # # Check or extend siblings of outputs. - # outputs = siblings(graph, outputs) - # output_paths = {node.path for node in outputs if _safe_path(node.path)} + paths = paths or [] + paths = get_relative_paths(base=client.path, paths=paths) - # # Get all clean nodes. - # input_paths = {node.path for node in graph.nodes} - output_paths + modified_activities = _get_modified_activities(client, activity_gateway) + ordered_activities = _get_ordered_downstream_activities(modified_activities, activity_gateway, paths) - # # Store the generated workflow used for updating paths. - # workflow = graph.as_workflow(input_paths=input_paths, output_paths=output_paths, outputs=outputs) + update_plan = _create_plan_from_activity_list(ordered_activities) - # execute_workflow(workflow, output_paths, command_name="update", update_commits=True) + execute_workflow(workflow=update_plan, command_name="update") -@inject.autoparams() -def execute_workflow( - workflow, - output_paths, - command_name, - update_commits, - client_dispatcher: IClientDispatcher, - activity_gateway: IActivityGateway, +def _get_modified_activities(client, activity_gateway) -> Generator[Activity, None, None]: + """Return latest activities that one of their inputs is modified.""" + latest_activities = activity_gateway.get_latest_activity_per_plan().values() + + used_entities = (u.entity for a in latest_activities for u in a.usages) + modified, _ = get_modified_entities(entities=used_entities, repo=client.repo) + return (a for a in latest_activities if any(u.entity in modified for u in a.usages)) + + +def _get_ordered_downstream_activities( + starting_activities: Generator[Activity, None, None], activity_gateway: IActivityGateway, paths: List[str] ): - """Execute a Run with/without subprocesses.""" - client = client_dispatcher.current_client + """Return an ordered list of activities so that an activities comes before all its downstream activities.""" + graph = networkx.DiGraph() - wf, path = CWLConverter.convert(workflow, client.path) - # Don't compute paths if storage is disabled. - if client.check_external_storage(): - # Make sure all inputs are pulled from a storage. - paths_ = (i.consumes.path for i in workflow.inputs) - client.pull_paths_from_storage(*paths_) + activities = set(starting_activities) + while activities: + activity = activities.pop() + child_activities = activity_gateway.get_downstream_activities(activity, max_depth=1) - delete_indirect_files_list(client.path) + if len(child_activities) > 0: + activities |= child_activities + for child in child_activities: + graph.add_edge(activity, child) + elif activity not in graph: + graph.add_node(activity) - # Execute the workflow and relocate all output files. - # FIXME get new output paths for edited tools - # output_paths = {path for _, path in workflow.iter_output_files()} - execute(path, output_paths=output_paths) + if paths: + tail_activities = {activity for activity in graph if any(g.entity.path in paths for g in activity.generations)} - paths = [o.produces.path for o in workflow.outputs] + # NOTE: Add tail nodes and their ancestors are required for an update + required_activities = tail_activities.copy() + for activity in tail_activities: + parents = networkx.ancestors(graph, activity) + required_activities.update(parents) - add_to_git(client.repo.git, *paths) + original_graph = graph.copy() + # Exclude non-required activities + for activity in original_graph: + if activity not in required_activities: + graph.remove_node(activity) - if client.repo.is_dirty(): - commit_msg = f"renku {command_name}: committing {len(paths)} newly added files" + return list(networkx.algorithms.dag.topological_sort(graph)) - committer = Actor(f"renku {__version__}", version_url) - client.repo.index.commit(commit_msg, committer=committer, skip_hooks=True) +def _create_plan_from_activity_list(activities: List[Activity]) -> Union[Plan, PlanCollection]: + """Create a CompositePlan by using Plans from an activity list.""" + plans = [a.to_plan() for a in activities] - workflow_name = f"{uuid.uuid4().hex}_{command_name}.yaml" + if len(plans) == 1: + return plans[0] - path = client.workflow_path / workflow_name + return PlanCollection(id=PlanCollection.generate_id(), plans=plans, name=f"plan-collection-{uuid.uuid4().hex}") - workflow.update_id_and_label_from_commit_path(client, client.repo.head.commit, path) - # TODO: implement properly with new database - # if not workflow.subprocesses: # Update parameters if there is only one step - # _update_run_parameters(run=workflow, working_dir=client.path) +@inject.autoparams() +def execute_workflow( + workflow: Union[CompositePlan, Plan, PlanCollection], + command_name, + client_dispatcher: IClientDispatcher, + activity_gateway: IActivityGateway, +): + """Execute a Run with/without subprocesses.""" + client = client_dispatcher.current_client + + # NOTE: Pull inputs from Git LFS or other storage backends + if client.check_external_storage(): + inputs = [i.actual_value for i in workflow.inputs] + client.pull_paths_from_storage(*inputs) - # cls = WorkflowRun if workflow.subprocesses else ProcessRun - # activity = cls.from_run(run=workflow, path=path, update_commits=update_commits) - # activity.to_yaml(path=path) - # client.add_to_activity_index(activity) + delete_indirect_files_list(client.path) - # activity_gateway.add(activity) + started_at_time = local_now() + modified_outputs = _execute_workflow_helper(workflow=workflow, client=client) -# def _update_run_parameters(run, working_dir): + ended_at_time = local_now() -# default_parameters = {p.name: p for p in run.run_parameters} + add_to_git(client.repo.git, *modified_outputs) -# indirect_parameters = read_indirect_parameters(working_dir) -# for name, value in indirect_parameters.items(): -# id_ = RunParameter.generate_id(run_id=run._id, name=name) -# parameter = RunParameter(id=id_, name=name, value=value) -# default_parameters[name] = parameter + if client.repo.is_dirty(): + postfix = "s" if len(modified_outputs) > 1 else "" + commit_msg = f"renku {command_name}: committing {len(modified_outputs)} modified file{postfix}" + committer = Actor(f"renku {__version__}", version_url) + client.repo.index.commit(commit_msg, committer=committer, skip_hooks=True) -# run.run_parameters = list(default_parameters.values()) + activity = Activity.from_plan(plan=workflow, started_at_time=started_at_time, ended_at_time=ended_at_time) + + activity_gateway.add(activity) + + +# TODO: This function is created as a patch from renku/core/commands/workflow.py::_execute_workflow and +# renku/core/management/workflow/providers/cwltool_provider.py::CWLToolProvider::workflow_execute in the ``workflow +# execute`` PR (renku-python/pull/2273). Once the PR is merged remove this function and refactor +# renku/core/commands/workflow.py::_execute_workflow to accept a Plan and use it here. +def _execute_workflow_helper(workflow: Union[Plan, PlanCollection], client): + """Executes a given workflow using cwltool.""" + import os + import shutil + import sys + import tempfile + from pathlib import Path + from urllib.parse import unquote + + import cwltool.factory + from cwltool.context import LoadingContext, RuntimeContext + + from renku.core import errors + from renku.core.commands.echo import progressbar + + basedir = client.path + + with tempfile.NamedTemporaryFile() as f: + # export Plan to cwl + from renku.core.management.workflow.converters.cwl import CWLExporter + + converter = CWLExporter() + converter.workflow_convert(workflow=workflow, basedir=basedir, output=Path(f.name), output_format=None) + + # run cwl with cwltool + argv = sys.argv + sys.argv = ["cwltool"] + + runtime_args = {"rm_tmpdir": False, "move_outputs": "leave", "preserve_entire_environment": True} + loading_args = {"relax_path_checks": True} + + # Keep all environment variables. + runtime_context = RuntimeContext(kwargs=runtime_args) + loading_context = LoadingContext(kwargs=loading_args) + + factory = cwltool.factory.Factory(loading_context=loading_context, runtime_context=runtime_context) + process = factory.make(os.path.relpath(str(f.name))) + + try: + outputs = process() + except cwltool.factory.WorkflowStatus: + raise errors.RenkuException("WorkflowExecuteError()") + + sys.argv = argv + + # Move outputs to correct location in the repository. + output_dirs = process.factory.executor.output_dirs + + def remove_prefix(location, prefix="file://"): + if location.startswith(prefix): + return unquote(location[len(prefix) :]) + return unquote(location) + + locations = {remove_prefix(output["location"]) for output in outputs.values()} + # make sure to not move an output if it's containing directory gets moved + locations = { + location for location in locations if not any(location.startswith(d) for d in locations if location != d) + } + + output_paths = [] + with progressbar(locations, label="Moving outputs") as bar: + for location in bar: + for output_dir in output_dirs: + if location.startswith(output_dir): + output_path = location[len(output_dir) :].lstrip(os.path.sep) + destination = basedir / output_path + output_paths.append(destination) + if destination.is_dir(): + shutil.rmtree(str(destination)) + destination = destination.parent + shutil.move(location, str(destination)) + continue + + return client.remove_unmodified(output_paths) diff --git a/renku/core/management/interface/activity_gateway.py b/renku/core/management/interface/activity_gateway.py index 9dc1e3c8ee..058ce750e2 100644 --- a/renku/core/management/interface/activity_gateway.py +++ b/renku/core/management/interface/activity_gateway.py @@ -35,7 +35,7 @@ def get_plans_and_usages_for_latest_activities(self) -> Dict[AbstractPlan, List[ """Get all usages associated with a plan by its latest activity.""" raise NotImplementedError - def get_downstream_activities(self, activity: Activity) -> Set[Activity]: + def get_downstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]: """Get downstream activities that depend on this activity.""" raise NotImplementedError diff --git a/renku/core/management/workflow/concrete_execution_graph.py b/renku/core/management/workflow/concrete_execution_graph.py index d3e6a744be..8418511e73 100644 --- a/renku/core/management/workflow/concrete_execution_graph.py +++ b/renku/core/management/workflow/concrete_execution_graph.py @@ -30,8 +30,8 @@ class ExecutionGraph: """Represents an execution graph for one or more workflow steps.""" - def __init__(self, workflow: Union["plan.Plan", "composite_plan.CompositePlan"], virtual_links: bool = False): - self.workflow: Union["plan.Plan", "composite_plan.CompositePlan"] = workflow + def __init__(self, workflow: Union["plan.Plan", "composite_plan.PlanCollection"], virtual_links: bool = False): + self.workflow: Union["plan.Plan", "composite_plan.PlanCollection"] = workflow self.virtual_links = [] self.calculate_concrete_execution_graph(virtual_links=virtual_links) @@ -54,7 +54,7 @@ def calculate_concrete_execution_graph(self, virtual_links: bool = False): while workflow_stack: workflow = workflow_stack.pop() - if isinstance(workflow, composite_plan.CompositePlan): + if isinstance(workflow, composite_plan.PlanCollection): workflow_stack.extend(workflow.plans) self._add_composite_plan_links_to_graph(workflow) @@ -89,7 +89,7 @@ def _create_virtual_links( self._add_leaf_parameter_link(*edge) self.virtual_links.append(edge) - def _add_composite_plan_links_to_graph(self, workflow: "composite_plan.CompositePlan") -> None: + def _add_composite_plan_links_to_graph(self, workflow: "composite_plan.PlanCollection") -> None: """Adds links for a grouped run to the graph.""" if not workflow.links: return @@ -137,10 +137,13 @@ def workflow_graph(self): for node in self.graph.nodes: if not isinstance(node, parameter.CommandInput): - if isinstance(node, (composite_plan.CompositePlan, plan.Plan)): + if isinstance(node, (composite_plan.PlanCollection, plan.Plan)): workflow_graph.add_node(node) continue + if not next(self.graph.predecessors(node), None): + continue + target = next(self.graph.successors(node), None) source = next(self.graph.predecessors(next(self.graph.predecessors(node), None)), None) workflow_graph.add_edge(source, target) diff --git a/renku/core/management/workflow/converters/cwl.py b/renku/core/management/workflow/converters/cwl.py index 1fd6c7a9cc..2767225340 100644 --- a/renku/core/management/workflow/converters/cwl.py +++ b/renku/core/management/workflow/converters/cwl.py @@ -30,7 +30,7 @@ from renku.core import errors from renku.core.management.workflow.concrete_execution_graph import ExecutionGraph from renku.core.models.entity import Collection -from renku.core.models.workflow.composite_plan import CompositePlan +from renku.core.models.workflow.composite_plan import PlanCollection from renku.core.models.workflow.converters import IWorkflowConverter from renku.core.models.workflow.parameter import DIRECTORY_MIME_TYPE, CommandInput, CommandOutput, CommandParameter from renku.core.models.workflow.plan import Plan @@ -436,7 +436,7 @@ def _convert_parameter(parameter): class CWLExporter(IWorkflowConverter): - """Converts a ``CompositePlan`` or a ``Plan`` to cwl format.""" + """Converts a ``Plan`` or a ``PlanCollection`` to cwl format.""" @hookimpl def workflow_format(self): @@ -445,7 +445,7 @@ def workflow_format(self): @hookimpl def workflow_convert( - self, workflow: Union[CompositePlan, Plan], basedir: Path, output: Optional[Path], output_format: Optional[str] + self, workflow: Union[Plan, PlanCollection], basedir: Path, output: Optional[Path], output_format: Optional[str] ): """Converts the specified workflow to cwl format.""" filename = None @@ -458,7 +458,7 @@ def workflow_convert( else: tmpdir = Path(tempfile.mkdtemp()) - if isinstance(workflow, CompositePlan): + if isinstance(workflow, PlanCollection): tool_object, path = CWLExporter._convert_composite( workflow, tmpdir, basedir, filename=filename, output_format=output_format ) @@ -475,7 +475,7 @@ def _sanitize_id(id): @staticmethod def _convert_composite( - workflow: CompositePlan, tmpdir: Path, basedir: Path, filename: Optional[Path], output_format: Optional[str] + workflow: PlanCollection, tmpdir: Path, basedir: Path, filename: Optional[Path], output_format: Optional[str] ): """Converts a composite plan to a cwl file.""" inputs = {} @@ -493,14 +493,16 @@ def _convert_composite( cycles = [map(lambda x: x.name, cycle) for cycle in cycles] raise errors.GraphCycleError(cycles) - for i, wf in enumerate(graph.workflow_graph.nodes): + import networkx as nx + + for i, wf in enumerate(nx.topological_sort(graph.workflow_graph)): _, path = CWLExporter._convert_step( workflow=wf, tmpdir=tmpdir, basedir=basedir, filename=None, output_format=output_format ) step = WorkflowStep("step_{}".format(i), str(path.resolve())) for input in wf.inputs: - input_path = input.default_value + input_path = input.actual_value sanitized_id = CWLExporter._sanitize_id(input.id) if input_path in inputs: @@ -522,7 +524,7 @@ def _convert_composite( for parameter in wf.parameters: argument_id = "argument_{}".format(argument_index) - arguments[argument_id] = parameter.default_value + arguments[argument_id] = parameter.actual_value step.inputs.append(cwlgen.WorkflowStepInput(CWLExporter._sanitize_id(parameter.id), source=argument_id)) argument_index += 1 @@ -531,7 +533,7 @@ def _convert_composite( if output.mapped_to: sanitized_id = "output_{}".format(output.mapped_to.stream_type) - outputs[output.default_value] = (sanitized_id, step.id) + outputs[output.actual_value] = (sanitized_id, step.id) step.out.append(cwlgen.WorkflowStepOutput(sanitized_id)) steps.append(step) @@ -547,7 +549,7 @@ def _convert_composite( cwlgen.InputParameter( id_, param_type=type_, - default={"location": path.resolve().as_uri(), "class": type_}, + default={"location": Path(path).resolve().as_uri(), "class": type_}, ) ) @@ -572,7 +574,7 @@ def _convert_composite( @staticmethod def _convert_step( - workflow: CompositePlan, tmpdir: Path, basedir: Path, filename: Optional[Path], output_format: Optional[str] + workflow: Plan, tmpdir: Path, basedir: Path, filename: Optional[Path], output_format: Optional[str] ): """Converts a single workflow step to a cwl file.""" stdin, stdout, stderr = None, None, None @@ -583,9 +585,9 @@ def _convert_step( if not output_.mapped_to: continue if output_.mapped_to.stream_type == "stderr": - stderr = output_.default_value + stderr = output_.actual_value if output_.mapped_to.stream_type == "stdout": - stdout = output_.default_value + stdout = output_.actual_value tool_object = CommandLineTool( tool_id=str(uuid4()), @@ -602,7 +604,7 @@ def _convert_step( dirents = [] for output_ in workflow.outputs: - path = output_.default_value + path = output_.actual_value if not os.path.isdir(path): path = str(Path(path).parent) if path != "." and path not in dirents and output_.create_folder: @@ -625,7 +627,7 @@ def _convert_step( workdir_req.listing.append( cwlgen.InitialWorkDirRequirement.Dirent( - entry="$(inputs.{})".format(tool_input.id), entryname=input_.default_value, writable=False + entry="$(inputs.{})".format(tool_input.id), entryname=input_.actual_value, writable=False ) ) @@ -680,7 +682,7 @@ def _convert_step( @staticmethod def _convert_parameter(parameter: CommandParameter): """Converts an parameter to a CWL input.""" - value, type_ = _get_argument_type(parameter.default_value) + value, type_ = _get_argument_type(parameter.actual_value) separate = None prefix = None @@ -722,7 +724,7 @@ def _convert_input(input: CommandInput, basedir: Path): sanitized_id, param_type=type_, input_binding=cwlgen.CommandLineBinding(position=input.position, prefix=prefix, separate=separate), - default={"location": (basedir / input.default_value).resolve().as_uri(), "class": type_}, + default={"location": (basedir / input.actual_value).resolve().as_uri(), "class": type_}, ) @staticmethod @@ -758,7 +760,7 @@ def _convert_output(output: CommandInput): "{}_arg".format(sanitized_id), param_type="string", input_binding=cwlgen.CommandLineBinding(position=output.position, prefix=prefix, separate=separate), - default=output.default_value, + default=output.actual_value, ) outp = cwlgen.CommandOutputParameter( sanitized_id, @@ -769,7 +771,7 @@ def _convert_output(output: CommandInput): return ( cwlgen.CommandOutputParameter( - sanitized_id, param_type=type_, output_binding=cwlgen.CommandOutputBinding(glob=output.default_value) + sanitized_id, param_type=type_, output_binding=cwlgen.CommandOutputBinding(glob=output.actual_value) ), None, ) diff --git a/renku/core/metadata/database.py b/renku/core/metadata/database.py index aad1abc821..57311aefce 100644 --- a/renku/core/metadata/database.py +++ b/renku/core/metadata/database.py @@ -494,15 +494,16 @@ def items(self): """Return an iterator of keys and values.""" return self._entries.items() - def add(self, object: persistent.Persistent, *, key: Optional[str] = None, key_object=None): + def add(self, object: persistent.Persistent, *, key: Optional[str] = None, key_object=None, verify=True): """Update index with object. If `Index._attribute` is not None then key is automatically generated. Key is extracted from `key_object` if it is not None; otherwise, it's extracted from `object`. """ assert isinstance(object, self._object_type), f"Cannot add objects of type '{type(object)}'" - - key = self._verify_and_get_key(object=object, key_object=key_object, key=key, missing_key_object_ok=False) + key = self._verify_and_get_key( + object=object, key_object=key_object, key=key, missing_key_object_ok=False, verify=verify + ) self._entries[key] = object def generate_key(self, object: persistent.Persistent, *, key_object=None): @@ -512,7 +513,9 @@ def generate_key(self, object: persistent.Persistent, *, key_object=None): """ return self._verify_and_get_key(object=object, key_object=key_object, key=None, missing_key_object_ok=False) - def _verify_and_get_key(self, *, object: persistent.Persistent, key_object, key, missing_key_object_ok): + def _verify_and_get_key( + self, *, object: persistent.Persistent, key_object, key, missing_key_object_ok, verify=True + ): if self._key_type: if not missing_key_object_ok: assert isinstance(key_object, self._key_type), f"Invalid key type: {type(key_object)} for '{self.name}'" @@ -523,7 +526,10 @@ def _verify_and_get_key(self, *, object: persistent.Persistent, key_object, key, key_object = key_object or object correct_key = get_attribute(key_object, self._attribute) if key is not None: - assert key == correct_key, f"Incorrect key for index '{self.name}': '{key}' != '{correct_key}'" + if verify: + assert key == correct_key, f"Incorrect key for index '{self.name}': '{key}' != '{correct_key}'" + else: + correct_key = key else: assert key is not None, "No key is provided" correct_key = key diff --git a/renku/core/metadata/gateway/activity_gateway.py b/renku/core/metadata/gateway/activity_gateway.py index 73bace3c60..60ddf17ed8 100644 --- a/renku/core/metadata/gateway/activity_gateway.py +++ b/renku/core/metadata/gateway/activity_gateway.py @@ -17,6 +17,7 @@ # limitations under the License. """Renku activity database gateway implementation.""" +import os from pathlib import Path from typing import Dict, List, Set @@ -29,7 +30,8 @@ from renku.core.metadata.gateway.database_gateway import ActivityDownstreamRelation from renku.core.models.entity import Collection from renku.core.models.provenance.activity import Activity, Usage -from renku.core.models.workflow.plan import AbstractPlan +from renku.core.models.workflow.composite_plan import CompositePlan +from renku.core.models.workflow.plan import AbstractPlan, Plan class ActivityGateway(IActivityGateway): @@ -49,13 +51,14 @@ def get_plans_and_usages_for_latest_activities(self) -> Dict[AbstractPlan, List[ return {a.association.plan: a.usages for a in plan_activities} - def get_downstream_activities(self, activity: Activity) -> Set[Activity]: + def get_downstream_activities(self, activity: Activity, max_depth=None) -> Set[Activity]: """Get downstream activities that depend on this activity.""" # NOTE: since indices are populated one way when adding an activity, we need to query two indices database = self.database_dispatcher.current_database - tok = database["activity-catalog"].tokenizeQuery - downstream = set(database["activity-catalog"].findValues("downstream", tok(upstream=activity))) + activity_catalog = database["activity-catalog"] + tok = activity_catalog.tokenizeQuery + downstream = set(activity_catalog.findValues("downstream", tok(upstream=activity), maxDepth=max_depth)) return downstream @@ -65,6 +68,13 @@ def get_all_activities(self) -> List[Activity]: def add(self, activity: Activity): """Add an ``Activity`` to storage.""" + + def update_latest_activity_by_plan(plan): + existing_activity = database["latest-activity-by-plan"].get(plan.id) + + if not existing_activity or existing_activity.ended_at_time < activity.ended_at_time: + database["latest-activity-by-plan"].add(activity, key=plan.id, verify=False) + database = self.database_dispatcher.current_database database["activities"].add(activity) @@ -82,9 +92,9 @@ def add(self, activity: Activity): if isinstance(usage.entity, Collection): # NOTE: Get dependants that are in a generated directory - for path, activities in database["activities-by-generation"].items(): + for path, activities in by_generation.items(): parent = Path(usage.entity.path).resolve() - child = Path(path).resolve() + child = Path(os.path.abspath(path)) if parent == child or parent in child.parents: upstreams.extend(activities) elif usage.entity.path in by_generation: @@ -97,9 +107,9 @@ def add(self, activity: Activity): if isinstance(generation.entity, Collection): # NOTE: Get dependants that are in a generated directory - for path, activities in by_generation.items(): + for path, activities in by_usage.items(): parent = Path(generation.entity.path).resolve() - child = Path(path).resolve() + child = Path(os.path.abspath(path)) if parent == child or parent in child.parents: downstreams.extend(activities) elif generation.entity.path in by_usage: @@ -111,11 +121,12 @@ def add(self, activity: Activity): if downstreams: database["activity-catalog"].index(ActivityDownstreamRelation(downstream=downstreams, upstream=[activity])) - plan_gateway = inject.instance(IPlanGateway) - - plan_gateway.add(activity.association.plan) + if isinstance(activity.association.plan, (CompositePlan, Plan)): + plan_gateway = inject.instance(IPlanGateway) - existing_activity = database["latest-activity-by-plan"].get(activity.association.plan.id) + plan_gateway.add(activity.association.plan) - if not existing_activity or existing_activity.ended_at_time < activity.ended_at_time: - database["latest-activity-by-plan"].add(activity) + update_latest_activity_by_plan(activity.association.plan) + else: # A PlanCollection + for p in activity.association.plan.plans: + update_latest_activity_by_plan(p) diff --git a/renku/core/models/provenance/activity.py b/renku/core/models/provenance/activity.py index 794ca7cdaa..c8c666629e 100644 --- a/renku/core/models/provenance/activity.py +++ b/renku/core/models/provenance/activity.py @@ -16,7 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Represent an execution of a Plan.""" - +import itertools from datetime import datetime from typing import List, Union from uuid import uuid4 @@ -39,6 +39,7 @@ VariableParameterValue, VariableParameterValueSchema, ) +from renku.core.models.workflow.composite_plan import CompositePlan, PlanCollection from renku.core.models.workflow.plan import Plan, PlanSchema NON_EXISTING_ENTITY_CHECKSUM = "0" * 40 @@ -126,53 +127,75 @@ def __init__( # TODO: influenced = attr.ib(kw_only=True) @classmethod - @inject.autoparams() + @inject.autoparams("client_dispatcher", "project_gateway") def from_plan( cls, - plan: Plan, + plan: Union[Plan, PlanCollection], client_dispatcher: IClientDispatcher, project_gateway: IProjectGateway, started_at_time: datetime, ended_at_time: datetime, - annotations: List[Annotation], + annotations: List[Annotation] = None, commit=None, update_commits=False, ): - """Convert a ``Plan`` to a ``Activity``.""" - from renku.core.models.provenance.agent import SoftwareAgent + """Convert a list of ``Plan`` to an ``Activity``.""" from renku.core.plugins.pluginmanager import get_plugin_manager + def create_parameters(): + parameters = [] + + for path_parameter in itertools.chain(plan.inputs, plan.outputs): + id = PathParameterValue.generate_id(activity_id) + parameters.append(PathParameterValue(id=id, parameter=path_parameter, path=path_parameter.actual_value)) + + for parameter in plan.parameters: + id = VariableParameterValue.generate_id(activity_id) + parameters.append(VariableParameterValue(id=id, parameter=parameter, value=parameter.actual_value)) + + return parameters + client = client_dispatcher.current_client if not commit: commit = client.repo.head.commit + activity_id = cls.generate_id() + + input_paths = [i.actual_value for i in plan.inputs] + output_paths = [o.actual_value for o in plan.outputs] + + # if isinstance(plan, PlanCollection): + # # NOTE: Don't include intermediate paths as Activity's inputs or outputs + # inputs = [p for p in input_paths if p not in output_paths] + # outputs = [p for p in output_paths if p not in input_paths] + # else: + # inputs = input_paths + # outputs = output_paths + + inputs = input_paths + outputs = output_paths + usages = [] generations = [] - parameter_values = [] - activity_id = cls.generate_id() - - for input_ in plan.inputs: - input_path = input_.default_value + for input_path in inputs: entity = Entity.from_revision(client, path=input_path, revision=commit.hexsha) - dependency = Usage(entity=entity, id=Usage.generate_id(activity_id)) - usages.append(dependency) - for output in plan.outputs: - output_path = output.default_value + for output_path in outputs: entity = Entity.from_revision(client, path=output_path, revision=commit.hexsha) - generation = Generation(entity=entity, id=Usage.generate_id(activity_id)) - generations.append(generation) agent = SoftwareAgent.from_commit(commit) person = Person.from_client(client) association = Association(agent=agent, id=Association.generate_id(activity_id), plan=plan) + # NOTE: Create parameters from all plan's inputs, outputs, and parameters + parameter_values = create_parameters() + activity = cls( id=activity_id, association=association, @@ -201,6 +224,20 @@ def generate_id() -> str: # TODO: make id generation idempotent return f"/activities/{uuid4().hex}" + def to_plan(self) -> Union[CompositePlan, Plan, PlanCollection]: + """Return activity's plan with input/output/parameter values set from activity's parameters.""" + plan = self.association.plan + # TODO: This won't work if a plan is repeated within a composite plan + plan_parameters = { + p.id: p for p in itertools.chain(plan.inputs or [], plan.outputs or [], plan.parameters or []) + } + + for parameter in self.parameters: + plan_parameter = plan_parameters[parameter.parameter.id] + plan_parameter.actual_value = parameter.value + + return plan + class AssociationSchema(JsonLDSchema): """Association schema.""" diff --git a/renku/core/models/provenance/parameter.py b/renku/core/models/provenance/parameter.py index 5cf167990f..57a49cf493 100644 --- a/renku/core/models/provenance/parameter.py +++ b/renku/core/models/provenance/parameter.py @@ -54,6 +54,11 @@ def __init__(self, *, id: str, parameter: Union[CommandInput, CommandOutput], pa self.parameter: Union[CommandInput, CommandOutput] = parameter self.path: Union[Path, str] = str(path) + @property + def value(self): + """Return parameter's value.""" + return self.path + class VariableParameterValue(ParameterValue): """Value for a parameter in provenance.""" diff --git a/renku/core/models/workflow/composite_plan.py b/renku/core/models/workflow/composite_plan.py index 158b2f1dd3..60a5a137da 100644 --- a/renku/core/models/workflow/composite_plan.py +++ b/renku/core/models/workflow/composite_plan.py @@ -19,7 +19,7 @@ from collections import defaultdict from datetime import datetime -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, Generator, List, Optional, Tuple, Union from uuid import uuid4 from marshmallow import EXCLUDE @@ -39,7 +39,59 @@ from renku.core.models.workflow.plan import MAX_GENERATED_NAME_LENGTH, AbstractPlan, Plan, PlanSchema -class CompositePlan(AbstractPlan): +class PlanCollection(AbstractPlan): + """A collection that contains a sorted list of other plans.""" + + def __init__( + self, + *, + derived_from: str = None, + description: str = None, + id: str, + invalidated_at: datetime = None, + keywords: List[str] = None, + name: str, + plans: List[Union["CompositePlan", Plan, "PlanCollection"]] = None, + project_id: str = None, + ): + super().__init__( + derived_from=derived_from, + description=description, + id=id, + invalidated_at=invalidated_at, + keywords=keywords, + name=name, + project_id=project_id, + ) + self.plans: List[Union["CompositePlan", Plan, "PlanCollection"]] = plans + self.links: List[ParameterLink] = [] + + @property + def inputs(self) -> Generator[CommandInput, None, None]: + """Return a generator of all plans' inputs.""" + return (i for p in self.plans for i in p.inputs) + + @property + def outputs(self) -> Generator[CommandOutput, None, None]: + """Return a generator of all plans' outputs.""" + return (o for p in self.plans for o in p.outputs) + + @property + def parameters(self) -> Generator[CommandParameter, None, None]: + """Return a generator of all plans' inputs.""" + return (e for p in self.plans for e in p.parameters) + + def find_parameter_workflow(self, parameter: CommandParameterBase) -> Optional[Union["CompositePlan", Plan]]: + """Return the workflow a parameter belongs to.""" + for plan in self.plans: + found = plan.find_parameter_workflow(parameter) + if found: + return found + + return None + + +class CompositePlan(PlanCollection): """A plan containing child plans.""" def __init__( @@ -56,27 +108,20 @@ def __init__( mappings: List[ParameterMapping] = None, links: List[ParameterLink] = None, ): - self.description: str = description - self.id: str = id - self.invalidated_at: datetime = invalidated_at - self.keywords: List[str] = keywords or [] - self.project_id: str = project_id - - self.name: str = name - self.derived_from: str = derived_from - - AbstractPlan.validate_name(name) + super().__init__( + derived_from=derived_from, + description=description, + id=id, + invalidated_at=invalidated_at, + keywords=keywords, + name=name, + plans=plans, + project_id=project_id, + ) - self.plans: List[Union["CompositePlan", Plan]] = plans self.mappings: List[ParameterMapping] = mappings or [] self.links: List[ParameterLink] = links or [] - @staticmethod - def generate_id(uuid: str = None) -> str: - """Generate an identifier for Plan.""" - uuid = uuid or uuid4().hex - return f"/plans/{uuid}" - def _find_existing_mapping( self, targets: List[CommandParameterBase] ) -> Dict[CommandParameterBase, List[ParameterMapping]]: @@ -223,12 +268,7 @@ def find_parameter_workflow(self, parameter: CommandParameterBase) -> Optional[U if parameter in self.mappings: return self - for plan in self.plans: - found = plan.find_parameter_workflow(parameter) - if found: - return found - - return None + return super().find_parameter_workflow(parameter) def find_link_by_target(self, target: CommandInput): """Find a link on this or a child workflow that has target as a sink.""" diff --git a/renku/core/models/workflow/plan.py b/renku/core/models/workflow/plan.py index 5f3c9479c8..283903d1da 100644 --- a/renku/core/models/workflow/plan.py +++ b/renku/core/models/workflow/plan.py @@ -55,6 +55,7 @@ def __init__( invalidated_at: datetime = None, keywords: List[str] = None, name: str = None, + project_id: str = None, derived_from: str = None, ): self.description: str = description @@ -62,6 +63,7 @@ def __init__( self.invalidated_at: datetime = invalidated_at self.keywords: List[str] = keywords or [] self.name: str = name + self.project_id: str = project_id self.derived_from: str = derived_from if not self.name: @@ -150,7 +152,6 @@ def __init__( self.inputs: List[CommandInput] = inputs or [] self.outputs: List[CommandOutput] = outputs or [] self.parameters: List[CommandParameter] = parameters or [] - self.project_id: str = project_id self.success_codes: List[int] = success_codes or [] super().__init__( id=id, @@ -158,6 +159,7 @@ def __init__( invalidated_at=invalidated_at, keywords=keywords, name=name, + project_id=project_id, derived_from=derived_from, ) diff --git a/renku/core/utils/git.py b/renku/core/utils/git.py index 94b987b19b..3c3f0b5dc6 100644 --- a/renku/core/utils/git.py +++ b/renku/core/utils/git.py @@ -114,7 +114,10 @@ def get_renku_repo_url(remote_url, deployment_hostname=None, access_token=None): def get_object_hash(repo: Repo, path: Union[Path, str], revision: str = None) -> Optional[str]: - """Return git hash of an object in a Repo or its submodule.""" + """Return git hash of an object in a Repo or its submodule. + + NOTE: path must be relative to the repo's root regardless if this function is called from a subdirectory or not. + """ def get_object_hash_from_submodules() -> Optional[str]: for submodule in repo.submodules: @@ -225,3 +228,18 @@ def get_previous_commit_from_submodules() -> Optional[Commit]: def get_path(url: str): """Return path part of a url.""" return urllib.parse.urlparse(url).path + + +def get_modified_entities(entities, repo: Repo): + """Get modified and deleted entities.""" + modified = set() + deleted = set() + + for entity in entities: + current_checksum = get_object_hash(repo=repo, path=entity.path) + if not current_checksum: + deleted.add(entity) + elif current_checksum != entity.checksum: + modified.add(entity) + + return modified, deleted diff --git a/tests/cli/fixtures/cli_runner.py b/tests/cli/fixtures/cli_runner.py index 48bbf28cbe..b270953536 100644 --- a/tests/cli/fixtures/cli_runner.py +++ b/tests/cli/fixtures/cli_runner.py @@ -47,7 +47,7 @@ def _get_activities(activity_gateway: IActivityGateway): assert len(new_activities) <= 1 if new_activities: - return exit_code, activities_after[new_activities.pop()].association.plan + return exit_code, activities_after[new_activities.pop()] return exit_code, None diff --git a/tests/cli/test_indirect.py b/tests/cli/test_indirect.py index 85f6a3999f..a3c5851f48 100644 --- a/tests/cli/test_indirect.py +++ b/tests/cli/test_indirect.py @@ -44,8 +44,9 @@ def test_indirect_inputs_outputs(renku_cli, client): client.repo.git.add("--all") client.repo.index.commit("test setup") - exit_code, plan = renku_cli("run", "sh", "-c", "sh script.sh") + exit_code, activity = renku_cli("run", "sh", "-c", "sh script.sh") + plan = activity.association.plan assert 0 == exit_code assert 2 == len(plan.inputs) assert 1 == len(plan.parameters) @@ -81,10 +82,10 @@ def test_duplicate_indirect_inputs(renku_cli, client): client.repo.git.add("--all") client.repo.index.commit("test setup") - exit_code, plan = renku_cli("run", "--no-output", "sh", "-c", "sh script.sh", "baz") + exit_code, activity = renku_cli("run", "--no-output", "sh", "-c", "sh script.sh", "baz") assert 0 == exit_code - assert {"baz", "foo/bar"} == {i.default_value for i in plan.inputs} + assert {"baz", "foo/bar"} == {i.default_value for i in activity.association.plan.inputs} def test_duplicate_indirect_outputs(renku_cli, client): @@ -110,10 +111,10 @@ def test_duplicate_indirect_outputs(renku_cli, client): client.repo.git.add("--all") client.repo.index.commit("test setup") - exit_code, plan = renku_cli("run", "sh", "-c", "sh script.sh") + exit_code, activity = renku_cli("run", "sh", "-c", "sh script.sh") assert 0 == exit_code - assert {"baz", "foo/bar"} == {o.default_value for o in plan.outputs} + assert {"baz", "foo/bar"} == {o.default_value for o in activity.association.plan.outputs} def test_indirect_parameters(renku_cli, client): @@ -133,8 +134,9 @@ def test_indirect_parameters(renku_cli, client): client.repo.git.add("--all") client.repo.index.commit("test setup") - exit_code, plan = renku_cli("run", "--no-output", "sh", "-c", "sh script.sh") + exit_code, activity = renku_cli("run", "--no-output", "sh", "-c", "sh script.sh") + plan = activity.association.plan assert 0 == exit_code assert {"c-1", "param 1", "param-2", "param3"} == {a.name for a in plan.parameters} assert {"sh script.sh", "forty-two", "42.42", "42"} == {a.default_value for a in plan.parameters} @@ -175,7 +177,7 @@ def test_indirect_parameters_update(renku_cli, client): client.repo.git.add("--all") client.repo.index.commit("test setup") - exit_code, plan = renku_cli("update", "--all") + exit_code, activity = renku_cli("update", "--all") assert 0 == exit_code - assert {"forty-two-updated", "42.42", "42"} == {a.default_value for a in plan.parameters} + assert {"forty-two-updated", "42.42", "42"} == {a.default_value for a in activity.association.plan.parameters} diff --git a/tests/cli/test_output_option.py b/tests/cli/test_output_option.py index 97c8b49ad3..985676cea5 100644 --- a/tests/cli/test_output_option.py +++ b/tests/cli/test_output_option.py @@ -26,8 +26,9 @@ def test_run_succeeds_normally(renku_cli, client, subdirectory): """Test when an output is detected""" foo = os.path.relpath(client.path / "foo", os.getcwd()) - exit_code, plan = renku_cli("run", "touch", foo) + exit_code, activity = renku_cli("run", "touch", foo) + plan = activity.association.plan assert 0 == exit_code assert 0 == len(plan.inputs) assert 1 == len(plan.outputs) @@ -46,8 +47,9 @@ def test_with_no_output_option(renku_cli, client, subdirectory): """Test --no-output option with no output detection""" foo = os.path.relpath(client.path / "foo", os.getcwd()) renku_cli("run", "touch", foo) - exit_code, plan = renku_cli("run", "--no-output", "touch", foo) + exit_code, activity = renku_cli("run", "--no-output", "touch", foo) + plan = activity.association.plan assert 0 == exit_code assert 1 == len(plan.inputs) assert "foo" == str(plan.inputs[0].default_value) @@ -63,8 +65,9 @@ def test_explicit_outputs_and_normal_outputs(renku_cli, client, subdirectory): baz = os.path.relpath(client.path / "baz", os.getcwd()) qux = os.path.join(foo, "qux") - exit_code, plan = renku_cli("run", "--output", foo, "--output", bar, "touch", baz, qux) + exit_code, activity = renku_cli("run", "--output", foo, "--output", bar, "touch", baz, qux) + plan = activity.association.plan assert 0 == exit_code plan.inputs.sort(key=lambda e: e.position) assert 4 == len(plan.outputs) @@ -102,10 +105,10 @@ def test_output_directory_without_separate_outputs(renku_cli, client): See https://github.com/SwissDataScienceCenter/renku-python/issues/387 """ a_script = ("sh", "-c", 'mkdir -p "$0"; touch "$0/$1"') - exit_code, plan = renku_cli("run", *a_script, "outdir", "foo") + exit_code, activity = renku_cli("run", *a_script, "outdir", "foo") assert 0 == exit_code - assert 1 == len(plan.outputs) + assert 1 == len(activity.association.plan.outputs) def test_explicit_inputs_must_exist(renku_cli): @@ -136,7 +139,9 @@ def test_explicit_inputs_and_outputs_are_listed(renku_cli, client): renku_cli("run", "touch", "foo/file") renku_cli("run", "touch", "bar", "baz") - exit_code, plan = renku_cli("run", "--input", "foo", "--input", "bar", "--output", "baz", "echo") + exit_code, activity = renku_cli("run", "--input", "foo", "--input", "bar", "--output", "baz", "echo") + + plan = activity.association.plan assert 0 == exit_code assert 2 == len(plan.inputs) @@ -157,8 +162,9 @@ def test_explicit_inputs_can_be_in_inputs(renku_cli, client, subdirectory): foo = os.path.relpath(client.path / "foo", os.getcwd()) renku_cli("run", "touch", foo) - exit_code, plan = renku_cli("run", "--input", foo, "--no-output", "ls", foo) + exit_code, activity = renku_cli("run", "--input", foo, "--no-output", "ls", foo) + plan = activity.association.plan assert 0 == exit_code assert 1 == len(plan.inputs) @@ -199,8 +205,9 @@ def test_no_explicit_or_detected_output(renku_cli): def test_no_output_and_disabled_detection(renku_cli): """Test --no-output works with no output detection.""" - exit_code, plan = renku_cli("run", "--no-output-detection", "--no-output", "echo") + exit_code, activity = renku_cli("run", "--no-output-detection", "--no-output", "echo") + plan = activity.association.plan assert 0 == exit_code assert 0 == len(plan.inputs) assert 0 == len(plan.outputs) @@ -208,10 +215,11 @@ def test_no_output_and_disabled_detection(renku_cli): def test_disabled_detection(renku_cli): """Test disabled auto-detection of inputs and outputs.""" - exit_code, plan = renku_cli( + exit_code, activity = renku_cli( "run", "--no-input-detection", "--no-output-detection", "--output", "README.md", "touch", "some-files" ) + plan = activity.association.plan assert 0 == exit_code assert 0 == len(plan.inputs) assert 1 == len(plan.outputs) @@ -220,10 +228,11 @@ def test_disabled_detection(renku_cli): def test_inputs_must_be_passed_with_no_detection(renku_cli, client): """Test when detection is disabled, inputs must be explicitly passed.""" - exit_code, plan = renku_cli( + exit_code, activity = renku_cli( "run", "--no-input-detection", "--input", "Dockerfile", "--no-output", "ls", "-l", "README.md", "Dockerfile" ) + plan = activity.association.plan assert 0 == exit_code assert 1 == len(plan.inputs) assert plan.inputs[0].position is not None @@ -236,10 +245,11 @@ def test_overlapping_explicit_outputs(renku_cli, client): foo.mkdir() renku_cli("run", "touch", "foo/bar") - exit_code, plan = renku_cli( + exit_code, activity = renku_cli( "run", "--no-input-detection", "--no-output-detection", "--output", "foo", "--output", "foo/bar", "echo" ) + plan = activity.association.plan assert 0 == exit_code assert 0 == len(plan.inputs) assert 2 == len(plan.outputs) @@ -249,17 +259,18 @@ def test_overlapping_explicit_outputs(renku_cli, client): def test_std_streams_must_be_in_explicits(renku_cli): """Test when auto-detection is disabled, std streams must be passed explicitly.""" - exit_code, plan = renku_cli( + exit_code, activity = renku_cli( "run", "--no-output-detection", "--output", "Dockerfile", "ls", stdin="README.md", stdout="out", stderr="err" ) + plan = activity.association.plan assert 0 == exit_code assert 1 == len(plan.inputs) assert "README.md" == str(plan.inputs[0].default_value) assert 1 == len(plan.outputs) assert "Dockerfile" == str(plan.outputs[0].default_value) - exit_code, plan = renku_cli( + exit_code, activity = renku_cli( "run", "--no-input-detection", "--no-output-detection", @@ -275,6 +286,7 @@ def test_std_streams_must_be_in_explicits(renku_cli): stderr="err", ) + plan = activity.association.plan assert 0 == exit_code assert 1 == len(plan.inputs) assert "README.md" == str(plan.inputs[0].default_value) @@ -285,7 +297,7 @@ def test_std_streams_must_be_in_explicits(renku_cli): def test_explicit_input_as_out_streams(renku_cli): """Test cannot use explicit inputs as stdout/stderr when auto-detection is disabled.""" - exit_code, plan = renku_cli( + exit_code, _ = renku_cli( "run", "--no-input-detection", "--no-output-detection", @@ -302,7 +314,7 @@ def test_explicit_input_as_out_streams(renku_cli): def test_explicit_output_as_stdin(renku_cli): """Test cannot use explicit outputs as stdin when auto-detection is disabled.""" - exit_code, plan = renku_cli( + exit_code, _ = renku_cli( "run", "--no-input-detection", "--no-output-detection", "--output", "README.md", "ls", stdin="README.md" ) diff --git a/tests/cli/test_update.py b/tests/cli/test_update.py index 02155621d9..144aa9b69a 100644 --- a/tests/cli/test_update.py +++ b/tests/cli/test_update.py @@ -17,344 +17,239 @@ # limitations under the License. """Test ``update`` command.""" +import os from pathlib import Path import git -import pytest from renku.cli import cli from renku.core.management.repository import DEFAULT_DATA_DIR as DATA_DIR -from tests.utils import format_result_exception +from renku.core.models.workflow.composite_plan import PlanCollection +from renku.core.models.workflow.plan import Plan +from tests.utils import format_result_exception, write_and_commit_file -def update_and_commit(data, file_, repo): - """Update source.txt.""" - with file_.open("w") as fp: - fp.write(data) - - repo.git.add(file_) - repo.index.commit("Updated source.txt") - - -@pytest.mark.skip(reason="renku update not implemented with new metadata yet, reenable later") -def test_update(runner, project, renku_cli, no_lfs_warning): - """Test automatic file update.""" - from renku.core.utils.shacl import validate_graph - - cwd = Path(project) - data = cwd / DATA_DIR - data.mkdir(exist_ok=True, parents=True) - source = cwd / "source.txt" - output = data / "result.txt" - +def test_update(runner, project, renku_cli): + """Test output is updated when source changes.""" repo = git.Repo(project) + source = os.path.join(project, "source.txt") + output = os.path.join(project, "output.txt") - update_and_commit("1", source, repo) + write_and_commit_file(repo, source, "content") - exit_code, run = renku_cli("run", "wc", "-c", stdin=source, stdout=output) + exit_code, previous_activity = renku_cli("run", "head", "-1", source, stdout=output) assert 0 == exit_code - assert 0 == len(run.subprocesses) - previous_run_id = run._id - - with output.open("r") as f: - assert f.read().strip() == "1" - result = runner.invoke(cli, ["status"]) - assert 0 == result.exit_code, format_result_exception(result) + write_and_commit_file(repo, source, "changed content") - update_and_commit("12", source, repo) + exit_code, activity = renku_cli("update", "--all") - result = runner.invoke(cli, ["status"]) - assert 1 == result.exit_code - - exit_code, run = renku_cli("update", "--all") assert 0 == exit_code - assert 0 == len(run.subprocesses) - assert previous_run_id == run._id - previous_run_id = run._id + plan = activity.association.plan + assert previous_activity.association.plan.id == plan.id + assert isinstance(plan, Plan) + assert not isinstance(plan, PlanCollection) + + assert "changed content" == Path(output).read_text() result = runner.invoke(cli, ["status"]) assert 0 == result.exit_code, format_result_exception(result) - with output.open("r") as f: - assert f.read().strip() == "2" - # Source has been updated but output is unchanged. - update_and_commit("34", source, repo) +def test_update_multiple_steps(runner, project, renku_cli): + """Test update in a multi-step workflow.""" + repo = git.Repo(project) + source = os.path.join(project, "source.txt") + intermediate = os.path.join(project, "intermediate.txt") + output = os.path.join(project, "output.txt") - result = runner.invoke(cli, ["status"]) - assert 1 == result.exit_code + write_and_commit_file(repo, source, "content") - exit_code, run = renku_cli("update", "--all") + exit_code, activity1 = renku_cli("run", "cp", source, intermediate) + assert 0 == exit_code + exit_code, activity2 = renku_cli("run", "cp", intermediate, output) assert 0 == exit_code - assert 0 == len(run.subprocesses) - assert previous_run_id == run._id - - result = runner.invoke(cli, ["status"]) - assert 0 == result.exit_code, format_result_exception(result) - with output.open("r") as f: - assert f.read().strip() == "2" + write_and_commit_file(repo, source, "changed content") - from renku.cli.graph import GRAPH_FORMATS + exit_code, activity = renku_cli("update", "--all") - for output_format in GRAPH_FORMATS: - # Make sure the graph contains the original parent. - result = runner.invoke(cli, ["graph", "export", "--format", output_format], catch_exceptions=False) - assert 0 == result.exit_code, format_result_exception(result) - assert source.name in result.output, output_format + assert 0 == exit_code + plan = activity.association.plan + # assert previous_activity.association.plan.id == plan.id + assert not isinstance(plan, Plan) + assert isinstance(plan, PlanCollection) + assert {p.id for p in plan.plans} == {activity1.association.plan.id, activity2.association.plan.id} - if output_format == "nt": - r, _, t = validate_graph(result.output, format="nt") - assert r is True, t + assert "changed content" == Path(intermediate).read_text() + assert "changed content" == Path(output).read_text() + result = runner.invoke(cli, ["status"]) + assert 0 == result.exit_code, format_result_exception(result) -@pytest.mark.skip(reason="renku update not implemented with new metadata yet, reenable later") -def test_update_multiple_steps(runner, project, renku_cli, no_lfs_warning): - """Test automatic file update.""" - cwd = Path(project) - data = cwd / "data" - data.mkdir(exist_ok=True, parents=True) - source = cwd / "source.txt" - intermediate = cwd / "intermediate.txt" - output = cwd / "result.txt" +def test_update_multiple_steps_with_path(runner, project, renku_cli): + """Test update in a multi-step workflow when a path is specified.""" repo = git.Repo(project) + source = os.path.join(project, "source.txt") + intermediate = os.path.join(project, "intermediate.txt") + output = os.path.join(project, "output.txt") - update_and_commit("1", source, repo) + write_and_commit_file(repo, source, "content") - exit_code, run = renku_cli("run", "cp", str(source), str(intermediate)) + exit_code, activity1 = renku_cli("run", "cp", source, intermediate) assert 0 == exit_code - assert 0 == len(run.subprocesses) - - with intermediate.open("r") as f: - assert f.read().strip() == "1" - - result = runner.invoke(cli, ["status"]) - assert 0 == result.exit_code, format_result_exception(result) - - exit_code, run = renku_cli("run", "cp", str(intermediate), str(output)) + exit_code, activity2 = renku_cli("run", "cp", intermediate, output) assert 0 == exit_code - assert 0 == len(run.subprocesses) - with output.open("r") as f: - assert f.read().strip() == "1" + write_and_commit_file(repo, source, "changed content") - result = runner.invoke(cli, ["status"]) - assert 0 == result.exit_code, format_result_exception(result) - - update_and_commit("2", source, repo) + exit_code, activity = renku_cli("update", intermediate) - result = runner.invoke(cli, ["status"]) - assert 1 == result.exit_code - - exit_code, run = renku_cli("update", "--all") assert 0 == exit_code - assert 2 == len(run.subprocesses) + plan = activity.association.plan + # assert previous_activity.association.plan.id == plan.id + assert isinstance(plan, Plan) + assert not isinstance(plan, PlanCollection) + assert plan.id == activity1.association.plan.id - result = runner.invoke(cli, ["graph", "export"], catch_exceptions=False) - assert "(part of" in result.output, result.output + assert "changed content" == Path(intermediate).read_text() + assert "content" == Path(output).read_text() result = runner.invoke(cli, ["status"]) - assert 0 == result.exit_code, format_result_exception(result) + assert 1 == result.exit_code, format_result_exception(result) + assert "output.txt: intermediate.txt" in result.output + assert "source.txt" not in result.output - with output.open("r") as f: - assert f.read().strip() == "2" - -@pytest.mark.skip(reason="renku update not implemented with new metadata yet, reenable later") -def test_workflow_without_outputs(runner, project, run): +def test_update_workflow_without_outputs(runner, project, run): """Test workflow without outputs.""" repo = git.Repo(project) - cwd = Path(project) - input_ = cwd / "input.txt" - with input_.open("w") as f: - f.write("first") + source = os.path.join(project, "source.txt") - repo.git.add("--all") - repo.index.commit("Created input.txt") + write_and_commit_file(repo, source, "content") - cmd = ["run", "cat", "--no-output", input_.name] - result = runner.invoke(cli, cmd) - assert 0 == result.exit_code, format_result_exception(result) + assert 0 == runner.invoke(cli, ["run", "cat", "--no-output", source]).exit_code - cmd = ["status", "--no-output"] - result = runner.invoke(cli, cmd) - assert 0 == result.exit_code, format_result_exception(result) + write_and_commit_file(repo, source, "changes") - with input_.open("w") as f: - f.write("second") + assert 1 == runner.invoke(cli, ["status"]).exit_code - repo.git.add("--all") - repo.index.commit("Updated input.txt") + assert 0 == run(args=["update", "--all"]) - cmd = ["status", "--no-output"] - result = runner.invoke(cli, cmd) - assert 1 == result.exit_code - assert 0 == run(args=("update", "--no-output", "--all")) + result = runner.invoke(cli, ["status"]) - cmd = ["status", "--no-output"] - result = runner.invoke(cli, cmd) + # NOTE: Activity is updated or otherwise status would still return 1 assert 0 == result.exit_code, format_result_exception(result) -@pytest.mark.skip(reason="renku update not implemented with new metadata yet, reenable later") -def test_siblings_update(runner, project, run, no_lfs_warning): - """Test detection of siblings during update.""" - cwd = Path(project) - parent = cwd / "parent.txt" - brother = cwd / "brother.txt" - sister = cwd / "sister.txt" - siblings = {brother, sister} - +def test_update_siblings(project, run, no_lfs_warning): + """Test all generations of an activity are updated together.""" repo = git.Repo(project) + parent = os.path.join(project, "parent.txt") + brother = os.path.join(project, "brother.txt") + sister = os.path.join(project, "sister.txt") + siblings = [Path(brother), Path(sister)] - def update_source(data): - """Update parent.txt.""" - with parent.open("w") as fp: - fp.write(data) + write_and_commit_file(repo, parent, "content") - repo.git.add("--all") - repo.index.commit("Updated parent.txt") - - update_source("1") - - # The output files do not exist. - assert not any(sibling.exists() for sibling in siblings) - - cmd = ["run", "tee", "brother.txt"] - assert 0 == run(args=cmd, stdin=parent, stdout=sister) + assert 0 == run(args=["run", "tee", brother, sister], stdin=parent) # The output file is copied from the source. for sibling in siblings: - with sibling.open("r") as f: - assert f.read().strip() == "1", sibling + assert "content" == sibling.read_text() - update_source("2") + write_and_commit_file(repo, parent, "changed content") - # Siblings must be updated together. - for sibling in siblings: - assert 1 == run(args=("update", sibling.name)) - - # Update brother and check the sister has not been changed. - assert 0 == run(args=("update", "--with-siblings", brother.name)) + assert 0 == run(args=["update", brother]) for sibling in siblings: - with sibling.open("r") as f: - assert f.read().strip() == "2", sibling - - update_source("3") + assert "changed content" == sibling.read_text() # Siblings kept together even when one is removed. - repo.index.remove([brother.name], working_tree=True) + repo.index.remove([brother], working_tree=True) repo.index.commit("Brother removed") + assert not os.path.exists(brother) - assert not brother.exists() + write_and_commit_file(repo, parent, "more content") - # Update should find also missing siblings. - assert 1 == run(args=("update", "--all")) - assert 0 == run(args=("update", "--with-siblings", "--all")) + # Update should create the missing sibling + assert 0 == run(args=["update", "--all"]) for sibling in siblings: - with sibling.open("r") as f: - assert f.read().strip() == "3", sibling + assert "more content" == sibling.read_text() -@pytest.mark.skip(reason="renku update not implemented with new metadata yet, reenable later") -def test_siblings_in_output_directory(runner, project, run): +def test_update_siblings_in_output_directory(project, run): """Files in output directory are linked or removed after update.""" repo = git.Repo(project) - cwd = Path(project) - source = cwd / "source.txt" - output = cwd / "output" - - files = [ - ("first", "1"), - ("second", "2"), - ("third", "3"), - ] + source = os.path.join(project, "source.txt") + output = Path(os.path.join(project, "output")) # a directory def write_source(): """Write source from files.""" - with source.open("w") as fp: - fp.write("\n".join(" ".join(line) for line in files) + "\n") - - repo.git.add("--all") - repo.index.commit("Update source.txt") + write_and_commit_file(repo, source, content="\n".join(" ".join(line) for line in files) + "\n") def check_files(): """Check file content.""" assert len(files) == len(list(output.rglob("*"))) for name, content in files: - with (output / name).open() as fp: - assert content == fp.read().strip(), name + assert content == (output / name).read_text().strip() + files = [("first", "1"), ("second", "2"), ("third", "3")] write_source() script = 'mkdir -p "$0"; ' "cat - | while read -r name content; do " 'echo "$content" > "$0/$name"; done' - base_sh = ["sh", "-c", script, "output"] - assert not output.exists() - assert 0 == run(args=["run"] + base_sh, stdin=source) - assert output.exists() + assert not os.path.exists(output) + + assert 0 == run(args=["run", "sh", "-c", script, "output"], stdin=source) + + assert os.path.exists(output) check_files() - files = [ - ("first", "11"), - ("third", "3"), - ("fourth", "4"), - ] + files = [("third", "3"), ("fourth", "4")] write_source() + assert 0 == run(args=["update", "output"]) + check_files() -@pytest.mark.skip("renku update not implemented with new database, reenable once that is done") -def test_relative_path_for_directory_input(client, run, renku_cli): +def test_update_relative_path_for_directory_input(client, run, renku_cli): """Test having a directory input generates relative path in CWL.""" - (client.path / DATA_DIR / "file1").write_text("file1") - client.repo.git.add("--all") - client.repo.index.commit("Add file") + write_and_commit_file(client.repo, client.path / DATA_DIR / "file1", "file1") assert 0 == run(args=["run", "ls", DATA_DIR], stdout="ls.data") - (client.path / DATA_DIR / "file2").write_text("file2") - client.repo.git.add("--all") - client.repo.index.commit("Add one more file") + write_and_commit_file(client.repo, client.path / DATA_DIR / "file2", "file2") + + exit_code, activity = renku_cli("update", "--all") - exit_code, plan = renku_cli("update", "--all") assert 0 == exit_code + plan = activity.association.plan assert 1 == len(plan.inputs) assert "data" == plan.inputs[0].default_value -def test_update_no_args(runner, project, renku_cli, no_lfs_warning): +def test_update_no_args(runner, project, no_lfs_warning): """Test calling update with no args raises ParameterError.""" - cwd = Path(project) - data = cwd / DATA_DIR - data.mkdir(exist_ok=True, parents=True) - source = cwd / "source.txt" - output = data / "result.txt" - repo = git.Repo(project) + source = os.path.join(project, "source.txt") + output = os.path.join(project, "output.txt") - update_and_commit("1", source, repo) + write_and_commit_file(repo, source, "content") - exit_code, run = renku_cli("run", "wc", "-c", stdin=source, stdout=output) - assert 0 == exit_code - - result = runner.invoke(cli, ["status"]) - assert 0 == result.exit_code, format_result_exception(result) + assert 0 == runner.invoke(cli, ["run", "cp", source, output]).exit_code - update_and_commit("12", source, repo) - - result = runner.invoke(cli, ["status"]) - assert 1 == result.exit_code + write_and_commit_file(repo, source, "changed content") before_commit = repo.head.commit - exit_code, run = renku_cli("update") - assert 2 == exit_code + result = runner.invoke(cli, ["update"]) + + assert 2 == result.exit_code + assert "Either PATHS or --all/-a should be specified" in result.output assert before_commit == repo.head.commit