diff --git a/design/001-new-workflow-commands/001-new-workflow-commands.md b/design/001-new-workflow-commands/001-new-workflow-commands.md index faedfc19a1..c1673f2106 100644 --- a/design/001-new-workflow-commands/001-new-workflow-commands.md +++ b/design/001-new-workflow-commands/001-new-workflow-commands.md @@ -117,7 +117,7 @@ This command allows execution of a workflow template using a specified runner/pr Syntax examples: ``` -$ renku workflow execute --provider --config --mapping +$ renku workflow execute --provider --config --values $ renku workflow execute --provider --config --set learning_rate=0.9 --set step3.learning_rate=0.1 --set step3.result_file=/tmp/myresult.txt ``` diff --git a/renku/cli/workflow.py b/renku/cli/workflow.py index d202203d46..40693d634c 100644 --- a/renku/cli/workflow.py +++ b/renku/cli/workflow.py @@ -230,6 +230,7 @@ from rich.console import Console from rich.markdown import Markdown +from renku.cli.utils.callback import ClickCallback from renku.core.commands.echo import ERROR from renku.core.commands.format.workflow import WORKFLOW_COLUMNS, WORKFLOW_FORMATS from renku.core.commands.view_model.composite_plan import CompositePlanViewModel @@ -237,6 +238,7 @@ from renku.core.commands.workflow import ( compose_workflow_command, edit_workflow_command, + execute_workflow_command, export_workflow_command, list_workflows_command, remove_workflow_command, @@ -244,6 +246,7 @@ workflow_inputs_command, workflow_outputs_command, ) +from renku.core.plugins.provider import available_workflow_providers from renku.core.plugins.workflow import supported_formats @@ -551,8 +554,13 @@ def edit(workflow_name, name, description, set_params, map_params, rename_params ) def export(workflow_name, format, output, values): """Export workflow.""" + communicator = ClickCallback() + result = ( - export_workflow_command().build().execute(name_or_id=workflow_name, format=format, output=output, values=values) + export_workflow_command() + .with_communicator(communicator) + .build() + .execute(name_or_id=workflow_name, format=format, output=output, values=values) ) if not output: @@ -599,3 +607,58 @@ def outputs(ctx, paths): p not in output_paths and all(Path(o) not in Path(p).parents for o in output_paths) for p in paths ): ctx.exit(1) + + +@workflow.command() +@click.option( + "provider", + "-p", + "--provider", + default="cwltool", + show_default=True, + type=click.Choice(available_workflow_providers(), case_sensitive=False), + help="The workflow engine to use.", +) +@click.option("config", "-c", "--config", metavar="", help="YAML file containing config for the provider.") +@click.option( + "set_params", + "-s", + "--set", + multiple=True, + metavar="=", + help="Set for a to be used in execution.", +) +@click.option( + "--values", + metavar="", + type=click.Path(exists=True, dir_okay=False), + help="YAML file containing parameter mappings to be used.", +) +@click.argument("name_or_id", required=True) +def execute( + provider, + config, + set_params, + values, + name_or_id, +): + """Execute a given workflow.""" + communicator = ClickCallback() + + result = ( + execute_workflow_command() + .with_communicator(communicator) + .build() + .execute( + name_or_id=name_or_id, + provider=provider, + config=config, + values=values, + set_params=set_params, + ) + ) + + if result.output: + click.echo( + "Unchanged files:\n\n\t{0}".format("\n\t".join(click.style(path, fg="yellow") for path in result.output)) + ) diff --git a/renku/core/commands/rerun.py b/renku/core/commands/rerun.py index 0147bd31ff..8c20b259dc 100644 --- a/renku/core/commands/rerun.py +++ b/renku/core/commands/rerun.py @@ -21,7 +21,7 @@ from typing import List, Set from renku.core import errors -from renku.core.commands.update import execute_workflow +from renku.core.commands.workflow import execute_workflow from renku.core.management.command_builder.command import Command, inject from renku.core.management.interface.activity_gateway import IActivityGateway from renku.core.management.interface.client_dispatcher import IClientDispatcher diff --git a/renku/core/commands/update.py b/renku/core/commands/update.py index ab6d732153..98190de72a 100644 --- a/renku/core/commands/update.py +++ b/renku/core/commands/update.py @@ -17,29 +17,20 @@ # limitations under the License. """Renku ``update`` command.""" -import uuid from collections import defaultdict from pathlib import Path from typing import List, Set -from git import Actor - from renku.core import errors +from renku.core.commands.workflow import execute_workflow from renku.core.errors import ParameterError from renku.core.management.command_builder import inject from renku.core.management.command_builder.command import Command from renku.core.management.interface.activity_gateway import IActivityGateway from renku.core.management.interface.client_dispatcher import IClientDispatcher -from renku.core.management.interface.plan_gateway import IPlanGateway -from renku.core.management.workflow.plan_factory import delete_indirect_files_list -from renku.core.models.provenance.activity import Activity, ActivityCollection -from renku.core.models.workflow.composite_plan import CompositePlan -from renku.core.models.workflow.plan import Plan -from renku.core.utils.datetime8601 import local_now -from renku.core.utils.git import add_to_git +from renku.core.models.provenance.activity import Activity from renku.core.utils.metadata import add_activity_if_recent, get_modified_activities from renku.core.utils.os import get_relative_paths -from renku.version import __version__, version_url def update_command(): @@ -121,132 +112,3 @@ def does_activity_generate_any_paths(activity): include_newest_activity(activity) return {a for activities in all_activities.values() for a in activities} - - -@inject.autoparams() -def execute_workflow( - plans: List[Plan], - command_name, - client_dispatcher: IClientDispatcher, - activity_gateway: IActivityGateway, - plan_gateway: IPlanGateway, -): - """Execute a Run with/without subprocesses.""" - client = client_dispatcher.current_client - - # NOTE: Pull inputs from Git LFS or other storage backends - if client.check_external_storage(): - inputs = [i.actual_value for p in plans for i in p.inputs] - client.pull_paths_from_storage(*inputs) - - delete_indirect_files_list(client.path) - - started_at_time = local_now() - - modified_outputs = _execute_workflow_helper(plans=plans, client=client) - - ended_at_time = local_now() - - add_to_git(client.repo.git, *modified_outputs) - - if client.repo.is_dirty(): - postfix = "s" if len(modified_outputs) > 1 else "" - commit_msg = f"renku {command_name}: committing {len(modified_outputs)} modified file{postfix}" - committer = Actor(f"renku {__version__}", version_url) - client.repo.index.commit(commit_msg, committer=committer, skip_hooks=True) - - activities = [] - - for plan in plans: - # NOTE: Update plans are copies of Plan objects. We need to use the original Plan objects to avoid duplicates. - original_plan = plan_gateway.get_by_id(plan.id) - activity = Activity.from_plan(plan=original_plan, started_at_time=started_at_time, ended_at_time=ended_at_time) - activity_gateway.add(activity) - activities.append(activity) - - if len(activities) > 1: - activity_collection = ActivityCollection(activities=activities) - activity_gateway.add_activity_collection(activity_collection) - - -# TODO: This function is created as a patch from renku/core/commands/workflow.py::_execute_workflow and -# renku/core/management/workflow/providers/cwltool_provider.py::CWLToolProvider::workflow_execute in the ``workflow -# execute`` PR (renku-python/pull/2273). Once the PR is merged remove this function and refactor -# renku/core/commands/workflow.py::_execute_workflow to accept a Plan and use it here. -def _execute_workflow_helper(plans: List[Plan], client): - """Executes a given workflow using cwltool.""" - import os - import shutil - import sys - import tempfile - from pathlib import Path - from urllib.parse import unquote - - import cwltool.factory - from cwltool.context import LoadingContext, RuntimeContext - - from renku.core import errors - from renku.core.commands.echo import progressbar - - basedir = client.path - - # NOTE: Create a ``CompositePlan`` because ``workflow_covert`` expects it - workflow = CompositePlan(id=CompositePlan.generate_id(), plans=plans, name=f"plan-collection-{uuid.uuid4().hex}") - - with tempfile.NamedTemporaryFile() as f: - # export Plan to cwl - from renku.core.management.workflow.converters.cwl import CWLExporter - - converter = CWLExporter() - converter.workflow_convert(workflow=workflow, basedir=basedir, output=Path(f.name), output_format=None) - - # run cwl with cwltool - argv = sys.argv - sys.argv = ["cwltool"] - - runtime_args = {"rm_tmpdir": False, "move_outputs": "leave", "preserve_entire_environment": True} - loading_args = {"relax_path_checks": True} - - # Keep all environment variables. - runtime_context = RuntimeContext(kwargs=runtime_args) - loading_context = LoadingContext(kwargs=loading_args) - - factory = cwltool.factory.Factory(loading_context=loading_context, runtime_context=runtime_context) - process = factory.make(os.path.relpath(str(f.name))) - - try: - outputs = process() - except cwltool.factory.WorkflowStatus: - raise errors.RenkuException("WorkflowExecuteError()") - - sys.argv = argv - - # Move outputs to correct location in the repository. - output_dirs = process.factory.executor.output_dirs - - def remove_prefix(location, prefix="file://"): - if location.startswith(prefix): - return unquote(location[len(prefix) :]) - return unquote(location) - - locations = {remove_prefix(output["location"]) for output in outputs.values()} - # make sure to not move an output if it's containing directory gets moved - locations = { - location for location in locations if not any(location.startswith(d) for d in locations if location != d) - } - - output_paths = [] - with progressbar(locations, label="Moving outputs") as bar: - for location in bar: - for output_dir in output_dirs: - if location.startswith(output_dir): - output_path = location[len(output_dir) :].lstrip(os.path.sep) - destination = basedir / output_path - output_paths.append(destination) - if destination.is_dir(): - shutil.rmtree(str(destination)) - destination = destination.parent - shutil.move(location, str(destination)) - continue - - return client.remove_unmodified(output_paths) diff --git a/renku/core/commands/workflow.py b/renku/core/commands/workflow.py index 6e8878286f..720d972dcd 100644 --- a/renku/core/commands/workflow.py +++ b/renku/core/commands/workflow.py @@ -18,9 +18,12 @@ """Renku workflow commands.""" +import uuid from datetime import datetime from pathlib import Path -from typing import List, Optional +from typing import Any, Dict, List, Optional + +from git import Actor from renku.core import errors from renku.core.commands.format.workflow import WORKFLOW_FORMATS @@ -33,11 +36,17 @@ from renku.core.management.interface.plan_gateway import IPlanGateway from renku.core.management.interface.project_gateway import IProjectGateway from renku.core.management.workflow.concrete_execution_graph import ExecutionGraph -from renku.core.management.workflow.value_resolution import apply_run_values +from renku.core.management.workflow.plan_factory import delete_indirect_files_list +from renku.core.management.workflow.value_resolution import CompositePlanValueResolver, ValueResolver +from renku.core.models.provenance.activity import Activity, ActivityCollection from renku.core.models.workflow.composite_plan import CompositePlan from renku.core.models.workflow.plan import AbstractPlan, Plan +from renku.core.plugins.provider import execute from renku.core.utils import communication +from renku.core.utils.datetime8601 import local_now +from renku.core.utils.git import add_to_git from renku.core.utils.os import get_relative_paths +from renku.version import __version__, version_url def _ref(name): @@ -51,6 +60,15 @@ def _deref(ref): return ref[len("workflows/") :] +def _safe_read_yaml(file: str) -> Dict[str, Any]: + try: + from renku.core.models import jsonld as jsonld + + return jsonld.read_yaml(file) + except Exception as e: + raise errors.ParameterError(e) + + @inject.autoparams() def _find_workflow(name_or_id: str, plan_gateway: IPlanGateway) -> AbstractPlan: workflow = plan_gateway.get_by_id(name_or_id) or plan_gateway.get_by_name(name_or_id) @@ -187,7 +205,8 @@ def _group_workflow( if link_all: # NOTE: propagate values to for linking to use - apply_run_values(plan) + rv = CompositePlanValueResolver(plan, None) + plan = rv.apply() graph = ExecutionGraph(plan, virtual_links=True) @@ -279,27 +298,19 @@ def _export_workflow( if output: output = Path(output) - from renku.core.plugins.pluginmanager import get_plugin_manager - - pm = get_plugin_manager() - supported_formats = pm.hook.workflow_format() - export_plugins = list(map(lambda x: x[0], supported_formats)) - converter = list(map(lambda x: x[0], filter(lambda x: format in x[1], supported_formats))) - if not any(converter): - raise errors.ParameterError(f"The specified workflow exporter format '{format}' is not available.") - elif len(converter) > 1: - raise errors.ConfigurationError( - f"The specified format '{format}' is supported by more than one export plugins!" - ) - if values: - from renku.core.models import jsonld as jsonld - - values = jsonld.read_yaml(values) - workflow = apply_run_values(workflow, values) - - export_plugins.remove(converter[0]) - converter = pm.subset_hook_caller("workflow_convert", export_plugins) + values = _safe_read_yaml(values) + rv = ValueResolver.get(workflow, values) + workflow = rv.apply() + if rv.missing_parameters: + communication.warn( + f'Could not resolve the following parameters in "{workflow.name}" workflow: ' + f'{",".join(rv.missing_parameters)}' + ) + + from renku.core.plugins.workflow import workflow_converter + + converter = workflow_converter(format) return converter(workflow=workflow, basedir=client.path, output=output, output_format=format) @@ -376,3 +387,100 @@ def _workflow_outputs(activity_gateway: IActivityGateway, paths: List[str] = Non def workflow_outputs_command(): """Command that shows inputs used by workflows.""" return Command().command(_workflow_outputs).require_migration().with_database(write=False) + + +@inject.autoparams() +def execute_workflow( + plans: List[Plan], + command_name, + client_dispatcher: IClientDispatcher, + activity_gateway: IActivityGateway, + plan_gateway: IPlanGateway, + provider="cwltool", + config=None, +): + """Execute a Run with/without subprocesses.""" + client = client_dispatcher.current_client + + # NOTE: Pull inputs from Git LFS or other storage backends + if client.check_external_storage(): + inputs = [i.actual_value for p in plans for i in p.inputs] + client.pull_paths_from_storage(*inputs) + + delete_indirect_files_list(client.path) + + started_at_time = local_now() + + # NOTE: Create a ``CompositePlan`` because ``workflow_covert`` expects it + workflow = CompositePlan(id=CompositePlan.generate_id(), plans=plans, name=f"plan-collection-{uuid.uuid4().hex}") + modified_outputs = execute(workflow=workflow, basedir=client.path, provider=provider, config=config) + + ended_at_time = local_now() + + add_to_git(client.repo.git, *modified_outputs) + + if client.repo.is_dirty(): + postfix = "s" if len(modified_outputs) > 1 else "" + commit_msg = f"renku {command_name}: committing {len(modified_outputs)} modified file{postfix}" + committer = Actor(f"renku {__version__}", version_url) + client.repo.index.commit(commit_msg, committer=committer, skip_hooks=True) + + activities = [] + + for plan in plans: + # NOTE: Update plans are copies of Plan objects. We need to use the original Plan objects to avoid duplicates. + original_plan = plan_gateway.get_by_id(plan.id) + activity = Activity.from_plan(plan=original_plan, started_at_time=started_at_time, ended_at_time=ended_at_time) + activity_gateway.add(activity) + activities.append(activity) + + if len(activities) > 1: + activity_collection = ActivityCollection(activities=activities) + activity_gateway.add_activity_collection(activity_collection) + + +@inject.autoparams() +def _execute_workflow( + name_or_id: str, set_params: List[str], provider: str, config: Optional[str], values: Optional[str] +): + workflow = _find_workflow(name_or_id) + + # apply the provided parameter settings provided by user + override_params = dict() + if values: + override_params.update(_safe_read_yaml(values)) + + if set_params: + for param in set_params: + name, value = param.split("=", maxsplit=1) + override_params[name] = value + + if override_params: + rv = ValueResolver.get(workflow, override_params) + workflow = rv.apply() + + if rv.missing_parameters: + communication.warn( + f'Could not resolve the following parameters in "{workflow.name}" workflow: ' + f'{",".join(rv.missing_parameters)}' + ) + + if config: + config = _safe_read_yaml(config) + + if isinstance(workflow, CompositePlan): + import networkx as nx + + graph = ExecutionGraph(workflow=workflow, virtual_links=True) + plans = list(nx.topological_sort(graph.workflow_graph)) + else: + plans = [workflow] + + execute_workflow(plans=plans, command_name="execute", provider=provider, config=config) + + +def execute_workflow_command(): + """Command that executes a workflow.""" + return ( + Command().command(_execute_workflow).require_migration().require_clean().with_database(write=True).with_commit() + ) diff --git a/renku/core/errors.py b/renku/core/errors.py index e7dfed5722..32c0bf7fc7 100644 --- a/renku/core/errors.py +++ b/renku/core/errors.py @@ -407,9 +407,19 @@ class CommitProcessingError(RenkuException): """Raised when a commit couldn't be processed during graph build.""" -class WorkflowRerunError(RenkuException): +class WorkflowExecuteError(RenkuException): """Raises when a workflow execution fails.""" + def __init__(self): + """Build a custom message.""" + + msg = "Unable to finish executing workflow" + super(WorkflowExecuteError, self).__init__(msg) + + +class WorkflowRerunError(RenkuException): + """Raises when a workflow re-execution fails.""" + def __init__(self, workflow_file): """Build a custom message.""" msg = ( diff --git a/renku/core/management/workflow/plan_factory.py b/renku/core/management/workflow/plan_factory.py index 32f5a60e04..dbdcdad59d 100644 --- a/renku/core/management/workflow/plan_factory.py +++ b/renku/core/management/workflow/plan_factory.py @@ -322,7 +322,7 @@ def _check_potential_output_directory( return candidates - def _get_mimetype(self, file: Path) -> str: + def _get_mimetype(self, file: Path) -> List[str]: """Return the MIME-TYPE of the given file.""" # TODO: specify the actual mime-type of the file return ["application/octet-stream"] diff --git a/renku/core/management/workflow/providers/cwltool_provider.py b/renku/core/management/workflow/providers/cwltool_provider.py new file mode 100644 index 0000000000..3d52f39182 --- /dev/null +++ b/renku/core/management/workflow/providers/cwltool_provider.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017-2021 - Swiss Data Science Center (SDSC) +# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and +# Eidgenössische Technische Hochschule Zürich (ETHZ). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""cwltool based provider. + +This implementation provides a plugin for executing workflows using ``cwltool``, +by simply exporting the workflow into a CWL format and then using ``cwltool`` +to run the exported CWL file. + +.. code-block:: console + + $ renku workflow execute --provider cwltool example_workflow + + +.. topic:: Specifying custom parameters for cwltool (``--config``) + + You can specify custom configuration parameters for ``cwltool`` executer + by providing a YAML file for the ``--config`` option. The YAML file + should contain both the ``LoadingContext`` and ``RuntimeContext`` key-value + pairs. + +.. code-block:: console + + $ renku workflow execute --config config.yaml --provider cwltool example_workflow +""" + +import os +import shutil +import sys +import tempfile +from pathlib import Path +from typing import Any, Dict +from urllib.parse import unquote + +import cwltool.factory +from cwltool.context import LoadingContext, RuntimeContext + +from renku.core.commands.echo import progressbar +from renku.core.errors import WorkflowExecuteError +from renku.core.models.workflow.plan import AbstractPlan +from renku.core.models.workflow.provider import IWorkflowProvider +from renku.core.plugins import hookimpl +from renku.core.plugins.workflow import workflow_converter + + +class CWLToolProvider(IWorkflowProvider): + """A workflow executor provider using cwltool.""" + + @hookimpl + def workflow_provider(self): + """Workflow provider name.""" + return (self, "cwltool") + + @hookimpl + def workflow_execute(self, workflow: AbstractPlan, basedir: Path, config: Dict[str, Any]): + """Executes a given workflow using cwltool.""" + with tempfile.NamedTemporaryFile() as f: + # export Plan to cwl + converter = workflow_converter("cwl") + converter(workflow=workflow, basedir=basedir, output=Path(f.name), output_format=None) + + # run cwl with cwltool + argv = sys.argv + sys.argv = ["cwltool"] + + runtime_args = {"rm_tmpdir": False, "move_outputs": "leave", "preserve_entire_environment": True} + loading_args = {"relax_path_checks": True} + if config: + # update both RuntimeContext and LoadingContext parameters with user supplied values + # context.ContextBase takes care that only available parameters are set in a given class + runtime_args.update(config) + loading_args.update(config) + + # Keep all environment variables. + runtime_context = RuntimeContext(kwargs=runtime_args) + loading_context = LoadingContext(kwargs=loading_args) + + factory = cwltool.factory.Factory(loading_context=loading_context, runtime_context=runtime_context) + process = factory.make(os.path.relpath(str(f.name))) + + try: + outputs = process() + except cwltool.factory.WorkflowStatus: + raise WorkflowExecuteError() + + sys.argv = argv + + # Move outputs to correct location in the repository. + output_dirs = process.factory.executor.output_dirs + + def remove_prefix(location, prefix="file://"): + if location.startswith(prefix): + return unquote(location[len(prefix) :]) + return unquote(location) + + locations = {remove_prefix(output["location"]) for output in outputs.values()} + # make sure to not move an output if it's containing directory gets moved + locations = { + location + for location in locations + if not any(location.startswith(d) for d in locations if location != d) + } + + output_paths = [] + with progressbar(locations, label="Moving outputs") as bar: + for location in bar: + for output_dir in output_dirs: + if location.startswith(output_dir): + output_path = location[len(output_dir) :].lstrip(os.path.sep) + destination = basedir / output_path + output_paths.append(destination) + if destination.is_dir(): + shutil.rmtree(str(destination)) + destination = destination.parent + shutil.move(location, str(destination)) + continue + + return output_paths diff --git a/renku/core/management/workflow/value_resolution.py b/renku/core/management/workflow/value_resolution.py index 85a0982898..1444307828 100644 --- a/renku/core/management/workflow/value_resolution.py +++ b/renku/core/management/workflow/value_resolution.py @@ -17,89 +17,135 @@ # limitations under the License. """Resolution of Worklow execution values precedence.""" -from typing import Any, Dict, Union +from abc import ABC, abstractmethod +from itertools import chain +from typing import Any, Dict, Set from renku.core import errors from renku.core.models.workflow.composite_plan import CompositePlan from renku.core.models.workflow.parameter import ParameterMapping -from renku.core.models.workflow.plan import Plan +from renku.core.models.workflow.plan import AbstractPlan, Plan -def apply_run_values(workflow: Union[CompositePlan, Plan], values: Dict[str, Any] = None) -> None: - """Applies values and default_values to a potentially nested workflow. +class ValueResolver(ABC): + """Value resolution class for an ``AbstractPlan``.""" - Order of precedence is as follows (from lowest to highest): - - Default value on a parameter - - Default value on a mapping to the parameter - - Value passed to a mapping to the parameter - - Value passed to the parameter - - Value propagated to a parameter from the source of a ParameterLink + def __init__(self, plan: AbstractPlan, values: Dict[str, Any]): + self._values = values + self.missing_parameters: Set[str] = set() + self._plan = plan + + @abstractmethod + def apply(self) -> AbstractPlan: + """Applies values and default_values to a potentially nested workflow. + + :returns: The ``AbstractPlan`` with the user provided values set. + """ + pass + + @staticmethod + def get(plan: AbstractPlan, values: Dict[str, Any]) -> "ValueResolver": + """Factory method to obtain the specific ValueResolver for a workflow. + + :param plan: a workflow. + :param values: user defined dictionary of runtime values for the provided workflow. + :returns: A ValueResolver object + """ + return PlanValueResolver(plan, values) if isinstance(plan, Plan) else CompositePlanValueResolver(plan, values) + + +class PlanValueResolver(ValueResolver): + """Value resolution class for a ``Plan``. + + Applies values and default_values to a workflow. """ - if isinstance(workflow, Plan): - return apply_single_run_values(workflow, values) + def __init__(self, plan: Plan, values: Dict[str, Any]): + super(PlanValueResolver, self).__init__(plan, values) + + def apply(self) -> AbstractPlan: + """Applies values and default_values to a ``Plan``.""" + if not self._values: + return self._plan + + values_keys = set(self._values.keys()) + for param in chain(self._plan.inputs, self._plan.outputs, self._plan.parameters): + if param.name in self._values: + param.actual_value = self._values[param.name] + values_keys.discard(param.name) - return apply_composite_run_values(workflow, values) + self.missing_parameters = values_keys + return self._plan -def apply_single_run_values(workflow: Plan, values: Dict[str, Any] = None) -> None: - """Applies values and default_values to a workflow.""" - if not values: - return workflow - for param in workflow.inputs + workflow.outputs + workflow.parameters: - if param.name in values: - param.actual_value = values[param.name] +class CompositePlanValueResolver(ValueResolver): + """Value resolution class for a ``CompositePlan``. - return workflow + Applies values and default_values to a nested workflow. + + Order of precedence is as follows (from lowest to highest): + - Default value on a parameter + - Default value on a mapping to the parameter + - Value passed to a mapping to the parameter + - Value passed to the parameter + - Value propagated to a parameter from the source of a ParameterLink + """ + def __init__(self, plan: CompositePlan, values: Dict[str, Any]): + super(CompositePlanValueResolver, self).__init__(plan, values) -def apply_composite_run_values(workflow: CompositePlan, values: Dict[str, Any] = None) -> None: - """Applies values and default_values to a nested workflow.""" + def apply(self) -> AbstractPlan: + """Applies values and default_values to a ``CompositePlan``.""" - if values: - if "parameters" in values: - # NOTE: Set mapping parameter values - apply_parameters_values(workflow, values["parameters"]) + if self._values: + if "parameters" in self._values: + # NOTE: Set mapping parameter values + self._apply_parameters_values() - if "steps" in values: - for name, step in values["steps"].items(): - child_workflow = next((w for w in workflow.plans if w.name == name), None) - if not child_workflow: - raise errors.ChildWorkflowNotFound(name, workflow.name) + if "steps" in self._values: + for name, step in self._values["steps"].items(): + child_workflow = next((w for w in self._plan.plans if w.name == name), None) + if not child_workflow: + raise errors.ChildWorkflowNotFoundError(name, self._plan.name) - apply_run_values(child_workflow, step) + rv = ValueResolver.get(child_workflow, step) + _ = rv.apply() + self.missing_parameters.update({f"steps.name.{mp}" for mp in rv.missing_parameters}) - # apply defaults - for mapping in workflow.mappings: - apply_parameter_defaults(mapping) + self.missing_parameters.update(set(self._values.keys()) - {"parameters", "steps"}) - apply_parameter_links(workflow) + # apply defaults + for mapping in self._plan.mappings: + self._apply_parameter_defaults(mapping) + apply_parameter_links(self._plan) -def apply_parameter_defaults(mapping: ParameterMapping) -> None: - """Apply default values to a mapping and contained params if they're not set already.""" + return self._plan - if not mapping.actual_value_set and mapping.default_value: - mapping.actual_value = mapping.default_value + def _apply_parameter_defaults(self, mapping: ParameterMapping) -> None: + """Apply default values to a mapping and contained params if they're not set already.""" - for mapped_to in mapping.mapped_parameters: - if isinstance(mapped_to, ParameterMapping): - apply_parameter_defaults(mapped_to) - else: - if not mapped_to.actual_value_set: - mapped_to.actual_value = mapping.default_value + if not mapping.actual_value_set and mapping.default_value: + mapping.actual_value = mapping.default_value + for mapped_to in mapping.mapped_parameters: + if isinstance(mapped_to, ParameterMapping): + self._apply_parameter_defaults(mapped_to) + else: + if not mapped_to.actual_value_set: + mapped_to.actual_value = mapping.default_value -def apply_parameters_values(workflow: CompositePlan, values: Dict[str, str]) -> None: - """Apply values to mappings of a CompositePlan.""" - for k, v in values.items(): - mapping = next((m for m in workflow.mappings if m.name == k), None) + def _apply_parameters_values(self) -> None: + """Apply values to mappings of a CompositePlan.""" + for k, v in self._values["parameters"].items(): + mapping = next((m for m in self._plan.mappings if m.name == k), None) - if not mapping: - raise errors.ParameterNotFoundError(k, workflow.name) + if not mapping: + self.missing_parameters.add(k) + continue - mapping.actual_value = v + mapping.actual_value = v def apply_parameter_links(workflow: CompositePlan) -> None: diff --git a/renku/core/models/workflow/converters/__init__.py b/renku/core/models/workflow/converters/__init__.py index 38d9765965..15b23093dd 100644 --- a/renku/core/models/workflow/converters/__init__.py +++ b/renku/core/models/workflow/converters/__init__.py @@ -21,10 +21,9 @@ from abc import ABCMeta, abstractmethod from pathlib import Path -from typing import List, Optional, Tuple, Union +from typing import List, Optional, Tuple -from renku.core.models.workflow.composite_plan import CompositePlan -from renku.core.models.workflow.plan import Plan +from renku.core.models.workflow.plan import AbstractPlan class IWorkflowConverter(metaclass=ABCMeta): @@ -37,7 +36,7 @@ def workflow_format(self) -> Tuple[IWorkflowConverter, List[str]]: @abstractmethod def workflow_convert( - self, workflow: Union[CompositePlan, Plan], basedir: Path, output: Optional[Path], output_format: Optional[str] + self, workflow: AbstractPlan, basedir: Path, output: Optional[Path], output_format: Optional[str] ) -> str: """Converts a single workflow step to a desired workflow format.""" pass diff --git a/renku/core/models/workflow/provider.py b/renku/core/models/workflow/provider.py new file mode 100644 index 0000000000..9b724ccb04 --- /dev/null +++ b/renku/core/models/workflow/provider.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017-2021 - Swiss Data Science Center (SDSC) +# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and +# Eidgenössische Technische Hochschule Zürich (ETHZ). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Workflow executor provider.""" + +from __future__ import annotations + +from abc import ABCMeta, abstractmethod +from pathlib import Path +from typing import Any, Dict, Tuple + +from renku.core.models.workflow.plan import AbstractPlan + + +class IWorkflowProvider(metaclass=ABCMeta): + """Abstract class for executing ``Plan``.""" + + @abstractmethod + def workflow_provider(self) -> Tuple[IWorkflowProvider, str]: + """Supported workflow description formats. + + :returns: a tuple of ``self`` and format. + """ + pass + + @abstractmethod + def workflow_execute(self, workflow: AbstractPlan, basedir: Path, config: Dict[str, Any]): + """Executes a given ``AbstractPlan`` using the provider. + + :returns: a list of output paths that were generated by this workflow. + """ + pass diff --git a/renku/core/plugins/implementations/__init__.py b/renku/core/plugins/implementations/__init__.py index 43f676ef14..86361eb865 100644 --- a/renku/core/plugins/implementations/__init__.py +++ b/renku/core/plugins/implementations/__init__.py @@ -18,7 +18,9 @@ """Renku plugin implementations.""" from renku.core.management.workflow.converters.cwl import CWLExporter +from renku.core.management.workflow.providers.cwltool_provider import CWLToolProvider __all__ = [] workflow_exporters = [CWLExporter] +workflow_providers = [CWLToolProvider] diff --git a/renku/core/plugins/pluginmanager.py b/renku/core/plugins/pluginmanager.py index 07437b2ec8..3a2f813f5b 100644 --- a/renku/core/plugins/pluginmanager.py +++ b/renku/core/plugins/pluginmanager.py @@ -21,6 +21,7 @@ import pluggy from renku.core.plugins import implementations as default_implementations +from renku.core.plugins import provider as provider_hook_specs from renku.core.plugins import run as run_hook_specs from renku.core.plugins import workflow as workflow_hook_specs @@ -29,6 +30,7 @@ def get_plugin_manager(): """The ``pluggy`` plugin manager.""" pm = pluggy.PluginManager("renku") + pm.add_hookspecs(provider_hook_specs) pm.add_hookspecs(run_hook_specs) pm.add_hookspecs(workflow_hook_specs) pm.load_setuptools_entrypoints("renku") diff --git a/renku/core/plugins/provider.py b/renku/core/plugins/provider.py new file mode 100644 index 0000000000..3408dad05c --- /dev/null +++ b/renku/core/plugins/provider.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017-2021- Swiss Data Science Center (SDSC) +# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and +# Eidgenössische Technische Hochschule Zürich (ETHZ). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Plugin hooks for renku run customization.""" +from pathlib import Path +from typing import Any, Dict, List, Tuple + +import pluggy + +from renku.core import errors +from renku.core.models.workflow.plan import AbstractPlan +from renku.core.models.workflow.provider import IWorkflowProvider + +hookspec = pluggy.HookspecMarker("renku") + + +@hookspec +def workflow_provider() -> Tuple[IWorkflowProvider, str]: + """Plugin Hook to get providers for ``workflow execute`` call. + + :returns: A tuple of the provider itself and the workflow executor backends name. + """ + raise NotImplementedError + + +@hookspec(firstresult=True) +def workflow_execute(workflow: AbstractPlan, basedir: Path, config: Dict[str, Any]): + """Plugin Hook for ``workflow execute`` call. + + Can be used to execute renku workflows with different workflow executors. + + :param workflow: a ``AbstractPlan`` object that describes the given workflow. + :param config: a configuration for the provider. + """ + raise NotImplementedError + + +def available_workflow_providers() -> List[str]: + """Returns the currently available workflow execution providers.""" + from renku.core.plugins.pluginmanager import get_plugin_manager + + pm = get_plugin_manager() + providers = pm.hook.workflow_provider() + return [p[1] for p in providers] + + +def execute(workflow: AbstractPlan, basedir: Path, config: Dict[str, Any], provider: str = "cwltool") -> List[str]: + """Executes a given workflow using the selected provider. + + :param workflow: Workflow to be executed. + :param basedir: The root directory of the renku project. + :param config: Configuration values for the workflow provider. + :param provider: The workflow executor engine to be used. + :returns: List of paths that has been modified. + """ + from renku.core.plugins.pluginmanager import get_plugin_manager + + pm = get_plugin_manager() + providers = pm.hook.workflow_provider() + provider = next(filter(lambda x: provider == x[1], providers), None) + if not provider: + raise errors.ParameterError(f"The specified workflow executor '{provider}' is not available.") + + providers.remove(provider) + executor = pm.subset_hook_caller("workflow_execute", list(map(lambda x: x[0], providers))) + + return executor(workflow=workflow, basedir=basedir, config=config) diff --git a/renku/core/plugins/workflow.py b/renku/core/plugins/workflow.py index d3a71e5fcd..1e08d57eb9 100644 --- a/renku/core/plugins/workflow.py +++ b/renku/core/plugins/workflow.py @@ -17,10 +17,11 @@ # limitations under the License. """Plugin hooks for renku workflow customization.""" from pathlib import Path -from typing import List, Optional, Tuple +from typing import Callable, List, Optional, Tuple import pluggy +from renku.core import errors from renku.core.models.workflow.converters import IWorkflowConverter from renku.core.models.workflow.plan import Plan @@ -55,10 +56,28 @@ def workflow_convert(workflow: Plan, basedir: Path, output: Optional[Path], outp pass -def supported_formats(): +def supported_formats() -> List[str]: """Returns the currently available workflow language format types.""" from renku.core.plugins.pluginmanager import get_plugin_manager pm = get_plugin_manager() supported_formats = pm.hook.workflow_format() return [format for fs in supported_formats for format in fs[1]] + + +def workflow_converter(format: str) -> Callable[[Plan, Path, Optional[Path], Optional[str]], str]: + """Returns a workflow converter function for a given format if available.""" + from renku.core.plugins.pluginmanager import get_plugin_manager + + pm = get_plugin_manager() + supported_formats = pm.hook.workflow_format() + export_plugins = list(map(lambda x: x[0], supported_formats)) + converter = list(map(lambda x: x[0], filter(lambda x: format in x[1], supported_formats))) + if not any(converter): + raise errors.ParameterError(f"The specified workflow exporter format '{format}' is not available.") + elif len(converter) > 1: + raise errors.ConfigurationError( + f"The specified format '{format}' is supported by more than one export plugins!" + ) + export_plugins.remove(converter[0]) + return pm.subset_hook_caller("workflow_convert", export_plugins) diff --git a/tests/cli/test_workflow.py b/tests/cli/test_workflow.py index c96e1ab0b1..42fe5dc22e 100644 --- a/tests/cli/test_workflow.py +++ b/tests/cli/test_workflow.py @@ -17,12 +17,17 @@ # limitations under the License. """Test ``workflow`` commands.""" +import os +import tempfile from pathlib import Path +import pytest from cwl_utils import parser_v1_2 as cwlgen from renku.cli import cli from renku.core.metadata.database import Database +from renku.core.models.jsonld import write_yaml +from renku.core.plugins.provider import available_workflow_providers from tests.utils import format_result_exception @@ -321,3 +326,71 @@ def test_workflow_show_outputs_with_directory(runner, client, run): result = runner.invoke(cli, cmd + ["output/foo", "output/bar"]) assert 0 == result.exit_code, format_result_exception(result) assert {"output"} == set(result.output.strip().split("\n")) + + +@pytest.mark.parametrize("provider", available_workflow_providers()) +@pytest.mark.parametrize("yaml", [False, True]) +@pytest.mark.parametrize( + "workflows, parameters", + [ + ([("run", 'echo "a" > output1')], {}), + ([("run", 'echo "a" > output1')], {"run": {"outputs": ["replaced"]}}), + ([("run", 'echo "a" > output1')], {"run": {"parameters": ["foo"], "outputs": ["bar"]}}), + ( + [("run1", "touch data.csv"), ("run2", "wc data.csv > output")], + {"run1": {"outputs": ["foo"]}, "run2": {"inputs": ["foo"], "outputs": ["bar"]}}, + ), + ], +) +def test_workflow_execute_command(runner, run_shell, project, capsys, client, provider, yaml, workflows, parameters): + """test workflow execute.""" + + for wf in workflows: + output = run_shell(f"renku run --name {wf[0]} -- {wf[1]}") + # Assert expected empty stdout. + assert b"" == output[0] + # Assert not allocated stderr. + assert output[1] is None + + def _execute(args): + with capsys.disabled(): + try: + cli.main( + args=args, + prog_name=runner.get_default_prog_name(cli), + ) + except SystemExit as e: + assert e.code in {None, 0} + + if not parameters: + for wf in workflows: + execute_cmd = ["workflow", "execute", "-p", provider, wf[0]] + _execute(execute_cmd) + else: + database = Database.from_path(client.database_path) + for wf in workflows: + if wf[0] in parameters: + plan = database["plans-by-name"][wf[0]] + execute_cmd = ["workflow", "execute", "-p", provider] + + overrides = dict() + for k, values in parameters[wf[0]].items(): + for i, v in enumerate(values): + overrides[getattr(plan, k)[i].name] = v + + if yaml: + fd, values_path = tempfile.mkstemp() + os.close(fd) + write_yaml(values_path, overrides) + execute_cmd += ["--values", values_path] + else: + [execute_cmd.extend(["--set", f"{k}={v}"]) for k, v in overrides.items()] + + execute_cmd.append(wf[0]) + + _execute(execute_cmd) + + # check whether parameters setting was effective + if "outputs" in parameters[wf[0]]: + for o in parameters[wf[0]]["outputs"]: + assert Path(o).resolve().exists() diff --git a/tests/core/commands/test_workflow.py b/tests/core/commands/test_workflow.py index 1d07146ce5..1d74a3b022 100644 --- a/tests/core/commands/test_workflow.py +++ b/tests/core/commands/test_workflow.py @@ -24,7 +24,7 @@ from renku.core import errors from renku.core.management.workflow.concrete_execution_graph import ExecutionGraph -from renku.core.management.workflow.value_resolution import apply_run_values +from renku.core.management.workflow.value_resolution import CompositePlanValueResolver from renku.core.models.workflow.composite_plan import CompositePlan @@ -409,9 +409,10 @@ def test_composite_plan_actual_values(composite_plan, mappings, defaults, values grouped.set_mappings_from_strings(mappings) grouped.set_mapping_defaults(defaults) - apply_run_values(grouped, values) + rv = CompositePlanValueResolver(grouped, values) - actual = _get_nested_actual_values(grouped) + actual = _get_nested_actual_values(rv.apply()) + assert len(rv.missing_parameters) == 0 assert actual == expected @@ -496,7 +497,10 @@ def test_composite_plan_auto_links(composite_plan, mappings, defaults, links, ra grouped.set_mappings_from_strings(mappings) grouped.set_mapping_defaults(defaults) - apply_run_values(grouped) + rv = CompositePlanValueResolver(grouped, None) + grouped = rv.apply() + assert len(rv.missing_parameters) == 0 + graph = ExecutionGraph(grouped, virtual_links=True) assert bool(graph.virtual_links) == links