Skip to content

Commit

Permalink
fix(core): make parameters immutable (#2403)
Browse files Browse the repository at this point in the history
* fix #2392
* fix #2397
  • Loading branch information
vigsterkr committed Oct 13, 2021
1 parent 3a2c35d commit 6a56312
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 30 deletions.
47 changes: 23 additions & 24 deletions renku/core/commands/workflow.py
Expand Up @@ -18,9 +18,9 @@
"""Renku workflow commands."""


import itertools
import uuid
from datetime import datetime
from itertools import chain
from pathlib import Path
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -223,7 +223,7 @@ def _compose_workflow(
# If the user supplies their own mappings, those overrule the automatically added ones.

for i, child_plan in plan_activities:
for param in chain(child_plan.inputs, child_plan.outputs, child_plan.parameters):
for param in itertools.chain(child_plan.inputs, child_plan.outputs, child_plan.parameters):
try:
mapping_name = f"{i}-{param.name}"
plan.set_mappings_from_strings([f"{mapping_name}=@step{i}.{param.name}"])
Expand Down Expand Up @@ -302,28 +302,27 @@ def _edit_workflow(
if isinstance(workflow, Plan):
workflow.set_parameters_from_strings(set_params)

def _kv_extract(kv_string):
k, v = kv_string.split("=", maxsplit=1)
v = v.strip(' "')
return k, v

for param_string in rename_params:
name, new_name = _kv_extract(param_string)
for param in workflow.inputs + workflow.outputs + workflow.parameters:
if param.name == name:
param.name = new_name
break
else:
raise errors.ParameterNotFoundError(parameter=name, workflow=workflow.name)

for description_string in describe_params:
name, description = _kv_extract(description_string)
for param in workflow.inputs + workflow.outputs + workflow.parameters:
if param.name == name:
param.description = description
break
else:
raise errors.ParameterNotFoundError(parameter=name, workflow=workflow.name)
def _mod_params(workflow, changed_params, attr):
for param_string in changed_params:
name, new_value = param_string.split("=", maxsplit=1)
new_value = new_value.strip(' "')

found = False
for collection in [workflow.inputs, workflow.outputs, workflow.parameters]:
for i, param in enumerate(collection):
if param.name == name:
new_param = param.derive(plan_id=workflow.id)
setattr(new_param, attr, new_value)
collection[i] = new_param
found = True
break
if found:
break
else:
raise errors.ParameterNotFoundError(parameter=name, workflow=workflow.name)

_mod_params(workflow, rename_params, "name")
_mod_params(workflow, describe_params, "description")
elif isinstance(workflow, CompositePlan) and len(map_params):
workflow.set_mappings_from_strings(map_params)

Expand Down
Expand Up @@ -240,6 +240,7 @@ def convert_argument(argument: old_schema.CommandArgument) -> CommandParameter:
name=argument.name,
position=argument.position,
prefix=argument.prefix,
postfix=PurePosixPath(argument._id).name,
)

def convert_input(input: old_schema.CommandInput) -> CommandInput:
Expand All @@ -258,6 +259,7 @@ def convert_input(input: old_schema.CommandInput) -> CommandInput:
name=input.name,
position=input.position,
prefix=input.prefix,
postfix=PurePosixPath(input._id).name,
)

def convert_output(output: old_schema.CommandOutput) -> CommandOutput:
Expand All @@ -277,6 +279,7 @@ def convert_output(output: old_schema.CommandOutput) -> CommandOutput:
name=output.name,
position=output.position,
prefix=output.prefix,
postfix=PurePosixPath(output._id).name,
)

plan = Plan(
Expand Down
2 changes: 2 additions & 0 deletions renku/core/management/workflow/plan_factory.py
Expand Up @@ -380,6 +380,7 @@ def add_command_input(
position=position,
mapped_to=mapped_stream,
encoding_format=encoding_format,
postfix=postfix,
)
)

Expand Down Expand Up @@ -415,6 +416,7 @@ def add_command_output(
position=position,
mapped_to=mapped_stream,
encoding_format=encoding_format,
postfix=postfix,
)
)

Expand Down
4 changes: 4 additions & 0 deletions renku/core/metadata/gateway/plan_gateway.py
Expand Up @@ -53,4 +53,8 @@ def add(self, plan: AbstractPlan) -> None:
"""Add a plan to the database."""
database = self.database_dispatcher.current_database
database["plans"].add(plan)

if plan.derived_from:
derived_from = self.get_by_id(plan.derived_from)
database["plans-by-name"].pop(derived_from.name, None)
database["plans-by-name"].add(plan)
68 changes: 67 additions & 1 deletion renku/core/models/workflow/parameter.py
Expand Up @@ -18,14 +18,15 @@
"""Classes to represent inputs/outputs/parameters in a Plan."""

import urllib
from abc import abstractmethod
from pathlib import PurePosixPath
from typing import Any, List, Optional
from uuid import uuid4

from marshmallow import EXCLUDE

from renku.core.errors import ParameterError
from renku.core.models.calamus import JsonLDSchema, Nested, fields, renku, schema
from renku.core.models.calamus import JsonLDSchema, Nested, fields, prov, renku, schema
from renku.core.utils.urls import get_slug

