Skip to content

Commit

Permalink
Add RunValues class for value resolution
Browse files Browse the repository at this point in the history
this class does a values resolution for a given AbstractPlan using user
supplied values as well as holds information regarding none effective
settings.
  • Loading branch information
Viktor Gal committed Aug 30, 2021
1 parent 35cdf5c commit 0b63e15
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 59 deletions.
22 changes: 18 additions & 4 deletions renku/core/commands/workflow.py
Expand Up @@ -32,7 +32,7 @@
from renku.core.management.interface.plan_gateway import IPlanGateway
from renku.core.management.interface.project_gateway import IProjectGateway
from renku.core.management.workflow.concrete_execution_graph import ExecutionGraph
from renku.core.management.workflow.value_resolution import apply_run_values
from renku.core.management.workflow.value_resolution import RunValues
from renku.core.models.workflow.composite_plan import CompositePlan
from renku.core.models.workflow.plan import AbstractPlan, Plan
from renku.core.utils import communication
Expand Down Expand Up @@ -191,7 +191,8 @@ def _group_workflow(

if link_all:
# NOTE: propagate values to for linking to use
apply_run_values(plan)
rv = RunValues(plan, None)
plan = rv.apply()

graph = ExecutionGraph(plan, virtual_links=True)

Expand Down Expand Up @@ -285,7 +286,13 @@ def _export_workflow(

if values:
values = _safe_read_yaml(values)
workflow = apply_run_values(workflow, values)
rv = RunValues(workflow, values)
workflow = rv.apply()
if rv.missing_parameters:
communication.warn(
f'Could not resolve the following parameters in "{workflow.name}" workflow: '
f'{",".join(rv.missing_parameters)}'
)

from renku.core.plugins.workflow import workflow_converter

Expand Down Expand Up @@ -331,7 +338,14 @@ def _execute_workflow(
override_params[name] = value

if override_params:
workflow = apply_run_values(workflow, override_params)
rv = RunValues(workflow, override_params)
workflow = rv.apply()

if rv.missing_parameters:
communication.warn(
f'Could not resolve the following parameters in "{workflow.name}" workflow: '
f'{",".join(rv.missing_parameters)}'
)

if config:
config = _safe_read_yaml(config)
Expand Down
114 changes: 63 additions & 51 deletions renku/core/management/workflow/value_resolution.py
Expand Up @@ -17,16 +17,18 @@
# limitations under the License.
"""Resolution of Worklow execution values precedence."""

from typing import Any, Dict, Union
from itertools import chain
from typing import Any, Dict, Set

from renku.core import errors
from renku.core.models.workflow.composite_plan import CompositePlan
from renku.core.models.workflow.parameter import ParameterMapping
from renku.core.models.workflow.plan import Plan
from renku.core.models.workflow.plan import AbstractPlan, Plan


def apply_run_values(workflow: Union[CompositePlan, Plan], values: Dict[str, Any] = None) -> None:
"""Applies values and default_values to a potentially nested workflow.
class RunValues:
"""Value resolution class for ``AbstractPlan``.
Applies values and default_values to a potentially nested workflow.
Order of precedence is as follows (from lowest to highest):
- Default value on a parameter
Expand All @@ -36,70 +38,80 @@ def apply_run_values(workflow: Union[CompositePlan, Plan], values: Dict[str, Any
- Value propagated to a parameter from the source of a ParameterLink
"""

if isinstance(workflow, Plan):
return apply_single_run_values(workflow, values)

return apply_composite_run_values(workflow, values)

def __init__(self, plan: AbstractPlan, values: Dict[str, Any]):
self._values = values
self.missing_parameters: Set[str] = {}
self._plan = plan

def apply_single_run_values(workflow: Plan, values: Dict[str, Any] = None) -> None:
"""Applies values and default_values to a workflow."""
if not values:
return workflow
def apply(self) -> AbstractPlan:
"""Applies values and default_values to a potentially nested workflow.
for param in workflow.inputs + workflow.outputs + workflow.parameters:
if param.name in values:
param.actual_value = values[param.name]
:returns: The ``AbstractPlan`` with the user provided values set.
"""
self._apply_single_run_values() if isinstance(self._plan, Plan) else self._apply_composite_run_values()
return self._plan

return workflow
def _apply_single_run_values(self) -> None:
"""Applies values and default_values to a workflow."""
if not self._values:
return

values_keys = set(self._values.keys())
for param in chain(self._plan.inputs, self._plan.outputs, self._plan.parameters):
if param.name in self._values:
param.actual_value = self._values[param.name]
values_keys.discard(param.name)

def apply_composite_run_values(workflow: CompositePlan, values: Dict[str, Any] = None) -> None:
"""Applies values and default_values to a nested workflow."""
self.missing_parameters = values_keys

if values:
if "parameters" in values:
# NOTE: Set mapping parameter values
apply_parameters_values(workflow, values["parameters"])
def _apply_composite_run_values(self) -> None:
"""Applies values and default_values to a nested workflow."""

if "steps" in values:
for name, step in values["steps"].items():
child_workflow = next((w for w in workflow.plans if w.name == name), None)
if not child_workflow:
raise errors.ChildWorkflowNotFound(name, workflow.name)
if self._values:
if "parameters" in self._values:
# NOTE: Set mapping parameter values
self._apply_parameters_values()

apply_run_values(child_workflow, step)
if "steps" in self._values:
for name, step in self._values["steps"].items():
child_workflow = next((w for w in self._plan.plans if w.name == name), None)
if not child_workflow:
self.missing_parameters.add(f"steps.{name}")

# apply defaults
for mapping in workflow.mappings:
apply_parameter_defaults(mapping)
rv = RunValues(child_workflow, step)
_ = rv.apply()
self.missing_parameters.update(rv.missing_parameters)

apply_parameter_links(workflow)
self.missing_parameters.update(set(self._values.keys()) - set(["parameters", "steps"]))

# apply defaults
for mapping in self._plan.mappings:
self._apply_parameter_defaults(mapping)

def apply_parameter_defaults(mapping: ParameterMapping) -> None:
"""Apply default values to a mapping and contained params if they're not set already."""
apply_parameter_links(self._plan)

if not mapping.actual_value_set and mapping.default_value:
mapping.actual_value = mapping.default_value
def _apply_parameter_defaults(self, mapping: ParameterMapping) -> None:
"""Apply default values to a mapping and contained params if they're not set already."""

for mapped_to in mapping.mapped_parameters:
if isinstance(mapped_to, ParameterMapping):
apply_parameter_defaults(mapped_to)
else:
if not mapped_to.actual_value_set:
mapped_to.actual_value = mapping.default_value
if not mapping.actual_value_set and mapping.default_value:
mapping.actual_value = mapping.default_value

for mapped_to in mapping.mapped_parameters:
if isinstance(mapped_to, ParameterMapping):
self._apply_parameter_defaults(mapped_to)
else:
if not mapped_to.actual_value_set:
mapped_to.actual_value = mapping.default_value

def apply_parameters_values(workflow: CompositePlan, values: Dict[str, str]) -> None:
"""Apply values to mappings of a CompositePlan."""
for k, v in values.items():
mapping = next((m for m in workflow.mappings if m.name == k), None)
def _apply_parameters_values(self) -> None:
"""Apply values to mappings of a CompositePlan."""
for k, v in self._values["parameters"].items():
mapping = next((m for m in self._plan.mappings if m.name == k), None)

if not mapping:
raise errors.ParameterNotFoundError(k, workflow.name)
if not mapping:
self.missing_parameters.add(k)

mapping.actual_value = v
mapping.actual_value = v


def apply_parameter_links(workflow: CompositePlan) -> None:
Expand Down
12 changes: 8 additions & 4 deletions tests/core/commands/test_workflow.py
Expand Up @@ -24,7 +24,7 @@

from renku.core import errors
from renku.core.management.workflow.concrete_execution_graph import ExecutionGraph
from renku.core.management.workflow.value_resolution import apply_run_values
from renku.core.management.workflow.value_resolution import RunValues
from renku.core.models.workflow.composite_plan import CompositePlan


Expand Down Expand Up @@ -409,9 +409,10 @@ def test_composite_plan_actual_values(composite_plan, mappings, defaults, values

grouped.set_mappings_from_strings(mappings)
grouped.set_mapping_defaults(defaults)
apply_run_values(grouped, values)
rv = RunValues(grouped, values)
assert len(rv.missing_parameters) == 0

actual = _get_nested_actual_values(grouped)
actual = _get_nested_actual_values(rv.apply())

assert actual == expected

Expand Down Expand Up @@ -496,7 +497,10 @@ def test_composite_plan_auto_links(composite_plan, mappings, defaults, links, ra
grouped.set_mappings_from_strings(mappings)
grouped.set_mapping_defaults(defaults)

apply_run_values(grouped)
rv = RunValues(grouped, None)
grouped = rv.apply()
assert len(rv.missing_parameters) == 0

graph = ExecutionGraph(grouped, virtual_links=True)

assert bool(graph.virtual_links) == links
Expand Down

0 comments on commit 0b63e15

Please sign in to comment.