Skip to content

Commit

Permalink
fix(core): fix workflow graph generation and CompositePlan view (#2436)
Browse files Browse the repository at this point in the history
  • Loading branch information
Panaetius committed Nov 15, 2021
1 parent 3a40e83 commit 4bb0f08
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
18 changes: 15 additions & 3 deletions renku/core/commands/view_model/composite_plan.py
Expand Up @@ -21,6 +21,7 @@

from renku.core.models.workflow.composite_plan import CompositePlan
from renku.core.models.workflow.parameter import ParameterLink, ParameterMapping
from renku.core.models.workflow.plan import AbstractPlan


class ParameterMappingViewModel:
Expand Down Expand Up @@ -51,9 +52,20 @@ def __init__(self, source: str, sinks: List[str]):
self.sinks = sinks

@classmethod
def from_link(cls, link: ParameterLink):
def from_link(cls, link: ParameterLink, plan: AbstractPlan):
"""Create view model from ``ParameterLink``."""
return cls(source=link.source.name, sinks=[s.name for s in link.sinks])
source_path = plan.get_parameter_path(link.source)
source_path.append(link.source)
source_path = ".".join(p.name for p in source_path[1:])

sinks = []

for sink in link.sinks:
sink_path = plan.get_parameter_path(sink)
sink_path.append(sink)
sink_path = ".".join(p.name for p in sink_path[1:])
sinks.append(sink_path)
return cls(source=source_path, sinks=sinks)


class StepViewModel:
Expand Down Expand Up @@ -92,6 +104,6 @@ def from_composite_plan(cls, plan: CompositePlan):
name=plan.name,
description=plan.description,
mappings=[ParameterMappingViewModel.from_mapping(mapping) for mapping in plan.mappings],
links=[ParameterLinkViewModel.from_link(link) for link in plan.links],
links=[ParameterLinkViewModel.from_link(link, plan) for link in plan.links],
steps=[StepViewModel(id=s.id, name=s.name) for s in plan.plans],
)
10 changes: 7 additions & 3 deletions renku/core/management/workflow/activity.py
Expand Up @@ -155,17 +155,21 @@ def connect_nodes_by_execution_order():

def create_order_among_activities(activities: Set[Activity], path):
for a, b in itertools.combinations(activities, 2):
if networkx.has_path(graph, a, b) or networkx.has_path(graph, b, a):
if (networkx.has_path(graph, a, b) and path in overridden_activities[a]) or (
networkx.has_path(graph, b, a) and path in overridden_activities[b]
):
continue

# NOTE: More recent activity should be executed after the other one
# NOTE: This won't introduce a cycle in the graph because there is no other path between the two nodes
comparison = a.compare_to(b)
if comparison < 0:
graph.add_edge(a, b)
if not networkx.has_path(graph, a, b):
graph.add_edge(a, b)
overridden_activities[a].add(path)
elif comparison > 0:
graph.add_edge(b, a)
if not networkx.has_path(graph, b, a):
graph.add_edge(b, a)
overridden_activities[b].add(path)
else:
raise ValueError(f"Cannot create an order between activities {a.id} and {b.id}")
Expand Down
12 changes: 12 additions & 0 deletions renku/core/models/workflow/composite_plan.py
Expand Up @@ -211,6 +211,18 @@ def find_parameter(self, parameter: CommandParameterBase):

return False

def get_parameter_path(self, parameter: CommandParameterBase):
"""Get the path to a parameter inside this plan."""
if parameter in self.mappings:
return [self]

for plan in self.plans:
path = plan.get_parameter_path(parameter)
if path:
return [self] + path

return None

def get_parameter_by_id(self, parameter_id: str) -> CommandParameterBase:
"""Get a parameter on this plan by id."""
mapping = next((p for p in self.mappings if parameter_id == p.id), None)
Expand Down
7 changes: 7 additions & 0 deletions renku/core/models/workflow/plan.py
Expand Up @@ -213,6 +213,13 @@ def find_parameter(self, parameter: CommandParameterBase) -> bool:
"""Find if a parameter exists on this plan."""
return any(parameter.id == p.id for p in self.inputs + self.outputs + self.parameters)

def get_parameter_path(self, parameter: CommandParameterBase):
"""Get the path to a parameter inside this plan."""
if self.find_parameter(parameter):
return [self]

return None

def get_parameter_by_id(self, parameter_id: str) -> CommandParameterBase:
"""Get a parameter on this plan by id."""
return next((p for p in self.inputs + self.outputs + self.parameters if parameter_id == p.id), None)
Expand Down

0 comments on commit 4bb0f08

Please sign in to comment.