From 6a56312832a3297fb3a0cc7b16ee538d33b9d52f Mon Sep 17 00:00:00 2001 From: Viktor Gal Date: Wed, 13 Oct 2021 13:46:11 +0200 Subject: [PATCH] fix(core): make parameters immutable (#2403) * fix #2392 * fix #2397 --- renku/core/commands/workflow.py | 47 +++++++------ .../m_0009__new_metadata_storage.py | 3 + .../core/management/workflow/plan_factory.py | 2 + renku/core/metadata/gateway/plan_gateway.py | 4 ++ renku/core/models/workflow/parameter.py | 68 ++++++++++++++++++- renku/core/models/workflow/plan.py | 13 +++- tests/cli/test_workflow.py | 6 +- 7 files changed, 113 insertions(+), 30 deletions(-) diff --git a/renku/core/commands/workflow.py b/renku/core/commands/workflow.py index 0e6da65708..f9be04bbd8 100644 --- a/renku/core/commands/workflow.py +++ b/renku/core/commands/workflow.py @@ -18,9 +18,9 @@ """Renku workflow commands.""" +import itertools import uuid from datetime import datetime -from itertools import chain from pathlib import Path from typing import Any, Dict, List, Optional @@ -223,7 +223,7 @@ def _compose_workflow( # If the user supplies their own mappings, those overrule the automatically added ones. for i, child_plan in plan_activities: - for param in chain(child_plan.inputs, child_plan.outputs, child_plan.parameters): + for param in itertools.chain(child_plan.inputs, child_plan.outputs, child_plan.parameters): try: mapping_name = f"{i}-{param.name}" plan.set_mappings_from_strings([f"{mapping_name}=@step{i}.{param.name}"]) @@ -302,28 +302,27 @@ def _edit_workflow( if isinstance(workflow, Plan): workflow.set_parameters_from_strings(set_params) - def _kv_extract(kv_string): - k, v = kv_string.split("=", maxsplit=1) - v = v.strip(' "') - return k, v - - for param_string in rename_params: - name, new_name = _kv_extract(param_string) - for param in workflow.inputs + workflow.outputs + workflow.parameters: - if param.name == name: - param.name = new_name - break - else: - raise errors.ParameterNotFoundError(parameter=name, workflow=workflow.name) - - for description_string in describe_params: - name, description = _kv_extract(description_string) - for param in workflow.inputs + workflow.outputs + workflow.parameters: - if param.name == name: - param.description = description - break - else: - raise errors.ParameterNotFoundError(parameter=name, workflow=workflow.name) + def _mod_params(workflow, changed_params, attr): + for param_string in changed_params: + name, new_value = param_string.split("=", maxsplit=1) + new_value = new_value.strip(' "') + + found = False + for collection in [workflow.inputs, workflow.outputs, workflow.parameters]: + for i, param in enumerate(collection): + if param.name == name: + new_param = param.derive(plan_id=workflow.id) + setattr(new_param, attr, new_value) + collection[i] = new_param + found = True + break + if found: + break + else: + raise errors.ParameterNotFoundError(parameter=name, workflow=workflow.name) + + _mod_params(workflow, rename_params, "name") + _mod_params(workflow, describe_params, "description") elif isinstance(workflow, CompositePlan) and len(map_params): workflow.set_mappings_from_strings(map_params) diff --git a/renku/core/management/migrations/m_0009__new_metadata_storage.py b/renku/core/management/migrations/m_0009__new_metadata_storage.py index ec487948e8..2230d4bf1f 100644 --- a/renku/core/management/migrations/m_0009__new_metadata_storage.py +++ b/renku/core/management/migrations/m_0009__new_metadata_storage.py @@ -240,6 +240,7 @@ def convert_argument(argument: old_schema.CommandArgument) -> CommandParameter: name=argument.name, position=argument.position, prefix=argument.prefix, + postfix=PurePosixPath(argument._id).name, ) def convert_input(input: old_schema.CommandInput) -> CommandInput: @@ -258,6 +259,7 @@ def convert_input(input: old_schema.CommandInput) -> CommandInput: name=input.name, position=input.position, prefix=input.prefix, + postfix=PurePosixPath(input._id).name, ) def convert_output(output: old_schema.CommandOutput) -> CommandOutput: @@ -277,6 +279,7 @@ def convert_output(output: old_schema.CommandOutput) -> CommandOutput: name=output.name, position=output.position, prefix=output.prefix, + postfix=PurePosixPath(output._id).name, ) plan = Plan( diff --git a/renku/core/management/workflow/plan_factory.py b/renku/core/management/workflow/plan_factory.py index d5112a76a7..8b5d23ce0a 100644 --- a/renku/core/management/workflow/plan_factory.py +++ b/renku/core/management/workflow/plan_factory.py @@ -380,6 +380,7 @@ def add_command_input( position=position, mapped_to=mapped_stream, encoding_format=encoding_format, + postfix=postfix, ) ) @@ -415,6 +416,7 @@ def add_command_output( position=position, mapped_to=mapped_stream, encoding_format=encoding_format, + postfix=postfix, ) ) diff --git a/renku/core/metadata/gateway/plan_gateway.py b/renku/core/metadata/gateway/plan_gateway.py index 7096272864..8c3935e6c5 100644 --- a/renku/core/metadata/gateway/plan_gateway.py +++ b/renku/core/metadata/gateway/plan_gateway.py @@ -53,4 +53,8 @@ def add(self, plan: AbstractPlan) -> None: """Add a plan to the database.""" database = self.database_dispatcher.current_database database["plans"].add(plan) + + if plan.derived_from: + derived_from = self.get_by_id(plan.derived_from) + database["plans-by-name"].pop(derived_from.name, None) database["plans-by-name"].add(plan) diff --git a/renku/core/models/workflow/parameter.py b/renku/core/models/workflow/parameter.py index bbb57d448f..d0cda4c984 100644 --- a/renku/core/models/workflow/parameter.py +++ b/renku/core/models/workflow/parameter.py @@ -18,6 +18,7 @@ """Classes to represent inputs/outputs/parameters in a Plan.""" import urllib +from abc import abstractmethod from pathlib import PurePosixPath from typing import Any, List, Optional from uuid import uuid4 @@ -25,7 +26,7 @@ from marshmallow import EXCLUDE from renku.core.errors import ParameterError -from renku.core.models.calamus import JsonLDSchema, Nested, fields, renku, schema +from renku.core.models.calamus import JsonLDSchema, Nested, fields, prov, renku, schema from renku.core.utils.urls import get_slug RANDOM_ID_LENGTH = 4 @@ -67,6 +68,8 @@ def __init__( name: str, position: Optional[int] = None, prefix: Optional[str] = None, + derived_from: str = None, + postfix: str = None, ): self.default_value: Any = default_value self.description: str = description @@ -75,6 +78,8 @@ def __init__( self.position: Optional[int] = position self.prefix: str = prefix self._v_actual_value_set = False + self.derived_from: str = derived_from + self.postfix: str = postfix if not self.name: self.name = self._get_default_name() @@ -136,6 +141,11 @@ def _generate_name(self, base) -> str: def _get_default_name(self) -> str: raise NotImplementedError + @abstractmethod + def derive(self, plan_id: str) -> "CommandParameterBase": + """Create a new command parameter from self.""" + raise NotImplementedError + class CommandParameter(CommandParameterBase): """An argument to a command that is neither input nor output.""" @@ -149,6 +159,8 @@ def __init__( name: str = None, position: Optional[int] = None, prefix: str = None, + derived_from: str = None, + postfix: str = None, ): super().__init__( default_value=default_value, @@ -157,6 +169,8 @@ def __init__( name=name, position=position, prefix=prefix, + derived_from=derived_from, + postfix=postfix, ) @staticmethod @@ -169,6 +183,19 @@ def generate_id(plan_id: str, position: Optional[int] = None, postfix: str = Non def _get_default_name(self) -> str: return self._generate_name(base="parameter") + def derive(self, plan_id: str) -> "CommandParameter": + """Create a new ``CommandParameter`` that is derived from self.""" + return CommandParameter( + default_value=self.default_value, + description=self.description, + id=CommandParameter.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix), + name=self.name, + position=self.position, + prefix=self.prefix, + derived_from=self.id, + postfix=self.postfix, + ) + class CommandInput(CommandParameterBase): """An input to a command.""" @@ -184,6 +211,8 @@ def __init__( position: Optional[int] = None, prefix: str = None, encoding_format: List[str] = None, + derived_from: str = None, + postfix: str = None, ): super().__init__( default_value=default_value, @@ -192,6 +221,8 @@ def __init__( name=name, position=position, prefix=prefix, + derived_from=derived_from, + postfix=postfix, ) self.mapped_to: MappedIOStream = mapped_to _validate_mime_type(encoding_format) @@ -209,6 +240,21 @@ def to_stream_representation(self) -> str: def _get_default_name(self) -> str: return self._generate_name(base="input") + def derive(self, plan_id: str) -> "CommandInput": + """Create a new ``CommandInput`` that is derived from self.""" + return CommandInput( + default_value=self.default_value, + description=self.description, + id=CommandInput.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix), + mapped_to=self.mapped_to, + name=self.name, + position=self.position, + prefix=self.prefix, + encoding_format=self.encoding_format, + derived_from=self.id, + postfix=self.postfix, + ) + class CommandOutput(CommandParameterBase): """An output from a command.""" @@ -225,6 +271,8 @@ def __init__( position: Optional[int] = None, prefix: str = None, encoding_format: List[str] = None, + derived_from: str = None, + postfix: str = None, ): super().__init__( default_value=default_value, @@ -233,6 +281,8 @@ def __init__( name=name, position=position, prefix=prefix, + derived_from=derived_from, + postfix=postfix, ) self.create_folder: bool = create_folder self.mapped_to: MappedIOStream = mapped_to @@ -254,6 +304,21 @@ def to_stream_representation(self) -> str: def _get_default_name(self) -> str: return self._generate_name(base="output") + def derive(self, plan_id: str) -> "CommandOutput": + """Create a new ``CommandOutput`` that is derived from self.""" + return CommandOutput( + default_value=self.default_value, + description=self.description, + id=CommandOutput.generate_id(plan_id=plan_id, position=self.position, postfix=self.postfix), + mapped_to=self.mapped_to, + name=self.name, + position=self.position, + prefix=self.prefix, + encoding_format=self.encoding_format, + derived_from=self.id, + postfix=self.postfix, + ) + class ParameterMapping(CommandParameterBase): """A mapping of child parameter(s) to a parent CompositePlan.""" @@ -359,6 +424,7 @@ class Meta: name = fields.String(schema.name, missing=None) position = fields.Integer(renku.position, missing=None) prefix = fields.String(renku.prefix, missing=None) + derived_from = fields.String(prov.wasDerivedFrom, missing=None) class CommandParameterSchema(CommandParameterBaseSchema): diff --git a/renku/core/models/workflow/plan.py b/renku/core/models/workflow/plan.py index 37e5c64efd..5f3be00bd3 100644 --- a/renku/core/models/workflow/plan.py +++ b/renku/core/models/workflow/plan.py @@ -274,9 +274,16 @@ def set_parameters_from_strings(self, params_strings: List[str]) -> None: """Set parameters by parsing parameters strings.""" for param_string in params_strings: name, value = param_string.split("=", maxsplit=1) - for param in self.inputs + self.outputs + self.parameters: - if param.name == name: - param.default_value = value + found = False + for collection in [self.inputs, self.outputs, self.parameters]: + for i, param in enumerate(collection): + if param.name == name: + new_param = param.derive(plan_id=self.id) + new_param.default_value = value + collection[i] = new_param + found = True + break + if found: break else: self.parameters.append( diff --git a/tests/cli/test_workflow.py b/tests/cli/test_workflow.py index f870e74a1e..99ac642251 100644 --- a/tests/cli/test_workflow.py +++ b/tests/cli/test_workflow.py @@ -310,13 +310,15 @@ def _get_plan_id(output): result = runner.invoke(cli, ["run", "--name", workflow_name, "touch", "data.txt"]) assert 0 == result.exit_code, format_result_exception(result) + database = Database.from_path(client.database_path) + test_plan = database["plans-by-name"][workflow_name] + cmd = ["workflow", "edit", workflow_name, "--name", "first"] result = runner.invoke(cli, cmd) assert 0 == result.exit_code, format_result_exception(result) + workflow_name = "first" database = Database.from_path(client.database_path) - - test_plan = database["plans-by-name"][workflow_name] first_plan = database["plans-by-name"]["first"] assert first_plan