Skip to content

Commit

Permalink
feat(core): add support for template variables for workflow parameters (
Browse files Browse the repository at this point in the history
#2704)

* add template variable support for workflow parameters
* wrap KeyError with renku ParameterError

Co-authored-by: Ralf Grubenmann <ralf.grubenmann@sdsc.ethz.ch>
  • Loading branch information
Viktor Gal and Panaetius committed Apr 21, 2022
1 parent 5e93aa5 commit 7e6e0da
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 17 deletions.
57 changes: 57 additions & 0 deletions renku/core/util/template_vars.py
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
#
# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Template variable utility methods."""

import datetime
from string import Formatter
from typing import Any, Iterable, Mapping, Tuple, Union

from renku.core.errors import ParameterError
from renku.domain_model.workflow.parameter import CommandParameterBase


class TemplateVariableFormatter(Formatter):
"""Template variable formatter for `CommandParameterBase`."""

RESERVED_KEYS = ["iter_index"]

def __init__(self):
super(TemplateVariableFormatter, self).__init__()

def apply(self, param: str, parameters: Mapping[str, Any] = {}) -> str:
"""Renders the parameter template into its final value."""
try:
return super().vformat(param, args=[datetime.datetime.now()], kwargs=parameters)
except KeyError as e:
raise ParameterError(f"Could not resolve the variable {str(e)}")

def get_value(self, key, args, kwargs):
"""Ignore some special keys when formatting the variable."""
if key in self.RESERVED_KEYS:
return key
return super().get_value(key, args, kwargs)

@staticmethod
def to_map(parameters: Iterable[Union[CommandParameterBase, Tuple[str, str]]]) -> Mapping[str, str]:
"""Converts a list of `CommandParameterBase` into parameter name-value dictionary."""
return dict(
map(
lambda x: (x.name, x.actual_value) if isinstance(x, CommandParameterBase) else (x[1], str(x[0])),
parameters,
)
)
11 changes: 10 additions & 1 deletion renku/core/workflow/value_resolution.py
Expand Up @@ -22,6 +22,7 @@
from typing import Any, Dict, Optional, Set

from renku.core import errors
from renku.core.util.template_vars import TemplateVariableFormatter
from renku.domain_model.workflow.composite_plan import CompositePlan
from renku.domain_model.workflow.parameter import ParameterMapping
from renku.domain_model.workflow.plan import AbstractPlan, Plan
Expand All @@ -34,6 +35,7 @@ def __init__(self, plan: AbstractPlan, values: Optional[Dict[str, Any]]):
self._values = values
self.missing_parameters: Set[str] = set()
self._plan = plan
self._template_engine = TemplateVariableFormatter()

@abstractmethod
def apply(self) -> AbstractPlan:
Expand Down Expand Up @@ -77,11 +79,18 @@ def apply(self) -> AbstractPlan:
return self._plan

values_keys = set(self._values.keys())
for param in chain(self._plan.inputs, self._plan.outputs, self._plan.parameters):
for param in chain(self._plan.inputs, self._plan.parameters, self._plan.outputs):
if param.name in self._values:
param.actual_value = self._values[param.name]
values_keys.discard(param.name)

# NOTE: we need 2-pass the plan parameters as values can be overridden
# that should be reflected in the params_map
params_map = TemplateVariableFormatter.to_map(chain(self._plan.inputs, self._plan.parameters))
for param in chain(self._plan.inputs, self._plan.parameters, self._plan.outputs):
if isinstance(param.actual_value, str):
param.actual_value = self._template_engine.apply(param.actual_value, params_map)

self.missing_parameters = values_keys

return self._plan
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_output_option.py
Expand Up @@ -224,8 +224,8 @@ def test_explicit_inputs_can_be_in_inputs(renku_cli, client, subdirectory):

exit_code, activity = renku_cli("run", "--input", foo, "--no-output", "ls", foo)

plan = activity.association.plan
assert 0 == exit_code
plan = activity.association.plan
assert 1 == len(plan.inputs)

assert "foo" == str(plan.inputs[0].default_value)
Expand Down
6 changes: 4 additions & 2 deletions tests/cli/test_status.py
Expand Up @@ -33,7 +33,8 @@ def test_status(runner, project, subdirectory):

write_and_commit_file(repo, source, "content")

assert 0 == runner.invoke(cli, ["run", "cp", source, output]).exit_code
result = runner.invoke(cli, ["run", "cp", source, output])
assert 0 == result.exit_code, format_result_exception(result)

result = runner.invoke(cli, ["status"])
assert 0 == result.exit_code, format_result_exception(result)
Expand Down Expand Up @@ -149,7 +150,8 @@ def test_status_with_path_all_generation(runner, project):

write_and_commit_file(repo, source, "content")

assert 0 == runner.invoke(cli, ["run", "--input", source, "touch", output1, output2]).exit_code
result = runner.invoke(cli, ["run", "--input", source, "touch", output1, output2])
assert 0 == result.exit_code, format_result_exception(result)

write_and_commit_file(repo, source, "new content")

Expand Down
3 changes: 2 additions & 1 deletion tests/cli/test_update.py
Expand Up @@ -329,7 +329,8 @@ def test_update_no_args(runner, project, no_lfs_warning, provider):

write_and_commit_file(repo, source, "content")

assert 0 == runner.invoke(cli, ["run", "cp", source, output]).exit_code
result = runner.invoke(cli, ["run", "cp", source, output])
assert 0 == result.exit_code, format_result_exception(result)

write_and_commit_file(repo, source, "changed content")

Expand Down
55 changes: 43 additions & 12 deletions tests/cli/test_workflow.py
Expand Up @@ -17,6 +17,7 @@
# limitations under the License.
"""Test ``workflow`` commands."""

import datetime
import itertools
import logging
import os
Expand All @@ -38,6 +39,17 @@
from tests.utils import format_result_exception, write_and_commit_file


def _execute(capsys, runner, args):
with capsys.disabled():
try:
cli.main(
args=args,
prog_name=runner.get_default_prog_name(cli),
)
except SystemExit as e:
assert e.code in {None, 0}


def test_workflow_list(runner, project, run_shell, client):
"""Test listing of workflows."""
# Run a shell command with pipe.
Expand Down Expand Up @@ -541,16 +553,6 @@ def test_workflow_execute_command(runner, run_shell, project, capsys, client, pr
result = runner.invoke(cli, cmd)
assert 0 == result.exit_code, format_result_exception(result)

def _execute(args):
with capsys.disabled():
try:
cli.main(
args=args,
prog_name=runner.get_default_prog_name(cli),
)
except SystemExit as e:
assert e.code in {None, 0}

def _flatten_dict(obj, key_string=""):
if type(obj) == dict:
key_string = key_string + "." if key_string else key_string
Expand All @@ -563,7 +565,7 @@ def _flatten_dict(obj, key_string=""):

if not parameters:
execute_cmd = ["workflow", "execute", "-p", provider, workflow_name]
_execute(execute_cmd)
_execute(capsys, runner, execute_cmd)
else:
database = Database.from_path(client.database_path)
plan = database["plans-by-name"][workflow_name]
Expand Down Expand Up @@ -600,7 +602,7 @@ def _flatten_dict(obj, key_string=""):

execute_cmd.append(workflow_name)

_execute(execute_cmd)
_execute(capsys, runner, execute_cmd)

# check whether parameters setting was effective
for o in outputs:
Expand Down Expand Up @@ -1135,3 +1137,32 @@ def test_workflow_execute_docker_toil_stderr(runner, client, run_shell):

assert 1 == result.exit_code, format_result_exception(result)
assert "Cannot run workflows that have stdin or stderr redirection with Docker" in result.output


@pytest.mark.parametrize("provider", available_workflow_providers())
@pytest.mark.parametrize(
"workflow, parameters, outputs",
[
(
"touch foo",
{"output-1": "{:%Y-%m-%d}"},
[datetime.datetime.now().strftime("%Y-%m-%d")],
)
],
)
def test_workflow_templated_params(runner, run_shell, client, capsys, workflow, parameters, provider, outputs):
workflow_name = "foobar"

# Run a shell command with pipe.
output = run_shell(f"renku run --name {workflow_name} {workflow}")
# Assert expected empty stdout.
assert b"" == output[0]
# Assert not allocated stderr.
assert output[1] is None

execute_cmd = ["workflow", "execute", "-p", provider, workflow_name]
[execute_cmd.extend(["--set", f"{k}={v}"]) for k, v in parameters.items()]
_execute(capsys, runner, execute_cmd)

for o in outputs:
assert Path(o).resolve().exists()

0 comments on commit 7e6e0da

Please sign in to comment.