Skip to content

Commit

Permalink
feat(core): pass parameters as env vars to scripts (and renku.api)
Browse files Browse the repository at this point in the history
  • Loading branch information
Panaetius committed Jan 21, 2022
1 parent 5118774 commit b2a7183
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 7 deletions.
10 changes: 9 additions & 1 deletion docs/how-to-guides/implementing_a_provider.rst
Expand Up @@ -43,7 +43,9 @@ A simple example of a ``MyProvider`` workflow executor plugin:
The execution of the workflow(s) shall be defined in ``workflow_execute`` function, where

- ``dag`` is a Directed Acyclic Graph of :py:class:`Plans<renku.core.models.workflow.plan>` to be executed represented with a `networkx.DiGraph <https://networkx.org/documentation/stable/reference/classes/digraph.html>`_,
- ``dag`` is a Directed Acyclic Graph of :py:class:`Plans<renku.core.models.workflow.plan>`
to be executed represented with a
`networkx.DiGraph <https://networkx.org/documentation/stable/reference/classes/digraph.html>`_,
- ``basedir`` is the absolute path to the project,
- ``config`` dictionary contains the provider related optional configuration parameters.

Expand All @@ -52,3 +54,9 @@ should be the plugin object, i.e. ``self`` and the string is a unique identifier
provider plugin. This unique string will be the string that the user can provide to the
``--provider`` command line argument to select this plugin for executing the desired
workflows.

