Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(workflow): failure when re-/executing a subset of workflow file steps #3263

Merged
merged 3 commits into from Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions renku/core/workflow/model/workflow_file.py
Expand Up @@ -275,6 +275,7 @@ def to_command_parameter(self, plan_id: str, index: int) -> CommandParameter:
description=self.description,
id=CommandParameter.generate_id(plan_id=plan_id, name=self.name, postfix=postfix),
name=self.name,
name_set_by_user=self.name_set_by_user,
position=self.position,
postfix=postfix,
prefix=self.prefix,
Expand Down Expand Up @@ -312,6 +313,7 @@ def to_command_input(self, plan_id: str, index: int) -> CommandInput:
id=CommandInput.generate_id(plan_id=plan_id, name=self.name, postfix=postfix),
mapped_to=MappedIOStream.from_str(self.mapped_to) if self.mapped_to else None,
name=self.name,
name_set_by_user=self.name_set_by_user,
position=self.position,
postfix=postfix,
prefix=self.prefix,
Expand All @@ -336,6 +338,7 @@ def to_command_output(self, plan_id: str, index: int) -> CommandOutput:
id=CommandOutput.generate_id(plan_id=plan_id, name=self.name, postfix=postfix),
mapped_to=MappedIOStream.from_str(self.mapped_to) if self.mapped_to else None,
name=self.name,
name_set_by_user=self.name_set_by_user,
position=self.position,
postfix=postfix,
prefix=self.prefix,
Expand Down
6 changes: 4 additions & 2 deletions renku/core/workflow/workflow_file.py
Expand Up @@ -108,6 +108,8 @@ def filter_steps(workflow: WorkflowFileCompositePlan, steps: List[str]) -> List[
return [s for s in workflow.plans if s.unqualified_name in selected_steps]


def get_workflow_file_inputs_and_outputs(workflow_file: "WorkflowFile", steps: List[str]) -> List[str]:
    """Return the paths of all persisted inputs and outputs that must be committed.

    Args:
        workflow_file: Workflow file whose ``steps`` each expose ``name``, ``inputs`` and ``outputs``.
        steps: Names of the steps to consider; when empty, every step of the workflow file is considered.

    Returns:
        Paths of the inputs/outputs whose ``persist`` flag is set, in step order (inputs before outputs).
    """
    if steps:
        # NOTE: Build the lookup once so that each step is checked in constant time
        requested = set(steps)
        selected_steps = [s for s in workflow_file.steps if s.name in requested]
    else:
        selected_steps = workflow_file.steps

    return [io.path for step in selected_steps for io in itertools.chain(step.inputs, step.outputs) if io.persist]


def get_all_workflow_file_inputs_and_outputs(workflow_file: "WorkflowFile") -> List[str]:
    """Return a list of all inputs and outputs that must be committed."""
    # NOTE: Kept under the previous name so existing importers keep working; an empty selection means "all steps"
    return get_workflow_file_inputs_and_outputs(workflow_file=workflow_file, steps=[])
30 changes: 27 additions & 3 deletions renku/domain_model/workflow/parameter.py
Expand Up @@ -68,13 +68,18 @@ def generate_id(stream_type: str) -> str:
class CommandParameterBase:
"""Represents a parameter for a Plan."""

# NOTE: This attribute is only used by workflow-file machinery to check if plans are the same or not. We need it
# because names are generated randomly when not set by users, which makes the comparison return incorrect results.
name_set_by_user: bool = False

def __init__(
self,
*,
default_value: Any,
description: Optional[str],
id: str,
name: Optional[str],
name_set_by_user: bool = False,
position: Optional[int] = None,
prefix: Optional[str] = None,
derived_from: Optional[str] = None,
Expand All @@ -90,6 +95,7 @@ def __init__(
self.derived_from: Optional[str] = derived_from
# NOTE: ``postfix`` is used only to generate a nicer ``id`` for a parameter. Its value isn't used anywhere else.
self.postfix: Optional[str] = postfix
self.name_set_by_user: bool = name_set_by_user

if name is not None:
self.name: str = name
Expand Down Expand Up @@ -132,10 +138,13 @@ def role(self) -> str:
@staticmethod
def _get_equality_attributes() -> List[str]:
    """Return a list of attributes values that determine if instances are equal."""
    # NOTE: ``name`` is deliberately absent; ``is_equal_to`` compares it separately and only when it was set by
    # the user.
    return ["description", "default_value", "prefix", "position"]

def is_equal_to(self, other) -> bool:
    """Tell whether ``other`` matches this parameter on every attribute that signals a change.

    Names are compared only when they were set by the user; both sides must agree on whether the name was
    user-set. All attributes listed by ``_get_equality_attributes`` must compare equal as well.
    """
    # Parameters that disagree on who chose the name are never considered equal
    if self.name_set_by_user != other.name_set_by_user:
        return False

    # Only user-provided names take part in the comparison
    if self.name_set_by_user and self.name != other.name:
        return False

    for attribute in self._get_equality_attributes():
        if getattr(self, attribute) == getattr(other, attribute):
            continue
        return False

    return True

def to_argv(self, quote_string: bool = True) -> List[Any]:
Expand Down Expand Up @@ -193,6 +202,7 @@ def __init__(
description: str = None,
id: str,
name: str = None,
name_set_by_user: bool = False,
position: Optional[int] = None,
prefix: str = None,
derived_from: str = None,
Expand All @@ -203,6 +213,7 @@ def __init__(
description=description,
id=id,
name=name,
name_set_by_user=name_set_by_user,
position=position,
prefix=prefix,
derived_from=derived_from,
Expand Down Expand Up @@ -245,6 +256,7 @@ def __init__(
id: str,
mapped_to: Optional[MappedIOStream] = None,
name: Optional[str] = None,
name_set_by_user: bool = False,
position: Optional[int] = None,
prefix: Optional[str] = None,
encoding_format: Optional[List[str]] = None,
Expand All @@ -258,6 +270,7 @@ def __init__(
description=description,
id=id,
name=name,
name_set_by_user=name_set_by_user,
position=position,
prefix=prefix,
derived_from=derived_from,
Expand Down Expand Up @@ -323,6 +336,7 @@ def __init__(
id: str,
mapped_to: Optional[MappedIOStream] = None,
name: Optional[str] = None,
name_set_by_user: bool = False,
position: Optional[int] = None,
prefix: Optional[str] = None,
encoding_format: Optional[List[str]] = None,
Expand All @@ -336,6 +350,7 @@ def __init__(
description=description,
id=id,
name=name,
name_set_by_user=name_set_by_user,
position=position,
prefix=prefix,
derived_from=derived_from,
Expand Down Expand Up @@ -381,7 +396,8 @@ def is_equal_to(self, other) -> bool:
@staticmethod
def _get_equality_attributes() -> List[str]:
    """Return a list of attributes values that determine if instances are equal."""
    # NOTE: Don't include ``create_folder`` in comparison since its value is state-dependent
    return CommandParameterBase._get_equality_attributes() + ["encoding_format"]

def derive(self, plan_id: str) -> "CommandOutput":
"""Create a new ``CommandOutput`` that is derived from self."""
Expand All @@ -400,10 +416,18 @@ def __init__(
description: Optional[str] = None,
id: str,
name: Optional[str] = None,
name_set_by_user: bool = False,
mapped_parameters: List[CommandParameterBase],
**kwargs,
):
super().__init__(default_value=default_value, description=description, id=id, name=name, **kwargs)
super().__init__(
default_value=default_value,
description=description,
id=id,
name=name,
name_set_by_user=name_set_by_user,
**kwargs,
)

self.mapped_parameters: List[CommandParameterBase] = mapped_parameters

Expand Down
9 changes: 6 additions & 3 deletions renku/ui/cli/run.py
Expand Up @@ -481,7 +481,7 @@
from renku.core import errors
from renku.core.plugin.workflow_file_parser import read_workflow_file
from renku.core.util.os import is_subpath
from renku.core.workflow.workflow_file import get_all_workflow_file_inputs_and_outputs
from renku.core.workflow.workflow_file import get_workflow_file_inputs_and_outputs
from renku.domain_model.project_context import project_context
from renku.ui.cli.utils.callback import ClickCallback
from renku.ui.cli.utils.plugins import available_workflow_providers
Expand Down Expand Up @@ -594,6 +594,7 @@ def is_workflow_file() -> bool:
communicator.warn("All flags other than '--file', '--verbose', '--dry-run', and 'no-commit' are ignored")

path = command_line[0]
steps = command_line[1:]
no_commit = no_commit or dry_run

# NOTE: Read the workflow file to get list of generated files that should be committed
Expand All @@ -603,7 +604,9 @@ def is_workflow_file() -> bool:
else:
workflow_file = read_workflow_file(path=path, parser="renku")
commit_only = (
[path] + get_all_workflow_file_inputs_and_outputs(workflow_file) + [str(project_context.metadata_path)]
[path]
+ get_workflow_file_inputs_and_outputs(workflow_file=workflow_file, steps=steps)
+ [str(project_context.metadata_path)]
)

provider = provider or "local"
Expand All @@ -612,7 +615,7 @@ def is_workflow_file() -> bool:
run_workflow_file_command(no_commit=no_commit, commit_only=commit_only)
.with_communicator(communicator)
.build()
.execute(path=path, steps=command_line[1:], dry_run=dry_run, workflow_file=workflow_file, provider=provider)
.execute(path=path, steps=steps, dry_run=dry_run, workflow_file=workflow_file, provider=provider)
)

if dry_run:
Expand Down
46 changes: 42 additions & 4 deletions tests/cli/test_workflow_file.py
Expand Up @@ -71,12 +71,15 @@ def test_dry_run_workflow_file(runner, workflow_file_project):

def test_run_workflow_file_with_selected_steps(runner, workflow_file_project):
    """Test running a sub-set of steps of a workflow file."""
    # NOTE: This is a real (non dry-run) execution, so that the produced files can be checked below
    result = runner.invoke(cli, ["run", workflow_file_project.workflow_file, "head", "tail"])
    assert 0 == result.exit_code, format_result_exception(result)

    # Only the two requested steps are reported as executed
    assert "Executing step 'workflow-file.head':" in result.output
    assert "Executing step 'workflow-file.tail':" in result.output
    assert "Executing step 'workflow-file.line-count':" not in result.output

    # Third step's output isn't created
    assert not (workflow_file_project.path / "results" / "output.csv.wc").exists()


def test_run_workflow_file_with_no_commit(runner, workflow_file_project):
Expand Down Expand Up @@ -357,6 +360,41 @@ def test_workflow_file_plan_versioning(runner, workflow_file_project, with_injec
assert line_count_3.derived_from is None


def test_workflow_file_plan_versioning_with_selected_steps(runner, workflow_file_project, with_injection):
    """Test plans are versioned correctly when executing subsets of steps."""
    plan_names = ("workflow-file", "workflow-file.head", "workflow-file.tail", "workflow-file.line-count")

    def execute_and_load_plans(selected_steps):
        """Run the workflow file for ``selected_steps`` and return the stored plans listed in ``plan_names``."""
        outcome = runner.invoke(cli, ["run", workflow_file_project.workflow_file, *selected_steps])
        assert 0 == outcome.exit_code, format_result_exception(outcome)
        time.sleep(1)

        with with_injection():
            gateway = PlanGateway()
            return [gateway.get_by_name(plan_name) for plan_name in plan_names]

    # The first run executes only ``head`` and ``tail``; the second run executes every step
    root_before, head_before, tail_before, line_count_before = execute_and_load_plans(["head", "tail"])
    root_after, head_after, tail_after, line_count_after = execute_and_load_plans([])

    # Plan `line-count` wasn't executed in the first run
    assert line_count_before is None
    assert line_count_after is not None

    # Everything else is the same
    assert root_after.id == root_before.id
    assert head_after.id == head_before.id
    assert tail_after.id == tail_before.id


def test_duplicate_workflow_file_plan_name(runner, workflow_file_project):
"""Test workflow file execution fails if a plan with the same name exists."""
workflow_file_project.repository.add(all=True)
Expand Down