RANDOM_ID_LENGTH = 4
Expand Down Expand Up @@ -67,6 +68,8 @@ def __init__(
name: str,
position: Optional[int] = None,
prefix: Optional[str] = None,
derived_from: str = None,
postfix: str = None,
):
self.default_value: Any = default_value
self.description: str = description
Expand All @@ -75,6 +78,8 @@ def __init__(
self.position: Optional[int] = position
self.prefix: str = prefix
self._v_actual_value_set = False
self.derived_from: str = derived_from
self.postfix: str = postfix

if not self.name:
self.name = self._get_default_name()
Expand Down Expand Up @@ -136,6 +141,11 @@ def _generate_name(self, base) -> str:
def _get_default_name(self) -> str:
raise NotImplementedError

@abstractmethod
def derive(self, plan_id: str) -> "CommandParameterBase":
"""Create a new command parameter from self."""
raise NotImplementedError


class CommandParameter(CommandParameterBase):
"""An argument to a command that is neither input nor output."""
Expand All @@ -149,6 +159,8 @@ def __init__(
name: str = None,
position: Optional[int] = None,
prefix: str = None,
derived_from: str = None,
postfix: str = None,
):
super().__init__(
default_value=default_value,
Expand All @@ -157,6 +169,8 @@ def __init__(
name=name,
position=position,
prefix=prefix,
derived_from=derived_from,
postfix=postfix,
)

@staticmethod
Expand All @@ -169,6 +183,19 @@ def generate_id(plan_id: str, position: Optional[int] = None, postfix: str = Non
def _get_default_name(self) -> str:
return self._generate_name(base="parameter")

def derive(self, plan_id: str) -> "CommandParameter":
"""Create a new ``CommandParameter`` that is derived from self."""
return CommandParameter(
default_value=self.default_value,
description=self.description,
id=CommandParameter.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix),
name=self.name,
position=self.position,
prefix=self.prefix,
derived_from=self.id,
postfix=self.postfix,
)


class CommandInput(CommandParameterBase):
"""An input to a command."""
Expand All @@ -184,6 +211,8 @@ def __init__(
position: Optional[int] = None,
prefix: str = None,
encoding_format: List[str] = None,
derived_from: str = None,
postfix: str = None,
):
super().__init__(
default_value=default_value,
Expand All @@ -192,6 +221,8 @@ def __init__(
name=name,
position=position,
prefix=prefix,
derived_from=derived_from,
postfix=postfix,
)
self.mapped_to: MappedIOStream = mapped_to
_validate_mime_type(encoding_format)
Expand All @@ -209,6 +240,21 @@ def to_stream_representation(self) -> str:
def _get_default_name(self) -> str:
return self._generate_name(base="input")

def derive(self, plan_id: str) -> "CommandInput":
"""Create a new ``CommandInput`` that is derived from self."""
return CommandInput(
default_value=self.default_value,
description=self.description,
id=CommandInput.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix),
mapped_to=self.mapped_to,
name=self.name,
position=self.position,
prefix=self.prefix,
encoding_format=self.encoding_format,
derived_from=self.id,
postfix=self.postfix,
)


class CommandOutput(CommandParameterBase):
"""An output from a command."""
Expand All @@ -225,6 +271,8 @@ def __init__(
position: Optional[int] = None,
prefix: str = None,
encoding_format: List[str] = None,
derived_from: str = None,
postfix: str = None,
):
super().__init__(
default_value=default_value,
Expand All @@ -233,6 +281,8 @@ def __init__(
name=name,
position=position,
prefix=prefix,
derived_from=derived_from,
postfix=postfix,
)
self.create_folder: bool = create_folder
self.mapped_to: MappedIOStream = mapped_to
Expand All @@ -254,6 +304,21 @@ def to_stream_representation(self) -> str:
def _get_default_name(self) -> str:
return self._generate_name(base="output")

def derive(self, plan_id: str) -> "CommandOutput":
"""Create a new ``CommandOutput`` that is derived from self."""
return CommandOutput(
default_value=self.default_value,
description=self.description,
id=CommandOutput.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix),
mapped_to=self.mapped_to,
name=self.name,
position=self.position,
prefix=self.prefix,
encoding_format=self.encoding_format,
derived_from=self.id,
postfix=self.postfix,
)


class ParameterMapping(CommandParameterBase):
"""A mapping of child parameter(s) to a parent CompositePlan."""
Expand Down Expand Up @@ -359,6 +424,7 @@ class Meta:
name = fields.String(schema.name, missing=None)
position = fields.Integer(renku.position, missing=None)
prefix = fields.String(renku.prefix, missing=None)
derived_from = fields.String(prov.wasDerivedFrom, missing=None)


class CommandParameterSchema(CommandParameterBaseSchema):
Expand Down
13 changes: 10 additions & 3 deletions renku/core/models/workflow/plan.py
Expand Up @@ -274,9 +274,16 @@ def set_parameters_from_strings(self, params_strings: List[str]) -> None:
"""Set parameters by parsing parameters strings."""
for param_string in params_strings:
name, value = param_string.split("=", maxsplit=1)
for param in self.inputs + self.outputs + self.parameters:
if param.name == name:
param.default_value = value
found = False
for collection in [self.inputs, self.outputs, self.parameters]:
for i, param in enumerate(collection):
if param.name == name:
new_param = param.derive(plan_id=self.id)
new_param.default_value = value
collection[i] = new_param
found = True
break
if found:
break
else:
self.parameters.append(
Expand Down
6 changes: 4 additions & 2 deletions tests/cli/test_workflow.py
Expand Up @@ -310,13 +310,15 @@ def _get_plan_id(output):
result = runner.invoke(cli, ["run", "--name", workflow_name, "touch", "data.txt"])
assert 0 == result.exit_code, format_result_exception(result)

database = Database.from_path(client.database_path)
test_plan = database["plans-by-name"][workflow_name]

cmd = ["workflow", "edit", workflow_name, "--name", "first"]
result = runner.invoke(cli, cmd)
assert 0 == result.exit_code, format_result_exception(result)

workflow_name = "first"
database = Database.from_path(client.database_path)

test_plan = database["plans-by-name"][workflow_name]
first_plan = database["plans-by-name"]["first"]

assert first_plan
Expand Down

0 comments on commit 6a56312

Please sign in to comment.