A provider HAS to set environment variables for a plan's parameters, so they can be used by scripts.
These environment variables have to be prefixed with the value of the
``renku.core.plugins.provider.RENKU_ENV_PREFIX`` constant. So for a parameter with name
``my-param`` and the ``RENKU_ENV_PREFIX`` value of ``RENKU_ENV_``, the environment variable
should be called ``RENKU_ENV_my-param``.
14 changes: 13 additions & 1 deletion renku/api/models/run.py
Expand Up @@ -42,15 +42,18 @@
"""

from os import PathLike
import re
from os import PathLike, environ
from pathlib import Path

from renku.api.models.project import ensure_project_context
from renku.core import errors
from renku.core.management.workflow.plan_factory import (
add_indirect_parameter,
get_indirect_inputs_path,
get_indirect_outputs_path,
)
from renku.core.plugins.provider import RENKU_ENV_PREFIX


class _PathBase(PathLike):
Expand Down Expand Up @@ -97,6 +100,15 @@ def _get_indirect_list_path(project_path):
@ensure_project_context
def parameter(name, value, project):
    """Record an indirect workflow parameter and return its effective value.

    A non-empty environment variable named ``RENKU_ENV_PREFIX`` + ``name``
    takes precedence over ``value``. The chosen value is stored through
    ``add_indirect_parameter`` and returned to the caller.

    :param name: name of the parameter.
    :param value: value used when no environment override is present.
    :param project: object whose ``path`` is handed to ``add_indirect_parameter``
        (presumably injected by ``ensure_project_context`` -- confirm).
    :raises errors.ParameterError: if ``name`` is empty or does not start with
        a letter, digit, ``_`` or ``-``.
    :return: the environment override if one is set and non-empty, else ``value``.
    """
    # NOTE(review): ``re.match`` only anchors at the start of ``name``, so later
    # characters are not checked -- confirm whether ``re.fullmatch`` is intended.
    name_is_valid = re.match("[a-zA-Z0-9-_]+", name) is not None
    if not name_is_valid:
        raise errors.ParameterError(
            f"Name {name} contains illegal characters. Only characters, numbers, _ and - are allowed."
        )

    # An unset or empty environment variable keeps the value given by the caller.
    # NOTE(review): an override is always a string, whatever the type of ``value``.
    override = environ.get(f"{RENKU_ENV_PREFIX}{name}")
    effective_value = override if override else value

    add_indirect_parameter(project.path, name=name, value=effective_value)
    return effective_value
Expand Down
5 changes: 5 additions & 0 deletions renku/cli/workflow.py
Expand Up @@ -134,6 +134,11 @@
myotherworkflow:
language: en
In addition to being passed on the command line and being available to
``renku.api.*`` classes in Python scripts, parameters are also set as
environment variables when executing the command, in the form of
``RENKU_ENV_<parameter name>``.
Provider-specific settings can be passed as a file using the ``--config`` parameter.
.. cheatsheet::
Expand Down
14 changes: 14 additions & 0 deletions renku/core/management/workflow/converters/cwl.py
Expand Up @@ -33,6 +33,7 @@
from renku.core.models.workflow.parameter import DIRECTORY_MIME_TYPE, CommandInput, CommandParameter
from renku.core.models.workflow.plan import Plan
from renku.core.plugins import hookimpl
from renku.core.plugins.provider import RENKU_ENV_PREFIX


class CommandLineTool(cwlgen.CommandLineTool):
Expand Down Expand Up @@ -248,6 +249,7 @@ def _convert_step(
workdir_req = cwlgen.InitialWorkDirRequirement([])
jsrequirement = False

environment_variables = []
dirents = []

for output_ in workflow.outputs:
Expand All @@ -264,6 +266,10 @@ def _convert_step(
)
dirents.append(path)
jsrequirement = True

environment_variables.append(
cwlgen.EnvVarRequirement.EnvironmentDef(f"{RENKU_ENV_PREFIX}{output_.name}", output_.actual_value)
)
outp, arg = CWLExporter._convert_output(output_)
tool_object.outputs.append(outp)
if arg:
Expand All @@ -278,11 +284,17 @@ def _convert_step(
)
)

environment_variables.append(
cwlgen.EnvVarRequirement.EnvironmentDef(f"{RENKU_ENV_PREFIX}{input_.name}", input_.actual_value)
)
tool_object.inputs.append(tool_input)
if input_.mapped_to:
tool_object.stdin = "$(inputs.{}.path)".format(tool_input.id)
jsrequirement = True
for parameter in workflow.parameters:
environment_variables.append(
cwlgen.EnvVarRequirement.EnvironmentDef(f"{RENKU_ENV_PREFIX}{parameter.name}", parameter.actual_value)
)
tool_object.inputs.append(CWLExporter._convert_parameter(parameter))

workdir_req.listing.append(
Expand All @@ -303,6 +315,8 @@ def _convert_step(
tool_object.requirements.append(workdir_req)
if jsrequirement:
tool_object.requirements.append(cwlgen.InlineJavascriptRequirement())
if environment_variables:
tool_object.requirements.append(cwlgen.EnvVarRequirement(environment_variables))

if not filename:
filename = "{}.cwl".format(uuid4())
Expand Down
5 changes: 2 additions & 3 deletions renku/core/management/workflow/providers/cwltool.py
Expand Up @@ -77,7 +77,6 @@ def workflow_execute(self, dag: nx.DiGraph, basedir: Path, config: Dict[str, Any
# run cwl with cwltool
argv = sys.argv
sys.argv = ["cwltool"]

runtime_args = {"rm_tmpdir": False, "move_outputs": "leave", "preserve_entire_environment": True}
loading_args = {"relax_path_checks": True}
if config:
Expand Down Expand Up @@ -111,8 +110,8 @@ def workflow_execute(self, dag: nx.DiGraph, basedir: Path, config: Dict[str, Any

try:
outputs = process()
except cwltool.factory.WorkflowStatus:
raise WorkflowExecuteError()
except cwltool.factory.WorkflowStatus as e:
raise WorkflowExecuteError() from e

sys.argv = argv

Expand Down
9 changes: 9 additions & 0 deletions renku/core/management/workflow/providers/toil.py
Expand Up @@ -43,6 +43,7 @@
from renku.core.models.workflow.plan import Plan
from renku.core.models.workflow.provider import IWorkflowProvider
from renku.core.plugins import hookimpl
from renku.core.plugins.provider import RENKU_ENV_PREFIX


class AbstractToilJob(Job):
Expand All @@ -53,6 +54,7 @@ def __init__(self, workflow: Plan, *args, **kwargs):
self.workflow = workflow
self._input_files: Dict[str, FileID] = {}
self._parents_promise = []
self._environment = os.environ.copy()

@abstractmethod
def _execute(self, command_line: List[str], mapped_std: Dict[str, str]) -> int:
Expand Down Expand Up @@ -95,17 +97,23 @@ def _read_input(input: str, file_metadata):
)
_read_input(i.actual_value, file_metadata)

self._environment[f"{RENKU_ENV_PREFIX}{i.name}"] = i.actual_value

if i.mapped_to:
mapped_std[i.mapped_to.stream_type] = i.actual_value

for o in self.workflow.outputs:
self._environment[f"{RENKU_ENV_PREFIX}{o.name}"] = o.actual_value
output_path = Path(o.actual_value)
if len(output_path.parts) > 1:
output_path.parent.mkdir(parents=True, exist_ok=True)

if o.mapped_to:
mapped_std[o.mapped_to.stream_type] = o.actual_value

for p in self.workflow.parameters:
self._environment[f"{RENKU_ENV_PREFIX}{p.name}"] = p.actual_value

# construct cmd
cmd = []

Expand Down Expand Up @@ -147,6 +155,7 @@ def _execute(self, command_line: List[str], mapped_std: Dict[str, str]) -> int:
command_line,
cwd=os.getcwd(),
**{key: open(value, mode="r" if key == "stdin" else "w") for key, value in mapped_std.items()},
env=self._environment,
)


Expand Down
2 changes: 2 additions & 0 deletions renku/core/plugins/provider.py
Expand Up @@ -27,6 +27,8 @@
from renku.core import errors
from renku.core.models.workflow.provider import IWorkflowProvider

RENKU_ENV_PREFIX = "RENKU_ENV_"

hookspec = pluggy.HookspecMarker("renku")


Expand Down
4 changes: 2 additions & 2 deletions tests/api/test_run.py
Expand Up @@ -109,11 +109,11 @@ def test_parameters(client):
with Project():
p2 = Parameter("param-2", "42")

p3 = Parameter(" parameter 3 ", 42.42)
p3 = Parameter("parameter_3 ", 42.42)

assert (42, "42", 42.42) == (p1, p2, p3)

data = read_indirect_parameters(client.path)

assert {"parameter 1", "param-2", " parameter 3 "} == set(data.keys())
assert {"parameter 1", "param-2", "parameter_3 "} == set(data.keys())
assert {42, "42", 42.42} == set(data.values())
33 changes: 33 additions & 0 deletions tests/cli/test_workflow.py
Expand Up @@ -599,6 +599,39 @@ def _flatten_dict(obj, key_string=""):
assert Path(o).resolve().exists()


@pytest.mark.parametrize("provider", available_workflow_providers())
def test_workflow_execute_command_with_api_parameter_set(runner, run_shell, project, capsys, client, provider):
    """Test executing a workflow with --set for a renku.api.Parameter.

    The script prints a ``Parameter`` whose default is ``"hello world"``. It is
    first recorded with ``renku run`` and then re-executed with each available
    provider while overriding the parameter through ``--set``.
    """
    script = client.path / "script.py"
    output = client.path / "output"

    with client.commit():
        # The script only prints the parameter. It must not dump the process
        # environment to a fixed path such as /tmp/env, which would leak
        # secrets and is not portable.
        script.write_text(
            "from renku.api import Parameter\n"
            'print(Parameter("test", "hello world"))\n'
        )

    result = run_shell(f"renku run --name run1 -- python {script} > {output}")

    # stdout is redirected into ``output``, so nothing is expected here.
    assert b"" == result[0]
    # stderr is expected to be unallocated.
    assert result[1] is None

    # Without an override the default value is printed.
    assert "hello world\n" == output.read_text()

    result = run_shell(f"renku workflow execute -p {provider} --set test=goodbye run1")

    # stderr is expected to be unallocated.
    assert result[1] is None

    # The value passed with --set replaces the default in the script's output.
    assert "goodbye\n" == output.read_text()


def test_workflow_visualize_non_interactive(runner, project, client, workflow_graph):
"""Test renku workflow visualize in non-interactive mode."""

Expand Down

0 comments on commit b2a7183

Please sign in to comment.